草庐IT

minio分片上传

__Loren 2024-04-26 原文

oss文件服务

一、前言

​ Minio是一个对象存储服务OSS(Object Storage Service)。是⼀种海量、安全、低成本、⾼可靠的云存储服务。本身的应用的并不复杂。

但是Minio的APi在对于大于5m的文件,自动采用了分片上传,它的分片上传我们无法得知上传的分片后的序号,也就是说,没上传一个分片,我们都需要自己去记录已上传分片的序号。这将导致一个文件一个文件分片5个,那么同样还需要调用5次后端接口去记录这5个分片的信息。这个无疑大大浪费了性能,且无法做到并发上传。

​ 因此基于Minio的javaAPI,我们采用另一种方案去替代。

二、初步流程:

  1. 前端服务进行大文件分片处理,将分片信息传递给文件服务oss。
  2. oss通过redis和mysql检查分片信息是否已存在,若存在直接返回数据。
  3. 若不存在oss生成上传链接以及uploadID, 然后记录并返回所有分片的上传链接及uploadId。
  4. 前端服务直接请求Minio 服务器,并发上传分片。
  5. 所有分片上传完成后,使用uploadId 调用文件服务进行文件合并,oss同时更新上传状态。

三、具体实现

1 相关配置准备

1.1 数据库表设计

表名:file_record

字段名类型注释
idbigint主键id
file_urltext上传分片的链接
file_namevarchar文件名
md5varcharMD5
upload_idvarchar上传id
is_uploadedint是否已上传
total_chunksint分片总块数
sizebigint文件大小(K)
completed_partsint已完成片数
created_atdatetime生成时间
updated_atdatetime更新时间
deleted_atdatetime删除时间(软删除)

1.2 minio集成配置

linux上部署minio,docker下载minio镜像

docker run -d -p 10000:10000 -p 11000:11000 --name minio2 -v ~/var/local/environment/data:/data -e "MINIO_ROOT_USER=root" -e "MINIO_ROOT_PASSWORD=minio123456" minio/minio server /data --console-address ":11000" --address ":10000"

配置说明:这里选择开放两个端口10000和11000。

–console-address ":11000"是将控制台的端口11000暴露

–address ":10000"是我们集成到项目中占用的端口。

nacos上配置oss-dev.yml

minio:
  url: http://192.168.137.129:10000
  accessKey: root
  secretKey: minio123456
  bucketName: test-bucket

minio控制台(可管理文件和bucket):

项目依赖引入,这里使用的是当前的最新版8.3.5

<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.3.5</version>
</dependency>

2 后端实现

2.1 MinioProperty

MinioProperty类获取minio配置

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * 获取minio配置
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "minio")
public class MinioProperty {
    /**
     * 服务地址
     */
    public String url;
    /**
     * 用户名
     */
    public String accessKey;
    /**
     * 密码
     */
    public String secretKey;
    /**
     * 存储桶名称
     */
    public String bucketName;
    /**
     *  如果是true,则用的是https而不是http,默认值是true
     */
    public Boolean secure = false;
}

2.2 MyMinioClient

MyMinioClient类继承MinioClient来暴露出父类方法供扩展使用。

import com.google.common.collect.Multimap;
import io.minio.CreateMultipartUploadResponse;
import io.minio.ListPartsResponse;
import io.minio.MinioClient;
import io.minio.ObjectWriteResponse;
import io.minio.errors.*;
import io.minio.messages.Part;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;


public class MyMinioClient extends MinioClient {
    
	protected MyMinioClient(MinioClient client) {
        super(client);
    }

    /**
     * 创建分片上传请求
     * @param bucketName       存储桶
     * @param region           区域
     * @param objectName       对象名
     * @param headers          消息头
     * @param extraQueryParams 额外查询参数
     */
    @Override
    public CreateMultipartUploadResponse createMultipartUpload(String bucketName, String region, String objectName, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        return super.createMultipartUpload(bucketName, region, objectName, headers, extraQueryParams);
    }

    /**
     * 完成分片上传,执行合并文件
     * @param bucketName       存储桶
     * @param region           区域
     * @param objectName       对象名
     * @param uploadId         上传ID
     * @param parts            分片
     * @param extraHeaders     额外消息头
     * @param extraQueryParams 额外查询参数
     */
    @Override
    public ObjectWriteResponse completeMultipartUpload(String bucketName, String region, String objectName, String uploadId, Part[] parts, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        return super.completeMultipartUpload(bucketName, region, objectName, uploadId, parts, extraHeaders, extraQueryParams);
    }

