找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5454|回复: 0

[原]Redis源代码分析之七:事件驱动库分析

[复制链接]
发表于 2012-3-5 22:51:09 | 显示全部楼层 |阅读模式
aeEventLoop是一个记录记录程序事件状态的结构:
  1. /* State of an event based program */
  2. typedef struct aeEventLoop {
  3.     int maxfd;
  4.     long long timeEventNextId;
  5.     aeFileEvent events[AE_SETSIZE]; /* Registered events */
  6.     aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
  7.     aeTimeEvent *timeEventHead;
  8.     int stop;
  9.     void *apidata; /* This is used for polling API specific data */
  10.     aeBeforeSleepProc *beforesleep;
  11. } aeEventLoop;
复制代码
该结构在aeCreateEventLoop()函数中得到初始化。
EventLoop中的事件类型包括时间事件:
  1. /* Time event structure */
  2. typedef struct aeTimeEvent {
  3.     long long id; /* time event identifier. */
  4.     long when_sec; /* seconds */
  5.     long when_ms; /* milliseconds */
  6.     aeTimeProc *timeProc;
  7.     aeEventFinalizerProc *finalizerProc;
  8.     void *clientData;
  9.     struct aeTimeEvent *next;
  10. } aeTimeEvent;
复制代码
以及文件事件:
  1. /* File event structure */
  2. typedef struct aeFileEvent {
  3.     int mask; /* one of AE_(READABLE|WRITABLE) */
  4.     aeFileProc *rfileProc;
  5.     aeFileProc *wfileProc;
  6.     void *clientData;
  7. } aeFileEvent;
复制代码
分别在aeCreateTimeEvent函数和aeCreateTimeEvent函数中得到初始化。
当事件循环EventLoop结构在initServer()中创建并初始化完成之后,主函数就调用aeMain()函数开始处理事件:
  1. void aeMain(aeEventLoop *eventLoop) {
  2.     eventLoop->stop = 0;
  3.     while (!eventLoop->stop) {
  4.         if (eventLoop->beforesleep != NULL)
  5.             eventLoop->beforesleep(eventLoop);
  6.         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  7.     }
  8. }
复制代码

我们看到,只要eventLoop结构中停止处理的标识stop不为1,事件循环就不断地调用aeProcessEvents来处理事件。

int aeProcessEvents(aeEventLoop *eventLoop, int flags);
该函数处理每个待处理的时间事件和文件事件,传入的flag决定处理的方式:
  1. * If flags is 0, the function does nothing and returns.
  2. * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
  3. * if flags has AE_FILE_EVENTS set, file events are processed.
  4. * if flags has AE_TIME_EVENTS set, time events are processed.
  5. * if flags has AE_DONT_WAIT set the function returns ASAP until all
  6. * the events that's possible to process without to wait are processed.
复制代码
函数返回完成处理的事件的数量。
函数首先调用aeSearchNearestTimer函数选择最近要fire的计时器,然后更新tvp,这一步不展开。
在函数中,调用aeApiPoll来监控事件,该函数封装了select、kqueue、epoll三种机制:
  1. numevents = aeApiPoll(eventLoop, tvp);
复制代码

在select模型下,实现为:
  1. retval = select(eventLoop->maxfd+1,
  2.                 &state->_rfds,&state->_wfds,NULL,tvp);
复制代码

在kqueue模型下,实现为
  1. if (tvp != NULL) {
  2.         struct timespec timeout;
  3.         timeout.tv_sec = tvp->tv_sec;
  4.         timeout.tv_nsec = tvp->tv_usec * 1000;
  5.         retval = kevent(state->kqfd, NULL, 0, state->events, AE_SETSIZE, &timeout);
  6.     } else {
  7.         retval = kevent(state->kqfd, NULL, 0, state->events, AE_SETSIZE, NULL);
  8.     }   
复制代码

在epoll模型下,实现为:
  1. retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
  2.             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
复制代码

