1201.NIOEventLoopGroup 源码?

NioEventLoopGroup(其实是 MultithreadEventExecutorGroup) 内部维护一个类型为EventExecutor children [], 默认大小是处理器核数 * 2, 这样就构成了一个线程池,初始化EventExecutor 时 NioEventLoopGroup 重载 newChild 方法,所以 children 元素的实际类型为NioEventLoop。

线程启动时调用 SingleThreadEventExecutor 的构造方法,执行 NioEventLoop 类的 run 方法,首先会调用 hasTasks()方法判断当前 taskQueue 是否有元素。如果 taskQueue 中有元素,执行 selectNow() 方法,最终执行 selector.selectNow(),该方法会立即返回。如果taskQueue 没有元素,执行 select(oldWakenUp) 方法select ( oldWakenUp) 方法解决了 Nio 中的 bug,selectCnt 用来记录 selector.select 方法的执行次数和标识是否执行过 selector.selectNow(),若触发了 epoll 的空轮询 bug,则会反复执行 selector.select(timeoutMillis),变量 selectCnt 会逐渐变大,当 selectCnt 达到阈值(默认 512),则执行 rebuildSelector 方法,进行 selector 重建,解决 cpu 占用 100%的 bug。

rebuildSelector 方法先通过 openSelector 方法创建一个新的 selector。然后将 old selector 的selectionKey 执行 cancel。最后将 old selector 的 channel 重新注册到新的 selector 中。

rebuild 后,需要重新执行方法 selectNow,检查是否有已 ready 的 selectionKey。

接下来调用 processSelectedKeys 方法(处理 I/O 任务),当 selectedKeys != null 时,调用processSelectedKeysOptimized 方法,迭代 selectedKeys 获取就绪的 IO 事件的 selectkey 存放在数组 selectedKeys 中, 然后为每个事件都调用 processSelectedKey 来处理它,processSelectedKey 中分别处理 OP_READ;OP_WRITE;OP_CONNECT 事件。

最后调用 runAllTasks 方法(非 IO 任务),该方法首先会调用 fetchFromScheduledTaskQueue方法,把 scheduledTaskQueue 中已经超过延迟执行时间的任务移到 taskQueue 中等待被执行,然后依次从 taskQueue 中取任务执行,每执行 64 个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非 IO 任务,避免非 IO 任务太多,影响 IO 任务的执行。

每个 NioEventLoop 对应一个线程和一个 Selector,NioServerSocketChannel 会主动注册到某一个 NioEventLoop 的 Selector 上,NioEventLoop 负责事件轮询。

Outbound 事件都是请求事件, 发起者是 Channel,处理者是 unsafe,通过 Outbound 事件进行通知,传播方向是 tail 到 head。Inbound 事件发起者是 unsafe,事件的处理者是Channel, 是通知事件,传播方向是从头到尾。

内存管理机制,首先会预申请一大块内存 Arena,Arena 由许多 Chunk 组成,而每个 Chunk默认由 2048 个 page 组成。Chunk 通过 AVL 树的形式组织 Page,每个叶子节点表示一个Page,而中间节点表示内存区域,节点自己记录它在整个 Arena 中的偏移地址。当区域被分配出去后,中间节点上的标记位会被标记,这样就表示这个中间节点以下的所有节点都已被分配了。大于 8k 的内存分配在 poolChunkList 中,而 PoolSubpage 用于分配小于 8k 的内存,它会把一个 page 分割成多段,进行内存分配。

ByteBuf 的特点:支持自动扩容(4M),保证 put 方法不会抛出异常、通过内置的复合缓冲类型,实现零拷贝(zero-copy);不需要调用 flip()来切换读/写模式,读取和写入索引分开;方法链;引用计数基于 AtomicIntegerFieldUpdater 用于内存回收;PooledByteBuf 采用二叉树来实现一个内存池,集中管理内存的分配和释放,不用每次使用都新建一个缓冲区对象。UnpooledHeapByteBuf 每次都会新建一个缓冲区对象