关于:Java网络IO编程(BIO、NIO、AIO)
跳到导航
跳到搜索
关于
本文主要讨论 Java 实现 network IO 的三种实现:BIO、NIO、AIO。
什么是 BIO、NIO、AIO?
- 见:“关于:5种I/O模型”
- BIO:“blocking IO”,阻塞IO;
- NIO:“non-blocking IO”,非阻塞IO;
- AIO:“asynchronous IO”,异步IO;
Java 的 NIO API
- 见:“关于NIO”(概念、组件、用法)
需要注意的是:
- Java NIO 是“Java New IO API”,而非“Non-Blocking IO”。二者不是一个概念。
- Java NIO 支持面向缓冲区的、基于通道的IO操作,以更加高效的方式进行文件的读写操作。
- 其 NIO API 既可以实现“Non-Blocking IO”(NIO)和“Asynchronous IO”(AIO),也可以实现“Blocking IO”(BIO)。
BIO(blocking IO)
传统的BIO编程
网络编程的基本模型是C/S模型,即两个进程间的通信:
- 服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。
- ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
简单的描述一下BIO的服务端通信模型:
- 采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。
特点:
- 缺乏弹性伸缩能力,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系。
示例:
- Server:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * BIO服务端源码 * @author yangtao__anxpp.com * @version 1.0 */ public final class ServerNormal { // 默认的端口号 private static int DEFAULT_PORT = 12345; // 单例的ServerSocket private static ServerSocket server; //根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值 public static void start() throws IOException{ //使用默认值 start(DEFAULT_PORT); } // 这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了 public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ // 通过构造函数创建ServerSocket // 如果端口合法且空闲,服务端就监听成功 server = new ServerSocket(port); System.out.println("服务器已启动,端口号:" + port); //通过无线循环监听客户端连接 //如果没有客户端接入,将阻塞在accept操作上。 while(true){ Socket socket = server.accept(); //当有新的客户端接入时,会执行下面的代码 //然后创建一个新的线程处理这条Socket链路 new Thread(new ServerHandler(socket)).start(); } }finally{ //一些必要的清理工作 if(server != null){ System.out.println("服务器已关闭。"); server.close(); server = null; } } } }
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.anxpp.io.utils.Calculator; /** * 客户端线程 * @author yangtao__anxpp.com * 用于处理一个客户端的Socket链路 */ public class ServerHandler implements Runnable{ private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try{ in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); String expression; String result; while(true){ //通过BufferedReader读取一行 //如果已经读到输入流尾部,返回null,退出循环 //如果得到非空值,就尝试计算结果并返回 if((expression = in.readLine())==null) break; System.out.println("服务器收到消息:" + expression); try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "计算错误:" + e.getMessage(); } out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ //一些必要的清理工作 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
- Client:
package com.anxpp.io.calculator.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * 阻塞式I/O创建的客户端 * @author yangtao__anxpp.com * @version 1.0 */ public class Client { // 默认的端口号 private static int DEFAULT_SERVER_PORT = 12345; private static String DEFAULT_SERVER_IP = "127.0.0.1"; public static void send(String expression){ send(DEFAULT_SERVER_PORT,expression); } public static void send(int port,String expression){ System.out.println("算术表达式为:" + expression); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try{ socket = new Socket(DEFAULT_SERVER_IP,port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(expression); System.out.println("___结果为:" + in.readLine()); }catch(Exception e){ e.printStackTrace(); }finally{ // 必要的清理工作 if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(out != null){ out.close(); out = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
- 测试代码:
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.util.Random; /** * 测试方法 * @author yangtao__anxpp.com * @version 1.0 */ public class Test { // 测试主方法 public static void main(String[] args) throws InterruptedException { // 运行服务器 new Thread(new Runnable() { @Override public void run() { try { ServerBetter.start(); } catch (IOException e) { e.printStackTrace(); } } }).start(); // 避免客户端先于服务器启动前执行代码 Thread.sleep(100); // 运行客户端 char operators[] = {'+','-','*','/'}; Random random = new Random(System.currentTimeMillis()); new Thread(new Runnable() { @SuppressWarnings("static-access") @Override public void run() { while(true){ //随机产生算术表达式 String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1); Client.send(expression); try { Thread.currentThread().sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
- 运行结果:
服务器已启动,端口号:12345 算术表达式为:4-2 服务器收到消息:4-2 ___结果为:2 算术表达式为:5-10 服务器收到消息:5-10 ___结果为:-5 算术表达式为:0-9 服务器收到消息:0-9 ___结果为:-9 算术表达式为:0+6 服务器收到消息:0+6 ___结果为:6 算术表达式为:1/6 服务器收到消息:1/6 ___结果为:0.16666666666666666 ...
伪异步I/O编程
为了改进以上一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞I/O),通常被称为“伪异步I/O模型“。
- 只需要Server端新建线程交给线程池管理即可:【替换上节的“ServerNormal”】
package com.anxpp.io.calculator.bio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * BIO服务端源码__伪异步I/O * @author yangtao__anxpp.com * @version 1.0 */ public final class ServerBetter { // 默认的端口号 private static int DEFAULT_PORT = 12345; // 单例的ServerSocket private static ServerSocket server; // 线程池【懒汉式的单例】 private static ExecutorService executorService = Executors.newFixedThreadPool(60); // 根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值 public static void start() throws IOException{ //使用默认值 start(DEFAULT_PORT); } //这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了 public synchronized static void start(int port) throws IOException{ if(server != null) return; try{ // 通过构造函数创建ServerSocket // 如果端口合法且空闲,服务端就监听成功 server = new ServerSocket(port); System.out.println("服务器已启动,端口号:" + port); // 通过无线循环监听客户端连接 // 如果没有客户端接入,将阻塞在accept操作上。 while(true){ Socket socket = server.accept(); //当有新的客户端接入时,会执行下面的代码 //然后创建一个新的线程处理这条Socket链路 executorService.execute(new ServerHandler(socket)); } }finally{ //一些必要的清理工作 if(server != null){ System.out.println("服务器已关闭。"); server.close(); server = null; } } } }
如果使用CachedThreadPool线程池(不限制线程数量),其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的 客户端:线程数 模型,
而使用FixedThreadPool就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了 N:M 的伪异步I/O模型。
JavaNIO的BIO编程
NIO(non-blocking IO)
NIO 提供了,与传统BIO模型中的Socket和ServerSocket相对应的,SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
- 新增的着两种通道都支持阻塞和非阻塞两种模式。
示例1:
- Server:
package com.anxpp.io.calculator.nio; public class Server { private static int DEFAULT_PORT = 12345; private static ServerHandle serverHandle; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) serverHandle.stop(); serverHandle = new ServerHandle(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ start(); } }
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.anxpp.io.utils.Calculator; /** * NIO服务端 * @author yangtao__anxpp.com * @version 1.0 */ public class ServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 构造方法 * @param port 指定要监听的端口号 */ public ServerHandle(int port) { try{ // 创建选择器 selector = Selector.open(); // 打开监听通道 serverChannel = ServerSocketChannel.open(); // 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 serverChannel.configureBlocking(false);//开启非阻塞模式 // 绑定端口 backlog设为1024 serverChannel.socket().bind(new InetSocketAddress(port),1024); // 监听客户端连接请求 serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 标记服务器已开启 started = true; System.out.println("服务器已启动,端口号:" + port); }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { // 循环遍历selector while(started){ try{ // 无论是否有读写事件发生,selector每隔1s被唤醒一次 selector.select(1000); // 阻塞,只有当至少一个注册的事件发生的时候才会继续. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); // Selector不会自己从已选择键集中移除SelectionKey实例 try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } // selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ // 处理新接入的请求消息 if(key.isAcceptable()){ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 通过ServerSocketChannel的accept创建SocketChannel实例 // 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立 SocketChannel sc = ssc.accept(); //设置为非阻塞的 sc.configureBlocking(false); //注册为读 sc.register(selector, SelectionKey.OP_READ); } // 读消息 if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); // 创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); // 读取到字节,对字节进行编解码 if(readBytes>0){ // buffer从读转换为写:将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作 buffer.flip(); // 根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; // 将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String expression = new String(bytes,"UTF-8"); System.out.println("服务器收到消息:" + expression); // 处理数据 String result = null; try{ result = Calculator.cal(expression).toString(); }catch(Exception e){ result = "计算错误:" + e.getMessage(); } // 发送应答消息 doWrite(sc,result); } // 没有读取到字节 忽略 // else if(readBytes==0); // 链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //异步发送应答消息 private void doWrite(SocketChannel channel,String response) throws IOException{ // 将消息编码为字节数组 byte[] bytes = response.getBytes(); // 根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将字节数组复制到缓冲区 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 发送缓冲区的字节数组 channel.write(writeBuffer); //****此处不含处理“写半包”的代码 } }
- SocketChannel 不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询Selector将没有发送完的消息发送完毕,然后通过Buffer的hasRemain()方法判断消息是否发送完成。
- Client:
package com.anxpp.io.calculator.nio; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static ClientHandle clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) clientHandle.stop(); clientHandle = new ClientHandle(ip,port); new Thread(clientHandle,"Server").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } public static void main(String[] args){ start(); } }
package com.anxpp.io.calculator.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * NIO客户端 * @author yangtao__anxpp.com * @version 1.0 */ public class ClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public ClientHandle(String ip,int port) { this.host = ip; this.port = port; try{ // 创建选择器 selector = Selector.open(); // 打开监听通道 socketChannel = SocketChannel.open(); // 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 socketChannel.configureBlocking(false);//开启非阻塞模式 started = true; }catch(IOException e){ e.printStackTrace(); System.exit(1); } } public void stop(){ started = false; } @Override public void run() { try{ doConnect(); }catch(IOException e){ e.printStackTrace(); System.exit(1); } // 循环遍历selector while(started){ try{ // 无论是否有读写事件发生,selector每隔1s被唤醒一次 selector.select(1000); // 阻塞,只有当至少一个注册的事件发生的时候才会继续. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Exception e){ e.printStackTrace(); System.exit(1); } } // selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()); else System.exit(1); } // 读消息 if(key.isReadable()){ // 创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); // 读取到字节,对字节进行编解码 if(readBytes>0){ // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作 buffer.flip(); // 根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; // 将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("客户端收到消息:" + result); } // 没有读取到字节 忽略 // else if(readBytes==0); // 链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } // 异步发送消息 private void doWrite(SocketChannel channel,String request) throws IOException{ // 将消息编码为字节数组 byte[] bytes = request.getBytes(); // 根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将字节数组复制到缓冲区 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 发送缓冲区的字节数组 channel.write(writeBuffer); //****此处不含处理“写半包”的代码 } private void doConnect() throws IOException{ if(socketChannel.connect(new InetSocketAddress(host,port))); else socketChannel.register(selector, SelectionKey.OP_CONNECT); } public void sendMsg(String msg) throws Exception{ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel, msg); } }
- 测试代码:
package com.anxpp.io.calculator.nio; import java.util.Scanner; /** * 测试方法 * @author yangtao__anxpp.com * @version 1.0 */ public class Test { //测试主方法 @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //运行服务器 Server.start(); //避免客户端先于服务器启动前执行代码 Thread.sleep(100); //运行客户端 Client.start(); while(Client.sendMsg(new Scanner(System.in).nextLine())); } }
- 运行结果:
示例2:
- 仅关键代码:
package com.openmind.io.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * ${name} * * @author zhoujunwen * @date 2019-11-05 * @time 16:54 * @desc */ public class NIODemo { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 8888), 50); serverSocketChannel.configureBlocking(false); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = serverChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]); SocketChannel clientChannel = (SocketChannel) key.channel(); int read = clientChannel.read(buffer); if (read == -1) { key.cancel(); clientChannel.close(); } else { buffer.flip(); clientChannel.write(buffer); } } } iterator.remove(); } } }
AIO(asynchronous IO)
AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本(NIO 2.0),新增了提异步非阻塞的 IO 操作方式,所以人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会执行回调通知相应的线程进行后续的操作。
在Java 7中增加了asynchronous IO,具体结构和实现类框架如下:
- 提供了“异步文件通道”和“异步套接字通道”(“AsynchronousServerSocketChannel”、“AsynchronousSocketChannel”)的实现。
- 不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。
异步 Socket Channel是【被动执行】对象,我们不需要想 NIO 编程那样创建一个独立的IO线程来处理读写操作。对于 AsynchronousServerSocketChannel 和 AsynchronousSocketChannel,它们都由 JDK 底层的线程池负责回调并驱动读写操作。 正因为如此,基于 NIO2.0 新的异步非阻塞 Channel 进行编程比 NIO 编程更为简单。
异步通道提供两种方式获取操作结果。
- 通过 Java.util.concurrent.Future 类来表示异步操作的结果;
- 在执行异步操作的时候传入一个 Java.nio.channels.CompletionHandler 接口的实现类作为操作完成的回调。
示例1:【?????????????????????】
- Server:
package com.anxpp.io.calculator.aio.server; /** * AIO服务端 * @author yangtao__anxpp.com * @version 1.0 */ public class Server { private static int DEFAULT_PORT = 12345; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } }
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler(int port) { try { // 创建服务端通道 channel = AsynchronousServerSocketChannel.open(); // 绑定端口 channel.bind(new InetSocketAddress(port)); System.out.println("服务器已启动,端口号:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // CountDownLatch初始化 // 它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞 // 此处,让现场在此阻塞,防止服务端执行完成后退出 // 也可以使用while(true)+sleep // 生成环境就不需要担心这个问题,以为服务端是不会退出的 latch = new CountDownLatch(1); // 用于接收客户端的连接 channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.anxpp.io.calculator.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; // 作为handler接收客户端连接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { // 继续接受其他客户端的请求 Server.clientCount++; System.out.println("连接的客户端数:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); // 创建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 异步读(第三个参数为接收消息回调的业务Handler) channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { // 用于读取半包消息和发送应答 private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // 读取到消息后的处理 @Override public void completed(Integer result, ByteBuffer attachment) { // flip操作 attachment.flip(); // 根据 byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("服务器收到消息: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); }catch(Exception e){ calrResult = "计算错误:" + e.getMessage(); } // 向客户端发送消息 doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } // 发送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); // 异步写数据(参数与前面的read一样) channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 如果没有发送完,就继续发送直到完成 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ // 创建新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读(第三个参数为接收消息回调的业务Handler) channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
- 此处本应有一个 WriteHandler 的,我们在 ReadHandler 中以一个匿名内部类实现了它。
- Client:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } // 向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { // 创建异步的客户端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // 创建CountDownLatch等待 latch = new CountDownLatch(1); // 发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } // 连接服务器成功 // 意味着TCP三次握手完成 @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客户端成功连接到服务器..."); } // 连接服务器失败 @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("连接服务器失败..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } // 向服务器发送消息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); // 异步写 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } }
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { // 完成全部数据的写入 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { // 读取数据 ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
- 测试代码:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * 测试方法 * @author yangtao__anxpp.com * @version 1.0 */ public class Test { //测试主方法 @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //运行服务器 Server.start(); //避免客户端先于服务器启动前执行代码 Thread.sleep(100); //运行客户端 Client.start(); System.out.println("请输入请求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
- 运行结果:
服务器已启动,端口号:12345 请输入请求消息: 客户端成功连接到服务器... 连接的客户端数:1 123456+789+456 服务器收到消息: 123456+789+456 客户端收到结果:124701 9526*56 服务器收到消息: 9526*56 客户端收到结果:533456 ...
示例2:
- Server:
package com.openmind.io.aio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.util.Scanner; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * AIO服务端 * * @author zhoujunwen * @date 2019-11-08 * @time 17:50 * @desc */ public class AIO_Server { static Charset charset = Charset.forName("UTF-8"); public static void main(String[] args) throws InterruptedException { int port = 7890; new Thread(new AioServer(port)).start(); TimeUnit.MINUTES.sleep(60); } static class AioServer implements Runnable { int port; AsynchronousChannelGroup group; AsynchronousServerSocketChannel serverSocketChannel; public AioServer(int port) { this.port = port; init(); } public void init() { try { // 创建处理线程池 group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5); // 创建服务channel serverSocketChannel = AsynchronousServerSocketChannel.open(group); // 丙丁端口 serverSocketChannel.bind(new InetSocketAddress(port)); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // 接收请求 // accept的第一个参数附件,第二个参数是收到请求后的接收处理器 // 接收处理器AcceptHandler泛型的第一个参数的处理结果,这里是AsynchronousSocketChannel,即接收到的请求的channel // 第二个参数是附件,这里是AioServer,即其实例 serverSocketChannel.accept(this, new AcceptHandler()); } } /** * 接收请求处理器 * completionHandler泛型的第一个参数的处理结果,这里是AsynchronousSocketChannel,即接收到的请求的channel, * 第二个参数是附件,这里是AioServer,即其实例 */ static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> { @Override public void completed(AsynchronousSocketChannel result, AioServer attachment) { // 继续接收下一个请求,构成循环调用 attachment.serverSocketChannel.accept(attachment, this); try { System.out.println("接收到连接请求:" + result.getRemoteAddress().toString()); // 定义数据读取缓存 ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]); // 读取数据,并传入数据到达时的处理器 // read的第一个参数数据读取到目标缓存,第二个参数是附件,第三个传输的读取结束后的处理器 // 读取处理器泛型的第一个参数是读取的字节数,第二个参数输附件对象 result.read(buffer, buffer, new ReadHandler(result)); // 新开新城发送数据 new Thread(new WriteThread(result)).start(); } catch (Exception e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, AioServer attachment) { } } /** * 读取数据处理器 * completionHandler第一个参数是读取的字节数,第二个参数输附件对象 */ static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { AsynchronousSocketChannel socketChannel; public ReadHandler(AsynchronousSocketChannel socketChannel) { this.socketChannel = socketChannel; } @Override public void completed(Integer result, ByteBuffer attachment) { if (result == -1) { attachment.clear(); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } return; } attachment.flip(); String readMsg = charset.decode(attachment).toString(); System.out.println("服务端接收到的数据:" + readMsg); attachment.compact(); // 继续接收数据,构成循环 socketChannel.read(attachment, attachment, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { } } /** * 写出数据处理器 */ static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { AsynchronousSocketChannel socketChannel; Scanner scanner; public WriteHandler(AsynchronousSocketChannel socketChannel, Scanner scanner) { this.socketChannel = socketChannel; this.scanner = scanner; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.compact(); String msg = scanner.nextLine(); System.out.println("服务端即将发送的数据:" + msg); attachment.put(charset.encode(msg)); attachment.flip(); // 继续写数据,构成循环 socketChannel.write(attachment, attachment, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { } } static class WriteThread implements Runnable { private AsynchronousSocketChannel channel; public WriteThread(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void run() { // 第一缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); String msg = scanner.nextLine(); System.out.println("服务端输入数据:" + msg); buffer.put(charset.encode(msg + System.lineSeparator())); buffer.flip(); // 写入数据,并有写数据时的处理器 // write的第一个参数是数据写入的缓存,第二个参数是附件,第三个参数写结束后的处理器 // 读取处理器泛型的第一个参数是写入的字节数,第二个是附件类型 channel.write(buffer, buffer, new WriteHandler(channel, scanner)); } } }
- Client:
package com.openmind.io.aio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.util.Scanner; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * AIO客户端 * * @author zhoujunwen * @date 2019-11-11 * @time 09:31 * @desc */ public class AIO_Client { static Charset charset = Charset.forName("UTF-8"); public static void main(String[] args) throws InterruptedException { int port = 7890; String host = "127.0.0.1"; // 启动客户端 new Thread(new AIOClient(port, host)).start(); TimeUnit.MINUTES.sleep(100); } static class AIOClient implements Runnable { int port; String host; AsynchronousChannelGroup group; AsynchronousSocketChannel channel; InetSocketAddress address; public AIOClient(int port, String host) { this.port = port; this.host = host; // 初始化 init(); } private void init() { try { // 创建处理线程组 group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 5); // 创建客户端channel channel = AsynchronousSocketChannel.open(group); address = new InetSocketAddress(host, port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // 接收请求,并传入收到请求后的处理器 // connect 方法的第一二个参数是目标地址,第二个参数是附件对象,第三个参数是连接处理器 // 连接处理器的泛型的第一个参数为空(即Void),第二个参数为附件 channel.connect(address, channel, new ConnectHandler()); } } /** * 连接处理器 */ static class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> { @Override public void completed(Void result, AsynchronousSocketChannel attachment) { try { System.out.println("connect server: " + attachment.getRemoteAddress().toString()); // 定义数据读取缓存 ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取数据,并传入到数据到达时的处理器 attachment.read(buffer, buffer, new ReadHandler(attachment)); // 新开线程,发送数据 new WriteThread(attachment).start(); } catch (Exception e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { } } /** * 读处理器 */ static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); String readMsg = charset.decode(attachment).toString(); System.out.println("client receive msg: " + readMsg); attachment.compact(); // 继续接收数据,构成循坏 channel.read(attachment, attachment, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { } } /** * 写处理器 */ static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { AsynchronousSocketChannel channel; Scanner scanner; public WriteHandler(AsynchronousSocketChannel channel, Scanner scanner) { this.channel = channel; this.scanner = scanner; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.compact(); System.out.print("client input data: "); String msg = scanner.nextLine(); System.out.println("clinet will send msg:" + msg); attachment.put(charset.encode(msg)); attachment.flip(); // 继续写入数据,构成循环 channel.write(attachment, attachment, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { } } /** * 写处理独立创建线程 */ static class WriteThread extends Thread { private AsynchronousSocketChannel channel; public WriteThread(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void run() { ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); System.out.print("client input data:"); String msg = scanner.nextLine(); System.out.println("client send msg:" + msg); buffer.put(charset.encode(msg)); buffer.flip(); channel.write(buffer, buffer, new WriteHandler(channel, scanner)); } } }
参考
- CSDN:Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 附:上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils; import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); } }
- 阿里云开发者社区:JAVA中BIO、NIO、AIO的分析理解