Android端使用Netty框架实现TCP/IP通信

/ AndroidJava / 没有评论 / 4053浏览

Android端采用Netty实现长连接通信

最近项目需要使用TCP/IP协议与服务器进行长连接通信。我在网上找了一些Android网络通信框架,但大部分都是基于HTTP的请求框架,并不能满足需求,最后只能自己尝试使用Java中常用的Netty框架来实现该功能。

Netty框架Jar包使用

在项目中我使用的是Netty 4.0的jar包,如有需要可以联系我,也可以自行下载:

netty-all-4.0.36.Final.jar

Netty客户端的编写

第一步:创建NettyClient类

/**
 * Singleton TCP client built on Netty that keeps a long-lived connection to a server.
 *
 * <p>Outgoing commands are placed on a bounded queue by {@link #insertCmd(String)} and written
 * to the channel by a dedicated sender thread. A failed connection attempt is retried after a
 * fixed delay until {@link #uninit()} is called.
 *
 * <p>State that is touched by both the Netty event loop and application threads
 * ({@code connectState}, {@code channelFuture}, {@code flag}, {@code sendFlag}) is declared
 * {@code volatile} so that updates are visible across threads.
 */
public class NettyClient {
    public static final int DISCONNECTION = 0;
    public static final int CONNECTING = 1;
    public static final int CONNECTED = 2;

    /** Placeholder server host; replace with the real address. */
    private static final String SERVER_HOST = "ip";
    /** Placeholder server port; replace with the real port (Bootstrap.connect needs an int). */
    private static final int SERVER_PORT = 0;
    /** Delay before a failed connection attempt is retried. */
    private static final long RECONNECT_DELAY_SECONDS = 3L;

    private static NettyClient nettyClient = null;

    private volatile EventLoopGroup group = null;
    private volatile Bootstrap bootstrap = null;
    private volatile ChannelFuture channelFuture = null;
    private final ArrayBlockingQueue<String> sendQueue = new ArrayBlockingQueue<String>(5000);
    private volatile boolean sendFlag = true;
    private final SendThread sendThread = new SendThread();

    private volatile int connectState = DISCONNECTION;
    /** True while the client is allowed to (re)connect; cleared by {@link #uninit()}. */
    private volatile boolean flag = true;

    /**
     * Returns the shared client, creating it on first use.
     *
     * @return the singleton instance
     */
    public static synchronized NettyClient getInstance() {
        if (nettyClient == null) {
            nettyClient = new NettyClient();
        }
        return nettyClient;
    }

    private NettyClient() {
        init();
    }

