Android Netty框架的使用

/ AndroidJava / 没有评论 / 2057浏览

Netty框架的使用

1 TCP开发范例

发送数据

{
    "userid":"mm910@mbk.com",
    "devicetype":3,
    "accounttype":0,
    "username":"",
    "password":"e10adc3949ba59abbe56e057f20f883e",
    "meiid":1000217,
    "deviceid":"864376025909275"
}

接收数据

{
    "message":"登录成功",
    "sessionkey":"EF81E1BD132D40DE8F1707A521D8B5A6",
    "mainsn":"C001B00010000002",
    "code":0
}

2 上代码

1 业务层代码

/**
 * Demo screen: tapping the login button sends a 1106 login request through
 * {@code ClientConnectFactory}; the outcome is delivered to {@link #mHandler}
 * and displayed in a Toast.
 */
public class MainActivity extends Activity {

    private Base1106Entity entity1106; // login request entity (protocol 1106)

    public static final int RESPONSE_SUCCESS = 0x401;
    public static final int RESPONSE_FAIL = 0x402;
    public static final int RESPONSE_TIMEOUT = 0x403;
    public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; // heartbeat timed out
    public static final int NOT_LOGIN = 0x411; // user is not logged in

    /**
     * Receives the result message for the request. {@code msg.what} is one of the
     * RESPONSE_* / NOT_LOGIN codes; {@code msg.obj} is the decoded entity on success
     * or a description string on failure.
     */
    public Handler mHandler = new Handler() {

        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case RESPONSE_SUCCESS:
                    IEntity entity = (IEntity) msg.obj;
                    if (entity != null) {
                        responseSuccess(entity);
                    } else {
                        responseFail(-1, "返回数据为空!");
                    }
                    break;
                case RESPONSE_FAIL: // request failed
                    reportFailure(-10001, msg.obj);
                    break;
                case RESPONSE_TIMEOUT: // request timed out
                    reportFailure(-10000, msg.obj);
                    break;
                case NOT_LOGIN: // user is not logged in
                    reportFailure(-10002, msg.obj);
                    break;
                default:
                    break;
            }
        }
    };

    /**
     * Forwards a failure to {@link #responseFail(int, String)}; a message without a
     * description is ignored.
     *
     * @param code        error code to report
     * @param description payload of the failure message, expected to be a String
     */
    private void reportFailure(int code, Object description) {
        if (description != null) {
            responseFail(code, (String) description);
        }
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Button login = (Button) findViewById(R.id.login);
        login.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                reqEntity1106();
            }
        });
    }

    /** Builds the 1106 login request with the demo credentials and queues it for sending. */
    public void reqEntity1106() {
        entity1106 = new Base1106Entity();
        entity1106.setMeiid(1000217);
        entity1106.setUserid("mm910@mbk.com");
        entity1106.setUsername("");
        entity1106.setPassword("e10adc3949ba59abbe56e057f20f883e");
        entity1106.setAccounttype(0);
        entity1106.setDevicetype(3);
        entity1106.setDeviceid("864376025909275");
        entity1106.setHandler(mHandler);
        ClientConnectFactory.getInstance().sendEntity(entity1106);
    }

    /**
     * Shows the decoded response in a long Toast.
     *
     * @param entity decoded response entity; must not be null
     */
    public void responseSuccess(IEntity entity) {
        // toString() is dispatched virtually, so no downcast is needed to display the entity.
        Toast.makeText(MainActivity.this, entity.toString(), Toast.LENGTH_LONG).show();
    }

    /**
     * Shows the failure description in a short Toast.
     *
     * @param code error code (currently not displayed)
     * @param msg  human-readable failure description
     */
    public void responseFail(int code, String msg) {
        Toast.makeText(MainActivity.this, msg, Toast.LENGTH_SHORT).show();
    }

}
/**
 * Application subclass that publishes itself through the static {@link #mContext}
 * field and initialises {@code ClientConnectFactory} at start-up.
 */
public class MeiApp extends Application {

    /** Application-wide context; assigned in {@link #onCreate()}. */
    public static Context mContext;

