通过netty进行二进制BIN文件传输的方法

meakel通信 2023-06-11 10:03:06 6972阅读 举报
最近有用到netty来进行文件的传输,建立通道然后进行文件的读写,主要是用到RandomAccessFile这个类,对可以对文件进行指定位置和指定字节大小读写,下面为具体实现思路:

服务端用于发送FileUploadFile Java对象,里面包括文件,文件信息等,使用RandomAccessFile对文件进行读取,每次1024b(1kb),分片段发送,首次连接时就开始发送第一段,然后由客户端返回,接收的字节大小,再从该位置开始读取下一片段,如果剩余片段小于1kb,则发送完,大于就只发送1kb大小。
客户端用于接收FileUploadFile Java对象,解析对象后,使用RandomAccessFile写入文件,每次写入1kb,然后重新确定位置,再返回数据给服务端。

代码实现
FileUploadFile 类,存放文件对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FileUploadFile implements Serializable { //Serializable序列化不能漏
    /* 文件 */
    private File file;
    /* 文件名称 */
    private String fileName;
    /* 开始位置 */
    private int startPos;
    /* 文件字节数组 */
    private byte[] body;
    /* 结束位置 */
    private int endPos;

}

服务端连接Server类

package com.demo.netty.server;

import com.demo.LoggerFactory;
import com.demo.netty.file.FileUploadFile;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.io.File;

public class Server {

    private static final File file = new File("D:\\temp\\res.txt");
    private static final int port = 8888;

    public static void main(String[] args)throws Exception {
        FileUploadFile fileUploadFile = new FileUploadFile();
        fileUploadFile.setFile(file);
        fileUploadFile.setFileName(file.getName());
        new Server().serverStart(port,fileUploadFile );
    }

    public void serverStart(int port, final FileUploadFile fileUploadFile) throws InterruptedException {
        //1 第一个线程组 是用于接收client端的连接
        EventLoopGroup bossgroup = new NioEventLoopGroup();
        //2 第二个线程组 用于实际的业务开发处理操作
        EventLoopGroup workgroup = new NioEventLoopGroup();
        //3 创建一个辅助类Bootstrap,就是对我们的server进行一系列的配置
        ServerBootstrap sb = new ServerBootstrap();
        //4 把两个线程组添加进来
        sb.group(bossgroup,workgroup)
                //指定使用NioServerSocketChannl这种类型的通道
                .channel(NioServerSocketChannel.class)
                //一定要使用childHandler去绑定具体的事件处理器
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度
                        ch.pipeline().addLast(new ServerHandler(fileUploadFile));
                    }
                });
        //绑定指定的端口 进行监听
        LoggerFactory.log.info("启动netty Server");
        ChannelFuture f = sb.bind(port).sync();

        f.channel().closeFuture().sync();
        //关闭线程
        bossgroup.shutdownGracefully();
        workgroup.shutdownGracefully();
    }
}


服务端处理ServerHandler类

