[TOC]
前言
之前说到的服务端程序都是在一个线程上进行的,这个线程不仅负责连接客户端发来的请求,同时还要处理读写事件,这样效率还是不够高。如今电脑都是多核处理器,这意味着可以同时进行多个线程,所以服务端应该充分利用这一点。
一、概述
服务端线程可以建立多个线程,将这些线程分成两组:
- 单线程配一个选择器(Boss), 专门处理 accept 事件
- 创建 cpu 核心数的线程(Worker), 每个线程配一个选择器,轮流处理 read 事件
关系图
说明
- Boss线程只负责Accept事件,Worker线程负责客户端与服务端之间的读写问题,他们都各自维护一个Selector负责监听通道的事件。
- 当Boss线程检测到有客户端的连接请求,就会把这个连接返回的
SocketChannel
注册到某一个Worker线程上。 - 当有读写事件发生时,其中一个Worker线程就会检测到事件,就会在该线程中进行处理,这样的设计做到了功能在线程上的分离。
二、实现思路
-
创建 一个 负责处理Accept事件的Boss线程,与 多个 负责处理Read事件的Worker线程;
-
Boss线程 执行的操作
- 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要 根据标识index去判断将任务分配给哪个Worker
// 创建固定数量的Worker
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
// 用于负载均衡的原子整数
AtomicInteger index = new AtomicInteger(0);
// 负载均衡,轮询分配Worker
workers[index.getAndIncrement()% workers.length].register(socket);
* `register(SocketChannel socket)`方法会 **通过同步队列完成Boss线程与Worker线程之间的通信** ,让`SocketChannel`的注册任务被Worker线程执行。添加任务后需要调用`selector.wakeup()`来唤醒被阻塞的Selector
public void register(final SocketChannel socket) throws IOException {
// 只启动一次
if (!started) {
// 初始化操作
}
// 向同步队列中添加SocketChannel的注册事件
// 在Worker线程中执行注册事件
queue.add(new Runnable() {
@Override
public void run() {
try {
socket.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 唤醒被阻塞的Selector
selector.wakeup();
}
-
Worker线程执行 的操作
- 从同步队列中获取注册任务,并处理Read事件
三、代码实现
- 服务端代码
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll;
/**
* Created by lilinchao
* Date 2022/6/4
* Description 多线程优化 -- 服务端
*/
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 负责轮询Accept事件的Selector
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//创建固定数量的worker = core 数
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i=0;i<workers.length;i++){
workers[i] = new Worker("worker-"+i);
}
// 用于负载均衡的原子整数
AtomicInteger index = new AtomicInteger();
while (true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected:{}",sc.getRemoteAddress());
// 2. 关联 selector (静态内部类可以访问到selector)
log.debug("before register:{}",sc.getRemoteAddress());
// 负载均衡,轮询分配Worker
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after register:{}",sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; //还未初始化
/**
* 同步队列,用于Boss线程与Worker线程之间的通信
*/
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
//初始化线程和Selector
public void register(SocketChannel sc) throws IOException {
//只启动一次
if(!this.start){
this.thread = new Thread(this,name);
this.selector = Selector.open();
this.thread.start();
this.start = true;
}
//向队列添加任务,但这个任务并没有立刻执行
queue.add(() -> {
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); //唤醒select方法
}
@Override
public void run() {
while (true){
try {
selector.select(); //阻塞
// 通过同步队列获得任务并运行
Runnable task = queue.poll();
if(task != null){
task.run(); //获得任务,执行注册
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
// Worker只负责Read事件
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read...{}",channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- 客户端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* Created by lilinchao
* Date 2022/6/3
* Description 客户端
*/
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(Charset.defaultCharset().encode("0123456789abcdef"));
System.in.read();
}
}
- 运行结果
13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - connected:/127.0.0.1:52622
13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - before register:/127.0.0.1:52622
13:03:57 [DEBUG] [boss] c.l.n.t.MultiThreadServer - after register:/127.0.0.1:52622
13:03:57 [DEBUG] [worker-0] c.l.n.t.MultiThreadServer - read.../127.0.0.1:52622
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
+--------+-------------------------------------------------+----------------+
在运行时,可以同时运行多个客户端程序,查看服务端的输出效果。
问题:如何拿到 cpu 个数
*
Runtime.getRuntime().availableProcessors()
如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数 > * 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启