    @Override
    public void onCreate() {
        super.onCreate();
        MeiApp.mContext = this;
        ClientConnectFactory.getInstance().init(MeiApp.mContext);
    }
}

2 业务通讯层代码

/**
 * Contract between the business layer and a client connection manager:
 * queueing requests, (re)connecting, closing, and receiving parsed responses.
 */
public interface IClientConnect {

  /**
   * Opens a connection.
   *
   * @param netType transport name (the implementation in this file handles "netty")
   */
  void isConnect(String netType);

  /** Re-sends pending data, e.g. after a connection has been established. */
  void sendAgain();

  /**
   * Reports that sending a message failed.
   *
   * @param netType transport name
   * @param msg     bytes that could not be sent
   */
  void sendMsgFail(String netType, byte[] msg);

  /**
   * Reports that connecting failed.
   *
   * @param netType transport name
   */
  void connectFail(String netType);

  /**
   * Queues an entity to be sent.
   *
   * @param entity request to send
   */
  void sendEntity(IEntity entity);

  /**
   * Sends raw bytes.
   *
   * @param b encoded packet
   */
  void sendByte(byte[] b);

  /** Closes the connection. */
  void isClose();

  /** Clears the currently pending data. */
  void isClearMsg();

  /**
   * Delivers a parsed packet or an error to the manager.
   *
   * @param header packet header, may be null for errors
   * @param data   packet body, may be null
   * @param desc   description text (used for errors)
   * @param type   result code such as RESPONSE_SUCCESS / RESPONSE_FAIL
   */
  void callBack(PackageHeader header, byte[] data, String desc, int type);

  /**
   * Delivers an entity directly, without going through the network.
   *
   * @param entity entity to hand back
   * @param desc   description tag for the delivery
   */
  void callBack(IEntity entity, String desc);
}
/**
 * Base connection manager: keeps a queue of pending request entities, drives a small
 * state machine on {@link #basicHandler} (request -> connect -> send -> timeout /
 * heartbeat) and routes parsed responses back to the {@code Handler} attached to the
 * entity at the head of the queue.
 *
 * <p>Requests are sent one at a time: the head of {@link #mEntityMsg} is sent, and it is
 * removed when a response arrives (see {@link #responseSuccess}). Any failure closes
 * the connection and clears the whole queue (see {@link #responseFail} and
 * {@link #isClose()}).
 *
 * <p>NOTE(review): state such as {@code isRunning}, {@code isSending} and
 * {@code mEntityMsg} is touched both from {@code basicHandler} and from the
 * communication / parse callbacks without synchronization — confirm which threads those
 * callbacks run on.
 */
public abstract class BaseClientMgr extends Subject implements IClientConnect {

    protected boolean isRunning; // true while the connection is considered usable
    protected boolean isSending; // true while a request is in flight
    private int mPort; // server port
    private int mCommunication; // communication type; only read for logging in this class
    private int heartTimeOutCount = 0; // heartbeats sent since the last successful response
    protected int function = 1200; // function code passed to sendHeart() and sendExit()

    public static final int RESPONSE_SUCCESS = 0x401;
    public static final int RESPONSE_FAIL = 0x402;
    public static final int RESPONSE_TIMEOUT = 0x403;
    public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; // heartbeat timed out
    public static final int NOT_LOGIN = 0x411; // user is not logged in

    private String mConnectKey = "BasicServicesMgr";
    private String mHost; // server address
    protected ArrayList<IEntity> mEntityMsg = null; // queue of entities waiting to be sent

    protected Context mContext;
    protected CommunicationThreadManager mManager; // transport manager for this connection
    protected ParseByteThread mParseByteThread = null; // reassembles packets from received bytes
    protected ExecutorService executor; // pool that runs the parse thread

    /**
     * @param host server address to connect to
     * @param port server port
     * @param key  connection key handed to the transport manager
     */
    protected BaseClientMgr(String host, int port, String key) {
        init(host, port, key);
    }

