fix(es7): test tube with es-7 #238

Merged · 6 commits · Nov 9, 2023

1 change: 1 addition & 0 deletions .gitignore
@@ -26,6 +26,7 @@ wheels/
 *.egg-info/
 .installed.cfg
 *.egg
+temp/

 # PyInstaller
 # Usually these files are written by a python script from a template
546 changes: 260 additions & 286 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions pyproject.toml
@@ -15,15 +15,14 @@ python = "^3.9"
 cdislogging = "1.1.1"
 cryptography = "^37.0.4"
 dictionaryutils = "^3.0.2"
-elasticsearch = "6.4.0"
-elasticsearch-dsl = "6.4.0"
+elasticsearch = "7.10.0"
 gdcdictionary = "1.2.0"
 gen3datamodel = "3.0.2"
 hdfs = "2.7.0"
 psycopg2-binary="2.9.3"
 psqlgraph = "3.0.1"
 pyspark = "3.3.0"
-urllib3 = "1.25.10"
+urllib3 = "1.26.16"
 requests = "^2.28.1"
 setuptools = "65.3.0"
 six = "1.16.0"
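The dependency bump drives everything below: elasticsearch moves from 6.4.0 to 7.10.0, elasticsearch-dsl is dropped outright (its `Search` calls are rewritten against the raw client in the test helpers), and urllib3 moves to 1.26.16. A quick sanity-check sketch that client and cluster agree after the bump; the localhost address is a placeholder:

```python
import elasticsearch
from elasticsearch import Elasticsearch

print(elasticsearch.VERSION)  # expect (7, 10, 0) after this bump

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])
print(es.info()["version"]["number"])  # server version; should be 7.x to match
```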
4 changes: 2 additions & 2 deletions run_etl.py
@@ -86,8 +86,8 @@ def main():
     args = config_by_args()

     es_hosts = config.ES["es.nodes"]
-    es_port = config.ES["es.port"]
-    es = Elasticsearch([{"host": es_hosts, "port": es_port}])
+    es_port = int(config.ES["es.port"])
+    es = Elasticsearch([{"host": es_hosts, "port": es_port, "scheme": "http"}])
     index_names = interpreter.get_index_names(config)

     if args.force or check_to_run_etl(es, index_names):
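The two adjustments here recur at every connection site in this PR: the port from config arrives as a string and is cast to int, and the transport scheme is passed explicitly. A minimal sketch of the resulting client setup (the localhost values are placeholders):

```python
from elasticsearch import Elasticsearch  # elasticsearch==7.10.0 per pyproject.toml

# Host entries mirror the pattern used throughout this PR:
# integer port, explicit plain-HTTP scheme.
es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])

print(es.ping())  # True once the cluster is reachable
```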
34 changes: 25 additions & 9 deletions tests/integrated_tests/test_es/test_es.py
@@ -1,7 +1,6 @@
 import pytest
 from elasticsearch import Elasticsearch
 from elasticsearch import client
-from elasticsearch_dsl import Search

 import tube.settings as config
 from tube.etl.indexers.interpreter import create_translators
@@ -22,14 +21,23 @@ def test_auth_resource_path_exist(doc_type):
     Check that the field "auth_resource_path" exist
     """
     parser = dict_translators[doc_type].parser
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
-    s = Search(using=es, index=parser.name, doc_type=doc_type)
-    response = s.execute()
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
+    )
+    response = es.search(
+        index=parser.name, body={"query": {"match_all": {}}}, size=9999
+    )
+    hit_response = response["hits"]["hits"]

     auth_resource_path = "/programs/jnkns/projects/jenkins"

-    for hit in response:
-        assert hit.auth_resource_path == auth_resource_path
+    for hit in hit_response:
+        assert hit.get("_source").get("auth_resource_path") == auth_resource_path


@pytest.mark.parametrize("doc_type", doc_types)
@@ -38,7 +46,15 @@ def test_es_types(doc_type):
     Check that no field have "text" type
     """
     parser = dict_translators[doc_type].parser
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
+    )

     indices = client.IndicesClient(es)
     index_name = list(indices.get_alias(name=parser.name).keys())[0]
@@ -47,7 +63,7 @@

     # assert "_None_id" not in mapping[index_name]["mappings"][doc_type]["properties"]
     list_errors = []
-    for k, t in list(mapping[index_name]["mappings"][doc_type]["properties"].items()):
+    for k, t in list(mapping[index_name]["mappings"]["properties"].items()):
         try:
             assert t["type"] != "text", f"field {k} has type as text"
         except AssertionError as ex:
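Both test edits trace back to ES 7's removal of mapping types: search results are read as plain dicts via `_source`, and the mapping response no longer nests `properties` under a doc-type key. A sketch of the two typeless shapes the assertions now rely on (the index name is illustrative):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])  # placeholder

# Search hits are plain dicts; the document body lives under "_source".
response = es.search(index="etl_index", body={"query": {"match_all": {}}}, size=9999)
for hit in response["hits"]["hits"]:
    print(hit["_source"].get("auth_resource_path"))

# Typeless mapping: ES 6 kept fields under mappings[doc_type]["properties"];
# ES 7 flattens this to mappings["properties"].
mapping = es.indices.get_mapping(index="etl_index")
properties = mapping["etl_index"]["mappings"]["properties"]
print(list(properties.keys()))
```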
26 changes: 18 additions & 8 deletions tests/integrated_tests/utils_es.py
@@ -1,20 +1,30 @@
 from itertools import chain

 from elasticsearch import Elasticsearch
-from elasticsearch_dsl import Search

 import tube.settings as config


 def get_item_from_elasticsearch(index, doc_type, item):
-    es = Elasticsearch([{"host": config.ES["es.nodes"], "port": config.ES["es.port"]}])
-    s = Search(using=es, index=index, doc_type=doc_type).query(
-        "match", submitter_id=item
+    es = Elasticsearch(
+        [
+            {
+                "host": config.ES["es.nodes"],
+                "port": int(config.ES["es.port"]),
+                "scheme": "http",
+            }
+        ]
     )
-    total = s.count()
-    s = s[0:total]
-    results = s.execute()
-    return results
+    search_results = es.search(
+        index=index, body={"query": {"match": {"submitter_id": item}}}, size=9999
+    )
+    print("Response:")
+    print(search_results)
+    result = search_results["hits"]["hits"]
+    total = len(result)
+    hits = result[0:total]
+    # results = s.execute()
+    return [h.get("_source") for h in hits]


def get_names(p):
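With elasticsearch-dsl gone, the `Search(...).query("match", ...)` chain becomes an explicit query body against the raw client. A before/after sketch under the same assumptions as the helper above; the index and submitter_id values are illustrative, and `size=9999` stands in for "fetch everything" (fine for small test fixtures, though a scroll or `search_after` loop would be the robust choice at scale):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])  # placeholder

# elasticsearch-dsl 6.x (removed):
#   Search(using=es, index="etl_index", doc_type="case").query("match", submitter_id="case_1")
# Raw-client equivalent in 7.x -- same match query, no doc_type:
resp = es.search(
    index="etl_index",
    body={"query": {"match": {"submitter_id": "case_1"}}},
    size=9999,
)
sources = [hit["_source"] for hit in resp["hits"]["hits"]]
```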
2 changes: 1 addition & 1 deletion tests/integrated_tests/value/es_value.py
@@ -12,7 +12,7 @@ def __init__(self, parser, submitter_id, doc_type, names):
         self.val, self.length = self.value()

     def __getattr__(self, item):
-        return self.val.__getattr__(item) if self.val and item in self.val else None
+        return self.val.get(item) if self.val and item in self.val else None

     def value(self):
         results = get_item_from_elasticsearch(
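This one-line change follows from the helper above now returning plain `_source` dicts instead of elasticsearch-dsl response objects: a dict supports `.get(item)` but defines no `__getattr__`. A tiny illustration:

```python
val = {"submitter_id": "case_1"}  # plain _source dict, as the helper now returns

print(val.get("submitter_id"))  # case_1
# val.__getattr__("submitter_id")  # AttributeError: dicts define no __getattr__
```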
4 changes: 3 additions & 1 deletion tests/integrated_tests/value/value.py
@@ -27,7 +27,9 @@ def value_diff(left, right):

     diffs = ["attr: left != right"]
     for name in left.names:
-        left_val = left.__getattr__(name)
+        print(left)
+        print(left.__dict__)
+        left_val = left.val.get(name)
         right_val = right.__getattr__(name)

         if isinstance(right_val, list) and left_val is not None:
15 changes: 10 additions & 5 deletions tube/etl/indexers/base/translator.py
@@ -26,19 +26,23 @@
 from tube.utils.spark import save_rdd_of_dataframe, get_all_files, save_rdds
 from tube.utils.general import get_node_id_name

+
 def json_export_with_no_key(x, doc_type, root_name):
     x[1][get_node_id_name(doc_type)] = x[0]
     if root_name is not None and doc_type != root_name:
         x[1][get_node_id_name(root_name)] = x[0]
     x[1]["node_id"] = x[0]  # redundant field for backward compatibility with arranger
     return json.dumps(x[1])

+
 cached_dataframe = {}

+
 class Translator(object):
     """
     The main entry point into the index export process for the mutation indices
     """
+
     def __init__(self, sc, hdfs_path, writer, parser: Parser):
         self.sc = sc
         if sc is not None:
@@ -164,6 +168,7 @@ def translate_table_to_dataframe(
         props = props if props is not None else node.props
         try:
             print(f"Create scheme for node: {node.name}")
+            print(f"With props: {node.props}")
             schema = self.parser.create_schema(node)
             df, is_empty = self.read_text_files_of_table(
                 node_tbl_name, self.get_empty_dataframe_with_name
@@ -180,7 +185,7 @@
             new_df = new_df.select(*cols)
             return self.return_dataframe(
                 new_df,
-                f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}"
+                f"{Translator.translate_table_to_dataframe.__qualname__}__{node.name}",
             )
         except Exception as ex:
             print("HAPPEN WITH NODE: {}".format(node_tbl_name))
@@ -331,7 +336,9 @@ def save_dataframe_to_hadoop(self, df):
     def return_dataframe(self, df, dataframe_name):
         if config.RUNNING_MODE.lower() == enums.RUNNING_MODE_PRE_TEST.lower():
             step_name = f"{self.current_step}_{dataframe_name}"
-            save_rdd_of_dataframe(df, self.get_path_to_save_dataframe(step_name), self.sc)
+            save_rdd_of_dataframe(
+                df, self.get_path_to_save_dataframe(step_name), self.sc
+            )
         return df

     def get_path_to_save_dataframe(self, step):
@@ -342,9 +349,7 @@ def get_path_to_save_dataframe(self, step):
             current_number = cached_dataframe.get(dataframe_name) + 1
             dataframe_name = f"{dataframe_name}_{current_number}"
             cached_dataframe[dataframe_name] = current_number
-        return os.path.join(
-            self.hdfs_path, "output", dataframe_name
-        )
+        return os.path.join(self.hdfs_path, "output", dataframe_name)

     def save_to_hadoop(self, df):
         save_rdds(df, self.get_path_from_step(self.current_step), self.sc)
4 changes: 2 additions & 2 deletions tube/etl/outputs/es/timestamp.py
@@ -79,13 +79,13 @@ def check_to_run_etl(es, index_names):


 def timestamp_from_transaction_time(dt):
-    return "time_{trans_time}".format(trans_time=dt.isoformat("T"))
+    return "time_{trans_time}".format(trans_time=dt.strftime("%Y-%m-%dT%H-%M-%S"))


 def get_timestamp_from_index(es, versioned_index):
     res = es.indices.get_alias(index=versioned_index, name="time_*")
     iso_str = list(res[versioned_index]["aliases"].keys())[0].replace("plus", "+")[5:]
-    return datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S.%f")
+    return datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S")


 def putting_timestamp(es, index_to_write):
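The alias timestamp swaps `isoformat()` (colons, optional microseconds) for a dash-separated `strftime` pattern, keeping characters like `:` out of the alias name. A round-trip sketch, assuming the alias is parsed back with the same dash-separated pattern it is written with:

```python
from datetime import datetime

dt = datetime(2023, 11, 9, 12, 30, 45)
alias = "time_{trans_time}".format(trans_time=dt.strftime("%Y-%m-%dT%H-%M-%S"))
print(alias)  # time_2023-11-09T12-30-45

# Strip the "time_" prefix and parse with the matching dash-separated format.
parsed = datetime.strptime(alias[5:], "%Y-%m-%dT%H-%M-%S")
assert parsed == dt
```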
21 changes: 10 additions & 11 deletions tube/etl/outputs/es/writer.py
@@ -39,13 +39,13 @@ def get_es(self):
         :return:
         """
         es_hosts = self.es_config["es.nodes"]
-        es_port = self.es_config["es.port"]
-        return Elasticsearch([{"host": es_hosts, "port": es_port}])
+        es_port = int(self.es_config["es.port"])
+        return Elasticsearch([{"host": es_hosts, "port": es_port, "scheme": "http"}])

     def write_to_new_index(self, df, index, doc_type):
         df = df.map(lambda x: json_export(x, doc_type))
         es_config = self.es_config
-        es_config["es.resource"] = index + "/{}".format(doc_type)
+        es_config["es.resource"] = index
         df.saveAsNewAPIHadoopFile(
             path="-",
             outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
@@ -56,7 +56,7 @@ def write_to_new_index(self, df, index, doc_type):

     def write_df_to_new_index(self, df, index, doc_type):
         es_config = self.es_config
-        es_config["es.resource"] = index + "/{}".format(doc_type)
+        es_config["es.resource"] = index
         df.coalesce(1).write.format("org.elasticsearch.spark.sql").option(
             "es.nodes", es_config["es.nodes"]
         ).option("es.port", es_config["es.port"]).option(
@@ -86,11 +86,9 @@ def create_guppy_array_config(self, parser):  # etl_index_name, types, array_typ

         mapping = {
             "mappings": {
-                "_doc": {
-                    "properties": {
-                        "timestamp": {"type": "date"},
-                        "array": {"type": "keyword"},
-                    }
-                }
+                "properties": {
+                    "timestamp": {"type": "date"},
+                    "array": {"type": "keyword"},
+                }
             }
         }
@@ -107,7 +105,7 @@ def create_guppy_array_config(self, parser):  # etl_index_name, types, array_typ
         index_to_write = self.versioning.create_new_index(
             mapping, self.versioning.get_next_index_version(index)
         )
-        self.es.index(index_to_write, "_doc", id=etl_index_name, body=doc)
+        self.es.index(index_to_write, body=doc, id=etl_index_name)
         self.versioning.putting_new_version_tag(index_to_write, index)
         self.versioning.putting_new_version_tag(index_to_write, alias)
         putting_timestamp(self.es, index_to_write)
@@ -124,7 +122,8 @@ def write_dataframe(self, df, index, doc_type, types):
             df = plugin(df)

         index_to_write = self.versioning.create_new_index(
-            {"mappings": types}, self.versioning.get_next_index_version(index)
+            {"mappings": types.get(doc_type)},
+            self.versioning.get_next_index_version(index),
         )
         self.write_to_es(
             df, index_to_write, index, doc_type, self.write_df_to_new_index
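The writer edits are the same typeless migration seen in the tests: the elasticsearch-hadoop `es.resource` drops its `/doc_type` suffix, the guppy-config mapping loses its `_doc` wrapper, and `es.index` is called without a doc-type argument. A sketch of the resulting create-and-index flow (the index name and document are illustrative):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch([{"host": "localhost", "port": 9200, "scheme": "http"}])  # placeholder

# ES 7 typeless mapping: "properties" sits directly under "mappings".
mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "array": {"type": "keyword"},
        }
    }
}
es.indices.create(index="etl_array_config_0", body=mapping)

# No doc-type argument: the document is addressed by index and id alone.
es.index(
    index="etl_array_config_0",
    id="etl",
    body={"timestamp": "2023-11-09T00:00:00", "array": ["some_field"]},
)
```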