找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 25045|回复: 0

Redis源码分析:Event-driven programming library |yfkiss

[复制链接]
发表于 2011-12-14 14:07:46 | 显示全部楼层 |阅读模式
Event-driven programming library(事件驱动编程库):提到这个,很容易想到如雷贯耳的libevent库(libevent封装了以下三种事件的响应:IO事件、定时器事件、信号事件)。
redis没有采用庞大的libevent库,而是自己实现了一个,牺牲了一些平台通用性,但是性能非常强劲。memcached采用了libevent,有人认为这是redis性能优于memcached的原因之一。笔者没有测试过,对此保留意见。

相关源码:ae.h ae.c
networking.c anet.h anet.c ae_epoll.c ae_select.c ae_kqueue.c
ae.h、ae.c :event library具体实现
networking.c : 与客户端的交互
anet.h anet.c : 网络通信
ae_epoll.c ae_select.c ae_kqueue.c  : 不同系统多路IO封装
             ae_epoll.c : linux平台
             ae_select.c : 其他平台(既没有epoll也没有kqueue时的通用后备实现)
             ae_kqueue.c : BSD、APPLE

ae.c中对多路IO实现的选择(优先使用epoll,其次kqueue,都不可用时回退到select):
  1. #ifdef __linux__
  2. #define HAVE_EPOLL 1
  3. #endif
  4. #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
  5. #define HAVE_KQUEUE 1
  6. #endif
  7. #ifdef HAVE_EPOLL
  8. #include "ae_epoll.c"
  9. #else
  10.     #ifdef HAVE_KQUEUE
  11.     #include "ae_kqueue.c"
  12.     #else
  13.     #include "ae_select.c"
  14.     #endif
  15. #endif
复制代码
多路IO封装(以ae_epoll.c为例):
aeApiCreate:创建句柄(epoll_create)
aeApiFree:关闭句柄(close)
aeApiAddEvent:事件添加(epoll_ctl)
aeApiDelEvent:事件删除(epoll_ctl)
aeApiPoll:等待事件发生(epoll_wait)

网络处理流程简要说明:
1. 初始化server时,为监听套接字(TCP的server.ipfd和Unix域套接字的server.sofd)注册可读事件,回调函数分别为acceptTcpHandler/acceptUnixHandler,等待客户端连接:
  1. if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
  2.         acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
  3.     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
  4.         acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
复制代码
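2. 回调函数acceptTcpHandler/acceptUnixHandler
在监听到新连接请求时,接收连接,创建redisClient对象,并为该连接注册可读事件(回调函数readQueryFromClient)。客户端发来数据时,readQueryFromClient读取数据并追加到c->querybuf中。读出错(EAGAIN除外)或对端关闭连接时,它会释放客户端;查询缓冲区超过client_max_querybuf_len时,它也会关闭客户端。否则调用processInputBuffer处理请求。readQueryFromClient的实现如下: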
  1. void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
  2.     redisClient *c = (redisClient*) privdata;
  3.     char buf[REDIS_IOBUF_LEN];
  4.     int nread;
  5.     REDIS_NOTUSED(el);
  6.     REDIS_NOTUSED(mask);
  7.     nread = read(fd, buf, REDIS_IOBUF_LEN);
  8.     if (nread == -1) {
  9.         if (errno == EAGAIN) {
  10.             nread = 0;
  11.         } else {
  12.             redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
  13.             freeClient(c);
  14.             return;
  15.         }
  16.     } else if (nread == 0) {
  17.         redisLog(REDIS_VERBOSE, "Client closed connection");
  18.         freeClient(c);
  19.         return;
  20.     }
  21.     if (nread) {
  22.         c->querybuf = sdscatlen(c->querybuf,buf,nread);
  23.         c->lastinteraction = time(NULL);
  24.     } else {
  25.         return;
  26.     }
  27.     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
  28.         sds ci = getClientInfoString(c), bytes = sdsempty();
  29.         bytes = sdscatrepr(bytes,c->querybuf,64);
  30.         redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
  31.         sdsfree(ci);
  32.         sdsfree(bytes);
  33.         freeClient(c);
  34.         return;
  35.     }
  36.     processInputBuffer(c);
  37. }
复制代码
3. 客户端请求处理:
在接收到客户端请求数据后,首先对请求进行解析:根据首字符是否为'*',区分multibulk协议和inline协议。解析完成后调用processCommand执行命令:
  1. void processInputBuffer(redisClient *c) {
  2.     /* Keep processing while there is something in the input buffer */
  3.     while(sdslen(c->querybuf)) {
  4.         /* Immediately abort if the client is in the middle of something. */
  5.         if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
  6.         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
  7.          * written to the client. Make sure to not let the reply grow after
  8.          * this flag has been set (i.e. don't process more commands). */
  9.         if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
  10.         /* Determine request type when unknown. */
  11.         if (!c->reqtype) {
  12.             if (c->querybuf[0] == '*') {
  13.                 c->reqtype = REDIS_REQ_MULTIBULK;
  14.             } else {
  15.                 c->reqtype = REDIS_REQ_INLINE;
  16.             }
  17.         }
  18.         if (c->reqtype == REDIS_REQ_INLINE) {
  19.             if (processInlineBuffer(c) != REDIS_OK) break;
  20.         } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
  21.             if (processMultibulkBuffer(c) != REDIS_OK) break;
  22.         } else {
  23.             redisPanic("Unknown request type");
  24.         }
  25.         /* Multibulk processing could see a <= 0 length. */
  26.         if (c->argc == 0) {
  27.             resetClient(c);
  28.         } else {
  29.             /* Only reset the client when the command was executed. */
  30.             if (processCommand(c) == REDIS_OK)
  31.                 resetClient(c);
  32.         }
  33.     }
  34. }
复制代码
命令执行完成后,向客户端返回结果:
  1. int processCommand(redisClient *c) {
  2.     .......
  3.     /* Exec the command */
  4.     if (c->flags & REDIS_MULTI &&
  5.         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
  6.         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
  7.     {
  8.         queueMultiCommand(c);
  9.         addReply(c,shared.queued);
  10.     } else {
  11.         if (server.vm_enabled && server.vm_max_threads > 0 &&
  12.             blockClientOnSwappedKeys(c)) return REDIS_ERR;
  13.         call(c);
  14.     }
  15.     return REDIS_OK;
  16. }void addReply(redisClient *c, robj *obj) {
  17.     if (_installWriteEvent(c) != REDIS_OK) return;
  18.     redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
  19.     /* This is an important place where we can avoid copy-on-write
  20.      * when there is a saving child running, avoiding touching the
  21.      * refcount field of the object if it's not needed.
  22.      *
  23.      * If the encoding is RAW and there is room in the static buffer
  24.      * we'll be able to send the object to the client without
  25.      * messing with its page. */
  26.     if (obj->encoding == REDIS_ENCODING_RAW) {
  27.         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
  28.             _addReplyObjectToList(c,obj);
  29.     } else {
  30.         /* FIXME: convert the long into string and use _addReplyToBuffer()
  31.          * instead of calling getDecodedObject. As this place in the
  32.          * code is too performance critical. */
  33.         obj = getDecodedObject(obj);
  34.         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
  35.             _addReplyObjectToList(c,obj);
  36.         decrRefCount(obj);
  37.     }
  38. }int _installWriteEvent(redisClient *c) {
  39.     if (c->fd <= 0) return REDIS_ERR;
  40.     if (c->bufpos == 0 && listLength(c->reply) == 0 &&
  41.         (c->replstate == REDIS_REPL_NONE ||
  42.          c->replstate == REDIS_REPL_ONLINE) &&
  43.         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
  44.         sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
  45.     return REDIS_OK;
  46. }
复制代码
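Redis执行完客户端请求后,会调用addReply。addReply首先调用_installWriteEvent:当该客户端的回复缓冲区和回复链表都为空时(即尚未注册写事件),注册一个可写事件(AE_WRITABLE),并绑定事件处理函数sendReplyToClient,在socket可写时把数据发送给客户端。随后,addReply把回复内容写入静态回复缓冲区,空间不足时追加到回复链表c->reply中。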
4. 主循环
循环处理文件事件(IO事件)和定时事件,直到eventLoop->stop被置位:
  1. void aeMain(aeEventLoop *eventLoop) {
  2.     eventLoop->stop = 0;
  3.     while (!eventLoop->stop) {
  4.         if (eventLoop->beforesleep != NULL)
  5.             eventLoop->beforesleep(eventLoop);
  6.         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  7.     }
  8. }
复制代码
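beforesleep通过aeSetBeforeSleepProc设置,在每次调用aeProcessEvents(即进入事件等待)之前执行,主要用于处理vm和aof相关的工作。aeProcessEvents的流程如下:先根据最近的定时事件计算等待超时,再调用aeApiPoll等待文件事件并分发给对应的回调,最后处理已到期的定时事件: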
  1. int aeProcessEvents(aeEventLoop *eventLoop, int flags)
  2. {
  3.     int processed = 0, numevents;
  4.     /* Nothing to do? return ASAP */
  5.     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
  6.     /* Note that we want call select() even if there are no
  7.      * file events to process as long as we want to process time
  8.      * events, in order to sleep until the next time event is ready
  9.      * to fire. */
  10.     if (eventLoop->maxfd != -1 ||
  11.         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
  12.         int j;
  13.         aeTimeEvent *shortest = NULL;
  14.         struct timeval tv, *tvp;
  15.         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  16.             shortest = aeSearchNearestTimer(eventLoop);
  17.         if (shortest) {
  18.             long now_sec, now_ms;
  19.             /* Calculate the time missing for the nearest
  20.              * timer to fire. */
  21.             aeGetTime(&now_sec, &now_ms);
  22.             tvp = &tv;
  23.             tvp->tv_sec = shortest->when_sec - now_sec;
  24.             if (shortest->when_ms < now_ms) {
  25.                 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
  26.                 tvp->tv_sec --;
  27.             } else {
  28.                 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
  29.             }
  30.             if (tvp->tv_sec < 0) tvp->tv_sec = 0;
  31.             if (tvp->tv_usec < 0) tvp->tv_usec = 0;
  32.         } else {
  33.             /* If we have to check for events but need to return
  34.              * ASAP because of AE_DONT_WAIT we need to se the timeout
  35.              * to zero */
  36.             if (flags & AE_DONT_WAIT) {
  37.                 tv.tv_sec = tv.tv_usec = 0;
  38.                 tvp = &tv;
  39.             } else {
  40.                 /* Otherwise we can block */
  41.                 tvp = NULL; /* wait forever */
  42.             }
  43.         }
  44.         numevents = aeApiPoll(eventLoop, tvp);
  45.         for (j = 0; j < numevents; j++) {
  46.             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  47.             int mask = eventLoop->fired[j].mask;
  48.             int fd = eventLoop->fired[j].fd;
  49.             int rfired = 0;
  50.             /* note the fe->mask & mask & ... code: maybe an already processed
  51.              * event removed an element that fired and we still didn't
  52.              * processed, so we check if the event is still valid. */
  53.             if (fe->mask & mask & AE_READABLE) {
  54.                 rfired = 1;
  55.                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
  56.             }
  57.             if (fe->mask & mask & AE_WRITABLE) {
  58.                 if (!rfired || fe->wfileProc != fe->rfileProc)
  59.                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
  60.             }
  61.             processed++;
  62.         }
  63.     }
  64.     /* Check time events */
  65.     if (flags & AE_TIME_EVENTS)
  66.         processed += processTimeEvents(eventLoop);
  67.     return processed; /* return the number of processed file/time events */
  68. }
复制代码
rfileProc和wfileProc即注册事件(aeCreateFileEvent)时指定的读/写回调函数。如果读、写回调是同一个函数且本轮读事件已触发,则不会再重复调用写回调。

作者:yfkiss 发表于2011-12-13 23:33:27 原文链接
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-28 02:49 , Processed in 0.124782 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表