diff --git a/src/backend/lib/integerset.c b/src/backend/lib/integerset.c index 74ed621554..617cb2b1c6 100644 --- a/src/backend/lib/integerset.c +++ b/src/backend/lib/integerset.c @@ -773,7 +773,7 @@ intset_binsrch_leaf(uint64 item, leaf_item *arr, int arr_elems, bool nextkey) * * Simple-8b algorithm packs between 1 and 240 integers into 64-bit words, * called "codewords". The number of integers packed into a single codeword - * depends on the integers being packed: small integers are encoded using + * depends on the integers being packed; small integers are encoded using * fewer bits than large integers. A single codeword can store a single * 60-bit integer, or two 30-bit integers, for example. * @@ -783,11 +783,11 @@ intset_binsrch_leaf(uint64 item, leaf_item *arr, int arr_elems, bool nextkey) * of the absolute values. * * In Simple-8b, each codeword consists of a 4-bit selector, which indicates - * how many integers are encoded in the codeword, and the encoded integers + * how many integers are encoded in the codeword, and the encoded integers are * packed into the remaining 60 bits. The selector allows for 16 different - * ways of using the remaining 60 bits, "modes". The number of integers - * packed into a single codeword is listed in the simple8b_modes table below. - * For example, consider the following codeword: + * ways of using the remaining 60 bits, called "modes". The number of integers + * packed into a single codeword in each mode is listed in the simple8b_modes + * table below. For example, consider the following codeword: * * 20-bit integer 20-bit integer 20-bit integer * 1101 00000000000000010010 01111010000100100000 00000000000000010100 @@ -835,6 +835,7 @@ static const struct {20, 3}, /* mode 13: three 20-bit integers */ {30, 2}, /* mode 14: two 30-bit integers */ {60, 1}, /* mode 15: one 60-bit integer */ + {0, 0} /* sentinel value */ }; @@ -842,15 +843,20 @@ static const struct * EMPTY_CODEWORD is a special value, used to indicate "no values". * It is used if the next value is too large to be encoded with Simple-8b. * - * This value looks like a 0-mode codeword, but we check for it + * This value looks like a 0-mode codeword, but we check for it * specifically. (In a real 0-mode codeword, all the unused bits are zero.) */ -#define EMPTY_CODEWORD UINT64CONST(0xFFFFFFFFFFFFFFF0) +#define EMPTY_CODEWORD UINT64CONST(0x0FFFFFFFFFFFFFFF) /* * Encode a number of integers into a Simple-8b codeword. * - * Returns the number of integers that were encoded. + * The input array must contain at least SIMPLE8B_MAX_VALUES_PER_CODEWORD + * elements. + * + * Returns the encoded codeword, and sets *num_encoded to the number + * input integers that were encoded. It can be zero, if the first input is + * too large to be encoded. */ static uint64 simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) @@ -861,7 +867,6 @@ simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) uint64 diff; uint64 last_val; uint64 codeword; - uint64 diffs[60]; int i; Assert(ints[0] > base); @@ -891,13 +896,12 @@ simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) selector++; nints = simple8b_modes[selector].num_ints; bits = simple8b_modes[selector].bits_per_int; + if (i >= nints) break; } else { - if (i < 60) - diffs[i] = diff; i++; if (i >= nints) break; @@ -910,7 +914,13 @@ simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) if (nints == 0) { - /* The next value is too large and be encoded with Simple-8b */ + /* + * The first value is too large to be encoded with Simple-8b. + * + * If there is at least one not-too-large integer in the input, we + * will encode it using mode 15 (or a more compact mode). Hence, we + * only get here, if the *first* input integer is >= 2^60. + */ Assert(i == 0); *num_encoded = 0; return EMPTY_CODEWORD; @@ -924,16 +934,18 @@ simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) codeword = 0; if (bits > 0) { - for (i = nints - 1; i >= 0; i--) + for (i = nints - 1; i > 0; i--) { + diff = ints[i] - ints[i - 1] - 1; + codeword |= diff; codeword <<= bits; - codeword |= diffs[i]; } + diff = ints[0] - base - 1; + codeword |= diff; } /* add selector to the codeword, and return */ - codeword <<= 4; - codeword |= selector; + codeword |= (uint64) selector << 60; *num_encoded = nints; return codeword; @@ -945,7 +957,7 @@ simple8b_encode(uint64 *ints, int *num_encoded, uint64 base) static int simple8b_decode(uint64 codeword, uint64 *decoded, uint64 base) { - int selector = codeword & 0x0f; + int selector = (codeword >> 60); int nints = simple8b_modes[selector].num_ints; uint64 bits = simple8b_modes[selector].bits_per_int; uint64 mask = (UINT64CONST(1) << bits) - 1; @@ -954,8 +966,6 @@ simple8b_decode(uint64 codeword, uint64 *decoded, uint64 base) if (codeword == EMPTY_CODEWORD) return 0; - codeword >>= 4; /* shift out the selector */ - prev_value = base; for (int i = 0; i < nints; i++) { @@ -976,15 +986,13 @@ simple8b_decode(uint64 codeword, uint64 *decoded, uint64 base) static bool simple8b_contains(uint64 codeword, uint64 key, uint64 base) { - int selector = codeword & 0x0f; + int selector = (codeword >> 60); int nints = simple8b_modes[selector].num_ints; int bits = simple8b_modes[selector].bits_per_int; if (codeword == EMPTY_CODEWORD) return false; - codeword >>= 4; /* shift out the selector */ - if (bits == 0) { /* Special handling for 0-bit cases. */ diff --git a/src/test/modules/test_integerset/test_integerset.c b/src/test/modules/test_integerset/test_integerset.c index 32713f4baa..eec9e7d0ce 100644 --- a/src/test/modules/test_integerset/test_integerset.c +++ b/src/test/modules/test_integerset/test_integerset.c @@ -539,6 +539,7 @@ test_huge_distances(void) val = 0; values[num_values++] = val; + /* Test differences on both sides of the 2^60 boundary. */ val += UINT64CONST(1152921504606846976) - 1; /* 2^60 - 1 */ values[num_values++] = val; @@ -563,16 +564,19 @@ test_huge_distances(void) val += UINT64CONST(1152921504606846976) + 1; /* 2^60 + 1 */ values[num_values++] = val; - val += UINT64CONST(1152921504606846976); /* 2^60 */ + val += UINT64CONST(1152921504606846976) + 2; /* 2^60 + 2 */ values[num_values++] = val; - /* we're now very close to 2^64, so can't add large values anymore */ + val += UINT64CONST(1152921504606846976) + 2; /* 2^60 + 2 */ + values[num_values++] = val; - intset = intset_create(); + val += UINT64CONST(1152921504606846976); /* 2^60 */ + values[num_values++] = val; /* - * Add many more values to the end, to make sure that all the above values - * get flushed and packed into the tree structure. + * We're now very close to 2^64, so can't add large values anymore. But + * add more smaller values to the end, to make sure that all the above + * values get flushed and packed into the tree structure. */ while (num_values < 1000) { @@ -580,7 +584,8 @@ test_huge_distances(void) values[num_values++] = val; } - /* Add these numbers to the set */ + /* Create an IntegerSet using these values */ + intset = intset_create(); for (int i = 0; i < num_values; i++) intset_add_member(intset, values[i]);