找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 4578|回复: 0

Redis源码分析:snapshot

[复制链接]
发表于 2012-1-14 10:40:00 | 显示全部楼层 |阅读模式
源码版本:redis 2.4.4

redis的snapshot通过将内存中的所有数据写入文件,以达到持久化的目的。
需要注意的是:
1)snapshot方式不是追加,而是将内存所有数据写入文件,snapshot间隔短的话,会造成磁盘IO频繁
2)在上一次做snapshot到当前,如果机器crash,期间修改过的数据会丢失
redis支持两种方式做snapshot
1)客户端发送bgsave命令(save命令也可以,其会阻塞主线程执行,不推荐)
2)根据配置的周期,定期执行

snapshot配置(redis.conf):
save A  B
在A秒后,如果至少B个数据发生变化则做snapshot。eg: save 3000100即在3000s后,如果至少100个key发生变化则做snapshot

rdbcompression yes
是否启用压缩(压缩消耗cpu)

dbfilename dump.rdb
snapshot dump的文件名

dir ./
指定工作目录,snapshot文件将保存在该目录,同时,aof文件也将保存在该目录

源码
执行bgsave时执行:
  1. void bgsaveCommand(redisClient *c) {
  2.     if (server.bgsavechildpid != -1) { //无snapshot子进程运行
  3.         addReplyError(c,"Background save already in progress");
  4.     } else if (server.bgrewritechildpid != -1) { //无重写aof进程运行
  5.         addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
  6.     } else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { //执行snapshot
  7.         addReplyStatus(c,"Background saving started");
  8.     } else {
  9.         addReply(c,shared.err);
  10.     }
  11. }
复制代码
检查是否满足snapshot条件:
  1. int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
  2. {
  3.         .....
  4.         /* If there is not a background saving in progress check if
  5.          * we have to save now */
  6.          for (j = 0; j < server.saveparamslen; j++) {
  7.             struct saveparam *sp = server.saveparams+j;
  8.             if (server.dirty >= sp->changes &&
  9.                 now-server.lastsave > sp->seconds) {
  10.                 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
  11.                     sp->changes, sp->seconds);
  12.                 rdbSaveBackground(server.dbfilename);
  13.                 break;
  14.             }
  15.          }
  16.         ......
  17. }
复制代码

执行snapshot:
  1. int rdbSaveBackground(char *filename) {
  2.     pid_t childpid;
  3.     long long start;
  4.     if (server.bgsavechildpid != -1) return REDIS_ERR;
  5.     if (server.vm_enabled) waitEmptyIOJobsQueue();
  6.     server.dirty_before_bgsave = server.dirty;
  7.     start = ustime();
  8.     if ((childpid = fork()) == 0) {  //fork子进程做snapshot
  9.         /* Child */
  10.         if (server.vm_enabled) vmReopenSwapFile();
  11.         if (server.ipfd > 0) close(server.ipfd);
  12.         if (server.sofd > 0) close(server.sofd);
  13.         if (rdbSave(filename) == REDIS_OK) {
  14.             _exit(0);
  15.         } else {
  16.             _exit(1);
  17.         }
  18.     } else {
  19.         /* Parent */
  20.         server.stat_fork_time = ustime()-start;
  21.         if (childpid == -1) {
  22.             redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
  23.                 strerror(errno));
  24.             return REDIS_ERR;
  25.         }
  26.         redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
  27.         server.bgsavechildpid = childpid;
  28.         updateDictResizePolicy();
  29.         return REDIS_OK;
  30.     }
  31.     return REDIS_OK; /* unreached */
  32. }
复制代码

