Event-driven programming libray,提到这个,很容易想到如雷贯耳的libevent库(libeven封装了以下三种事件的响应:IO事件,定时器事件,信号事件)。
x相关源码:ae.h ae.c
networking.c anet.c net.h ae_epoll.c ae_select.c ae_kqueue.c
ae.h、ae.c :event library具体实现
networking.c : 与客户端的交互
anet.h anet.c : 网络通信
ae_epoll.c ae_select.c ae_kqueue.c : 不同系统多路IO封装
ae_epoll.c : linux平台
ae_select.c :unix平台
ae_kqueue.c : BSD、APPLE
ae.c多路IO选择:- #ifdef __linux__
- #define HAVE_EPOLL 1
- #endif
- #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
- #define HAVE_KQUEUE 1
- #endif
- #ifdef HAVE_EPOLL
- #include "ae_epoll.c"
- #else
- #ifdef HAVE_KQUEUE
- #include "ae_kqueue.c"
- #else
- #include "ae_select.c"
- #endif
- #endif
多路IO封装(以ae_epoll.c为例):
1. 初始化server, 等待客户端连接,并注册事件,回调函数acceptTcpHandler/acceptUnixHandler- if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
- acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
- if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
- acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
复制代码 2. 回调函数acceptTcpHandler/acceptUnixHandler
在监听到新连接请求时,接收连接,创建redisClient对象,并注册事件(回调函数readQueryFromClient)- void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
- redisClient *c = (redisClient*) privdata;
- char buf[REDIS_IOBUF_LEN];
- int nread;
- nread = read(fd, buf, REDIS_IOBUF_LEN);
- if (nread == -1) {
- if (errno == EAGAIN) {
- nread = 0;
- } else {
- redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
- freeClient(c);
- return;
- }
- } else if (nread == 0) {
- redisLog(REDIS_VERBOSE, "Client closed connection");
- freeClient(c);
- return;
- }
- if (nread) {
- c->querybuf = sdscatlen(c->querybuf,buf,nread);
- c->lastinteraction = time(NULL);
- } else {
- return;
- }
- if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
- sds ci = getClientInfoString(c), bytes = sdsempty();
- bytes = sdscatrepr(bytes,c->querybuf,64);
- redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
- sdsfree(ci);
- sdsfree(bytes);
- freeClient(c);
- return;
- }
- processInputBuffer(c);
- }
复制代码 3. 客户端请求处理:
在接收到客户端请求数据后,首先对请求进行解析,解析完成后反馈请求- void processInputBuffer(redisClient *c) {
- /* Keep processing while there is something in the input buffer */
- while(sdslen(c->querybuf)) {
- /* Immediately abort if the client is in the middle of something. */
- if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
- /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
- * written to the client. Make sure to not let the reply grow after
- * this flag has been set (i.e. don't process more commands). */
- if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
- /* Determine request type when unknown. */
- if (!c->reqtype) {
- if (c->querybuf[0] == '*') {
- c->reqtype = REDIS_REQ_MULTIBULK;
- } else {
- c->reqtype = REDIS_REQ_INLINE;
- }
- }
- if (c->reqtype == REDIS_REQ_INLINE) {
- if (processInlineBuffer(c) != REDIS_OK) break;
- } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
- if (processMultibulkBuffer(c) != REDIS_OK) break;
- } else {
- redisPanic("Unknown request type");
- }
- /* Multibulk processing could see a <= 0 length. */
- if (c->argc == 0) {
- resetClient(c);
- } else {
- /* Only reset the client when the command was executed. */
- if (processCommand(c) == REDIS_OK)
- resetClient(c);
- }
- }
- }
在请求处理完成后,反馈结果.
- .......
- /* Exec the command */
- if (c->flags & REDIS_MULTI &&
- c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
- c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
- {
- queueMultiCommand(c);
- addReply(c,shared.queued);
- } else {
- if (server.vm_enabled && server.vm_max_threads > 0 &&
- blockClientOnSwappedKeys(c)) return REDIS_ERR;
- call(c);
- }
- return REDIS_OK;
- }void addReply(redisClient *c, robj *obj) {
- if (_installWriteEvent(c) != REDIS_OK) return;
- redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
- /* This is an important place where we can avoid copy-on-write
- * when there is a saving child running, avoiding touching the
- * refcount field of the object if it's not needed.
- *
- * If the encoding is RAW and there is room in the static buffer
- * we'll be able to send the object to the client without
- * messing with its page. */
- if (obj->encoding == REDIS_ENCODING_RAW) {
- if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
- _addReplyObjectToList(c,obj);
- } else {
- /* FIXME: convert the long into string and use _addReplyToBuffer()
- * instead of calling getDecodedObject. As this place in the
- * code is too performance critical. */
- obj = getDecodedObject(obj);
- if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
- _addReplyObjectToList(c,obj);
- decrRefCount(obj);
- }
- }int _installWriteEvent(redisClient *c) {
- if (c->fd <= 0) return REDIS_ERR;
- if (c->bufpos == 0 && listLength(c->reply) == 0 &&
- (c->replstate == REDIS_REPL_NONE ||
- c->replstate == REDIS_REPL_ONLINE) &&
- aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
- sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
- return REDIS_OK;
- }
Redis执行完客户端请求后,会调用addReply,在addReply中调用installWriteEvent来注册一个事件,并绑定事件处理函数sendReplyToClient,用来把数据发送到client。
处理定时事件和注册事件
处理定时事件和注册事件- void aeMain(aeEventLoop *eventLoop) {
- eventLoop->stop = 0;
- while (!eventLoop->stop) {
- if (eventLoop->beforesleep != NULL)
- eventLoop->beforesleep(eventLoop);
- aeProcessEvents(eventLoop, AE_ALL_EVENTS);
- }
- }
复制代码 beforesleep通过aeSetBeforeSleepProc定义,主要是特殊处理vm和aof相关的请求- int aeProcessEvents(aeEventLoop *eventLoop, int flags)
- {
- int processed = 0, numevents;
- /* Nothing to do? return ASAP */
- if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
- /* Note that we want call select() even if there are no
- * file events to process as long as we want to process time
- * events, in order to sleep until the next time event is ready
- * to fire. */
- if (eventLoop->maxfd != -1 ||
- ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
- int j;
- aeTimeEvent *shortest = NULL;
- struct timeval tv, *tvp;
- if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
- shortest = aeSearchNearestTimer(eventLoop);
- if (shortest) {
- long now_sec, now_ms;
- /* Calculate the time missing for the nearest
- * timer to fire. */
- aeGetTime(&now_sec, &now_ms);
- tvp = &tv;
- tvp->tv_sec = shortest->when_sec - now_sec;
- if (shortest->when_ms < now_ms) {
- tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
- tvp->tv_sec --;
- } else {
- tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
- }
- if (tvp->tv_sec < 0) tvp->tv_sec = 0;
- if (tvp->tv_usec < 0) tvp->tv_usec = 0;
- } else {
- /* If we have to check for events but need to return
- * ASAP because of AE_DONT_WAIT we need to se the timeout
- * to zero */
- if (flags & AE_DONT_WAIT) {
- tv.tv_sec = tv.tv_usec = 0;
- tvp = &tv;
- } else {
- /* Otherwise we can block */
- tvp = NULL; /* wait forever */
- }
- }
- numevents = aeApiPoll(eventLoop, tvp);
- for (j = 0; j < numevents; j++) {
- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
- int mask = eventLoop->fired[j].mask;
- int fd = eventLoop->fired[j].fd;
- int rfired = 0;
- /* note the fe->mask & mask & ... code: maybe an already processed
- * event removed an element that fired and we still didn't
- * processed, so we check if the event is still valid. */
- if (fe->mask & mask & AE_READABLE) {
- rfired = 1;
- fe->rfileProc(eventLoop,fd,fe->clientData,mask);
- }
- if (fe->mask & mask & AE_WRITABLE) {
- if (!rfired || fe->wfileProc != fe->rfileProc)
- fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- }
- processed++;
- }
- }
- /* Check time events */
- if (flags & AE_TIME_EVENTS)
- processed += processTimeEvents(eventLoop);
- return processed; /* return the number of processed file/time events */
- }
rfileProc和wfileProc即注册事件时定义的回调函数
