When our workflow project went to production, we deployed it in containers, packaging everything into an image first. Because the image was very large, we uploaded it in chunks. In this post we will implement chunked upload of large files with Vue.js + Element UI on the frontend and FastAPI on the backend.
Chunked upload is nothing new; it comes up all the time when transferring large files. The principle is simple: divide and conquer. The large file is sliced into a number of small chunks, and a temporary file is created on the server to hold each chunk as it arrives. Once every chunk has been uploaded, the backend reads the temporary files back in order, writes their contents into a new file, and finally deletes the temporary files. The overall flow is shown below:
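Before looking at the project code, the split-and-merge flow above can be sketched in a few lines of standalone Python (the 4-byte chunk size and `demo` file names are purely illustrative):

```python
import os
import tempfile

def split_to_chunks(data: bytes, chunk_size: int) -> list:
    # Cut the payload into fixed-size slices; the last slice may be shorter.
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def merge_chunks(chunk_paths: list) -> bytes:
    # Read each temporary chunk file back in order, concatenate the
    # contents, and delete the temporary file once it has been consumed.
    merged = b''
    for path in chunk_paths:
        with open(path, 'rb') as f:
            merged += f.read()
        os.remove(path)
    return merged

data = b'hello chunked upload'
tmpdir = tempfile.mkdtemp()
paths = []
for index, chunk in enumerate(split_to_chunks(data, 4), start=1):
    path = os.path.join(tmpdir, 'demo%d' % index)
    with open(path, 'wb') as f:
        f.write(chunk)
    paths.append(path)

assert merge_chunks(paths) == data  # round trip restores the original bytes
```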
Plenty of frontend libraries ship chunked upload out of the box, such as Baidu's WebUploader; unfortunately it has faded from the stage and is no longer maintained. The mainstream recommendation nowadays is vue-simple-uploader. That said, Ele.me's open-source Element UI still has a very large market share, though what many people don't realize is that this famous UI library has also gone a long time without maintenance: Vue 3.0 has been out for quite a while and no adapted release has appeared, which shows that even a big company's open-source project has to yield to business priorities. This time we'll use Element UI's custom upload hook together with the popular backend framework FastAPI to implement chunked upload.
First, install the required frontend packages:
```shell
npm install element-ui --save
npm install spark-md5 --save
npm install axios --save
```
Then configure them in the entry file main.js:
```js
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'
Vue.use(ElementUI)

import Axios from 'axios'
Vue.prototype.axios = Axios

import QS from 'qs'
Vue.prototype.qs = QS
```
With the setup done, here is the design. When a file is uploaded through Element UI, the frontend splits it according to a chunk-size threshold and records the order of every slice (chunkNumber). During this process, SparkMD5 computes a unique identifier for the file (identifier), which prevents chunks from different concurrent uploads overwriting one another; a salted hash or SHA-256 would work just as well. Each chunk upload asynchronously posts the chunk blob, its order (chunkNumber), and the identifier to the backend (FastAPI), which combines the identifier and chunkNumber into a temporary file name and writes the chunk to the server's disk. After the frontend has sent every chunk, it calls a second backend endpoint once, and the backend merges all the temporary files.
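The identifier scheme can be mimicked in Python with hashlib (the frontend uses SparkMD5 on the first chunk; `make_identifier` is a hypothetical name used here only to illustrate the idea):

```python
import hashlib
import time

def make_identifier(first_chunk: bytes) -> str:
    # Mirrors spark.end() + new Date().getTime() in chunkupload.js:
    # an MD5 digest plus a millisecond timestamp, so that two uploads
    # of the same file still get distinct identifiers.
    digest = hashlib.md5(first_chunk).hexdigest()
    return digest + str(int(time.time() * 1000))

print(make_identifier(b'first 1MB slice of the file'))
```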
Following this design, create chunkupload.js on the frontend:
```js
import SparkMD5 from 'spark-md5'

function getError(action, option, xhr) {
  let msg
  if (xhr.response) {
    msg = `${xhr.response.error || xhr.response}`
  } else if (xhr.responseText) {
    msg = `${xhr.responseText}`
  } else {
    msg = `fail to post ${action} ${xhr.status}`
  }
  const err = new Error(msg)
  err.status = xhr.status
  err.method = 'post'
  err.url = action
  return err
}

function getBody(xhr) {
  const text = xhr.responseText || xhr.response
  if (!text) {
    return text
  }
  try {
    return JSON.parse(text)
  } catch (e) {
    return text
  }
}

export default function upload(option) {
  if (typeof XMLHttpRequest === 'undefined') {
    return
  }
  const spark = new SparkMD5.ArrayBuffer()
  const fileReader = new FileReader()
  const action = option.action
  const chunkSize = 1024 * 1024 * 1 // 1 MB per chunk
  let md5 = ''
  const optionFile = option.file
  let fileChunkedList = []
  const percentage = []

  // Slice the file into chunks; feed the first chunk to SparkMD5
  for (let i = 0; i < optionFile.size; i = i + chunkSize) {
    const tmp = optionFile.slice(i, Math.min((i + chunkSize), optionFile.size))
    if (i === 0) {
      fileReader.readAsArrayBuffer(tmp)
    }
    fileChunkedList.push(tmp)
  }

  fileReader.onload = async (e) => {
    spark.append(e.target.result)
    md5 = spark.end() + new Date().getTime()
    console.log('file identifier --------', md5)
    // Build a FormData payload for every chunk
    fileChunkedList = fileChunkedList.map((item, index) => {
      const formData = new FormData()
      if (option.data) {
        Object.keys(option.data).forEach(key => {
          formData.append(key, option.data[key])
        })
        formData.append(option.filename, item, option.file.name)
        formData.append('chunkNumber', index + 1)
        formData.append('chunkSize', chunkSize)
        formData.append('currentChunkSize', item.size)
        formData.append('totalSize', optionFile.size)
        formData.append('identifier', md5)
        formData.append('filename', option.file.name)
        formData.append('totalChunks', fileChunkedList.length)
      }
      return { formData: formData, index: index }
    })

    // Aggregate the per-chunk bytes loaded into one overall percentage
    const updataPercentage = (e) => {
      let loaded = 0
      percentage.forEach(item => {
        loaded += item
      })
      e.percent = loaded / optionFile.size * 100
      option.onProgress(e)
    }

    // Send the chunks with a concurrency limit (2 in-flight requests)
    function sendRequest(chunks, limit = 2) {
      return new Promise((resolve, reject) => {
        const len = chunks.length
        let counter = 0
        let isStop = false
        const start = async () => {
          if (isStop) {
            return
          }
          const item = chunks.shift()
          if (item) {
            const xhr = new XMLHttpRequest()
            const index = item.index
            xhr.onerror = function error(e) {
              isStop = true
              reject(e)
            }
            xhr.onload = function onload() {
              if (xhr.status < 200 || xhr.status >= 300) {
                isStop = true
                reject(getError(action, option, xhr))
              }
              if (counter === len - 1) {
                resolve()
              } else {
                counter++
                start()
              }
            }
            if (xhr.upload) {
              xhr.upload.onprogress = function progress(e) {
                if (e.total > 0) {
                  e.percent = e.loaded / e.total * 100
                }
                percentage[index] = e.loaded
                updataPercentage(e)
              }
            }
            xhr.open('post', action, true)
            if (option.withCredentials && 'withCredentials' in xhr) {
              xhr.withCredentials = true
            }
            const headers = option.headers || {}
            for (const item in headers) {
              if (headers.hasOwnProperty(item) && headers[item] !== null) {
                xhr.setRequestHeader(item, headers[item])
              }
            }
            xhr.send(item.formData)
          }
        }
        while (limit > 0) {
          // Stagger the initial requests slightly
          setTimeout(() => {
            start()
          }, Math.random() * 1000)
          limit -= 1
        }
      })
    }

    try {
      await sendRequest(fileChunkedList, 2)
      // All chunks uploaded: ask the backend to merge them
      const data = {
        identifier: md5,
        filename: option.file.name,
        totalSize: optionFile.size
      }
      const fileInfo = await this.axios({
        method: 'post',
        url: 'http://localhost:8000/mergefile/',
        headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
        data: this.qs.stringify(data)
      }).catch(error => {
        console.log('ERRRR:: ', error.response.data)
      })
      if (fileInfo.data.code === 200) {
        const success = getBody(fileInfo.request)
        option.onSuccess(success)
        return
      }
    } catch (error) {
      option.onError(error)
    }
  }
}
```
Next, create the upload.vue template and wire in the custom upload handler:
```vue
<template>
  <div>
    <el-upload
      :http-request="chunkUpload"
      :ref="chunkUpload"
      :action="uploadUrl"
      :data="uploadData"
      :on-error="onError"
      :before-remove="beforeRemove"
      name="file">
      <el-button size="small" type="primary">Click to upload</el-button>
    </el-upload>
  </div>
</template>

<script>
import chunkUpload from './chunkUpload'

export default {
  data() {
    return {
      uploadData: {},
      uploadUrl: 'http://localhost:8000/uploadfile/',
      chunkUpload: chunkUpload
    }
  },
  methods: {
    onError(err, file, fileList) {
      // Abort every in-flight chunk request, then notify the user
      this.$store.getters.chunkUploadXhr.forEach(item => {
        item.abort()
      })
      this.$alert('File upload failed, please retry', 'Error', {
        confirmButtonText: 'OK'
      })
    },
    beforeRemove(file) {
      // File removed before finishing: abort outstanding requests
      if (file.percentage !== 100) {
        this.$store.getters.chunkUploadXhr.forEach(item => {
          item.abort()
        })
      }
    }
  }
}
</script>

<style>
</style>
```
The backend upload endpoint defined here is http://localhost:8000/uploadfile/ and the merge endpoint is http://localhost:8000/mergefile/.
Now start the Vue.js dev server:
The resulting page is shown below:
With the frontend done, let's write the API. The backend's job is comparatively simple: receive each chunk, its order, and the unique identifier via FastAPI, and write the chunk to a temporary file on the server. After the last chunk has been uploaded, a second endpoint merges all the files in chunk order and deletes the temporary files once the merge succeeds, to save space. First install the required third-party dependency:
```shell
pip3 install python-multipart
```
Since this is a project with separate frontend and backend, don't forget to enable CORS. Write main.py:
```python
from uploadfile import router
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from model import database
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Allow cross-origin requests from the Vue.js dev server
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
app.include_router(router)


@app.on_event("startup")
async def startup():
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


@app.get("/")
def read_root():
    return {"Hello": "World"}
```
Then write uploadfile.py:
```python
import os

from fastapi import APIRouter, File, Form, UploadFile

router = APIRouter()


@router.post("/uploadfile/")
async def uploadfile(file: UploadFile = File(...), chunkNumber: str = Form(...), identifier: str = Form(...)):
    # Persist this chunk as a temporary file named identifier + chunk number
    task = identifier
    chunk = chunkNumber
    filename = '%s%s' % (task, chunk)
    contents = await file.read()
    with open('./static/upload/%s' % filename, "wb") as f:
        f.write(contents)
    print(file.filename)
    return {"filename": file.filename}


@router.post("/mergefile/")
async def mergefile(identifier: str = Form(...), filename: str = Form(...)):
    # Concatenate the chunks in order; an IOError on open means we have
    # run out of chunks. Each temporary file is deleted after it is read.
    target_filename = filename
    task = identifier
    chunk = 1
    with open('./static/upload/%s' % target_filename, 'wb') as target_file:
        while True:
            try:
                filename = './static/upload/%s%d' % (task, chunk)
                source_file = open(filename, 'rb')
                target_file.write(source_file.read())
                source_file.close()
            except IOError:
                break
            chunk += 1
            os.remove(filename)
    return {"code": 200}
```
It is worth noting that we declare the file parameter as UploadFile. Its advantage is that while a file is being received, any content that grows past an in-memory limit is spooled to disk, which is quite flexible; combined with the await keyword, the file contents are also read asynchronously, improving performance and efficiency.
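This spill-to-disk behavior comes from the standard library's SpooledTemporaryFile, which sits underneath UploadFile; a quick sketch of the rollover (the 1 KB threshold is arbitrary, and `_rolled` is a private CPython attribute used here only to observe the switch):

```python
from tempfile import SpooledTemporaryFile

buf = SpooledTemporaryFile(max_size=1024)
buf.write(b'a' * 100)         # small write: contents stay in memory
in_memory = not buf._rolled
buf.write(b'b' * 2000)        # crosses max_size: rolled over to a real file on disk
on_disk = buf._rolled
buf.close()
assert in_memory and on_disk
```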
Start the backend service and test it:
```shell
uvicorn main:app --reload
```
As you can see, when we upload a 2.9 MB image, the frontend splits it into four pieces according to the configured chunk threshold and posts them to the uploadfile endpoint; the backend then merges them via the mergefile endpoint using the submitted parameters, perfectly reconstructing the original file.
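The number of upload requests the backend receives follows directly from the 1 MB threshold in chunkupload.js: ceil(totalSize / chunkSize). A quick standalone sanity check:

```python
import math

def chunk_count(total_size: int, chunk_size: int = 1024 * 1024) -> int:
    # Number of slices produced by the slicing loop in chunkupload.js.
    return math.ceil(total_size / chunk_size)

print(chunk_count(3 * 1024 * 1024 + 1))  # a file just over 3 MB needs 4 chunks
```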