Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Sink Module #1163

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e57df85
added elasticsearch sink
bsuryadevara Aug 30, 2023
c43ddd0
Merge remote-tracking branch 'upstream/branch-23.11' into 902-fea-add…
bsuryadevara Aug 30, 2023
efab6e3
added elasticsearch dependecy
bsuryadevara Aug 31, 2023
6a8631c
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Aug 31, 2023
d3004a5
updated docs
bsuryadevara Aug 31, 2023
963c28d
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Aug 31, 2023
2fe5d64
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Aug 31, 2023
9be0de0
Moved controllers module one level up
bsuryadevara Aug 31, 2023
4ea1067
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 5, 2023
386fc62
Merge remote-tracking branch 'upstream/branch-23.11' into 902-fea-add…
bsuryadevara Sep 7, 2023
5a7fded
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
ff58148
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Sep 7, 2023
6c02b71
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
347f5f2
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
0b9bcc1
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
2dae8a8
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
8ec3060
Update morpheus/controllers/elasticsearch_controller.py
bsuryadevara Sep 8, 2023
8e05f42
Update morpheus/controllers/elasticsearch_controller.py
bsuryadevara Sep 8, 2023
f623e2d
added write_to_elasticsearch stage test
bsuryadevara Sep 8, 2023
a2b0c0e
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 8, 2023
c6d937e
moved refresh_client call to controller impl
bsuryadevara Sep 13, 2023
719ba1a
updates to tests
bsuryadevara Sep 13, 2023
b26e418
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 18, 2023
b284048
Added refresh client call to search documents
bsuryadevara Sep 18, 2023
d471e96
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Sep 18, 2023
f62bdb7
updated elasticsearch sink tests
bsuryadevara Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies:
- dill
- docker-py=5.0
- docutils
- elasticsearch==8.9.0
- faker=12.3.0
- feedparser=6.0.10
- flake8
Expand Down
35 changes: 0 additions & 35 deletions docs/source/modules/core/multiplexer.md

This file was deleted.

45 changes: 45 additions & 0 deletions docs/source/modules/core/write_to_elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

## Write to Elasticsearch Module

This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch, and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

### Configurable Parameters