    /** Resets state, stores the connection parameters and starts the parse thread. */
    private void init(String host, int port, String key) {
        this.mContext = MeiApp.mContext;
        isRunning = false;
        isSending = false;
        mHost = host;
        mPort = port;
        mConnectKey = key;
        mEntityMsg = new ArrayList<IEntity>();
        executor = Executors.newFixedThreadPool(10);
        mParseByteThread = new ParseByteThread(this);
        executor.execute(mParseByteThread);
    }

    /**
     * State machine for the connection. Created with the no-argument Handler
     * constructor, so it runs on the thread that constructs this manager.
     */
    protected Handler basicHandler = new Handler() {

        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case ClientConstants.REQUEST:
                    // A request was queued: mark the sender busy and either send or connect.
                    if (mEntityMsg != null && mEntityMsg.size() > 0) {
                        isSending = true;
                        // Drop any stale steps before scheduling the next one.
                        basicHandler.removeMessages(ClientConstants.REQUEST);
                        basicHandler.removeMessages(
                            ClientConstants.REQUEST_CREATE_CONNECT);
                        basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
                        if (isRunning) {
                            // Connection is up: send straight away.
                            basicHandler.removeMessages(
                                ClientConstants.REQUEST_SEND_MESSAGE);
                            basicHandler.sendEmptyMessage(
                                ClientConstants.REQUEST_SEND_MESSAGE);
                        } else {
                            // No connection yet: establish one first.
                            basicHandler.removeMessages(
                                ClientConstants.REQUEST_CREATE_CONNECT);
                            Message msgCreate = Message.obtain();
                            msgCreate.what = ClientConstants.REQUEST_CREATE_CONNECT;
                            msgCreate.arg1 = 0;
                            basicHandler.sendMessage(msgCreate);
                        }

                    }
                    break;
                case ClientConstants.REQUEST_CREATE_CONNECT:
                    Log.i("mbk", "建立连接!");
                    isConnect("netty");

                    break;
                case ClientConstants.REQUEST_SEND_MESSAGE:
                    Log.i("mbk", "发送消息!");
                    if (isRunning) {
                        if (mEntityMsg.size() > 0) {
                            Log.i("mbk", "发送数据!");
                            sendData(mEntityMsg.get(0));
                            basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
                            // Arm the request timeout (3 s).
                            basicHandler.sendEmptyMessageDelayed(
                                ClientConstants.REQUEST_TIMEOUT, 3000);
                        } else {
                            Log.i("mbk", "数据发送完成!");
                            isSending = false;
                        }
                    } else {
                        // Connection was lost in the meantime: reconnect.
                        basicHandler.removeMessages(
                            ClientConstants.REQUEST_CREATE_CONNECT);
                        basicHandler.sendEmptyMessage(
                            ClientConstants.REQUEST_CREATE_CONNECT);
                    }
                    break;
                case ClientConstants.REQUEST_SEND_HEARTBEAT:
                    Log.i("mbk", "发送心跳!");
                    // isClose() can null the manager; never dereference it blindly.
                    if (mManager != null) {
                        mManager.sendHeart(function);
                    }
                    heartTimeOutCount++;
                    Log.i("lzy02", "heartTimeOutCount" + heartTimeOutCount);
                    if (heartTimeOutCount >= 3) {
                        // Three heartbeats without a successful response: report it.
                        callBack(null, null, "心跳超时!", REQUEST_HEARTBEAT_TIMEOUT);
                    }
                    // Schedule the next heartbeat (3 s).
                    basicHandler.removeMessages(
                        ClientConstants.REQUEST_SEND_HEARTBEAT);
                    basicHandler.sendEmptyMessageDelayed(
                        ClientConstants.REQUEST_SEND_HEARTBEAT, 3000);
                    break;
                case ClientConstants.REQUEST_TIMEOUT:
                    Log.i("mbk", "请求超时!");
                    isRunning = false;
                    callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
                    break;

            }
        }
    };

    /**
     * Stores the function code used by subsequent heartbeat and exit packets.
     * Despite its name this method does not send anything.
     *
     * @param function function code to use
     */
    public void sendHeartbeat(int function) {
        this.function = function;
    }

    /**
     * Encodes the given entity and sends it.
     *
     * @param entity request entity to encode and send
     */
    public void sendData(IEntity entity) {
        sendByte(ClientSocketUtils.sendDatas(entity));
    }

    /**
     * Creates the transport manager for the configured host and port. Only the
     * "netty" transport is handled; any other value just logs.
     *
     * <p>NOTE(review): a previously created manager is replaced without being closed
     * here — confirm whether it can still be alive at this point.
     *
     * @param netType transport name; null is treated like an unknown transport
     */
    @Override
    public void isConnect(String netType) {
        // Constant-first comparison so a null netType cannot throw.
        if ("netty".equals(netType)) {
            int type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY;
            // Connect to the host supplied to the constructor.
            mManager = new CommunicationThreadManager(
                mContext, null, mConnectKey, mHost,
                mPort, type, mCommunicationCallBack);
            Log.i("mbk", "发送地址---" + mHost);
            Log.i("mbk", "发送端口号---" + mPort);

            // Clean up p2p whenever netty is used.
            P2pClearUp();
        }
        Log.i("mbk", "初始化 连接服务器!" + netType);
    }

    /**
     * Hands raw bytes to the transport manager. If there is no manager, or the hand-off
     * is interrupted, the connection is closed via {@link #isClose()}.
     *
     * @param b encoded packet
     */
    @Override
    public void sendByte(byte[] b) {
        try {
            if (mManager != null) {
                mManager.sendDataToServer(new SendData(b));
            } else {
                isClose();
            }
        } catch (InterruptedException e) {
            isClose();
        }
    }

    /** Callbacks from the transport; each failure is funnelled into {@link #callBack}. */
    private CommunicationCallBack mCommunicationCallBack = new CommunicationCallBack() {

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            Log.i("mbk", "----------请求异常----------" + mCommunication);
            isRunning = false;
            callBack(null, null, "请求异常!", RESPONSE_FAIL);

        }

        @Override
        public void connected(ChannelHandlerContext ctx) {
            Log.i("mbk", "----------连接成功----------" + mCommunication);
            isRunning = true;
            sendAgain();
        }

        @Override
        public void connectFailure(Exception e) {
            Log.i("mbk", "----------连接服务器失败----------" + mCommunication);
            isRunning = false;
            callBack(null, null, "连接服务器失败!", RESPONSE_FAIL);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, byte[] msg) {
            Log.i("mbk", "----------服务端返回----------" + mCommunication);
            if (mParseByteThread != null) {
                mParseByteThread.sendParseByte(msg);
            }
        }

        @Override
        public void communicationOutTime() {
            Log.i("mbk", "----------连接超时----------" + mCommunication);
            isRunning = false;
            callBack(null, null, "连接超时!", RESPONSE_TIMEOUT);
        }

        @Override
        public void questTimeOut() {
            Log.i("mbk", "----------请求超时----------" + mCommunication);
            isRunning = false;
            callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
        }
    };

    /** Schedules the send step 500 ms after the connection came up. */
    @Override
    public void sendAgain() {
        Log.i("mbk", "连接成功,数据重新发送!");

        basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_MESSAGE, 500);
    }

    /**
     * Appends the entity to the send queue and starts the state machine if no request
     * is currently in flight.
     *
     * @param entity request to send; null is ignored
     */
    @Override
    public void sendEntity(IEntity entity) {
        if (mEntityMsg != null && entity != null) {
            mEntityMsg.add(entity);
            if (!isSending) {
                Log.i("mbk", "发起请求!REQUEST_NET");
                basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
            }
        }
    }

    /**
     * Entry point for parsed packets and transport errors. Cancels the pending
     * heartbeat, then dispatches on {@code type}.
     *
     * @param header packet header; must be non-null when {@code type} is RESPONSE_SUCCESS
     * @param data   packet body, may be null
     * @param desc   failure description, forwarded to the entity's handler on failure
     * @param type   RESPONSE_SUCCESS, RESPONSE_FAIL, RESPONSE_TIMEOUT or
     *               REQUEST_HEARTBEAT_TIMEOUT
     */
    @Override
    public void callBack(PackageHeader header, byte[] data, String desc, int type) {
        basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);

        switch (type) {
            case RESPONSE_SUCCESS:
                // Any successful packet resets the heartbeat counter and re-arms it (20 s).
                heartTimeOutCount = 0;
                basicHandler.sendEmptyMessageDelayed(
                    ClientConstants.REQUEST_SEND_HEARTBEAT, 20000);
                switch (header.getFunction()) {
                    case 9998:
                        Log.i("mbk", "服务端关闭!");
                        isClose();
                        break;
                    case 9999:
                        Log.i("mbk", "成功返回一个心跳!");
                        break;
                    case 999:
                        Log.i("mbk", "未知错误!");
                        callBack(null, null, "未知错误", RESPONSE_FAIL);
                        break;
                    default:
                        responseSuccess(header, data, desc, type);
                        break;
                }
                break;
            case REQUEST_HEARTBEAT_TIMEOUT:
                // Three heartbeats went unanswered; no action is taken at the moment.
                break;
            case RESPONSE_FAIL:
                responseFail(header, data, desc, type);
                break;
            case RESPONSE_TIMEOUT:
                responseFail(header, data, desc, type);
                break;
        }
    }

    /**
     * Decodes the body into the entity at the head of the queue, posts it to that
     * entity's handler, removes it from the queue and starts the next request if any.
     * A decode error closes the connection.
     *
     * @param header packet header; its function code is passed on as {@code msg.arg1}
     * @param data   UTF-8 encoded body; nothing is posted when null or empty
     * @param desc   unused here
     * @param type   result code, used as {@code msg.what}
     */
    public void responseSuccess(PackageHeader header,
                                byte[] data, String desc, int type) {
        try {
            if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
                IEntity entity = mEntityMsg.get(0);
                if (data != null && data.length > 0) {
                    entity.onDecode(new String(data, "utf-8"));
                    Log.i("lzy02", "1--------------" + entity.getCode());
                    Log.i("mbk", "返回一条数据!");
                    Message msg = Message.obtain();
                    msg.obj = entity;
                    msg.arg1 = header.getFunction();
                    msg.what = type;
                    entity.getHandler().sendMessage(msg);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            isClose();
        }
        // The head request is finished either way: drop it and release the sender.
        if (mEntityMsg != null && mEntityMsg.size() > 0) {
            mEntityMsg.remove(0);
        }
        basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
        isSending = false;
        if (mEntityMsg.size() > 0) {
            basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
        }
    }

    /**
     * Posts the failure description to the handler of the entity at the head of the
     * queue (if any) and closes the connection, which also clears the queue.
     *
     * @param header unused here
     * @param data   unused here
     * @param desc   failure description, delivered as {@code msg.obj}
     * @param type   result code, used as {@code msg.what}
     */
    public void responseFail(PackageHeader header, byte[] data, String desc, int type) {
        Log.i("mbk", "请求失败!   " + desc);
        Message msg = Message.obtain();
        msg.obj = desc;
        msg.arg1 = 0;
        msg.what = type;
        if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
            mEntityMsg.get(0).getHandler().sendMessage(msg);
        }
        isClose();
    }

    /**
     * Delivers a locally cached entity to its handler as RESPONSE_SUCCESS. Only acts
     * when {@code desc} equals "cache".
     *
     * @param entity cached entity to deliver
     * @param desc   delivery tag; anything other than "cache" is ignored
     */
    @Override
    public void callBack(IEntity entity, String desc) {
        Log.i("mbk", "回一返个缓存数据!  ");
        if ("cache".equals(desc)) {
            if (entity != null && entity.getHandler() != null) {
                Message msg = Message.obtain();
                msg.obj = entity;
                msg.what = RESPONSE_SUCCESS;
                entity.getHandler().sendMessage(msg);
            }
        }
    }

    /** Asks the transport manager, if present, to clean up p2p state. */
    public void P2pClearUp() {
        if (mManager != null) {
            mManager.p2pCleanup();
        }
    }

    /**
     * Closes the connection: while running, an exit packet is sent; otherwise the
     * transport manager is shut down and released. In both cases the parser state, the
     * send queue and the pending heartbeat / timeout messages are cleared.
     */
    @Override
    public void isClose() {
        Log.i("mbk", "关闭连接!" + isRunning);
        if (mManager != null) {
            if (isRunning) {
                try {
                    mManager.sendDataToServer(
                        new SendData(ClientSocketUtils.sendExit(function)));
                } catch (InterruptedException e) {
                    // Best effort: the local state is reset below regardless. The interrupt
                    // flag is not restored because this may run on a looper thread where a
                    // sticky flag would make every later send fail.
                    Log.w("mbk", "interrupted while sending the exit packet", e);
                }
            } else {
                mManager.closeTheadManager();
                mManager = null;
            }
        }
        if (mParseByteThread != null)
            mParseByteThread.closeThread();
        if (mEntityMsg != null) {
            mEntityMsg.clear();
        }
        P2pClearUp();
        basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
        basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
        isRunning = false;
        isSending = false;
    }

    /** No-op in this base class. */
    @Override
    public void sendMsgFail(String netType, byte[] msg) {
    }

    /** No-op in this base class. */
    @Override
    public void connectFail(String netType) {
    }

    /** Removes every queued entity. */
    @Override
    public void isClearMsg() {
        if (mEntityMsg != null) {
            mEntityMsg.clear();
        }
    }

}
/**
 * Lazily created singleton connection manager for the basic services endpoint
 * (192.168.43.1:9223).
 */
