添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Socket通信比较常见的问题有如下几种:

1、设置收发超时;

2、保证每一个bit都被正确地收发;

3、物理线路故障的保护;

4、始终能正常工作;

5、尽量少占系统资源;

n、……

而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是: 通信

为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。

本方案在正常的局域网连接中测试过几百万次,没有发现问题。但由于缺乏更恶劣环境下的测试,如果使用这些代码出现任何问题,风险请自行承担。

(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)

Java代码

package asynchronizedchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.MessageDigest;
import java.util.Iterator;

public class Server
{

    /**
     * Entry point of the sample server.
     * <p>
     * Opens a non-blocking listening channel, registers it with a selector
     * for accept events and starts a {@link Daemon} thread on that selector.
     *
     * @param args command-line arguments (not used)
     * @throws IOException if the selector or the listening channel cannot be
     *                     opened, configured, bound or registered
     */
    public static void main(String[] args) throws IOException
    {
        final Selector acceptSelector = Selector.open();
        final ServerSocketChannel listener = ServerSocketChannel.open();
        listener.configureBlocking(false);

        // Bind to port 5656 with an accept backlog of 5.
        // NOTE(review): the host literal looks like a placeholder - confirm
        // the real bind address before running.
        final InetSocketAddress endpoint = new InetSocketAddress("xx.xx.xx.xx", 5656);
        listener.socket().bind(endpoint, 5);

        // Only the listening channel is registered, and only for accept events.
        listener.register(acceptSelector, SelectionKey.OP_ACCEPT);

        final Thread acceptLoop = new Thread(new Daemon(acceptSelector));
        acceptLoop.start();
    }
}

/**
 * Accept loop of the sample server.
 * <p>
 * Waits on the given selector and, for every acceptable key, accepts the
 * pending connection and starts a new thread running a {@code ServerHandler}
 * for it. The loop ends once the selector has been closed.
 */
class Daemon implements Runnable
{
    private final Selector selector;

    /**
     * @param selector selector on which the listening channel is registered
     */
    Daemon(Selector selector)
    {
        this.selector = selector;
    }

    /**
     * Runs the accept loop until the selector is closed. Failures while
     * handling a round of keys are printed and the loop carries on.
     */
    @Override
    public void run()
    {
        // Looping on isOpen() instead of "true" lets the thread terminate when
        // the selector is closed; otherwise select() would throw on every
        // iteration and the loop would spin forever.
        while (selector.isOpen()) {
            try {
                // Wait for an event
                selector.select();

                // Walk the keys that have pending events
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selKey = it.next();

                    // Remove the key from the selected set to mark it as handled
                    it.remove();

                    // A cancelled key would make isAcceptable() throw, so check validity first
                    if (selKey.isValid() && selKey.isAcceptable()) {
                        acceptConnection((ServerSocketChannel) selKey.channel());
                    }
                }
            } catch (Exception ex) {
                if (!selector.isOpen()) {
                    // The selector was closed while we were using it: stop quietly.
                    break;
                }
                ex.printStackTrace();
            }
        }
    }

    /**
     * Accepts one pending connection on {@code server} and hands it to a new
     * handler thread.
     *
     * @param server listening channel that reported an accept event
     * @throws IOException if accepting the connection fails
     */
    private void acceptConnection(ServerSocketChannel server) throws IOException
    {
        // On a non-blocking server channel accept() returns null when no
        // connection is pending.
        SocketChannel channel = server.accept();
        if (channel == null) {
            System.out.println("---No Connection---");
            return;
        }

        try {
            // Use the socket channel to communicate with the client
            new Thread(new ServerHandler(channel)).start();
        } catch (Exception ex) {
            // No handler took ownership of the connection, so close it here
            // instead of leaking the socket.
            ex.printStackTrace();
            try {
                channel.close();
            } catch (IOException closeEx) {
                closeEx.printStackTrace();
            }
        }
    }
}

/**
 * Per-connection worker of the sample server.
 * <p>
 * Repeatedly receives a 256-byte message, checks that its last 16 bytes are
 * the MD5 digest of its first 240 bytes, and sends back a 5-byte reply whose
 * first byte is {@code '.'} on a match and {@code '?'} on a mismatch.
 */
class ServerHandler implements Runnable
{
    private static final long timeout = 30 * 1000; // timeout of 30 seconds, in milliseconds
    private static int counter = 0; // messages handled by all handlers; guarded by ServerHandler.class
    private final TcpChannel channel;
    private final MessageDigest md;

    /**
     * @param channel accepted client connection
     * @throws Exception if the TcpChannel wrapper or the MD5 digest cannot be created
     */
    ServerHandler(SocketChannel channel) throws Exception
    {
        // NOTE(review): the second argument is "now + timeout"; presumably
        // TcpChannel treats it as an absolute deadline - confirm.
        final long deadline = System.currentTimeMillis() + timeout;
        this.channel = new TcpChannel(channel, deadline, SelectionKey.OP_READ);
        this.md = MessageDigest.getInstance("md5");
    }

