第十二章 高性能语言与框架
12.1 Golang 对 epoll 的封装
在协程没有流行以前,传统的网络编程中,同步阻塞是性能低下的代名词,一次切换就得是 3 us 左右的 CPU 开销。各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的直线思维模式。开发出来的代码也不那么容易被理解。
Golang 的出现,可以说是将协程编程模式推向了一个高潮。这种新的编程方式既兼顾了同步编程方式的简单易用,也在底层通过协程和 epoll 的配合避免了线程切换的性能高损耗。换句话说就是既简单易用,性能又还挺不错。
飞哥当年也是相中了 Golang 的这个特点,开始带领团队转型 Golang 开发的。那么今天我们来深刻和大家分享一下 Golang 官方提供的 net 包,来看看它是如何达成上面所说的这样的效果的。
12.1.1 Golang net 包的使用方式
考虑到不少读者没有使用过 Golang,那么开头我先把一个基于官方 net 包的 Golang 服务的简单使用代码给大家列出来。为了方便大家理解,我只保留骨干代码。
func main() {
//构造一个 listener
listener, _ := net.Listen("tcp", "127.0.0.1:9008")
for {
//接收请求
conn, err := listener.Accept()
//启动一个协程来处理
go process(conn)
}
}
func process(conn net.Conn) {
//结束时关闭连接
defer conn.Close()
//读取连接上的数据
var buf [1024]byte
len, err := conn.Read(buf[:])
//发送数据
_, err = conn.Write([]byte("I am server!"))
...
}在这个示例服务程序中,先是使用 net.Listen 来监听了本地的 9008 这个端口。然后调用 Accept 进行接收连接处理。如果接收到了连接请求,通过 go process() 来启动一个协程进行处理。在连接的处理中我展示了读写操作(Read 和 Write)。
整个服务程序看起来,妥妥的就是一个同步模型,包括 Accept、Read 和 Write 都会将当前协程给”阻塞”掉。比如 Read 函数这里,如果服务器调用时客户端数据还没有到达,那么 Read 是不带返回的,会将当前的协程 park 住。直到有了数据 Read 才会返回,处理协程继续执行。
你如果在其它语言,例如 C 和 Java 中写出这样类似的服务器代码,估计会被打死的。因为每一次同步的 Accept、Read、Write 都会导致你当前的线程被阻塞掉,会浪费大量的 CPU 进行线程上下文的切换。
但是在 Golang 中这样的代码运行性能却是非常不错,为啥呢?我们继续看本文接下来的内容。
12.1.2 Listen 底层过程
在传统的 C、Java 等传统语言中,listen 所做的事情就是直接调用内核的 listen 系统调用,参见本书第六章。但是如果你也这么同等地理解 Golang net 包里的 Listen,那可就大错特错了。
和其它语言不同,在 Golang net 的 listen 中,会完成如下几件事:
- 创建 socket 并设置非阻塞
- bind 绑定并监听本地的一个端口
- 调用 listen 开始监听
- epoll_create 创建一个 epoll 对象
- epoll_ctl 将 listen 的 socket 添加到 epoll 中等待连接到来
一次 Golang 的 Listen 调用,相当于在 C 语言中的 socket、bind、listen、epoll_create、epoll_ctl 等多次函数调用的效果。封装度非常的高,更大程度地对程序员屏蔽了底层的实现细节。
NOTE
插一句题外话:现在的各种开发工具的封装程度越来越高,真不知道对码农来说是好事还是坏事。好处是开发效率更高了,缺点是将来的程序员想了解底层也越来越难了,越来越像传统企业里流水线上的工人。
口说无凭,我们挖开 Golang 的内部源码瞅一瞅,这样更真实。
Listen 的入口在 Golang 源码的 net/dial.go 文件中,让我们展开来看更细节的逻辑。
1) Listen 入口执行流程
源码不用细看,看懂大概流程就可以。
//file:go1.14.4/src/net/dial.go
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}可见,这个 Listen 只是一个入口。接下来会进入到 ListenConfig 下的 Listen 方法中。在 ListenConfig 的 Listen 中判断这是一个 TCP 类型的话,会进入到 sysListener 下的 listenTCP 方法里(src/net/tcpsock_posix.go)。然后再经过两三次的函数调用跳转,会进入到 net/sock_posix.go 文件下的 socket 函数中。我们直接看它。
//file:go1.14.4/src/net/sock_posix.go
func socket(ctx context.Context, net string, family, ...) (fd *netFD, err error) {
//创建 socket,见 2) 创建 socket 小节
s, err := sysSocket(family, sotype, proto)
...
//TCP 绑定和监听,见 3) 绑定和监听小节
//epoll对象的创建以及文件描述符的添加 见 4) epoll创建和初始化小节
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
fd.listenStream(laddr, listenerBacklog(), ctrlFn);
......
}
}
}接下来我们分别在 2) 创建 socket 和 3) 绑定和监听 小节来介绍 sysSocket 和 listenStream 这两个函数。
2) 创建 socket
sysSocket 这个函数和其它语言中的 socket 函数有很大的不同。在这个一个函数内就完成了三件事,创建 socket、bind 和 listen 监听。我们来看 sysSocket 的具体代码。
//file:net/sys_cloexec.go
func sysSocket(family, sotype, proto int) (int, error) {
//创建 socket
s, err := socketFunc(family, sotype, proto)
//设置为非阻塞模式
syscall.SetNonblock(s, true)
}在 sysSocket 中,调用的 socketFunc 其实就是 socket 系统调用。见如下代码。
//file:net/hook_unix.go
var (
// Placeholders for socket system calls.
socketFunc func(int, int, int) (int, error) = syscall.Socket
connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
listenFunc func(int, int) error = syscall.Listen
getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
)创建完 socket 之后,再调用 syscall.SetNonblock 将其设置为非阻塞模式。
//file:syscall/exec_unix.go
func SetNonblock(fd int, nonblocking bool) (err error) {
...
if nonblocking {
flag |= O_NONBLOCK
}
fcntl(fd, F_SETFL, flag)
}3) 绑定和监听
我们接着再来看 listenStream。这个函数一进来就调用了系统调用 bind 和 listen 来完成绑定和监听。
//file:net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr,...) error {
...
//等同于 c 语言中的: bind(listenfd, ...)
syscall.Bind(fd.pfd.Sysfd, lsa);
//等同于 c 语言中的:listen(listenfd, ...)
listenFunc(fd.pfd.Sysfd, backlog);
//这里非常关键:初始化socket与异步IO相关的内容
if err = fd.init(); err != nil {
return err
}
}其中 listenFunc 是一个宏,指向的就是 syscall.Listen 系统调用。
//file:go1.14.4/src/net/hook_unix.go
import "syscall"
var (
// Placeholders for socket system calls.
socketFunc func(int, int, int) (int, error) = syscall.Socket
connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
listenFunc func(int, int) error = syscall.Listen
getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
)4) epoll 创建和初始化
接下来在 fd.init 这一行,经过多次的函数调用展开以后会执行到 epoll 对象的创建,并还把在 listen 状态的 socket 句柄添加到了 epoll 对象中来管理其网络事件。
我们来看它是如何完成的。
//file:go1.14.4/src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
return nil
}serverInit.Do 这个是用来保证参数内的函数只执行一次的。不过多展开介绍。其参数 runtime_pollServerInit 是对 runtime 包的函数 poll_runtime_pollServerInit 的调用,其源码位于 runtime/netpoll.go 下。
//file:runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}该函数会执行到 netpollGenericInit,epoll 就是在它的内部创建的。
//file:netpoll_epoll.go
func netpollinit() {
// epoll 对象的创建
epfd = epollcreate1(_EPOLL_CLOEXEC)
...
}再来看 runtime_pollOpen。它的参数就是前面 listen 好了的 socket 的文件描述符。在这个函数里,它将被放到 epoll 对象中。
//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
errno = netpollopen(fd, pd)
return pd, int(errno)
}
//file:runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// listen 状态的 socket 被添加到了 epoll 中.
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}12.1.3 Accept 过程
服务端在 Listen 完了之后,就是对 Accept 的调用了。该函数主要做了三件事:
- 调用 accept 系统调用接收一个连接
- 如果没有连接到达,把当前协程阻塞掉
- 新连接到来的话,将其添加到 epoll 中管理,然后返回
通过 Golang 里的单步调试可以看到它进入到了 TCPListener 下的 Accept 里了。
//file: net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
c, err := l.accept()
...
}
func (ln *TCPListener) accept() (*TCPConn, error) {
//以 netFD 的形式返回一个新连接
fd, err := ln.fd.accept()
}我们上面说的三步都是在 netFD 的 accept 函数里处理的。
//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
// 1)接收一个连接
// 2)如果连接没有到达阻塞当前协程
d, rsa, errcall, err := fd.pfd.Accept()
// 3)将新到的连接也添加到 epoll 中进行管理
netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
netfd.init();
...
return netfd, nil
}接下来我们详细看每一步的细节。
1) 接收一个连接
经过单步跟踪后发现 Accept 进入到了 FD 对象的 Accept 方法下。在这里将调用操作系统的 accept 系统调用。
//file:internal/poll/fd_unix.go
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
for {
//调用 accept 系统调用接收一个连接
s, rsa, errcall, err := accept(fd.Sysfd)
//接收到了连接就返回它
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
//如果没有获取到,那就把协程给阻塞起来
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
...
}
}
...
}其中 accept 方法内部会触发 linux 操作系统的 accept 系统调用,我们就不过度展开了。调用 accept 目的是获取一个来自客户端的连接。如果接收到了,就把它返回回去。
2) 阻塞当前协程
我们来说说如果没 accept 调用的时候,客户端的连接请求还一个都没有过来怎么办。
这时候,accept 系统调用会返回 syscall.EAGAIN。Golang 在对这个状态的处理中,会把当前协程给阻塞起来。关键代码在这里:
//file: internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}runtime_pollWait 的源码在 runtime/netpoll.go 下。gopark(协程的阻塞)就是在这里完成的。
//file:runtime/netpoll.go
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
for !netpollblock(pd, int32(mode), false) {
}
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
...
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait,
traceEvGoBlockNet, 5)
}
}gopark 这个函数就是 golang 内部阻塞协程的入口。
3) 将新连接添加到 epoll 中
我们再来说说假如客户端连接已经到来了的情况。这时 fd.pfd.Accept 会返回新建的连接。然后会将该新连接也一并加入到 epoll 中进行高效的事件管理。
我们来看 netfd.init:
//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
//3.1 接收一个连接
//3.2 如果连接没有到达阻塞当前协程
d, rsa, errcall, err := fd.pfd.Accept()
//3.2 将新到的连接也添加到 epoll 中进行管理
netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
netfd.init();
...
return netfd, nil
}//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
...
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
}runtime_pollOpen 这个 runtime 函数我们在上面的 2.4 节介绍过了,就是把文件句柄添加到 epoll 对象中。
//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
errno = netpollopen(fd, pd)
return pd, int(errno)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
//新连接的 socket 也被添加到了 epoll 中.
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}12.1.4 Read 和 Write 内部过程
当连接接收完成后,剩下的就是在连接上的读写了。
1) Read 内部过程
我们先来看 Read。
//file:/Users/zhangyanfei/sdk```go
//file:/Users/zhangyanfei/sdk/go1.14.4/src/net/net.go
func (c *conn) Read(b []byte) (int, error) {
...
n, err := c.fd.Read(b)
}Read 函数会进入到 FD 的 Read 中.在这个函数内部调用 Read 系统调用来读取数据.如果数据还尚未到达则也是把自己阻塞起来.
//file:internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
for {
//调用 Read 系统调用
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
//将自己添加到 epoll 中等待事件,然后阻塞掉。
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
......
}
}其中 waitRead 是如何将当前协程阻塞掉的,这个和我们前面 3.2 节介绍是一样的,就不过多展开叙述了.
2) Write 内部过程
Write 的大体过程和 Read 是类似的.先是调用 Write 系统调用发送数据,如果内核发送缓存区不足的时候,就把自己先阻塞起来,然后等可写事件发生的时候再继续发送.其源码入口位于 net/net.go.
//file:net/net.go
func (c *conn) Write(b []byte) (int, error) {
...
n, err := c.fd.Write(b)
}pd.wait 之后的事情就又和前面介绍过的过程一样了.调用 runtime_pollWait 来将当前协程阻塞掉.
第十二章 高性能语言与框架
12.1.5 Golang 唤醒
前面我们讨论的很多步骤里都涉及到协程的阻塞.例如 Accept 时如果新连接还尚未到达.再比如像 Read 数据的时候对方还没有发送,当前协程都不会占着 CPU 不放,而是会阻塞起来.
那么当要等待的事件就绪的时候,被阻塞掉的协程又是如何被重新调度的呢? 相信大家一定会好奇这个问题.
Go 语言的运行时会在调度或者系统监控中调用 sysmon,它会调用 netpoll,来不断地调用 epoll_wait 来查看 epoll 对象所管理的文件描述符中哪一个有事件就绪需要被处理了.如果有,就唤醒对应的协程来执行.
其实除此之外还有几个地方会唤醒协程,如:
startTheWorldWithSemafindrunnable在schedule中调用- 有
top和stop之分.其中stop中会导致阻塞. pollWork
不过为了简便起见,我们只选择 sysmon 来作为一个切入点.sysmon 是一个周期性的监控协程,来看源码.
//file:internal/poll/fd_unix.go
func (fd *FD) Write(p []byte) (int, error) {
for {
n, err := syscall.Write(fd.Sysfd, p[nn:max])
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
}
}
//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
...
res := runtime_pollWait(pd.runtimeCtx, mode)
}
//file:src/runtime/proc.go
func sysmon() {
...
list := netpoll(0)
}它会不断触发对 netpoll 的调用,在 netpoll 会调用 epollwait 来查看是否有网络事件发生.
在 epoll 返回的时候,ev.data 中是就绪的网络 socket 的文件描述符.根据网络就绪 fd 拿到 pollDesc.在 netpollready 中,将对应的协程推入可运行队列等待调度执行.
//file:runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
...
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
//没有网络事件
goto retry
}
for i := int32(0); i < n; i++ {
//查看是读事件还是写事件发生
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode)
}
}
}
//file:runtime/netpoll.go
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}本节总结
同步编码方式的优点是符合人的直线思维.在这种模式下的代码很容易写,写出来也容易理解,但是缺点就是性能奇差.因为会导致频繁的线程上下文切换.
所以现在 epoll 是 Linux 下网络程序工作的最主要的模式.现在各种语言下的流行的网络框架模型都是基于 epoll 来工作的.区别就是各自对 epoll 的使用方式上存在一些差别.主流各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的的直线思维模式.开发出来的代码也不那么容易被理解.
Golang 开辟了一种新的网络编程模型.这种模型在应用层看来仍然是同步的方式.但是在底层确实通过协程和 epoll 的配合避免了线程切换的性能高损耗,因此并不会阻塞用户线程.代替的是切换开销更小的协程.协程的切换开销大约只有线程切换的三十分之一,参见《协程究竟比线程牛在什么地方?》
我个人一直觉得,Golang 封装的网络编程模型非常之精妙,是世界级的代码.它非常值得你好好学习一下.
知识星球
在知识星球中我们会进行内核等底层技术的视频讲解,能让你的底层学起来更快,事半功倍.还会进行线上问题排查以及性能优化等方面的案例分享和交流.对大家技术深度和广度的积累很有好处.有想继续加入知识星球的同学微信扫描下面的二维码即可加入.另外在公众号后台发送「星球优惠券」可获取开发内功修炼读者的专属优惠券.
12.2 高性能 Netty 网络框架
Netty 是一个在 Java 生态里应用非常广泛的的网络编程工具包,它在 2004 年诞生到现在依然是火的一塌糊涂,光在 github 上就有 30000 多个项目在用.所以要想更好地掌握网络编程,我想就绕不开 Netty.所以今天我们就来分析分析 Netty 内部网络模块的工作原理.
友情提示
本文算上代码将近有两三万字,比较长,如果时间紧迫中间部分可以跳着看.第一节和最后的第六节建议必读.当然直接拖到尾部收藏点赞点转发,也是 ok 的,哈哈!
12.2.1 Netty 用法
我们首先找一个 Netty 的例子,本篇文章整体都是围绕这个例子来展开叙述的.我们下载 Netty 的源码,并在 examples 中找到 echo 这个 demo.同时,为了防止代码更新导致对本文叙述的影响,我们切到 4.1 分支上来.
在这个 demo 的 EchoServer 中,展示了使用 Netty 写 Server 的经典用法.(飞哥在文章中会在不影响核心逻辑的表达上,对原始代码进行适当的精简,比如下面代码中的 try 就被我丢了)
# git checkout https://github.com/netty/netty.git
# git checkout -b 4.1
# cd example/src/main/java/io/netty/example/echopublic final class EchoServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
......
}
}如果你是一个 Java 新手,或者干脆像飞哥一样没用 Netty 写过服务,相信上述代码基本是看不懂的.究其根本原因是相比 C/C++,Java 的封装程度比较高.Java 语言本身的 JVM 中 NIO 对网络的封装就已经屏蔽了很多底层的概念了,再加上 Netty 又封装了一层,所以 Java 开发者常用的一些术语和概念和其它语言出入很大.
比如上面代码中的 Channel、NioEventLoopGroup 等都是其它语言中所没见过的.不过你也不用感到害怕,因为这里的每一个概念都是 socket、进程等底层概念穿了一身不同的衣服而已.接下来我们分别详细了解一些这些概念.
1) NioEventLoopGroup
如果你没接触过 Netty,可以简单把 NioEventLoopGroup 理解为一个线程池就可以.每一个 NioEventLoopGroup 内部包含一个或者多个 NioEventLoop.
其中 NioEventLoop 是对线程、epoll 等概念进行了一个集中的封装.
首先,EventLoop 本身就是一个线程.为什么这么说,我们通过看 NioEventLoop 的继承关系就能看出来.
NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor.SingleThreadEventExecutor 实现了在 Netty 中对本地线程的抽象.
在 SingleThreadEventExecutor 中不但封装了线程对象 Thread,而且还配置了一个任务队列 taskQueue,用于其它线程向它来放置待处理的任务.
public abstract class SingleThreadEventExecutor extends ... {
private volatile Thread thread;
private final Queue<Runnable> taskQueue;
}2) selector
另外 NioEventLoop 以 selector 的名义封装了 epoll(在 Linux 操作系统下).
在 NioEventLoop 对象内部,会有 selector 成员定义.这其实就是封装的 epoll 而来的.我们来看具体的封装过程.以及 selectedKeys,这是从 selector 上发现的待处理的事件列表.
public final class NioEventLoop extends SingleThreadEventLoop{
// selector
private Selector selector;
private Selector unwrappedSelector;
// selector 上发现的各种待处理事件
private SelectedSelectionKeySet selectedKeys;
}NioEventLoopGroup 在构造的时候,会调用 SelectorProvider#provider 来生成 provider,在默认情况下会调用 sun.nio.ch.DefaultSelectorProvider.create 来创建.
在 Linux 下,默认创建的 provider 使用的就是 epoll.
//file:java/nio/channels/spi/SelectorProvider.java
public abstract class SelectorProvider {
public static SelectorProvider provider() {
// 1. java.nio.channels.spi.SelectorProvider 属性指定实现类
// 2. SPI 指定实现类
......
// 3. 默认实现,Windows 和 Linux 下不同
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
}
//file:sun/nio/ch/DefaultSelectorProvider.java
public class DefaultSelectorProvider {
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
}
}3) Channel
Channel 是 Java NIO 里的一个概念.大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封装就可以了.
Java 在 Channel 中把 connect、bind、read、write 等方法都以成员方法的形式给封装起来了.
public interface Channel extends ... {
Channel read();
Channel flush();
......
interface Unsafe {
void bind(SocketAddress localAddress, ...);
void connect(SocketAddress remoteAddress, ...);
void write(Object msg, ...);
......
}
}另外在 Java 中,习惯把 listen socket 叫做父 channel,客户端握手请求到达以后创建出来的新连接叫做子 channel,方便区分.
4) Pipeline
在每个 Channel 对象的内部,除了封装了 socket 以外,还有一个特殊的数据结构 DefaultChannelPipeline pipeline.在这个 pipeline 里是各种时机里注册的 handler.
Channel 上的读写操作都会走到这个 DefaultChannelPipeline 中,当 channel 上完成 register、active、read、readComplete 等操作时,会触发 pipeline 中的相应方法.
这个 ChannelPipeline 其实就是一个双向链表,以及链表上的各式各样的操作方法.
public interface ChannelPipeline {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline fireChannelRead(Object msg);
}5) EchoServer 解读
现在我们具备了对 Java、对 Netty 的初步理解以后,我们再回过来看一下开篇提到的 EchoServer 源码.
在该代码一开头,bossGroup = new NioEventLoopGroup(1) 这一行是创建了一个只有一个线程的线程池.workerGroup = new NioEventLoopGroup() 又创建了 worker 线程池,没有指定数量,Netty 内部会根据当前机器的 CPU 核数来灵活决定.
ServerBootstrap 这是一个脚手架类,是为了让我们写起服务器程序来更方便一些.
b.group(bossGroup, workerGroup) 这一行是将两个线程池传入,第一个作为 boss 只处理 accept 接收新的客户端连接请求.第二个参数作为 worker 线程池,来处理连接上的请求接收、处理以及结果发送发送.
public final class EchoServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
......
}
}我们注意下 childHandler 是传入了一个 ChannelInitializer,这是当有新的客户端连接到达时会回调的一个方法.在这个方法内部,我们给这个新的 channel 的 pipeline 上添加了一个处理器 serverHandler,以便收到数据的时候执行该处理器进行请求处理.
上面的几个方法都是定义,在 b.bind() 方法中真正开始启动服务,创建父 channel(listen socket),创建 boss 线程.当有新连接到达的时候 boss 线程再创建子 channel,为其 pipeline 添加处理器,并启动 worker 线程来进行处理.
12.2.2 Netty bootstrap 参数构建
简言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在构建 Netty Server 的各种参数.
1) group 设置
ServerBootstrap 和其父类 AbstractBootstrap 内部分别定义了两个 EventLoopGroup group 成员.父类 AbstractBootstrap 的 group 是用来处理 accept 事件的,ServerBootstrap 下的 childGroup 用来处理其它所有的读写等事件.
group() 方法就是把 EventLoopGroup 参数设置到自己的成员上完事.其中如果调用 group() 只传入了一个线程池,那么将来本服务下的所有事件都由这个线程池来处理.详情查看飞哥精简后的源码.
//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
{
//用来处理非 accept 以外的线程池
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}
public abstract class AbstractBootstrap ... {
//用来处理 accept 的线程
volatile EventLoopGroup group;
public B group(EventLoopGroup group) {
this.group = group;
......
}
}2) channel 设置
再看 ServerBootstrap#channel 方法是用来定义一个工厂方法,将来需要创建 channel 的时候都调用该工厂进行创建.
回头看本文开头 demo,.channel(NioServerSocketChannel.class) 指的是将来需要创建 channel 的时候,创建 NioServerSocketChannel 这个类型的.
//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
{
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
}3) option 设置
再看 option 方法,只是设置到了 options 成员中而已.
//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
{
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option```java
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}4) handler 方法
本文 demo 设置了两处 handler,一处是 handler,另一处是 childHandler。他们都是分别设置到自己的成员上就完事,看源码。
//file:io/netty/bootstrap/AbstractBootstrap.java
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
private volatile EventLoopGroup group;
private volatile ChannelHandler handler;
...
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
}
//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private volatile ChannelHandler childHandler;
...
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
}NOTE
在 ServerBootstrap 中,
handler设置的处理器作用在父 channel(即监听 socket)上,用于处理服务器级别的日志等事件;而childHandler设置的处理器作用在子 channel(即每个客户端连接)上,用于处理具体的业务逻辑。
12. 第十二章 高性能语言与框架
12.2.3 Netty bootstrap 启动服务
ServerBootstrap 下的 bind 方法是服务启动过程中非常重要的一个方法。创建父 channel(listen socket),创建 boss 线程,为 boss 线程绑定 Acceptor 处理器,调用系统调用 bind 进行绑定和监听都是在这里完成的。
先来直接看一下 bind 相关的入口源码。
//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends ...... {
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
}//file:io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap ... {
......
}//file:io/netty/bootstrap/AbstractBootstrap.java
public abstract class AbstractBootstrap ... {
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(...);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//创建父 channel、初始化并且注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
......
//如果 Register 已经完成,则直接 doBind0
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
//否则就注册一个 listener(回调),等 register 完成的时候调用
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
//创建 channel,对其初始化,并且 register(会创建 parent 线程)
final ChannelFuture initAndRegister() {
//3.1 创建父 channel(listen socket)
channel = channelFactory.newChannel();
//3.2 对父 channel(listen socket)进行初始化
init(channel);
//3.3 注册并启动 boss 线程
ChannelFuture regFuture = config().group().register(channel);
......
}
//3.4 真正的bind
private static void doBind0(...) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......
}
});
}
}在这个过程中,做了如下几件重要的事情:
- 创建父 channel(listen socket)
- 对父 channel(listen socket)进行初始化
- register 父 channel(listen socket)到主 group,并启动主进程
- 真正的 bind
接下来我们分开来看。
1)创建父 channel(listen socket)
在 initAndRegister() 方法中创建 channel(socket),它调用了 channelFactory.newChannel()。
回想下 12.2.2 中 2)节的 channel 方法,返回的是一个反射 ReflectiveChannelFactory。没错这里的 newChannel 就是调用这个工厂方法来创建出来一个 NioServerSocketChannel 对象。
public abstract class AbstractBootstrap
//创建 channel,对其初始化,并且 register(会创建 parent 线程)
final ChannelFuture initAndRegister() {
//3.1 创建 listen socket
channel = channelFactory.newChannel();
......
}
}2)对父 channel(listen socket)进行初始化
在 initAndRegister 创建除了 channel 之后,需要调用 init 对其进行初始化。
在 init() 中对 channel 进行初始化,一是给 options 和 attrs 赋值,二是构建了父 channel 的 pipeline。
public abstract class AbstractBootstrap
final ChannelFuture initAndRegister() {
//3.1 创建父 channel(listen socket)
//3.2 对父 channel(listen socket)进行初始化
init(channel);
......
}
}//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
void init(Channel channel) {
//设置 option 和 attr
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
//设置 pipeline
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
......
});
}
}在 setChannelOptions 中对 channel 的各种 option 进行设置。回忆我们在使用 ServerBootstrap 时可以传入 SO_BACKLOG,这就是其中的一个 option。在这里会真正设置到 channel(socket)上。
SO_BACKLOG 选项
在创建 ServerBootstrap 时,可以通过
b.option(ChannelOption.SO_BACKLOG, 100)设置 backlog 参数。
在 init 中,稍微难理解一点是 p.addLast(new ChannelInitializer...)。这一段代码只是给父 channel 添加一个 handler 而已。其真正的执行要等到 register 后,我们待会再看。
3)register 父 channel
父 channel 在创建完,并且初始化之后,需要注册到 boss 线程上才可用。
其中 config().group() 最终会调用到 AbstractBootstrap#group,在这个方法里获取的是我们传入进来的 bossGroup。
其中 bossGroup 是一个 NioEventLoopGroup 实例,所以代码会进入 NioEventLoopGroup#register 方法。
public abstract class AbstractBootstrap
final ChannelFuture initAndRegister() {
//3.1 创建父 channel(listen socket)
//3.2 对父 channel(listen socket)进行初始化
//3.3 注册并启动 boss 线程
ChannelFuture regFuture = config().group().register(channel);
......
}
}
public abstract class AbstractBootstrap
volatile EventLoopGroup group;
public final EventLoopGroup group() {
return group;
}
}public class NioEventLoopGroup extends MultithreadEventLoopGroup {}
public abstract class MultithreadEventLoopGroup extends ... {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
}在 NioEventLoopGroup 里包含一个或多个 EventLoop。上面的 next 方法就是从中选择一个出来,然后将 channel 注册到其上。
对于本文来讲,我们使用的是 NioEventLoopGroup,其内部包含的自然也就是 NioEventLoop,我们继续查找其 register 方法。
可见,NioEventLoop 的 register 最后又调用到 channel 的 register 上了。在我们本文中,我们创建的 channel 是 NioServerSocketChannel,我们就依照这条线索来查。
public final class NioEventLoop extends SingleThreadEventLoop
//在 eventloop 里注册一个 channle(socket)
public void register(final SelectableChannel ch, ...) {
......
register0(ch, interestOps, task);
}
//最终调用 channel 的 register
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
ch.register(unwrappedSelector, interestOps, task);
}
}//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......
//关联自己到 eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (...) {
......
}
}
}
}在 channel 的父类 AbstractChannel 中的 register 中,先是把自己关联到传入的 eventLoop 上。接着调用 inEventLoop 来判断线程当前运行的线程是否是 EventExecutor 的支撑线程,是则返回直接 register0。
一般来说,服务在启动的时候都是主线程在运行。这个时候很可能 boss 线程还没有启动。所以如果发现当前不是 boss 线程的话,就调用 eventLoop.execute 来启动 boss 线程。
NioEventLoop 的父类是 SingleThreadEventExecutor,找到 execute 方法。
我们先来看 addTask(task),它是将 task 添加到任务队列中。等待线程起来以后再运行。
inEventLoop() 是判断当前线程是不是自己绑定的线程,这时还在主线程中运行,所以 inEventLoop 为假,会进入 startThread 开始为 EventLoop 创建线程。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
public void execute(Runnable task) {
execute0(task);
}
private void execute0(@Schedule Runnable task) {
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
}
public abstract class SingleThreadEventExecutor extends ... {
private final Queue<Runnable> taskQueue;
protected void addTask(Runnable task) {
(task);
}
final boolean offerTask(Runnable task) {
return taskQueue.offer(task);
}
}在 doStartThread 中调用 Java 线程管理工具 Executor 来启动 boss 线程。
4)boss 线程启动
当线程起来以后就进入了自己的线程循环中了,会遍历自己的任务队列,然后开始处理自己的任务。
前面我们在 12.2.3 中 3)节看到 eventLoop.execute 把一个 Runnable 任务添加到了任务队列里。当 EventLoop 线程启动后,它会遍历自己的任务队列并开始处理。这时会进入 AbstractChannel#register0 方法开始运行。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
private void startThread() {
doStartThread();
......
}
private void doStartThread() {
executor.execute(new Runnable() {
@Override
public void run() {
SingleThreadEventExecutor.this.run();
......
}
});
}
}public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
for (;;) {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
//如果有任务的话就开始处理
runAllTasks(0);
//任务处理完毕就调用 epoll_wait 等待事件发生
processSelectedKeys();
}
}
}//file:src/main/java/io/netty/channel/AbstractChannel.java
public abstract class AbstractChannel extends ... {
public final void register(...) {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
......
}
private void register0(ChannelPromise promise) {
doRegister();
......
}
}函数 doRegister 是在 AbstractNioChannel 类下。
//file:io/netty/channel/nio/AbstractNioChannel.java
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected SelectableChannel javaChannel() {
return ch;
}
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
}关键操作:将 listen socket 注册到 epoll
上面最关键的一句是
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。这一句就相当于在 C 语言下调用epoll_ctl把 listen socket 添加到了 epoll 对象下。
javaChannel:获取父 channel,相当于 listen socketunwrappedSelector:获取 selector,相当于 epoll 对象register:相当于使用epoll_ctl执行 add 操作
当 channel 注册完后,前面 init 时注册的 ChannelInitializer 回调就会被执行。再回头看它的回调定义。
//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
void init(Channel channel) {
......
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
......
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}在 ChannelInitializer#initChannel 里,又给 boss 线程的 pipeline 里添加了一个任务。该任务是让其在自己的 pipeline 上注册一个 ServerBootstrapAcceptor handler。将来有新连接到达的时候,ServerBootstrapAcceptor 将会被执行。
5)真正的 bind
再看 doBind0 方法,调用 channel.bind 完成绑定。
private static void doBind0(...) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......
}
});
}
}12.2.4 新连接到达
我们再回到 boss 线程的主循环中。
public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
for (;;) {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
//任务队列都处理完就开始 select
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
//处理各种事件
if (strategy > 0) {
processSelectedKeys();
}
}
}
}假如线程任务队列中的任务都处理干净了的情况下,boss 线程会调用 select 来发现其 selector 上的各种事件。相当于 C 语言中的 epoll_wait。
当发现有事件发生的时候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的时候,会进入相应的处理。
对于服务端的 Unsafe.read() 这里会执行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它会调用 JDK 底层的 ServerSocketChannel.accept() 接收到客户端的连接后,将其封装成 Netty 的 NioSocketChannel,再通过 Pipeline 将 ChannelRead 事件传播出去,这样 ServerBootstrapAcceptor 就可以在 ChannelRead 回调里处理新的客户端连接了。
我们直接看 ServerBootstrapAcceptor#ChannelRead。
第十二章 高性能语言与框架
12.2.4 子Channel的注册与初始化
ServerBootstrapAcceptor 在 channelRead 中首先获取新创建的子 Channel,并为其 Pipeline 添加 childHandler。回顾 1.5 节,childHandler 是我们自定义的。
紧接着调用 childGroup.register(child) 将子 Channel 注册到 workerGroup 上。这个 register 过程与 12.2.3 中第 3)节、第 5)节的过程相同。区别在于:前面是父 Channel 注册到 bossGroup,这里是子 Channel 注册到 workerGroup。
register 完成后,子 Channel 被挂接到 workerGroup 的其中一个线程上,相应的线程(如果尚未创建)也会被创建出来并进入自己的线程循环。
当子 Channel 注册完毕时,childHandler 中 ChannelInitializer#initChannel 会被执行。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// ...
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
}public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 获取 child channel
final Channel child = (Channel) msg;
// 设置 childHandler 到 child channel
child.pipeline().addLast(childHandler);
// 设置 childOptions、childAttrs
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
// 将 child channel 注册到 childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
}
}public final class EchoServer {
public static void main(String[] args) throws Exception {
// ...
ServerBootstrap b = new ServerBootstrap();
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
// ...
}
}在 initChannel 中,子 Channel 的处理类 serverHandler 被添加进来。Netty demo 中对该处理类的定义非常简单,仅仅只是打印而已。
12.2.5 用户请求到达
当 Worker 线程启动后,会进入线程循环(Boss 线程和 Worker 线程的 run 函数是同一个)。在循环中,会遍历自己的任务队列;如果没有任务可处理,便调用 select 来观察自己所负责的 Channel 上是否有事件发生。
Worker 线程会调用 select 发现自己所管理的所有子 Channel 上的可读可写事件。当发现有可读事件后,会调用 processSelectedKeys,最终触发 Pipeline 使得 EchoServerHandler 方法开始执行。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(/* ... */) {
ctx.write(msg);
}
// ...
}public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
for (;;) {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
// 如果有任务的话就开始处理
runAllTasks(0);
// 任务处理完毕就调用 epoll_wait 等待事件发生
processSelectedKeys();
}
}
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
}12.2.6 总结
事实上,Netty 对网络的封装比较灵活。它既支持单线程 Reactor,也支持多线程 Reactor,还支持主从多线程 Reactor。三种模型对应的用法如下:
TIP
三种 Reactor 模型的配置方式:
// 单线程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(1); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); // ... // 多线程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); // ... // 主从多线程 Reactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); // ...
为了表述得更全面,本文飞哥选择的是最为经典的主从多线程 Reactor 模式。本文所描述的内容可以用下面一幅图来表示。
图:Netty 主从多线程 Reactor 模式架构
flowchart TD subgraph BossGroup BossThread["Boss Thread (NioEventLoop)"] end subgraph WorkerGroup Worker1["Worker Thread 1 (NioEventLoop)"] Worker2["Worker Thread 2 (NioEventLoop)"] WorkerN["Worker Thread N (NioEventLoop)"] end Client["客户端连接请求"] -->|"accept 事件"| BossThread BossThread -->|"创建子 Channel\n注册到 Worker"| WorkerGroup Worker1 -->|"监听 read/write 事件\n回调 EchoServerHandler"| Handler1["用户 Handler"] Worker2 -->|"监听 read/write 事件\n回调 EchoServerHandler"| Handler2["用户 Handler"] WorkerN -->|"监听 read/write 事件\n回调 EchoServerHandler"| HandlerN["用户 Handler"]
在 Netty 中,Boss 线程负责对父 Channel(Listen Socket)上事件的监听和处理;当有新连接到达时,选择一个 Worker 线程,把这个子 Channel(连接 Socket)交给 Worker 线程来处理。
其中 Worker 线程负责等待其管理的所有子 Channel(连接 Socket)上的事件的监听和处理。当发现有事件发生时,回调用户设置的 Handler 进行处理。在本文的例子中,这个用户 Handler 就是 EchoServerHandler#channelRead。
至此,Netty 网络模块的工作核心原理就介绍完了。飞哥一直“鼓吹”内功的好处。只要你具备了坚实的内功,各种语言里看似风马牛不相及的东西,在底层实际上原理是想通的。我本人从来没使用 Java 开发过服务器程序,更没碰过 Netty。但是当你对 epoll 有了深入理解时,再看 Netty 也能很容易看懂,很快就能理解它的核心。这就是锻炼内功的好处!
本章总结
本章总结了语言框架中对网络的封装方式。
- Golang 中官方的
net包,结合协程将多路复用网络 IO 封装起来,对外提供同步的方式进行编程。 - Java 生态中最经典的是 Netty,采用 Boss、Channel 等概念对网络编程进行了高度的抽象。
但底层还是最基本的线程、epoll 等核心。
作者简介
张彦飞,2010 年硕士毕业于西北大学计算机学院,有十多年的大型互联网公司项目经验,目前就职于腾讯。
他喜欢对技术进行深度思考,善于挖掘技术点背后的原理。他的技术文章特色鲜明,从日常工作中常见问题出发,深入到内核中挖掘原理,最后以通俗易懂的结论对问题进行归纳和总结。他的技术文在网络上受到了技术爱好者们广泛的喜爱。读者们都表示在看完文章之后有一种豁然开朗、醍醐灌顶的感觉,彻底弄清楚了以前困惑自己很久的问题。他的技术公众号「开发内功修炼」一年便收到五万多读者的关注,成为了硬核技术领域的知名技术号。
推荐语
这本书的出版令人欣喜。作者张彦飞能够坦露心扉,从 IT 大厂面试聊到中年焦虑,技术上从内核的工作原理聊起,将内核如何接收网络包、如何与用户进程协作讲解的深入浅出,从而带着读者深入理解网络 IO、TCP 连接建立过程,最后再与读者一起总结出系统性能优化的抓手与方案。阅读他的作品是一种享受,不但让我收获很多底层技术细节和原理,还能够激发我进一步思考。推荐给所有后端开发工程师阅读,与作者一起完成内功修炼。
—— 腾讯技术副总监,张勇
本书从实际工作中遇到的问题为出发点,以庖丁解牛式的手法,结合内核源码以简洁高效的话术把网络原理讲深讲透,是非常少见的网络进阶学习书籍。如果说 Stack Overflow、GitHub 是用于实践的“武当长拳”,那么本书属于提升内力的“小无相功”,前者能够找到可复用或可参考的业务逻辑实现,后者则在于提升“内力”。
—— 小米科技技术总监,黄学青
看彦飞的这部作品,满足你对 Linux 内核网络模块运行原理的好奇心,让你达到知其然也知其所以然的水平。通过深度掌握底层知识,也为你开发高性能后端应用打下坚实基础。
—— C++ Workflow 开源项目作者,谢翰
这是目前市面上少有的,深入底层回答实际工作中相关问题的书。飞哥通过日常工作中问题,带着你一步步深入其中的原理,不仅让你知其然,而且知其所以然。要提升自己,内功的修炼并不可少,飞哥的这本书给了你一个很好的起点,相信通过修炼自己的底层技术能力,你的技术能力一定会突飞猛进。
—— Go 语言中文网创始人,polarisxu
同时我也希望大家都能像飞哥一样不满足于解决一个问题,而是知道为什么,让自己爱思考,避免懒惰。相信你做到「内功」+「思考」,技术一定会突飞猛进!
而且,他不是简单的给出答案,而是通过源码分析的方式,同时绘制了大量的图,让读者更容易读懂、理解。
要提升自己,内功的修炼并不可少,飞哥的这本书给了你一个很好的起点,培养你重视内功的修炼,也教你如何修炼。同时,我也希望你做一个爱问问题的程序员,像飞哥一样,多问几个为什么,不满足于解决一个问题,而是知道为什么,让自己爱思考,避免懒惰。相信你做到「内功」+「思考」,技术一定会突飞猛进!
推荐序
因为写公众号的原因,结识了飞哥。短短的时间,「开发内功修炼」公众号粉丝增长飞速,单从这点看,大家对内功越来越重视。
一直以来,我特别重视基础、重视内功修炼,读书的时候就是如此。这点和飞哥不谋而合。这些年,面试过很多人,有服务端的,也有 Web 前端和客户端的,大部分人对网络的知识掌握的太少,很多还是工作 3 年以上的。很多人缺乏这方面的意识,从来不思考相关问题。举两个简单的例子:php-fpm 一般和 Nginx 是如何通信的?HTTP 请求参数放在 body 中,服务端该怎么解析?等等
几天前,飞哥跟我说,自己出了一本书,想找我给写个序,我没有多想,立马答应了。花时间认真研读了一遍,飞哥的行文非常新颖、亲切,采用了提问的方式进行讲解。通过日常工作的疑问,带着你一步步深入其中的原理,不仅让你知其然,而且知其所以然。这是目前市面上少有的,深入回答实际工作中大概率会遇到的问题(有些问题是飞哥工作中真实遇到的)。而且,他不是简单的给出答案,而是通过源码分析的方式,同时绘制了大量的图,让读者更容易读懂、理解。
很多人可能质疑:不懂这些,并不影响我完成工作的需求呀?为什么要懂这些呢?之前,也有同事问过我类似的问题。飞哥在本书中给出了相关的解答。我认为,主要有两个方面的原因:
- 从自身角度,你不应该只是会简单的 CURD,否则永远只是一个螺丝钉,永远没有大的突破。而内功,是你技术能突破、飞跃的基础,是保障;
- 从实际工作角度,工作中,我们会遇到各种各样的问题,不排除,有些问题你可以通过网络搜索到解决方案;
[Image 17 on Page 409] [Image 1730 on Page 411] [Image 1730 on Page 412] [Image 1744 on Page 415] [Image 1744 on Page 416] [Image 1762 on Page 420] [Image 17 on Page 424] [Image 1785 on Page 426] [Image 1786 on Page 426] [Image 1794 on Page 428] [Image 1795 on Page 428] [Image 1851 on Page 445] [Image 49 on Page 446]