    /**
     * 查询分片数据
     * @param bucketName       存储桶
     * @param region           区域
     * @param objectName       对象名
     * @param uploadId         上传ID
     * @param extraHeaders     额外消息头
     * @param extraQueryParams 额外查询参数
     */
    public ListPartsResponse listMultipart(String bucketName, String region, String objectName, Integer maxParts, Integer partNumberMarker, String uploadId, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        return super.listParts(bucketName, region, objectName, maxParts, partNumberMarker, uploadId, extraHeaders, extraQueryParams);
    }

    /**
     * 上传分片上传请求,返回uploadId
     * @param bucketName       存储桶
     * @param region           区域
     * @param objectName       对象名
     * @param headers           消息头
     * @param extraQueryParams 额外查询参数
     */
    public CreateMultipartUploadResponse uploadId(String bucketName, String region, String objectName, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        return super.createMultipartUpload(bucketName, region, objectName, headers, extraQueryParams);
    }


    /**
     * 返回临时带签名、过期时间为1天的PUT请求方式的访问URL
     * @param bucketName  桶名
     * @param filePath    Oss文件路径
     * @param queryParams 查询参数
     * @return 临时带签名、过期时间为1天的PUT请求方式的访问URL
     */
    @SneakyThrows
    public String getPresignedObjectUrl(String bucketName, String filePath, Map<String, String> queryParams) {
        return super.getPresignedObjectUrl(
                GetPresignedObjectUrlArgs.builder()
                        .method(Method.PUT)
                        .bucket(bucketName)
                        .object(filePath)
                        .expiry(1, TimeUnit.DAYS)
                        .extraQueryParams(queryParams)
                        .build());
    }
}

2.3 MinioConfig

MinioConfig将MyMinioClient交给spring容器管理方便调用

import io.minio.MinioClient;
import lombok.SneakyThrows;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MinoConfig {
    @Bean
    @SneakyThrows
    @ConditionalOnMissingBean(MyMinioClient.class)
    public MyMinioClientminioClient(MinioProperty minioProperty) {
        MinioClient minioClient = MinioClient.
                builder()
                .endpoint(minioProperty.getUrl())
                .credentials(minioProperty.getAccessKey(), minioProperty.getSecretKey())
                .build();
        return new MyMinioClient(minioClient);
    }
}

2.4 核心接口实现

2.4.1 createMultipartUpload方法

返回分片上传需要的签名数据URL及 uploadId。

1、fileMultipartDTO 是分片传输实体,包括:文件名fileName、分片数chunkSize、所属文件夹名bucketName。

2、检查redis中是否存在,若存在直接返回对应数据;若redis中不存在检查数据库中是否存在,若存在直接返回对应数据。

3、若redis、数据库中都不存在,处理生成分片。

4、调用minioAPI获得uploadId和签名url

5、相关数据在redis中缓存,并保存到数据库。返回给前端