public class BasicServicesMgr extends BaseClientMgr {

    public static BasicServicesMgr instance = null;

    /** Returns the shared instance, creating it on first use. */
    public static BasicServicesMgr getInstance() {
        if (instance != null) {
            return instance;
        }
        instance = new BasicServicesMgr();
        return instance;
    }

    private BasicServicesMgr() {
        super("192.168.43.1", 9223, ClientConnectorManager.BASIC_SERVICES_MGR_KEY);
    }

    /**
     * Queues the entity for sending. The queue keeps at most two requests: when it
     * already holds two, the second one is dropped in favour of the new entity.
     *
     * @param entity request to send; null is ignored
     */
    @Override
    public void sendEntity(IEntity entity) {
        if (entity == null) {
            return;
        }

        boolean queueIsFull = mEntityMsg != null && mEntityMsg.size() == 2;
        if (queueIsFull) {
            mEntityMsg.remove(1);
        }
        mEntityMsg.add(entity);

        if (isSending) {
            return;
        }
        // Nothing in flight: claim the sender and kick off the request.
        isSending = true;
        basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
    }
}
/** Listener that can be registered with a {@code Subject} to receive state updates. */
public interface Observer {

    /**
     * Called when the subject publishes a new state.
     *
     * @param state the published entity
     */
    void update(IEntity state);
}
class ParseByteThread implements Runnable {

