winston 发表于 2012-1-14 10:40:00

Redis源码分析:snapshot

源码版本:redis 2.4.4

redis的snapshot通过将内存中的所有数据写入文件,以达到持久化的目的。
需要注意的是:
1)snapshot方式不是追加,而是将内存所有数据写入文件,snapshot间隔短的话,会造成磁盘IO频繁
2)在上一次做snapshot到当前,如果机器crash,期间修改过的数据会丢失
redis支持两种方式做snapshot
1)客户端发送bgsave命令(save命令也可以,其会阻塞主线程执行,不推荐)
2)根据配置的周期,定期执行

snapshot配置(redis.conf):
save AB
在A秒后,如果至少B个数据发生变化则做snapshot。eg: save 3000100即在3000s后,如果至少100个key发生变化则做snapshot

rdbcompression yes
是否启用压缩(压缩消耗cpu)

dbfilename dump.rdb
snapshot dump的文件名

dir ./
指定工作目录,snapshot文件将保存在该目录,同时,aof文件也将保存在该目录

源码
执行bgsave时执行:

void bgsaveCommand(redisClient *c) {
    if (server.bgsavechildpid != -1) { //无snapshot子进程运行
      addReplyError(c,"Background save already in progress");
    } else if (server.bgrewritechildpid != -1) { //无重写aof进程运行
      addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
    } else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { //执行snapshot
      addReplyStatus(c,"Background saving started");
    } else {
      addReply(c,shared.err);
    }
}
检查是否满足snapshot条件:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
      .....
      /* If there is not a background saving in progress check if
         * we have to save now */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            if (server.dirty >= sp->changes &&
                now-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                  sp->changes, sp->seconds);
                rdbSaveBackground(server.dbfilename);
                break;
            }
         }
      ......
}


执行snapshot:

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.bgsavechildpid != -1) return REDIS_ERR;
    if (server.vm_enabled) waitEmptyIOJobsQueue();
    server.dirty_before_bgsave = server.dirty;
    start = ustime();
    if ((childpid = fork()) == 0) {//fork子进程做snapshot
      /* Child */
      if (server.vm_enabled) vmReopenSwapFile();
      if (server.ipfd > 0) close(server.ipfd);
      if (server.sofd > 0) close(server.sofd);
      if (rdbSave(filename) == REDIS_OK) {
            _exit(0);
      } else {
            _exit(1);
      }
    } else {
      /* Parent */
      server.stat_fork_time = ustime()-start;
      if (childpid == -1) {
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
      }
      redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
      server.bgsavechildpid = childpid;
      updateDictResizePolicy();
      return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

利用copy on write调用rdbsave将当前数据状态写入文件。

int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    FILE *fp;
    char tmpfile;
    int j;
    time_t now = time(NULL);

    /* Wait for I/O therads to terminate, just in case this is a
   * foreground-saving, to avoid seeking the swap file descriptor at the
   * same time. */
    if (server.vm_enabled)
      waitEmptyIOJobsQueue();

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
      redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
      return REDIS_ERR;
    }
    if (fwrite("REDIS0002",9,1,fp) == 0) goto werr;
    for (j = 0; j < server.dbnum; j++) {
      redisDb *db = server.db+j;
      dict *d = db->dict;
      if (dictSize(d) == 0) continue;
      di = dictGetSafeIterator(d);
      if (!di) {
            fclose(fp);
            return REDIS_ERR;
      }

      /* Write the SELECT DB opcode */
      if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
      if (rdbSaveLen(fp,j) == -1) goto werr;

      /* Iterate this DB writing every entry */
      while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetEntryKey(de);
            robj key, *o = dictGetEntryVal(de);
            time_t expiretime;
            
            initStaticStringObject(key,keystr);
            expiretime = getExpire(db,&key);

            /* Save the expire time */
            if (expiretime != -1) {
                /* If this key is already expired skip it */
                if (expiretime < now) continue;
                if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
                if (rdbSaveTime(fp,expiretime) == -1) goto werr;
            }
            /* Save the key and associated value. This requires special
             * handling if the value is swapped out. */
            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
                                    o->storage == REDIS_VM_SWAPPING) {
                int otype = getObjectSaveType(o);

                /* Save type, key, value */
                if (rdbSaveType(fp,otype) == -1) goto werr;
                if (rdbSaveStringObject(fp,&key) == -1) goto werr;
                if (rdbSaveObject(fp,o) == -1) goto werr;
            } else {
                /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
                robj *po;
                /* Get a preview of the object in memory */
                po = vmPreviewObject(o);
                /* Save type, key, value */
                if (rdbSaveType(fp,getObjectSaveType(po)) == -1)
                  goto werr;
                if (rdbSaveStringObject(fp,&key) == -1) goto werr;
                if (rdbSaveObject(fp,po) == -1) goto werr;
                /* Remove the loaded object from memory */
                decrRefCount(po);
            }
      }
      dictReleaseIterator(di);
    }
    /* EOF opcode */
    if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    fsync(fileno(fp));
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
   * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
      redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
      unlink(tmpfile);
      return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}
在serverCron中检查做bgsave的子进程是否结束    /* Check if a background saving or AOF rewrite in progress terminated */
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) { //是否有做snapshot或者aof rewrite的子进程存在
      int statloc;
      pid_t pid;

      if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {//结束了对其做后处理
                backgroundSaveDoneHandler(statloc);
            } else {
                backgroundRewriteDoneHandler(statloc);
            }
            updateDictResizePolicy();
      }
    } else {
         time_t now = time(NULL);


void backgroundSaveDoneHandler(int statloc) {
    int exitcode = WEXITSTATUS(statloc);
    int bysignal = WIFSIGNALED(statloc);
   
    //判断做snapshot是否成功
    if (!bysignal && exitcode == 0) {
      redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
      server.dirty = server.dirty - server.dirty_before_bgsave;
      server.lastsave = time(NULL);
    } else if (!bysignal && exitcode != 0) {
      redisLog(REDIS_WARNING, "Background saving error");
    } else {
      redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", WTERMSIG(statloc));
      rdbRemoveTempFile(server.bgsavechildpid);
    }
    server.bgsavechildpid = -1;
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
   * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}




               作者:yfkiss 发表于2012-1-13 18:16:53 原文链接

页: [1]
查看完整版本: Redis源码分析:snapshot