| Parameter | Type | Description | Example Value | Default Value |
|-------------------------|--------------|---------------------------------------------------------------------------------------------------------|-------------------------------|---------------|
| `index` | str | Elasticsearch index. | "my_index" | `[Required]` |
| `connection_kwargs` | dict | Elasticsearch connection kwargs configuration. | {"hosts": ["host": "localhost", ...} | `[Required]` |
| `raise_on_exception` | bool | Raise or suppress exceptions when writing to Elasticsearch. | true | `false` |
| `pickled_func_config` | str | Pickled custom function configuration to update connection_kwargs as needed for the client connection. | See below | None |
| `refresh_period_secs` | int | Time in seconds to refresh the client connection. | 3600 | `2400` |

### Example JSON Configuration

```json
{
"index": "test_index",
"connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
"raise_on_exception": true,
"pickled_func_config": {
"pickled_func_str": "pickled function as a string",
"encoding": "latin1"
},
"refresh_period_secs": 2400
}
```
2 changes: 1 addition & 1 deletion docs/source/modules/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ limitations under the License.
./core/filter_detections.md
./core/from_control_message.md
./core/mlflow_model_writer.md
./core/multiplexer.md
./core/payload_batcher.md
./core/serialize.md
./core/to_control_message.md
./core/write_to_elasticsearch.md
./core/write_to_file.md
```

Expand Down
3 changes: 3 additions & 0 deletions morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,9 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("preprocess", "morpheus.stages.preprocess.preprocess_nlp_stage.PreprocessNLPStage", modes=NLP_ONLY)
add_command("serialize", "morpheus.stages.postprocess.serialize_stage.SerializeStage", modes=ALL)
add_command("timeseries", "morpheus.stages.postprocess.timeseries_stage.TimeSeriesStage", modes=AE_ONLY)
add_command("to-elasticsearch",
"morpheus.stages.output.write_to_elasticsearch_stage.WriteToElasticsearchStage",
modes=ALL)
add_command("to-file", "morpheus.stages.output.write_to_file_stage.WriteToFileStage", modes=ALL)
add_command("to-kafka", "morpheus.stages.output.write_to_kafka_stage.WriteToKafkaStage", modes=ALL)
add_command("to-http", "morpheus.stages.output.http_client_sink_stage.HttpClientSinkStage", modes=ALL)
Expand Down
163 changes: 163 additions & 0 deletions morpheus/controllers/elasticsearch_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time

import pandas as pd
from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

logger = logging.getLogger(__name__)


class ElasticsearchController:
"""
ElasticsearchController to perform read and write operations using Elasticsearch service.

Parameters
----------
connection_kwargs : dict
Keyword arguments to configure the Elasticsearch connection.
raise_on_exception : bool, optional, default: False
Whether to raise exceptions on Elasticsearch errors.
refresh_period_secs : int, optional, default: 2400
The refresh period in seconds for client refreshing.
"""

def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):

self._client = None
self._last_refresh_time = None
self._raise_on_exception = raise_on_exception
self._refresh_period_secs = refresh_period_secs

if connection_kwargs is not None and not connection_kwargs:
raise ValueError("Connection kwargs cannot be none or empty.")

self._connection_kwargs = connection_kwargs

logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs)

self.refresh_client(force=True)

logger.debug("Elasticsearch cluster info: %s", self._client.info)
logger.debug("Creating Elasticsearch client... Done!")

def refresh_client(self, force: bool = False) -> bool:
"""
Refresh the Elasticsearch client instance.

Parameters
----------
force : bool, optional, default: False
Force a client refresh.

Returns
-------
bool
Returns true if client is refreshed, otherwise false.
"""

is_refreshed = False
time_now = time.time()
if force or self._client is None or time_now - self._last_refresh_time >= self._refresh_period_secs:
if self._client:
try:
# Close the existing client
self.close_client()
except Exception as ex:
logger.warning("Ignoring client close error: %s", ex)
logger.debug("Refreshing Elasticsearch client....")

# Create Elasticsearch client
self._client = Elasticsearch(**self._connection_kwargs)

# Check if the client is connected
if self._client.ping():
logger.debug("Elasticsearch client is connected.")
else:
raise ESConnectionError("Elasticsearch client is not connected.")

logger.debug("Refreshing Elasticsearch client.... Done!")
self._last_refresh_time = time.time()
is_refreshed = True

return is_refreshed

def parallel_bulk_write(self, actions) -> None:
"""
Perform parallel bulk writes to Elasticsearch.

Parameters
----------
actions : list
List of actions to perform in parallel.
"""

for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception):
if not success:
logger.error("Error writing to ElasticSearch: %s", str(info))

def search_documents(self, index: str, query: dict, **kwargs) -> dict:
"""
Search for documents in Elasticsearch based on the given query.

Parameters
----------
index : str
The name of the index to search.
query : dict
The DSL query for the search.
**kwargs
Additional keyword arguments that are supported by the Elasticsearch search method.

Returns
-------
dict
The search result returned by Elasticsearch.
"""

try:
result = self._client.search(index=index, query=query, **kwargs)
return result
except Exception as exc:
logger.error("Error searching documents: %s", exc)
if self._raise_on_exception:
raise RuntimeError(f"Error searching documents: {exc}") from exc

return {}

def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None:
"""
Converts DataFrames to actions and parallel bulk writes to Elasticsearch.

Parameters
----------
index : str
The name of the index to write.
df : pd.DataFrame
DataFrame entries that require writing to Elasticsearch.
"""

actions = [{"_index": index, "_source": row} for row in df.to_dict("records")]

self.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch

def close_client(self) -> None:
"""
Close the Elasticsearch client connection.
"""
self._client.close()
6 changes: 4 additions & 2 deletions morpheus/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""
Morpheus module definitions, each module is automatically registered when imported
"""
from morpheus._lib import modules
# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
from morpheus.modules import file_batcher
Expand All @@ -26,8 +27,8 @@
from morpheus.modules import payload_batcher
from morpheus.modules import serialize
from morpheus.modules import to_control_message
from morpheus.modules import write_to_elasticsearch
from morpheus.modules import write_to_file
from morpheus._lib import modules

__all__ = [
"file_batcher",
Expand All @@ -41,5 +42,6 @@
"payload_batcher",
"serialize",
"to_control_message",
"write_to_file"
"write_to_file",
"write_to_elasticsearch"
]
84 changes: 84 additions & 0 deletions morpheus/modules/write_to_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pickle

import mrc
from mrc.core import operators as ops

from morpheus.controllers.elasticsearch_controller import ElasticsearchController
from morpheus.messages import ControlMessage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH
from morpheus.utils.module_utils import register_module

logger = logging.getLogger(__name__)


@register_module(WRITE_TO_ELASTICSEARCH, MORPHEUS_MODULE_NAMESPACE)
def write_to_elasticsearch(builder: mrc.Builder):
"""
This module reads input data stream, converts each row of data to a document format suitable for Elasticsearch,
and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

Parameters
----------
builder : mrc.Builder
An mrc Builder object.
"""

config = builder.get_current_module_config()

index = config.get("index", None)

if index is None:
raise ValueError("Index must not be None.")

connection_kwargs = config.get("connection_kwargs")

if not isinstance(connection_kwargs, dict):
raise ValueError(f"Expects `connection_kwargs` as a dictionary, but it is of type {type(connection_kwargs)}")

raise_on_exception = config.get("raise_on_exception", False)
pickled_func_config = config.get("pickled_func_config", None)
refresh_period_secs = config.get("refresh_period_secs", 2400)

if pickled_func_config:
pickled_func_str = pickled_func_config.get("pickled_func_str")
encoding = pickled_func_config.get("encoding")

if pickled_func_str and encoding:
connection_kwargs_update_func = pickle.loads(bytes(pickled_func_str, encoding))
connection_kwargs = connection_kwargs_update_func(connection_kwargs)

controller = ElasticsearchController(connection_kwargs=connection_kwargs,
raise_on_exception=raise_on_exception,
refresh_period_secs=refresh_period_secs)

def on_data(message: ControlMessage):

controller.refresh_client()

df = message.payload().df.to_pandas()

controller.df_to_parallel_bulk_write(index=index, df=df)

return message

node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(controller.close_client))

# Register input and output port for a module.
builder.register_module_input("input", node)
builder.register_module_output("output", node)
Loading