第14章 网络通信:数据报文的接收
第13章完成了对数据包发送流程的分析和学习,本章则要学习数据包的接收过程,同样也从应用层开始入手,然后深入到内核的实现代码,从而真正理解接收数据的接口。本章也是网络通信的最后一章。
14.1 系统调用接口
与发送类似,内核也提供了多个接收数据的系统调用接口,接口定义如下:
#include <sys/types.h>
#include <sys/socket.h>
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);与 send 类似,recv 一般也是面向连接的套接字。原因在于,对于非面向连接的套接字来说,若使用 recv 接收数据,通过该接口将不能获得发送端的地址,也就是说不知道这个数据是谁发过来的。所以,如果使用者不关心发送端信息,或者该信息可以从数据中获得,那么 recv 接口同样也可以用于非面向连接的套接字。
再来看看 recvfrom,它会通过额外的参数 src_addr 和 addrlen,来获得发送方的地址,其中需要注意的是 addrlen,它既是输入值又是输出值。
最后是 recvmsg,它与 sendmsg 一样,把接收到的数据和地址都保存在了 msg 中。其中 msg.msg_name 和 msg.msg_len 用于保存接收端地址,而 msg.msg_iov 用于保存接收到的数据。
这三个系统调用与对应的发送接口一样,都支持设置标志位 flags——都是比较现代的接口设计方法。
14.2 数据包从内核空间到用户空间的流程
第13章中,几个不同的发送数据包的系统调用,最终都是通过公共的函数 sock_sendmsg 来完成的。那么对于接收数据包的系统调用,我们相信它们也是殊途同归,最后会进入到一个公共的函数中。接下来,跟踪14.1节介绍的三个系统调用的实现,来证明我们的猜想。
首先是 recv 的源码:
asmlinkage long sys_recv(int fd, void __user *ubuf, size_t size,
unsigned flags)
{
return sys_recvfrom(fd, ubuf, size, flags, NULL, NULL);
}代码很简单,recv 完全是通过调用 sys_recvfrom 来实现的,仅仅是将 sys_recvfrom 的最后两个参数设置为0而已。
那么接下来就进入 recvfrom 的源码:
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
struct socket *sock;
struct iovec iov;
struct msghdr msg;
struct sockaddr_storage address;
int err, err2;
int fput_needed;
/* 限制读取字节长度的最大值为整数的最大值 INT_MAX */
if (size > INT_MAX)
size = INT_MAX;
/* 从文件描述符得到套接字结构 */
sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (!sock)
goto out;
/* 控制信息清零 */
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* 设置消息的数据段信息 */
msg.msg_iovlen = 1;
msg.msg_iov = &iov;
iov.iov_len = size;
iov.iov_base = ubuf;
/* 设置消息的存储地址信息 */
msg.msg_name = (struct sockaddr *)&address;
msg.msg_namelen = sizeof(address);
/* 如果套接字设置了 O_NONBLOCK 标志,即非阻塞标志,则设置 MSG_DONTWAIT 标志,表示此次接收消息,无须等待 */
if (sock->file->f_flags & O_NONBLOCK)
flags |= MSG_DONTWAIT;
/* 调用 sock_recvmsg 接收数据 */
err = sock_recvmsg(sock, &msg, size, flags);
/* 将地址信息复制到用户空间 */
if (err >= 0 && addr != NULL) {
err2 = move_addr_to_user((struct sockaddr *)&address,
msg.msg_namelen, addr, addr_len);
if (err2 < 0)
err = err2;
}
fput_light(sock->file, fput_needed);
out:
return err;
}后面的调用流程则为 sock_recvmsg → __sock_recvmsg → __sock_recvmsg_nosec。
下面跟踪第三个接收数据包的系统调用 recvmsg,代码如下:
SYSCALL_DEFINE3(recvmsg, int, fd, struct msghdr __user *, msg,
unsigned int, flags)
{
int fput_needed, err;
struct msghdr msg_sys;
/* 从文件描述符 fd 获得套接字 */
struct socket *sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (!sock)
goto out;
/* __sys_recvmsg 用于实现接收数据 */
err = __sys_recvmsg(sock, msg, &msg_sys, flags, 0);
/* 释放 fd 引用(如果需要的话),这也是 fput_light 与 fput 的区别 */
fput_light(sock->file, fput_needed);
out:
return err;
}下面进入 __sys_recvmsg,代码如下:
static int __sys_recvmsg(struct socket *sock, struct msghdr __user *msg,
struct msghdr *msg_sys, unsigned flags, int nosec)
{
struct compat_msghdr __user *msg_compat =
(struct compat_msghdr __user *)msg;
struct iovec iovstack[UIO_FASTIOV];
struct iovec *iov = iovstack;
unsigned long cmsg_ptr;
int err, iov_size, total_len, len;
/* kernel mode address */
struct sockaddr_storage addr;
/* user mode address pointers */
struct sockaddr __user *uaddr;
int __user *uaddr_len;
/* 将消息头从用户空间复制到内核空间 */
if (MSG_CMSG_COMPAT & flags) {
if (get_compat_msghdr(msg_sys, msg_compat))
return -EFAULT;
} else if (copy_from_user(msg_sys, msg, sizeof(struct msghdr)))
return -EFAULT;
err = -EMSGSIZE;
/* 检查数据段的个数 */
if (msg_sys->msg_iovlen > UIO_MAXIOV)
goto out;
/*
为了避免频繁申请内存,内核在栈上申请了 UIO_FASTIOV 大小的 iovec 数组以供 iov 使用.
当数据段个数超过 UIO_FASTIOV 时,就需要动态申请内存.
*/
err = -ENOMEM;
iov_size = msg_sys->msg_iovlen * sizeof(struct iovec);
if (msg_sys->msg_iovlen > UIO_FASTIOV) {
iov = sock_kmalloc(sock->sk, iov_size, GFP_KERNEL);
if (!iov)
goto out;
}
/* 验证用户传递的数据段参数和地址参数 */
uaddr = (__force void __user *)msg_sys->msg_name;
uaddr_len = COMPAT_NAMELEN(msg);
if (MSG_CMSG_COMPAT & flags) {
err = verify_compat_iovec(msg_sys, iov,
(struct sockaddr *)&addr,
VERIFY_WRITE);
} else
err = verify_iovec(msg_sys, iov,
(struct sockaddr *)&addr,
VERIFY_WRITE);
if (err < 0)
goto out_freeiov;
total_len = err;
cmsg_ptr = (unsigned long)msg_sys->msg_control;
/* 确保消息标志中只有内核支持的两个标志 */
msg_sys->msg_flags = flags & (MSG_CMSG_CLOEXEC|MSG_CMSG_COMPAT);
/* 如果套接字为非阻塞,则设置标志位为不等待(非阻塞) */
if (sock->file->f_flags & O_NONBLOCK)
flags |= MSG_DONTWAIT;
/* 根据安全检查标志,调用不同的接收函数,但最终都会调用到 sock_recvmsg */
err = (nosec ? sock_recvmsg_nosec : sock_recvmsg)(sock, msg_sys,
total_len, flags);
if (err < 0)
goto out_freeiov;
len = err;
/* 将发送端的地址复制到用户空间 */
if (uaddr != NULL) {
err = move_addr_to_user((struct sockaddr *)&addr,
msg_sys->msg_namelen, uaddr,
uaddr_len);
if (err < 0)
goto out_freeiov;
}
......
......
}由上面的代码可以看出,内核提供的三个接收数据包的系统调用,最终确实如我们所期望的,都会走到一个共同的函数 __sock_recvmsg_nosec 里。下面来看一下这个函数,代码如下:
static inline int __sock_recvmsg_nosec(struct kiocb *iocb, struct socket *sock,
struct msghdr *msg, size_t size, int flags)
{
struct sock_iocb *si = kiocb_to_siocb(iocb);
sock_update_classid(sock->sk);
/* 设置套接字异步 IO 信息 */
si->sock = sock;
si->scm = NULL;
si->msg = msg;
si->size = size;
si->flags = flags;
/* 根据不同的套接字类型,调用不同的数据接收函数 */
return sock->ops->recvmsg(iocb, sock, msg, size, flags);
}根据上面的代码,后面的接收流程就要依赖于具体的协议实现了。
14.3 UDP数据包的接收流程
首先,我们来分析一下相对简单的UDP协议的数据包接收流程,代码如下:
int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int noblock, int flags, int *addr_len)
{
struct inet_sock *inet = inet_sk(sk);
/* 让 sin 指向 msg_name,用于保存发送端地址 */
struct sockaddr_in *sin = (struct sockaddr_in *)msg->msg_name;
struct sk_buff *skb;
unsigned int ulen, copied;
int peeked;
int err;
int is_udplite = IS_UDPLITE(sk);
bool slow;
/* 若 addr_len 不为 NULL,即用户传递了地址长度参数.进入了具体的协议层,已经可以明确地址的长度信息. */
if (addr_len)
*addr_len = sizeof(*sin);
/* 用户设置了 MSG_ERRQUEUE 标志,用于接收错误消息.因为这个应用并不广泛,因此在此忽略这种情况,不进入该函数. */
if (flags & MSG_ERRQUEUE)
return ip_recv_error(sk, msg, len);
try_again:
/* 接收了一个数据报文 */
skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
&peeked, &err);
/* 若没有收到报文,则直接退出 */
if (!skb)
goto out;
/* 得到 UDP 的数据长度 */
ulen = skb->len - sizeof(struct udphdr);
/* 要复制的长度被初始化为用户指定的长度 */
copied = len;
/* 若复制长度大于 UDP 的数据长度,则调整复制长度为数据长度.若复制长度小于数据长度,则设置标志 MSG_TRUNC,表示数据发生了截断. */
if (copied > ulen)
copied = ulen;
else if (copied < ulen)
msg->msg_flags |= MSG_TRUNC;
/*
如果发生了数据截断,或者我们只需要部分覆盖的校验和,那么就在复制前进行校验.
*/
if (copied < ulen || UDP_SKB_CB(skb)->partial_cov) {
/* 进行 UDP 校验和校验 */
if (udp_lib_checksum_complete(skb))
goto csum_copy_err;
}
/* 判断是否需要进行校验和校验 */
if (skb_csum_unnecessary(skb)) {
/* 若不需要进行校验,则直接复制数据包内容到 msg_iov 中 */
err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr),
msg->msg_iov, copied);
}
else {
/* 复制数据包内容的同时,进行校验和校验 */
err = skb_copy_and_csum_datagram_iovec(skb,
sizeof(struct udphdr),
msg->msg_iov);
if (err == -EINVAL)
goto csum_copy_err;
}
/* 复制错误检查 */
if (err)
goto out_free;
/* 如果不是 peek 动作,则增加相应的统计计数 */
if (!peeked)
UDP_INC_STATS_USER(sock_net(sk),
UDP_MIB_INDATAGRAMS, is_udplite);
/* 更新套接字的最新的接收数据包时间戳及丢包消息 */
sock_recv_ts_and_drops(msg, sk, skb);
/* 如果用户指定了保存对端地址的参数,则从数据包中复制地址和端口信息 */
if (sin) {
sin->sin_family = AF_INET;
sin->sin_port = udp_hdr(skb)->source;
sin->sin_addr.s_addr = ip_hdr(skb)->saddr;
memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
}
/* 设置了接收控制消息 */
if (inet->cmsg_flags) {
/* 接收控制消息如 TTL、TOS 等 */
ip_cmsg_recv(msg, skb);
}
/* 设置了已复制的字节长度 */
err = copied;
if (flags & MSG_TRUNC)
err = ulen;
out_free:
/* 释放接收到的这个数据包 */
skb_free_datagram_locked(sk, skb);
out:
/* 返回读取的字节数 */
return err;
/* 错误处理 */
......
}关键洞察
从上面的代码中,我们可以得到一个大部分书中都不会涉及的信息。先想一想,在读取一个UDP数据包时,如果传递给接口的缓存空间小于UDP数据包的实际大小时,结果会是什么样的呢?对于TCP来说,这个问题比较简单,因为其是流协议,没有数据报文边界,所以这次未读取的数据,会在下一次读取时被复制。但是UDP是基于数据包的,从上面的内核源码可以看到,当缓存小于UDP报文的实际大小时,内核会将报文截断,只复制缓存大小的数据,同时设置上
MSG_TRUNC截断标志。这种情况,是很难从书本上了解到的,只有通过阅读源码才能理解其中的奥妙。
再进入 __skb_recv_datagram,来查看UDP是如何接收报文的,代码如下:
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
int *peeked, int *err)
{
struct sk_buff *skb;
long timeo;
/* 检查套接字是否出错 */
int error = sock_error(sk);
if (error)
goto no_packet;
/* 得到超时时间,如果设置了 MSG_DONTWAIT,则超时为 0. */
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
do {
unsigned long cpu_flags;
spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags);
/* 得到接收队列的第一个数据包 */
skb = skb_peek(&sk->sk_receive_queue);
if (skb) {
*peeked = skb->peeked;
/* 如果只是查看动作,则要增加数据包的引用计数,并不用把数据包从队列中移除. */
if (flags & MSG_PEEK) {
skb->peeked = 1;
atomic_inc(&skb->users);
} else {
/* 将数据包从接收队列中删除 */
__skb_unlink(skb, &sk->sk_receive_queue);
}
}
spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags);
/* 得到了数据包,直接返回 */
if (skb)
return skb;
/* 若已经没有了剩余的超时时间,则跳转到 no_packet 并返回 NULL */
error = -EAGAIN;
if (!timeo)
goto no_packet;
/* 使 task 在套接字上等待 */
} while (!wait_for_packet(sk, err, &timeo));
return```c
} while (!wait_for_packet(sk, err, &timeo));
return NULL;
no_packet:
*err = error;
return NULL;
}如果当前的UDP套接字没有数据包,则会进入 wait_for_packet 进行等待,代码如下:
static int wait_for_packet(struct sock *sk, int *err, long *timeo_p)
{
int error;
/* 定义等待队列和回调的唤醒函数 */
DEFINE_WAIT_FUNC(wait, receiver_wake_function);
/* 初始化等待队列,需要注意的是 TASK_INTERRUPTIBLE。这表明进程在睡眠等待时,是可以被中断的。 */
prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
/* 检查套接字是否出错,如被 RESET。如有错误,则直接退出。 */
error = sock_error(sk);
if (error)
goto out_err;
/* 若接收队列不为空,则可以直接退出 */
if (!skb_queue_empty(&sk->sk_receive_queue))
goto out;
/* 检查套接字是否已经做了接收半关闭 */
if (sk->sk_shutdown & RCV_SHUTDOWN)
goto out_noerr;
/* 如果套接字是基于连接的,并且不是处于已连接状态或监听状态,则报错退出 */
error = -ENOTCONN;
if (connection_based(sk) &&
!(sk->sk_state == TCP_ESTABLISHED || sk->sk_state == TCP_LISTEN))
goto out_err;
/* 是否有未处理的信号 */
if (signal_pending(current))
goto interrupted;
error = 0;
/* 将当前进程调度出去,直到超时,即进程已经休眠了设定的超时时间。但是由于某些原因,进程被提前唤醒,
所以需要保存返回的时间 *timeo_p,表示还剩下多少时间。 */
*timeo_p = schedule_timeout(*timeo_p);
out:
finish_wait(sk_sleep(sk), &wait);
return error;
interrupted:
error = sock_intr_errno(*timeo_p);
out_err:
*err = error;
goto out;
out_noerr:
*err = 0;
error = 1;
goto out;
}至此,UDP数据包的接收流程已经跟踪完毕.但是这里遗留了一个问题:在上面的代码中,报文是从接收队列中获得的,但是数据包又是如何被保存到套接字的接收队列中的呢?这个问题留到后面再做分解讨论.
14.4 TCP数据包的接收流程
14.3节跟踪了UDP数据包的接收流程,本节来分析一下TCP数据包的接收流程.根据TCP协议的复杂性可想而知,其接收流程自然也比UDP要繁琐得多.
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int nonblock, int flags, int *addr_len)
{
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0;
u32 peek_seq;
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct task_struct *user_recv = NULL;
int copied_early = 0;
struct sk_buff *skb;
u32 urg_hole = 0;
/* 对套接字上锁 */
lock_sock(sk);
err = -ENOTCONN;
/* 如果套接字为监听状态,则跳转到退出分支 */
if (sk->sk_state == TCP_LISTEN)
goto out;
/* 与UDP类似,得到超时时间 */
timeo = sock_rcvtimeo(sk, nonblock);
/* 设置了MSG_OOB标志,即带外数据,对于TCP来说,就是接收紧急数据 */
if (flags & MSG_OOB)
goto recv_urg;
/* 得到与预读取TCP数据相对应的序列号 */
seq = &tp->copied_seq;
if (flags & MSG_PEEK) {
peek_seq = tp->copied_seq;
seq = &peek_seq;
}
/*
因为TCP是流协议,数据没有边界,所以需要计算接收数据的最小长度。
1)若设置了MSG_WAITALL,则目标为用户指定的长度;
2)不然,则选择套接字的低水线和用户指定长度的最小值;
3)如果第二种情况的最小值为0,则数据长度为1字节;
*/
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
/*
CONFIG_NET_DMA编译选项的含义为TCP接收复制卸载。
利用DMA来将接收到的数据复制到用户空间,从而节省CPU。
*/
#ifdef CONFIG_NET_DMA
tp->ucopy.dma_chan = NULL;
preempt_disable();
skb = skb_peek_tail(&sk->sk_receive_queue);
{
int available = 0;
/* 接收队列中有未读的数据包 */
if (skb) {
/* 计算可读的数据量 */
available = TCP_SKB_CB(skb)->seq + skb->len - (*seq);
}
if ((available < target) &&
(len > sysctl_tcp_dma_copybreak) && !(flags & MSG_PEEK) &&
!sysctl_tcp_low_latency &&
dma_find_channel(DMA_MEMCPY)) {
preempt_enable_no_resched();
/* 确定DMA要使用的数据段 */
tp->ucopy.pinned_list =
dma_pin_iovec_pages(msg->msg_iov, len);
} else {
preempt_enable_no_resched();
}
}
#endif
do {
u32 offset;
/* 判断是否正在读取紧急数据 */
if (tp->urg_data && tp->urg_seq == *seq) {
/* 如果已经读取了一定量的数据,则结束读取 */
if (copied)
break;
/* 如果有未处理的信号,也结束读取 */
if (signal_pending(current)) {
copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
break;
}
}
/* 遍历接收队列 */
skb_queue_walk(&sk->sk_receive_queue, skb) {
/* Now that we have two receive queues this
* shouldn't happen.
*/
if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
"recvmsg bug: copied %X seq %X rcvnxt %X fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
flags))
break;
/* 取得在数据包中的偏移,即上次没有将这个数据包读取完毕 */
offset = *seq - TCP_SKB_CB(skb)->seq;
/* syn标志会占用一个sequence,所以偏移减一 */
if (tcp_hdr(skb)->syn)
offset--;
/* 若偏移小于数据包长度,则这个数据包就是要接收的数据包 */
if (offset < skb->len)
goto found_ok_skb;
/* 如果当前数据包包含FIN标志,则跳转到fin处 */
if (tcp_hdr(skb)->fin)
goto found_fin_ok;
WARN(!(flags & MSG_PEEK),
"recvmsg bug 2: copied %X seq %X rcvnxt %X fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
}
/* 若已经复制了超过目标的数据并且有积压的数据,则立刻跳出,并尝试处理积压数据。 */
if (copied >= target && !sk->sk_backlog.tail)
break;
/*
这里针对是否已经复制了部分数据做了条件判断,而且每个分支中都有相似的条件判断,为什么要
分两种情况呢?因为在读取过程中,如果发生了同样的错误,只读取了部分数据,那么系统调用的
返回值要返回成功读取的字节数;而未读取任何数据,则返回-1错误。
*/
if (copied) {
/*
已复制了部分数据,检查下面几个条件:
1)套接字出错。
2)连接已经关闭。
3)套接字关闭了接收端。
4)已经超时。
5)有待处理的信号。
若有一个条件符合,则跳出接收数据循环。
*/
if (sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
!timeo ||
signal_pending(current))
break;
} else {
/*
若套接字设置了SOCK_DONE标志,则跳出循环。
对于TCP来说,被动关闭时,套接字会被设置上这个标志。这就意味着对端已经关闭,所以不
可能再有新的数据了。
*/
if (sock_flag(sk, SOCK_DONE))
break;
/* 判断套接字是否出错 */
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
/* 套接字关闭了接收端 */
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
/* 套接字状态为关闭状态但又没有设置SOCK_DONE标志,这种情况只发生在用户企图从一个未连接的套接字中读取数据时。 */
if (sk->sk_state == TCP_CLOSE) {
if (!sock_flag(sk, SOCK_DONE)) {
copied = -ENOTCONN;
break;
}
break;
}
/* 已经超时 */
if (!timeo) {
copied = -EAGAIN;
break;
}
/* 有未处理的信号 */
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
/* 清除已经读取的数据包 */
tcp_cleanup_rbuf(sk, copied);
/* 要进行低延时的TCP处理 */
if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
/* 保存用户进程地址 */
if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
user_recv = current;
tp->ucopy.task = user_recv;
tp->ucopy.iov = msg->msg_iov;
}
tp->ucopy.len = len;
WARN_ON(tp->copied_seq != tp->rcv_nxt &&
!(flags & (MSG_PEEK | MSG_TRUNC)));
/*
处理完receive queue,需要处理prequeue 。
TCP套接字有三个队列,需要按照以下顺序来处理:
1)receive_queue;
2)prequeue;
3)backlog;
*/
if (!skb_queue_empty(&tp->ucopy.prequeue))
goto do_prequeue;
/* __ Set realtime policy in scheduler __ */
}
#ifdef CONFIG_NET_DMA
if (tp->ucopy.dma_chan) {
if (tp->rcv_wnd == 0 &&
!skb_queue_empty(&sk->sk_async_wait_queue)) {
/*
接收窗口已经为0,并且有进程正在等待数据,这时就要尽快接收数据。所以这里的
dma操作为同步的。
*/
tcp_service_net_dma(sk, true);
tcp_cleanup_rbuf(sk, copied);
} else
dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);
}
#endif
if (copied >= target) {
/* 若已经复制了超过目标的数据量,则释放该套接字 */
release_sock(sk);
lock_sock(sk);
} else {
/* 等待更多的数据 */
sk_wait_data(sk, &timeo);
}
#ifdef CONFIG_NET_DMA
tcp_service_net_dma(sk, false); /* Don't block */
tp->ucopy.wakeup = 0;
#endif
if (user_recv) {
int chunk;
/* __ Restore normal policy in scheduler __ */
if ((chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk),
LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
len -= chunk;
copied += chunk;
}
/* 处理完receive_queue,再继续处理prequeue */
if (tp->rcv_nxt == tp->copied_seq &&
!skb_queue_empty(&tp->ucopy.prequeue)) {
do_prequeue:
tcp_prequeue_process(sk);
/* 计算从prequeue中读取的数据长度,并调整相应的len和copied。 */
if ((chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk),
LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
}
if ((flags & MSG_PEEK) &&
(peek_seq - copied - urg_hole != tp->copied_seq)) {
if (net_ratelimit())
printk(KERN_DEBUG "TCP(%s:%d): Application bug, race in MSG_PEEK.\n",
current->comm, task_pid_nr(current));
peek_seq = tp->copied_seq;
}
continue;
found_ok_skb:
/* Ok so how much can we use? */
/* 找到了正确的skb,计算该skb未读的可用数据长度 */
used = skb->len - offset;
/* 如果用户要读取的长度小于当前的剩余长度,则调整可用长度 */
if (len < used)
used = len;
/* 判断是否有紧急数据。
TCP的紧急数据又称带外数据,在协议定义本身一直都有些争议。所以其实现代码也比较奇怪。一般
不推荐在日常编码中使用紧急数据
*/
if (tp->urg_data) {
/* 得到紧急数据的偏移 */
u32 urg_offset = tp->urg_seq - *seq;
/* 判断紧急数据是否在我们要读取的数据范围内 */
if (urg_offset < used) {
if (!urg_offset) {
/* 判断紧急数据是否在普通数据流中 */
if (!sock_flag(sk, SOCK_URGINLINE)) {
/* 若不在普通数据流中,则要忽略当前这个字节 */
++*seq;
urg_hole++;
offset++;
used--;
if (!used)
goto skip_copy;
}
} else
used = urg_offset;
}
}
/* 没有设置截断标志 */
if (!(flags & MSG_TRUNC)) {
/* 先尝试使用DMA来将数据复制到用户空间 */
#ifdef CONFIG_NET_DMA
if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
tp->ucopy.dma_chan = dma_find_channel(DMA_MEMCPY);
if (tp->ucopy.dma_chan) {
tp->ucopy.dma_cookie = dma_skb_copy_datagram_iovec(
tp->ucopy.dma_chan, skb, offset,
msg->msg_iov, used,
tp->ucopy.pinned_list);
if (tp->ucopy.dma_cookie < 0) {
printk(KERN_ALERT "dma_cookie < 0\n");
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);
if ((offset + used) == skb->len)
copied_early = 1;
} else
#endif
{
/* 复制数据到用户空间 */
err = skb_copy_datagram_iovec(skb, offset,
msg->msg_iov, used);
if (err) {
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
}
}
/* 调整序列号*seq、已复制长度、剩余长度 */
*seq += used;
copied += used;
len -= used;
/* 因为成功读取了数据,所以要调整TCP套接字的接收缓存 */
tcp_rcv_space_adjust(sk);
skip_copy:
/* 如果正在读取,并且已读取的序列号大于紧急数据,则意味着已经读取完了紧急数据,那么就
要重置urg_data,并且进行TCP快速路径检查(如果通过了检查条件,则打开快速路径开关。
打开快速路径的时候,表示接收的数据包是预期的数据包,TCP接收数据包时会做比较少的检
查,因此接收更为快速)
*/
if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
tp->urg_data = 0;
tcp_fast_path_check(sk);
}
/* 使用的数据长度加上偏移若小于数据包的长度,则该数据包可以继续使用 */
if (used + offset < skb->len)
continue;
/* 如果该数据包有FIN标志,则跳转到found_fin_ok */
if (tcp_hdr(skb)->fin)
goto found_fin_ok;
/* 如果没有设置MSG_PEEK标志,则需要从接收队列中消耗掉这个数据包,并根据copied_
early标志,将其直接释放,或者放置到异步队列
*/
if (!(flags & MSG_PEEK)) {
sk_eat_skb(sk, skb, copied_early);
copied_early = 0;
}
continue;
found_fin_ok:
/* 这里开始处理FIN数据包 */
/* FIN标志也占用一个序列号,因此要给序列号加一 */
++*seq;
/* 与前文相同,不再重复注释 */
if (!(flags & MSG_PEEK)) {
sk_eat_skb(sk, skb, copied_early);
copied_early = 0;
}
/* 接收到FIN标志,表示对端已经关闭了写通道,那么对于本端来说,这是最后一个可读数据包,
因此退出循环 */
break;
} while (len > 0);
if (user_recv) {
/* prequeue队列中仍然有未读取的数据包 */
if (!skb_queue_empty(&tp->ucopy.prequeue)) {代码中断
原文在此处截断,后续部分未提供.但根据上下文,
tcp_recvmsg函数的剩余部分应包含循环结束后的清理、锁释放、以及返回值设置等逻辑.
函数要点总结
tcp_recvmsg是 TCP 数据接收的核心系统调用实现.- 处理流程:加锁 → 检查状态 → 获取超时 → 处理紧急数据 → 遍历接收队列 → 复制数据 → 处理队列顺序(receive_queue → prequeue → backlog).
- 使用
target确定最少读取字节数(基于MSG_WAITALL或低水线). - 支持 DMA 复制卸载(
CONFIG_NET_DMA可选). - 紧急数据(带外数据)处理复杂,不推荐使用.
- 遇到 FIN 标志表示对端关闭,为本端最后一个可读数据包.
- 通过
sk_eat_skb消耗已读取的数据包,并考虑copied_early标志.
继续阅读
下一部分将分析```c } }
> [!NOTE] **原文至此结束**
>
> 以上为 `tcp_recvmsg` 函数在该文档中所提供的完整片段.后续内容(如循环结束后的清理、锁释放、返回值设置等)未在原始文本中给出,14.4 节就此结束.
int chunk;
/* 设置要读取的长度
*/
tp->ucopy.len = copied > 0 ? len : 0;
/* 处理
prequeue队列
*/
tcp_prequeue_process(sk);
if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
tp->ucopy.task = NULL;
tp->ucopy.len = 0;
}
#ifdef CONFIG_NET_DMA
tcp_service_net_dma(sk, true); /* 等待队列排空 */
tp->ucopy.dma_chan = NULL;
if (tp->ucopy.pinned_list) {
dma_unpin_iovec_pages(tp->ucopy.pinned_list);
tp->ucopy.pinned_list = NULL;
}
#endif
/* 释放已经读取的数据包
*/
tcp_cleanup_rbuf(sk, copied);
/* 释放套接字控制权
*/
release_sock(sk);
return copied;
out:
release_sock(sk);
return err;
recv_urg:
/* 接收紧急数据
*/
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
}
尽管上面的 `tcp_recvmsg` 已经加了大量的注释,但是由于这个函数的逻辑过于复杂,再加上 TCP 接收队列的多样性,即使已经看完了这个函数的实现,却仍然无法清楚地掌握它的整体脉络.接下来,我们
### 14.5 TCP套接字的三个接收队列
在Linux内核中,除了错误队列外,TCP套接字一共有三个接收队列.它们分别是 `struct sock` 中的 `sk_receive_queue` 和 `sk_backlog`,以及 `struct tcp_sock` 中的 `prequeue`.先简单介绍一下它们各自的用途,然后再看具体的代码实现.
- **sk_receive_queue**:真正的接收队列.收到的TCP数据包经过检查和处理后,就会保存在这个队列中,用户态也是从这里读取数据的.
- **sk_backlog**:当socket正处于用户进程上下文(即用户正在对socket进行系统调用,如recv)时,若Linux内核收到数据包,在软中断的处理过程中,内核会将数据包保存在 `sk_backlog` 中,然后直接返回.
- **prequeue**:当该socket没有被用户进程使用时,由软中断直接将数据包保存在 `prequeue` 中,并返回.
从上面的说明可以看出,对于TCP套接字,它不管用户态是否正在使用套接字,都不做真正的处理,而是把数据包保存在队列中.这是因为TCP协议相对复杂,内核为了尽快让软中断结束,就不进行多余的处理了,尽量在用户进程上下文中处理数据包.
下面来看看TCP相关的源代码.
首先,查看TCP的接收处理函数 `tcp_v4_rcv` 中的一部分代码:
```c
bh_lock_sock_nested(sk);
ret = 0;
if (!sock_owned_by_user(sk)) {
/* 用户态没有正在使用这个套接字 */
#ifdef CONFIG_NET_DMA
struct tcp_sock *tp = tcp_sk(sk);
if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
tp->ucopy.dma_chan = dma_find_channel(DMA_MEMCPY);
if (tp->ucopy.dma_chan)
ret = tcp_v4_do_rcv(sk, skb);
else
#endif
{
/* 先尝试保存到prequeue中,若失败的话再进入TCP真正的处理函数中 */
if (!tcp_prequeue(sk, skb))
ret = tcp_v4_do_rcv(sk, skb);
}
/* 若该套接字正在被用户态使用,则将数据包保存到backlog中。如果失败的话,就丢弃这个包。 */
} else if (unlikely(sk_add_backlog(sk, skb))) {
bh_unlock_sock(sk);
NET_INC_STATS_BH(net, LINUX_MIB_TCPBACKLOGDROP);
goto discard_and_relse;
}
bh_unlock_sock(sk);
然后进入 tcp_prequeue,看看什么时候会返回失败,代码如下:
static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb)
{
struct tcp_sock *tp = tcp_sk(sk);
/* 配置了低延时TCP,或者该套接字没有对应的用户态进程,返回失败。让内核直接处理TCP数据包。 */
if (sysctl_tcp_low_latency || !tp->ucopy.task)
return 0;
/* 将数据包追加到prequeue队列中,并增加相应的内存统计。 */
__skb_queue_tail(&tp->ucopy.prequeue, skb);
tp->ucopy.memory += skb->truesize;
if (tp->ucopy.memory > sk->sk_rcvbuf) {
/* 当超过了套接字指定的接收缓存大小时 */
struct sk_buff *skb1;
BUG_ON(sock_owned_by_user(sk));
/* 将数据包从prequeue中转移到backlog中 */
while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) {
sk_backlog_rcv(sk, skb1);
NET_INC_STATS_BH(sock_net(sk),
LINUX_MIB_TCPPREQUEUEDROPPED);
}
tp->ucopy.memory = 0;
} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
/* 如果该数据包是prequeue中的第一个数据包,则唤醒在该套接字中等待接收的进程 */
wake_up_interruptible_sync_poll(sk_sleep(sk),
POLLIN | POLLRDNORM | POLLRDBAND);
/* 如果ack定时器没有被调度,则设置ack定时器 */
if (!inet_csk_ack_scheduled(sk))
inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
(3 * tcp_rto_min(sk)) / 4,
TCP_RTO_MAX);
}
return 1;
}然后查看 sk_add_backlog,代码如下:
static inline __must_check int sk_add_backlog(struct sock *sk, struct sk_buff *skb)
{
/* 接收队列已满,则返回ENOBUFS错误。所谓的接收队列已满,即接收缓存的数据包占用的内存超过了限制。 */
if (sk_rcvqueues_full(sk, skb))
return -ENOBUFS;
/* 将数据包追加到backlog队列中,并增加相应的内存统计。 */
__sk_add_backlog(sk, skb);
sk->sk_backlog.len += skb->truesize;
return 0;
}看完这些代码后,我们应该产生一个疑问:既然 prequeue 和 backlog 都是保存的未经处理的TCP数据包,那么为什么还需要两个不同的队列呢?为了解答这个疑问,就需要研究内核是如何使用这两个队列的.前面的代码是这两个队列的写入操作,接下来我们看一下这两个队列是何时被读取的.
prequeue 队列的处理函数是 tcp_prequeue_process,它是在TCP的读取数据函数 tcp_recvmsg 中被调用的.在 tcp_recvmsg 的入口,内核会调用 lock_sock 来设置 sk->sk_lock.owned,表示该套接字由用户进程所占有,然后会对 receive_queue 和 prequeue 中的数据包进行处理.正因为sock被用户进程占用时,会访问 prequeue 队列,所以为了避免竞争,软中断在收到数据包时就只能把数据包保存到 backlog 中.那么为什么当sock不被用户进程占用时,软中断不将数据包保存到 backlog 中,而是保存到 prequeue 中呢?
要回答这个问题,还是要继续查看 backlog 是何时被读取的.让人觉得有点出乎意料的是,backlog 的数据包居然是在 __release_sock 中被处理的.
static void __release_sock(struct sock *sk)
__releases(&sk->sk_lock.slock)
__acquires(&sk->sk_lock.slock)
{
struct sk_buff *skb = sk->sk_backlog.head;
/* 处理backlog队列的数据包 */
do {
sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
bh_unlock_sock(sk);
do {
struct sk_buff *next = skb->next;
WARN_ON_ONCE(skb_dst_is_noref(skb));
skb->next = NULL;
sk_backlog_rcv(sk, skb);
/*
* We are in process context here with softirqs
* disabled, use cond_resched_softirq() to preempt.
* This is safe to do because we've taken the backlog
* queue private:
*/
cond_resched_softirq();
skb = next;
} while (skb != NULL);
bh_lock_sock(sk);
} while ((skb = sk->sk_backlog.head) != NULL);
/*
* Doing the zeroing here guarantee we can not loop forever
* while a wild producer attempts to flood us.
*/
sk->sk_backlog.len = 0;
}不过这也解释了对于TCP套接字,为什么需要两个队列来保存未处理的数据包.
对于套接字的使用情况,一共有两个状态:
- 用户进程正在占用该套接字.
- 用户进程未占用该套接字.
而内核在任何情况下,都要尽量保证尽快返回软中断,以避免资源竞争.因此,在套接字的这两个状态下,都要保证软中断可以毫无阻塞地将数据包保存到未处理队列中,自然也就需要两个队列了.当用户进程正在占用套接字时,其会访问 prequeue,那么软中断就将数据包保存到 backlog 中.当用户放弃对套接字的占用时,其会访问 backlog,而这时,软中断就会将数据包保存到 prequeue 中.
设计思路
两个队列的设计,本质上是利用用户进程上下文来处理数据包,同时避免软中断被长时间阻塞.
prequeue用于用户未占用时的快速投递,backlog用于用户占用时的安全暂存,最终在__release_sock中统一处理.
14.6 从网卡到套接字
对于一般的套接字编程来说,大多是应用编程,所以基本上都是UDP或TCP协议的套接字.前面两章是从应用层次的角度,自上而下地分析了UDP和TCP数据包的发送和接收流程.但同时也有了一个新问题:数据包是如何进入对应套接字的接收缓冲区的呢?本章将从网卡接收到数据包开始,一直跟踪到内核将数据包放入到对应的套接字缓冲区中为止.
14.6.1 从硬中断到软中断
对于网卡来说,数据包的到达是一个无法预料的事件,系统需要通过某种手段来得知该事件.一般来说,有两种方式:轮询和中断.用直白的语言来描述,轮询就是CPU不断地问网卡:“你那有准备好的数据包吗?”如果网卡回答有数据包的话,CPU就进行处理,不然要么干点别的,要么继续问.中断则是没有数据包时,CPU该干嘛干嘛,若网卡收到数据包,就直接喊话“喂,有活干了”.于是CPU赶紧把手头的工作保存一下,并尽快响应任务.第一种方式,毫无疑问会造成CPU的浪费,因为在网卡没有数据包的时候,CPU还要浪费计算周期来询问网卡.第二种中断方式看上去很美,网卡没有数据的时候,CPU可以做其他的事情;有数据的时候,就可以及时处理.然而在实际应用中,中断方式也有很大的问题.在CPU响应中断时,为了不影响当前的工作,需要将当前工作的上下文保存起来,然后再进行中断处理.试想,当前千兆、万兆网卡已经非常普遍,若是那时网卡满负载,那么每秒钟就会产生大量的中断.除了切换过程带来的计算代价,上下文的切换还会导致CPU Cache的失效——这对高性能设备来说,是一个不可忽视的问题.于是,Linux对这两种方式进行了折中,引入了一个 NAPI(New API).简单来说,在CPU响应网卡中断时,不再仅仅是处理一个数据包就退出,而是使用轮询的方式继续尝试处理新数据包,直到没有新数据包到来,或者达到设置的一次中断最多处理的数据包个数.这个NAPI同时兼有了轮询和中断两种方式的特点.
网卡硬中断的处理是在网卡驱动中进行的,这个与硬件的联系过于紧密,我们可以忽略细节.只需要知道对于支持NAPI的网卡来说,其读取数据包的硬中断处理函数会调用 __napi_schedule 将网卡加入NAPI的poll list中,代码如下:
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;
/* 禁止本地中断,保护添加poll list的临界区 */
local_irq_save(flags);
/* 加入到当前CPU的poll列表中 */
____napi_schedule(&__get_cpu_var(softnet_data), n);
local_irq_restore(flags);
}进入 ____napi_schedule,代码如下:
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
/* 将napi加入队尾 */
list_add_tail(&napi->poll_list, &sd->poll_list);
/* 触发当前CPU接收软中断(实际上是设置一个标志位) */
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}关于中断处理为什么要分为硬中断和软中断(也经常被称为上下部分)的解释已经很多了.简单地说:硬中断处理是一个特殊的上下文,CPU会屏蔽掉绝大部分中断,并且有不少的限制.所以硬中断应尽可能快地处理,以提高系统的响应速度,因此内核将具体的处理工作放到了软中断中.
14.6.2 软中断处理
14.6.1节中,我们看到硬中断通过设置标志位“触发”了软中断.那么内核又是何时处理软中断的呢?目前,在以下几种条件下,内核会检查是否需要处理软中断:
- 退出硬中断上下文时.
- 重新enable软中断时.
- 每个CPU都有一个
ksoftirqd的内核线程.当内核的软中断数量过多时,就会唤醒该线程循环处理软中断.
接收数据包的软中断处理函数为 net_rx_action,代码如下:
static void net_rx_action(struct softirq_action *h)
{
/* 接收数据包的per cpu队列 */
struct softnet_data *sd = &__get_cpu_var(softnet_data);
/* 最长的运行时间限制为2个jiffies */
unsigned long time_limit = jiffies + 2;
/*
* 一次软中断最多处理的包个数。
* netdev_budget 的值为 /proc/sys/net/core/netdev_budget
*/
int budget = netdev_budget;
void *have;
/* 因为网卡驱动会访问poll list,因此需要禁止本地硬中断以进行保护 */
local_irq_disable();
/* 遍历加入到poll链表的所有网卡 */
while (!list_empty(&sd->poll_list)) {
struct napi_struct *n;
int work, weight;
/* 如果已经处理完了允许的最大包个数,或超出了允许的时间限制,则退出此次处理 */
if (unlikely(budget <= 0 || time_after(jiffies, time_limit))) {
/* 这时退出此次收包软中断只是为了避免过长时间地占用CPU,所以跳到
* softnet_break,再触发一次收包软中断,以便下次继续处理 */
goto softnet_break;
}
/* 打开本地硬中断 */
local_irq_enable();
/* 这里在打开硬中断时,虽然访问了poll_list,但仍然是安全的。
* 因为硬中断只是在往poll_list的末尾插入,并不会影响第一个元素。 */
n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
/* 获得该设备的netpoll锁 */
have = netpoll_poll_lock(n);
/* 得到该网卡的权重,其意义一般为在这个网卡上接收几个数据包 */
weight = n->weight;
/*
* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll```c
work = 0;
/* 再次检查该网卡是否有NAPI调用(因为与netpoll有竞争) */
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
/* 对网卡进行查询操作,work值为读取的数据包个数 */
work = n->poll(n, weight);
trace_napi_poll(n);
}
WARN_ON_ONCE(work > weight);
/* 更新包预算即目前还可以读取的数据包个数 */
budget -= work;
local_irq_disable();
/* 判断从该网卡读取的数据包是否达到预算个数 */
if (unlikely(work == weight)) {
/* 判断该网卡的NAPI是否被禁止了 */
if (unlikely(napi_disable_pending(n))) {
/* 若NAPI已经被禁止了,则执行NAPI的完成处理 */
local_irq_enable();
napi_complete(n);
local_irq_disable();
} else {
/* 若该设备仍要继续进行NAPI操作,则将其移至队尾 */
list_move_tail(&n->poll_list, &sd->poll_list);
}
}
/* 释放netpoll锁 */
netpoll_poll_unlock(have);
}
out:
/* 执行RPS处理并打开本地硬中断 */
net_rps_action_and_irq_enable(sd);
#ifdef CONFIG_NET_DMA
/* 启动未处理的DMA操作 */
dma_issue_pending_all();
#endif
return;
softnet_break:
/* 本次没有接收完所有的数据包,再触发一次软中断 */
sd->time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}在这个收包软中断处理函数中,CPU会遍历poll列表,调用挂载到NAPI列表上的网卡回调函数 poll,来轮询接收数据包。所以,我们还需要通过驱动代码,来跟踪收包流程。在此,以Intel的 e1000 网卡驱动为例来进行讲解,在使用NAPI的情况下,其收包流程为:
net_rx_action → e1000_clean → e1000_clean_rx_irq → e1000_receive_skb → napi_gro_receive → netif_receive_skb
在这个调用链上,有一个函数 napi_gro_receive,其用来支持 GRO(Generic Receive Offload)。这个GRO则是用于减轻CPU的处理压力的。大家可以计算一下,对于10GB、100GB的网卡来说,即使每个数据包都是1500字节(以太网的最大MTU,暂不考虑Jumbo帧),那么每秒钟系统需要处理多少个数据包?因此,为了减轻CPU的负担,Linux内核在驱动层引入了GRO,它会将符合条件的数据包合并为一个数据包再传递给系统协议栈。在此,我们只关注数据包的接收流程,就不研究GRO的实现了。有兴趣的读者可以自行阅读源代码。
14.6.3 传递给协议栈流程
数据包在脱离驱动层后,就进入了 netif_receive_skb,代码如下:
int netif_receive_skb(struct sk_buff *skb)
{
/* 判断是否在入队前给数据包打时间戳 */
if (netdev_tstamp_prequeue)
net_timestamp_check(skb);
if (skb_defer_rx_timestamp(skb))
return NET_RX_SUCCESS;
/* 是否打开了 RPS(Receive Packet Steering)编译开关,其根据数据包的
* IP地址和端口号进行 hash 运算,将其发送给对应的 CPU.这样,一方面保证了
* CPU间的负载均衡,另一方面将同一特征的数据包发给相同的 CPU,可以提高
* cache的命中率.
*/
#ifdef CONFIG_RPS
{
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu, ret;
rcu_read_lock();
/* 根据 RPS 算法,计算得到处理这个数据包的 CPU */
cpu = get_rps_cpu(skb->dev, skb, &rflow);
/* 当 CPU 大于等于 0 时,表示 RPS 计算得到了正确的 CPU */
if (cpu >= 0) {
/* 向其他 CPU 的接收队列追加这个数据包 */
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
} else {
/* 由本 CPU 处理该数据包 */
rcu_read_unlock();
ret = __netif_receive_skb(skb);
}
return ret;
}
#else
/* 本 CPU 继续处理该数据包 */
return __netif_receive_skb(skb);
#endif
}NOTE
这里可以看出
netif_receive_skb只是对__netif_receive_skb的封装,增加了对 RPS(接收包导引)的支持。
继续跟进 __netif_receive_skb,代码如下:
static int __netif_receive_skb(struct sk_buff *skb)
{
struct packet_type *ptype, *pt_prev;
rx_handler_func_t *rx_handler;
struct net_device *orig_dev;
struct net_device *null_or_dev;
bool deliver_exact = false;
int ret = NET_RX_DROP;
__be16 type;
/* 如果没有打开入队前采样数据包的时间戳功能,则需要在这里进行数据包时间戳采样 */
if (!netdev_tstamp_prequeue)
net_timestamp_check(skb);
trace_netif_receive_skb(skb);
/* 判断是否由 netpoll 处理 */
if (netpoll_receive_skb(skb))
return NET_RX_DROP;
/* 设置网卡的入口网卡 */
if (!skb->skb_iif)
skb->skb_iif = skb->dev->ifindex;
orig_dev = skb->dev;
/* 初始化数据包的网络层首部、传输层首部,以及二层 MAC 首部的长度 */
skb_reset_network_header(skb);
skb_reset_transport_header(skb);
skb_reset_mac_len(skb);
pt_prev = NULL;
rcu_read_lock();
another_round:
__this_cpu_inc(softnet_data.processed);
/* 如果是 802.1Q 协议的数据包 */
if (skb->protocol == cpu_to_be16(ETH_P_8021Q)) {
/* 则去掉 vlan tag */
skb = vlan_untag(skb);
if (unlikely(!skb))
goto out;
}
#ifdef CONFIG_NET_CLS_ACT
/* 如果数据包被设置了流控结果,则跳过后面的流控处理 */
if (skb->tc_verd & TC_NCLS) {
skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
goto ncls;
}
#endif
/* 遍历注册在 ptype_all 上的所有节点.ptype_all 上的节点需要处理收到的所有以太网数据包 */
list_for_each_entry_rcu(ptype, &ptype_all, list) {
/* 如果注册节点没有绑定网卡,或者绑定的网卡与数据包接收的网卡相同,则这个节点符合接收数据包的条件 */
if (!ptype->dev || ptype->dev == skb->dev) {
/* 将数据包传递给对应的处理函数 */
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}
#ifdef CONFIG_NET_CLS_ACT
skb = handle_ing(skb, &pt_prev, &ret, orig_dev);
if (!skb)
goto out;
ncls:
#endif
/* 如果这个数据包带有 vlan 标签 */
if (vlan_tx_tag_present(skb)) {
if (pt_prev) {
/* 则将数据包传递给之前确定的上层协议 */
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
/* 进行 vlan 的处理 */
if (vlan_do_receive(&skb))
goto another_round;
else if (unlikely(!skb))
goto out;
}
/*
* 判断该设备是否注册了接收处理函数.
* 设备上何时会注册接收处理函数呢?
* netdev_rx_handler_register 是注册设备接收处理函数的接口.
* 通过搜索 netdev_rx_handler_register 的调用者,可以发现当网卡作为
* bond 加入桥接,或者创建 macvlan 时,会注册网卡的处理函数.
* 使用这种方式,就做到了网卡接收处理函数与接收框架的解耦.
* 对于框架来说,通过这个回调函数(用函数指针实现的,内核中充斥着这样的代码),
* 可以完全不用了解具体的细节.
* 未来增加更多的网卡处理函数时,只需要在该具体实现上,调用注册函数,
* 而不用更改接收框架的代码.
*/
rx_handler = rcu_dereference(skb->dev->rx_handler);
if (rx_handler) {
if (pt_prev) {
/* 将数据包传递给之前确定的上层协议 */
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
/* 调用在设备上注册的处理函数 */
switch (rx_handler(&skb)) {
case RX_HANDLER_CONSUMED:
/* 处理函数已经消耗了这个数据包,直接跳至退出 */
ret = NET_RX_SUCCESS;
goto out;
case RX_HANDLER_ANOTHER:
/* 跳至 another_round,即跳至函数开头,重新处理 */
goto another_round;
case RX_HANDLER_EXACT:
/* 指示必须严格匹配接收网卡 */
deliver_exact = true;
case RX_HANDLER_PASS:
/* 继续后面的处理 */
break;
default:
BUG();
}
}
/* 如果数据包还带有 vlan tag,则证明该数据包是发给其他终端的 */
if (vlan_tx_nonzero_tag_present(skb))
skb->pkt_type = PACKET_OTHERHOST;
/* deliver only exact match when indicated */
null_or_dev = deliver_exact ? skb->dev : NULL;
/* 根据数据包的类型,遍历对应的处理函数 */
type = skb->protocol;
list_for_each_entry_rcu(ptype,
&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
/* 如果数据包类型匹配,并且接收接口设备也匹配,则证明这是正确的协议处理函数.
* 然后调用前面的响应处理函数,将数据包传递给上层协议 */
if (ptype->type == type &&
(ptype->dev == null_or_dev || ptype->dev == skb->dev ||
ptype->dev == orig_dev)) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}
/* 最后检查 pt_prev 是否为真.若为真,则表示前面有匹配的处理函数,然后进行调用.
* 如果为假,则表示对于这个数据包,内核没有对应的处理函数,那就直接释放这个数据包. */
if (pt_prev) {
ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
} else {
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
/* Jamal, now you will not able to escape explaining
* me how you were going to use this. :-)
*/
ret = NET_RX_DROP;
}
out:
rcu_read_unlock();
return ret;
}关键路径
- 首先处理时间戳、netpoll、初始化头部指针。
- 遍历
ptype_all列表,将数据包分发给所有注册的通用处理函数(如 tcpdump)。- 若开启包分类(
CONFIG_NET_CLS_ACT),执行入口流量控制。- 处理 VLAN 标签(去除、重新入队等)。
- 检查设备接收处理函数(
rx_handler),用于桥接、bond、macvlan 等场景。- 最后根据协议类型(
skb->protocol)在ptype_base哈希表中查找对应的协议处理函数(如 IP、ARP),并调用其回调。
设计思想
内核通过
ptype_all和ptype_base两套链表实现数据包的多路分发,前者用于全局嗅探,后者用于按协议精确匹配。rx_handler机制使得网卡可以透明地插入额外的接收处理逻辑,而无需修改核心框架。
该函数最后若无匹配处理函数,则丢弃数据包并递增 rx_dropped 计数器。
14.6.4 IP协议处理流程
以IPv4的协议栈处理为例进行讲解,首先来看看IPv4协议是如何注册处理回调函数的。在inet_init中,调用了dev_add_pack(&ip_packet_type)进行了IPv4协议的注册。ip_packet_type的定义为:
static struct packet_type ip_packet_type __read_mostly = {
.type = cpu_to_be16(ETH_P_IP),
.func = ip_rcv,
.gso_send_check = inet_gso_send_check,
.gso_segment = inet_gso_segment,
.gro_receive = inet_gro_receive,
.gro_complete = inet_gro_complete,
};因此,ip_rcv为IPv4协议数据包的入口函数。下面来看看该函数:
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
const struct iphdr *iph;
u32 len;
/*
如果该数据是发给其他终端的,则丢弃这个数据包.
这里的pkt_type是根据二层地址来判断是否发给本机的.
*/
if (skb->pkt_type == PACKET_OTHERHOST)
goto drop;
IP_UPD_PO_STATS_BH(dev_net(dev), IPSTATS_MIB_IN, skb->len);
/* 对数据包进行共享检查,保证IP协议处理的数据包独享一个skb. */
if ((skb = skb_share_check(skb, GFP_ATOMIC)) == NULL) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto out;
}
/* 检查IP首部,如果数据包小于IP首部的大小,则出错 */
if (!pskb_may_pull(skb, sizeof(struct iphdr)))
goto inhdr_error;
/* 得到IP首部地址 */
iph = ip_hdr(skb);
/* 根据RFC标准,IP首部小于20字节,或者版本号不是4的,就报错丢弃 */
if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;
/* 根据IP首部指定的长度,再次检查数据包的大小 */
if (!pskb_may_pull(skb, iph->ihl*4))
goto inhdr_error;
/* 重新获取IP首部地址.之所以要重新获取,是因为pskb_may_pull可能要重新申请skb */
iph = ip_hdr(skb);
/* 校验IPv4的校验和 */
if (unlikely(ip_fast_csum((u8 *)iph, iph->ihl)))
goto inhdr_error;
/* 对数据包长度进行检查 */
len = ntohs(iph->tot_len);
if (skb->len < len) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS);
goto drop;
} else if (len < (iph->ihl*4))
goto inhdr_error;
/* Our transport medium may have padded the buffer out. Now we know it
* is IP we can trim to the true length of the frame.
* Note this now means skb->len holds ntohs(iph->tot_len).
*/
/* 因为传输媒介可能会给数据包进行补齐,现在已经根据IP首部明确了数据包的长度,因此需要将数据
* 包的长度变为真正的长度并改变
*/
if (pskb_trim_rcsum(skb, len)) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto drop;
}
/* 重置数据包的控制块信息 */
memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));
/* 重置数据包的套接字信息 */
skb_orphan(skb);
/* 遍历执行netfilter在PREROUTING点上的规则,如果数据包没有被丢弃,则进入ip_rcv_finish */
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL,
ip_rcv_finish);
inhdr_error:
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
drop:
kfree_skb(skb);
out:
return NET_RX_DROP;
}关于netfilter
本文不分析netfilter的相关代码。数据包经过netfilter的PREROUTING处的规则后,进入了
ip_rcv_finish。
ip_rcv_finish代码如下:
static int ip_rcv_finish(struct sk_buff *skb)
{
const struct iphdr *iph = ip_hdr(skb);
struct rtable *rt;
/* 如果数据包没有设置路由信息,则进行路由查询 */
if (skb_dst(skb) == NULL) {
int err = ip_route_input_noref(skb, iph->daddr, iph->saddr,
iph->tos, skb->dev);
if (unlikely(err)) {
/* 若查找路由失败,增加相应的错误计数,并丢弃数据包 */
if (err == -EHOSTUNREACH)
IP_INC_STATS_BH(dev_net(skb->dev),
IPSTATS_MIB_INADDRERRORS);
else if (err == -ENETUNREACH)
IP_INC_STATS_BH(dev_net(skb->dev),
IPSTATS_MIB_INNOROUTES);
else if (err == -EXDEV)
NET_INC_STATS_BH(dev_net(skb->dev),
LINUX_MIB_IPRPFILTER);
goto drop;
}
}
#ifdef CONFIG_IP_ROUTE_CLASSID
if (unlikely(skb_dst(skb)->tclassid)) {
struct ip_rt_acct *st = this_cpu_ptr(ip_rt_acct);
u32 idx = skb_dst(skb)->tclassid;
st[idx&0xFF].o_packets++;
st[idx&0xFF].o_bytes += skb->len;
st[(idx>>16)&0xFF].i_packets++;
st[(idx>>16)&0xFF].i_bytes += skb->len;
}
#endif
/* iph大于5,即首部长度大于固定首部长度20字节.因此说明该IP报文具有IP选项,于是调用ip_rcv_options处理IP选项 */
if (iph->ihl > 5 && ip_rcv_options(skb))
goto drop;
/* 根据路由类型,增加相应的计数 */
rt = skb_rtable(skb);
if (rt->rt_type == RTN_MULTICAST) {
IP_UPD_PO_STATS_BH(dev_net(rt->dst.dev), IPSTATS_MIB_INMCAST,
skb->len);
} else if (rt->rt_type == RTN_BROADCAST)
IP_UPD_PO_STATS_BH(dev_net(rt->dst.dev), IPSTATS_MIB_INBCAST,
skb->len);
/* 调用路由的输入函数 */
return dst_input(skb);
drop:
kfree_skb(skb);
return NET_RX_DROP;
}对于发往本机的数据包来说,其路由输入函数为ip_local_deliver,代码如下:
int ip_local_deliver(struct sk_buff *skb)
{
/* 该数据包是一个IP分片数据包 */
if (ip_is_fragment(ip_hdr(skb))) {
/* 进行IP分片重组处理 */
if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}
/* 遍历执行netfilter在LOCAL_IN上的规则,如为丢弃,则进入ip_local_deliver_finish */
return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
ip_local_deliver_finish);
}进入ip_local_deliver_finish,代码如下:
static int ip_local_deliver_finish(struct sk_buff *skb)
{
struct net *net = dev_net(skb->dev);
/* 拉出IP报文首部,因为马上就要脱离IP层,进入传输层了. */
__skb_pull(skb, ip_hdrlen(skb));
/* 设置传输层首部地址 */
skb_reset_transport_header(skb);
rcu_read_lock();
{
/* 得到传输层协议 */
int protocol = ip_hdr(skb)->protocol;
int hash, raw;
const struct net_protocol *ipprot;
resubmit:
/* 将数据包传递给对应的原始套接字 */
raw = raw_local_deliver(skb, protocol);
/* 根据传输协议确定对应的inet协议 */
hash = protocol & (MAX_INET_PROTOS - 1);
ipprot = rcu_dereference(inet_protos[hash]);
if (ipprot != NULL) {
/* 找到了匹配传输层的协议 */
int ret;
/* 检查名称空间是否匹配 */
if (!net_eq(net, &init_net) && !ipprot->netns_ok) {
if (net_ratelimit())
printk("%s: proto %d isn't netns-ready\n",
__func__, protocol);
kfree_skb(skb);
goto out;
}
/* 协议的安全策略检查 */
if (!ipprot->no_policy) {
if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
kfree_skb(skb);
goto out;
}
nf_reset(skb);
}
/* 将数据包传递给传输层处理 */
ret = ipprot->handler(skb);
if (ret < 0) {
protocol = -ret;
goto resubmit;
}
IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
} else {
/* 没有对应的传输层协议 */
if (!raw) {
/* 若没有匹配的原始套接字,则进行安全策略检查 */
if (xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb)) {
/* 若没有对应的安全策略,则使用ICMP返回不可达错误 */
IP_INC_STATS_BH(net, IPSTATS_MIB_INUNKNOWNPROTOS);
icmp_send(skb, ICMP_DEST_UNREACH,
ICMP_PROT_UNREACH, 0);
}
} else
IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
kfree_skb(skb);
}
}
out:
rcu_read_unlock();
return 0;
}14.6.5 大师的错误?原始套接字的接收
在UNP1的28.4“Raw Socket Input”一节中,Stevens大师是这样说的:
Received UDP packets and received TCP packets are never passed to a raw socket. If a process wants to read IP datagrams containing UDP or TCP packets, the packets must be read at the datalink layer, as described in Chapter 29.
UNP1一书中文版的翻译原文是这样的:
接收到UDP分组和TCP分组绝不传递到任何原始套接口。如果一个进程想要读取含有UDP分组或TCP分组的IP数据报,它就必须在数据链路层读取这些分组。
对于中文版的翻译,上文中的“分组”实在是不专业,因为这不是一个准确的术语。读者在看到这个部分后,绝对会很疑惑。分组?何谓分组?是分片的笔误还是组播?笔者自己也是对照了英文原版后才明白中文版的意思。与其用一个模糊的“分组”,还不如直接用“报文”更直截了当。
回到正题,根据UNP1的说法,普通的raw socket是无法收到TCP和UDP的数据包的,除非该套接字是从数据链路层就开始读取数据包的。而实际上Linux内核的实际行为却不是这样的。下面让我们用代码来说明:
int raw_local_deliver(struct sk_buff *skb, int protocol)
{
int hash;
struct sock *raw_sk;
/* 根据传输层协议确定hash桶索引 */
hash = protocol & (RAW_HTABLE_SIZE - 1);
/* 获得该桶的头结点 */
raw_sk = sk_head(&raw_v4_hashinfo.ht[hash]);
/* 当头结点不为空时,才进入raw_v4_input做进一步检查 */
if (raw_sk && !raw_v4_input(skb, ip_hdr(skb), hash)) {
/* 如果没有找到匹配的原始套接字,则重置raw_sk为NULL. */
raw_sk = NULL;
}
return raw_sk != NULL;
}然后进入raw_v4_input,代码如下:
static int raw_v4_input(struct sk_buff *skb, const struct iphdr *iph, int hash)
{
struct sock *sk;
struct hlist_head *head;
int delivered = 0;
struct net *net;
/* 与raw_local_deliver不同,因为需要使用到头结点中的内容,所以需要对这个桶上锁,才能保证
在处理这个桶的过程中,所有节点都是有效的.*/
read_lock(&raw_v4_hashinfo.lock);
/* 再次检查头结点 */
head = &raw_v4_hashinfo.ht[hash];
if (hlist_empty(head))
goto out;
/* 获得网络名称空间 */
net = dev_net(skb->dev);
/* 查询匹配的原始套接字 */
sk = __raw_v4_lookup(net, __sk_head(head), iph->protocol,
iph->saddr, iph->daddr,
skb->dev->ifindex);
/* 若找到了匹配的原始套接字,则继续处理 */
while (sk) {
delivered = 1;
/* 如果数据包不是ICMP数据包,或者不是被指定要过滤的ICMP类型 */
if (iph->protocol != IPPROTO_ICMP || !icmp_filter(sk, skb)) {
/* 数据包要发给该套接字,需要clone一个新的skb */
struct sk_buff *clone = skb_clone(skb, GFP_ATOMIC);
/* 若clone成功,则调用原始套接字的接收函数 */
if (clone)
raw_rcv(sk, clone);
}
/* 继续查询后面的套接字 */
sk = __raw_v4_lookup(net, sk_next(sk), iph->protocol,
iph->saddr, iph->daddr,
skb->dev->ifindex);
}
out:
read_unlock(&raw_v4_hashinfo.lock);
return delivered;
}进入__raw_v4_lookup,代码如下:
static struct sock *__raw_v4_lookup(struct net *net, struct sock *sk,
unsigned short num, __be32 raddr, __be32 laddr, int dif)
{
struct hlist_node *node;
/* 遍历套接字 */
sk_for_each_from(sk, node) {
struct inet_sock *inet = inet_sk(sk);
/*
检查如下几个条件:
1)检查名称空间.
2)比较协议号.
3)如果套接字设置了目的地址且地址相同.
4)如果套接字设置了源地址且地址相同.
5)如果套接字绑定了网卡,且网卡相同.
只有当以上五个条件都匹配的时候,该套接字才匹配.
*/
if (net_eq(sock_net(sk), net) && inet->inet_num == num &&
!(inet->inet_daddr && inet->inet_daddr != raddr) &&
!(inet->inet_rcv_saddr && inet->inet_rcv_saddr != laddr) &&
!(sk->sk_bound_dev_if && sk->sk_bound_dev_if != dif))
goto found; /* gotcha */
}
sk = NULL;
found:
return sk;
}在上面的匹配条件中,源地址、目的地址和绑定网卡是原始套接字调用connect、bind等系统调用设置的过滤条件。增加这些过滤条件,一般是为了让应用层减少不必要的消耗,避免过滤不需要过滤的数据包。可以发现,在原始套接字的接收流程中,并没有对TCP和UDP进行任何的限制。也就是说在Linux环境下,普通的原始套接字完全可以接受TCP和UDP的数据包,这与UNP的描述不符。那这是怎么回事呢?因为Stevens大师的UNP针对的是Unix环境的网络编程,而Linux虽然是与Unix兼容的,但在细节的实现上必然与Unix有所不同。需要注意的是,Stevens大师的另一本经典书籍APUE,也是针对Unix环境的介绍,在某些细节上肯定会与Linux环境有一定的出入。
关于原始套接字的差异
Linux的原始套接字实现与经典Unix书籍UNP的描述存在差异:Linux的原始套接字实际上可以接收TCP和UDP数据包,而UNP声称它们绝不会传递给原始套接字。这是由于Linux在实现上与经典的Unix环境有所不同所致。
14.6## 14.6.6 注册传输层协议
在14.5.4节提到的ip_local_deliver_finish函数中,内核通过调用ipprot->handler(skb)将数据包传递给了正确的传输层协议。对于IPv4协议来说,其传输层协议的处理函数的handler是在inet_init中添加的。下面是inet_init中的部分代码:
/* 添加ICMP协议 */
if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
printk(KERN_CRIT "inet_init: Cannot add ICMP protocol\n");
/* 添加UDP协议 */
if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
printk(KERN_CRIT "inet_init: Cannot add UDP protocol\n");
/* 添加TCP协议 */
if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
printk(KERN_CRIT "inet_init: Cannot add TCP protocol\n");
#ifdef CONFIG_IP_MULTICAST
/* 添加IGMP协议 */
if (inet_add_protocol(&igmp_protocol, IPPROTO_IGMP) < 0)
printk(KERN_CRIT "inet_init: Cannot add IGMP protocol\n");
#endif通过调用inet_add_protocol函数,传输层将自己的处理函数添加到了inet_protos中,这样就可以在ip_local_deliver_finish中调用对应的传输层的处理函数了。
inet_init中的另一部分代码如下:
for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q)
inet_register_protosw(q);这部分代码用于注册AF_INET的各种协议,如UDP、TCP等。那为什么inet会使用两种不同的方式来支持传输层协议的注册呢?为何不合并为一个结构呢?在笔者看来,inet_add_protocol面向的是底层接口,而inet_register_protosw面向的是上层应用,所以将其分为了两个结构。
14.6.7 确定UDP套接字
UDP协议面向底层接口的处理结构为:
static const struct net_protocol udp_protocol = {
.handler = udp_rcv,
.err_handler = udp_err,
.gso_send_check = udp4_ufo_send_check,
.gso_segment = udp4_ufo_fragment,
.no_policy = 1,
.netns_ok = 1,
};因此,如果是UDP数据包,会依次进入 udp_rcv → __udp4_lib_rcv,下面来看看 __udp4_lib_rcv 的相关代码:
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
int proto)
{
struct sock *sk;
struct udphdr *uh;
unsigned short ulen;
struct rtable *rt = skb_rtable(skb);
__be32 saddr, daddr;
struct net *net = dev_net(skb->dev);
/* 校验数据包至少要有UDP首部大小 */
if (!pskb_may_pull(skb, sizeof(struct udphdr)))
goto drop; /* No space for header. */
/* 得到UDP首部指针 */
uh = udp_hdr(skb);
/* 得到UDP数据包长度、源地址、目的地址 */
ulen = ntohs(uh->len);
saddr = ip_hdr(skb)->saddr;
daddr = ip_hdr(skb)->daddr;
/* 如果UDP数据包长度超过数据包的实际长度,则出错 */
if (ulen > skb->len)
goto short_packet;
/*
判断协议是否为UDP协议.
也许有的读者会觉得很奇怪,为什么在UDP的接收函数中还要判断协议是否为UDP?
因为这个函数还用于处理UDPLITE协议.
*/
if (proto == IPPROTO_UDP) {
/* 如果是UDP协议,则将数据包的长度更新为UDP指定的长度,并更新校验和 */
if (ulen < sizeof(*uh) || pskb_trim_rcsum(skb, ulen))
goto short_packet;
/* 因为前面的操作可能会导致skb内存变化,所以需要重新获得UDP首部指针 */
uh = udp_hdr(skb);
}
/* 初始化UDP校验和 */
if (udp4_csum_init(skb, uh, proto))
goto csum_error;
/* 如果路由标志位广播或多播,则表明该UDP数据包为广播或多播 */
if (rt->rt_flags & (RTCF_BROADCAST|RTCF_MULTICAST))
return __udp4_lib_mcast_deliver(net, skb, uh,
saddr, daddr, udptable);
/* 确定匹配的UDP套接字 */
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);
if (sk != NULL) {
/* 找到了匹配的套接字```c
int ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);
/* a return value > 0 means to resubmit the input, but
* it wants the return to be -protocol, or 0
*/
if (ret > 0)
return -ret;
return 0;
}
/* 进行xfrm策略检查 */
if (!xfrm4_policy_check(NULL, XFRM_POLICY_IN, skb))
goto drop;
/* 重置netfilter信息 */
nf_reset(skb);
/* 检查UDP检验和 */
if (udp_lib_checksum_complete(skb))
goto csum_error;
/* 若不知道匹配的UDP套接字,则发送ICMP错误消息 */
UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0);
/*
* Hmm. We got an UDP packet to a port to which we
* don't wanna listen. Ignore it.
*/
kfree_skb(skb);
return 0;
/* 错误处理 */
……
}下面来看一下如何匹配UDP套接字,请看 __udp4_lib_lookup_skb → __udp4_lib_lookup 函数,代码如下:
static struct sock *__udp4_lib_lookup(struct net *net, __be32 saddr,
__be16 sport, __be32 daddr, __be16 dport,
int dif, struct udp_table *udptable)
{
struct sock *sk, *result;
struct hlist_nulls_node *node;
unsigned short hnum = ntohs(dport);
/* 使用目的端口确定hash桶索引 */
unsigned int hash2, slot2, slot = udp_hashfn(net, hnum, udptable->mask);
struct udp_hslot *hslot2, *hslot = &udptable->hash[slot];
int score, badness;
rcu_read_lock();
/* 若该桶的套接字个数多于10个,则需要再次定位 */
if (hslot->count > 10) {
/* 使用目的地址和目的端口确定hash桶索引 */
hash2 = udp4_portaddr_hash(net, daddr, hnum);
slot2 = hash2 & udptable->mask;
/*
UDP套接字表维护了两个hash表:
第一个hash表,使用端口来索引。
第二个hash表,使用地址+端口来索引。
在进行UDP套接字匹配的时候,优先使用第一个hash表,因为第一个hash表使用的是端口进行散
列索引,那么只要端口相同,无论是监听的指定IP还是任意IP,都可以在一个桶中进行匹配。但
是由于端口只有65535种可能,所以可能导致不够分散,一个桶的套接字个数会比较多。而第二个
hash表是使用地址+端口来索引的,因此理论上套接字的分布会比第一个hash表更加分散。
因此当第一个hash表对应桶的套接字多于10个时,内核会尝试去第二个hash表中进行匹配查找。
*/
hslot2 = &udptable->hash2[slot2];
/* 尽管第二个hash表理论上会比第一个hash表分散,但是如果实际上第二个表的桶中套接字个数大于第一个表的桶中套接字个数,那么这时还是利用第一个hash表进行匹配 */
if (hslot->count < hslot2->count)
goto begin;
/* 在第二个hash表的桶中匹配查找套接字 */
result = udp4_lib_lookup2(net, saddr, sport,
daddr, hnum, dif,
hslot2, slot2);
if (!result) {
/* 若利用指定的IP和端口在该桶中没能找到匹配的套接字,则通常使用任意IP+端口来进行
散列索引 */
hash2 = udp4_portaddr_hash(net, htonl(INADDR_ANY), hnum);
slot2 = hash2 & udptable->mask;
hslot2 = &udptable->hash2[slot2];
/* 还是要与第一个hash桶中的个数进行比较 */
if (hslot->count < hslot2->count)
goto begin;
/* 在第二个hash表中使用任意IP+端口进行匹配查找 */
result = udp4_lib_lookup2(net, saddr, sport,
htonl(INADDR_ANY), hnum, dif,
hslot2, slot2);
}
rcu_read_unlock();
return result;
}
begin:
result = NULL;
badness = -1;
/* 在第一个hash表的桶中进行查找 */
sk_nulls_for_each_rcu(sk, node, &hslot->head) {
/* 计算该套接字的匹配得分 */
score = compute_score(sk, net, saddr, hnum, sport,
daddr, dport, dif);
/* 保证匹配得分最高的套接字为最终结果 */
if (score > badness) {
result = sk;
badness = score;
}
}
/*
检查在查找的过程中,是否遇到了某个套接字被移到另外一个桶内的情况。
这时,需要重新进行匹配。
*/
if (get_nulls_value(node) != slot)
goto begin;
/* 找到了匹配的套接字 */
if (result) {
/* 增加套接字引用计数 */
if (unlikely(!atomic_inc_not_zero_hint(&result->sk_refcnt, 2)))
result = NULL;
/* 再次计算套接字得分,如小于最大分数,则重新匹配查找。之所以做二次检查,也是为了防止在
匹配与增加引用的过程中,套接字发生变化。 */
else if (unlikely(compute_score(result, net, saddr, hnum, sport,
daddr, dport, dif) < badness)) {
sock_put(result);
goto begin;
}
}
rcu_read_unlock();
return result;
}从上面的代码中可以看到,匹配UDP套接字的关键在于对应套接字的匹配得分.第一个hash表的得分计算函数为 compute_score.
static inline int compute_score(struct sock *sk, struct net *net, __be32 saddr,
unsigned short hnum,
__be16 sport, __be32 daddr, __be16 dport, int dif)
{
int score = -1;
/* 比较名称空间,端口等 */
if (net_eq(sock_net(sk), net) && udp_sk(sk)->udp_port_hash == hnum &&
!ipv6_only_sock(sk)) {
struct inet_sock *inet = inet_sk(sk);
/* 若套接字指明为PF_INET,则加1分 */
score = (sk->sk_family == PF_INET ? 1 : 0);
/* 套接字绑定了接收地址 */
if (inet->inet_rcv_saddr) {
/* 如果数据包的目的地址与绑定接收地址不符,则分数为-1,相同则增加2分。 */
if (inet->inet_rcv_saddr != daddr)
return -1;
score += 2;
}
/* 套接字设置了对端目的地址 */
if (inet->inet_daddr) {
/* 如果数据包的源地址与设置的目的地址不同,则分数为-1,相同则增加2分 */
if (inet->inet_daddr != saddr)
return -1;
score += 2;
}
/* 套接字设置了对端目的端口 */
if (inet->inet_dport) {
/* 如果数据包的源端口与设置的目的端口不同,则分数为-1,相同则增加2分 */
if (inet->inet_dport != sport)
return -1;
score += 2;
}
/* 套接字绑定了网卡 */
if (sk->sk_bound_dev_if) {
/* 如果接受数据包的网卡与绑定网卡不同,则分数为-1,相同则增加2分 */
if (sk->sk_bound_dev_if != dif)
return -1;
score += 2;
}
}
return score;
}对于第二个hash,其匹配分数计算函数为 compute_score2,算法与 compute_score 基本相同.总的来说UDP的套接字匹配有以下几个条件:
- 接收端口:必须匹配.
- 接收地址:如绑定了则必须匹配,分值为2分.
- 对端目的地址:如设置了则必须匹配,分值为2分.
- 对端目的端口:如设置了则必须匹配,分值为2分.
- 网卡:如绑定了则必须匹配,分值为2分.
- 套接字设置了PF_INET协议族,分值为1分.
根据上面的规则,匹配分值最高的套接字就为选中的UDP套接字,然后内核会将这个数据包加入到该UDP套接字的接收队列中.也就是说,即使数据包可以匹配多个UDP套接字(这是很有可能的),但是最终也只有一个最匹配的套接字会被选中,并且只有这个套接字可以收到数据包.
多套接字负载均衡误区
有一些开发人员想使用套接字的
SO_REUSEADDR选项,让多个套接字绑定同一个地址或端口,然后让独立的线程或进程负责一个套接字的处理,希望利用这样的设计来提高服务的响应速度.这里面有个想当然的认为,当多个套接字负责同一个地址和端口的数据包接收时,它们可以分担负载.然而从上面的源码分析中,我们可以发现这样的设计方案是达不到预期效果的.因为内核在进行套接字的匹配时,对于绑定相同地址和端口的多个套接字,每次只会命中同一个套接字.结果在上面的设计中,只有一个套接字会收到数据包,也就说最后只有一个线程或进程在处理数据包.
SO_REUSEPORT
不过Linux内核在3.9版本中引入了一个新的套接字选项
SO_REUSEPORT用于解决上面的问题.当多个套接字绑定于同一个地址和端口时,并启用了SO_REUSEPORT时,内核会自动在这几个套接字之间做负载均衡,保证对应的数据包能尽量平均地分配到不同的套接字上.
14.6.8 确定TCP套接字
TCP面向底层接口的处理结构为:
static const struct net_protocol tcp_protocol = {
.handler = tcp_v4_rcv,
.err_handler = tcp_v4_err,
.gso_send_check = tcp_v4_gso_send_check,
.gso_segment = tcp_tso_segment,
.gro_receive = tcp4_gro_receive,
.gro_complete = tcp4_gro_complete,
.no_policy = 1,
.netns_ok = 1,
};那么,如果是TCP数据包,则会进入 tcp_v4_rcv,代码如下:
int tcp_v4_rcv(struct sk_buff *skb)
{
const struct iphdr *iph;
const struct tcphdr *th;
struct sock *sk;
int ret;
struct net *net = dev_net(skb->dev);
/* 丢弃不是发给自己的数据包 */
if (skb->pkt_type != PACKET_HOST)
goto discard_it;
/* Count it even if it's bad */
TCP_INC_STATS_BH(net, TCP_MIB_INSEGS);
/* 检查数据包至少要有TCP固定首部的大小 */
if (!pskb_may_pull(skb, sizeof(struct tcphdr)))
goto discard_it;
/* 获得TCP首部地址 */
th = tcp_hdr(skb);
/* 检查TCP的数据偏移量是否合法,不能小于TCP固定首部的大小 */
if (th->doff < sizeof(struct tcphdr) / 4)
goto bad_packet;
/* 检查数据包的大小是否满足TCP指定的数据偏移位置 */
if (!pskb_may_pull(skb, th->doff * 4))
goto discard_it;
/* 如果需要检查校验和,则进行校验和初始化 */
if (!skb_csum_unnecessary(skb) && tcp_v4_checksum_init(skb))
goto bad_packet;
/* 因为skb可能会重新申请,所以需要重新得到TCP首部 */
th = tcp_hdr(skb);
iph = ip_hdr(skb);
/* 根据TCP报文信息,设置skb的TCP控制块 */
TCP_SKB_CB(skb)->seq = ntohl(th->seq);
TCP_SKB_CB(skb)->end_seq = (TCP_SKB_CB(skb)->seq + th->syn + th->fin + skb->len - th->doff * 4);
TCP_SKB_CB(skb)->ack_seq = ntohl(th->ack_seq);
TCP_SKB_CB(skb)->when = 0;
TCP_SKB_CB(skb)->ip_dsfield = ipv4_get_dsfield(iph);
TCP_SKB_CB(skb)->sacked = 0;
/* 查找匹配的TCP套接字 */
sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
if (!sk)
goto no_tcp_socket;
/* 找到匹配的套接字并开始处理,因为不是本文重点,因此省略后面的分析 */
process:
……
}进入 __inet_lookup_skb → __inet_lookup.
static inline struct sock *__inet_lookup(struct net *net,
struct inet_hashinfo *hashinfo,
const __be32 saddr, const __be16 sport,
const __be32 daddr, const __be16 dport,
const int dif)
{
u16 hnum = ntohs(dport);
/* 先在已连接的套接字中进行查找 */
struct sock *sk = __inet_lookup_established(net, hashinfo,
saddr, sport, daddr, hnum, dif);
/* 优先使用已连接的套接字,若没有找到,则在监听的套接字中进行查找 */
return sk ? : __inet_lookup_listener(net, hashinfo, daddr, hnum, dif);
}对于TCP来说,套接字的匹配分为两部分:一个是匹配已经建立连接的,另外一个是匹配监听状态的套接字.我们先来看看前者,即已经建立连接的,进入 __inet_lookup_established,代码如下:
struct sock * __inet_lookup_established(struct net *net,
struct inet_hashinfo *hashinfo,
const __be32 saddr, const __be16 sport,
const __be32 daddr, const u16 hnum,
const int dif)
{
INET_ADDR_COOKIE(acookie, saddr, daddr)
const __portpair ports = INET_COMBINED_PORTS(sport, hnum);
struct sock *sk;
const struct hlist_nulls_node *node;
/* Optimize here for direct hit, only listening connections can
* have wildcards anyways.
*/
/* 根据目的地址、目的端口、源地址、源端口计算得到已经连接了hash表的桶索引 */
unsigned int hash = inet_ehashfn(net, daddr, hnum, saddr, sport);
unsigned int slot = hash & hashinfo->ehash_mask;
struct inet_ehash_bucket *head = &hashinfo->ehash[slot];
rcu_read_lock();
begin:
/* 遍历该桶节点 */
sk_nulls_for_each_rcu(sk, node, &head->chain) {
/*
比较源地址、目的地址、源端口、目的端口及接收网卡。
细心的读者会发现这里有两个参数acookie和ports,这两个参数用于加速匹配。在64位机器上,
acookie为源地址和目的地址合成的64位整数,在32位机器上acookie并无意义。
ports为源端口和目的端口合成的32位整数。通过直接比较组合的整数,可以加速匹配。
*/
if (INET_MATCH(sk, net, hash, acookie,
saddr, daddr, ports, dif)) {
/* 增加套接字引用计数 */
if (unlikely(!atomic_inc_not_zero(&sk->sk_refcnt)))
goto begintw;
/*
再次检测,防止套接字在INET_MATCH和增加计数之间被改变
*/
if (unlikely(!INET_MATCH(sk, net, hash, acookie,
saddr, daddr, ports, dif))) {
sock_put(sk);
goto begin;
}
goto out;
}
}
/*
* if the nulls value we got at the end of this lookup is
* not the expected one, we must restart lookup.
* We probably met an item that was moved to another chain.
*/
if (get_nulls_value(node) != slot)
goto begin;
begintw:
/* 如果在已经连接的hash表中找不到对应的套接字,则需要到连接为TIME_WAIT状态的hash表中查找
套接字,原理与上面相同。这说明TIME_WAIT状态的连接如果依然存在,则会优先于监听套接字。
*/
sk_nulls_for_each_rcu(sk, node, &head->twchain) {
if (INET_TW_MATCH(sk, net, hash, acookie,
saddr, daddr, ports, dif)) {
if (unlikely(!atomic_inc_not_zero(&sk->sk_refcnt))) {
sk = NULL;
goto out;
}
/* 与上面一样,需要再次进行匹配检查 */
if (unlikely(!INET_TW_MATCH(sk, net, hash, acookie,
saddr, daddr, ports, dif))) {
sock_put(sk);
goto begintw;
}
goto out;
}
}
/* 省略 */
……
}如果在已经连接和 TIME_WAIT 状态的 hash 表中,都没有找到匹配的套接字.这时就需要到监听 hash 表中查找匹配的套接字,代码如下:
struct sock *__inet_lookup_listener(struct net *net,
struct inet_hashinfo *hashinfo,
const __be32 daddr, const unsigned short hnum,
const int dif)
{
struct sock *sk, *result;
struct hlist_nulls_node *node;
/* 根据源端口计算监听hash表对应的桶索引 */
unsigned int hash = inet_lhashfn(net, hnum);
struct inet_listen_hashbucket *ilb = &hashinfo->listening_hash[hash];
int score, hiscore;
rcu_read_lock();
begin:
result = NULL;
hiscore = -1;
/* 遍历该桶中的套接字,与UDP相似,得分最高的套接字为匹配套接字 */
sk_nulls_for_each_rcu(sk, node, &ilb->head) {
/* 计算套接字的得分 */
score = compute_score(sk, net, hnum, daddr, dif);
if (score > hiscore) {
result = sk;
hiscore = score;
}
}
/*
* if the nulls value we got at the end of this lookup is
* not the expected one, we must restart lookup.
* We probably met an item that was moved to another chain.
*/
if (get_nulls_value(node) != hash + LISTENING_NULLS_BASE)
goto begin;
/* 找到了匹配的套接字 */
if (result) {
/* 增加套接字引用计数 */
if (unlikely(!atomic_inc_not_zero(&result->sk_refcnt)))
result = NULL;
/* 需要再次检查该套接字的得分 */
else if (unlikely(compute_score(result, net, hnum, daddr,
dif) < hiscore)) {
sock_put(result);
goto begin;
}
}
rcu_read_unlock();
return result;
}匹配TCP监听套接字的流程与UDP基本相同,都是在计算套接字的得分.下面我们来看一下TCP监听套接字的得分计算函数 compute_score.
static inline int compute_score(struct sock *sk, struct net *net,
const unsigned short hnum, const __be32 daddr,
const int dif)
{
int score = -1;
struct inet_sock *inet = inet_sk(sk);
/* 必须匹配名称空间和目的端口 */
if (net_eq(sock_net(sk), net) && inet->inet_num == hnum &&
!ipv6_only_sock(sk)) {
__be32 rcv_saddr = inet->inet_rcv_saddr;
/* 若协议族为PF_INET,则得分加1 */
score = sk->sk_family == PF_INET ? 1 : 0;
/*
若套接字指定了接收地址,则接收地址必须与目的地址相同。
不同则不匹配,相同则得分加2。
*/
if (rcv_saddr) {
if (rcv_saddr != daddr)
return -1;
score += 2;
}
/*
如果套接字绑定了网卡,则接收网卡必须相同。
不同则不匹配,相同则得分加2。
*/
if (sk->sk_bound_dev_if) {
if (sk->sk_bound_dev_if != dif)
return -1;
score += 2;
}
}
return score;
}与UDP的差异
这个函数的名字和功能均与UDP的相同,但是其实现代码却略有不同。大家可以发现,TCP的
compute_score不会对源端信息进行任何检查,如源地址、源端口等。为什么会这样呢?原因在于,TCP的compute_score是用于监听套接字的匹配的。这就意味着这是TCP连接的第一个数据包即SYN包,属于连接初始化阶段,这时自然无须对源端信息进行任何检查和匹配。在本机回复了SYN+ACK后,本机会创建已连接套接字并插入到已连接hash表中。这样在此连接后面的数据包,就会在已连接的hash表中找到匹配的套接字,而不会再进入监听套接字的匹配。