diff --git a/src/rdb.c b/src/rdb.c index c566378fb..5111b6b88 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -42,31 +42,35 @@ #include #include -#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +/* This macro is called when the internal RDB structure is corrupt (read_error == 0) */ +#define rdbExitReportCorruptRDB(...) rdbReportError(0, __LINE__,__VA_ARGS__) +/* This macro is called when RDB read failed, possibly a short read (read_error == 1) */ +#define rdbReportReadError(...) rdbReportError(1, __LINE__,__VA_ARGS__) char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); -void rdbCheckThenExit(int linenum, char *reason, ...) { +void rdbReportError(int read_error, int linenum, char *reason, ...) { va_list ap; char msg[1024]; int len; len = snprintf(msg,sizeof(msg), - "Internal error in RDB reading function at rdb.c:%d -> ", linenum); + "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ", + (unsigned long long)server.loading_loaded_bytes, linenum); va_start(ap,reason); vsnprintf(msg+len,sizeof(msg)-len,reason,ap); va_end(ap); if (!rdbCheckMode) { - serverLog(LL_WARNING, "%s", msg); - if (rdbFileBeingLoaded) { + if (rdbFileBeingLoaded || !read_error) { + serverLog(LL_WARNING, "%s", msg); char *argv[2] = {"",rdbFileBeingLoaded}; redis_check_rdb_main(2,argv,NULL); } else { - serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg); return; } } else { @@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) { return len; } -/* This is just a wrapper for the low level function rioRead() that will - * automatically abort if it is not possible to read the specified amount - * of bytes. 
*/ -void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { - if (rioRead(rdb,buf,len) == 0) { - rdbExitReportCorruptRDB( - "Impossible to read %llu bytes in rdbLoadRaw()", - (unsigned long long) len); - return; /* Not reached. */ - } -} - int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } @@ -110,10 +102,11 @@ int rdbLoadType(rio *rdb) { /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS * opcode. */ -time_t rdbLoadTime(rio *rdb) { +int rdbLoadTime(rio *rdb, time_t *t) { int32_t t32; - rdbLoadRaw(rdb,&t32,4); - return (time_t)t32; + if (rioRead(rdb,&t32,4) == 0) return C_ERR; + *t = (time_t)t32; + return C_OK; } int rdbSaveMillisecondTime(rio *rdb, long long t) { @@ -133,12 +126,13 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { * own old RDB files. Because of that, we instead fix the function only for new * RDB versions, and load older RDB versions as we used to do in the past, * allowing big endian systems to load their own old RDB files. */ -long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { +int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver) { int64_t t64; - rdbLoadRaw(rdb,&t64,8); + if (rioRead(rdb,&t64,8) == 0) return C_ERR; if (rdbver >= 9) /* Check the top comment of this function. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ - return (long long)t64; + *t = (long long)t64; + return C_OK; } /* Saves an encoded length. The first two bits in the first byte are used to @@ -216,9 +210,8 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) { if (rioRead(rdb,&len,8) == 0) return -1; *lenptr = ntohu64(len); } else { - rdbExitReportCorruptRDB( - "Unknown length encoding %d in rdbLoadLen()",type); - return -1; /* Never reached. 
*/ + serverLog(LL_WARNING, "Unknown length encoding %d in rdbLoadLen()",type); + return -1; } return 0; } @@ -284,8 +277,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); val = (int32_t)v; } else { - val = 0; /* anti-warning */ - rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); + serverLog(LL_WARNING, "Unknown RDB integer encoding type %d", enctype); + return NULL; } if (plain || sds) { char buf[LONG_STR_SIZE], *p; @@ -502,7 +495,8 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { case RDB_ENC_LZF: return rdbLoadLzfStringObject(rdb,flags,lenptr); default: - rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len); + serverLog(LL_WARNING, "Unknown RDB encoding type %llu", (unsigned long long)len); + return NULL; } } @@ -1644,6 +1638,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { hashTypeConvert(o, OBJ_ENCODING_HT); break; default: + /* totally unreachable */ rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } @@ -1651,6 +1646,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); + if (listpacks == RDB_LENERR) { + rdbReportReadError("Stream listpacks len loading failed."); + decrRefCount(o); + return NULL; + } while(listpacks--) { /* Get the master ID, the one we'll use as key of the radix tree @@ -1658,7 +1658,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * relatively to this ID. 
*/ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (nodekey == NULL) { - rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error."); + decrRefCount(o); + return NULL; } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " @@ -1668,7 +1670,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Load the listpack. */ unsigned char *lp = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (lp == NULL) return NULL; + if (lp == NULL) { + rdbReportReadError("Stream listpacks loading failed."); + sdsfree(nodekey); + decrRefCount(o); + return NULL; + } unsigned char *first = lpFirst(lp); if (first == NULL) { /* Serialized listpacks should never be empty, since on @@ -1685,13 +1692,26 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { rdbExitReportCorruptRDB("Listpack re-added with existing key"); } /* Load total number of items inside the stream. */ - s->length = rdbLoadLen(rdb,NULL); + if (rdbLoadLenByRef(rdb,NULL,&s->length)) { + rdbReportReadError("Stream item count loading failed."); + decrRefCount(o); + return NULL; + } /* Load the last entry ID. */ - s->last_id.ms = rdbLoadLen(rdb,NULL); - s->last_id.seq = rdbLoadLen(rdb,NULL); + if (rdbLoadLenByRef(rdb,NULL,&s->last_id.ms) || + rdbLoadLenByRef(rdb,NULL,&s->last_id.seq)) { + rdbReportReadError("Stream last entry ID loading failed."); + decrRefCount(o); + return NULL; + } /* Consumer groups loading */ - size_t cgroups_count = rdbLoadLen(rdb,NULL); + uint64_t cgroups_count = rdbLoadLen(rdb,NULL); + if (cgroups_count == RDB_LENERR) { + rdbReportReadError("Stream cgroup count loading failed."); + decrRefCount(o); + return NULL; + } while(cgroups_count--) { /* Get the consumer group name and ID. 
We can then create the * consumer group ASAP and populate its structure as @@ -1699,11 +1719,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { streamID cg_id; sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cgname == NULL) { - rdbExitReportCorruptRDB( + rdbReportReadError( "Error reading the consumer group name from Stream"); + decrRefCount(o); + return NULL; + } + if (rdbLoadLenByRef(rdb,NULL,&cg_id.ms) || + rdbLoadLenByRef(rdb,NULL,&cg_id.seq)) { + rdbReportReadError("Stream cgroup ID loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; } - cg_id.ms = rdbLoadLen(rdb,NULL); - cg_id.seq = rdbLoadLen(rdb,NULL); streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); if (cgroup == NULL) rdbExitReportCorruptRDB("Duplicated consumer group name %s", @@ -1715,13 +1742,32 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * owner, since consumers for this group and their messages will * be read as a next step. So for now leave them not resolved * and later populate it. 
*/ - size_t pel_size = rdbLoadLen(rdb,NULL); + uint64_t pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream PEL size loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream PEL ID loading failed."); + decrRefCount(o); + return NULL; + } streamNACK *nack = streamCreateNACK(NULL); - nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); - nack->delivery_count = rdbLoadLen(rdb,NULL); + if (rdbLoadMillisecondTime(rdb, &nack->delivery_time,RDB_VERSION) == C_ERR) { + rdbReportReadError("Stream PEL nack loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } + if ((nack->delivery_count = rdbLoadLen(rdb,NULL)) == RDB_LENERR) { + rdbReportReadError("Stream nack deliveries loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) rdbExitReportCorruptRDB("Duplicated gobal PEL entry " "loading stream consumer group"); @@ -1729,24 +1775,44 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Now that we loaded our global PEL, we need to load the * consumers and their local PELs. 
*/ - size_t consumers_num = rdbLoadLen(rdb,NULL); + uint64_t consumers_num = rdbLoadLen(rdb,NULL); + if (consumers_num == RDB_LENERR) { + rdbReportReadError("Stream consumers num loading failed."); + decrRefCount(o); + return NULL; + } while(consumers_num--) { sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cname == NULL) { - rdbExitReportCorruptRDB( - "Error reading the consumer name from Stream group"); + rdbReportReadError( + "Error reading the consumer name from Stream group."); + decrRefCount(o); + return NULL; } streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1); sdsfree(cname); - consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rdbLoadMillisecondTime(rdb, &consumer->seen_time,RDB_VERSION) == C_ERR) { + rdbReportReadError("Stream short read reading seen time."); + decrRefCount(o); + return NULL; + } /* Load the PEL about entries owned by this specific * consumer. */ pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream consumer PEL num loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream short read reading PEL streamID."); + decrRefCount(o); + return NULL; + } streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); if (nack == raxNotFound) rdbExitReportCorruptRDB("Consumer entry not found in " @@ -1764,7 +1830,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { - uint64_t moduleid = rdbLoadLen(rdb,NULL); + uint64_t moduleid; + if (rdbLoadLenByRef(rdb,NULL, &moduleid)) { + return NULL; + } moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -1792,6 +1861,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Module v2 serialization has an EOF mark at the end. 
*/ if (io.ver == 2) { uint64_t eof = rdbLoadLen(rdb,NULL); + if (eof == RDB_LENERR) { + o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */ + decrRefCount(o); + return NULL; + } if (eof != RDB_MODULE_OPCODE_EOF) { serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name); exit(1); @@ -1805,7 +1879,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } o = createModuleObject(mt,ptr); } else { - rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); + rdbReportReadError("Unknown RDB encoding type %d",rdbtype); + return NULL; } return o; } @@ -1902,13 +1977,14 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - expiretime = rdbLoadTime(rdb); - expiretime *= 1000; + time_t t; + if (rdbLoadTime(rdb, &t) == C_ERR) goto eoferr; + expiretime = t * 1000; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ - expiretime = rdbLoadMillisecondTime(rdb,rdbver); + if (rdbLoadMillisecondTime(rdb, &expiretime, rdbver) == C_ERR) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ @@ -2017,7 +2093,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * we have the ability to read a MODULE_AUX opcode followed by an * identifier of the module, and a serialized value in "MODULE V2" * format. 
*/ - uint64_t moduleid = rdbLoadLen(rdb,NULL); + uint64_t moduleid; + if (rdbLoadLenByRef(rdb,NULL,&moduleid)) goto eoferr; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; moduleTypeNameByID(name,moduleid); @@ -2090,8 +2167,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { eoferr: /* unexpected end of file is handled here with a fatal exit */ serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); - rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); - return C_ERR; /* Just to avoid warning */ + rdbReportReadError("Unexpected EOF reading RDB file"); +err: + return C_ERR; } /* Like rdbLoadRio() but takes a filename instead of a rio stream. The diff --git a/src/rdb.h b/src/rdb.h index 0acddf9ab..2bec0171b 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -127,10 +127,10 @@ int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); -time_t rdbLoadTime(rio *rdb); +int rdbLoadTime(rio *rdb, time_t *t); int rdbSaveLen(rio *rdb, uint64_t len); int rdbSaveMillisecondTime(rio *rdb, long long t); -long long rdbLoadMillisecondTime(rio *rdb, int rdbver); +int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver); uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index e2d71b5a5..fd3f07b0a 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -212,18 +212,19 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { /* Handle special types. */ if (type == RDB_OPCODE_EXPIRETIME) { + time_t t; rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. 
*/ - if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; - expiretime *= 1000; + if (rdbLoadTime(&rdb, &t) == C_ERR) goto eoferr; + expiretime = t * 1000; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; - if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr; + if (rdbLoadMillisecondTime(&rdb, &expiretime, rdbver) == C_ERR) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ diff --git a/src/stream.h b/src/stream.h index ef08753b5..8ae90ce77 100644 --- a/src/stream.h +++ b/src/stream.h @@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); +void streamFreeNACK(streamNACK *na); #endif diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index d69a1761a..5d32555b0 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -192,12 +192,6 @@ foreach mdl {no yes} { set master_host [srv 0 host] set master_port [srv 0 port] set slaves {} - set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] - set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] - set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork start_server {} { lappend slaves [srv 0 client] start_server {} { @@ -205,6 +199,14 @@ foreach mdl {no yes} { start_server {} { lappend slaves 
[srv 0 client] test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + # start load handles only inside the test, so that the test can be skipped + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] + after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork + # Send SLAVEOF commands to slaves [lindex $slaves 0] config set repl-diskless-load $sdl [lindex $slaves 1] config set repl-diskless-load $sdl @@ -278,9 +280,9 @@ start_server {tags {"repl"}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] - set load_handle0 [start_write_load $master_host $master_port 3] start_server {} { test "Master stream is correctly processed while the replica has a script in -BUSY state" { + set load_handle0 [start_write_load $master_host $master_port 3] set slave [srv 0 client] $slave config set lua-time-limit 500 $slave slaveof $master_host $master_port @@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} { } } } + +test {diskless loading short read} { + start_server {tags {"repl"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Set master and replica to use diskless replication + $master config set repl-diskless-sync yes + $master config set rdbcompression no + $replica config set repl-diskless-load swapdb + # Try to fill the master with all types of data types / encodings + for {set k 0} {$k < 3} {incr k} { + for {set i 0} {$i < 10} {incr i} { + r set 
"$k int_$i" [expr {int(rand()*10000)}] + r expire "$k int_$i" [expr {int(rand()*10000)}] + r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]] + r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]] + r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]] + r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]] + r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]] + r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]] + r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]] + r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]] + r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]] + for {set j 0} {$j < 10} {incr j} { + r xadd "$k stream" * foo "asdf" bar "1234" + } + r xgroup create "$k stream" "mygroup_$i" 0 + r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" > + } + } + + # Start the replication process... 
+ $master config set repl-diskless-sync-delay 0 + $replica replicaof $master_host $master_port + + # kill the replication at various points + set attempts 3 + if {$::accurate} { set attempts 10 } + for {set i 0} {$i < $attempts} {incr i} { + # wait for the replica to start reading the rdb + # using the log file since the replica only responds to INFO once in 2mb + wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1 + + # add some additional random sleep so that we kill the master on a different place each time + after [expr {int(rand()*100)}] + + # kill the replica connection on the master + set killed [$master client kill type replica] + + if {[catch { + set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10] + if {$::verbose} { + puts $res + } + }]} { + puts "failed triggering short read" + # force the replica to try another full sync + $master client kill type replica + $master set asdf asdf + # the side effect of resizing the backlog is that it is flushed (16k is the min size) + $master config set repl-backlog-size [expr {16384 + $i}] + } + # wait for loading to stop (fail) + wait_for_condition 100 10 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + } + # enable fast shutdown + $master config set rdb-key-save-delay 0 + } + } +} + diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 41cc5612a..c2e76afad 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} { } } +proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} { + set retry $maxtries + set stdout [srv $srv_idx stdout] + while {$retry} { + set result [exec tail -$last_lines < $stdout] + set result [split $result "\n"] + foreach line $result { + if {[string match $pattern $line]} { + return $line + } + } + incr retry -1 + after $delay + } + if {$retry == 0} { + fail "log message of '$pattern' not found" + } +} + # Random integer between 0 and max (excluded). 
proc randomInt {max} { expr {int(rand()*$max)}