关于:Java网络IO编程(BIO、NIO、AIO)

来自Wikioe
跳到导航 跳到搜索


关于

本文主要讨论 Java 实现 network IO 的三种实现:BIO、NIO、AIO。

什么是 BIO、NIO、AIO?


  1. BIO:“blocking IO”,阻塞IO;
  2. NIO:“non-blocking IO”,非阻塞IO;
  3. AIO:“asynchronous IO”,异步IO;

Java 的 NIO API

  • 见:“关于NIO”(概念、组件、用法)


需要注意的是:

  • Java NIO 是“Java New IO API”,而非“Non-Blocking IO”。二者不是一个概念。
    • Java NIO 支持面向缓冲区的、基于通道的IO操作,以更加高效的方式进行文件的读写操作。
  • 其 NIO API 既可以实现“Non-Blocking IO”(NIO)和“Asynchronous IO”(AIO),也可以实现“Blocking IO”(BIO)。

BIO(blocking IO)

传统的BIO编程

网络编程的基本模型是C/S模型,即两个进程间的通信

服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。
ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 


简单的描述一下BIO的服务端通信模型:

采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。


特点:

  • 缺乏弹性伸缩能力,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系。


示例:

  • Server:
    package com.anxpp.io.calculator.bio;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    /**
     * BIO服务端源码
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public final class ServerNormal {
    	// 默认的端口号
    	private static int DEFAULT_PORT = 12345;
    	// 单例的ServerSocket
    	private static ServerSocket server;
        
    	//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
    	public static void start() throws IOException{
    		//使用默认值
    		start(DEFAULT_PORT);
    	}
        
    	// 这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
    	public synchronized static void start(int port) throws IOException{
    		if(server != null) return;
    		try{
    			// 通过构造函数创建ServerSocket
    			// 如果端口合法且空闲,服务端就监听成功
    			server = new ServerSocket(port);
    			System.out.println("服务器已启动,端口号:" + port);
                
    			//通过无线循环监听客户端连接
    			//如果没有客户端接入,将阻塞在accept操作上。
    			while(true){
    				Socket socket = server.accept();
    				//当有新的客户端接入时,会执行下面的代码
    				//然后创建一个新的线程处理这条Socket链路
    				new Thread(new ServerHandler(socket)).start();
    			}
    		}finally{
    			//一些必要的清理工作
    			if(server != null){
    				System.out.println("服务器已关闭。");
    				server.close();
    				server = null;
    			}
    		}
    	}
    }
    
    package com.anxpp.io.calculator.bio;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
     
    import com.anxpp.io.utils.Calculator;
    /**
     * 客户端线程
     * @author yangtao__anxpp.com
     * 用于处理一个客户端的Socket链路
     */
    public class ServerHandler implements Runnable{
    	private Socket socket;
    	public ServerHandler(Socket socket) {
    		this.socket = socket;
    	}
        
    	@Override
    	public void run() {
    		BufferedReader in = null;
    		PrintWriter out = null;
    		try{
    			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    			out = new PrintWriter(socket.getOutputStream(),true);
    			String expression;
    			String result;
                
    			while(true){
    				//通过BufferedReader读取一行
    				//如果已经读到输入流尾部,返回null,退出循环
    				//如果得到非空值,就尝试计算结果并返回
    				if((expression = in.readLine())==null) break;
    				System.out.println("服务器收到消息:" + expression);
    				try{
    					result = Calculator.cal(expression).toString();
    				}catch(Exception e){
    					result = "计算错误:" + e.getMessage();
    				}
    				out.println(result);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			//一些必要的清理工作
    			if(in != null){
    				try {
    					in.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				in = null;
    			}
    			if(out != null){
    				out.close();
    				out = null;
    			}
    			if(socket != null){
    				try {
    					socket.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				socket = null;
    			}
    		}
    	}
    }
    
  • Client:
    package com.anxpp.io.calculator.bio;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    /**
     * 阻塞式I/O创建的客户端
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class Client {
    	// 默认的端口号
    	private static int DEFAULT_SERVER_PORT = 12345;
    	private static String DEFAULT_SERVER_IP = "127.0.0.1";
        
    	public static void send(String expression){
    		send(DEFAULT_SERVER_PORT,expression);
    	}
        
    	public static void send(int port,String expression){
    		System.out.println("算术表达式为:" + expression);
    		Socket socket = null;
    		BufferedReader in = null;
    		PrintWriter out = null;
    		try{
    			socket = new Socket(DEFAULT_SERVER_IP,port);
    			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    			out = new PrintWriter(socket.getOutputStream(),true);
    			out.println(expression);
    			System.out.println("___结果为:" + in.readLine());
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			// 必要的清理工作
    			if(in != null){
    				try {
    					in.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				in = null;
    			}
    			if(out != null){
    				out.close();
    				out = null;
    			}
    			if(socket != null){
    				try {
    					socket.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				socket = null;
    			}
    		}
    	}
    }
    
  • 测试代码:
    package com.anxpp.io.calculator.bio;
    import java.io.IOException;
    import java.util.Random;
    /**
     * 测试方法
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class Test {
    	// 测试主方法
    	public static void main(String[] args) throws InterruptedException {
    		// 运行服务器
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					ServerBetter.start();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
            
    		// 避免客户端先于服务器启动前执行代码
    		Thread.sleep(100);
            
    		// 运行客户端 
    		char operators[] = {'+','-','*','/'};
    		Random random = new Random(System.currentTimeMillis());
    		new Thread(new Runnable() {
    			@SuppressWarnings("static-access")
    			@Override
    			public void run() {
    				while(true){
    					//随机产生算术表达式
    					String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
    					Client.send(expression);
    					try {
    						Thread.currentThread().sleep(random.nextInt(1000));
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    			}
    		}).start();
    	}
    }
    
  • 运行结果:
    服务器已启动端口号12345
    算术表达式为4-2
    服务器收到消息4-2
    ___结果为2
    算术表达式为5-10
    服务器收到消息5-10
    ___结果为-5
    算术表达式为0-9
    服务器收到消息0-9
    ___结果为-9
    算术表达式为0+6
    服务器收到消息0+6
    ___结果为6
    算术表达式为1/6
    服务器收到消息1/6
    ___结果为0.16666666666666666
    ...
    

伪异步I/O编程

为了改进以上一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞I/O),通常被称为“伪异步I/O模型“。


  • 只需要Server端新建线程交给线程池管理即可:【替换上节的“ServerNormal”】
    package com.anxpp.io.calculator.bio;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    /**
     * BIO服务端源码__伪异步I/O
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public final class ServerBetter {
    	// 默认的端口号
    	private static int DEFAULT_PORT = 12345;
    	// 单例的ServerSocket
    	private static ServerSocket server;
    	// 线程池【懒汉式的单例】
    	private static ExecutorService executorService = Executors.newFixedThreadPool(60);
        
    	// 根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
    	public static void start() throws IOException{
    		//使用默认值
    		start(DEFAULT_PORT);
    	}
        
    	//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
    	public synchronized static void start(int port) throws IOException{
    		if(server != null) return;
    		try{
    			// 通过构造函数创建ServerSocket
    			// 如果端口合法且空闲,服务端就监听成功
    			server = new ServerSocket(port);
    			System.out.println("服务器已启动,端口号:" + port);
                
    			// 通过无线循环监听客户端连接
    			// 如果没有客户端接入,将阻塞在accept操作上。
    			while(true){
    				Socket socket = server.accept();
    				//当有新的客户端接入时,会执行下面的代码
    				//然后创建一个新的线程处理这条Socket链路
    				executorService.execute(new ServerHandler(socket));
    			}
    		}finally{
    			//一些必要的清理工作
    			if(server != null){
    				System.out.println("服务器已关闭。");
    				server.close();
    				server = null;
    			}
    		}
    	}
    }
    

如果使用CachedThreadPool线程池(不限制线程数量),其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的 客户端:线程数 模型,

而使用FixedThreadPool就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了 N:M 的伪异步I/O模型。

JavaNIO的BIO编程

NIO(non-blocking IO)

NIO 提供了,与传统BIO模型中的Socket和ServerSocket相对应的,SocketChannelServerSocketChannel两种不同的套接字通道实现。

  • 新增的着两种通道都支持阻塞和非阻塞两种模式


示例:

  • Server:
    package com.anxpp.io.calculator.nio;
    public class Server {
    	private static int DEFAULT_PORT = 12345;
    	private static ServerHandle serverHandle;
        
    	public static void start(){
    		start(DEFAULT_PORT);
    	}
        
    	public static synchronized void start(int port){
    		if(serverHandle!=null)
    			serverHandle.stop();
    		serverHandle = new ServerHandle(port);
    		new Thread(serverHandle,"Server").start();
    	}
        
    	public static void main(String[] args){
    		start();
    	}
    }
    
    package com.anxpp.io.calculator.nio;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
     
    import com.anxpp.io.utils.Calculator;
    /**
     * NIO服务端
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class ServerHandle implements Runnable{
    	private Selector selector;
    	private ServerSocketChannel serverChannel;
    	private volatile boolean started;
        
    	/**
    	 * 构造方法
    	 * @param port 指定要监听的端口号
    	 */
    	public ServerHandle(int port) {
    		try{
    			// 创建选择器
    			selector = Selector.open();
    			// 打开监听通道
    			serverChannel = ServerSocketChannel.open();
    			// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
    			serverChannel.configureBlocking(false);//开启非阻塞模式
    			// 绑定端口 backlog设为1024
    			serverChannel.socket().bind(new InetSocketAddress(port),1024);
    			// 监听客户端连接请求
    			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    			// 标记服务器已开启
    			started = true;
    			System.out.println("服务器已启动,端口号:" + port);
    		}catch(IOException e){
    			e.printStackTrace();
    			System.exit(1);
    		}
    	}
        
    	public void stop(){
    		started = false;
    	}
        
    	@Override
    	public void run() {
    		// 循环遍历selector
    		while(started){
    			try{
    				// 无论是否有读写事件发生,selector每隔1s被唤醒一次
    				selector.select(1000);
    				// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
    				// selector.select();
                    
    				Set<SelectionKey> keys = selector.selectedKeys();
    				Iterator<SelectionKey> it = keys.iterator();
    				SelectionKey key = null;
    				while(it.hasNext()){
    					key = it.next();
    					it.remove();  // Selector不会自己从已选择键集中移除SelectionKey实例
    					try{
    						handleInput(key);
    					}catch(Exception e){
    						if(key != null){
    							key.cancel();
    							if(key.channel() != null){
    								key.channel().close();
    							}
    						}
    					}
    				}
    			}catch(Throwable t){
    				t.printStackTrace();
    			}
    		}
    		// selector关闭后会自动释放里面管理的资源
    		if(selector != null)
    			try{
    				selector.close();
    			}catch (Exception e) {
    				e.printStackTrace();
    			}
    	}
        
    	private void handleInput(SelectionKey key) throws IOException{
    		if(key.isValid()){
    			// 处理新接入的请求消息
    			if(key.isAcceptable()){
    				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    				// 通过ServerSocketChannel的accept创建SocketChannel实例
    				// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
    				SocketChannel sc = ssc.accept();
    				//设置为非阻塞的
    				sc.configureBlocking(false);
    				//注册为读
    				sc.register(selector, SelectionKey.OP_READ);
    			}
    			// 读消息
    			if(key.isReadable()){
    				SocketChannel sc = (SocketChannel) key.channel();
    				// 创建ByteBuffer,并开辟一个1M的缓冲区
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				// 读取请求码流,返回读取到的字节数
    				int readBytes = sc.read(buffer);
    				// 读取到字节,对字节进行编解码
    				if(readBytes>0){
    					// buffer从读转换为写:将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
    					buffer.flip();
    					// 根据缓冲区可读字节数创建字节数组
    					byte[] bytes = new byte[buffer.remaining()];
    					// 将缓冲区可读字节数组复制到新建的数组中
    					buffer.get(bytes);
    					String expression = new String(bytes,"UTF-8");
    					System.out.println("服务器收到消息:" + expression);
    					// 处理数据
    					String result = null;
    					try{
    						result = Calculator.cal(expression).toString();
    					}catch(Exception e){
    						result = "计算错误:" + e.getMessage();
    					}
    					// 发送应答消息
    					doWrite(sc,result);
    				}
    				// 没有读取到字节 忽略
    //				else if(readBytes==0);
    				// 链路已经关闭,释放资源
    				else if(readBytes<0){
    					key.cancel();
    					sc.close();
    				}
    			}
    		}
    	}
        
    	//异步发送应答消息
    	private void doWrite(SocketChannel channel,String response) throws IOException{
    		// 将消息编码为字节数组
    		byte[] bytes = response.getBytes();
    		// 根据数组容量创建ByteBuffer
    		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    		// 将字节数组复制到缓冲区
    		writeBuffer.put(bytes);
    		// flip操作
    		writeBuffer.flip();
    		// 发送缓冲区的字节数组
    		channel.write(writeBuffer);
    		//****此处不含处理“写半包”的代码
    	}
    }
    
    • SocketChannel 不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询Selector将没有发送完的消息发送完毕,然后通过Buffer的hasRemain()方法判断消息是否发送完成。
  • Client:
    package com.anxpp.io.calculator.nio;
    public class Client {
    	private static String DEFAULT_HOST = "127.0.0.1";
    	private static int DEFAULT_PORT = 12345;
    	private static ClientHandle clientHandle;
        
    	public static void start(){
    		start(DEFAULT_HOST,DEFAULT_PORT);
    	}
        
    	public static synchronized void start(String ip,int port){
    		if(clientHandle!=null)
    			clientHandle.stop();
    		clientHandle = new ClientHandle(ip,port);
    		new Thread(clientHandle,"Server").start();
    	}
        
    	//向服务器发送消息
    	public static boolean sendMsg(String msg) throws Exception{
    		if(msg.equals("q")) return false;
    		clientHandle.sendMsg(msg);
    		return true;
    	}
    	public static void main(String[] args){
    		start();
    	}
    }
    
    package com.anxpp.io.calculator.nio;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    /**
     * NIO客户端
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class ClientHandle implements Runnable{
    	private String host;
    	private int port;
    	private Selector selector;
    	private SocketChannel socketChannel;
    	private volatile boolean started;
     
    	public ClientHandle(String ip,int port) {
    		this.host = ip;
    		this.port = port;
    		try{
    			// 创建选择器
    			selector = Selector.open();
    			// 打开监听通道
    			socketChannel = SocketChannel.open();
    			// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
    			socketChannel.configureBlocking(false);//开启非阻塞模式
    			started = true;
    		}catch(IOException e){
    			e.printStackTrace();
    			System.exit(1);
    		}
    	}
        
    	public void stop(){
    		started = false;
    	}
        
    	@Override
    	public void run() {
    		try{
    			doConnect();
    		}catch(IOException e){
    			e.printStackTrace();
    			System.exit(1);
    		}
    		// 循环遍历selector
    		while(started){
    			try{
    				// 无论是否有读写事件发生,selector每隔1s被唤醒一次
    				selector.select(1000);
    				// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
    				// selector.select();
                    
    				Set<SelectionKey> keys = selector.selectedKeys();
    				Iterator<SelectionKey> it = keys.iterator();
    				SelectionKey key = null;
    				while(it.hasNext()){
    					key = it.next();
    					it.remove();
    					try{
    						handleInput(key);
    					}catch(Exception e){
    						if(key != null){
    							key.cancel();
    							if(key.channel() != null){
    								key.channel().close();
    							}
    						}
    					}
    				}
    			}catch(Exception e){
    				e.printStackTrace();
    				System.exit(1);
    			}
    		}
    		// selector关闭后会自动释放里面管理的资源
    		if(selector != null)
    			try{
    				selector.close();
    			}catch (Exception e) {
    				e.printStackTrace();
    			}
    	}
        
    	private void handleInput(SelectionKey key) throws IOException{
    		if(key.isValid()){
    			SocketChannel sc = (SocketChannel) key.channel();
    			if(key.isConnectable()){
    				if(sc.finishConnect());
    				else System.exit(1);
    			}
    			// 读消息
    			if(key.isReadable()){
    				// 创建ByteBuffer,并开辟一个1M的缓冲区
    				ByteBuffer buffer = ByteBuffer.allocate(1024);
    				// 读取请求码流,返回读取到的字节数
    				int readBytes = sc.read(buffer);
    				// 读取到字节,对字节进行编解码
    				if(readBytes>0){
    					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
    					buffer.flip();
    					// 根据缓冲区可读字节数创建字节数组
    					byte[] bytes = new byte[buffer.remaining()];
    					// 将缓冲区可读字节数组复制到新建的数组中
    					buffer.get(bytes);
    					String result = new String(bytes,"UTF-8");
    					System.out.println("客户端收到消息:" + result);
    				}
    				// 没有读取到字节 忽略
    				// else if(readBytes==0);
    				// 链路已经关闭,释放资源
    				else if(readBytes<0){
    					key.cancel();
    					sc.close();
    				}
    			}
    		}
    	}
        
    	// 异步发送消息
    	private void doWrite(SocketChannel channel,String request) throws IOException{
    		// 将消息编码为字节数组
    		byte[] bytes = request.getBytes();
    		// 根据数组容量创建ByteBuffer
    		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    		// 将字节数组复制到缓冲区
    		writeBuffer.put(bytes);
    		// flip操作
    		writeBuffer.flip();
    		// 发送缓冲区的字节数组
    		channel.write(writeBuffer);
    		//****此处不含处理“写半包”的代码
    	}
        
    	private void doConnect() throws IOException{
    		if(socketChannel.connect(new InetSocketAddress(host,port)));
    		else socketChannel.register(selector, SelectionKey.OP_CONNECT);
    	}
        
    	public void sendMsg(String msg) throws Exception{
    		socketChannel.register(selector, SelectionKey.OP_READ);
    		doWrite(socketChannel, msg);
    	}
    }
    
  • 测试代码:
    package com.anxpp.io.calculator.nio;
    import java.util.Scanner;
    /**
     * 测试方法
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class Test {
    	//测试主方法
    	@SuppressWarnings("resource")
    	public static void main(String[] args) throws Exception{
    		//运行服务器
    		Server.start();
            
    		//避免客户端先于服务器启动前执行代码
    		Thread.sleep(100);
            
    		//运行客户端 
    		Client.start();
            
    		while(Client.sendMsg(new Scanner(System.in).nextLine()));
    	}
    }
    
  • 运行结果:

AIO(asynchronous IO)

AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本(NIO 2.0),新增了提异步非阻塞的 IO 操作方式,所以人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会执行回调通知相应的线程进行后续的操作。

在Java 7中增加了asynchronous IO,具体结构和实现类框架如下:

Java NIO:AsynchronousChannel类结构.png
  • 提供了“异步文件通道”和“异步套接字通道”(“AsynchronousServerSocketChannel”、“AsynchronousSocketChannel”)的实现。
  • 不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。


示例:【?????????????????????】

  • Server:
    package com.anxpp.io.calculator.aio.server;
    /**
     * AIO服务端
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class Server {
    	private static int DEFAULT_PORT = 12345;
    	private static AsyncServerHandler serverHandle;
    	public volatile static long clientCount = 0;
        
    	public static void start(){
    		start(DEFAULT_PORT);
    	}
        
    	public static synchronized void start(int port){
    		if(serverHandle!=null)
    			return;
    		serverHandle = new AsyncServerHandler(port);
    		new Thread(serverHandle,"Server").start();
    	}
        
    	public static void main(String[] args){
    		Server.start();
    	}
    }
    
    package com.anxpp.io.calculator.aio.server;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.CountDownLatch;
    public class AsyncServerHandler implements Runnable {
    	public CountDownLatch latch;
    	public AsynchronousServerSocketChannel channel;
        
    	public AsyncServerHandler(int port) {
    		try {
    			// 创建服务端通道
    			channel = AsynchronousServerSocketChannel.open();
    			// 绑定端口
    			channel.bind(new InetSocketAddress(port));
    			System.out.println("服务器已启动,端口号:" + port);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
        
    	@Override
    	public void run() {
    		// CountDownLatch初始化
    		// 它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
    		// 此处,让现场在此阻塞,防止服务端执行完成后退出
    		// 也可以使用while(true)+sleep 
    		// 生成环境就不需要担心这个问题,以为服务端是不会退出的
    		latch = new CountDownLatch(1);
    		// 用于接收客户端的连接
    		channel.accept(this,new AcceptHandler());
    		try {
    			latch.await();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    package com.anxpp.io.calculator.aio.server;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    // 作为handler接收客户端连接
    public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
    	@Override
    	public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
    		// 继续接受其他客户端的请求
    		Server.clientCount++;
    		System.out.println("连接的客户端数:" + Server.clientCount);
    		serverHandler.channel.accept(serverHandler, this);
    		// 创建新的Buffer
    		ByteBuffer buffer = ByteBuffer.allocate(1024);
    		// 异步读(第三个参数为接收消息回调的业务Handler)
    		channel.read(buffer, buffer, new ReadHandler(channel));
    	}
        
    	@Override
    	public void failed(Throwable exc, AsyncServerHandler serverHandler) {
    		exc.printStackTrace();
    		serverHandler.latch.countDown();
    	}
    }
    
    package com.anxpp.io.calculator.aio.server;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import com.anxpp.io.utils.Calculator;
    public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    	// 用于读取半包消息和发送应答
    	private AsynchronousSocketChannel channel;
        
    	public ReadHandler(AsynchronousSocketChannel channel) {
    			this.channel = channel;
    	}
        
    	// 读取到消息后的处理
    	@Override
    	public void completed(Integer result, ByteBuffer attachment) {
    		// flip操作
    		attachment.flip();
    		// 根据
    		byte[] message = new byte[attachment.remaining()];
    		attachment.get(message);
    		try {
    			String expression = new String(message, "UTF-8");
    			System.out.println("服务器收到消息: " + expression);
    			String calrResult = null;
    			try{
    				calrResult = Calculator.cal(expression).toString();
    			}catch(Exception e){
    				calrResult = "计算错误:" + e.getMessage();
    			}
    			// 向客户端发送消息
    			doWrite(calrResult);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
        
    	// 发送消息
    	private void doWrite(String result) {
    		byte[] bytes = result.getBytes();
    		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    		writeBuffer.put(bytes);
    		writeBuffer.flip();
            
    		// 异步写数据(参数与前面的read一样)
    		channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
    			@Override
    			public void completed(Integer result, ByteBuffer buffer) {
    				// 如果没有发送完,就继续发送直到完成
    				if (buffer.hasRemaining())
    					channel.write(buffer, buffer, this);
    				else{
    					// 创建新的Buffer
    					ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    					//异步读(第三个参数为接收消息回调的业务Handler)
    					channel.read(readBuffer, readBuffer, new ReadHandler(channel));
    				}
    			}
    			@Override
    			public void failed(Throwable exc, ByteBuffer attachment) {
    				try {
    					channel.close();
    				} catch (IOException e) {
    				}
    			}
    		});
    	}
        
    	@Override
    	public void failed(Throwable exc, ByteBuffer attachment) {
    		try {
    			this.channel.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    此处本应有一个 WriteHandler 的,我们在 ReadHandler 中以一个匿名内部类实现了它。
  • Client:
    package com.anxpp.io.calculator.aio.client;
    import java.util.Scanner;
    public class Client {
    	private static String DEFAULT_HOST = "127.0.0.1";
    	private static int DEFAULT_PORT = 12345;
    	private static AsyncClientHandler clientHandle;
        
    	public static void start(){
    		start(DEFAULT_HOST,DEFAULT_PORT);
    	}
        
    	public static synchronized void start(String ip,int port){
    		if(clientHandle!=null)
    			return;
    		clientHandle = new AsyncClientHandler(ip,port);
    		new Thread(clientHandle,"Client").start();
    	}
        
    	// 向服务器发送消息
    	public static boolean sendMsg(String msg) throws Exception{
    		if(msg.equals("q")) return false;
    		clientHandle.sendMsg(msg);
    		return true;
    	}
        
    	@SuppressWarnings("resource")
    	public static void main(String[] args) throws Exception{
    		Client.start();
    		System.out.println("请输入请求消息:");
    		Scanner scanner = new Scanner(System.in);
    		while(Client.sendMsg(scanner.nextLine()));
    	}
    }
    
    package com.anxpp.io.calculator.aio.client;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
    	private AsynchronousSocketChannel clientChannel;
    	private String host;
    	private int port;
    	private CountDownLatch latch;
        
    	public AsyncClientHandler(String host, int port) {
    		this.host = host;
    		this.port = port;
    		try {
    			// 创建异步的客户端通道
    			clientChannel = AsynchronousSocketChannel.open();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
        
    	@Override
    	public void run() {
    		// 创建CountDownLatch等待
    		latch = new CountDownLatch(1);
    		// 发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
    		clientChannel.connect(new InetSocketAddress(host, port), this, this);
    		try {
    			latch.await();
    		} catch (InterruptedException e1) {
    			e1.printStackTrace();
    		}
    		try {
    			clientChannel.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
        
    	// 连接服务器成功
    	// 意味着TCP三次握手完成
    	@Override
    	public void completed(Void result, AsyncClientHandler attachment) {
    		System.out.println("客户端成功连接到服务器...");
    	}
        
    	// 连接服务器失败
    	@Override
    	public void failed(Throwable exc, AsyncClientHandler attachment) {
    		System.err.println("连接服务器失败...");
    		exc.printStackTrace();
    		try {
    			clientChannel.close();
    			latch.countDown();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
        
    	// 向服务器发送消息
    	public void sendMsg(String msg){
    		byte[] req = msg.getBytes();
    		ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    		writeBuffer.put(req);
    		writeBuffer.flip();
    		// 异步写
    		clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
    	}
    }
    
    package com.anxpp.io.calculator.aio.client;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    	private AsynchronousSocketChannel clientChannel;
    	private CountDownLatch latch;
        
    	public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
    		this.clientChannel = clientChannel;
    		this.latch = latch;
    	}
        
    	@Override
    	public void completed(Integer result, ByteBuffer buffer) {
    		// 完成全部数据的写入
    		if (buffer.hasRemaining()) {
    			clientChannel.write(buffer, buffer, this);
    		}
    		else {
    			// 读取数据
    			ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    			clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
    		}
    	}
    	@Override
    	public void failed(Throwable exc, ByteBuffer attachment) {
    		System.err.println("数据发送失败...");
    		try {
    			clientChannel.close();
    			latch.countDown();
    		} catch (IOException e) {
    		}
    	}
    }
    
    package com.anxpp.io.calculator.aio.client;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    	private AsynchronousSocketChannel clientChannel;
    	private CountDownLatch latch;
        
    	public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
    		this.clientChannel = clientChannel;
    		this.latch = latch;
    	}
        
    	@Override
    	public void completed(Integer result,ByteBuffer buffer) {
    		buffer.flip();
    		byte[] bytes = new byte[buffer.remaining()];
    		buffer.get(bytes);
    		String body;
    		try {
    			body = new String(bytes,"UTF-8");
    			System.out.println("客户端收到结果:"+ body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    	}
        
    	@Override
    	public void failed(Throwable exc,ByteBuffer attachment) {
    		System.err.println("数据读取失败...");
    		try {
    			clientChannel.close();
    			latch.countDown();
    		} catch (IOException e) {
    		}
    	}
    }
    
  • 测试代码:
    package com.anxpp.io.calculator.aio;
    import java.util.Scanner;
    import com.anxpp.io.calculator.aio.client.Client;
    import com.anxpp.io.calculator.aio.server.Server;
    /**
     * 测试方法
     * @author yangtao__anxpp.com
     * @version 1.0
     */
    public class Test {
    	//测试主方法
    	@SuppressWarnings("resource")
    	public static void main(String[] args) throws Exception{
    		//运行服务器
    		Server.start();
            
    		//避免客户端先于服务器启动前执行代码
    		Thread.sleep(100);
            
    		//运行客户端 
    		Client.start();
            
    		System.out.println("请输入请求消息:");
    		Scanner scanner = new Scanner(System.in);
    		while(Client.sendMsg(scanner.nextLine()));
    	}
    }
    
  • 运行结果:
    服务器已启动端口号12345
    请输入请求消息
    客户端成功连接到服务器...
    连接的客户端数1
    123456+789+456
    服务器收到消息: 123456+789+456
    客户端收到结果:124701
    9526*56
    服务器收到消息: 9526*56
    客户端收到结果:533456
    ...
    

参考

  1. CSDN:Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)


附:上文中服务端使用到的用于计算的工具类:

package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public final class Calculator {  
    private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
    public static Object cal(String expression) throws ScriptException{
        return jse.eval(expression);
    }
}