package com.demo.netty.server;
import com.demo.netty.file.FileUploadFile;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int total; //总包数量
    private int row;//剩余包数
    private int byteRead; //读取结束位置
    private volatile int start = 0; //开始位置
    private volatile int lastLength = 0;//文件读取长度
    public RandomAccessFile randomAccessFile;
    private FileUploadFile fileUploadFile;

    public ServerHandler(FileUploadFile fileUploadFile) {
        if (fileUploadFile.getFile().exists()){
            if (!fileUploadFile.getFile().isFile()){
                log.warn("this is not a file");
                return;
            }
        }
        this.fileUploadFile = fileUploadFile;
    }
    /*
    * 客户端连接会触发该方法
    * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String ip = remoteAddress.getAddress().getHostAddress();
        log.info("有客户端连接,ip地址为:{}",ip);
        log.info("开始发送FileUploadFile 对象");
        try{
            randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),"r");//文件只读,不存在不创建,抛出异常
            randomAccessFile.seek(fileUploadFile.getStartPos());//开始位置初始为0
            lastLength =1024;
            if(((randomAccessFile.length())%lastLength)!=0){
                total = (int) (randomAccessFile.length()/lastLength+1);
            }else {
                total = (int) (randomAccessFile.length()/lastLength);
            }
            row = total;
            byte[] body = new byte[lastLength];//创建一个字节长度为1024数组
            if ((byteRead = randomAccessFile.read(body))!=-1){
                fileUploadFile.setEndPos(byteRead);//存入结束位置
                fileUploadFile.setBody(body);//存入文件字节
                ctx.writeAndFlush(fileUploadFile);//发送对象
            }
            row--;
            log.info("第一段文件读取完毕!");
            log.info("文件总长度:[{}],剩余长度:[{}],发送长度:[{}]",randomAccessFile.length(),randomAccessFile.length()-byteRead,byteRead);
            log.info("总包数:[{}],剩余包数:[{}]",total,row);

        }catch (Exception e){
            e.printStackTrace();
        }
    }


    /*
    * 接收信息
    * */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
            if (msg instanceof Integer){
                start  = (int) msg;
                if (start!=-1){
                    randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),"r");
                    randomAccessFile.seek(start); //定位文件位置
                    int  a = (int) (randomAccessFile.length()-start);
                    if (a < lastLength){ //剩余长度小于1024一次发完
                        lastLength = a;
                    }
                    byte[] body = new byte[lastLength];
                    if ((byteRead = randomAccessFile.read(body))!=-1 && a>0){
                        fileUploadFile.setBody(body);
                        fileUploadFile.setEndPos(byteRead);
                        ctx.writeAndFlush(fileUploadFile);
                        row--;
                        log.info("文件总长度:[{}],剩余长度:[{}],发送长度:[{}]",randomAccessFile.length(),a,byteRead);
                        log.info("总包数:[{}],剩余包数:[{}]",total,row);
                    }else {
                        log.info("文件已全部发送完毕,即将关闭通道");
                        randomAccessFile.close();
                        ctx.flush();
                        ctx.close();
                    }
                }
            }
    }

    /*
    * 连接异常触发
    * */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}




客户端连接类Client

package com.demo.netty.client;

import com.demo.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/*
* netty 客户端
* */
public class Client {

    private static final String host = "127.0.0.1";

    public static void main(String[] args) throws Exception {
        new Client().clientStart(8888);
    }

    public void clientStart(int port) throws InterruptedException {
        EventLoopGroup workgroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(workgroup);
        b.channel(NioSocketChannel.class);
        b.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast(new ObjectEncoder());
                ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
                ch.pipeline().addLast(new ClientHandler());
            }
        });
        ChannelFuture cf1 = b.connect(host,port).sync();

        LoggerFactory.log.info("Netty Client start");

        cf1.channel().closeFuture().sync();
        workgroup.shutdownGracefully();
    }
}

客户端处理类ClientHandler

package com.demo.netty.client;


import com.demo.netty.file.FileUploadFile;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.RandomAccessFile;
@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {

    private int byteRead;
    private volatile int start = 0; //开始位置
    private static final String file_path = "D:\\Download\\";

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("已连接到服务器");
    }


    @Override
    public void channelRead(ChannelHandlerContext cxt,Object msg) throws Exception {
       if (msg instanceof FileUploadFile){
            log.info("接收到服务端的FileUploadFile对象");
            FileUploadFile fileUploadFile = (FileUploadFile) msg;
            byte[] body = fileUploadFile.getBody(); //文件内容转换为字节流
            byteRead = fileUploadFile.getEndPos(); //获取文件上次读取的位置
            String file_name = fileUploadFile.getFileName();//获取文件名称
            String path = file_path+file_name; //文件路径
            File file = new File(path);//根据文件路径创建文件
            RandomAccessFile randomAccessFile = new RandomAccessFile(file,"rw");//读写操作,如果文件不存在,则自动创建文件
            randomAccessFile.seek(start); //移动至start位置开始写入文件
            randomAccessFile.write(body);
            start+=byteRead;
            cxt.writeAndFlush(start);//向服务端发送数据,下次开始的位置
            log.info("写入完毕,文件路径为:{}",path);
       }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
先启动Server类的主程序,再启动Client类的主程序,就可以运行了,路径可以自己修改一下,运行截图如下:
Server端

client端


最后上个maven依赖:

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.12.Final</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.0.8</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.0.8</version>
        </dependency>

标签: #netty#

版权声明:
作者:meakel
链接:https://www.dianziwang.net/p/11cf49d4c4f0b1.html
来源:通信
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以点击 “举报”


登录 后发表评论
0条评论
还没有人评论过~