    private byte[] bufHeader = null;
    private byte[] readData = null;
    private PackageHeader header = null;
    private int headerLenth = PackageHeader.headerLenth;
    private int readDataLenth = 0;
    private int sLength = 0;// 添加到数组的长度
    private Handler fileParseHandler = null;
    private IClientConnect connect;

    public static final int RESPONSE_SUCCESS = 0x401;
    public static final int RESPONSE_FAIL = 0x402;
    public static final int RESPONSE_TIMEOUT = 0x403;
    /** 心跳超时  */
    public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410;
    /** 用户未登录  */
    public static final int  NOT_LOGIN= 0x411;

    public Handler getFileParseHandler() {
        return this.fileParseHandler;
    }

    public void sendParseByte(byte[] msg) {
        if (fileParseHandler != null) {
            Message msgData = Message.obtain();
            msgData.obj = msg;
            fileParseHandler.sendMessage(msgData);
        }
    }

    public ParseByteThread(IClientConnect connect) {
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
        this.connect = connect;
    }

    public void setFileParseHandler(Handler fileParseHandler) {
        this.fileParseHandler = fileParseHandler;
    }

    public void closeThread(){
        readDataLenth = 0;
        sLength = 0;
        headerLenth = PackageHeader.headerLenth;
        bufHeader = new byte[PackageHeader.headerLenth];
        readData = null;
        header = new PackageHeader();
    }
    
