Prevent LSET command from causing quicklist plain node size to exceed 4GB (#12955)

Fix #12864

The main reason for this crash is that when replacing a element of a
quicklist packed node with lpReplace() method,
if the final size is larger than 4GB, lpReplace() will fail and returns
NULL, causing `node->entry` to be incorrectly set to NULL.

Since the inserted data is not a large element, we can't just replace it
like a large element, first quicklistInsertAfter()
and then quicklistDelIndex(), because the current node may be merged and
invalidated in quicklistInsertAfter().

The solution of this PR:
When replacing a node fails (listpack exceeds 4GB), split the current
node, create a new node to put in the middle, and try to merge them.
This is the same as inserting a large element.
In the worst case, its size will not exceed 4GB.
This commit is contained in:
debing.sun 2024-02-07 00:21:28 +08:00 committed by GitHub
parent 0777dc7896
commit 1f00c951c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 13 deletions

View File

@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name)
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after);
void _quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
@ -529,19 +532,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
(node)->sz = lpBytes((node)->entry); \
} while (0)
static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) {
static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) {
quicklistNode *new_node = quicklistCreateNode();
new_node->entry = zmalloc(sz);
new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN;
memcpy(new_node->entry, value, sz);
new_node->container = container;
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
new_node->entry = zmalloc(sz);
memcpy(new_node->entry, value, sz);
} else {
new_node->entry = lpPrepend(lpNew(0), value, sz);
}
new_node->sz = sz;
new_node->count++;
return new_node;
}
static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node,
void *value, size_t sz, int after) {
__quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after);
void *value, size_t sz, int after)
{
quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
__quicklistInsertNode(quicklist, old_node, new_node, after);
quicklist->count++;
}
@ -741,9 +750,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
void *data, size_t sz)
{
quicklist* quicklist = iter->quicklist;
quicklistNode *node = entry->node;
unsigned char *newentry;
if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) {
entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz);
if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) &&
(newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL))
{
entry->node->entry = newentry;
quicklistNodeUpdateSz(entry->node);
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
quicklistCompress(quicklist, entry->node);
@ -758,15 +771,30 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
quicklistInsertAfter(iter, entry, data, sz);
__quicklistDelNode(quicklist, entry->node);
}
} else {
entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */
quicklistInsertAfter(iter, entry, data, sz);
} else { /* The node is full or data is a large element */
quicklistNode *split_node = NULL, *new_node;
node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */
/* If the entry is not at the tail, split the node at the entry's offset. */
if (entry->offset != node->count - 1 && entry->offset != -1)
split_node = _quicklistSplitNode(node, entry->offset, 1);
/* Create a new node and insert it after the original node.
* If the original node was split, insert the split node after the new node. */
new_node = __quicklistCreateNode(isLargeElement(sz) ?
QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz);
__quicklistInsertNode(quicklist, node, new_node, 1);
if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1);
quicklist->count++;
/* Delete the replaced element. */
if (entry->node->count == 1) {
__quicklistDelNode(quicklist, entry->node);
} else {
unsigned char *p = lpSeek(entry->node->entry, -1);
quicklistDelIndex(quicklist, entry->node, &p);
entry->node->dont_compress = 0; /* Re-enable compression */
_quicklistMergeNodes(quicklist, entry->node);
quicklistCompress(quicklist, entry->node);
quicklistCompress(quicklist, entry->node->next);
}
@ -1002,7 +1030,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry,
} else {
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz);
quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
__quicklistInsertNode(quicklist, node, entry_node, after);
__quicklistInsertNode(quicklist, entry_node, new_node, after);
quicklist->count++;
@ -3224,7 +3252,7 @@ int quicklistTest(int argc, char *argv[], int flags) {
memcpy(s, "helloworld", 10);
memcpy(s + sz - 10, "1234567890", 10);
quicklistNode *node = __quicklistCreatePlainNode(s, sz);
quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz);
/* Just to avoid triggering the assertion in __quicklistCompressNode(),
* it disables the passing of quicklist head or tail node. */

View File

@ -220,6 +220,7 @@ start_server [list overrides [list save ""] ] {
# checking LSET in case ziplist needs to be split
test {Test LSET with packed is split in the middle} {
set original_config [config_get_set list-max-listpack-size 4]
r flushdb
r debug quicklist-packed-threshold 5b
r RPUSH lst "aa"
@ -227,6 +228,7 @@ start_server [list overrides [list save ""] ] {
r RPUSH lst "cc"
r RPUSH lst "dd"
r RPUSH lst "ee"
assert_encoding quicklist lst
r lset lst 2 [string repeat e 10]
assert_equal [r lpop lst] "aa"
assert_equal [r lpop lst] "bb"
@ -234,6 +236,7 @@ start_server [list overrides [list save ""] ] {
assert_equal [r lpop lst] "dd"
assert_equal [r lpop lst] "ee"
r debug quicklist-packed-threshold 0
r config set list-max-listpack-size $original_config
} {OK} {needs:debug}
@ -381,6 +384,16 @@ if {[lindex [r config get proto-max-bulk-len] 1] == 10000000000} {
assert_equal [read_big_bulk {r rpop lst}] $str_length
} {} {large-memory}
test {Test LSET on plain nodes with large elements under packed_threshold over 4GB} {
r flushdb
r rpush lst a b c d e
for {set i 0} {$i < 5} {incr i} {
r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n"
write_big_bulk 1000000000
}
r ping
} {PONG} {large-memory}
test {Test LMOVE on plain nodes over 4GB} {
r flushdb
r RPUSH lst2{t} "aa"