netty4 实现一个断点上传大文件功能
主要用于手机端,网络不稳定时上传视频文件,服务端支持断点上传,一是提升速度,二是节省流量。使用netty4 实现。
本来以为文件断点续传功能很简单,不就是提供2个方法:
一个返回已经上传的文件的长度;另外一个负责上传文件呗(请求带上content-range 指明本次上传的内容在整个文件中的位置),然后根据请求提供的位置写呗,太简单了。
但是实际情况还是比较复杂的,关键问题是,上面的描述现在想想只能称作为文件分段上传,而不是断点续传。
断点意味着网络会断,然后断了之后,服务端根本获取不到本次上传的内容,于是下次又只能从头开始传文件。一种解决办法是客户端将文件分成很小的片段(单个片段丢了就整个片段重传),这个方案要求客户端做很多工作,服务端还得根据片段的编号组织文件,总之客户端和服务端都挺麻烦。
于是就想到用netty在写一个服务filestoreApdapterServer,文件上传提交给这个代理服务。这个做法有个前提就是,客户端上传的文件名称保证唯一,并且在请求头里面带着这个名字,以便服务端定位文件。利用的原理是一般长度比较大的消息体,netty会使用chunk传输,我们取得chunk写入临时文件,这样即使网络断了,服务端已经获取的文件内容还是保留在临时文件里面。
流程如下:
1. filestoreApdapterServer将请求的消息体写到临时文件(网络断了也不要紧,读到多少写多少)。
2. 客户端下次传之前先调用getSize获取上传传递的文件长度,我们就在这个getSize方法里面偷偷的将第一步保存的临时文件追加到正式文件里面,然后返回文件长度。
3. 客户端根据获取的服务端文件长度,定位未传的文件位置,读取上传。重复1,2步骤。直到文件上传完成。
看代码:FilestoreAdaptorServerInitializer
public class FilestoreAdaptorServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new StreamChunkAggregator(-1)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", new FileUploadAdaptorHandler()); } }
StreamChunkAggregator就是获取上传文件,写临时文件的:
public class StreamChunkAggregator extends MessageToMessageDecoder<HttpObject> { private static final Logger log = LoggerFactory.getLogger(StreamChunkAggregator.class); private volatile FullHttpMessage currentMessage; private volatile OutputStream out; private final int maxContentLength; private volatile File file; private ChannelHandlerContext ctx; public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; /** * Creates a new instance. */ public StreamChunkAggregator(int maxContentLength) { this.maxContentLength = maxContentLength; } @Override protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception { FullHttpMessage currentMessage = this.currentMessage; if (msg instanceof HttpMessage) { HttpMessage m = (HttpMessage) msg; if (msg instanceof HttpRequest) { HttpRequest header = (HttpRequest) msg; this.currentMessage = currentMessage = new DefaultFullHttpRequest(header.getProtocolVersion(), header.getMethod(), header.getUri(), Unpooled.compositeBuffer(maxCumulationBufferComponents)); final String localName = m.headers().get("file"); // 取上传文件名 log.debug("upload file name is {}", localName); if(null == localName || "".equals(localName.trim())) { ctx.fireChannelRead(m); } File dir = new File(ServerHelper.getDestDir().getAbsolutePath() + File.separator + ServerHelper.getStorePath(localName)); if(!dir.exists()) dir.mkdirs(); log.debug("upload file path is {}", dir.getAbsolutePath()); File tempFile = new File(dir, localName + ".utmp"); if(tempFile.exists()) { // 文件已经存在可能是上次上传遗留的 tempFile.delete(); } this.file = tempFile; this.out = new FileOutputStream(file, true); } else { throw new Error(); } currentMessage.headers().set(m.headers()); } else if (msg instanceof HttpContent) { assert currentMessage != null; HttpContent chunk = (HttpContent) msg; if (chunk.content().isReadable()) { chunk.retain(); IOUtils.copyLarge(new ByteBufInputStream(chunk.content()), this.out); } final boolean last; if (!chunk.getDecoderResult().isSuccess()) { currentMessage.setDecoderResult( DecoderResult.failure(chunk.getDecoderResult().cause())); last = true; } else { last = chunk instanceof LastHttpContent; } if (last) { this.out.flush(); this.out.close(); this.out = null; this.currentMessage = null; this.file = null; out.add(currentMessage); } } else { throw new Error(); } }
FileUploadAdaptorHandler 这个是最后传成功后通知真正的服务端,并且获取服务的返回,给客户端:
public class FileUploadAdaptorHandler extends SimpleChannelInboundHandler<DefaultFullHttpRequest> { private static final Logger log = LoggerFactory.getLogger(FileUploadAdaptorHandler.class); @Override protected void channelRead0(final ChannelHandlerContext ctx, DefaultFullHttpRequest msg) throws Exception { if(log.isDebugEnabled()) { log.debug("message received: begin"); } final String filename = msg.headers().get("file"); if(filename == null || "".equals(filename.trim())) { //没有文件名 直接返回4001 参数错误 String responseBody = "{\"result_code\": 4001,\"result_msg\": \"请求参数错误\"}"; response(responseBody.getBytes(), HttpResponseStatus.BAD_REQUEST, ctx); } else { // 转发给play服务处理 final CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); httpclient.start(); try { HttpGet request1 = new HttpGet(ServerHelper.getPlayServer()); request1.setHeader("Client-Session", msg.headers().get("client-session")); request1.setHeader("Content-Range", msg.headers().get("content-range")); request1.setHeader("file", msg.headers().get("file")); httpclient.start(); httpclient.execute(request1, new FutureCallback<org.apache.http.HttpResponse>() { @Override public void failed(Exception e) { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } serve500(ctx, filename); } @Override public void completed(org.apache.http.HttpResponse playResonse) { log.debug("HttpAsyncClient callback"); int status = playResonse.getStatusLine().getStatusCode(); log.debug("HttpAsyncClient callback playResonse status is {}", status); if(status != 200) { ServerHelper.deleteTmpFile(filename); } HttpEntity entity = playResonse.getEntity(); byte[] bytes = new byte[(int) entity.getContentLength()]; try { IOUtils.read(entity.getContent(), bytes); response(bytes, new HttpResponseStatus(status, ""), ctx); } catch (Exception e) { log.error(e.getMessage(), e); serve500(ctx, filename); } finally { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } } } @Override public void cancelled() { try { httpclient.close(); } catch (IOException e1) { log.error(e1.getMessage(), e1); } serve500(ctx, filename); } }); } catch (Exception e) { httpclient.close(); log.error(e.getMessage(), e); serve500(ctx, filename); } } if(log.isDebugEnabled()) { log.debug("message received: end"); } }
真正服务提供2个方法,一个是获取长度,一个是接收filestoreAapterServer请求的方法:
public static void getFileLength(String name) { Logger.debug("getFileLength path is " + FileHelper.getStorgePath(name)); File file = new File(FileHelper.getStorgePath(name)); long length = file.length(); response.status = StatusCode.OK; response.setHeader("Content-Size", String.valueOf(length)); LocalFile file = LocalFile .find(。。。).first(); if(file != null){ // 如果数据中有记录则认为文件已经保存完整 Logger.debug("getFileLength file has been in database"); FileResult result = new FileResult(); 。。。 throw new CustomJsonResult(result); } File fileTmp = new File(FileHelper.getStorgePath(name) + FileHelper.TMP_SUFFIX); if(Logger.isDebugEnabled()) Logger.debug("getFileLength temp path is " + fileTmp.getAbsolutePath() + ", existed is: " + fileTmp.exists()); if(fileTmp.exists()) { // 临时文件存在,则保存临时文件 Logger.debug("getFileLength save tmp file"); try { FileHelper.saveFileFromTmp(fileTmp, file); } catch (IOException ingore) { Logger.error(ingore.getMessage(), ingore); } length = file.length(); } response.setHeader("Content-Size", String.valueOf(length)); }
public static void saveUploadFile() { String filename = getFileName(); Logger.debug("saveUploadFile name is %s", filename); long total = getFileTotal(); // 整个文件的大小 File tempFile = new File(FileHelper.getStorgePath(filename) + FileHelper.TMP_SUFFIX); if(Logger.isDebugEnabled()) { Logger.debug("saveUploadFile upload tmp file is: " + tempFile.getAbsolutePath()); } if(!tempFile.exists()) { ApiResult result = new ApiResult(); result.resultCode = ApiResultCode.UPLOAD_FILE_FAIL; response.status = Http.StatusCode.INTERNAL_ERROR; throw new CustomJsonResult(result); } File destFile = new File(FileHelper.getStorgePath(filename)); if(destFile.length() >= total) { // 已经上传成功了 需要删除临时文件 FileUtils.deleteQuietly(tempFile); if(Logger.isDebugEnabled()) { Logger.debug("saveUploadFile video has upload completely"); } // 已经完整了,如果数据库不存在保存数据库 .... FileResult result = new FileResult(); result.resultCode = ApiResultCode.SUCCESS; result.videoUrl = video.videoUrl; result.shortUrl = video.shortUrl; throw new CustomJsonResult(result); } try { FileHelper.saveFileFromTmp(tempFile, destFile); } catch (IOException e) { Logger.error("saveUploadFile " + e.getMessage(), e); ApiResult result = new ApiResult(); result.resultCode = ApiResultCode.UPLOAD_FILE_FAIL; response.status = Http.StatusCode.INTERNAL_ERROR; throw new CustomJsonResult(result); } afterWrite(filename, destFile, total); //一些后续工作,如果文件保存完整,保存数据库返回成功结果给客户端 }
这个解决方法,和我们的服务绑定的比较紧,不能解决较为通用的问题 只是提出一种思路。