Fix crash due to merge of quicklist node introduced by #12955 (#13040)

Fix two crashes introduced by #12955

When a quicklist node can't be inserted and split, we eventually merge
the current node with its neighboring
nodes after inserting, and compress the current node and its siblings.

1. When the current node is merged with another node, the current node
may become invalid and can no longer be used.

   Solution: let `_quicklistMergeNodes()` return the merged nodes.

2. If the current node is an LZF quicklist node, its recompress will be
1. If the split node can be merged with a sibling node to become head or
tail, recompress may cause the head and tail to be compressed, which is
not allowed.

    Solution: always reset the node's 'recompress' flag to 0 after merging.
This commit is contained in:
debing.sun 2024-02-08 20:29:16 +08:00 committed by GitHub
parent 81666a6510
commit 1e8dc1da0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 72 additions and 8 deletions

View File

@ -105,7 +105,7 @@ quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *no
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after);
void _quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);
quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
@ -381,6 +381,15 @@ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistCompressNode(reverse);
}
/* This macro is used to compress a node.
*
* If the 'recompress' flag of the node is true, we compress it directly without
* checking whether it is within the range of compress depth.
* However, it's important to ensure that the 'recompress' flag of head and tail
* is always false, as we always assume that head and tail are not compressed.
*
* If the 'recompress' flag of the node is false, we check whether the node is
* within the range of compress depth before compressing it. */
#define quicklistCompress(_ql, _node) \
do { \
if ((_node)->recompress) \
@ -794,9 +803,14 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
unsigned char *p = lpSeek(entry->node->entry, -1);
quicklistDelIndex(quicklist, entry->node, &p);
entry->node->dont_compress = 0; /* Re-enable compression */
_quicklistMergeNodes(quicklist, entry->node);
quicklistCompress(quicklist, entry->node);
quicklistCompress(quicklist, entry->node->next);
new_node = _quicklistMergeNodes(quicklist, new_node);
/* We can't know if the current node and its sibling nodes are correctly compressed,
* and we don't know if they are within the range of compress depth, so we need to
* use quicklistCompress() for compression, which checks if node is within compress
* depth before compressing. */
quicklistCompress(quicklist, new_node);
quicklistCompress(quicklist, new_node->prev);
if (new_node->next) quicklistCompress(quicklist, new_node->next);
}
}
@ -854,6 +868,8 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
}
keep->count = lpLength(keep->entry);
quicklistNodeUpdateSz(keep);
keep->recompress = 0; /* Prevent 'keep' from being recompressed if
* it becomes head or tail after merging. */
nokeep->count = 0;
__quicklistDelNode(quicklist, nokeep);
@ -872,9 +888,10 @@ REDIS_STATIC quicklistNode *_quicklistListpackMerge(quicklist *quicklist,
* - (center->next, center->next->next)
* - (center->prev, center)
* - (center, center->next)
*
* Returns the new 'center' after merging.
*/
REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,
quicklistNode *center) {
REDIS_STATIC quicklistNode *_quicklistMergeNodes(quicklist *quicklist, quicklistNode *center) {
int fill = quicklist->fill;
quicklistNode *prev, *prev_prev, *next, *next_next, *target;
prev = prev_prev = next = next_next = target = NULL;
@ -914,8 +931,9 @@ REDIS_STATIC void _quicklistMergeNodes(quicklist *quicklist,
/* Use result of center merge (or original) to merge with next node. */
if (_quicklistNodeAllowMerge(target, target->next, fill)) {
_quicklistListpackMerge(quicklist, target, target->next);
target = _quicklistListpackMerge(quicklist, target, target->next);
}
return target;
}
/* Split 'node' into two parts, parameterized by 'offset' and 'after'.

View File

@ -394,7 +394,53 @@ if {[lindex [r config get proto-max-bulk-len] 1] == 10000000000} {
r ping
} {PONG} {large-memory}
test {Test LMOVE on plain nodes over 4GB} {
test {Test LSET splits a quicklist node, and then merge} {
# Test when a quicklist node can't be inserted and is split, the split
# node merges with the node before it and the `before` node is kept.
r flushdb
# Build two adjacent quicklist nodes: a large "y" payload on the left
# and a node holding g..a followed by a large "x" payload.
r rpush lst [string repeat "x" 4096]
r lpush lst a b c d e f g
r lpush lst [string repeat "y" 4096]
# now: [y...] [g f e d c b a x...]
# (node0) (node1)
# Keep inserting elements into node1 until node1 is split into two
# nodes([g] [...]), eventually node0 will merge with the [g] node.
# Since node0 is larger, after the merge node0 will be kept and
# the [g] node will be deleted.
# Each LSET replaces a small element with a ~1GB bulk value (sent raw
# via `r write` + write_big_bulk), forcing node1 past its size limit.
for {set i 7} {$i >= 3} {incr i -1} {
r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n"
write_big_bulk 1000000000
}
# "g" must still be readable at index 1 — a use-after-free in the merge
# path would crash the server or corrupt the list before this point.
assert_equal "g" [r lindex lst 1]
r ping
} {PONG} {large-memory}
test {Test LSET splits a LZF compressed quicklist node, and then merge} {
# Test when a LZF compressed quicklist node can't be inserted and is split,
# the split node merges with the node before it and the split node is kept.
r flushdb
# compress-depth 1 leaves only head and tail uncompressed, so the middle
# node below becomes LZF-compressed. (Restored to 0 at the end.)
r config set list-compress-depth 1
r lpush lst [string repeat "x" 2000]
r rpush lst [string repeat "y" 7000]
r rpush lst a b c d e f g
r rpush lst [string repeat "z" 8000]
# Overwrite the head's payload with "h" so we can verify it survives.
r lset lst 0 h
# now: [h] [y... a b c d e f g] [z...]
# node0 node1(LZF)
# Keep inserting elements into node1 until node1 is split into two
# nodes([y...] [...]), eventually node0 will merge with the [y...] node.
# Since [y...] node is larger, after the merge node0 will be deleted and
# the [y...] node will be kept.
# Each LSET replaces a small element with a ~1GB bulk value (sent raw
# via `r write` + write_big_bulk), forcing node1 past its size limit.
for {set i 7} {$i >= 3} {incr i -1} {
r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n"
write_big_bulk 1000000000
}
# The merged node becomes the new head; a stale 'recompress' flag would
# wrongly re-compress it. "h" at index 0 proves the head stayed intact.
assert_equal "h" [r lindex lst 0]
r config set list-compress-depth 0
r ping
} {PONG} {large-memory}
test {Test LMOVE on plain nodes over 4GB} {
r flushdb
r RPUSH lst2{t} "aa"
r RPUSH lst2{t} "bb"