Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh index API for partition skipping index #1658

Merged

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented May 25, 2023

Description

Please see more details in changes in doc: https://github.com/dai-chen/sql-1/blob/add-partion-index-building/flint/docs/index.md

Flint Metadata Spec

Update metadata with more info (index kind, column name and type). In this way, FlintSparkSkippingIndex can be deserialized out of _meta JSON.

Flint Spark API

Add refreshIndex API with FULL (batch) and INCREMENTAL (streaming) mode.

Partition Index Implementation

Followed the first approach below for now. Need to evaluate the performance later.

  • Approach 1: treat partition columns as normal and use FIRST_VALUE function to aggregate
  • Approach 2: treat partition columns as special and add partition columns to GROUP BY
scala> spark.sql("select first_value(year, true) as year, first_value(month, true) as month, first_value(day, true), first_value(hour, true), input_file_name() from alb_logs group by input_file_name()").explain
== Physical Plan ==
*(2) HashAggregate(keys=[_nondeterministic#146], functions=[first_value(year#33, true), first_value(month#34, true), first_value(day#35, true), first_value(hour#36, true)])
+- Exchange hashpartitioning(_nondeterministic#146, 200), ENSURE_REQUIREMENTS, [id=#112]
   +- *(1) HashAggregate(keys=[_nondeterministic#146], functions=[partial_first_value(year#33, true), partial_first_value(month#34, true), partial_first_value(day#35, true), partial_first_value(hour#36, true)])
      +- *(1) Project [year#33, month#34, day#35, hour#36, input_file_name() AS _nondeterministic#146]
         +- FileScan csv default.alb_logs[year#33,month#34,day#35,hour#36] Batched: false, DataFilters: [], Format: CSV, Location: CatalogFileIndex[s3a://maximus.dev.daichen.us-west-2/alb-log-daily], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

scala> spark.sql("select year, month, day, hour, input_file_name() from alb_logs group by year, month, day, hour, input_file_name()").explain
== Physical Plan ==
*(2) HashAggregate(keys=[year#33, month#34, day#35, hour#36, _nondeterministic#169], functions=[])
+- Exchange hashpartitioning(year#33, month#34, day#35, hour#36, _nondeterministic#169, 200), ENSURE_REQUIREMENTS, [id=#134]
   +- *(1) HashAggregate(keys=[year#33, month#34, day#35, hour#36, _nondeterministic#169], functions=[])
      +- *(1) Project [year#33, month#34, day#35, hour#36, input_file_name() AS _nondeterministic#169]
         +- FileScan csv default.alb_logs[year#33,month#34,day#35,hour#36] Batched: false, DataFilters: [], Format: CSV, Location: CatalogFileIndex[s3a://maximus.dev.daichen.us-west-2/alb-log-daily], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Sample index data:

curl "localhost:9200/flint_test_skipping_index/_search?pretty"
{
  "took" : 121,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 18,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "flint_test_skipping_index",
        "_id" : "LWt3VIgBQCGf4ClzQDd8",
        "_score" : 1.0,
        "_source" : {
          "file_path" : "file:///.../spark-warehouse/test/year=2023/month=4/part-00000-c2f88187-da00-40aa-981a-471df2e15ca1.c000.csv",
          "year" : 2023,
          "month" : 4
        }
      },
      {
        "_index" : "flint_test_skipping_index",
        "_id" : "L2t6VIgBQCGf4ClzVzcU",
        "_score" : 1.0,
        "_source" : {
          "file_path" : "file:///.../spark-warehouse/test/year=2023/month=5/part-00000-10f5c696-8fd8-4b28-8d41-62ac9c39f905.c000.csv",
          "year" : 2023,
          "month" : 5
        }
      }
      ...

Issues Resolved

opensearch-project/opensearch-spark#2

Check List

  • New functionality includes testing.
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented.
    • New functionality has javadoc added
    • New functionality has user manual doc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Chen Dai <daichen@amazon.com>
@codecov
Copy link

codecov bot commented May 25, 2023

Codecov Report

Merging #1658 (9c6a5cd) into feature/flint (56e6520) will not change coverage.
The diff coverage is n/a.

@@               Coverage Diff                @@
##             feature/flint    opensearch-project/sql#1658   +/-   ##
================================================
  Coverage            97.19%   97.19%           
  Complexity            4107     4107           
================================================
  Files                  371      371           
  Lines                10464    10464           
  Branches               706      706           
================================================
  Hits                 10170    10170           
  Misses                 287      287           
  Partials                 7        7           
Flag Coverage Δ
sql-engine 97.19% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

dai-chen added 6 commits May 25, 2023 17:32
Signed-off-by: Chen Dai <daichen@amazon.com>
Signed-off-by: Chen Dai <daichen@amazon.com>
Signed-off-by: Chen Dai <daichen@amazon.com>
Signed-off-by: Chen Dai <daichen@amazon.com>
Signed-off-by: Chen Dai <daichen@amazon.com>
@dai-chen dai-chen marked this pull request as ready for review May 26, 2023 20:01
@dai-chen dai-chen requested a review from penghuo May 26, 2023 21:12
val indexType = (meta \ "kind").extract[String]
val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]

indexType match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
* Indexed column name and its Spark SQL type.
*/
val indexedColumn: (String, String)
val columnName: String
val columnType: String
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use Spark StructType for columnType if it is spark sql type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I may remove this columnType field from _meta and this class after we refactored the Spark SQL type => Flint type mapping

extends FlintSparkSkippingStrategy {

override def outputSchema(): Map[String, String] = {
Map(indexedColumn._1 -> convertToFlintType(indexedColumn._2))
Map(columnName -> convertToFlintType(columnType))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the flintType align need to align with AggregateFunction output type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it needs to align. As comment above, I may need to use AggregateFunction output type if columnType is unnecessary. I've noted this down. Thanks!

@dai-chen dai-chen merged commit b97b91c into opensearch-project:feature/flint May 26, 2023
@dai-chen dai-chen deleted the add-partion-index-building branch May 26, 2023 22:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request Flint
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants