diff --git a/src/server.c b/src/server.c index 4337b8f01..5e18b5465 100644 --- a/src/server.c +++ b/src/server.c @@ -3401,6 +3401,10 @@ int processCommand(client *c) { } } + /* Make sure to use a reasonable amount of memory for client side + * caching metadata. */ + if (server.tracking_clients) trackingLimitUsedSlots(); + /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); diff --git a/src/server.h b/src/server.h index b200a6696..fdab53830 100644 --- a/src/server.h +++ b/src/server.h @@ -1639,6 +1639,7 @@ void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); +void trackingLimitUsedSlots(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index dc2934831..1189e82d5 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -154,6 +154,30 @@ void sendTrackingMessage(client *c, long long hash) { } } +/* Invalidates a caching slot: this is actually the low level implementation + * of the API that Redis calls externally, that is trackingInvalidateKey(). */ +void trackingInvalidateSlot(uint64_t slot) { + if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; + + raxIterator ri; + raxStart(&ri,TrackingTable[slot]); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,ri.key_len); + client *c = lookupClientByID(id); + if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + sendTrackingMessage(c,slot); + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + raxFree(TrackingTable[slot]); + TrackingTable[slot] = NULL; + TrackingTableUsedSlots--; +} + /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching @@ -164,25 +188,7 @@ void trackingInvalidateKey(robj *keyobj) { sds sdskey = keyobj->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) return; - - raxIterator ri; - raxStart(&ri,TrackingTable[hash]); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - uint64_t id; - memcpy(&id,ri.key,ri.key_len); - client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,hash); - } - raxStop(&ri); - - /* Free the tracking table: we'll create the radix tree and populate it - * again if more keys will be modified in this hash slot. */ - raxFree(TrackingTable[hash]); - TrackingTable[hash] = NULL; - TrackingTableUsedSlots--; + trackingInvalidateSlot(hash); } /* This function is called when one or all the Redis databases are flushed @@ -235,3 +241,17 @@ void trackingInvalidateKeysOnFlush(int dbid) { } } } + +/* Tracking forces Redis to remember information about which client may have + * keys about certian caching slots. In workloads where there are a lot of + * reads, but keys are hardly modified, the amount of information we have + * to remember server side could be a lot: for each 16 millions of caching + * slots we may end with a radix tree containing many entries. + * + * So Redis allows the user to configure a maximum fill rate for the + * invalidation table. This function makes sure that we don't go over the + * specified fill rate: if we are over, we can just evict informations about + * random caching slots, and send invalidation messages to clients like if + * the key was modified. */ +void trackingLimitUsedSlots(void) { +}