[FEATURE] Add bloom filter skipping index type #206

Closed
5 tasks done
dai-chen opened this issue Dec 22, 2023 · 4 comments
Labels
0.3 enhancement New feature or request

Comments

@dai-chen
Collaborator

dai-chen commented Dec 22, 2023

Is your feature request related to a problem?

What solution would you like?

User Experience

Here is an example. See more details in the comments below.

spark-sql> CREATE SKIPPING INDEX ON stream.test (name BLOOM_FILTER);
spark-sql> REFRESH SKIPPING INDEX ON stream.test;

spark-sql> EXPLAIN SELECT input_file_name() FROM stream.test WHERE name = 'hello';
== Physical Plan ==
*(1) Project [input_file_name() AS input_file_name()#42]
+- *(1) Filter (isnotnull(name#0) AND (name#0 = hello))
   +- FileScan csv stream.value_set_test[name#0] Batched: false,
  DataFilters: [isnotnull(name#0), (name#0 = hello)], Format: CSV,
  Location: FlintSparkSkippingFileIndex(1 paths)[s3://test],
  PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,hello)],
  ReadSchema: struct

spark-sql> SELECT input_file_name() FROM stream.test WHERE name = 'hello';
s3://test/part-00000-560ff9f5-6180-4d82-9d34-9d913522f397-c000.csv
Time taken: 10.871 seconds, Fetched 1 row(s)

spark-sql> SELECT input_file_name() FROM stream.test WHERE name = 'test';
Time taken: 0.872 seconds

Skipping index data in OpenSearch:

POST flint_myglue_stream_value_set_test_skipping_index/_search
        ...
        "_source": {
          "file_path": "s3://test/part-00000-560ff9f5-6180-4d82-9d34-9d913522f397-c000.csv",
          "name": "AAAAAQAAAAUAAABzAAA...="
        }

Proposed Solution

Design decision from problem space to solution space:

[Screenshot from 2023-12-22, not reproduced]

  1. Restrict value set size as proposed above
  2. Add BloomFilter as a UDF in Spark SQL: reuse Spark's built-in BloomFilterAggregate and BloomFilterMightContain functions and map the bloom filter to the OpenSearch binary type.
  3. Delegate BloomFilter predicate pushdown to the SQL plugin [TBD]

Proof of Concept

PoC branch: https://github.com/dai-chen/opensearch-spark/tree/bloom-filter-skipping-poc

  • Build bloom filter in Spark and persist in OpenSearch
  • Optimize query by bloom filter in Spark
  • Integrate with Flint index codebase
  • Choose algorithm parameters
  • Optimize field size in OpenSearch
  1. Implement the Bloom filter data structure and its read/write functions (BloomFilterAggregate and BloomFilterMightContain) in the Flint Spark layer
  2. Enhance FlintClient to support the binary type and might_contain function pushdown (this cannot be delegated to the SQL plugin because it is not a Flint dependency at the moment)

[Screenshot from 2024-02-06, not reproduced]

Benchmark

Preparation

  1. Setup
    a. OpenSearch cluster
    b. Index setting/mapping: binary field with doc_values enabled
    c. Query cache disabled

  2. Test Data
    a. http_logs log files in compressed JSON format
    b. 1045 files in total, spread across partitions in an S3 bucket
    c. File size: 100KB to 25MB (compressed)
    d. clientip column cardinality per file: 1 to 180K

  3. Test Cases (FPP is 1% in all cases)
    a. Static 512K NDV
    b. Static 256K NDV: assumes the user knows the maximum clientip cardinality in advance
    c. Adaptive NDV with 10 BloomFilters from 1K to 512K

CREATE SKIPPING INDEX ON ds_tables.http_logs (clientip BLOOM_FILTER);

REFRESH SKIPPING INDEX ON ds_tables.http_logs;

DROP SKIPPING INDEX ON ds_tables.http_logs;

VACUUM SKIPPING INDEX ON ds_tables.http_logs;

-- Q1: a client IP with almost no false positives
SELECT `@timestamp`, request FROM ds_tables.http_logs WHERE clientip = '205.132.4.0';

-- Q2: a client IP that may have ~5 false-positive files
SELECT `@timestamp`, request FROM ds_tables.http_logs WHERE clientip = '139.127.0.0';

Test Result

Bloom Filter Configuration                              | Build Latency (sec) | Build Memory (MB) | BloomFilter Size (MB) | Query Latency (sec)
Static 512K NDV                                         | 593                 | TBD               | 1100                  | Q1: 4.2 - 5.2, Q2: 4.3 - 5.2
Static 256K NDV (prior knowledge of column cardinality) | 580                 |                   | 733                   | Q1: 3.3 - 4.8, Q2: 3.8 - 4.3
Adaptive NDV (10 BFs from 1K to 512K)                   | 709                 |                   | 276                   | Q1: 3.6 - 4.2, Q2: 6.7 - 7.6

Test Result Analysis

  1. Static 512K NDV: generates the biggest OpenSearch index because a large BF sized for 512K NDV is allocated for every file.
  2. Static 256K NDV: bigger OpenSearch index than adaptive, but the best query performance (the maximum per-file cardinality is 180K, so every file fits within a 256K filter).
  3. Adaptive NDV
    a. Generates the smallest OpenSearch index, but its build latency and Q2 latency are higher than those of the other two configurations.
    b. The higher build latency is expected because each insertion goes into 10 BFs internally. This could be optimized with a BF variant that discards a candidate once it is saturated.
    c. The slower Q2 is mainly because the chosen BF has an NDV just above the number of unique values. With the same expected FPP and the same number of unique values inserted, a bigger NDV lowers the actual FPP. In practice, we may choose a lower expected FPP, which consumes more space (see the FPP tests below) but gives a lower actual FPP. For example, with 0.1% expected FPP the OpenSearch index would grow by roughly 1.5x to about 400MB, but the FPP would be much lower and may achieve the same performance as the Static 256K NDV config.
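The effect described in item c can be checked with the textbook approximation of a Bloom filter's false-positive rate, p ≈ (1 - e^(-k*n/m))^k, where m is the number of bits, k the number of hash functions and n the number of distinct values actually inserted. The Scala sketch below is a minimal illustration under that approximation only; the object name and the 800K-bit, 6-hash filter are hypothetical and not taken from Flint. For a fixed filter, inserting fewer distinct values than it was sized for gives an actual FPP far below the target, and overfilling it raises the FPP quickly.

```scala
object ActualFpp {
  /** Approximate false-positive rate of a Bloom filter with `numBits` bits and
    * `numHashFunctions` hash functions after `numInserted` distinct values,
    * using the textbook formula (1 - e^(-k * n / m))^k. */
  def actualFpp(numBits: Long, numHashFunctions: Int, numInserted: Long): Double = {
    // Expected fraction of bits set to 1 after n insertions
    val fractionOfBitsSet = 1.0 - math.exp(-numHashFunctions.toDouble * numInserted / numBits)
    // A false positive needs all k probed bits to be set
    math.pow(fractionOfBitsSet, numHashFunctions)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical filter: 800K bits and 6 hash functions
    for (n <- Seq(25000L, 50000L, 100000L, 160000L)) {
      println(f"inserted=$n%7d actual FPP=${actualFpp(800000L, 6, n)}%.4f")
    }
  }
}
```

The printout shows the same trend as the adaptive results: a filter that is large relative to the values it holds behaves like one with a much lower expected FPP.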

More Tests on FPP

FPP impact on size:

NDV  | FPP   | Size (MB)
50K  | 3%    | 0.05
50K  | 1%    | 0.07
50K  | 0.1%  | 0.11
50K  | 0.01% | 0.15
512K | 3%    | 0.47
512K | 1%    | 0.62
512K | 0.1%  | 0.94
512K | 0.01% | 1.25
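The sizes in the table roughly track the standard Bloom filter sizing formulas: m = -n * ln(p) / (ln 2)^2 bits for n expected distinct values at FPP p, and k = round(m / n * ln 2) hash functions. To our understanding these are also the formulas behind Spark's BloomFilter.create(expectedNumItems, fpp), so the same helper could turn a target FPP into the bit size that bloom_filter_agg accepts. A minimal sketch; the object and method names are hypothetical:

```scala
object BloomFilterSizing {
  private val Ln2Squared = math.log(2) * math.log(2)

  /** Number of bits m for n expected distinct values at false-positive probability p. */
  def optimalNumOfBits(expectedNumItems: Long, fpp: Double): Long =
    (-expectedNumItems * math.log(fpp) / Ln2Squared).toLong

  /** Number of hash functions k that minimizes the FPP for n values in m bits. */
  def optimalNumOfHashFunctions(expectedNumItems: Long, numBits: Long): Int =
    math.max(1, math.round(numBits.toDouble / expectedNumItems * math.log(2)).toInt)

  /** Size of the bit array in MB (10^6 bytes), ignoring any serialization header. */
  def sizeInMB(numBits: Long): Double = numBits / 8.0 / 1e6

  def main(args: Array[String]): Unit = {
    for (ndv <- Seq(50000L, 512000L); fpp <- Seq(0.03, 0.01, 0.001, 0.0001)) {
      val m = optimalNumOfBits(ndv, fpp)
      val k = optimalNumOfHashFunctions(ndv, m)
      println(f"NDV=$ndv%7d FPP=$fpp%.4f bits=$m%9d k=$k%2d size=${sizeInMB(m)}%.2f MB")
    }
  }
}
```

Because m grows with -ln(p), each 10x reduction in FPP adds about ln(10) / (ln 2)^2 ≈ 4.8 bits per value, which is why going from 1% to 0.1% makes the filter about 1.5x larger rather than doubling it.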
@dai-chen
Collaborator Author

dai-chen commented Jan 12, 2024

BloomFilter Build PoC (Global Parameters)

The basic idea is to reuse Spark's BloomFilterAggregate. However, Spark does not register it in the built-in function registry, and it only accepts the expected number of items and the bit size (FPP cannot be specified directly).

Test on the HTTP logs data set: there are 1045 files, and per-file clientip cardinality ranges up to ~180K (sample below):

SELECT input_file_name(), approx_count_distinct(clientip) FROM ds_tables.http_logs GROUP BY input_file_name();
s3://httplogs/year=1998/month=6/day=10/part-00451-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	160818
s3://httplogs/year=1998/month=6/day=11/part-00631-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	163687
s3://httplogs/year=1998/month=6/day=10/part-00386-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	143661
s3://httplogs/year=1998/month=6/day=11/part-00544-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	158091
s3://httplogs/year=1998/month=6/day=11/part-00616-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	173761
s3://httplogs/year=1998/month=6/day=10/part-00434-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	138823
s3://httplogs/year=1998/month=6/day=11/part-00646-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	176312
s3://httplogs/year=1998/month=5/day=27/part-00198-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	17872
s3://httplogs/year=1998/month=6/day=12/part-00725-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	136647
s3://httplogs/year=1998/month=6/day=11/part-00602-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	147839
s3://httplogs/year=1998/month=6/day=12/part-00852-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	162665
s3://httplogs/year=1998/month=5/day=25/part-00175-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	17109
s3://httplogs/year=1998/month=6/day=12/part-00770-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	127598
s3://httplogs/year=1998/month=6/day=11/part-00612-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	177900
s3://httplogs/year=1998/month=6/day=12/part-00828-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2	157072
...

Use a UDF to convert the IP to a Long, then test a BloomFilter with 100K expected items and the default 0.03 FPP:

PUT bf_ip_test
{
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary"
      }
    }
  }
}

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.functions.{expr, udf}

import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterAggregate
import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain

val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
val funcId_might_contain = new FunctionIdentifier("might_contain")

// Register 'bloom_filter_agg' as a builtin function (accepts 1 to 3 arguments).
spark.sessionState.functionRegistry.registerFunction(funcId_bloom_filter_agg,
  new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
  (children: Seq[Expression]) => children.size match {
    case 1 => new BloomFilterAggregate(children.head)
    case 2 => new BloomFilterAggregate(children.head, children(1))
    case 3 => new BloomFilterAggregate(children.head, children(1), children(2))
    case n => throw new IllegalArgumentException(s"bloom_filter_agg expects 1 to 3 arguments, but got $n")
  })

// Register 'might_contain' as a builtin function: might_contain(bloom_filter, value).
spark.sessionState.functionRegistry.registerFunction(funcId_might_contain,
  new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
  (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))

// Convert an IPv4 string such as "6.100.18.0" to its 32-bit numeric value as a Long.
val ip_to_num = udf((ip: String) => {
  val arr = ip.split('.').map(_.toLong)
  require(arr.length == 4, s"Invalid IPv4: ${ip}")
  arr(0) << 24 | arr(1) << 16 | arr(2) << 8 | arr(3)
})

spark.udf.register("ip_to_num", ip_to_num)

spark.sql("""
  SELECT input_file_name() AS file_path,
         bloom_filter_agg(ip_to_num(clientip), 100000L) AS clientip_bloom_filter
  FROM ds_tables.http_logs
  GROUP BY input_file_name()
""").write.format("flint").mode("overwrite").save("bf_ip_test_3")

spark.conf.set("spark.sql.codegen.wholeStage", "false")

spark.sql("SELECT clientip, COUNT(*) AS file_count FROM (SELECT DISTINCT clientip, input_file_name() FROM ds_tables.http_logs) GROUP BY clientip ORDER BY file_count ASC LIMIT 5").show
+-----------+----------+
|   clientip|file_count|
+-----------+----------+
| 6.100.18.0|         1|
|139.127.0.0|         1|
| 77.204.4.0|         1|
|205.132.4.0|         1|
|79.176.13.0|         1|
+-----------+----------+

spark.read.format("flint").load("bf_ip_test_3").filter(expr(s"might_contain(clientip_bloom_filter, ip_to_num('6.100.18.0'))")).select("file_path").count
res18: Long = 19

Bloom filter size is ~0.1MB per file (a BF with 1000K expected items takes 0.6MB, but the Flint data source has problems with long binary values):

health status index                                                         uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   bf_ip_test_3                                                  O1DESZ9TQASrmJMlA0L2bA   5   2       1045            0    349.8mb        116.6mb

Tested a BF with 1000K expected items:

// Reduce the scroll page size to avoid exceeding the 100MB response limit
spark.conf.set("spark.datasource.flint.read.scroll_size", 10)

spark.read.format("flint").load("bf_ip_test_2").filter(expr(s"might_contain(clientip_bloom_filter, ip_to_num('6.100.18.0'))")).select("file_path").count
res9: Long = 1

@dai-chen
Collaborator Author

dai-chen commented Jan 17, 2024

Query Rewrite PoC

By default, doc values are disabled for OpenSearch binary fields. In this PoC, we create a new index with doc_values enabled:

PUT bf_ip_test_4
{
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary",
        "doc_values": true
      }
    }
  }
}

scala> spark.sql("SELECT input_file_name() AS file_path, bloom_filter_agg(ip_to_num(clientip), 1000000L) AS clientip_bloom_filter FROM ds_tables.http_logs GROUP BY input_file_name()").write.format("flint").mode("overwrite").save("bf_ip_test_4")

With 1M expected items per BF, the index size increases to 1.6GB (1045 docs in total, i.e. ~1.6MB per file):

green  open   bf_ip_test_4                                                  NQwe5hqGQqiilHR_pttA7w   5   2       1045            0      4.8gb          1.6gb

Push down Spark's BloomFilterImpl.mightContain() to a Painless script. Because Painless supports only a subset of the Java API, any unsupported class, such as ByteArrayInputStream, has to be inlined:

GET bf_ip_test_4/_search
{
  "query": {
    "bool": {
      "filter": {
        "script": {
          "script": {
            "source": """
int hashLong(long input, int seed) {
    int low = (int) input;
    int high = (int) (input >>> 32);

    int k1 = mixK1(low);
    int h1 = mixH1(seed, k1);

    k1 = mixK1(high);
    h1 = mixH1(h1, k1);

    return fmix(h1, 8);
}

int mixK1(int k1) {
    k1 *= 0xcc9e2d51L;
    k1 = Integer.rotateLeft(k1, 15);
    k1 *= 0x1b873593L;
    return k1;
}

int mixH1(int h1, int k1) {
    h1 ^= k1;
    h1 = Integer.rotateLeft(h1, 13);
    h1 = h1 * 5 + (int) 0xe6546b64L;
    return h1;
}

int fmix(int h1, int length) {
    h1 ^= length;
    h1 ^= h1 >>> 16;
    h1 *= 0x85ebca6bL;
    h1 ^= h1 >>> 13;
    h1 *= 0xc2b2ae35L;
    h1 ^= h1 >>> 16;
    return h1;
}

BytesRef bfBytes = doc['clientip_bloom_filter'].value;
byte[] buf = bfBytes.bytes;
int pos = 0;
int count = buf.length;
// int version = dis.readInt();
int ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int version = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
// int numHashFunctions = dis.readInt();
ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int numHashFunctions = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
// int numWords = dis.readInt();
ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));

// Create BitArray internally
long[] data = new long[numWords];
byte[] readBuffer = new byte[8];
for (int i = 0; i < numWords; i++) {

  // data[i] = dis.readLong()
  int n = 0;
  while (n < 8) {
    int count2;
    // int count2 = in.read(readBuffer, off + n, len - n);
    int off = n;
    int len = 8 - n;
    if (pos >= count) {
      count2 = -1;
    } else {
      int avail = count - pos;
      if (len > avail) {
        len = avail;
      }
      if (len <= 0) {
        count2 = 0;
      } else {
        System.arraycopy(buf, pos, readBuffer, off, len);
        pos += len;
        count2 = len;
      }
    }
    n += count2;
  }
  data[i] = (((long) readBuffer[0] << 56) +
      ((long) (readBuffer[1] & 255) << 48) +
      ((long) (readBuffer[2] & 255) << 40) +
      ((long) (readBuffer[3] & 255) << 32) +
      ((long) (readBuffer[4] & 255) << 24) +
      ((readBuffer[5] & 255) << 16) +
      ((readBuffer[6] & 255) << 8) +
      ((readBuffer[7] & 255) << 0));
}
// The set-bit count (bitCount) is not needed by mightContain, so it is not computed here.

// BloomFilterImpl.mightContainLong(item)
long item = params.ip;
int h1 = hashLong(item, 0);
int h2 = hashLong(item, h1);

long bitSize = (long) data.length * Long.SIZE;
for (int i = 1; i <= numHashFunctions; i++) {
  int combinedHash = h1 + (i * h2);
  // Flip all the bits if it's negative (guaranteed positive number)
  if (combinedHash < 0) {
    combinedHash = ~combinedHash;
  }
  if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
    return false;
  }
}
return true
            """,
            "params": {
              "ip": 107221504
            }
          }
        }
      }
    }
  }
}

Result:

    "hits": [
      {
        "_index": "bf_ip_test_4",
        "_id": "ojr4GI0B9arJAA1FN11b",
        "_score": 0,
        "_source": {
          "file_path": "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=12/part-00820-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
          "clientip_bloom_filter": "AAAAAQAAAAYAAehIAAAAAAAg..."
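The header of the returned clientip_bloom_filter value can be decoded to confirm what was indexed. A minimal Scala sketch, assuming the layout read by the Painless script above (version, numHashFunctions and numWords as big-endian ints); it uses only the first 16 base64 characters shown in the result, which decode to exactly the 12-byte header. The object name is hypothetical.

```scala
import java.nio.ByteBuffer
import java.util.Base64

object BloomFilterHeader {
  /** Returns (version, numHashFunctions, numWords) from the start of a base64-encoded filter. */
  def parse(base64Prefix: String): (Int, Int, Int) = {
    // ByteBuffer reads big-endian by default, matching DataInputStream.readInt()
    val buf = ByteBuffer.wrap(Base64.getDecoder.decode(base64Prefix))
    (buf.getInt(), buf.getInt(), buf.getInt())
  }

  def main(args: Array[String]): Unit = {
    // First 16 characters of the clientip_bloom_filter value in the result above
    val (version, numHashFunctions, numWords) = parse("AAAAAQAAAAYAAehI")
    println(s"version=$version, numHashFunctions=$numHashFunctions, numWords=$numWords, " +
      s"bitArrayBytes=${numWords * 8L}")
  }
}
```

The prefix decodes to version 1, 6 hash functions and 125,000 64-bit words, so the raw bit array is about 1MB per file before any _source or doc value storage overhead.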

Verify:

scala> spark.sql("SELECT clientip, input_file_name() FROM ds_tables.http_logs WHERE clientip = '6.100.18.0'").show(false)
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|clientip  |input_file_name()                                                                                                                                    |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|6.100.18.0|s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=12/part-00820-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2|
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+

More tests:

scala> spark.sql("SELECT clientip, COUNT(*) AS file_count FROM (SELECT DISTINCT clientip, input_file_name() FROM ds_tables.http_logs) GROUP BY clientip HAVING COUNT(*) = 3 ORDER BY file_count ASC LIMIT 5").show
+-----------+----------+
|   clientip|file_count|
+-----------+----------+
|186.207.8.0|         3|
|243.254.9.0|         3|
| 160.69.1.0|         3|
| 165.71.0.0|         3|
|  41.96.1.0|         3|
+-----------+----------+

scala> spark.sql("select ip_to_num('243.254.9.0')").show
+----------------------+
|ip_to_num(243.254.9.0)|
+----------------------+
|            4093511936|
+----------------------+

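Running the same Painless query with IP 243.254.9.0 (params.ip = 4093511936) returns three files, consistent with the file_count of 3 above: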
    "hits": [
      {
        "_index": "bf_ip_test_4",
        "_id": "sDX3GI0BWdGHpYUCxqxK",
        "_score": 0,
        "fields": {
          "file_path": [
            "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00217-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
          ]
        }
      },
      {
        "_index": "bf_ip_test_4",
        "_id": "UzX3GI0BWdGHpYUCpKyP",
        "_score": 0,
        "fields": {
          "file_path": [
            "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00215-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
          ]
        }
      },
      {
        "_index": "bf_ip_test_4",
        "_id": "LDr3GI0B9arJAA1Fx1wB",
        "_score": 0,
        "fields": {
          "file_path": [
            "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00216-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
          ]
        }
      }
    ]
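To cross-check the Painless port outside OpenSearch, the same decoding and probing logic can be written in plain Scala. The sketch below assumes the serialized layout the script reads (big-endian version, numHashFunctions and numWords, followed by the 64-bit words) and reproduces its Murmur3 x86_32 hashLong() and double-hashing loop. The object name and the put/encode helpers are hypothetical and exist only so the round trip can be tested. The item 107221504 used in the test is ip_to_num('6.100.18.0'), the same value passed as params.ip in the first query.

```scala
import java.nio.ByteBuffer

object BloomFilterBytes {
  // Murmur3 x86_32 constants, as used by the Painless hashLong() above
  private val C1: Int = 0xcc9e2d51L.toInt
  private val C2: Int = 0x1b873593
  private val N1: Int = 0xe6546b64L.toInt
  private val F1: Int = 0x85ebca6bL.toInt
  private val F2: Int = 0xc2b2ae35L.toInt

  private def mixK1(k: Int): Int = Integer.rotateLeft(k * C1, 15) * C2
  private def mixH1(h: Int, k: Int): Int = Integer.rotateLeft(h ^ k, 13) * 5 + N1
  private def fmix(h0: Int, length: Int): Int = {
    var h = h0 ^ length
    h ^= h >>> 16
    h *= F1
    h ^= h >>> 13
    h *= F2
    h ^= h >>> 16
    h
  }

  def hashLong(input: Long, seed: Int): Int = {
    val h1 = mixH1(seed, mixK1(input.toInt))         // low 32 bits
    fmix(mixH1(h1, mixK1((input >>> 32).toInt)), 8)  // high 32 bits
  }

  // Bit positions probed for `item`, using the same double hashing as the script
  private def bitIndices(item: Long, k: Int, bitSize: Long): Seq[Long] = {
    val h1 = hashLong(item, 0)
    val h2 = hashLong(item, h1)
    (1 to k).map { i =>
      var combined = h1 + i * h2
      if (combined < 0) combined = ~combined // flip all bits to get a non-negative value
      combined % bitSize
    }
  }

  private def isSet(words: Array[Long], index: Long): Boolean =
    (words((index >>> 6).toInt) & (1L << (index & 63).toInt)) != 0

  /** Decodes (numHashFunctions, words) from the big-endian serialized form. */
  def decode(bytes: Array[Byte]): (Int, Array[Long]) = {
    val buf = ByteBuffer.wrap(bytes) // big-endian by default, like DataInputStream
    val version = buf.getInt()
    require(version == 1, s"Unsupported version: $version")
    val numHashFunctions = buf.getInt()
    val numWords = buf.getInt()
    (numHashFunctions, Array.fill(numWords)(buf.getLong()))
  }

  def mightContain(bytes: Array[Byte], item: Long): Boolean = {
    val (k, words) = decode(bytes)
    bitIndices(item, k, words.length * 64L).forall(isSet(words, _))
  }

  // Hypothetical test helpers: set the bits for an item, then serialize
  def put(words: Array[Long], k: Int, item: Long): Unit =
    bitIndices(item, k, words.length * 64L).foreach { index =>
      words((index >>> 6).toInt) |= 1L << (index & 63).toInt
    }

  def encode(k: Int, words: Array[Long]): Array[Byte] = {
    val buf = ByteBuffer.allocate(12 + 8 * words.length)
    buf.putInt(1).putInt(k).putInt(words.length)
    words.foreach(w => buf.putLong(w))
    buf.array()
  }
}
```

Comparing its answers with Spark's own BloomFilter.mightContainLong on a few values would be a further check that the script is a faithful port.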

@dai-chen
Collaborator Author

dai-chen commented Jan 30, 2024

BloomFilter Storage Optimization in OpenSearch

  1. Field type choice: binary type or other?
  2. Field mapping
    a. _source
    b. doc_values
    c. stored field
    d. compression algorithm

Did a quick test with the following index mappings but found little difference:

POST binary_test_X/_doc
{
  "file_path": "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=10/part-00409-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
  "clientip_bloom_filter": "VGhpcyBpcyBhIG5lZWRzY3JpcHQgMTAwMEIgc3RyaW5nOgoKR2V0IHlvdSBhIGJhc2U2NCBzdHJpbmc6CkkgbmVlZCBpdCBmb3IgdGVzdAoKRmFpbGVkIHRvIGJlIGluIFhNTDoKLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMwakNDQXhTZ0F3SUJBZ0lKQU5oOGJ5dGdXNFlBYXpMdXZINHRzclNuTThOS0VhWllVSzlYdDgxZjRjc1MKVnpKN2RReVNKT2hBSWh2N3ZaNkYydjJ0T2xONmtHcG14QXZZVmdBNXpLWGhVUExQR2xEa2w2OHlwaGg3YkRFUQoxYytqM0FLcXRkRFZrR1VSS1NWMjRXbDBuVWZwZllTSERrWVZWT0dWcmQzOHc2dCtMaExSRG0vaFdCclVsSnQwY0N4CkI0RmtVUjB0SVU3Rk5aUG1iZlJxRzRZRmlaeTB0ZEdaRXRkcXdYcXd6Q1F3R0Z1dTRFbUhwZ3hRVVNENjU1SHNkM1gKMjY1Q2tIaGg5eEw5V2pxK2JkcXg4OXkxdTFIVHJyS1FUaFBxMmxLTFZMZUxJczJYN2pmUTBqZ0t0d2M1ajFLQgp5RWpFbnE3TVhRSnp4Z0k1eGQ0cHdJWjNJSWxURlRKWUhUc09rWEt6eVNvb0htM0lGMFQwRkdSTUhhWjdpUEhrCkhwSmZJTE5wVE5zYkpUdG1KVjJ5dUN2S3JjRFBsOVZSLlpwMjlXN3I1YWJvVUxZS1hxdDRaNVU1RnVtZVJnS0UwMApFRS9lQklHbz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="
}

# doc_values is disabled by default for binary fields
PUT binary_test_1
{
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary"
      }
    }
  }
}

# Enable doc_values, which the script pushdown requires
PUT binary_test_2
{
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary",
        "doc_values": true
      }
    }
  }
}

# Disable _source
PUT binary_test_3
{
  "mappings": {
    "_source": {
      "enabled": false
    },
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary",
        "doc_values": true
      }
    }
  }
}

# Exclude fields from _source instead of disabling it,
# because a _restore_source field was found in the Lucene index
PUT binary_test_4
{
  "mappings": {
    "_source": {
      "excludes": ["file_path", "clientip_bloom_filter"]
    },
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary",
        "doc_values": true
      }
    }
  }
}

# Store the BF only as a stored field
PUT binary_test_5
{
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary",
        "store": true
      }
    }
  }
}

# Use best compression
PUT binary_test_6
{
  "settings": {
    "index.codec": "best_compression"
  }, 
  "mappings": {
    "properties": {
      "file_path": {
        "type": "keyword"
      },
      "clientip_bloom_filter": {
        "type": "binary"
      }
    }
  }
}

@dai-chen
Collaborator Author

dai-chen commented Jan 30, 2024

BloomFilter Algorithm Parameter Selection

Strategy

Here are different strategies for determining Bloom filter algorithm parameters:

  • Bloom Filter with Global Parameters:

    • Involves using the same Bloom filter parameters for all files.
    • Default configuration includes parameters such as FPP 0.03 and 1 million items (refer to Databricks documentation).
    • Potential issue: large Bloom filters on very small files (e.g., Parquet Issue).
  • Adaptive Bloom Filter per File:

    • Adapts the Bloom filter parameters based on the characteristics of each individual file.

    • Aims to optimize the filter size for each file.

    • Possible solutions:

      • Approximate Parameter Sampling

        • Sample each file to calculate approximate column cardinality
      • Adaptive Bloom Filter

        • Build multiple bloom filters and choose the best candidate (PARQUET-2254)
      • Scalable Bloom Filter

        • Involves dynamically adjusting the filter size to accommodate varying amounts of data.
        • Discussed in the context of Redis (Scalable Bloom Filter in Redis).
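For the Approximate Parameter Sampling strategy, the per-file cardinality estimate (for example from approx_count_distinct, as in the query used earlier) only needs to be rounded up to a filter size. A minimal sketch, assuming the same doubling candidates as the adaptive design with K = 1024; the object, the method and the headroom parameter are hypothetical, and headroom is only a knob for absorbing estimation error.

```scala
object SampledNdv {
  /** Candidate NDVs 1K, 2K, ..., 512K (assuming K = 1024). */
  val Candidates: Seq[Long] = (0 until 10).map(i => 1024L << i)

  /** Picks the smallest candidate NDV that is at least `approxDistinct * headroom`,
    * or the largest candidate if the estimate exceeds all of them. */
  def chooseNdv(approxDistinct: Long, headroom: Double = 1.0): Long = {
    val target = math.ceil(approxDistinct * headroom).toLong
    Candidates.find(_ >= target).getOrElse(Candidates.last)
  }
}
```

For the sample file with an estimate of 160818 distinct clientip values, this picks the 256K candidate (262144), in line with the Static 256K NDV configuration covering the ~180K maximum.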

Adaptive Bloom Filter

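Sketch in Scala (using Spark's org.apache.spark.util.sketch.BloomFilter):

import org.apache.spark.util.sketch.BloomFilter

class AdaptiveBloomFilter(fpp: Double) {
  // 10 candidate BFs whose NDV doubles from 1K to 512K
  private val ranges: Seq[Long] = (0 until 10).map(i => 1024L << i)
  private val candidates: Seq[BloomFilter] = ranges.map(ndv => BloomFilter.create(ndv, fpp))

  // Number of unique values seen so far (approximated by putLong() results)
  private var total = 0L

  def put(item: Long): Unit = {
    // bitChanged=true means this is the first time the item is inserted into the BF;
    // bitChanged=false means it may or may not be the first time.
    // Insert into every candidate, but use the last (biggest) BF for a more accurate count.
    val bitChanged = candidates.map(_.putLong(item)).last
    if (bitChanged) {
      total += 1
    }
  }

  def bestCandidate(): BloomFilter = {
    // Use the candidate whose NDV is just greater than the counter,
    // or the biggest one if the counter exceeds every candidate's NDV
    val index = ranges.indexWhere(ndv => total < ndv)
    if (index >= 0) candidates(index) else candidates.last
  }
}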

Development

No branches or pull requests

1 participant