    /** Configures the bootstrap and pipeline, then starts the sender thread. */
    private void init() {
        setConnectState(DISCONNECTION);
        Bootstrap b = new Bootstrap();
        EventLoopGroup g = new NioEventLoopGroup();
        b.group(g);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.TCP_NODELAY, true);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                // Optional heartbeat support:
                //pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 60, 0));
                // Validates the message format (MessageDecoder is a custom, protocol-specific class).
                pipeline.addLast("messageDecoder", new MessageDecoder());
                // Pass the enclosing instance rather than the static field, which is
                // null after uninit() and would hand the handler a null client.
                pipeline.addLast("clientHandler", new NettyClientHandler(NettyClient.this));
            }
        });
        group = g;
        bootstrap = b;
        startSendThread();
    }

    /**
     * Stops the sender thread, closes the channel and shuts down the event loop group.
     * After this call the instance no longer reconnects; {@link #getInstance()} creates a
     * fresh client.
     */
    public void uninit() {
        // Disable reconnection first: closing the channel triggers channelInactive on the
        // handler, which calls connect() and must find the client already shut down.
        flag = false;
        stopSendThread();

        ChannelFuture future = channelFuture;
        channelFuture = null;
        if (future != null) {
            future.channel().close();
        }

        EventLoopGroup g = group;
        if (g != null) {
            g.shutdownGracefully();
            group = null;
            bootstrap = null;
            synchronized (NettyClient.class) {
                if (nettyClient == this) {
                    nettyClient = null;
                }
            }
        }
        setConnectState(DISCONNECTION);
    }

    /**
     * Queues a command for sending. The command is silently discarded when the queue is full.
     *
     * @param cmd the command text to send
     */
    public void insertCmd(String cmd) {
        sendQueue.offer(cmd);
    }

    private void stopSendThread() {
        sendQueue.clear();
        sendFlag = false;
        sendThread.interrupt();
    }

    private void startSendThread() {
        sendQueue.clear();
        sendFlag = true;
        sendThread.start();
    }

    /**
     * Starts an asynchronous connection attempt unless the client is already connected or
     * has been shut down by {@link #uninit()}.
     */
    public void connect() {
        Bootstrap b = bootstrap;
        if (!flag || b == null) {
            return;
        }
        if (getConnectState() != CONNECTED) {
            setConnectState(CONNECTING);
            ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT);
            f.addListener(listener);
        }
    }

    /** Records the outcome of a connection attempt and schedules a retry on failure. */
    private final ChannelFutureListener listener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!flag) {
                // The client was shut down while the attempt was in flight:
                // do not keep the channel and do not schedule a retry.
                future.channel().close();
                return;
            }
            if (future.isSuccess()) {
                channelFuture = future;
                setConnectState(CONNECTED);
                return;
            }
            setConnectState(DISCONNECTION);
            future.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    connect();
                }
            }, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
        }
    };

    public void setConnectState(int connectState) {
        this.connectState = connectState;
    }

    public int getConnectState() {
        return connectState;
    }

    /**
     * Sender thread: takes commands off the queue and writes them to the current channel.
     * Commands taken while no channel is available are dropped.
     */
    private class SendThread extends Thread {
        @Override
        public void run() {
            while (sendFlag) {
                try {
                    String cmd = sendQueue.take();
                    ChannelFuture future = channelFuture;
                    if (future != null) {
                        future.channel().writeAndFlush(ByteBufUtils.getSendByteBuf(cmd));
                    }
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop
                    // instead of spinning on a take() that would keep throwing.
                    Thread.currentThread().interrupt();
                    return;
                } catch (UnsupportedEncodingException ignored) {
                    // Only this command could not be encoded; skip it and keep sending.
                    // Interrupting the thread here would make every later take() fail.
                }
            }
        }
    }
}

第二步:创建NettyClientHandler对接收到的消息进行处理

/**
 * Inbound handler that processes decoded server messages and keeps the owning
 * {@link NettyClient}'s connection state in sync with the channel lifecycle.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private final NettyClient nettyClient;

    /**
     * @param nettyClient the client whose state is updated and which is asked to reconnect
     */
    public NettyClientHandler(NettyClient nettyClient) {
        super();
        this.nettyClient = nettyClient;
    }

    /**
     * Logs and decodes a text message, then forwards it down the pipeline.
     * Messages that are not strings are forwarded untouched rather than failing on a cast.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof String) {
            String strMsg = (String) msg;
            Log.d("回复的消息:", strMsg);
            // Parse the returned message.
            new CommandDecoder(strMsg).decode();
        }
        super.channelRead(ctx, msg);
    }

    /** Marks the client as disconnected and asks it to reconnect. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Log.d("ClientHandler", "-------重连回调------");
        nettyClient.setConnectState(NettyClient.DISCONNECTION);
        nettyClient.connect();
        super.channelInactive(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Log.d("NettyClientHandl", "registered");
        super.channelRegistered(ctx);
    }

    /** Marks the client as connected once the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Log.d("NettyClientHandler", "=====连接成功回调=====");
        nettyClient.setConnectState(NettyClient.CONNECTED);
        super.channelActive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /** Logs the failure together with its cause, forwards it, and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Include the throwable so the stack trace reaches the log instead of being lost.
        Log.d("NettyClientHandl", "网络异常!", cause);
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}