关于这三种处理机制的分析比较可以参考:http://www.kegel.com/c10k.html
完成事件的监控调度之后,把这些事件加入eventLoop的fired记录中,然后返回numevents:
  1. if (retval > 0) {
  2.         int j;
  3.         numevents = retval;
  4.         for (j = 0; j < numevents; j++) {
  5.             int mask = 0;
  6.             struct epoll_event *e = state->events+j;
  7.             if (e->events & EPOLLIN) mask |= AE_READABLE;
  8.             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
  9.             eventLoop->fired[j].fd = e->data.fd;
  10.             eventLoop->fired[j].mask = mask;
  11.         }
  12.     }
  13.     return numevents;
复制代码

aeFiredEvent定义为:
  1. /* A fired event */
  2. typedef struct aeFiredEvent {
  3.     int fd;
  4.     int mask;
  5. } aeFiredEvent;
复制代码
回到aeProcessEvents函数,处理标记好的firedEvents:
  1. for (j = 0; j < numevents; j++) {
  2.             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  3.             int mask = eventLoop->fired[j].mask;
  4.             int fd = eventLoop->fired[j].fd;
  5.             int rfired = 0;
  6. /* note the fe->mask & mask & ... code: maybe an already processed
  7.              * event removed an element that fired and we still didn't
  8.              * processed, so we check if the event is still valid. */
  9.             if (fe->mask & mask & AE_READABLE) {
  10.                 rfired = 1;
  11.                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
  12.             }
  13.             if (fe->mask & mask & AE_WRITABLE) {
  14.                 if (!rfired || fe->wfileProc != fe->rfileProc)
  15.                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
  16.             }
  17.             processed++;
  18.         }
复制代码
aeFileEvent中定义了两种文件处理的回调函数:rfileProc和wfileProc,分别对应读写。
处理完文件事件之后,才处理时间事件:

/* Check time events */
  1.     if (flags & AE_TIME_EVENTS)
  2.         processed += processTimeEvents(eventLoop);其实现如下:
  3. /* Process time events */
  4. static int processTimeEvents(aeEventLoop *eventLoop) {
  5.     int processed = 0;
  6.     aeTimeEvent *te;
  7.     long long maxId;
  8.     te = eventLoop->timeEventHead;
  9.     maxId = eventLoop->timeEventNextId-1;
  10.     while(te) {
  11.         long now_sec, now_ms;
  12.         long long id;
  13.         if (te->id > maxId) {
  14.             te = te->next;
  15.             continue;
  16.         }
  17.         aeGetTime(&now_sec, &now_ms);
  18.         if (now_sec > te->when_sec ||
  19.             (now_sec == te->when_sec && now_ms >= te->when_ms))
  20.         {
  21.             int retval;
  22.             id = te->id;
  23.             retval = te->timeProc(eventLoop, id, te->clientData);
  24.             processed++;
  25.             /* After an event is processed our time event list may
  26.              * no longer be the same, so we restart from head.
  27.              * Still we make sure to don't process events registered
  28.              * by event handlers itself in order to don't loop forever.
  29.              * To do so we saved the max ID we want to handle.
  30.              *
  31.              * FUTURE OPTIMIZATIONS:
  32.              * Note that this is NOT great algorithmically. Redis uses
  33.              * a single time event so it's not a problem but the right
  34.              * way to do this is to add the new elements on head, and
  35.              * to flag deleted elements in a special way for later
  36.              * deletion (putting references to the nodes to delete into
  37.              * another linked list). */
  38.             if (retval != AE_NOMORE) {
  39.                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
  40.             } else {
  41.                 aeDeleteTimeEvent(eventLoop, id);
  42.             }
  43.             te = eventLoop->timeEventHead;
  44.         } else {
  45.             te = te->next;
  46.         }
  47.     }
  48.     return processed;
  49. }
复制代码
我们看到,也是通过调用aeTimeProc *类型的回调函数来处理事件。在遍历时间事件链表的过程中,对每个事件重复:如果事件已经处理完,retval的值标记为AE_NOMORE,则调用aeDeleteTimeEvent删除时间事件,否则,调用adAddMillisecondsToNow更新计时器。

最后返回已处理的事件数量:

return processed; /* return the number of processed file/time events */




作者:Aegeaner 发表于2012-2-27 15:12:49 原文链接


您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-11-23 21:18 , Processed in 0.025572 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表