2023-09-23
原文作者:李林超 原文地址: https://www.lilinchao.com/archives/2115.html

[TOC]

前言

之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。

一、概述

服务端线程可以建立多个线程,将这些线程分成两组:

  • 单线程配一个选择器(Boss), 专门处理 accept 事件
  • 创建 cpu 核心数的线程(Worker), 每个线程配一个选择器,轮流处理 read 事件

关系图

202309232223479681.png

说明

  • Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。
  • 当Boss线程检测到有客户端的连接请求,就会把这个连接返回的SocketChannel注册到某一个Worker线程上。
  • 当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。

二、实现思路

  • 创建 一个 负责处理Accept事件的Boss线程,与 多个 负责处理Read事件的Worker线程;

  • Boss线程 执行的操作

    • 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要 根据标识index去判断将任务分配给哪个Worker
            // 创建固定数量的Worker
            Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
            // 用于负载均衡的原子整数
            AtomicInteger index = new AtomicInteger(0);
            // 负载均衡,轮询分配Worker
            workers[index.getAndIncrement()% workers.length].register(socket);
 *  `register(SocketChannel socket)`方法会 **通过同步队列完成Boss线程与Worker线程之间的通信** ,让`SocketChannel`的注册任务被Worker线程执行。添加任务后需要调用`selector.wakeup()`来唤醒被阻塞的Selector
            public void register(final SocketChannel socket) throws IOException {
                // 只启动一次
                if (!started) {
                   // 初始化操作
                }
                // 向同步队列中添加SocketChannel的注册事件
                // 在Worker线程中执行注册事件
                queue.add(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            socket.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
                // 唤醒被阻塞的Selector
                selector.wakeup();
            }
  • Worker线程执行 的操作

    • 从同步队列中获取注册任务,并处理Read事件

三、代码实现

  • 服务端代码
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll;
    
    /**
     * Created by lilinchao
     * Date 2022/6/4
     * Description 多线程优化  -- 服务端
     */
    @Slf4j
    public class MultiThreadServer {
        public static void main(String[] args) throws IOException {
            Thread.currentThread().setName("boss");
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            // 负责轮询Accept事件的Selector
            Selector boss = Selector.open();
            SelectionKey bossKey = ssc.register(boss, 0, null);
            bossKey.interestOps(SelectionKey.OP_ACCEPT);
            ssc.bind(new InetSocketAddress(8080));
    
            //创建固定数量的worker = core 数
            Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
            for (int i=0;i<workers.length;i++){
                workers[i] = new Worker("worker-"+i);
            }
            // 用于负载均衡的原子整数
            AtomicInteger index = new AtomicInteger();
            while (true){
                boss.select();
                Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(key.isAcceptable()){
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
                        log.debug("connected:{}",sc.getRemoteAddress());
                        // 2. 关联 selector (静态内部类可以访问到selector)
                        log.debug("before register:{}",sc.getRemoteAddress());
                        // 负载均衡,轮询分配Worker
                        workers[index.getAndIncrement() % workers.length].register(sc);
                        log.debug("after register:{}",sc.getRemoteAddress());
                    }
                }
    
            }
        }
    
        static class Worker implements Runnable{
            private Thread thread;
            private Selector selector;
            private String name;
            private volatile boolean start = false; //还未初始化
            /**
             * 同步队列,用于Boss线程与Worker线程之间的通信
             */
            private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    
            public Worker(String name) {
                this.name = name;
            }
    
            //初始化线程和Selector
            public void register(SocketChannel sc) throws IOException {
                //只启动一次
                if(!this.start){
                    this.thread = new Thread(this,name);
                    this.selector = Selector.open();
                    this.thread.start();
                    this.start = true;
                }
    
                //向队列添加任务,但这个任务并没有立刻执行
                queue.add(() -> {
                    try {
                        sc.register(selector,SelectionKey.OP_READ,null);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                });
                selector.wakeup(); //唤醒select方法
            }
    
            @Override
            public void run() {
                while (true){
                    try {
                        selector.select();  //阻塞
                        // 通过同步队列获得任务并运行
                        Runnable task = queue.poll();
                        if(task != null){
                            task.run(); //获得任务,执行注册
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()){
                            SelectionKey key = iterator.next();
                            iterator.remove();
                            // Worker只负责Read事件
                            if(key.isReadable()){
                                ByteBuffer buffer = ByteBuffer.allocate(16);
                                SocketChannel channel = (SocketChannel) key.channel();
                                log.debug("read...{}",channel.getRemoteAddress());
                                channel.read(buffer);
                                buffer.flip();
                                debugAll(buffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
  • 客户端代码
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 客户端
     */
    public class TestClient {
        public static void main(String[] args) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 8080));
            sc.write(Charset.defaultCharset().encode("0123456789abcdef"));
            System.in.read();
        }
    }
  • 运行结果
    13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - connected:/127.0.0.1:52622
    13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - before register:/127.0.0.1:52622
    13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - after register:/127.0.0.1:52622
    13:03:57 [DEBUG] [worker-0] c.l.n.t.MultiThreadServer - read.../127.0.0.1:52622
    +--------+-------------------- all ------------------------+----------------+
    position: [0], limit: [16]
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
    +--------+-------------------------------------------------+----------------+

在运行时,可以同时运行多个客户端程序,查看服务端的输出效果。

问题:如何拿到 cpu 个数

* Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数 > * 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

阅读全文