From 0c4733c8d7c50e0b0dc3373733fddfff3f62dda0 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 17 Apr 2022 17:16:46 +0300 Subject: [PATCH] Optimize integer zset scores in listpack (converting to string and back) (#10486) When the score doesn't have fractional part, and can be stored as an integer, we use the integer capabilities of listpack to store it, rather than convert it to string. This already existed before this PR (lpInsert dose that conversion implicitly). But to do that, we would have first converted the score from double to string (calling `d2string`), then pass the string to `lpAppend` which identified it as being an integer and convert it back to an int. Now, instead of converting it to a string, we store it using lpAppendInteger`. Unrelated: --- * Fix the double2ll range check (negative and positive ranges, and also the comparison operands were slightly off. but also, the range could be made much larger, see comment). * Unify the double to string conversion code in rdb.c with the one in util.c * Small optimization in lpStringToInt64, don't attempt to convert strings that are obviously too long. Benchmark; --- Up to 20% improvement in certain tight loops doing zzlInsert with large integers. (if listpack is pre-allocated to avoid realloc, and insertion is sorted from largest to smaller) --- src/listpack.c | 3 ++- src/listpack.h | 2 ++ src/rdb.c | 19 ++++--------------- src/t_zset.c | 16 ++++++++++++---- src/util.c | 49 ++++++++++++++++++++++++++++++++++--------------- src/util.h | 1 + 6 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/listpack.c b/src/listpack.c index e651e4960..75189f55f 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -180,7 +180,8 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) { int negative = 0; uint64_t v; - if (plen == slen) + /* Abort if length indicates this cannot possibly be an int */ + if (slen == 0 || slen >= LONG_STR_SIZE) return 0; /* Special case: first and only digit is 0. */ diff --git a/src/listpack.h b/src/listpack.h index 6c4d6bdd6..3e750af5b 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -59,6 +59,8 @@ void lpFree(unsigned char *lp); unsigned char* lpShrinkToFit(unsigned char *lp); unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen, unsigned char *p, int where, unsigned char **newp); +unsigned char *lpInsertInteger(unsigned char *lp, long long lval, + unsigned char *p, int where, unsigned char **newp); unsigned char *lpPrepend(unsigned char *lp, unsigned char *s, uint32_t slen); unsigned char *lpPrependInteger(unsigned char *lp, long long lval); unsigned char *lpAppend(unsigned char *lp, unsigned char *s, uint32_t slen); diff --git a/src/rdb.c b/src/rdb.c index 7f2225dfb..62ec5bbb2 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -588,22 +588,11 @@ int rdbSaveDoubleValue(rio *rdb, double val) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { -#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) - /* Check if the float is in a safe range to be casted into a - * long long. We are assuming that long long is 64 bit here. - * Also we are assuming that there are no implementations around where - * double has precision < 52 bit. - * - * Under this assumptions we test if a double is inside an interval - * where casting to long long is safe. Then using two castings we - * make sure the decimal part is zero. If all this is true we use - * integer printing function that is much faster. */ - double min = -4503599627370495; /* (2^52)-1 */ - double max = 4503599627370496; /* -(2^52) */ - if (val > min && val < max && val == ((double)((long long)val))) - ll2string((char*)buf+1,sizeof(buf)-1,(long long)val); + long long lvalue; + /* Integer printing function is much faster, check if we can safely use it. */ + if (double2ll(val, &lvalue)) + ll2string((char*)buf+1,sizeof(buf)-1,lvalue); else -#endif snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); buf[0] = strlen((char*)buf+1); len = buf[0]+1; diff --git a/src/t_zset.c b/src/t_zset.c index 2169301cc..7796a6dec 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1029,17 +1029,25 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, doub unsigned char *sptr; char scorebuf[MAX_D2STRING_CHARS]; int scorelen; - - scorelen = d2string(scorebuf,sizeof(scorebuf),score); + long long lscore; + int score_is_long = double2ll(score, &lscore); + if (!score_is_long) + scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele)); - zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen); + if (score_is_long) + zl = lpAppendInteger(zl,lscore); + else + zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen); } else { /* Insert member before the element 'eptr'. */ zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr); /* Insert score after the member. */ - zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL); + if (score_is_long) + zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL); + else + zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL); } return zl; } diff --git a/src/util.c b/src/util.c index 45591d9f2..ae1f89b65 100644 --- a/src/util.c +++ b/src/util.c @@ -552,6 +552,36 @@ int string2d(const char *s, size_t slen, double *dp) { return 1; } +/* Returns 1 if the double value can safely be represented in long long without + * precision loss, in which case the corresponding long long is stored in the out variable. */ +int double2ll(double d, long long *out) { +#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL) + /* Check if the float is in a safe range to be casted into a + * long long. We are assuming that long long is 64 bit here. + * Also we are assuming that there are no implementations around where + * double has precision < 52 bit. + * + * Under this assumptions we test if a double is inside a range + * where casting to long long is safe. Then using two castings we + * make sure the decimal part is zero. If all this is true we can use + * integer without precision loss. + * + * Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part, + * and the exponent bits are positive, which means the "decimal" part must be 0. + * i.e. all double values in that range are representable as a long without precision loss, + * but not all long values in that range can be represented as a double. + * we only care about the first part here. */ + if (d < -LLONG_MAX/2 || d > LLONG_MAX/2) + return 0; + long long ll = d; + if (ll == d) { + *out = ll; + return 1; + } +#endif + return 0; +} + /* Convert a double to a string representation. Returns the number of bytes * required. The representation should always be parsable by strtod(3). * This function does not support human-friendly formatting like ld2string @@ -572,22 +602,11 @@ int d2string(char *buf, size_t len, double value) { else len = snprintf(buf,len,"0"); } else { -#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) - /* Check if the float is in a safe range to be casted into a - * long long. We are assuming that long long is 64 bit here. - * Also we are assuming that there are no implementations around where - * double has precision < 52 bit. - * - * Under this assumptions we test if a double is inside an interval - * where casting to long long is safe. Then using two castings we - * make sure the decimal part is zero. If all this is true we use - * integer printing function that is much faster. */ - double min = -4503599627370495; /* (2^52)-1 */ - double max = 4503599627370496; /* -(2^52) */ - if (value > min && value < max && value == ((double)((long long)value))) - len = ll2string(buf,len,(long long)value); + long long lvalue; + /* Integer printing function is much faster, check if we can safely use it. */ + if (double2ll(value, &lvalue)) + len = ll2string(buf,len,lvalue); else -#endif len = snprintf(buf,len,"%.17g",value); } diff --git a/src/util.h b/src/util.h index 5ea71fecd..0515f1a83 100644 --- a/src/util.h +++ b/src/util.h @@ -75,6 +75,7 @@ int string2d(const char *s, size_t slen, double *dp); int trimDoubleString(char *buf, size_t len); int d2string(char *buf, size_t len, double value); int ld2string(char *buf, size_t len, long double value, ld2string_mode mode); +int double2ll(double d, long long *out); int yesnotoi(char *s); sds getAbsolutePath(char *filename); long getTimeZone(void);