
Commit

fix(es): correct es param
thanh-nguyen-dang committed Jun 9, 2023
1 parent 69d9f07 commit ffd3007
Showing 5 changed files with 17 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -26,6 +26,7 @@ wheels/
*.egg-info/
.installed.cfg
*.egg
temp/

# PyInstaller
# Usually these files are written by a python script from a template
1 change: 1 addition & 0 deletions tests/integrated_tests/test_es/test_es.py
@@ -39,6 +39,7 @@ def test_auth_resource_path_exist(doc_type):
for hit in hit_response:
assert hit.get("_source").get("auth_resource_path") == auth_resource_path


@pytest.mark.parametrize("doc_type", doc_types)
def test_es_types(doc_type):
"""
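Note: the test above relies on pytest parametrization to repeat one assertion for every doc type. A minimal self-contained sketch of that pattern, with the doc type list and the fetch helper invented purely for illustration:

import pytest

doc_types = ["case", "file"]  # illustrative values; the real list comes from the ETL mappings


def fetch_hits(doc_type):
    # Hypothetical stand-in for the repository's Elasticsearch lookup helpers.
    return [{"_source": {"auth_resource_path": "/programs/prog/projects/proj"}}]


@pytest.mark.parametrize("doc_type", doc_types)
def test_auth_resource_path_present(doc_type):
    # pytest runs this test once per entry in doc_types.
    for hit in fetch_hits(doc_type):
        assert "auth_resource_path" in hit.get("_source", {})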
2 changes: 1 addition & 1 deletion tests/integrated_tests/utils_es.py
@@ -1,7 +1,6 @@
from itertools import chain

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

import tube.settings as config

@@ -27,6 +26,7 @@ def get_item_from_elasticsearch(index, doc_type, item):
# results = s.execute()
return [h.get("_source") for h in hits]


def get_names(p):
mapping = p.mapping
names = []
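For context on the imports touched above: a minimal sketch of fetching documents with elasticsearch_dsl's Search, assuming a local cluster and an index name made up for the example (not taken from this repository):

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search


def fetch_sources(hosts, index, field, value):
    # Build a term query against the index and return each hit's raw _source dict.
    client = Elasticsearch(hosts)
    search = Search(using=client, index=index).query("term", **{field: value})
    response = search.execute()
    return [hit.to_dict() for hit in response]


# e.g. fetch_sources(["http://localhost:9200"], "etl_case", "doc_type", "case")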
14 changes: 9 additions & 5 deletions tube/etl/indexers/base/translator.py
@@ -26,19 +26,23 @@
from tube.utils.spark import save_rdd_of_dataframe, get_all_files, save_rdds
from tube.utils.general import get_node_id_name


def json_export_with_no_key(x, doc_type, root_name):
x[1][get_node_id_name(doc_type)] = x[0]
if root_name is not None and doc_type != root_name:
x[1][get_node_id_name(root_name)] = x[0]
x[1]["node_id"] = x[0] # redundant field for backward compatibility with arranger
return json.dumps(x[1])


cached_dataframe = {}


class Translator(object):
"""
The main entry point into the index export process for the mutation indices
"""

def __init__(self, sc, hdfs_path, writer, parser: Parser):
self.sc = sc
if sc is not None:
@@ -181,7 +185,7 @@ def translate_table_to_dataframe(
new_df = new_df.select(*cols)
return self.return_dataframe(
new_df,
f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}"
f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}",
)
except Exception as ex:
print("HAPPEN WITH NODE: {}".format(node_tbl_name))
@@ -332,7 +336,9 @@ def save_dataframe_to_hadoop(self, df):
def return_dataframe(self, df, dataframe_name):
if config.RUNNING_MODE.lower() == enums.RUNNING_MODE_PRE_TEST.lower():
step_name = f"{self.current_step}_{dataframe_name}"
save_rdd_of_dataframe(df, self.get_path_to_save_dataframe(step_name), self.sc)
save_rdd_of_dataframe(
df, self.get_path_to_save_dataframe(step_name), self.sc
)
return df

def get_path_to_save_dataframe(self, step):
@@ -343,9 +349,7 @@ def get_path_to_save_dataframe(self, step):
current_number = cached_dataframe.get(dataframe_name) + 1
dataframe_name = f"{dataframe_name}_{current_number}"
cached_dataframe[dataframe_name] = current_number
return os.path.join(
self.hdfs_path, "output", dataframe_name
)
return os.path.join(self.hdfs_path, "output", dataframe_name)

def save_to_hadoop(self, df):
save_rdds(df, self.get_path_from_step(self.current_step), self.sc)
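Aside from formatting, the notable logic in this file is the pre-test branch of return_dataframe: when RUNNING_MODE matches the pre-test mode, every intermediate dataframe is saved under a step-qualified name, and a repeated name gets a numeric suffix so earlier snapshots are not overwritten. A rough sketch of that naming scheme, with the helper name and path layout assumed for illustration:

import os


def snapshot_path(hdfs_path, step, dataframe_name, counters):
    # Key each snapshot as "<step>_<dataframe name>"; bump a counter on repeats
    # so a re-computed dataframe lands in a fresh output directory.
    name = f"{step}_{dataframe_name}"
    if name in counters:
        counters[name] += 1
        name = f"{name}_{counters[name]}"
    else:
        counters[name] = 0
    return os.path.join(hdfs_path, "output", name)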
12 changes: 5 additions & 7 deletions tube/etl/outputs/es/writer.py
@@ -86,11 +86,9 @@ def create_guppy_array_config(self, parser): # etl_index_name, types, array_typ

mapping = {
"mappings": {
"type_name": {
"properties": {
"timestamp": {"type": "date"},
"array": {"type": "keyword"},
}
"properties": {
"timestamp": {"type": "date"},
"array": {"type": "keyword"},
}
}
}
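The hunk above drops the intermediate "type_name" level, so the mapping body now carries "properties" directly, matching the typeless mapping format Elasticsearch 7+ expects. A minimal sketch of creating an index with such a mapping (client setup and index name are illustrative, not the repository's):

from elasticsearch import Elasticsearch

client = Elasticsearch(["http://localhost:9200"])
array_config_mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "array": {"type": "keyword"},
        }
    }
}
# With mapping types removed in ES 7, no per-type level appears in the body.
client.indices.create(index="example_array-config", body=array_config_mapping)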
@@ -107,7 +105,7 @@ def create_guppy_array_config(self, parser): # etl_index_name, types, array_typ
index_to_write = self.versioning.create_new_index(
mapping, self.versioning.get_next_index_version(index)
)
self.es.index(index_to_write, "_doc", id=etl_index_name, body=doc)
self.es.index(index_to_write, body=doc, id=etl_index_name)
self.versioning.putting_new_version_tag(index_to_write, index)
self.versioning.putting_new_version_tag(index_to_write, alias)
putting_timestamp(self.es, index_to_write)
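This call is the "correct es param" fix itself: the old form passed "_doc" as a positional argument, which newer elasticsearch-py clients no longer treat as a doc type (the second positional parameter is the document body), so the arguments are now passed by keyword. A hedged sketch of the corrected call shape, with the client and values invented for the example:

# doc_type is gone from the newer client API; pass body and id explicitly so
# nothing lands on the wrong positional parameter.
es_client.index(
    index="example_etl-array-config",
    id="etl_case",
    body={"timestamp": "2023-06-09", "array": ["field_a"]},
)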
@@ -125,7 +123,7 @@ def write_dataframe(self, df, index, doc_type, types):

index_to_write = self.versioning.create_new_index(
{"mappings": types.get(doc_type)},
self.versioning.get_next_index_version(index)
self.versioning.get_next_index_version(index),
)
self.write_to_es(
df, index_to_write, index, doc_type, self.write_df_to_new_index
