Non blocking replication (finally!). C-side linked lists API improved.

This commit is contained in:
antirez 2009-04-20 23:51:51 +02:00
parent 40d224a9e3
commit 6208b3a776
5 changed files with 173 additions and 108 deletions

6
TODO
View File

@ -1,12 +1,15 @@
BEFORE REDIS 1.0.0-rc1
- What happens if the saving child gets killed instead to end normally? Handle this.
- Fix INCRBY argument that is limited to 32bit int.
- Make sinterstore / unionstore / sdiffstore returning the cardinality of the resulting set.
- Add a new field as INFO output: bgsaveinprogress
- Remove max number of args limit
- GETSET
- network layer stresser in test in demo
- network layer stresser in test in demo, make sure to set/get random streams of data and check that what we read back is byte-by-byte the same.
- maxclients directive
- check 'server.dirty' everywere
- config parameter to change the name of the DB file
- replication automated tests
- replication non stopping master<->slave syncronization
- an external tool able to perform the 'difference' between two Redis servers. It's like 'diff', but against Redis servers, and the output is the set of commands needed to turn the first server into the second, suitable to be sent via netcat.
@ -21,6 +24,7 @@ This command should be smart and don't use too much memory, that is, take two co
- Add missing commands in documentation
- Document replication
- Objects sharing configuration, add the directive "objectsharingpool <size>"
- Make sure to confert all the fstat() calls to 64bit versions.
FUTURE HINTS

View File

@ -143,7 +143,7 @@ void listDelNode(list *list, listNode *node)
}
/* Returns a list iterator 'iter'. After the initialization every
* call to listNextElement() will return the next element of the list.
* call to listNext() will return the next element of the list.
*
* This function can't fail. */
listIter *listGetIterator(list *list, int direction)
@ -164,6 +164,17 @@ void listReleaseIterator(listIter *iter) {
zfree(iter);
}
/* Create an iterator in the list private iterator structure */
void listRewind(list *list) {
list->iter.next = list->head;
list->iter.direction = AL_START_HEAD;
}
void listRewindTail(list *list) {
list->iter.next = list->tail;
list->iter.direction = AL_START_TAIL;
}
/* Return the next element of an iterator.
* It's valid to remove the currently returned element using
* listDelNode(), but not to remove other elements.
@ -178,7 +189,7 @@ void listReleaseIterator(listIter *iter) {
* }
*
* */
listNode *listNextElement(listIter *iter)
listNode *listNext(listIter *iter)
{
listNode *current = iter->next;
@ -191,6 +202,11 @@ listNode *listNextElement(listIter *iter)
return current;
}
/* List Yield just call listNext() against the list private iterator */
listNode *listYield(list *list) {
return listNext(&list->iter);
}
/* Duplicate the whole list. On out of memory NULL is returned.
* On success a copy of the original list is returned.
*
@ -211,7 +227,7 @@ list *listDup(list *orig)
copy->free = orig->free;
copy->match = orig->match;
iter = listGetIterator(orig, AL_START_HEAD);
while((node = listNextElement(iter)) != NULL) {
while((node = listNext(iter)) != NULL) {
void *value;
if (copy->dup) {
@ -248,7 +264,7 @@ listNode *listSearchKey(list *list, void *key)
listNode *node;
iter = listGetIterator(list, AL_START_HEAD);
while((node = listNextElement(iter)) != NULL) {
while((node = listNext(iter)) != NULL) {
if (list->match) {
if (list->match(node->value, key)) {
listReleaseIterator(iter);

View File

@ -39,6 +39,11 @@ typedef struct listNode {
void *value;
} listNode;
typedef struct listIter {
listNode *next;
int direction;
} listIter;
typedef struct list {
listNode *head;
listNode *tail;
@ -46,14 +51,9 @@ typedef struct list {
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned int len;
listIter iter;
} list;
typedef struct listIter {
listNode *next;
listNode *prev;
int direction;
} listIter;
/* Functions implemented as macros */
#define listLength(l) ((l)->len)
#define listFirst(l) ((l)->head)
@ -77,11 +77,14 @@ list *listAddNodeHead(list *list, void *value);
list *listAddNodeTail(list *list, void *value);
void listDelNode(list *list, listNode *node);
listIter *listGetIterator(list *list, int direction);
listNode *listNextElement(listIter *iter);
listNode *listNext(listIter *iter);
void listReleaseIterator(listIter *iter);
list *listDup(list *orig);
listNode *listSearchKey(list *list, void *key);
listNode *listIndex(list *list, int index);
void listRewind(list *list);
void listRewindTail(list *list);
listNode *listYield(list *list);
/* Directions for iterators */
#define AL_START_HEAD 0

229
redis.c
View File

