此学习笔记主要记录对Muduo源码的理解,主要参考陈硕的《Linux多线程服务端编程》,其他的参考资料会列于笔记末尾。水平有限,若有错误请在我的阅读笔记项目中指出。如果觉得写得还不错,可以点个小星星。
Poller介绍
Poller class
是IO multiplexing
的封装,在muduo
网络库中是一个抽象基类,muduo
同时支持poll(2)
和epoll(7)
两种机制,epoll(7)
采用水平触发。这个会在后面源码分析部分讲到。Poller
是EventLoop
的间接成员,由EventLoop
对象利用std::unique_str
智能指针管理,生命周期EventLoop
相同。owner EventLoop
在IO线程调用Poller::poll
,无须加锁。Poller
并不拥有Channel
, Channel
析构之前必须自己unregister
, (Channel::remove -> EventLoop:removeChannel -> Poller::removeChannel
)。
接下来我们首先介绍下IO multiplexing
,然后分析下Poller
相关的源码。
IO Multiplexing
I/O多路复用
允许程序同时监视多个文件描述符,等待其中一个或多个文件描述符变为就绪,以执行相应的I/O操作(例如,read(2)
或write(2)
)。复用指的是复用一个线程、使用一个线程来检查多个文件描述符(Socket)的就绪状态。Linux下,实现I/O复用的系统调用主要有3个:1)select(2)
;2)poll(2)
;3)epoll(7)
。muduo采用了poll
和epoll
,分别用PollPoller/EPollPoller
进行了封装,基类Poller
用于提供统一的接口。
I/O多路复用最好搭配非阻塞IO食用:
On Linux, select/poll/epoll may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has the wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.
poll(2)
poll()
调用的声明如下:1
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
和nfds
标识一个数组pollfds
,其中包含了所有被监视的描述符及相应的事件,每个被监视的文件描述符及其事件数组用struct pollfd
表示。网络库中Channel
的数据成员其实就对应于struct pollfd
的组成。1
2
3
4
5struct pollfd {
int fd; /* 文件描述符 */
short events; /* 监视的事件 */
short revents; /* 返回的事件 */
};poll()
系统调用是同步IO
, 会一直阻塞直到:
- 某个文件描述符就绪
- the call is interrupted by a signal handler
- 超时
poll()
调用的返回值有以下几种情况:
- 0: 超时
- 正数: 会返回一个非负的整数,表示
pollfds
数组中有多少个元素发生了监视的事件或者错误 - -1: poll发生了错误
常用的事件类型:
| 事件 | 含义 |
| —- | —- |
| POLLIN | 有数据要读取 |
| POLLPRI | 通常是urgent data to read |
| POLLOUT | 可写 |
| POLLRDHUP | Stream socket peer closed connection, or shut down writing half of connection. |
| POLLERR | 错误 |
| POLLHUP | 挂断。在Tcp连接中通常表示对端close
。Subsequent reads from the channel will return 0 (end of file) only after all outstanding data in the channel has been consumed |
| POLLNVAL | 不合法的操作:fd not open |POLLERR
, POLLHUP
和POLLNVAL
标识异常信息的仅会在返回的revents
中,设置在events
会被忽略。
epoll(7)
epoll(7)
系统调用和poll(2)
类似:监控多个文件描述符,找出其中可以进行I/O
的文件描述符。 它具有两种模式,一是边缘触发(edge-triggered),二是水平触发(level-triggered)。
epoll 的核心概念是 epoll 实例(epoll instance),这是内核的一个内部数据结构,从用户空间的角度看,它可以被看作一个内含两个列表的容器:
兴趣列表(interest list):一个集合包含需要监控的所有文件描述符及其事件。
就绪列表(ready list):“准备好”进行 I/O 的文件描述符的集合。就绪列表是兴趣列表中的文件描述符的子集。内核会根据这些文件描述符上的 I/O 活动动态地填充就绪列表。
使用 epoll 分为三步:
- 调用
epoll_create1(2)
创建一个新的 epoll 实例,并返回一个指向该实例的文件描述符。 调用
epoll_ctl(2)
向 epoll 实例的兴趣列表中添加项目,注册对特定文件描述符需要监控的事件。This system call is used to add, modify, or remove entries in the interest list of the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation op be performed for the target file descriptor, fd.
Valid values for the op argument are:EPOLL_CTL_ADD
: Add fd to the interest list and associate the settings specified in event with the internal file linked to fd.EPOLL_CTL_MOD
: Change the settings associated with fd in the interest list to the new settings specified in event.EPOLL_CTL_DEL
: Remove (deregister) the target file descriptor fd from the interest list. The event argument is ignored and can be NULL (but see BUGS below).
调用
epoll_wait(2)
等待 I/O 事件,如果当前没有事件可用,则阻塞调用它的线程。(此系统调用可被看作从 epoll 实例的就绪列表中获取项目。)
水平触发 v.s. 边缘触发:
假设发生下列情况:
1.读取方在 epoll 实例中注册代表管道读取端(rfd)的文件描述符。
- 写入方在管道的写入端写入 2 kB 的数据。
- 读取方调用
epoll_wait(2)
, rfd 作为一个就绪的文件描述符被返回。 - 读取方只从 rfd 中读取 1 kB 的数据。
- 读取方再次调用
epoll_wait(2)
。
如果读取方添加 rfd 到 epoll 接口时使用了 EPOLLET (边缘触发)标志位,那么纵使此刻文件输入缓冲区中仍有可用的数据(剩余的1 KB 数据),步骤5中的epoll_wait(2)
调用仍可能会挂起;与此同时,写入方可能在等待读取方对它发送的数据的响应。造成这种互相等待的情形的原因是边缘触发模式只有在被监控的文件描述符发生变化时才会递送事件。因此,在步骤5中,读取方最终可能会为一些已经存在于自己输入缓冲区内的数据一直等下去。在上面的例子中,由于写入方在第2步中进行了写操作, rfd 上产生了一个事件,这个事件在第3步中被读取方消耗了。但读取方在第4步中进行的读操作却没有消耗完整个缓冲区的数据,因此在第5步中对epoll_wait(2)
的调用可能会无限期地阻塞。
使用 EPOLLET 标志位的应用程序应当使用非阻塞的文件描述符,以避免(因事件被消耗而)使正在处理多个文件描述符的任务因阻塞的读或写而出现饥饿。将 epoll用作边缘触发(EPOLLET)的接口,建议的使用方法如下:
a) 使用非阻塞的文件描述符;
b) 只在 read(2)
或 write(2)
返回 EAGAIN 后再等待新的事件。
当作为水平触发的接口使用时(默认情况,没有指定 EPOLLET), epoll 只是一个更快的 poll(2)
,可以用在任何能使用 poll(2)
的地方,因为此时两者的语义相同。
Poller源码分析
Poller
Poller
是抽象基类,EPollPoller
, PollPoller
分别是使用epoll
和poll
的派生类:
Poller
提供了Poller::hasChannel(Channel*)
成员函数的具体实现,用于判断Channel
是否由Poller
监听。
1 | /* |
此外,Poller::newDefaultPoller(EventLoop*)
静态成员函数基于环境变量获取具体的Poller
实例
1 | Poller* Poller::newDefaultPoller(EventLoop* loop) |
下面分别介绍PollerPoller
和EPollerPoller
实现的poll
方法,阻塞等待有Channel被激活。以及部分对Channel
进行管理, 说是管理,其实把Channel的更新同步到监听的方法实现。
PollPoller
PollerPoller
增加pollfds_: std::vector<struct pollfd>
数据成员。
poll
1 |
|
Channel更新及删除
1 |
|
EPollerPoller
poll
1 | /* 利用epoll_wait系统调用poll,和PollPoller逻辑没啥区别 |
Channel管理
EPollPoller
与PollPoller
主要在这个部分区别较大,毕竟poll
是直接传一个数组,简单粗暴,而epoll
需要调用epoll_ctl(2)
进行增删改。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70// Channel更新
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index();
// Channel默认为kNew
// 需要进行新增操作
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
// 确定是kNew or kDeleted 判断省略
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
// 更新已有的Channel 修改或删除
int fd = channel->fd();
// 判断确实是当前已添加在监听的Channel 判断省略...
// 需要取消监听该Channel
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
// 需要修改监听的事件类型
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
void EPollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
int fd = channel->fd();
// 确定存在该Channel&&目前没有任何监听 判断过程省略
int index = channel->index();
size_t n = channels_.erase(fd);
// 重制Channel状态
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(kNew);
}
// epoll_ctl的一个封装,便于更好的使用
void EPollPoller::update(int operation, Channel* channel)
{
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
}
}