您现在的位置是:亿华云 > 域名
NIO之多线程协作处理数据读写
亿华云2025-10-02 21:10:08【域名】7人已围观
简介本文转载自微信公众号「源码学徒」,作者皇甫嗷嗷叫 。转载本文请联系源码学徒公众号。经过前面几章的学习,我们已经 能够掌握了JDK NIO的开发方式,我们来总结一下NIO开发的流程:创建一个服务端通道
本文转载自微信公众号「源码学徒」,之作处作者皇甫嗷嗷叫 。多线转载本文请联系源码学徒公众号。程协
经过前面几章的理数学习,我们已经 能够掌握了JDK NIO的据读开发方式,我们来总结一下NIO开发的之作处流程:
创建一个服务端通道 ServerSocketChannel 创建一个选择器 Selector 将服务端通道注册到选择器上,并且关注我们感兴趣的多线事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 绑定服务管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989)); 开始进行事件选择,选择我们感兴趣的程协事件做对应的操作!具体的代码信息请参照第一章:多路复用模型章节,这里不做太多的理数赘述!
有关多路复用的概念,我们也在第一章进行了分析。据读多路复用模型能够最大限度的之作处将一个线程的执行能力榨干,一条线程执行所有的多线数据,包括新连接的程协接入、数据的理数读取、计算与回写,据读但是假设,我们的站群服务器数据计算及其缓慢,那么该任务的执行就势必影响下一个新链接的接入!
传统NIO单线程模型
单线程的NIO模型
如图,我们能了解到,单线程情况下,读事件因为要做一些业务性操作(数据库连接、图片、文件下载)等操作,导致线程阻塞再,读事件的处理上,此时单线程程序无法进行下一次新链接的处理!我们对该线程模型进行优化,select事件处理封装为任务,提交到线程池!
NIO多线程模型
上面的这种数据结构能够解决掉因为计算任务耗时过长,导致新链接接入阻塞的问题,我们能否再次进行一次优化呢?
我们能否创建多个事件选择器,每个事件选择器,负责不同的Socket连接,就像下面这种:
NIO多线程优化模型
这样我们就可以每一个Select选择器负责多个客户端Socket连接,主线程只需要将客户端新连接选择一个选择器注册到select选择器上就可以了!所以我们的架构图,就变成了下图这样:
我们在select选择器内部处理计算任务的时候,也可以将任务封装为task,提交到线程池里面去,彻底将新连接接入和读写事件处理分离开,高防服务器互不影响!事实上,这也是Netty的核心思想之一,我们可以根据上面的图例,自己简单写一个:
代码实现
构建一个事件执行器 对应上图的select选择器
/** * Nio事件处理器 * * @author huangfu * @date */ public class MyNioEventLoop implements Runnable { static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128); private final Selector selector; private final LinkedBlockingQueue<Runnable> linkedBlockingQueue; public MyNioEventLoop(Selector selector) { this.selector = selector; linkedBlockingQueue = new LinkedBlockingQueue<>(); } public Selector getSelector() { return selector; } public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() { return linkedBlockingQueue; } //忽略 hashCode和eques /** * 任务处理器 */ @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { //进行事件选择 这里我们只处理读事件 if (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); //处理读事件 while (iterator.hasNext()) { SelectionKey next = iterator.next(); iterator.remove(); if (next.isReadable()) { SocketChannel channel = (SocketChannel) next.channel(); int read = channel.read(ALLOCATE); if(read > 0) { System.out.printf("线程%s【%s】发来消-息:",Thread.currentThread().getName(), channel.getRemoteAddress()); System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8)); }else if(read == -1) { System.out.println("连接断开"); channel.close(); } ALLOCATE.clear(); } } selectionKeys.clear(); }else { //处理异步任务 进行注册 while (!linkedBlockingQueue.isEmpty()) { Runnable take = linkedBlockingQueue.take(); //异步事件执行 take.run(); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } } }构建一个选择器组
/** * 选择器组 * * @author huangfu * @date 2021年3月12日09:44:37 */ public class SelectorGroup { private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8); private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); private final static AtomicInteger IDX = new AtomicInteger(); /** * 初始化选择器 * @param count 处理器数量 * @throws IOException 异常欣喜 */ public SelectorGroup(int count) throws IOException { for (int i = 0; i < count; i++) { Selector open = Selector.open(); MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open); SELECTOR_GROUP.add(myNioEventLoop); } } public SelectorGroup() throws IOException { this(AVAILABLE_PROCESSORS << 1); } /** * 轮询获取一个选择器 * @return 返回一个选择器 */ public MyNioEventLoop next(){ int andIncrement = IDX.getAndIncrement(); int length = SELECTOR_GROUP.size(); return SELECTOR_GROUP.get(Math.abs(andIncrement % length)); } }构建一个执行器记录器
/** * @author huangfu * @date */ public class ThreadContext { /** * 记录当前使用过的选择器 */ public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>(); }构建一个新连接接入选择器
/** * 连接器 * * @author huangfu * @date 2021年3月12日10:15:37 */ public class Acceptor implements Runnable { private final ServerSocketChannel serverSocketChannel; private final SelectorGroup selectorGroup; public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) { this.serverSocketChannel = serverSocketChannel; this.selectorGroup = selectorGroup; } @Override public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); MyNioEventLoop next = selectorGroup.next(); //向队列追加一个注册任务 next.getLinkedBlockingQueue().offer(() -> { try { //客户端注册为非阻塞 socketChannel.configureBlocking(false); //注册到选择器 关注一个读事件 socketChannel.register(next.getSelector(), SelectionKey.OP_READ); } catch (Exception e) { e.printStackTrace(); } }); //唤醒对应的任务,让其处理异步任务 next.getSelector().wakeup(); System.out.println("检测到连接:" + socketChannel.getRemoteAddress()); //当当前选择器已经被使用过了 就不再使用了,直接注册就行了 if (ThreadContext.RUN_SELECT.add(next)) { //启动任务 new Thread(next).start(); } } catch (IOException e) { e.printStackTrace(); } } }创建启动器
/** * @author huangfu * @date */ public class TestMain { public static void main(String[] args) throws IOException { //创建一个选择器组 传递选择器组的大小 决定使用多少选择器来实现 SelectorGroup selectorGroup = new SelectorGroup(2); //开启一个服务端管道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //开启一个服务端专用的选择器 Selector selector = Selector.open(); //设置非阻塞 serverSocketChannel.configureBlocking(false); //创建一个连接器 Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup); //将服务端通道注册到服务端选择器上 这里会绑定一个新连接接入器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(8989)); //启动处理器 new Reactor(selector).run(); } }总结
单线程下的NIO存在性能瓶颈,当某一计算过程缓慢的时候会阻塞住整个线程,导致影响其他事件的处理!
为了解决这一缺陷,我们提出了使用异步线程的方式去操作任务,将耗时较长的业务,封装为一个异步任务,提交到线程池执行!
为了使业务操作和新连接接入完全分离开,我们做了另外一重优化,我们封装了一个选择器组,轮询的方式获取选择器,每一个选择器都能够处理多个新连接, socket连接->selector选择器 = 多 -> 1,在每一个选择器里面又可以使用线程池来处理任务,云南idc服务商进一步提高吞吐量!
很赞哦!(68464)