博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第一章 java nio三大组件与使用姿势
阅读量:6841 次
发布时间:2019-06-26

本文共 8834 字,大约阅读时间需要 29 分钟。

本案例来源于《netty权威指南》

一、三大组件

  • Selector:多路复用器。轮询注册在其上的Channel,当发现某个或者多个Channel处于“就绪状态”后(accept接收连接事件、connect连接完成事件、read读事件、write写事件),从阻塞状态返回就绪的Channel的SelectionKey集合,之后进行IO操作。
  • Channel:封装了socket。
    • ServerSocketChannel:封装了ServerSocket,用于accept客户端连接请求;
    • SocketChannel:一对SocketChannel组成一条虚电路,进行读写通信
  • Buffer:用于存取数据,最主要的是ByteBuffer
    • position:下一个将被操作的字节位置
    • limit:在写模式下表示可以进行写的字节数,在读模式下表示可以进行读的字节数
    • capacity:Buffer的大小

 

二、服务端代码

1、服务端启动类

1 public class Server {2     public static void main(String[] args) throws IOException {3         new Thread(new ServerHandler(8080), "server-1").start();4     }5 }

创建一个任务ServerHandler,然后创建一条线程,启动执行该任务。

2、逻辑处理类

1 public class ServerHandler implements Runnable { 2     private Selector selector; 3     private ServerSocketChannel ssChannel; 4  5     public ServerHandler(int port) { 6         try { 7             //等价于 Selector selector = SelectorProvider.provider().openSelector(); 8             selector = Selector.open(); 9             //等价于 SelectorProvider.provider().openServerSocketChannel()10             ssChannel = ServerSocketChannel.open();11             ssChannel.configureBlocking(false);12             ssChannel.bind(new InetSocketAddress(port), 1024);13             ssChannel.register(selector, SelectionKey.OP_ACCEPT);14         } catch (IOException e) {15             e.printStackTrace();16             System.exit(1);17         }18     }19 20     public void run() {21         for (; ; ) {22             try {23                 selector.select();24                 Iterator
it = selector.selectedKeys().iterator();25 while (it.hasNext()) {26 SelectionKey key = it.next();27 it.remove();28 handleInput(key);29 }30 } catch (Throwable t) {31 t.printStackTrace();32 }33 }34 35 }36 37 private void handleInput(SelectionKey key) throws IOException {38 if (key.isValid()) {39 // 处理新接入的请求消息40 if (key.isAcceptable()) {41 // Accept the new connection42 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();43 SocketChannel sc = ssc.accept();44 sc.configureBlocking(false);45 // Add the new connection to the selector46 sc.register(selector, SelectionKey.OP_READ);47 }48 if (key.isReadable()) {49 // Read the data50 SocketChannel sc = (SocketChannel) key.channel();51 ByteBuffer readBuffer = ByteBuffer.allocate(1024);52 int readBytes = sc.read(readBuffer);53 if (readBytes > 0) {54 readBuffer.flip();55 byte[] bytes = new byte[readBuffer.remaining()];56 readBuffer.get(bytes);57 String body = new String(bytes, "UTF-8");58 System.out.println("The time server receive order : "59 + body);60 String currentTime = "QUERY TIME ORDER"61 .equalsIgnoreCase(body) ? new java.util.Date(62 System.currentTimeMillis()).toString()63 : "BAD ORDER";64 doWrite(sc, currentTime);65 } else if (readBytes < 0) {66 // 对端链路关闭67 key.cancel();68 sc.close();69 } else70 ; // 读到0字节,忽略71 }72 }73 }74 75 private void doWrite(SocketChannel channel, String response)76 throws IOException {77 if (response != null && response.trim().length() > 0) {78 byte[] bytes = response.getBytes();79 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);80 writeBuffer.put(bytes);81 writeBuffer.flip();82 channel.write(writeBuffer);83 }84 }85 }

步骤:

1、创建一个Selector和ServerSocketChannel实例

2、配置ServerSocketChannel实例为非阻塞

3、ServerSocketChannel实例bind端口

4、将ServerSocketChannel实例注册到selector上,监听OP_ACCEPT事件

下面的任务在Server创建的新的线程中执行,不影响主线程执行其他逻辑

5、之后进入死循环

5.1、使用select.select()阻塞等待就绪事件(这里是等待OP_ACCEPT事件),一旦有有就绪事件到达,立即向下执行

5.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历

5.3、从迭代器删除该SelectionKey,防止下一次再被遍历到

5.4、如果SelectionKey==OP_ACCEPT,则通过ServerSocketChannel.accept()创建SocketChannel,该SocketChannel是后续真正的与客户端的SocketChannel进行通信的实体

5.5、配置新创建的SocketChannel实例为非阻塞,然后将该SocketChannel实例注册到selector实例上,监听OP_READ事件

5.6、等客户端发出请求数据时,此处监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中,然后再创建ByteBuffer将信息写回到SocketChannel(也就是说数据的读写一定要通过Buffer)

 

三、客户端代码

1、客户端启动类

1 public class Client {2     public static void main(String[] args) {3         new Thread(new ClientHandler("127.0.0.1", 8080), "client-1").start();4     }5 }

创建一个任务ClientHandler,然后创建一条线程,启动执行该任务。

2、逻辑处理类

1 public class ClientHandler implements Runnable { 2     private String host; 3     private int port; 4  5     private Selector selector; 6     private SocketChannel socketChannel; 7  8     public ClientHandler(String host, int port) { 9         this.host = host;10         this.port = port;11         try {12             selector = Selector.open();13             socketChannel = SocketChannel.open();14             socketChannel.configureBlocking(false);15         } catch (IOException e) {16             e.printStackTrace();17             System.exit(1);18         }19     }20 21     public void run() {22         try {23             doConnect();24         } catch (IOException e) {25             e.printStackTrace();26             System.exit(1);27         }28         while (true) {29             try {30                 selector.select();31                 Iterator
it = selector.selectedKeys().iterator();32 while (it.hasNext()) {33 SelectionKey key = it.next();34 it.remove();35 handleInput(key);36 }37 } catch (Exception e) {38 e.printStackTrace();39 System.exit(1);40 }41 }42 }43 44 private void handleInput(SelectionKey key) throws IOException {45 if (key.isValid()) {46 // 判断是否连接成功47 SocketChannel sc = (SocketChannel) key.channel();48 if (key.isConnectable()) {49 if (sc.finishConnect()) {50 sc.register(selector, SelectionKey.OP_READ);51 doWrite(sc);52 } else53 System.exit(1);// 连接失败,进程退出54 } else if (key.isReadable()) {55 ByteBuffer readBuffer = ByteBuffer.allocate(1024);56 int readBytes = sc.read(readBuffer);57 if (readBytes > 0) {58 readBuffer.flip();59 byte[] bytes = new byte[readBuffer.remaining()];60 readBuffer.get(bytes);61 String body = new String(bytes, "UTF-8");62 System.out.println("Now is : " + body);63 } else if (readBytes < 0) {64 // 对端链路关闭65 key.cancel();66 sc.close();67 } else68 ; // 读到0字节,忽略69 }70 }71 72 }73 74 private void doConnect() throws IOException {75 // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答76 if (socketChannel.connect(new InetSocketAddress(host, port))) {77 socketChannel.register(selector, SelectionKey.OP_READ);78 doWrite(socketChannel);79 } else80 socketChannel.register(selector, SelectionKey.OP_CONNECT);81 }82 83 private void doWrite(SocketChannel sc) throws IOException {84 byte[] req = "QUERY TIME ORDER".getBytes();85 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);86 writeBuffer.put(req);87 writeBuffer.flip();88 sc.write(writeBuffer);89 if (!writeBuffer.hasRemaining())90 System.out.println("Send order 2 server succeed.");91 }92 }

步骤:

1、创建一个Selector和SocketChannel实例

2、配置SocketChannel实例为非阻塞

下面的任务在Cilent创建的新的线程中执行,不影响主线程执行其他逻辑

3、SocketChannel.connect连接到server端,如果连接没有马上成功,将该SocketChannel实例注册到selector上,监听OP_CONNECT事件;如果连接成功,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端

4、之后进入死循环

4.1、使用select.select()阻塞等待就绪事件,一旦有有就绪事件到达,立即向下执行

4.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历

4.3、从迭代器删除该SelectionKey,防止下一次再被遍历到

4.4、如果SelectionKey==OP_CONNECT,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端;如果监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中

 

转载地址:http://kdzul.baihongyu.com/

你可能感兴趣的文章
中国境内商业邮件年发送量将破“千亿”大关
查看>>
brew 安装mysql
查看>>
你听说过PHP 的面向方面编程吗?
查看>>
MYSQL开启慢查询日志实施
查看>>
&lt;备份&gt;LVM总结
查看>>
工作日志的利器:迷人的MARKDOWN
查看>>
Solaris下挂载光盘
查看>>
说说苏宁易购
查看>>
Hibernate 和 Mybatis 两者相比的优缺点
查看>>
负载均衡器部署方式和工作原理
查看>>
MBProgressHUD使用
查看>>
例说DNS递归/迭代名称解析原理
查看>>
逐步展示
查看>>
台湾一老师演讲词值得中国人深思
查看>>
看2014视频三国杀
查看>>
微信“封杀”网易云音乐,真的错了吗?
查看>>
支付宝架构师:从工程师到架构师的成长之路
查看>>
android系统如何自适应屏幕大小
查看>>
十周第二次课(5月28日)
查看>>
推荐系统之信息茧房问题
查看>>