redis rdb 的常用接口了
这是比较大的一块。顾名思义,就是rdb的常用接口了。
接口如下:
// rdb没有little endian和big endian的区别(除了最后的checksum),所以rdb文件在不同系统中可能不能移植
// type只占一个字节
int rdbSaveType(rio* rdb, unsigned char type);
int rdbLoadType(rio* rdb);
// 这个只有声明,没有定义,不用去管
int rdbSaveTime(rio* rdb, time_t t);
time_t rdbLoadTime(rio* rdb);
int rdbSaveLen(rio* rdb, uint32_t len);
uint32_t rdbLoadLen(rio* rdb, int* isencoded);
int rdbSaveObjectType(rio* rdb, robj* o);
int rdbLoadObjectType(rio* rdb);
int rdbLoad(char* filename);
int rdbSaveBackground(char* filename);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char* filename);
int rdbSaveObject(rio* rdb, robj* o);
off_t rdbSavedObjectLen(robj* o);
off_t rdbSavedObjectPages(robj* o);
robj* rdbLoadObject(int type, rio* rdb);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio* rdb, robj* key, robj* val, long long expiretime, long long now);
robj* rdbLoadStringObject(rio* rdb);
redis做了很多减少内存使用的工作,其中最常使用的就是对长度编码,长度域最高两比特(MSB)00表示长度值为后面的6比特表示,01表示14bit,10表示32bit,11表示后面的6bit为而不是长度。
目前看到rdbLoad,基本还是比较清晰的,除了一部分是在redis.h中的不太清楚之外。
所谓robj是这个样子的:
typedef struct redisObject {
unsigned type:4;
unsigned notused:2;
unsigned encoding:4;
unsigned lru:22; // server.lrulock
int refcount;
void* ptr;
} robj;
一个type可以有多种encoding,如string可以用整数编码(如string代表的是整数)或raw。refcount是引用计数,共享对象(0-10000的整数字符串引用计数可能大于1),变为0时这个对象就要回收了。直接贴rdbLoad代码,几乎没什么特别的:
int rdbLoad(char* filename) {
uint32_t dbid;
int type, rdbver;
redisDb* db = server.db + 0;
char buf[1024];
long long expiretime, now = mstime();
long loops = 0;
FILE* fp;
rio rdb;
if ((fp = fopen(filename, "r")) == NULL) return REDIS_ERR;
rioInitWithFile(&rdb, fp);
// 如果打开了checksum选项,rio就需要记录crc64.
if (server.rdb_checksum) {
rdb.update_cksum = rioGenericChecksum;
}
if (rioRead(&rdb, buf, 9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) {
fclose(fp);
redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file");
errno = EINVAL;
return REDIS_ERR;
}
rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
fclose(fp);
redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver);
errno = EINVAL;
return REDIS_ERR;
}
startLoading(fp);
while (1) {
robj* key, *val;
expiretime = -1;
// serve the clients from time to time.
if (!(loops++ % 1000)) {
loadingProgress(rioTell(&rdb));
aeProcessEvents(server.el, AE_FILE_EVENTS | AE_DONT_WAIT);
}
// read type.
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
if (type == REDIS_RDB_OPCODE_EXIPRETIME) {
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; // 32位
// we read the time so we need to read the object type again.
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
expiretime *= 1000;
} else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
if ((exiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; // 64位
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
}
if (type == REDIS_RDB_OPCODE_EOF) {
break;
}
if (type == REDIS_RDB_OPCODE_SELECTDB) {
if ((dbid = rdbLoadLen(&rdb, NULL)) == REDIS_RDB_LENERR) {
goto eoferr;
}
if (dbid >= (unsigned)server.dbnum) {
redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db + dbid;
continue;
}
// Read key
// 虽说是加载字符串,但也可能以整数编码
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type, &rdb)) == NULL) goto eoferr;
// Check if the key already exipired. This function is used when loading an RDB file from disk, either at startup, or when an RDB was received from the master. In the latter case, the master is responsible for key expiry. If we could expire keys here, the snapshot taken by the master may not be reflected on the slave.
if (server.masterhost == NULL && expiretime != -1 && expirtetime < now) {
decrRefCount(key);
decrRefCount(val);
continue;
}
dbAdd(db, key, val);
if (expiretime != -1) setExpire(db, key, expiretime);
decrRefCount(key);
}
// Verify the checksum if RDB version is >= 5
if (rdbver >= 5 && server.rdb_checksum) {
uint64_t cksum, expected = rdb.cksum;
if (rioRead(&rdb, &cksum, 8) == 0) goto eoferr;
memrevifbe(&cksum);
if (cksum == 0) {
redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
redisLog(REDIS_WARNING, "Wrong RDB checkusm. Aborting now.");
exit(1);
}
}
fclose(fp);
stopLoading();
return REDIS_OK;
eoferr:
redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR;
}
其中大头是rdbLoadObject,对各种对象进行加载,可以看到加载时一般优先使用ziplist,intset这样省内存的结构,只有元素数量足够大或者其中一个元素较大时才使用常规的list,dict结构。set也是dict结构,只不过其中的value指向NULL。还有一些结构如 zipmap等好像也是list,dict等的衍生(zipmap好像是以ziplist形式保存key,value),也有的结构涉及到跳跃表,要等看 redis.h和redis.c时可以完全弄清楚。
可以看到,这里面字符串有两种加载方式:
// rdb没有little endian和big endian的区别(除了最后的checksum),所以rdb文件在不同系统中可能不能移植 // type只占一个字节 int rdbSaveType(rio* rdb, unsigned char type); int rdbLoadType(rio* rdb); // 这个只有声明,没有定义,不用去管 int rdbSaveTime(rio* rdb, time_t t); time_t rdbLoadTime(rio* rdb); int rdbSaveLen(rio* rdb, uint32_t len); uint32_t rdbLoadLen(rio* rdb, int* isencoded); int rdbSaveObjectType(rio* rdb, robj* o); int rdbLoadObjectType(rio* rdb); int rdbLoad(char* filename); int rdbSaveBackground(char* filename); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char* filename); int rdbSaveObject(rio* rdb, robj* o); off_t rdbSavedObjectLen(robj* o); off_t rdbSavedObjectPages(robj* o); robj* rdbLoadObject(int type, rio* rdb); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio* rdb, robj* key, robj* val, long long expiretime, long long now); robj* rdbLoadStringObject(rio* rdb); redis做了很多减少内存使用的工作,其中最常使用的就是对长度编码,长度域最高两比特(MSB)00表示长度值为后面的6比特表示,01表示14bit,10表示32bit,11表示后面的6bit为而不是长度。 目前看到rdbLoad,基本还是比较清晰的,除了一部分是在redis.h中的不太清楚之外。 所谓robj是这个样子的: typedef struct redisObject { unsigned type:4; unsigned notused:2; unsigned encoding:4; unsigned lru:22; // server.lrulock int refcount; void* ptr; } robj; 一个type可以有多种encoding,如string可以用整数编码(如string代表的是整数)或raw。refcount是引用计数,共 享对象(0-10000的整数字符串引用计数可能大于1),变为0时这个对象就要回收了。直接贴rdbLoad代码,几乎没什么特别的: int rdbLoad(char* filename) { uint32_t dbid; int type, rdbver; redisDb* db = server.db + 0; char buf[1024]; long long expiretime, now = mstime(); long loops = 0; FILE* fp; rio rdb; if ((fp = fopen(filename, "r")) == NULL) return REDIS_ERR; rioInitWithFile(&rdb, fp); // 如果打开了checksum选项,rio就需要记录crc64. if (server.rdb_checksum) { rdb.update_cksum = rioGenericChecksum; } if (rioRead(&rdb, buf, 9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf, "REDIS", 5) != 0) { fclose(fp); redisLog(REDIS_WARNING, "Wrong signature trying to load DB from file"); errno = EINVAL; return REDIS_ERR; } rdbver = atoi(buf + 5); if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) { fclose(fp); redisLog(REDIS_WARNING, "Can't handle RDB format version %d", rdbver); errno = EINVAL; return REDIS_ERR; } startLoading(fp); while (1) { robj* key, *val; expiretime = -1; // serve the clients from time to time. if (!(loops++ % 1000)) { loadingProgress(rioTell(&rdb)); aeProcessEvents(server.el, AE_FILE_EVENTS | AE_DONT_WAIT); } // read type. if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if (type == REDIS_RDB_OPCODE_EXIPRETIME) { if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; // 32位 // we read the time so we need to read the object type again. if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; expiretime *= 1000; } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) { if ((exiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr; // 64位 if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; } if (type == REDIS_RDB_OPCODE_EOF) { break; } if (type == REDIS_RDB_OPCODE_SELECTDB) { if ((dbid = rdbLoadLen(&rdb, NULL)) == REDIS_RDB_LENERR) { goto eoferr; } if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING, "FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } db = server.db + dbid; continue; } // Read key // 虽说是加载字符串,但也可能以整数编码 if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; if ((val = rdbLoadObject(type, &rdb)) == NULL) goto eoferr; // Check if the key already exipired. This function is used when loading an RDB file from disk, either at startup, or when an RDB was received from the master. In the latter case, the master is responsible for key expiry. If we could expire keys here, the snapshot taken by the master may not be reflected on the slave. if (server.masterhost == NULL && expiretime != -1 && expirtetime < now) { decrRefCount(key); decrRefCount(val); continue; } dbAdd(db, key, val); if (expiretime != -1) setExpire(db, key, expiretime); decrRefCount(key); } // Verify the checksum if RDB version is >= 5 if (rdbver >= 5 && server.rdb_checksum) { uint64_t cksum, expected = rdb.cksum; if (rioRead(&rdb, &cksum, 8) == 0) goto eoferr; memrevifbe(&cksum); if (cksum == 0) { redisLog(REDIS_WARNING, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { redisLog(REDIS_WARNING, "Wrong RDB checkusm. Aborting now."); exit(1); } } fclose(fp); stopLoading(); return REDIS_OK; eoferr: redisLog(REDIS_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; }
rdbLoadEncodedStringObject:如果字符串是数字,就使用数字编码,并允许使用共享integer编码对象。
rdbLoadStringObject:即使字符串是整数,也仍然要使用string编码。对于db中的key是这样的(其实如果没有maxmemory和lru的设定,也大可使用rdbLoadEncodedStringObject,这应该是作者没优化的地方)。
介于中间的是,rdbLoadStringObject之后调用tryObjectEncoding,这个方法会尽量使用integer编码,还检查是否有maxmemory和lru设定,没有的话可以使用共享integer编码对象。
rdbSave保存rdb文件,可以看出它是先写到一个临时文件,写成功之后再rename到指定的文件名。
rdbBgsave非阻塞第做保存工作,它是fork出一个子进程做这个工作(按照一般的思路,可能就要用线程之类的),这种做法简洁,避免了多线程内存共享的很多问题。
void backgroundSaveDoneHandler(int exitcode, int bysignal)是bgsave完成之后调用的函数,它会调用replication.c中的updateSlavesWaitingBgsave,需要到时留意其功能。
代码看到现在,发现现在几个文件是纠结在一起的,似乎共用一个redis.h。