@ -65,7 +65,7 @@
/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_QUERYBUF_LEN 1024
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_MAX_ARGS 16
#define REDIS_DEFAULT_DBNUM 16
@ -194,7 +194,7 @@ typedef struct redisClient {
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
int repldboff; /* replication DB file offset */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
} redisClient;
@ -304,6 +304,7 @@ static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSalvesWaitingBgsave(int bgsaveerr);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
@ -633,13 +634,11 @@ static void oom(const char *msg) {
/* ====================== Redis server networking stuff ===================== */
void closeTimedoutClients(void) {
redisClient *c;
listIter *li;
listNode *ln;
time_t now = time(NULL);
li = listGetIterator(server.clients,AL_START_HEAD);
if (!li) return;
while ((ln = listNextElement(li)) != NULL) {
listRewind(server.clients);
while ((ln = listYield(server.clients)) != NULL) {
c = listNodeValue(ln);
if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
(now - c->lastinteraction > server.maxidletime)) {
@ -647,7 +646,6 @@ void closeTimedoutClients(void) {
freeClient(c);
}
}
listReleaseIterator(li);
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
@ -695,6 +693,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving in progress terminated */
if (server.bgsaveinprogress) {
int statloc;
/* XXX: TODO handle the case of the saving child killed */
if (wait4(-1,&statloc,WNOHANG,NULL)) {
int exitcode = WEXITSTATUS(statloc);
if (exitcode == 0) {
@ -707,6 +706,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
"Background saving error");
}
server.bgsaveinprogress = 0;
updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
} else {
/* If there is not a background saving in progress check if
@ -1041,6 +1041,8 @@ static void freeClient(redisClient *c) {
assert(ln != NULL);
listDelNode(server.clients,ln);
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
assert(ln != NULL);
@ -1055,13 +1057,13 @@ static void freeClient(redisClient *c) {
static void glueReplyBuffersIfNeeded(redisClient *c) {
int totlen = 0;
listNode *ln = c->reply->head, *next;
listNode *ln;
robj *o;
while(ln) {
listRewind(c->reply);
while((ln = listYield(c->reply))) {
o = ln->value;
totlen += sdslen(o->ptr);
ln = ln->next;
/* This optimization makes more sense if we don't have to copy
* too much data */
if (totlen > 1024) return;
@ -1070,14 +1072,12 @@ static void glueReplyBuffersIfNeeded(redisClient *c) {
char buf[1024];
int copylen = 0;
ln = c->reply->head;
while(ln) {
next = ln->next;
listRewind(c->reply);
while((ln = listYield(c->reply))) {
o = ln->value;
memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
copylen += sdslen(o->ptr);
listDelNode(c->reply,ln);
ln = next;
}
/* Now the output buffer is empty, add the new single element */
addReplySds(c,sdsnewlen(buf,totlen));
@ -1231,7 +1231,7 @@ static int processCommand(redisClient *c) {
}
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
listNode *ln = slaves->head;
listNode *ln;
robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
int outc = 0, j;
@ -1253,14 +1253,12 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
* be sure to free objects if there is no slave in a replication state
* able to be feed with commands */
for (j = 0; j < outc; j++) incrRefCount(outv[j]);
while(ln) {
listRewind(slaves);
while((ln = listYield(slaves))) {
redisClient *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
ln = ln->next;
continue;
}
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed all the other slaves, MONITORs and so on */
if (slave->slaveseldb != dictid) {
@ -1287,19 +1285,18 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
slave->slaveseldb = dictid;
}
for (j = 0; j < outc; j++) addReply(slave,outv[j]);
ln = ln->next;
}
for (j = 0; j < outc; j++) decrRefCount(outv[j]);
}
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
char buf[REDIS_QUERYBUF_LEN];
char buf[REDIS_IOBUF_LEN];
int nread;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
nread = read(fd, buf, REDIS_QUERYBUF_LEN);
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
@ -1428,6 +1425,8 @@ static redisClient *createClient(int fd) {
static void addReply(redisClient *c, robj *obj) {
if (listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c, NULL) == AE_ERR) return;
if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
@ -1804,14 +1803,14 @@ static int rdbSave(char *filename) {
} else if (o->type == REDIS_LIST) {
/* Save a list value */
list *list = o->ptr;
listNode *ln = list->head;
listNode *ln;
listRewind(list);
if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
while(ln) {
while((ln = listYield(list))) {
robj *eleobj = listNodeValue(ln);
if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ln = ln->next;
}
} else if (o->type == REDIS_SET) {
/* Save a set value */
@ -2373,6 +2372,7 @@ static void bgsaveCommand(redisClient *c) {
static void shutdownCommand(redisClient *c) {
redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
/* XXX: TODO kill the child if there is a bgsave in progress */
if (rdbSave(server.dbfilename) == REDIS_OK) {
if (server.daemonize) {
unlink(server.pidfile);
@ -3224,13 +3224,14 @@ static void sortCommand(redisClient *c) {
j = 0;
if (sortval->type == REDIS_LIST) {
list *list = sortval->ptr;
listNode *ln = list->head;
while(ln) {
listNode *ln;
listRewind(list);
while((ln = listYield(list))) {
robj *ele = ln->value;
vector[j].obj = ele;
vector[j].u.score = 0;
vector[j].u.cmpobj = NULL;
ln = ln->next;
j++;
}
} else {
@ -3292,14 +3293,15 @@ static void sortCommand(redisClient *c) {
outputlen = getop ? getop*(end-start+1) : end-start+1;
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
for (j = start; j <= end; j++) {
listNode *ln = operations->head;
listNode *ln;
if (!getop) {
addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
sdslen(vector[j].obj->ptr)));
addReply(c,vector[j].obj);
addReply(c,shared.crlf);
}
while(ln) {
listRewind(operations);
while((ln = listYield(operations))) {
redisSortOperation *sop = ln->value;
robj *val = lookupKeyByPattern(c->db,sop->pattern,
vector[j].obj);
@ -3316,7 +3318,6 @@ static void sortCommand(redisClient *c) {
} else if (sop->type == REDIS_SORT_DEL) {
/* TODO */
}
ln = ln->next;
}
}
@ -3455,23 +3456,6 @@ static void expireCommand(redisClient *c) {
/* =============================== Replication ============================= */
/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */
static int flushClientOutput(redisClient *c) {
int retval;
time_t start = time(NULL);
while(listLength(c->reply)) {
if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */
retval = aeWait(c->fd,AE_WRITABLE,1000);
if (retval == -1) {
return REDIS_ERR;
} else if (retval & AE_WRITABLE) {
sendReplyToClient(NULL, c->fd, c, AE_WRITABLE);
}
}
return REDIS_OK;
}
static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
ssize_t nwritten, ret = size;
time_t start = time(NULL);
@ -3557,11 +3541,10 @@ static void syncCommand(redisClient *c) {
redisClient *slave;
listNode *ln;
ln = server.slaves->head;
while(ln) {
listRewind(server.slaves);
while((ln = listYield(server.slaves))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
ln = ln->next;
}
if (ln) {
/* Perfect, the server is already registering differences for
@ -3587,59 +3570,115 @@ static void syncCommand(redisClient *c) {
}
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
}
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
c->slaveseldb = 0;
if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
return;
}
#if 0
static void _syncCommand(redisClient *c) {
struct stat sb;
int fd = -1, len;
time_t start = time(NULL);
char sizebuf[32];
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
/* ignore SYNC if aleady slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
if (slave->repldboff == 0) {
/* Write the bulk write count before to transfer the DB. In theory here
* we don't know how much room there is in the output buffer of the
* socket, but in pratice SO_SNDLOWAT (the minimum count for output
* operations) will never be smaller than the few bytes we need. */
sds bulkcount;
redisLog(REDIS_NOTICE,"Slave ask for synchronization");
if (flushClientOutput(c) == REDIS_ERR ||
rdbSave(server.dbfilename) != REDIS_OK)
goto closeconn;
fd = open(server.dbfilename, O_RDONLY);
if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn;
len = sb.st_size;
snprintf(sizebuf,32,"$%d\r\n",len);
if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn;
while(len) {
char buf[1024];
int nread;
if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn;
nread = read(fd,buf,1024);
if (nread == -1) goto closeconn;
len -= nread;
if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn;
bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
slave->repldbsize);
if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
{
sdsfree(bulkcount);
freeClient(slave);
return;
}
sdsfree(bulkcount);
}
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
slave->repldboff += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
slave->replstate = REDIS_REPL_ONLINE;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave, NULL) == AE_ERR) {
freeClient(slave);
return;
}
addReplySds(slave,sdsempty());
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
}
static void updateSalvesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listRewind(server.slaves);
while((ln = listYield(server.slaves))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct stat buf;
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
if (startbgsave) {
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
listRewind(server.slaves);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listYield(server.slaves))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn;
close(fd);
c->flags |= REDIS_SLAVE;
c->slaveseldb = 0;
if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
return;
closeconn:
if (fd != -1) close(fd);
c->flags |= REDIS_CLOSE;
redisLog(REDIS_WARNING,"Synchronization with slave failed");
return;
}
#endif
static int syncWithMaster(void) {
char buf[1024], tmpfile[256];

View File

@ -29,4 +29,7 @@ def redisSha1(opts={})
sha1
end
p "Dataset SHA1: #{redisSha1()}"
host = (ARGV[0] or "127.0.0.1")
port = (ARGV[1] or "6379")
puts "Performing SHA1 of Redis server #{host} #{port}"
p "Dataset SHA1: #{redisSha1(:host => host, :port => port.to_i)}"