Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Indexing] Introduce bulk HTTP API streaming flavour #9070

Closed
reta opened this issue Aug 2, 2023 · 8 comments · Fixed by #15381
Closed

[Streaming Indexing] Introduce bulk HTTP API streaming flavour #9070

reta opened this issue Aug 2, 2023 · 8 comments · Fixed by #15381
Assignees
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing v2.17.0 v3.0.0 Issues and PRs related to version 3.0.0

Comments

@reta
Copy link
Collaborator

reta commented Aug 2, 2023

Is your feature request related to a problem? Please describe.
The bulk HTTP API does not support streaming (neither HTTP/2 nor chunked transfer)

Describe the solution you'd like
Introduce bulk HTTP API streaming flavor based on new experimental transport (#9067)

Describe alternatives you've considered
N/A

Additional context
See please #9067

opensearch-project/opensearch-api-specification#537
opensearch-project/documentation-website#8111

@reta reta added enhancement Enhancement or improvement to existing feature or request Indexing & Search labels Aug 2, 2023
@reta reta removed the untriaged label Aug 2, 2023
@anasalkouz anasalkouz added Indexing Indexing, Bulk Indexing and anything related to indexing and removed Indexing & Search labels Sep 19, 2023
@reta reta self-assigned this Aug 20, 2024
@reta reta added the v2.17.0 label Aug 20, 2024
@reta
Copy link
Collaborator Author

reta commented Aug 20, 2024

There is new experimental API introduced in 2.15.0 which is not yet documented nor finalized:

    POST  /_bulk/stream
    PUT    /_bulk/stream
    POST  /{index}/_bulk/stream
    PUT    /{index}/_bulk/stream

The new API is using LD-JSON and at the moment, picks a single chunk from the stream (bulk operation), forwards it to the node of the cluster and returns the result of the operation as a chunk in the response, for example the request with three chunks:

<---- chunk 1 -----> 
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
<---- chunk 1 -----> 

<---- chunk 2 -----> 
{ "index": { "_index": "test-streaming", "_id": "2" } }
{ "name": "tom" }
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{ "index": { "_index": "test-streaming", "_id": "3" } }
{ "name": "john" }
<---- chunk 3 -----> 

would receive a response with three chunks as well (1:1):

<---- chunk 1 -----> 
{"took":547,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"1","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}

<---- chunk 2 -----> 
{"took":20,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"2","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}}]}
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{"took":22,"errors":false,"items":[{"index":{"_index":"test-streaming","_id":"3","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}
<---- chunk 3 -----> 

This is simple communication model but very inefficient. The goal of this issue is to formalize API in such a way that it could be used as efficiently as possible, aiming to overtake existing /_bulk APIs in the future.

@reta reta added the v3.0.0 Issues and PRs related to version 3.0.0 label Aug 20, 2024
@reta
Copy link
Collaborator Author

reta commented Aug 21, 2024

Option #1: Keep using NDJSON for streaming ingestion

The current _bulk APIs use NDJSON (Newline delimited JSON, https://github.com/ndjson/ndjson-spec) which fits perfectly to the streaming use cases: each chunk sent along the request represents distinct bulk operation/action (index, create, delete, and update). The suggestion is to not change it and use NDJSON for streaming /_bulk/stream APIs as well.

To efficiently batch chunks, the streaming bulk request could support following parameters:

  • batch_size: how many operations/actions to accumulate before routing the bulk request to the data nodes
  • batch_interval: how long to accumulate operations/actions before routing the bulk request to the data nodes

The combination of both should also be possible (in this regard, whichever condition triggers first, it would shape the batch). Although the request format stays unchanged, the semantic of response would change to reflect the internal batching (so it won't be 1:1 anymore), for example, with batch_size=3 and three request chunks:

<---- chunk 1 -----> 
{ "index": { "_index": "test-streaming", "_id": "1" } }
{ "name": "josh" }
<---- chunk 1 -----> 

<---- chunk 2 -----> 
{ "index": { "_index": "test-streaming", "_id": "2" } }
{ "name": "tom" }
<---- chunk 2 -----> 

<---- chunk 3 -----> 
{ "index": { "_index": "test-streaming", "_id": "3" } }
{ "name": "john" }
<---- chunk 3 -----> 

the response would only contain one chunk (per 3 request chunks):

<---- chunk 1 -----> 
{"took":547,"errors":false,"items":[
    {"index":{"_index":"test-streaming","_id":"1","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}
    {"index":{"_index":"test-streaming","_id":"2","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},
    {"index":{"_index":"test-streaming","_id":"3","_version":1,"result":"created","forced_refresh":true,"_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}
]}
<---- chunk 1 -----> 

The downside of this model: if the bulk action has documents without _id, it would be difficult to match the outcome of N request chunks to corresponding M response chunks (but this is a general problem with existing _bulk APIs as well) to precisely pinpoint the failures fe.

On the benefits side, keeping NDJSON would significantly simplify the migration from the existing _bulk APIs since the only changes would be related to slicing the bulk operations (and response processing if needed) but not data formats.

Risks to consider:

  • dealing with very large request chunks (basically, documents)

Option #2: Introduce efficient (binary?) format for streaming ingestion

Alternative option is to introduce new efficient (binary?) format for streaming ingestion (for example, based on Protocol Buffers).

Protocol Buffers are great for handling individual messages within a large data set. Usually, large data sets are a collection of small pieces, where each small piece is structured data. - https://protobuf.dev/programming-guides/techniques/

The example message schema may look like this:

syntax = "proto3";
import "google/protobuf/any.proto";

message Index {
  optional string index = 1;
  optional string _id = 2;
  optional bool require_alias = 3;
  map<string,  google.protobuf.Any> fields = 4;
}

message Create {
  optional string index = 1;
  optional string _id = 2;
  optional bool require_alias = 3;  
  map<string,  google.protobuf.Any> fields = 4;
}

message Delete {
  optional string index = 1;
  string _id = 2;
  optional bool require_alias = 3;      
}

message Update {
  optional string index = 1;
  string _id = 2;
  optional bool require_alias = 3;    
  optional google.protobuf.Any doc = 4;
}

message Action {
  oneof action {
      Index index = 1;
      Create create = 2;
      Delete delete= 3;
      Update update = 4;
  }
}

The schema actively relies on google.protobuf.Any to pass freestyle JSON-like structures around (for example, documents or scripts):

The Any message type lets you use messages as embedded types without having their .proto definition. An Any contains an arbitrary serialized message as bytes, along with a URL that acts as a globally unique identifier for and resolves to that message’s type. - https://protobuf.dev/programming-guides/proto3/#any

Risks to consider:

  • dealing with very large messages (basically, documents)

@reta
Copy link
Collaborator Author

reta commented Aug 22, 2024

@andrross @dblock @msfroh would appreciate early feedback if possible, working on some POCs at the moment to capture the operational metrics.

@andrross
Copy link
Member

andrross commented Aug 23, 2024

@reta My inclination here is that since this is a new API that is specifically targeting high-throughput use cases then we should explore option 2. I think there is quite a lot of overhead with parsing JSON and a binary protocol could improve upon that. For simple use cases where ease of use is paramount, the existing request-response style newline delimited JSON API will remain. I also think we could make adoption of a new binary API easier for the end user by keeping the same public API (as much as possible) of the various language clients like opensearch-java by continuing to use the current types exposed by those APIs even if it means an extra conversion on the client side to say a protobuf object type in the process of doing the binary serialization.

Tagging @amberzsy here as she is working on some protobuf experiments with the client API on the search side. We could potentially make the protobuf implementation for the client API much easier by generating the protobuf schemas from the opensearch-api specification, as that should the source-of-truth for OpenSearch client API.

@reta
Copy link
Collaborator Author

reta commented Aug 23, 2024

@reta My inclination here is that since this is a new API that is specifically targeting high-throughput use cases then we should explore option 2.

Thanks @andrross , I will try this route as well and publish some metrics so we could compare the wins (if any), thanks!

@andrross
Copy link
Member

@reta I'm really curious if the protobuf Any type is feasible here. Have you actually used it in practice at all? I was thinking we might have to fallback to binary blobs of UTF-8 encoded JSON for the actual document source in some cases.

@reta
Copy link
Collaborator Author

reta commented Aug 23, 2024

@reta I'm really curious if the protobuf Any type is feasible here. Have you actually used it in practice at all? I was thinking we might have to fallback to binary blobs of UTF-8 encoded JSON for the actual document source in some cases.

No, I haven't tried this schema yet in practice (only made sure it is processable by protoc successfully), will let you know how it goes, thanks @andrross !

@reta reta changed the title [Streaming Indexing] Introduce bulk HTTP API streaming flavor [Streaming Indexing] Introduce bulk HTTP API streaming flavour Aug 27, 2024
@reta
Copy link
Collaborator Author

reta commented Aug 27, 2024

To have change delivered incrementally, focusing this issue on HTTP/NDJSON API flavour (Option #1) and spinning off Protobuf API flavour into #15447 for future release (Option #2).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing v2.17.0 v3.0.0 Issues and PRs related to version 3.0.0
Projects
Development

Successfully merging a pull request may close this issue.

3 participants