From 63969ac79c3cd8218d8018f9850dccbbad38d93d Mon Sep 17 00:00:00 2001 From: Thanh Nguyen Date: Wed, 21 Aug 2024 10:34:48 -0500 Subject: [PATCH] update types for nested translator in collector etl --- .../etl/indexers/aggregation/nested/parser.py | 28 ++++++++++++++----- .../indexers/aggregation/nested/translator.py | 23 +++++++++++---- .../indexers/aggregation/new_translator.py | 13 ++------- .../indexers/aggregation/nodes/nested_node.py | 11 ++++---- tube/etl/indexers/base/parser.py | 4 +-- tube/etl/indexers/base/translator.py | 10 +++++++ tube/etl/indexers/injection/new_translator.py | 4 +++ 7 files changed, 63 insertions(+), 30 deletions(-) diff --git a/tube/etl/indexers/aggregation/nested/parser.py b/tube/etl/indexers/aggregation/nested/parser.py index e922f36a..c65bccfc 100644 --- a/tube/etl/indexers/aggregation/nested/parser.py +++ b/tube/etl/indexers/aggregation/nested/parser.py @@ -4,6 +4,13 @@ from tube.etl.indexers.base.parser import Parser as BaseParser, ES_TYPES +def is_node_in_set(node_set, node_to_check: NestedNode): + for node in node_set: + if node.__key__() == node_to_check.__key__(): + return True + return False + + class Parser(BaseParser): """ The main entry point into the index export process for the mutation indices @@ -20,7 +27,7 @@ def __init__(self, mapping, model, dictionary, root_names): root_name, get_node_table_name(self.model, root_name), root_name, - root_name, + mapping.get("doc_type"), props=[], ) self.get_nested_props(mapping) @@ -35,6 +42,11 @@ def get_nested_props(self, mapping): new_nested_child = self.parse_nested_props(n_idx, root_node, root_name) if new_nested_child is not None: root_node.children.add(new_nested_child) + self.root_nodes = { + root_name: root_node + for root_name, root_node in self.root_nodes.items() + if len(root_node.children) > 0. + } def parse_nested_props(self, mapping, nested_parent_node, parent_label): path = mapping.get("path") @@ -62,7 +74,7 @@ def parse_nested_props(self, mapping, nested_parent_node, parent_label): self.doc_type, mapping.get("props"), node_label=current_node_label ) - if current_node_label not in self.all_nested_nodes: + if path not in self.all_nested_nodes: current_nested_node = NestedNode( current_node_label, tbl_name, @@ -73,19 +85,21 @@ def parse_nested_props(self, mapping, nested_parent_node, parent_label): parent_edge_up_tbl=parent_edge_up_tbls, json_filter=mapping.get("filter"), ) - self.all_nested_nodes[current_node_label] = current_nested_node + self.all_nested_nodes[path] = current_nested_node else: - current_nested_node = self.all_nested_nodes[current_node_label] - current_nested_node.add_parent(nested_parent_node) + current_nested_node = self.all_nested_nodes[path] + current_nested_node.add_parent(nested_parent_node, parent_edge_up_tbls) nested_idxes = mapping.get("nested_props", []) for n_idx in nested_idxes: new_nested_child = self.parse_nested_props(n_idx, current_nested_node, current_node_label) if new_nested_child is not None: current_nested_node.children.add(new_nested_child) - if len(current_nested_node.children) == 0: + if (not is_node_in_set(self.leaves, current_nested_node) + and len(current_nested_node.children) == 0): self.leaves.append(current_nested_node) - else: + elif (not is_node_in_set(self.collectors, current_nested_node) + and len(current_nested_node.children) > 0): self.collectors.append(current_nested_node) return current_nested_node diff --git a/tube/etl/indexers/aggregation/nested/translator.py b/tube/etl/indexers/aggregation/nested/translator.py index 02cac2d1..b584c229 100644 --- a/tube/etl/indexers/aggregation/nested/translator.py +++ b/tube/etl/indexers/aggregation/nested/translator.py @@ -51,17 +51,28 @@ def collect_node(self, node, queue): node_df, node_name, child_df, child ) current_node_name = node_name - for parent_label, edge_up_tbl in node.parent_edge_up_tbl: - edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label) - if edge_df is not None: - node_df = self.join_two_dataframe(edge_df, node_df) - current_node_name = parent_label + up_df = node_df + for edge in node.parent_edge_up_tbls: + it_df = node_df + for parent_label, edge_up_tbl in edge: + edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label) + if edge_df is not None: + it_df = self.join_two_dataframe(edge_df, it_df) + current_node_name = parent_label + root_id_field = get_node_id_name(current_node_name) + it_df = it_df.withColumnRenamed(root_id_field, get_node_id_name(self.parser.doc_type)) + up_df = ( + it_df + if up_df is None + else up_df.unionByName(it_df).distinct() + ) + for p in node.parent_nodes: if p is not None: p.children_ready_to_join.append(node) if len(p.children_ready_to_join) == len(p.children): queue.append(p) - return node_df + return up_df def collect_structural_df(self, node_df, node_name, child_df, child): id_field = get_node_id_name(node_name) diff --git a/tube/etl/indexers/aggregation/new_translator.py b/tube/etl/indexers/aggregation/new_translator.py index 82cb98df..a99b1a21 100644 --- a/tube/etl/indexers/aggregation/new_translator.py +++ b/tube/etl/indexers/aggregation/new_translator.py @@ -41,14 +41,14 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary): hdfs_path, writer, { - "root": mapping.get("root"), + "root": mapping.get("doc_type"), "doc_type": mapping.get("doc_type"), "name": mapping.get("name"), "nested_props": mapping.get("nested_props"), }, model, dictionary, - root_names=[mapping.get('root')] + root_names=[mapping.get('doc_type')] ) if nest_props is not None else None @@ -56,14 +56,7 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary): def update_types(self): es_mapping = super(Translator, self).update_types() - properties = es_mapping.get(self.parser.doc_type).get("properties") - if self.nested_translator is not None: - nested_types = self.nested_translator.update_types() - for a in self.nested_translator.parser.array_types: - if a not in self.parser.array_types: - self.parser.array_types.append(a) - properties.update(nested_types[self.parser.root.name]["properties"]) - return es_mapping + return self.call_to_child_update_types(self.nested_translator, es_mapping) def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df): """ diff --git a/tube/etl/indexers/aggregation/nodes/nested_node.py b/tube/etl/indexers/aggregation/nodes/nested_node.py index 54e7a0a7..8c3b1f0d 100644 --- a/tube/etl/indexers/aggregation/nodes/nested_node.py +++ b/tube/etl/indexers/aggregation/nodes/nested_node.py @@ -23,8 +23,8 @@ def __init__( self.level = level self.path = path self.display_name = display_name - self.parent_edge_up_tbl = ( - [] if parent_edge_up_tbl is None else parent_edge_up_tbl + self.parent_edge_up_tbls = ( + [] if parent_edge_up_tbl is None else [parent_edge_up_tbl] ) self.parent_nodes = [parent_node] self.non_leaf_children_count = 0 @@ -33,12 +33,13 @@ def __init__( self.filter = create_filter_from_json(json_filter) def __key__(self): - if self.parent_edge_up_tbl is not None and len(self.parent_edge_up_tbl) > 0: - return self.name, self.parent_edge_up_tbl[0] + if self.parent_edge_up_tbls is not None and len(self.parent_edge_up_tbls) > 0: + return self.name, ",".join([str(p[0]) for p in self.parent_edge_up_tbls]), self.path return self.name - def add_parent(self, parent_node): + def add_parent(self, parent_node, parent_edge_up_tbl): self.parent_nodes.append(parent_node) + self.parent_edge_up_tbls.append(parent_edge_up_tbl) def __hash__(self): return hash(self.__key__()) diff --git a/tube/etl/indexers/base/parser.py b/tube/etl/indexers/base/parser.py index 3069e791..ce38d3b3 100644 --- a/tube/etl/indexers/base/parser.py +++ b/tube/etl/indexers/base/parser.py @@ -103,9 +103,9 @@ def get_python_type_of_prop(self, p, array_types): if array_type_condition: if array_types is not None and p.name not in array_types: array_types.append(p.name) - return (self.select_widest_type(p.type), 1) + return self.select_widest_type(p.type), 1 else: - return (self.select_widest_type(p.type), 0) + return self.select_widest_type(p.type), 0 def get_es_types(self): types = {} diff --git a/tube/etl/indexers/base/translator.py b/tube/etl/indexers/base/translator.py index 9f217972..ff131709 100644 --- a/tube/etl/indexers/base/translator.py +++ b/tube/etl/indexers/base/translator.py @@ -61,6 +61,16 @@ def update_types(self): def add_some_additional_props(self, keep_props): keep_props.append(self.parser.get_key_prop().name) + def call_to_child_update_types(self, child_translator, es_mapping): + properties = es_mapping.get(self.parser.doc_type).get("properties") + if child_translator is not None: + nested_types = child_translator.update_types() + for a in child_translator.parser.array_types: + if a not in self.parser.array_types: + self.parser.array_types.append(a) + properties.update(nested_types[self.parser.doc_type]["properties"]) + return es_mapping + def remove_unnecessary_columns(self, df): props = list(PropFactory.get_prop_by_doc_name(self.parser.doc_type).values()) keep_props = [p.name for p in props] diff --git a/tube/etl/indexers/injection/new_translator.py b/tube/etl/indexers/injection/new_translator.py index 1b5e7bc3..cefbb1c6 100644 --- a/tube/etl/indexers/injection/new_translator.py +++ b/tube/etl/indexers/injection/new_translator.py @@ -53,6 +53,10 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary): PropFactory.add_additional_prop(self.parser.doc_type, "source_node", (str,)) ) + def update_types(self): + es_mapping = super(Translator, self).update_types() + return self.call_to_child_update_types(self.nested_translator, es_mapping) + def collect_leaf(self, child, edge_df, collected_leaf_dfs): if isinstance(child, LeafNode): key_name = self.parser.get_key_prop().name