winston 发表于 2011-12-14 14:07:46

Redis源码分析:Event-driven programming library |yfkiss

Event-driven programming library,提到这个,很容易想到如雷贯耳的libevent库(libevent封装了以下三种事件的响应:IO事件、定时器事件、信号事件)。
redis没有采用庞大的libevent库,而是自己实现了一个,牺牲了一些平台通用性,但是性能非常强劲。memcache采用了libevent,有人认为这是redis性能优于memcache的原因之一。没有测试过,保留意见。

相关源码:ae.h ae.c
networking.c anet.c anet.h ae_epoll.c ae_select.c ae_kqueue.c
ae.h、ae.c :event library具体实现
networking.c : 与客户端的交互
anet.h anet.c : 网络通信
ae_epoll.c ae_select.c ae_kqueue.c: 不同系统多路IO封装
             ae_epoll.c : linux平台
             ae_select.c : 其他平台(既不支持epoll也不支持kqueue时使用的通用select实现)
             ae_kqueue.c : BSD、APPLE

ae.c多路IO选择:
/* Choose the I/O multiplexing backend at compile time: epoll on Linux,
 * kqueue on FreeBSD/OpenBSD/NetBSD and on Apple when MAC_OS_X_VERSION_10_6
 * is defined, and the select() implementation in every other case.
 * The chosen backend source file is #included directly into ae.c, so
 * exactly one of them is compiled. */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif

#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || \
    defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

#if defined(HAVE_EPOLL)
#include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
多路IO封装(以ae_epoll.c为例):
aeApiCreate:创建句柄(epoll_create)
aeApiFree:关闭句柄(close)
aeApiAddEvent:事件添加(epoll_ctl)
aeApiDelEvent:事件删除(epoll_ctl)
aeApiPoll:等待事件发生(epoll_wait)

网络简要说明:   
1. 初始化server, 等待客户端连接,并注册事件,回调函数acceptTcpHandler/acceptUnixHandler
/* Register the listening sockets with the event loop: a readable event on
 * server.ipfd invokes acceptTcpHandler, and one on server.sofd (presumably
 * the Unix-domain socket listener — confirm) invokes acceptUnixHandler.
 * A descriptor <= 0 is skipped; a registration failure is reported via oom(). */
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
2. 回调函数acceptTcpHandler/acceptUnixHandler
在监听到新连接请求时,接收连接,创建redisClient对象,并注册事件(回调函数readQueryFromClient)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf;
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN);
    if (nread == -1) {
      if (errno == EAGAIN) {
            nread = 0;
      } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
      }
    } else if (nread == 0) {
      redisLog(REDIS_VERBOSE, "Client closed connection");
      freeClient(c);
      return;
    }
    if (nread) {
      c->querybuf = sdscatlen(c->querybuf,buf,nread);
      c->lastinteraction = time(NULL);
    } else {
      return;
    }
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
      sds ci = getClientInfoString(c), bytes = sdsempty();

      bytes = sdscatrepr(bytes,c->querybuf,64);
      redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
      sdsfree(ci);
      sdsfree(bytes);
      freeClient(c);
      return;
    }
    processInputBuffer(c);
}3. 客户端请求处理:
在接收到客户端请求数据后,首先对请求进行解析,解析完成后执行命令并回复客户端
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
      /* Immediately abort if the client is in the middle of something. */
      if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

      /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
      if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

      /* Determine request type when unknown. */
      if (!c->reqtype) {
            if (c->querybuf == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
      }

      if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
      } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
      } else {
            redisPanic("Unknown request type");
      }

      /* Multibulk processing could see a <= 0 length. */
      if (c->argc == 0) {
            resetClient(c);
      } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
      }
    }
}在请求处理完成后,反馈结果.
int processCommand(redisClient *c) {
    .......
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
      c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
      c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
      queueMultiCommand(c);
      addReply(c,shared.queued);
    } else {
      if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(c)) return REDIS_ERR;
      call(c);
    }
    return REDIS_OK;
}void addReply(redisClient *c, robj *obj) {
    if (_installWriteEvent(c) != REDIS_OK) return;
    redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);

    /* This is an important place where we can avoid copy-on-write
   * when there is a saving child running, avoiding touching the
   * refcount field of the object if it's not needed.
   *
   * If the encoding is RAW and there is room in the static buffer
   * we'll be able to send the object to the client without
   * messing with its page. */
    if (obj->encoding == REDIS_ENCODING_RAW) {
      if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
    } else {
      /* FIXME: convert the long into string and use _addReplyToBuffer()
         * instead of calling getDecodedObject. As this place in the
         * code is too performance critical. */
      obj = getDecodedObject(obj);
      if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
      decrRefCount(obj);
    }
}int _installWriteEvent(redisClient *c) {
    if (c->fd <= 0) return REDIS_ERR;
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
      (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
      aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
      sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}Redis执行完客户端请求后,会调用addReply,在addReply中调用installWriteEvent来注册一个事件,并绑定事件处理函数sendReplyToClient,用来把数据发送到client。
4.主循环
处理定时事件和已注册的文件(IO)事件
/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration calls the beforesleep callback (if one is set) and then
 * aeProcessEvents() with AE_ALL_EVENTS. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
beforesleep通过aeSetBeforeSleepProc设置,主要是特殊处理vm和aof相关的请求
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
   * file events to process as long as we want to process time
   * events, in order to sleep until the next time event is ready
   * to fire. */
    if (eventLoop->maxfd != -1 ||
      ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
      int j;
      aeTimeEvent *shortest = NULL;
      struct timeval tv, *tvp;

      if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
      if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
      } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
      }

      numevents = aeApiPoll(eventLoop, tvp);
      for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events.fd];
            int mask = eventLoop->fired.mask;
            int fd = eventLoop->fired.fd;
            int rfired = 0;

          /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                  fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
      }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
      processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}rfileProc和wfileProc即注册事件时定义的回调函数

作者:yfkiss 发表于2011-12-13 23:33:27 原文链接
页: [1]
查看完整版本: Redis源码分析:Event-driven programming library |yfkiss