socket 相关内容前面现已讲解了,还写到了一些 Service 内容,上一篇博客还将央求类和回复类规划好了,接下来我们需求规划的就是通讯模块的 socket 收发了,这儿写在 Service 里面。之所以写在 Service 里面,是因为我这和设备的联接是一个长期的使命,接收和发送线程存在时刻特别久,应该写在 Service 里面。
待处理问题
前面现已将 Service 内联接 socket 和接收数据的部分大致写了,剩下的问题如下:
-
Service 的联接
-
参数的设置
-
央求类的发送
-
回复类、失常的传递
下面我们一个一个的处理上面的问题。
Service 的联接
Service 的联接很简单,四大组件嘛,关键是我们 Binder 里面该规划哪些函数,代码如下:
private ConnectBinder mBinder = new ConnectBinder();
class ConnectBinder extends Binder {
//设置参数
void setUpSocketConfigure(Bundle data) {
...
}
//发送央求
void sendMessage(BaseRequest request) {
...
}
//回来 Service 方针,用于双向交互
ConnectService getService() {
return ConnectService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}
设置参数和发送央求是我们必须的操作,很好了解,而回来 Service 方针是用于主线程和 Service 的双向通讯,需求拿到 Service 设置我们的回调接口。(ps. 原本我用的广播将数据传递出去,但是觉得不可典雅,运用广播的话可以不用获取 Service 方针)
参数的设置
参数的设置也简单,上面 setUpSocketConfigure 通过 Bundle 现已把数据传递进来了,我们设置到全局变量就行了:
private void getConfigure(Bundle data) {
ip = data.getString("IP", "192.168.1.110");
port = data.getInt("PORT", 2050);
wait = data.getInt("WAIT", 5000);
}
央求类的发送
接下来是央求类的发送,也就是我们这个 socket 通讯模块的发送部分,这儿需求注意下我们的发送央求要在异步线程处理,而且央求需求串行发送,socket 只有一个嘛。
private Socket mSocket;
//接收线程
private Thread rxThread;
//单线程线程池
private ExecutorService mExecutor = Executors.newSingleThreadExecutor();
class ConnectBinder extends Binder {
...
void sendMessage(BaseRequest request) {
//将央求封装到 Runnable 中
Runnable runnable = wrapperRunnable(request);
//运用单线程线程池发送
mExecutor.execute(runnable);
}
...
}
//封装到 Runnable 中
private Runnable wrapperRunnable(final BaseRequest request) {
return new Runnable() {
@Override
public void run() {
try {
doSent(request.data);
} catch (IOException e) {
e.printStackTrace();
//将 Error 传递出去
onPostMessageListener.onPostError(request.requestMsgType);
}
}
};
}
//发送信息
public void doSent(byte[] data) throws IOException {
if (mSocket == null) {
setUpConnection();
}
//写入内容
writeDataToStream(data);
}
//写入内容
private void writeDataToStream(byte[] data) throws IOException {
try {
OutputStream os = mSocket.getOutputStream();
os.write(data);
os.flush();
} catch(IOException e) {
setUpConnection();
//再发一次
reSendData(data);
}
}
//再发一次
private void reSendData(byte[] data) throws IOException {
if (mSocket != null) {
try {
OutputStream os = mSocket.getOutputStream();
os.write(data);
os.flush();
} catch(IOException e) {
e.printStackTrace();
throw e;
}
}
}
这儿代码多了点,但是不难,大致就是把央求封装成 Runnable,通过单线程线程池发送出去,保证了串行发送,而且发送部分加了许多查验代码,终究发送失败还会再发一次。关于发送失败传递问题,下面讲解。
回复类、失常的传递
因为我们的代码写在 Service 里面,如何将消息(回复及失常)传递出去是一个问题,下面介绍几种办法。
本地广播
广播是安卓供应的一种通讯办法,而本地广播可以保证广播的消息在本应用中传递,传递的过程中运用 Bundle 携带数据,通过 Reciever 在需求运用的当地,比对消息类型,将数据拿出来便可以,而且广播现已完成了从异步线程到 UI 线程的转化,用起来仍是挺便利的。
我原本就是运用本地广播传递数据的,后边不用了。一个问题是广播功用并不是很好,另一个问题是拿数据不是很便利,我希望的是通过回调去拿到数据,下面看我的改进办法。
回调传递消息
先看代码,这儿触及了一个回调接口
private OnPostMessageListener onPostMessageListener;
public void setOnPostMessageListener(OnPostMessageListener onPostMessageListener) {
this.onPostMessageListener = onPostMessageListener;
}
public interface OnPostMessageListener {
void onPostError(int typeId);
void onPostResponse(BaseResponse response);
}
下面是设置回调接口,需求从 Service 外部设置
class ConnectBinder extends Binder {
...
ConnectService getService() {
return ConnectService.this;
}
}
//在外部设置
private ServiceConnection connection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
mBinder = (ConnectService.ConnectBinder) iBinder;
mBinder.getService().setOnPostMessageListener(new ConnectService.OnPostMessageListener() {
@Override
public void onPostError(int typeId) {
...
}
@Override
public void onPostResponse(BaseResponse response) {
...
}
});
}
@Override
public void onServiceDisconnected(ComponentName componentName) {
}
};
失常的传递
private Runnable wrapperRunnable(final BaseRequest request) {
return new Runnable() {
@Override
public void run() {
try {
doSent(request.data);
} catch (IOException e) {
e.printStackTrace();
onPostMessageListener.onPostError(request.requestMsgType);
}
}
};
}
回复的传递
//接受线程
private Thread initReceiveThread() {
return new Thread(new Runnable() {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void run() {
try {
InputStream is;
while (mSocket != null) {
is = mSocket.getInputStream();
byte[] ackbuf = new byte[4]; //消息号
byte[] serialbuf = new byte[4]; //流水号
byte[] lenbuf = new byte[4]; //长度
byte[] datasbuf; //数据
int ackrs = is.read(ackbuf, 0, 4);
if (ackrs < 0) {
Thread.sleep(1000);
continue;
}
is.read(serialbuf, 0, 4);
is.read(lenbuf, 0, 4);
int len = ByteBuffer.wrap(lenbuf, 0, 4).getInt(); //长度
datasbuf = new byte[len];
is.read(datasbuf, 0, len);
BaseResponse response = new BaseResponse();
response.responseMsgType = ByteBuffer.wrap(ackbuf, 0, 4).getInt();
response.serialNumber = ByteBuffer.wrap(serialbuf, 0, 4).getInt();
response.data = datasbuf;
onPostMessageListener.onPostResponse(response);
}
}catch(IOException e) {
e.printStackTrace();
} catch(Exception e) {
e.printStackTrace();
}
}
});
}
这儿的回复类消息处理时按字节读取的,和业务有关,读者可以按需求批改。
掉线重连问题
这儿还有一个掉线重连问题,就是 socket 断了,但是我发送消息的时分,希望测验从头联接,联接上了再见消息发送出去。
实际上,我们上面的代码在发送消息之前现已检查 socket 了,掉线了会从头联接。但是有的设备联接需求我们发送登录央求,这个就有点耦合性滋味了,
思来想去,联接央求不就是一串字节数据嘛,在设置参数里面传进来,后边直接用不就可以嘛,具体操作看下面代码:
private byte[] connect = new byte[0];
private void getConfigure(Bundle data) {
ip = data.getString("IP", "192.168.1.110");
port = data.getInt("PORT", 2050);
wait = data.getInt("WAIT", 5000);
connect = data.getByteArray("ConnectData");
}
//发送信息
public void doSent(byte[] data) throws IOException {
if (mSocket == null) {
setUpConnection();
writeDataToStream(connect);
}
writeDataToStream(data);
}
private void writeDataToStream(byte[] data) throws IOException {
try {
OutputStream os = mSocket.getOutputStream();
os.write(data);
os.flush();
} catch(IOException e) {
setUpConnection();
writeDataToStream(connect);
reSendData(data);
}
}
上面代码就是将登录消息以字节数组的方式传递进来,在发送和重发消息的初始化 socket 之后发送一次。
这儿虽然还有一点耦合性滋味,但是假如你没有登录央求的需求,字节数组长度为零,底子不会对整个流程形成什么影响。
完好代码
public class ConnectService extends Service {
/** Socket相关,IP及端口号 **/
private String ip = "192.168.1.110";
private int port = 2050;
private int wait = 5000;
private byte[] connect = new byte[0];
private Socket mSocket;
private Thread rxThread;
private ExecutorService mExecutor = Executors.newSingleThreadExecutor();
private ConnectBinder mBinder = new ConnectBinder();
class ConnectBinder extends Binder {
void setUpSocketConfigure(Bundle data) {
getConfigure(data);
}
void sendMessage(BaseRequest request) {
Runnable runnable = wrapperRunnable(request);
mExecutor.execute(runnable);
}
ConnectService getService() {
return ConnectService.this;
}
}
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}
private void getConfigure(Bundle data) {
ip = data.getString("IP", "192.168.1.110");
port = data.getInt("PORT", 2050);
wait = data.getInt("WAIT", 5000);
connect = data.getByteArray("ConnectData");
}
public void setUpConnection() throws IOException {
stopExistSocket();
createSocket();
startRxThread();
}
private void stopExistSocket() throws IOException {
if (mSocket != null) {
try {
mSocket.close();
mSocket = null;
} catch (IOException e) {
e.printStackTrace();
throw e;
}
}
}
private void createSocket() throws IOException {
SocketAddress sa = new InetSocketAddress(ip, port);
mSocket = new Socket();
try{
//设置信息及超时时刻
mSocket.connect(sa, wait);
boolean isConnect = mSocket.isConnected();
mSocket.setKeepAlive(isConnect);
} catch(IOException e) {
mSocket = null;
throw e;
}
}
private void startRxThread() {
if (rxThread == null || !rxThread.isAlive()) {
rxThread = initReceiveThread();
rxThread.start();
}
}
//发送信息
public void doSent(byte[] data) throws IOException {
if (mSocket == null) {
setUpConnection();
writeDataToStream(connect);
}
writeDataToStream(data);
}
private void writeDataToStream(byte[] data) throws IOException {
try {
OutputStream os = mSocket.getOutputStream();
os.write(data);
os.flush();
} catch(IOException e) {
setUpConnection();
writeDataToStream(connect);
reSendData(data);
}
}
private void reSendData(byte[] data) throws IOException {
if (mSocket != null) {
try {
OutputStream os = mSocket.getOutputStream();
os.write(data);
os.flush();
} catch(IOException e) {
e.printStackTrace();
throw e;
}
}
}
private Runnable wrapperRunnable(final BaseRequest request) {
return new Runnable() {
@Override
public void run() {
try {
doSent(request.data);
} catch (IOException e) {
e.printStackTrace();
onPostMessageListener.onPostError(request.requestMsgType);
}
}
};
}
//接受线程
private Thread initReceiveThread() {
return new Thread(new Runnable() {
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void run() {
try {
InputStream is;
while (mSocket != null) {
is = mSocket.getInputStream();
byte[] ackbuf = new byte[4]; //消息号
byte[] serialbuf = new byte[4]; //流水号
byte[] lenbuf = new byte[4]; //长度
byte[] datasbuf; //数据
int ackrs = is.read(ackbuf, 0, 4);
if (ackrs < 0) {
Thread.sleep(1000);
continue;
}
is.read(serialbuf, 0, 4);
is.read(lenbuf, 0, 4);
int len = ByteBuffer.wrap(lenbuf, 0, 4).getInt(); //长度
datasbuf = new byte[len];
is.read(datasbuf, 0, len);
BaseResponse response = new BaseResponse();
response.responseMsgType = ByteBuffer.wrap(ackbuf, 0, 4).getInt();
response.serialNumber = ByteBuffer.wrap(serialbuf, 0, 4).getInt();
response.data = datasbuf;
onPostMessageListener.onPostResponse(response);
}
}catch(IOException e) {
e.printStackTrace();
} catch(Exception e) {
e.printStackTrace();
}
}
});
}
private OnPostMessageListener onPostMessageListener;
public void setOnPostMessageListener(OnPostMessageListener onPostMessageListener) {
this.onPostMessageListener = onPostMessageListener;
}
public interface OnPostMessageListener {
void onPostError(int typeId);
void onPostResponse(BaseResponse response);
}
}
结语
好了,到这儿 socket 接收和发送的 Service 就规划完了,我觉得这篇博客写的还挺清楚的,希望对读者有帮忙。
end