最近有用到netty来进行文件的传输,建立通道然后进行文件的读写,主要是用到RandomAccessFile这个类,对可以对文件进行指定位置和指定字节大小读写,下面为具体实现思路:
服务端用于发送FileUploadFile Java对象,里面包括文件,文件信息等,使用RandomAccessFile对文件进行读取,每次1024b(1kb),分片段发送,首次连接时就开始发送第一段,然后由客户端返回,接收的字节大小,再从该位置开始读取下一片段,如果剩余片段小于1kb,则发送完,大于就只发送1kb大小。
客户端用于接收FileUploadFile Java对象,解析对象后,使用RandomAccessFile写入文件,每次写入1kb,然后重新确定位置,再返回数据给服务端。
代码实现
FileUploadFile 类,存放文件对象
@Data @AllArgsConstructor @NoArgsConstructor public class FileUploadFile implements Serializable { //Serializable序列化不能漏 /* 文件 */ private File file; /* 文件名称 */ private String fileName; /* 开始位置 */ private int startPos; /* 文件字节数组 */ private byte[] body; /* 结束位置 */ private int endPos; }
服务端连接Server类
package com.demo.netty.server; import com.demo.LoggerFactory; import com.demo.netty.file.FileUploadFile; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import java.io.File; public class Server { private static final File file = new File("D:\\temp\\res.txt"); private static final int port = 8888; public static void main(String[] args)throws Exception { FileUploadFile fileUploadFile = new FileUploadFile(); fileUploadFile.setFile(file); fileUploadFile.setFileName(file.getName()); new Server().serverStart(port,fileUploadFile ); } public void serverStart(int port, final FileUploadFile fileUploadFile) throws InterruptedException { //1 第一个线程组 是用于接收client端的连接 EventLoopGroup bossgroup = new NioEventLoopGroup(); //2 第二个线程组 用于实际的业务开发处理操作 EventLoopGroup workgroup = new NioEventLoopGroup(); //3 创建一个辅助类Bootstrap,就是对我们的server进行一系列的配置 ServerBootstrap sb = new ServerBootstrap(); //4 把两个线程组添加进来 sb.group(bossgroup,workgroup) //指定使用NioServerSocketChannl这种类型的通道 .channel(NioServerSocketChannel.class) //一定要使用childHandler去绑定具体的事件处理器 .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度 ch.pipeline().addLast(new ServerHandler(fileUploadFile)); } }); //绑定指定的端口 进行监听 LoggerFactory.log.info("启动netty Server"); ChannelFuture f = sb.bind(port).sync(); f.channel().closeFuture().sync(); //关闭线程 bossgroup.shutdownGracefully(); workgroup.shutdownGracefully(); } }
服务端处理ServerHandler类
package com.demo.netty.server; import com.demo.netty.file.FileUploadFile; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.io.RandomAccessFile; import java.net.InetSocketAddress; @Slf4j public class ServerHandler extends ChannelInboundHandlerAdapter { private int total; //总包数量 private int row;//剩余包数 private int byteRead; //读取结束位置 private volatile int start = 0; //开始位置 private volatile int lastLength = 0;//文件读取长度 public RandomAccessFile randomAccessFile; private FileUploadFile fileUploadFile; public ServerHandler(FileUploadFile fileUploadFile) { if (fileUploadFile.getFile().exists()){ if (!fileUploadFile.getFile().isFile()){ log.warn("this is not a file"); return; } } this.fileUploadFile = fileUploadFile; }
/*
* 客户端连接会触发该方法
* */
@Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = remoteAddress.getAddress().getHostAddress(); log.info("有客户端连接,ip地址为:{}",ip); log.info("开始发送FileUploadFile 对象"); try{ randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),"r");//文件只读,不存在不创建,抛出异常 randomAccessFile.seek(fileUploadFile.getStartPos());//开始位置初始为0 lastLength =1024; if(((randomAccessFile.length())%lastLength)!=0){ total = (int) (randomAccessFile.length()/lastLength+1); }else { total = (int) (randomAccessFile.length()/lastLength); } row = total; byte[] body = new byte[lastLength];//创建一个字节长度为1024数组 if ((byteRead = randomAccessFile.read(body))!=-1){ fileUploadFile.setEndPos(byteRead);//存入结束位置 fileUploadFile.setBody(body);//存入文件字节 ctx.writeAndFlush(fileUploadFile);//发送对象 } row--; log.info("第一段文件读取完毕!"); log.info("文件总长度:[{}],剩余长度:[{}],发送长度:[{}]",randomAccessFile.length(),randomAccessFile.length()-byteRead,byteRead); log.info("总包数:[{}],剩余包数:[{}]",total,row); }catch (Exception e){ e.printStackTrace(); } }
/*
* 接收信息
* */
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ if (msg instanceof Integer){ start = (int) msg; if (start!=-1){ randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),"r"); randomAccessFile.seek(start); //定位文件位置 int a = (int) (randomAccessFile.length()-start); if (a < lastLength){ //剩余长度小于1024一次发完 lastLength = a; } byte[] body = new byte[lastLength]; if ((byteRead = randomAccessFile.read(body))!=-1 && a>0){ fileUploadFile.setBody(body); fileUploadFile.setEndPos(byteRead); ctx.writeAndFlush(fileUploadFile); row--; log.info("文件总长度:[{}],剩余长度:[{}],发送长度:[{}]",randomAccessFile.length(),a,byteRead); log.info("总包数:[{}],剩余包数:[{}]",total,row); }else { log.info("文件已全部发送完毕,即将关闭通道"); randomAccessFile.close(); ctx.flush(); ctx.close(); } } } } /* * 连接异常触发 * */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端连接类Client
package com.demo.netty.client; import com.demo.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; /* * netty 客户端 * */ public class Client { private static final String host = "127.0.0.1"; public static void main(String[] args) throws Exception { new Client().clientStart(8888); } public void clientStart(int port) throws InterruptedException { EventLoopGroup workgroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(workgroup); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null))); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect(host,port).sync(); LoggerFactory.log.info("Netty Client start"); cf1.channel().closeFuture().sync(); workgroup.shutdownGracefully(); } }
客户端处理类ClientHandler
package com.demo.netty.client; import com.demo.netty.file.FileUploadFile; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.io.File; import java.io.RandomAccessFile; @Slf4j public class ClientHandler extends ChannelInboundHandlerAdapter { private int byteRead; private volatile int start = 0; //开始位置 private static final String file_path = "D:\\Download\\"; @Override public void channelActive(ChannelHandlerContext ctx) { log.info("已连接到服务器"); } @Override public void channelRead(ChannelHandlerContext cxt,Object msg) throws Exception { if (msg instanceof FileUploadFile){ log.info("接收到服务端的FileUploadFile对象"); FileUploadFile fileUploadFile = (FileUploadFile) msg; byte[] body = fileUploadFile.getBody(); //文件内容转换为字节流 byteRead = fileUploadFile.getEndPos(); //获取文件上次读取的位置 String file_name = fileUploadFile.getFileName();//获取文件名称 String path = file_path+file_name; //文件路径 File file = new File(path);//根据文件路径创建文件 RandomAccessFile randomAccessFile = new RandomAccessFile(file,"rw");//读写操作,如果文件不存在,则自动创建文件 randomAccessFile.seek(start); //移动至start位置开始写入文件 randomAccessFile.write(body); start+=byteRead; cxt.writeAndFlush(start);//向服务端发送数据,下次开始的位置 log.info("写入完毕,文件路径为:{}",path); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
先启动Server类的主程序,再启动Client类的主程序,就可以运行了,路径可以自己修改一下,运行截图如下:
Server端
client端
最后上个maven依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.12.Final</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.8</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.8</version> </dependency>
标签:
#netty#