public Map<String, Object> createMultipartUpload(FileMultipartDTO fileMultipartDTO) {
    log.info("fileMultipartDTO:{}", fileMultipartDTO);
    String fileName = fileMultipartDTO.getFileName();
    Integer chunkSize = fileMultipartDTO.getChunkSize();
    String bucketName = fileMultipartDTO.getBucketName();

    // 1. 根据文件名创建签名
    // 2. 获取uploadId
    String contentType = "application/octet-stream";
    HashMultimap<String, String> headers = HashMultimap.create();
    headers.put("Content-Type", contentType);
    CreateMultipartUploadResponse response = minioClient.uploadId(bucketName, null, fileName, null, null);
    String uploadId = response.result().uploadId();

    Map<String, Object> result = new HashMap<>(3, 1);
    String md5 = MD5Utils.encryptToMd5(fileName + chunkSize + bucketName);
    //查询是否已存在,若存在返回对应数据
    Map<String, Object> checkResult;
    //检查redis中是否已存在
    checkResult = checkIsExistInRedis(md5);
    if (checkResult != null && !checkResult.isEmpty()) {
        log.info(">>>>>>>>>>>>redis中存在");
        return checkResult;
    }
    //检查数据库中是否已存在
    checkResult = checkIsExistInDatabase(md5);
    if (checkResult != null && !checkResult.isEmpty()) {
        log.info(">>>>>>>>>>>>数据库中存在");
        //redis中补充
        checkResult.put("fileName", fileName);
        redisTemplate.opsForValue().set(RedisPrefixForKey.MINIO_KEY + md5, checkResult, 1, TimeUnit.HOURS);
        return checkResult;
    }

    //redis中和数据库中都没有,需要生成
    result.put("uploadId", uploadId);
    // 3. 请求Minio 服务,获取每个分块带签名的上传URL
    Map<String, String> reqParams = new HashMap<>(3, 1);
    reqParams.put("uploadId", uploadId);
    List<String> uploadUrlList = new ArrayList<>();
    // 4. 循环分块数 从1开始
    for (int i = 1; i <= chunkSize; i++) {
        reqParams.put("partNumber", String.valueOf(i));
        // 获取URL
        String uploadUrl = minioCilent.getPresignedObjectUrl(bucketName, fileName, reqParams);
        // 添加到集合
        result.put("chunk_" + (i - 1), uploadUrl);
        uploadUrlList.add(uploadUrl);
    }
    log.info(">>>>分片数据入redis和数据库");
    result.put("fileName", fileName);
    redisTemplate.opsForValue().set(RedisPrefixForKey.MINIO_KEY + md5, result, 8, TimeUnit.HOURS);
    //入库
    String uploadUrls = JSON.toJSONString(uploadUrlList);
    FileRecord fileRecord = new FileRecord()
        .setFileName(fileName)
        .setFileUrl(uploadUrls)
        .setUploadId(uploadId)
        .setTotalChunks(chunkSize)
        .setMd5(md5)
        //上传完成后,接收到合并请求时再改以下参数
        .setCompletedParts(0)
        .setSize(0)
        .setIsUploaded(0);
    fileRecordMapper.insert(fileRecord);
    return result;
}
2.4.2 completeMultipartUpload方法

分片上传完后合并。

FileCompleteDTO是合并请求参数实体,包括:所属文件夹名bucketName、上传iduploadId、文件名objectName。

1、调用minioAPI获取该文件的所有分片partList。

2、将partList中part合并成parts。

3、调用minioAPI完成合并。

4、合并成功后修改数据库信息。

public Result completeMultipartUpload(FileCompleteDTO fileObject) {
    String bucketName = fileObject.getBucketName();
    String uploadId = fileObject.getUploadId();
    String objectName = fileObject.getObjectName();
    int completedParts;
    int size = 0;
    try {
        Part[] parts = new Part[10000];
        ListPartsResponse partResult = minioClient.listMultipart(bucketName, null, objectName, 1000, 0, uploadId, null, null);
        List<Part> partList = partResult.result().partList();
        completedParts = partList.size();
        int partNumber = 1;
        log.info("总片数:================" + completedParts + "========");
        for (Part part : partList) {
            parts[partNumber - 1] = new Part(partNumber, part.etag());   //etag就是分片信息
            partNumber++;
            size += part.partSize();
        }
        minioClient.completeMultipartUpload(bucketName, null, objectName, uploadId, parts, null, null);
    } catch (Exception e) {
        e.printStackTrace();
        return Result.fail("合并失败:" + e.getMessage());
    }
    //合并成功,入库
    //通过uploadId修改对应的记录行
    fileRecordMapper.updateStatusByUploadId(uploadId, completedParts, size);
    return Result.success();
}

3 前端实现

此处对前端实现不做重点描述,项目提供了一个demo,后续可做参考修改。

四、效果展示

当前端选择完文件后:

数据库中新增数据:

redis中新增数据

当前端点击开始上传按钮后,前端服务直接请求Minio服务

完成上传后,数据库更新is_uploaded、Size、completed_parts

查看minio控制台,可以看到该文件。