    @Override
    public void run() {
        Looper.prepare();
        fileParseHandler = new Handler() {
            public void handleMessage(Message data) {
                synchronized (data) {
                    handleDate(data);
                }
            }
        };
        Looper.loop();
    }

    private void handleDate(Message data){
        byte[] msg = (byte[]) data.obj;
        if (msg == null) {
            return;
        }
        int msgLength = msg.length;
        int useLength = 0;// 已经使用的长度
        while (msgLength - useLength > 0) {
            // 读取包头
            if (readDataLenth == 0) {
                if (msgLength - useLength >= headerLenth - sLength) {
                    // 读取了一个完整的包头
                    System.arraycopy(msg, 
                                     useLength, bufHeader, sLength, 
                                     headerLenth - sLength);
                    useLength += (headerLenth - sLength);
                    sLength = 0;
                    header.setPackageHeader(bufHeader);
                    if (header.getFunction() > 10000 
                        || header.getFunction() < 999) {
                        // 包头不符合,跳出循环 放弃整包
                        connect.callBack(null, null, "包头不符合",  RESPONSE_FAIL);
                        break;
                    }
                    if (header.getFunction() != 9999 && header.getFunction() != 9998) {
                        readDataLenth = (int) header.getInclusionLenth();
                        readData = null;
                        readData = new byte[readDataLenth];
                    } else if (header.getFunction() == 9999) {
                        // 发送心跳包
                        connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                    } else if (header.getFunction() == 9998) {
                        msgLength = 0;
                        useLength = 0;
                        connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                    }
                } else {
                    System.arraycopy(msg, useLength,
                                     bufHeader, sLength, msgLength - useLength);
                    sLength += (msgLength - useLength);
                    break;
                }
            }
            // 读取包体
            else {
                if (msgLength - useLength >= readDataLenth - sLength) {
                    // 读取了一个完整的包体
                    System.arraycopy(msg, useLength, 
                                     readData, sLength, readDataLenth - sLength);
                    useLength += (readDataLenth - sLength);
                    sLength = 0;
                    readDataLenth = 0;
                    bufHeader = null;
                    bufHeader = new byte[PackageHeader.headerLenth];
                    // 解析成功 返回数据
                    try {
                        connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    System.arraycopy(msg, useLength, 
                                     readData, sLength, msgLength - useLength);
                    sLength += (msgLength - useLength);
                    break;
                }
            }
        }
    }
}
public abstract class Subject {
    //用来保存注册的观察者对象
    private List<Observer> list = new ArrayList<Observer>();
    private Handler subHandler = new Handler(MeiApp.mContext.getMainLooper()) {
        public void handleMessage(Message msg) {
            if (list != null && list.size() > 0) {
                for (int i = 0; i < list.size(); i++) {
                    list.get(i).update((IEntity) msg.obj);
                }
            }
        }
    };
    //注册观察者对象
    public void attach(Observer observer) {
        if (list != null) {
            list.add(observer);
        }
    }
    //删除观察者对象
    public void detach(Observer observer) {
        if (list != null && list.size() > 0 && observer != null) {
            list.remove(observer);

        }
    }
    //删除观察者对象
    public void clear() {
        if (list != null && list.size() > 0) {
            list.clear();
        }
    }
    //通知所有注册的观察者对象
    public void nodifyObservers(final IEntity newState) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = Message.obtain();
                msg.obj = newState;
                subHandler.sendMessage(msg);
            }
        }).start();
    }
}

代码见https://github.com/huanyi0723/NettyTest