|
aeEventLoop是一个记录记录程序事件状态的结构:- /* State of an event based program */
- typedef struct aeEventLoop {
- int maxfd;
- long long timeEventNextId;
- aeFileEvent events[AE_SETSIZE]; /* Registered events */
- aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
- aeTimeEvent *timeEventHead;
- int stop;
- void *apidata; /* This is used for polling API specific data */
- aeBeforeSleepProc *beforesleep;
- } aeEventLoop;
复制代码 该结构在aeCreateEventLoop()函数中得到初始化。
EventLoop中的事件类型包括时间事件:
- /* Time event structure */
- typedef struct aeTimeEvent {
- long long id; /* time event identifier. */
- long when_sec; /* seconds */
- long when_ms; /* milliseconds */
- aeTimeProc *timeProc;
- aeEventFinalizerProc *finalizerProc;
- void *clientData;
- struct aeTimeEvent *next;
- } aeTimeEvent;
复制代码 以及文件事件:
- /* File event structure */
- typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE) */
- aeFileProc *rfileProc;
- aeFileProc *wfileProc;
- void *clientData;
- } aeFileEvent;
复制代码 分别在aeCreateTimeEvent函数和aeCreateTimeEvent函数中得到初始化。
当事件循环EventLoop结构在initServer()中创建并初始化完成之后,主函数就调用aeMain()函数开始处理事件:
- void aeMain(aeEventLoop *eventLoop) {
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
- aeProcessEvents(eventLoop, AE_ALL_EVENTS);
- }
- }
复制代码
我们看到,只要eventLoop结构中停止处理的标识stop不为1,事件循环就不断地调用aeProcessEvents来处理事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
该函数处理每个待处理的时间事件和文件事件,传入的flag决定处理的方式:
- * If flags is 0, the function does nothing and returns.
- * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
- * if flags has AE_FILE_EVENTS set, file events are processed.
- * if flags has AE_TIME_EVENTS set, time events are processed.
- * if flags has AE_DONT_WAIT set the function returns ASAP until all
- * the events that's possible to process without to wait are processed.
复制代码 函数返回完成处理的事件的数量。
函数首先调用aeSearchNearestTimer函数选择最近要fire的计时器,然后更新tvp,这一步不展开。
在函数中,调用aeApiPoll来监控事件,该函数封装了select、kqueue、epoll三种机制:
- numevents = aeApiPoll(eventLoop, tvp);
复制代码
在select模型下,实现为:
- retval = select(eventLoop->maxfd+1,
- &state->_rfds,&state->_wfds,NULL,tvp);
复制代码
在kqueue模型下,实现为:
- if (tvp != NULL) {
- struct timespec timeout;
- timeout.tv_sec = tvp->tv_sec;
- timeout.tv_nsec = tvp->tv_usec * 1000;
- retval = kevent(state->kqfd, NULL, 0, state->events, AE_SETSIZE, &timeout);
- } else {
- retval = kevent(state->kqfd, NULL, 0, state->events, AE_SETSIZE, NULL);
- }
复制代码
在epoll模型下,实现为:
- retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
- tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
复制代码
关于这三种处理机制的分析比较可以参考:http://www.kegel.com/c10k.html
完成事件的监控调度之后,把这些事件加入eventLoop的fired记录中,然后返回numevents:
- if (retval > 0) {
- int j;
- numevents = retval;
- for (j = 0; j < numevents; j++) {
- int mask = 0;
- struct epoll_event *e = state->events+j;
- if (e->events & EPOLLIN) mask |= AE_READABLE;
- if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
- eventLoop->fired[j].fd = e->data.fd;
- eventLoop->fired[j].mask = mask;
- }
- }
- return numevents;
复制代码
aeFiredEvent定义为:
- /* A fired event */
- typedef struct aeFiredEvent {
- int fd;
- int mask;
- } aeFiredEvent;
复制代码 回到aeProcessEvents函数,处理标记好的firedEvents:
- for (j = 0; j < numevents; j++) {
- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
- int mask = eventLoop->fired[j].mask;
- int fd = eventLoop->fired[j].fd;
- int rfired = 0;
- /* note the fe->mask & mask & ... code: maybe an already processed
- * event removed an element that fired and we still didn't
- * processed, so we check if the event is still valid. */
- if (fe->mask & mask & AE_READABLE) {
- rfired = 1;
- fe->rfileProc(eventLoop,fd,fe->clientData,mask);
- }
- if (fe->mask & mask & AE_WRITABLE) {
- if (!rfired || fe->wfileProc != fe->rfileProc)
- fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- }
- processed++;
- }
复制代码 aeFileEvent中定义了两种文件处理的回调函数:rfileProc和wfileProc,分别对应读写。
处理完文件事件之后,才处理时间事件:
/* Check time events */- if (flags & AE_TIME_EVENTS)
- processed += processTimeEvents(eventLoop);其实现如下:
- /* Process time events */
- static int processTimeEvents(aeEventLoop *eventLoop) {
- int processed = 0;
- aeTimeEvent *te;
- long long maxId;
- te = eventLoop->timeEventHead;
- maxId = eventLoop->timeEventNextId-1;
- while(te) {
- long now_sec, now_ms;
- long long id;
- if (te->id > maxId) {
- te = te->next;
- continue;
- }
- aeGetTime(&now_sec, &now_ms);
- if (now_sec > te->when_sec ||
- (now_sec == te->when_sec && now_ms >= te->when_ms))
- {
- int retval;
- id = te->id;
- retval = te->timeProc(eventLoop, id, te->clientData);
- processed++;
- /* After an event is processed our time event list may
- * no longer be the same, so we restart from head.
- * Still we make sure to don't process events registered
- * by event handlers itself in order to don't loop forever.
- * To do so we saved the max ID we want to handle.
- *
- * FUTURE OPTIMIZATIONS:
- * Note that this is NOT great algorithmically. Redis uses
- * a single time event so it's not a problem but the right
- * way to do this is to add the new elements on head, and
- * to flag deleted elements in a special way for later
- * deletion (putting references to the nodes to delete into
- * another linked list). */
- if (retval != AE_NOMORE) {
- aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
- } else {
- aeDeleteTimeEvent(eventLoop, id);
- }
- te = eventLoop->timeEventHead;
- } else {
- te = te->next;
- }
- }
- return processed;
- }
复制代码 我们看到,也是通过调用aeTimeProc *类型的回调函数来处理事件。在遍历时间事件链表的过程中,对每个事件重复:如果事件已经处理完,retval的值标记为AE_NOMORE,则调用aeDeleteTimeEvent删除时间事件,否则,调用adAddMillisecondsToNow更新计时器。
最后返回已处理的事件数量:
return processed; /* return the number of processed file/time events */
作者:Aegeaner 发表于2012-2-27 15:12:49 原文链接
|
|