fix(es7): test tube with es-7 (#238)
* fix(es7): test tube with es-7

* fix(urllib): update urllib3 version

* fix(poetry): update poetry.lock to match .toml file

* fix(es): correct es param

* use compose-etl master
thanh-nguyen-dang authored Nov 9, 2023
1 parent 9265477 commit 83a76a7
Showing 11 changed files with 334 additions and 328 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -26,6 +26,7 @@ wheels/
 *.egg-info/
 .installed.cfg
 *.egg
+temp/
 
 # PyInstaller
 # Usually these files are written by a python script from a template
546 changes: 260 additions & 286 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions pyproject.toml
@@ -15,15 +15,14 @@ python = "^3.9"
 cdislogging = "1.1.1"
 cryptography = "^37.0.4"
 dictionaryutils = "^3.0.2"
-elasticsearch = "6.4.0"
-elasticsearch-dsl = "6.4.0"
+elasticsearch = "7.10.0"
 gdcdictionary = "1.2.0"
 gen3datamodel = "3.0.2"
 hdfs = "2.7.0"
 psycopg2-binary="2.9.3"
 psqlgraph = "3.0.1"
 pyspark = "3.3.0"
-urllib3 = "1.25.10"
+urllib3 = "1.26.16"
 requests = "^2.28.1"
 setuptools = "65.3.0"
 six = "1.16.0"
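Note that elasticsearch-dsl is removed outright rather than bumped: the raw 7.x client covers everything the tests need, so the DSL dependency disappears (the test rewrites below follow from this). As a minimal sketch, one way to confirm the new pins are live in a virtualenv:

import elasticsearch
import urllib3

# The expected versions mirror the pins in pyproject.toml above.
assert elasticsearch.__versionstr__ == "7.10.0", elasticsearch.__versionstr__
assert urllib3.__version__ == "1.26.16", urllib3.__version__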
4 changes: 2 additions & 2 deletions run_etl.py
@@ -86,8 +86,8 @@ def main():
     args = config_by_args()
 
     es_hosts = config.ES["es.nodes"]
-    es_port = config.ES["es.port"]
-    es = Elasticsearch([{"host": es_hosts, "port": es_port}])
+    es_port = int(config.ES["es.port"])
+    es = Elasticsearch([{"host": es_hosts, "port": es_port, "scheme": "http"}])
     index_names = interpreter.get_index_names(config)
 
     if args.force or check_to_run_etl(es, index_names):
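The cast and the explicit scheme are what the 7.x client expects: elasticsearch-py 7 validates host dicts more strictly than 6.x, so the port must be an int and the transport scheme is best spelled out. A minimal sketch of the connection pattern this commit standardizes on (host and port are placeholders, not the values read from tube.settings):

from elasticsearch import Elasticsearch

# Placeholder endpoint; at runtime the values come from config.ES.
es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])
print(es.ping())  # True once the cluster is reachable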
34 changes: 25 additions & 9 deletions tests/integrated_tests/test_es/test_es.py
@@ -1,7 +1,6 @@
 import pytest
 from elasticsearch import Elasticsearch
 from elasticsearch import client
-from elasticsearch_dsl import Search
 
 import tube.settings as config
 from tube.etl.indexers.interpreter import create_translators
@@ -22,14 +21,23 @@ def test_auth_resource_path_exist(doc_type):
     Check that the field "auth_resource_path" exist
     """
     parser = dict_translators[doc_type].parser
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
-    s = Search(using=es, index=parser.name, doc_type=doc_type)
-    response = s.execute()
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
+    )
+    response = es.search(
+        index=parser.name, body={"query": {"match_all": {}}}, size=9999
+    )
 
     hit_response = response["hits"]["hits"]
     auth_resource_path = "/programs/jnkns/projects/jenkins"
 
-    for hit in response:
-        assert hit.auth_resource_path == auth_resource_path
+    for hit in hit_response:
+        assert hit.get("_source").get("auth_resource_path") == auth_resource_path
 
 
 @pytest.mark.parametrize("doc_type", doc_types)
@@ -38,7 +46,15 @@ def test_es_types(doc_type):
     Check that no field have "text" type
     """
     parser = dict_translators[doc_type].parser
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
+    )
 
     indices = client.IndicesClient(es)
     index_name = list(indices.get_alias(name=parser.name).keys())[0]
@@ -47,7 +63,7 @@
 
     # assert "_None_id" not in mapping[index_name]["mappings"][doc_type]["properties"]
     list_errors = []
-    for k, t in list(mapping[index_name]["mappings"][doc_type]["properties"].items()):
+    for k, t in list(mapping[index_name]["mappings"]["properties"].items()):
         try:
             assert t["type"] != "text", f"field {k} has type as text"
         except AssertionError as ex:
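Dropping the doc_type key from the mapping lookup reflects ES 7 removing mapping types: get_mapping responses are typeless, with "properties" nested directly under "mappings". A sketch of the shape difference (index and field names are illustrative only):

# ES 6.x response: an extra level keyed by the mapping type ("case" here).
mapping_es6 = {
    "etl_case_0": {"mappings": {"case": {"properties": {"submitter_id": {"type": "keyword"}}}}}
}
# ES 7.x response: typeless, one level flatter.
mapping_es7 = {
    "etl_case_0": {"mappings": {"properties": {"submitter_id": {"type": "keyword"}}}}
}

assert (
    mapping_es6["etl_case_0"]["mappings"]["case"]["properties"]
    == mapping_es7["etl_case_0"]["mappings"]["properties"]
)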
26 changes: 18 additions & 8 deletions tests/integrated_tests/utils_es.py
@@ -1,20 +1,30 @@
 from itertools import chain
 
 from elasticsearch import Elasticsearch
-from elasticsearch_dsl import Search
 
 import tube.settings as config
 
 
 def get_item_from_elasticsearch(index, doc_type, item):
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
-    s = Search(using=es, index=index, doc_type=doc_type).query(
-        "match", submitter_id=item
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
     )
-    total = s.count()
-    s = s[0:total]
-    results = s.execute()
-    return results
+    search_results = es.search(
+        index=index, body={"query": {"match": {"submitter_id": item}}}, size=9999
+    )
+    print("Response:")
+    print(search_results)
+    result = search_results["hits"]["hits"]
+    total = len(result)
+    hits = result[0:total]
+    # results = s.execute()
+    return [h.get("_source") for h in hits]
 
 
 def get_names(p):
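Two behavioral notes on this rewrite: size=9999 matters because es.search returns only 10 hits by default (the old code sized the Search object with s.count() instead), and hits now arrive as plain dicts with the document under "_source" rather than attribute-style DSL response objects. A before/after sketch, assuming a reachable cluster and hypothetical index and submitter_id values:

from elasticsearch import Elasticsearch

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])  # placeholder endpoint

# Before, with elasticsearch-dsl 6.x (roughly):
#     s = Search(using=es, index="etl_case_0").query("match", submitter_id="case-01")
#     results = s[0 : s.count()].execute()  # attribute-style hits
# After, with the raw 7.x client:
response = es.search(
    index="etl_case_0",  # hypothetical index name
    body={"query": {"match": {"submitter_id": "case-01"}}},  # hypothetical id
    size=9999,
)
docs = [hit["_source"] for hit in response["hits"]["hits"]]  # plain dicts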
2 changes: 1 addition & 1 deletion tests/integrated_tests/value/es_value.py
@@ -12,7 +12,7 @@ def __init__(self, parser, submitter_id, doc_type, names):
         self.val, self.length = self.value()
 
     def __getattr__(self, item):
-        return self.val.__getattr__(item) if self.val and item in self.val else None
+        return self.val.get(item) if self.val and item in self.val else None
 
     def value(self):
         results = get_item_from_elasticsearch(
4 changes: 3 additions & 1 deletion tests/integrated_tests/value/value.py
@@ -27,7 +27,9 @@ def value_diff(left, right):
 
     diffs = ["attr: left != right"]
     for name in left.names:
-        left_val = left.__getattr__(name)
+        print(left)
+        print(left.__dict__)
+        left_val = left.val.get(name)
         right_val = right.__getattr__(name)
 
         if isinstance(right_val, list) and left_val is not None:
15 changes: 10 additions & 5 deletions tube/etl/indexers/base/translator.py
@@ -26,19 +26,23 @@
 from tube.utils.spark import save_rdd_of_dataframe, get_all_files, save_rdds
 from tube.utils.general import get_node_id_name
 
+
 def json_export_with_no_key(x, doc_type, root_name):
     x[1][get_node_id_name(doc_type)] = x[0]
     if root_name is not None and doc_type != root_name:
         x[1][get_node_id_name(root_name)] = x[0]
+    x[1]["node_id"] = x[0]  # redundant field for backward compatibility with arranger
     return json.dumps(x[1])
 
+
 cached_dataframe = {}
 
+
 class Translator(object):
     """
     The main entry point into the index export process for the mutation indices
     """
 
     def __init__(self, sc, hdfs_path, writer, parser: Parser):
         self.sc = sc
         if sc is not None:
@@ -164,6 +168,7 @@ def translate_table_to_dataframe(
         props = props if props is not None else node.props
         try:
             print(f"Create scheme for node: {node.name}")
+            print(f"With props: {node.props}")
             schema = self.parser.create_schema(node)
             df, is_empty = self.read_text_files_of_table(
                 node_tbl_name, self.get_empty_dataframe_with_name
@@ -180,7 +185,7 @@
             new_df = new_df.select(*cols)
             return self.return_dataframe(
                 new_df,
-                f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}"
+                f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}",
             )
         except Exception as ex:
             print("HAPPEN WITH NODE: {}".format(node_tbl_name))
@@ -331,7 +336,9 @@ def save_dataframe_to_hadoop(self, df):
     def return_dataframe(self, df, dataframe_name):
         if config.RUNNING_MODE.lower() == enums.RUNNING_MODE_PRE_TEST.lower():
             step_name = f"{self.current_step}_{dataframe_name}"
-            save_rdd_of_dataframe(df, self.get_path_to_save_dataframe(step_name), self.sc)
+            save_rdd_of_dataframe(
+                df, self.get_path_to_save_dataframe(step_name), self.sc
+            )
         return df
 
     def get_path_to_save_dataframe(self, step):
@@ -342,9 +349,7 @@ def get_path_to_save_dataframe(self, step):
             current_number = cached_dataframe.get(dataframe_name) + 1
             dataframe_name = f"{dataframe_name}_{current_number}"
             cached_dataframe[dataframe_name] = current_number
-        return os.path.join(
-            self.hdfs_path, "output", dataframe_name
-        )
+        return os.path.join(self.hdfs_path, "output", dataframe_name)
 
     def save_to_hadoop(self, df):
         save_rdds(df, self.get_path_from_step(self.current_step), self.sc)
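The new node_id line in json_export_with_no_key duplicates the root id under a fixed key so older arranger consumers keep resolving documents. A sketch of the document it emits, with illustrative names and assuming get_node_id_name produces "_<name>_id"-style keys (as the "_None_id" assertion in the tests suggests):

import json

def get_node_id_name(name):
    # Assumed behavior of tube.utils.general.get_node_id_name, for illustration only.
    return "_{}_id".format(name)

x = ("uuid-123", {"submitter_id": "case-01"})  # (node id, document) pair from the RDD
doc_type, root_name = "case", "subject"

x[1][get_node_id_name(doc_type)] = x[0]
if root_name is not None and doc_type != root_name:
    x[1][get_node_id_name(root_name)] = x[0]
x[1]["node_id"] = x[0]  # redundant, kept for arranger
print(json.dumps(x[1]))
# {"submitter_id": "case-01", "_case_id": "uuid-123", "_subject_id": "uuid-123", "node_id": "uuid-123"}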
4 changes: 2 additions & 2 deletions tube/etl/outputs/es/timestamp.py
@@ -79,13 +79,13 @@ def check_to_run_etl(es, index_names):
 
 
 def timestamp_from_transaction_time(dt):
-    return "time_{trans_time}".format(trans_time=dt.isoformat("T"))
+    return "time_{trans_time}".format(trans_time=dt.strftime("%Y-%m-%dT%H-%M-%S"))
 
 
 def get_timestamp_from_index(es, versioned_index):
     res = es.indices.get_alias(index=versioned_index, name="time_*")
     iso_str = list(res[versioned_index]["aliases"].keys())[0].replace("plus", "+")[5:]
-    return datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S.%f")
+    return datetime.strptime(iso_str, "%Y-%m-%dT%H-%M-%S")
 
 
 def putting_timestamp(es, index_to_write):
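The alias timestamp moves from datetime.isoformat() to an all-dashes strftime, apparently because ES 7 no longer accepts ":" in index and alias names, and the reader drops the microsecond field to match the writer. A round-trip sketch of the new format:

from datetime import datetime

dt = datetime(2023, 11, 9, 14, 30, 5)
alias = "time_{}".format(dt.strftime("%Y-%m-%dT%H-%M-%S"))
print(alias)  # time_2023-11-09T14-30-05, colon-free and therefore a legal name

parsed = datetime.strptime(alias[5:], "%Y-%m-%dT%H-%M-%S")  # strip the "time_" prefix
assert parsed == dt  # second-level precision survives; microseconds do not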
21 changes: 10 additions & 11 deletions tube/etl/outputs/es/writer.py
@@ -39,13 +39,13 @@ def get_es(self):
         :return:
         """
         es_hosts = self.es_config["es.nodes"]
-        es_port = self.es_config["es.port"]
-        return Elasticsearch([{"host": es_hosts, "port": es_port}])
+        es_port = int(self.es_config["es.port"])
+        return Elasticsearch([{"host": es_hosts, "port": es_port, "scheme": "http"}])
 
     def write_to_new_index(self, df, index, doc_type):
         df = df.map(lambda x: json_export(x, doc_type))
         es_config = self.es_config
-        es_config["es.resource"] = index + "/{}".format(doc_type)
+        es_config["es.resource"] = index
         df.saveAsNewAPIHadoopFile(
             path="-",
             outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
@@ -56,7 +56,7 @@ def write_to_new_index(self, df, index, doc_type):
 
     def write_df_to_new_index(self, df, index, doc_type):
         es_config = self.es_config
-        es_config["es.resource"] = index + "/{}".format(doc_type)
+        es_config["es.resource"] = index
         df.coalesce(1).write.format("org.elasticsearch.spark.sql").option(
             "es.nodes", es_config["es.nodes"]
         ).option("es.port", es_config["es.port"]).option(
@@ -86,11 +86,9 @@ def create_guppy_array_config(self, parser): # etl_index_name, types, array_typ
 
         mapping = {
             "mappings": {
-                "_doc": {
-                    "properties": {
-                        "timestamp": {"type": "date"},
-                        "array": {"type": "keyword"},
-                    }
-                }
+                "properties": {
+                    "timestamp": {"type": "date"},
+                    "array": {"type": "keyword"},
+                }
             }
         }
@@ -107,7 +105,7 @@
         index_to_write = self.versioning.create_new_index(
             mapping, self.versioning.get_next_index_version(index)
         )
-        self.es.index(index_to_write, "_doc", id=etl_index_name, body=doc)
+        self.es.index(index_to_write, body=doc, id=etl_index_name)
         self.versioning.putting_new_version_tag(index_to_write, index)
         self.versioning.putting_new_version_tag(index_to_write, alias)
         putting_timestamp(self.es, index_to_write)
@@ -124,7 +122,8 @@ def write_dataframe(self, df, index, doc_type, types):
             df = plugin(df)
 
         index_to_write = self.versioning.create_new_index(
-            {"mappings": types}, self.versioning.get_next_index_version(index)
+            {"mappings": types.get(doc_type)},
+            self.versioning.get_next_index_version(index),
         )
         self.write_to_es(
             df, index_to_write, index, doc_type, self.write_df_to_new_index
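All of the writer changes are the same ES 7 theme applied end to end: es.resource is a bare index name with no "/doc_type" suffix, mappings lose the "_doc" wrapper, write_dataframe unwraps the per-type mapping with types.get(doc_type), and es.index is called without a type argument. A condensed sketch of the typeless write path (endpoint, index name, and document are hypothetical):

from elasticsearch import Elasticsearch

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])  # placeholder endpoint
mapping = {
    "mappings": {  # no "_doc" level under ES 7
        "properties": {
            "timestamp": {"type": "date"},
            "array": {"type": "keyword"},
        }
    }
}
es.indices.create(index="etl_array-config_0", body=mapping)  # hypothetical index name
es.index(index="etl_array-config_0", id="etl", body={"timestamp": "2023-11-09T14:30:05", "array": ["case"]})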