有关minio分片上传的更多相关文章

  1. ruby - 我可以使用 aws-sdk-ruby 在 AWS S3 上使用事务性文件删除/上传吗? - 2

    我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的

  2. ruby-on-rails - 添加回形针新样式不影响旧上传的图像 - 2

    我有带有Logo图像的公司模型has_attached_file:logo我用他们的Logo创建了许多公司。现在,我需要添加新样式has_attached_file:logo,:styles=>{:small=>"30x15>",:medium=>"155x85>"}我是否应该重新上传所有旧数据以重新生成新样式?我不这么认为……或者有什么rake任务可以重新生成样式吗? 最佳答案 参见Thumbnail-Generation.如果rake任务不适合你,你应该能够在控制台中使用一个片段来调用重新处理!关于相关公司

  3. ruby-on-rails - 有没有办法为 CarrierWave/Fog 设置上传进度指示器? - 2

    我在Rails应用程序中使用CarrierWave/Fog将视频上传到AmazonS3。有没有办法判断上传的进度,让我可以显示上传进度如何? 最佳答案 CarrierWave和Fog本身没有这种功能;你需要一个前端uploader来显示进度。当我不得不解决这个问题时,我使用了jQueryfileupload因为我的堆栈中已经有jQuery。甚至还有apostonCarrierWaveintegration因此您只需按照那里的说明操作即可获得适用于您的应用的进度条。 关于ruby-on-r

  4. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

  5. ruby-on-rails - 安全地显示使用回形针 gem 上传的图像 - 2

    默认情况下:回形针gem将所有附件存储在公共(public)目录中。出于安全原因,我不想将附件存储在公共(public)目录中,所以我将它们保存在应用程序根目录的uploads目录中:classPost我没有指定url选项,因为我不希望每个图像附件都有一个url。如果指定了url:那么拥有该url的任何人都可以访问该图像。这是不安全的。在user#show页面中:我想实际显示图像。如果我使用所有回形针默认设置,那么我可以这样做,因为图像将在公共(public)目录中并且图像将具有一个url:Someimage:看来,如果我将图像附件保存在公共(public)目录之外并且不指定url(同

  6. ruby - 使用法拉第上传文件 - 2

    我在尝试使用Faraday将文件上传到网络服务时遇到问题。我的代码:conn=Faraday.new('http://myapi')do|f|f.request:multipartendpayload={:file=>Faraday::UploadIO.new('...','image/jpeg')}conn.post('/',payload)尝试发布后似乎没有任何反应。当我检查响应时this是我所看到的:#:post,:body=>#,#,@opts={}>,#],@index=0>>,#>],@ios=[#,#,@opts={}>,#],@index=0>,#],@index=0>

  7. ruby-on-rails - 如何在 Rails 2 中保存上传的文件 - 2

    您好,我在Rails2应用程序中集成了ZohoSheet,我可以从本地打开新的ZohoSheet,但是当我在zoho编辑器中单击保存时,它会将文件发送到我的服务器,这是我的生产日志ProcessingZohoController#indexto#(for*.*.*.*at2015-10-0811:24:08)[POST]Parameters:{"controller"=>"zoho","filename"=>#,"content"=>#,"eventsource"=>#,"format"=>#,"id"=>#,"action"=>"index"}ActionController::In

  8. ruby-on-rails - Rails - 回形针 - 多张照片上传不保存 - 2

    我正在尝试在Rails中创建一个产品页面。这包括添加多个图像和文本字段。我有一个产品模型和一个照片模型。我正在使用回形针gem上传照片。但是当我查看产品页面时,我看不到图片。照片未保存到数据库。附言我使用HAML。app/views/products/show.html.haml%bName=@product.name%br%bDescription=@product.description%br-@product.photos.eachdo|photo|=image_tagphoto.image.urlapp/controllers/products_controllerclassP

  9. ruby - 如何知道通过 ruby​​ 上传 ftp 是否成功? - 2

    下面的代码通过ftp上传文件并且它有效。require'net/ftp'ftp=Net::FTP.newftp.passive=trueftp.connect("***")ftp.login("***","***")ftp.chdir"claimsecure-xml-files"ftp.putbinaryfile("file.xls",File.basename("file.xls"))ftp.quit但是如何确定上传是否成功呢? 最佳答案 之后ftp.putbinaryfile("file.xls",File.basename("

  10. ruby-on-rails - Rails/Heroku - 如何对上传的文件进行反病毒扫描? - 2

    如何扫描上传文件中的病毒、木马等?只是想阻止一些用户上传一些讨厌的东西。我正在使用Heroku和AmazonS3。 最佳答案 Checkoutthis它支持REST/JSON防病毒网络服务。 关于ruby-on-rails-Rails/Heroku-如何对上传的文件进行反病毒扫描?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/9640516/

随机推荐