利用copy on write调用rdbsave将当前数据状态写入文件。
  1. int rdbSave(char *filename) {
  2.     dictIterator *di = NULL;
  3.     dictEntry *de;
  4.     FILE *fp;
  5.     char tmpfile[256];
  6.     int j;
  7.     time_t now = time(NULL);
  8.     /* Wait for I/O therads to terminate, just in case this is a
  9.      * foreground-saving, to avoid seeking the swap file descriptor at the
  10.      * same time. */
  11.     if (server.vm_enabled)
  12.         waitEmptyIOJobsQueue();
  13.     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
  14.     fp = fopen(tmpfile,"w");
  15.     if (!fp) {
  16.         redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
  17.         return REDIS_ERR;
  18.     }
  19.     if (fwrite("REDIS0002",9,1,fp) == 0) goto werr;
  20.     for (j = 0; j < server.dbnum; j++) {
  21.         redisDb *db = server.db+j;
  22.         dict *d = db->dict;
  23.         if (dictSize(d) == 0) continue;
  24.         di = dictGetSafeIterator(d);
  25.         if (!di) {
  26.             fclose(fp);
  27.             return REDIS_ERR;
  28.         }
  29.         /* Write the SELECT DB opcode */
  30.         if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
  31.         if (rdbSaveLen(fp,j) == -1) goto werr;
  32.         /* Iterate this DB writing every entry */
  33.         while((de = dictNext(di)) != NULL) {
  34.             sds keystr = dictGetEntryKey(de);
  35.             robj key, *o = dictGetEntryVal(de);
  36.             time_t expiretime;
  37.             
  38.             initStaticStringObject(key,keystr);
  39.             expiretime = getExpire(db,&key);
  40.             /* Save the expire time */
  41.             if (expiretime != -1) {
  42.                 /* If this key is already expired skip it */
  43.                 if (expiretime < now) continue;
  44.                 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
  45.                 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
  46.             }
  47.             /* Save the key and associated value. This requires special
  48.              * handling if the value is swapped out. */
  49.             if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
  50.                                       o->storage == REDIS_VM_SWAPPING) {
  51.                 int otype = getObjectSaveType(o);
  52.                 /* Save type, key, value */
  53.                 if (rdbSaveType(fp,otype) == -1) goto werr;
  54.                 if (rdbSaveStringObject(fp,&key) == -1) goto werr;
  55.                 if (rdbSaveObject(fp,o) == -1) goto werr;
  56.             } else {
  57.                 /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
  58.                 robj *po;
  59.                 /* Get a preview of the object in memory */
  60.                 po = vmPreviewObject(o);
  61.                 /* Save type, key, value */
  62.                 if (rdbSaveType(fp,getObjectSaveType(po)) == -1)
  63.                     goto werr;
  64.                 if (rdbSaveStringObject(fp,&key) == -1) goto werr;
  65.                 if (rdbSaveObject(fp,po) == -1) goto werr;
  66.                 /* Remove the loaded object from memory */
  67.                 decrRefCount(po);
  68.             }
  69.         }
  70.         dictReleaseIterator(di);
  71.     }
  72.     /* EOF opcode */
  73.     if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
  74.     /* Make sure data will not remain on the OS's output buffers */
  75.     fflush(fp);
  76.     fsync(fileno(fp));
  77.     fclose(fp);
  78.     /* Use RENAME to make sure the DB file is changed atomically only
  79.      * if the generate DB file is ok. */
  80.     if (rename(tmpfile,filename) == -1) {
  81.         redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
  82.         unlink(tmpfile);
  83.         return REDIS_ERR;
  84.     }
  85.     redisLog(REDIS_NOTICE,"DB saved on disk");
  86.     server.dirty = 0;
  87.     server.lastsave = time(NULL);
  88.     return REDIS_OK;
  89. werr:
  90.     fclose(fp);
  91.     unlink(tmpfile);
  92.     redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
  93.     if (di) dictReleaseIterator(di);
  94.     return REDIS_ERR;
  95. }
复制代码
在serverCron  中检查做bgsave的子进程是否结束   
  1. /* Check if a background saving or AOF rewrite in progress terminated */
  2.     if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) { //是否有做snapshot或者aof rewrite的子进程存在
  3.         int statloc;
  4.         pid_t pid;
  5.         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
  6.             if (pid == server.bgsavechildpid) {  //结束了对其做后处理
  7.                 backgroundSaveDoneHandler(statloc);
  8.             } else {
  9.                 backgroundRewriteDoneHandler(statloc);
  10.             }
  11.             updateDictResizePolicy();
  12.         }
  13.     } else {
  14.          time_t now = time(NULL);
  15. void backgroundSaveDoneHandler(int statloc) {
  16.     int exitcode = WEXITSTATUS(statloc);
  17.     int bysignal = WIFSIGNALED(statloc);
  18.    
  19.     //判断做snapshot是否成功
  20.     if (!bysignal && exitcode == 0) {
  21.         redisLog(REDIS_NOTICE,
  22.             "Background saving terminated with success");
  23.         server.dirty = server.dirty - server.dirty_before_bgsave;
  24.         server.lastsave = time(NULL);
  25.     } else if (!bysignal && exitcode != 0) {
  26.         redisLog(REDIS_WARNING, "Background saving error");
  27.     } else {
  28.         redisLog(REDIS_WARNING,
  29.             "Background saving terminated by signal %d", WTERMSIG(statloc));
  30.         rdbRemoveTempFile(server.bgsavechildpid);
  31.     }
  32.     server.bgsavechildpid = -1;
  33.     /* Possibly there are slaves waiting for a BGSAVE in order to be served
  34.      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
  35.     updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
  36. }
复制代码
               作者:yfkiss 发表于2012-1-13 18:16:53 原文链接

您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-4-26 18:33 , Processed in 0.021279 second(s), 6 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表