本案例来源于《netty权威指南》
一、三大组件
- Selector:多路复用器。轮询注册在其上的Channel,当发现某个或者多个Channel处于“就绪状态”后(accept接收连接事件、connect连接完成事件、read读事件、write写事件),从阻塞状态返回就绪的Channel的SelectionKey集合,之后进行IO操作。
- Channel:封装了socket。
- ServerSocketChannel:封装了ServerSocket,用于accept客户端连接请求;
- SocketChannel:一对SocketChannel组成一条虚电路,进行读写通信
- Buffer:用于存取数据,最主要的是ByteBuffer
- position:下一个将被操作的字节位置
- limit:在写模式下表示可以进行写的字节数,在读模式下表示可以进行读的字节数
- capacity:Buffer的大小
二、服务端代码
1、服务端启动类
1 public class Server {2 public static void main(String[] args) throws IOException {3 new Thread(new ServerHandler(8080), "server-1").start();4 }5 }
创建一个任务ServerHandler,然后创建一条线程,启动执行该任务。
2、逻辑处理类
1 public class ServerHandler implements Runnable { 2 private Selector selector; 3 private ServerSocketChannel ssChannel; 4 5 public ServerHandler(int port) { 6 try { 7 //等价于 Selector selector = SelectorProvider.provider().openSelector(); 8 selector = Selector.open(); 9 //等价于 SelectorProvider.provider().openServerSocketChannel()10 ssChannel = ServerSocketChannel.open();11 ssChannel.configureBlocking(false);12 ssChannel.bind(new InetSocketAddress(port), 1024);13 ssChannel.register(selector, SelectionKey.OP_ACCEPT);14 } catch (IOException e) {15 e.printStackTrace();16 System.exit(1);17 }18 }19 20 public void run() {21 for (; ; ) {22 try {23 selector.select();24 Iteratorit = selector.selectedKeys().iterator();25 while (it.hasNext()) {26 SelectionKey key = it.next();27 it.remove();28 handleInput(key);29 }30 } catch (Throwable t) {31 t.printStackTrace();32 }33 }34 35 }36 37 private void handleInput(SelectionKey key) throws IOException {38 if (key.isValid()) {39 // 处理新接入的请求消息40 if (key.isAcceptable()) {41 // Accept the new connection42 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();43 SocketChannel sc = ssc.accept();44 sc.configureBlocking(false);45 // Add the new connection to the selector46 sc.register(selector, SelectionKey.OP_READ);47 }48 if (key.isReadable()) {49 // Read the data50 SocketChannel sc = (SocketChannel) key.channel();51 ByteBuffer readBuffer = ByteBuffer.allocate(1024);52 int readBytes = sc.read(readBuffer);53 if (readBytes > 0) {54 readBuffer.flip();55 byte[] bytes = new byte[readBuffer.remaining()];56 readBuffer.get(bytes);57 String body = new String(bytes, "UTF-8");58 System.out.println("The time server receive order : "59 + body);60 String currentTime = "QUERY TIME ORDER"61 .equalsIgnoreCase(body) ? new java.util.Date(62 System.currentTimeMillis()).toString()63 : "BAD ORDER";64 doWrite(sc, currentTime);65 } else if (readBytes < 0) {66 // 对端链路关闭67 key.cancel();68 sc.close();69 } else70 ; // 读到0字节,忽略71 }72 }73 }74 75 private void doWrite(SocketChannel channel, String response)76 throws IOException {77 if (response != null && response.trim().length() > 0) {78 byte[] bytes = response.getBytes();79 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);80 writeBuffer.put(bytes);81 writeBuffer.flip();82 channel.write(writeBuffer);83 }84 }85 }
步骤:
1、创建一个Selector和ServerSocketChannel实例
2、配置ServerSocketChannel实例为非阻塞
3、ServerSocketChannel实例bind端口
4、将ServerSocketChannel实例注册到selector上,监听OP_ACCEPT事件
下面的任务在Server创建的新的线程中执行,不影响主线程执行其他逻辑
5、之后进入死循环
5.1、使用select.select()阻塞等待就绪事件(这里是等待OP_ACCEPT事件),一旦有有就绪事件到达,立即向下执行
5.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历
5.3、从迭代器删除该SelectionKey,防止下一次再被遍历到
5.4、如果SelectionKey==OP_ACCEPT,则通过ServerSocketChannel.accept()创建SocketChannel,该SocketChannel是后续真正的与客户端的SocketChannel进行通信的实体
5.5、配置新创建的SocketChannel实例为非阻塞,然后将该SocketChannel实例注册到selector实例上,监听OP_READ事件
5.6、等客户端发出请求数据时,此处监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中,然后再创建ByteBuffer将信息写回到SocketChannel(也就是说数据的读写一定要通过Buffer)
三、客户端代码
1、客户端启动类
1 public class Client {2 public static void main(String[] args) {3 new Thread(new ClientHandler("127.0.0.1", 8080), "client-1").start();4 }5 }
创建一个任务ClientHandler,然后创建一条线程,启动执行该任务。
2、逻辑处理类
1 public class ClientHandler implements Runnable { 2 private String host; 3 private int port; 4 5 private Selector selector; 6 private SocketChannel socketChannel; 7 8 public ClientHandler(String host, int port) { 9 this.host = host;10 this.port = port;11 try {12 selector = Selector.open();13 socketChannel = SocketChannel.open();14 socketChannel.configureBlocking(false);15 } catch (IOException e) {16 e.printStackTrace();17 System.exit(1);18 }19 }20 21 public void run() {22 try {23 doConnect();24 } catch (IOException e) {25 e.printStackTrace();26 System.exit(1);27 }28 while (true) {29 try {30 selector.select();31 Iteratorit = selector.selectedKeys().iterator();32 while (it.hasNext()) {33 SelectionKey key = it.next();34 it.remove();35 handleInput(key);36 }37 } catch (Exception e) {38 e.printStackTrace();39 System.exit(1);40 }41 }42 }43 44 private void handleInput(SelectionKey key) throws IOException {45 if (key.isValid()) {46 // 判断是否连接成功47 SocketChannel sc = (SocketChannel) key.channel();48 if (key.isConnectable()) {49 if (sc.finishConnect()) {50 sc.register(selector, SelectionKey.OP_READ);51 doWrite(sc);52 } else53 System.exit(1);// 连接失败,进程退出54 } else if (key.isReadable()) {55 ByteBuffer readBuffer = ByteBuffer.allocate(1024);56 int readBytes = sc.read(readBuffer);57 if (readBytes > 0) {58 readBuffer.flip();59 byte[] bytes = new byte[readBuffer.remaining()];60 readBuffer.get(bytes);61 String body = new String(bytes, "UTF-8");62 System.out.println("Now is : " + body);63 } else if (readBytes < 0) {64 // 对端链路关闭65 key.cancel();66 sc.close();67 } else68 ; // 读到0字节,忽略69 }70 }71 72 }73 74 private void doConnect() throws IOException {75 // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答76 if (socketChannel.connect(new InetSocketAddress(host, port))) {77 socketChannel.register(selector, SelectionKey.OP_READ);78 doWrite(socketChannel);79 } else80 socketChannel.register(selector, SelectionKey.OP_CONNECT);81 }82 83 private void doWrite(SocketChannel sc) throws IOException {84 byte[] req = "QUERY TIME ORDER".getBytes();85 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);86 writeBuffer.put(req);87 writeBuffer.flip();88 sc.write(writeBuffer);89 if (!writeBuffer.hasRemaining())90 System.out.println("Send order 2 server succeed.");91 }92 }
步骤:
1、创建一个Selector和SocketChannel实例
2、配置SocketChannel实例为非阻塞
下面的任务在Cilent创建的新的线程中执行,不影响主线程执行其他逻辑
3、SocketChannel.connect连接到server端,如果连接没有马上成功,将该SocketChannel实例注册到selector上,监听OP_CONNECT事件;如果连接成功,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端
4、之后进入死循环
4.1、使用select.select()阻塞等待就绪事件,一旦有有就绪事件到达,立即向下执行
4.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历
4.3、从迭代器删除该SelectionKey,防止下一次再被遍历到
4.4、如果SelectionKey==OP_CONNECT,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端;如果监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中