-
Notifications
You must be signed in to change notification settings - Fork 161
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Added Elasticsearch controller to establish client connection and read and write to Elasticsearch. - Added tests Closes #902 Authors: - Bhargav Suryadevara (https://github.com/bsuryadevara) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: #1163
- Loading branch information
1 parent
aab0d96
commit 9174cec
Showing
12 changed files
with
702 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
<!-- | ||
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
SPDX-License-Identifier: Apache-2.0 | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
|
||
## Write to Elasticsearch Module | ||
|
||
This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch, and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API. | ||
|
||
### Configurable Parameters | ||
|
||
| Parameter | Type | Description | Example Value | Default Value | | ||
|-------------------------|--------------|---------------------------------------------------------------------------------------------------------|-------------------------------|---------------| | ||
| `index` | str | Elasticsearch index. | "my_index" | `[Required]` | | ||
| `connection_kwargs` | dict | Elasticsearch connection kwargs configuration. | {"hosts": [{"host": "localhost", ...}]} | `[Required]` | | ||
| `raise_on_exception` | bool | Raise or suppress exceptions when writing to Elasticsearch. | true | `false` | | ||
| `pickled_func_config` | dict | Configuration (`pickled_func_str` and `encoding`) of a pickled custom function used to update connection_kwargs as needed for the client connection. | See below | None | | ||
| `refresh_period_secs` | int | Time in seconds to refresh the client connection. | 3600 | `2400` | | ||
|
||
### Example JSON Configuration | ||
|
||
```json | ||
{ | ||
"index": "test_index", | ||
"connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]}, | ||
"raise_on_exception": true, | ||
"pickled_func_config": { | ||
"pickled_func_str": "pickled function as a string", | ||
"encoding": "latin1" | ||
}, | ||
"refresh_period_secs": 2400 | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
# Copyright (c) 2022-2023, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import logging | ||
import time | ||
|
||
import pandas as pd | ||
from elasticsearch import ConnectionError as ESConnectionError | ||
from elasticsearch import Elasticsearch | ||
from elasticsearch.helpers import parallel_bulk | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ElasticsearchController:
    """
    ElasticsearchController to perform read and write operations using Elasticsearch service.

    Parameters
    ----------
    connection_kwargs : dict
        Keyword arguments to configure the Elasticsearch connection. Must be a non-empty dictionary.
    raise_on_exception : bool, optional, default: False
        Whether to raise exceptions on Elasticsearch errors.
    refresh_period_secs : int, optional, default: 2400
        The refresh period in seconds for client refreshing.

    Raises
    ------
    ValueError
        If `connection_kwargs` is None or empty.
    """

    def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):

        self._client = None
        self._last_refresh_time = None
        self._raise_on_exception = raise_on_exception
        self._refresh_period_secs = refresh_period_secs

        # Reject both None and empty kwargs here; otherwise None would only fail later,
        # as an unrelated TypeError, when the kwargs are unpacked into the client constructor.
        if not connection_kwargs:
            raise ValueError("Connection kwargs cannot be none or empty.")

        self._connection_kwargs = connection_kwargs

        logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs)

        self.refresh_client(force=True)

        # `info` is a method: call it so the cluster details are logged rather than the bound method's repr.
        # The call is guarded so the extra request is only issued when debug logging is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Elasticsearch cluster info: %s", self._client.info())
        logger.debug("Creating Elasticsearch client... Done!")

    def refresh_client(self, force: bool = False) -> bool:
        """
        Refresh the Elasticsearch client instance.

        The client is recreated when `force` is set, when no client exists yet, or when the
        configured refresh period has elapsed since the last successful refresh.

        Parameters
        ----------
        force : bool, optional, default: False
            Force a client refresh.

        Returns
        -------
        bool
            Returns true if client is refreshed, otherwise false.

        Raises
        ------
        elasticsearch.ConnectionError
            If the newly created client does not respond to a ping.
        """

        needs_refresh = (force or self._client is None or self._last_refresh_time is None
                         or time.time() - self._last_refresh_time >= self._refresh_period_secs)

        if not needs_refresh:
            return False

        if self._client is not None:
            try:
                # Close the existing client
                self.close_client()
            except Exception as ex:
                logger.warning("Ignoring client close error: %s", ex)

        logger.debug("Refreshing Elasticsearch client....")

        # Build the new client in a local variable so that a failed ping never leaves a
        # half-initialised client on the instance.
        client = Elasticsearch(**self._connection_kwargs)

        # Check if the client is connected
        if not client.ping():
            try:
                client.close()
            except Exception as ex:
                logger.warning("Ignoring client close error: %s", ex)
            raise ESConnectionError("Elasticsearch client is not connected.")

        logger.debug("Elasticsearch client is connected.")

        self._client = client
        self._last_refresh_time = time.time()
        logger.debug("Refreshing Elasticsearch client.... Done!")

        return True

    def parallel_bulk_write(self, actions) -> None:
        """
        Perform parallel bulk writes to Elasticsearch.

        Parameters
        ----------
        actions : list
            List of actions to perform in parallel.
        """

        self.refresh_client()

        # `parallel_bulk` is a generator; it must be consumed for the writes to actually happen.
        for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception):
            if not success:
                logger.error("Error writing to ElasticSearch: %s", str(info))

    def search_documents(self, index: str, query: dict, **kwargs) -> dict:
        """
        Search for documents in Elasticsearch based on the given query.

        Parameters
        ----------
        index : str
            The name of the index to search.
        query : dict
            The DSL query for the search.
        **kwargs
            Additional keyword arguments that are supported by the Elasticsearch search method.

        Returns
        -------
        dict
            The search result returned by Elasticsearch. An empty dictionary is returned when the
            search fails and `raise_on_exception` is disabled.

        Raises
        ------
        RuntimeError
            If the search fails and `raise_on_exception` is enabled.
        """

        try:
            self.refresh_client()
            result = self._client.search(index=index, query=query, **kwargs)
            return result
        except Exception as exc:
            logger.error("Error searching documents: %s", exc)
            if self._raise_on_exception:
                raise RuntimeError(f"Error searching documents: {exc}") from exc

            return {}

    def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None:
        """
        Converts DataFrames to actions and parallel bulk writes to Elasticsearch.

        Parameters
        ----------
        index : str
            The name of the index to write.
        df : pd.DataFrame
            DataFrame entries that require writing to Elasticsearch.
        """

        # One bulk action per DataFrame row, each targeting the same index.
        actions = [{"_index": index, "_source": row} for row in df.to_dict("records")]

        self.parallel_bulk_write(actions)  # Parallel bulk upload to Elasticsearch

    def close_client(self) -> None:
        """
        Close the Elasticsearch client connection.

        Safe to call when no client exists or when the client has already been closed.
        """

        if self._client is None:
            return

        try:
            self._client.close()
        finally:
            # Drop the reference so later calls recreate the client instead of reusing a closed one.
            self._client = None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# Copyright (c) 2022-2023, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import logging | ||
import pickle | ||
|
||
import mrc | ||
from mrc.core import operators as ops | ||
|
||
from morpheus.controllers.elasticsearch_controller import ElasticsearchController | ||
from morpheus.messages import ControlMessage | ||
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE | ||
from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH | ||
from morpheus.utils.module_utils import register_module | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@register_module(WRITE_TO_ELASTICSEARCH, MORPHEUS_MODULE_NAMESPACE)
def write_to_elasticsearch(builder: mrc.Builder):
    """
    Register a module node that writes every incoming message payload to an Elasticsearch index.

    Each row of the message's DataFrame is turned into a document and sent to the configured index
    through the controller's parallel bulk write. The message itself is passed downstream unchanged.

    Parameters
    ----------
    builder : mrc.Builder
        An mrc Builder object.

    Raises
    ------
    ValueError
        If `index` is missing from the module configuration, or `connection_kwargs` is not a dictionary.
    """

    module_config = builder.get_current_module_config()

    index = module_config.get("index", None)
    if index is None:
        raise ValueError("Index must not be None.")

    es_kwargs = module_config.get("connection_kwargs")
    if not isinstance(es_kwargs, dict):
        raise ValueError(f"Expects `connection_kwargs` as a dictionary, but it is of type {type(es_kwargs)}")

    raise_on_exception = module_config.get("raise_on_exception", False)
    func_config = module_config.get("pickled_func_config", None)
    refresh_period_secs = module_config.get("refresh_period_secs", 2400)

    if func_config:
        func_str = func_config.get("pickled_func_str")
        func_encoding = func_config.get("encoding")

        # The optional hook is only applied when both the payload and its encoding are supplied.
        if func_str and func_encoding:
            # NOTE(review): `pickle.loads` executes arbitrary code; the module configuration
            # must come from a trusted source.
            kwargs_updater = pickle.loads(bytes(func_str, func_encoding))
            es_kwargs = kwargs_updater(es_kwargs)

    controller = ElasticsearchController(connection_kwargs=es_kwargs,
                                         raise_on_exception=raise_on_exception,
                                         refresh_period_secs=refresh_period_secs)

    def on_data(message: ControlMessage):
        # Convert the payload to pandas before handing it to the controller.
        payload_df = message.payload().df.to_pandas()
        controller.df_to_parallel_bulk_write(index=index, df=payload_df)

        return message

    # The client connection is closed once the upstream completes.
    node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(controller.close_client))

    # Register input and output port for a module.
    builder.register_module_input("input", node)
    builder.register_module_output("output", node)
Oops, something went wrong.