    /**
     * Handles messages until an exception ends the loop, then cleans up the
     * channel. The shared counter is printed every 65536 messages.
     */
    public void run()
    {
        try {
            for (;;) {
                work();
                synchronized (ServerHandler.class) {
                    counter++;
                    if ((counter & 65535) == 0) {
                        System.out.println(counter);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.cleanup();
        }
    }

    /**
     * Simulates one unit of work: one request/reply round trip with fresh buffers.
     */
    private void work() throws IOException
    {
        final byte[] cache = new byte[256];
        final byte[] reply = new byte[5];
        read(cache, reply);
    }

    /**
     * Reads one message from the socket into {@code cache}, verifies its
     * checksum and sends {@code reply} back.
     *
     * @param cache buffer receiving the message (240 payload bytes + 16 digest bytes)
     * @param reply buffer sent back; only its first byte is set here
     * @throws IOException if receiving or sending fails
     */
    private void read(byte[] cache, byte[] reply) throws IOException
    {
        channel.recv(ByteBuffer.wrap(cache));

        // MD5 over the first 240 bytes ...
        md.reset();
        md.update(cache, 0, 240);
        final byte[] expected = md.digest();

        // ... must equal the trailing 16 bytes of the message.
        final boolean intact = ExtArrays.partialEquals(expected, 0, cache, 240, 16);
        if (intact) {
            reply[0] = '.';
        } else {
            reply[0] = '?';
            System.out.println("MISMATCH!");
        }

        // Report the verification result to the sender.
        channel.send(ByteBuffer.wrap(reply));
    }
}

final class ExtArrays
{
private ExtArrays()
{
}

public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)
{ // 字节数组的部分比较
if (a == null || b == null) {
return false;
}
if (offset_a + len > a.length || offset_b + len > b.length) {
return false;
}
for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) {
if (a[i] != b[j]) {
return false;
}
}
return true;
}
}
package asynchronizedchannel;  import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.MessageDigest; import java.util.Iterator;  public class Server {      /**      * 服务端通信范例程序主函数      *       * @param args      * @throws IOException      */     public static void main(String[] args) throws IOException     {         // Create the selector         final Selector selector = Selector.open();         final ServerSocketChannel server = ServerSocketChannel.open();         server.configureBlocking(false);         server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5);         // Register both channels with selector         server.register(selector, SelectionKey.OP_ACCEPT);         new Thread(new Daemon(selector)).start();     } }  class Daemon implements Runnable {     private final Selector selector;      Daemon(Selector selector)     {         this.selector = selector;     }      public void run()     {         while (true) {             try {                 // Wait for an event                 selector.select();                  // Get list of selection keys with pending events                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();                  // Process each key                 while (it.hasNext()) {                     // Get the selection key                     SelectionKey selKey = it.next();                      // Remove it from the list to indicate that it is being processed                     it.remove();                      // Check if it's a connection request                     if (selKey.isAcceptable()) {                         // Get channel with connection request                         ServerSocketChannel server = (ServerSocketChannel) selKey.channel();                         // Accept the 
connection request.                         // If serverSocketChannel is blocking, this method blocks.                         // The returned channel is in blocking mode.                         SocketChannel channel = server.accept();                          // If serverSocketChannel is non-blocking, sChannel may be null                         if (channel != null) {                             // Use the socket channel to communicate with the client                             new Thread(new ServerHandler(channel)).start();                         } else {                             System.out.println("---No Connection---");                             // There were no pending connection requests; try again later.                             // To be notified of connection requests,                         }                     }                 }             } catch (Exception ex) {                 ex.printStackTrace();             }         }     } }  class ServerHandler implements Runnable {     private static final long timeout = 30 * 1000; // 设置超时时间为30秒     private static int counter = 0;     private final TcpChannel channel;     private final MessageDigest md;      ServerHandler(SocketChannel channel) throws Exception     {         this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ);         md = MessageDigest.getInstance("md5");     }      public void run()     {         try {             while (true) {                 work();                 synchronized (ServerHandler.class) {                     if ((++counter & 65535) == 0) {                         System.out.println(counter);                     }                 }             }         } catch (Exception e) {             e.printStackTrace();         } finally {             channel.cleanup();         }     }      private void work() throws IOException     { // 模拟工作流程         byte[] cache = new byte[256], reply = new byte[5];         read(cache, 
reply);     }      private void read(byte[] cache, byte[] reply) throws IOException     { // 从套接字读入数据         channel.recv(ByteBuffer.wrap(cache));         md.reset();         md.update(cache, 0, 240);         byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码         if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较             reply[0] = '?';             System.out.println("MISMATCH!");         } else {             reply[0] = '.';         }         channel.send(ByteBuffer.wrap(reply)); // 返回接收结果     } }  final class ExtArrays {     private ExtArrays()     {     }      public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)     { // 字节数组的部分比较         if (a == null || b == null) {             return false;         }         if (offset_a + len > a.length || offset_b + len > b.length) {             return false;         }         for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) {             if (a[i] != b[j]) {                 return false;             }         }         return true;     } }


Client端,代码演示:

Java代码

package asynchronizedchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.DigestException;
import java.security.MessageDigest;
import java.util.Random;

public class Client
{
    private static int id = 0; // next client id to hand out

    /**
     * Entry point of the sample client: starts five threads, each running its
     * own {@link ClientHandler} with a consecutive id.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if a handler cannot be constructed
     */
    public static void main(String[] args) throws Exception
    {
        for (int started = 0; started < 5; started++) {
            final ClientHandler handler = new ClientHandler(id++);
            new Thread(handler).start();
        }
    }

}

/**
 * One client connection of the sample.
 * <p>
 * Repeatedly sends a 256-byte message made of 240 random bytes followed by
 * their 16-byte MD5 digest, then reads a 5-byte reply and reports when its
 * first byte is not {@code '.'}.
 */
class ClientHandler implements Runnable
{
    private static final long timeout = 30 * 1000; // timeout of 30 seconds, in milliseconds
    private final TcpChannel channel;

    private final int id;

    private final MessageDigest md;
    private final Random rand;

    /**
     * @param id number used to label this client's progress output
     * @throws Exception if the socket channel, its TcpChannel wrapper or the
     *                   MD5 digest cannot be created
     */
    ClientHandler(int id) throws Exception
    {
        this.id = id;
        // NOTE(review): the second argument is "now + timeout"; presumably
        // TcpChannel treats it as an absolute deadline - confirm.
        final long deadline = System.currentTimeMillis() + timeout;
        this.channel = new TcpChannel(SocketChannel.open(), deadline, SelectionKey.OP_WRITE);
        this.md = MessageDigest.getInstance("md5");
        this.rand = new Random();
    }

    /**
     * Connects to the server and exchanges messages until an exception ends
     * the loop, then cleans up the channel. Progress is printed every 16384
     * round trips.
     */
    @Override
    public void run()
    {
        try {
            channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656));
            for (int rounds = 1; ; rounds++) {
                work();
                if ((rounds & 16383) == 0) {
                    System.out.println(String.format("client(%1$d): %2$d", id, rounds));
                }
                Thread.yield();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.cleanup();
        }
    }

    /**
     * Performs one request/reply round trip with fresh buffers.
     */
    private void work() throws IOException, DigestException
    {
        final byte[] cache = new byte[256];
        final byte[] reply = new byte[5];
        write(cache, reply);
    }

    /**
     * Fills {@code cache} with a random payload plus checksum, sends it and
     * reads the server's answer into {@code reply}.
     *
     * @param cache 256-byte message buffer (240 payload bytes + 16 digest bytes)
     * @param reply buffer receiving the server's reply
     * @throws DigestException if the digest cannot be written into {@code cache}
     * @throws IOException     if sending or receiving fails
     */
    private void write(byte[] cache, byte[] reply) throws DigestException, IOException
    {
        // Randomize the whole buffer; only the first 240 bytes remain payload
        // because the digest below overwrites the last 16.
        rand.nextBytes(cache);
        md.reset();
        md.update(cache, 0, 240);
        md.digest(cache, 240, 16);

        channel.send(ByteBuffer.wrap(cache));
        channel.recv(ByteBuffer.wrap(reply));

        // Anything other than '.' is not a positive acknowledgement; a retry
        // could be attempted here.
        if (reply[0] != '.') {
            System.out.println("MISMATCH!");
        }
    }
}
package asynchronizedchannel;  import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.DigestException; import java.security.MessageDigest; import java.util.Random;  public class Client {     private static int id = 0;     /**      * 客户端通信范例程序主函数      *       * @param args      * @throws Exception      */     public static void main(String[] args) throws Exception     {         new Thread(new ClientHandler(id++)).start();         new Thread(new ClientHandler(id++)).start();         new Thread(new ClientHandler(id++)).start();         new Thread(new ClientHandler(id++)).start();         new Thread(new ClientHandler(id++)).start();     }  }  class ClientHandler implements Runnable {     private static final long timeout = 30 * 1000; // 设置超时时间为30秒     private final TcpChannel channel;          private final int id;      private final MessageDigest md;     private final Random rand;      ClientHandler(int id) throws Exception     {         this.id = id;         channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE);         md = MessageDigest.getInstance("md5");         rand = new Random();     }      @Override     public void run()     {         try {             channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656));             int i = 0;             while (true) {                 work();                 if ((++i & 16383) == 0) {                     System.out.println(String.format("client(%1$d): %2$d", id, i));                 }                 Thread.yield();             }         } catch (Exception e) {             e.printStackTrace();         } finally {             channel.cleanup();         }     }      private void work() throws IOException, DigestException     {         byte[] cache = new byte[256], reply = new byte[5];         write(cache, reply);     }      private 
void write(byte[] cache, byte[] reply) throws DigestException, IOException     {         rand.nextBytes(cache); // 只用前面的240字节         md.reset();         md.update(cache, 0, 240);         md.digest(cache, 240, 16); // MD5校验码占后面16字节         ByteBuffer buffer = ByteBuffer.wrap(cache);         channel.send(buffer);         buffer = ByteBuffer.wrap(reply);         channel.recv(buffer);         if (reply[0] != '.') { // 若接收的结果不正确,可以考虑尝试再次发送             System.out.println("MISMATCH!");         }     } }


重点说明:

发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。