Support refresh interval and checkpoint location option #39

Merged

@dai-chen (Collaborator) commented Sep 20, 2023

Description

  1. Added the refresh_interval and checkpoint_location options and stored them in Flint metadata
  2. Fixed two bugs: the STRING token, and deleteIndex() not stopping the job first
  3. Extracted Spark parsing into SparkSqlAstBuilder and mixed it into the Flint parser

TODO

  1. Didn't find an easy way to verify index options in a streaming job. Will add a UT for the index setting option in the next PR.
  2. Refactoring Flint metadata and build index API #24: the build API refactor will come first, in the next PR for MV.
  3. Will update the user manual with the new index options in the next PR.

Example

spark-sql> CREATE INDEX orderkey_and_quantity
         > ON stream.lineitem_tiny (l_orderkey, l_quantity)
         > WITH (
         >   auto_refresh = true,
         >   refresh_interval = '10 seconds',
         >   checkpoint_location = 's3a://test/'
         > );

GET flint_stream_lineitem_tiny_orderkey_and_quantity_index/_mapping
{
  "flint_stream_lineitem_tiny_orderkey_and_quantity_index": {
    "mappings": {
      "_meta": {
        "name": "orderkey_and_quantity",
        "options": {
          "auto_refresh": "true",
          "refresh_interval": "10 seconds",
          "checkpoint_location": "s3a://test/"
        },
        "source": "stream.lineitem_tiny",
        "kind": "covering",
        ...
      }
    }
  }
}

spark-sql> 23/09/21 16:06:30 WARN ProcessingTimeExecutor: Current batch is falling behind. 
The trigger interval is 10000 milliseconds, but spent 72423 milliseconds

$ aws s3 ls test
     commits/
     offsets/
     sources/
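
The WARN line above reports the trigger interval as 10000 milliseconds, which is the `refresh_interval = '10 seconds'` option expressed in milliseconds. A minimal sketch of that conversion in plain Scala follows. It assumes nothing about Flint's actual code path: it uses `scala.concurrent.duration.Duration` from the standard library rather than Spark's own interval parsing, and `RefreshIntervalSketch` and `refreshIntervalMillis` are hypothetical names.

```scala
import scala.concurrent.duration.Duration

// Hypothetical helper for illustration; not part of Flint or Spark.
object RefreshIntervalSketch {

  // Convert an interval string such as "10 seconds" to milliseconds.
  // Throws if the string is not a valid duration.
  def refreshIntervalMillis(interval: String): Long =
    Duration(interval).toMillis
}
```

With this sketch, `RefreshIntervalSketch.refreshIntervalMillis("10 seconds")` evaluates to `10000`, matching the interval in the log. The same log shows the batch took 72423 milliseconds, so a 10 second interval is shorter than the batch duration for this data set.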

Issues Resolved

#26

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Chen Dai <daichen@amazon.com>
@dai-chen added the enhancement label Sep 20, 2023
@dai-chen self-assigned this Sep 20, 2023
@dai-chen changed the title from "Support more options in create index statement" to "Support refresh interval and checkpoint location option in create statement" Sep 20, 2023
@dai-chen changed the title from "Support refresh interval and checkpoint location option in create statement" to "Support refresh interval and checkpoint location option" Sep 20, 2023
@dai-chen marked this pull request as ready for review September 25, 2023 15:34
@dai-chen requested a review from penghuo September 26, 2023 17:32
}
}

override def visitPropertyValue(value: PropertyValueContext): String = {

@vamsimanohar (Member) commented Sep 26, 2023

It seems every option value is treated as a string.
A few quick questions:

  • If a user passes a random string for auto_refresh, when will the failure occur?
  • [Not related to this PR] Any thoughts on communicating syntax and other errors to the user?

@dai-chen (Collaborator, Author) commented Sep 26, 2023

  1. Yes, it's a map of strings passed to FlintSparkIndexOptions, which interprets it.
  2. For the error case, an exception is currently thrown when we build the DataFrame for the streaming job in FlintSpark. Let me think about whether the Spark error is clear enough or whether we can validate beforehand in Add index settings option in create statement #44. Thanks for the good point!
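
The "validate beforehand" idea could look roughly like the sketch below. It checks the string-valued options when the statement is parsed instead of waiting for the streaming job to fail. `IndexOptionsSketch` is a hypothetical stand-in and not the real FlintSparkIndexOptions; the default of `false` for a missing `auto_refresh` and the error messages are assumptions made for illustration.

```scala
import scala.concurrent.duration.Duration
import scala.util.Try

// Hypothetical stand-in for illustration; NOT Flint's FlintSparkIndexOptions.
final case class IndexOptionsSketch(options: Map[String, String]) {

  // Parse auto_refresh, or return a message naming the bad value.
  def autoRefresh: Either[String, Boolean] =
    options.get("auto_refresh") match {
      case None => Right(false) // assumed default when the option is absent
      case Some(raw) =>
        Try(raw.toBoolean).toOption
          .toRight(s"auto_refresh must be true or false, got '$raw'")
    }

  // Parse refresh_interval into milliseconds if it is present.
  def refreshIntervalMillis: Either[String, Option[Long]] =
    options.get("refresh_interval") match {
      case None => Right(None)
      case Some(raw) =>
        Try(Duration(raw).toMillis).toOption
          .map(Option(_))
          .toRight(s"refresh_interval is not a valid interval: '$raw'")
    }

  // Collect every error message; an empty result means the options are valid.
  def validate: Seq[String] =
    Seq(autoRefresh, refreshIntervalMillis).collect { case Left(msg) => msg }
}
```

Calling `validate` on `IndexOptionsSketch(Map("auto_refresh" -> "maybe"))` returns a single message about `auto_refresh`, so the user would see the problem before any DataFrame is built.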

@vamsimanohar (Member)

LGTM

@dai-chen merged commit 896fa9f into opensearch-project:main Sep 26, 2023