diff --git a/tube/etl/indexers/aggregation/nested/parser.py b/tube/etl/indexers/aggregation/nested/parser.py
index e922f36a..c323fb25 100644
--- a/tube/etl/indexers/aggregation/nested/parser.py
+++ b/tube/etl/indexers/aggregation/nested/parser.py
@@ -20,7 +20,7 @@ def __init__(self, mapping, model, dictionary, root_names):
                 root_name,
                 get_node_table_name(self.model, root_name),
                 root_name,
-                root_name,
+                mapping.get("doc_type"),
                 props=[],
             )
         self.get_nested_props(mapping)
@@ -35,6 +35,10 @@ def get_nested_props(self, mapping):
                 new_nested_child = self.parse_nested_props(n_idx, root_node, root_name)
                 if new_nested_child is not None:
                     root_node.children.add(new_nested_child)
+        # Iterate over a snapshot: popping from the live dict mid-iteration raises RuntimeError.
+        for root_name, root_node in list(self.root_nodes.items()):
+            if len(root_node.children) == 0:
+                self.root_nodes.pop(root_name)
 
     def parse_nested_props(self, mapping, nested_parent_node, parent_label):
         path = mapping.get("path")
diff --git a/tube/etl/indexers/aggregation/nested/translator.py b/tube/etl/indexers/aggregation/nested/translator.py
index 02cac2d1..a1655641 100644
--- a/tube/etl/indexers/aggregation/nested/translator.py
+++ b/tube/etl/indexers/aggregation/nested/translator.py
@@ -51,11 +51,12 @@ def collect_node(self, node, queue):
                     node_df, node_name, child_df, child
                 )
         current_node_name = node_name
-        for parent_label, edge_up_tbl in node.parent_edge_up_tbl:
-            edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
-            if edge_df is not None:
-                node_df = self.join_two_dataframe(edge_df, node_df)
-                current_node_name = parent_label
+        for edge in node.parent_edge_up_tbls:
+            for parent_label, edge_up_tbl in edge:
+                edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
+                if edge_df is not None:
+                    node_df = self.join_two_dataframe(edge_df, node_df)
+                    current_node_name = parent_label
         for p in node.parent_nodes:
             if p is not None:
                 p.children_ready_to_join.append(node)
diff --git a/tube/etl/indexers/aggregation/new_translator.py b/tube/etl/indexers/aggregation/new_translator.py
index 82cb98df..a99b1a21 100644
--- a/tube/etl/indexers/aggregation/new_translator.py
+++ b/tube/etl/indexers/aggregation/new_translator.py
@@ -41,14 +41,14 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
                 hdfs_path,
                 writer,
                 {
-                    "root": mapping.get("root"),
+                    "root": mapping.get("doc_type"),
                     "doc_type": mapping.get("doc_type"),
                     "name": mapping.get("name"),
                     "nested_props": mapping.get("nested_props"),
                 },
                 model,
                 dictionary,
-                root_names=[mapping.get('root')]
+                root_names=[mapping.get('doc_type')]
             )
             if nest_props is not None
             else None
@@ -56,14 +56,7 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
 
     def update_types(self):
         es_mapping = super(Translator, self).update_types()
-        properties = es_mapping.get(self.parser.doc_type).get("properties")
-        if self.nested_translator is not None:
-            nested_types = self.nested_translator.update_types()
-            for a in self.nested_translator.parser.array_types:
-                if a not in self.parser.array_types:
-                    self.parser.array_types.append(a)
-            properties.update(nested_types[self.parser.root.name]["properties"])
-        return es_mapping
+        return self.call_to_child_update_types(self.nested_translator, es_mapping)
 
     def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df):
         """
diff --git a/tube/etl/indexers/aggregation/nodes/nested_node.py b/tube/etl/indexers/aggregation/nodes/nested_node.py
index 54e7a0a7..07c1194c 100644
--- a/tube/etl/indexers/aggregation/nodes/nested_node.py
+++ b/tube/etl/indexers/aggregation/nodes/nested_node.py
@@ -23,8 +23,8 @@ def __init__(
         self.level = level
         self.path = path
         self.display_name = display_name
-        self.parent_edge_up_tbl = (
-            [] if parent_edge_up_tbl is None else parent_edge_up_tbl
+        self.parent_edge_up_tbls = (
+            [] if parent_edge_up_tbl is None else [parent_edge_up_tbl]
         )
         self.parent_nodes = [parent_node]
         self.non_leaf_children_count = 0
@@ -33,12 +33,15 @@ def __init__(
         self.filter = create_filter_from_json(json_filter)
 
     def __key__(self):
-        if self.parent_edge_up_tbl is not None and len(self.parent_edge_up_tbl) > 0:
-            return self.name, self.parent_edge_up_tbl[0]
+        # Tuple (not list) keeps the key hashable for __hash__; empty paths are skipped.
+        edge_heads = tuple(p[0] for p in self.parent_edge_up_tbls if p)
+        if edge_heads:
+            return self.name, edge_heads
         return self.name
 
-    def add_parent(self, parent_node):
+    def add_parent(self, parent_node, parent_edge_up_tbl):
         self.parent_nodes.append(parent_node)
+        self.parent_edge_up_tbls.append(parent_edge_up_tbl)
 
     def __hash__(self):
         return hash(self.__key__())
diff --git a/tube/etl/indexers/base/translator.py b/tube/etl/indexers/base/translator.py
index 9f217972..eb916dfb 100644
--- a/tube/etl/indexers/base/translator.py
+++ b/tube/etl/indexers/base/translator.py
@@ -61,6 +61,28 @@ def update_types(self):
     def add_some_additional_props(self, keep_props):
         keep_props.append(self.parser.get_key_prop().name)
 
+    def call_to_child_update_types(self, child_translator, es_mapping):
+        """
+        Merge the types of a child translator into this translator's ES mapping.
+
+        Array types known only to the child are appended to this parser's
+        array_types, and the child's properties for this parser's root are
+        merged into es_mapping in place.
+
+        :param child_translator: translator to merge from; None means no-op
+        :param es_mapping: mapping returned by this translator's update_types()
+        :return: es_mapping (the same object)
+        """
+        if child_translator is None:
+            return es_mapping
+        properties = es_mapping.get(self.parser.doc_type).get("properties")
+        nested_types = child_translator.update_types()
+        for a in child_translator.parser.array_types:
+            if a not in self.parser.array_types:
+                self.parser.array_types.append(a)
+        properties.update(nested_types[self.parser.root.name]["properties"])
+        return es_mapping
+
     def remove_unnecessary_columns(self, df):
         props = list(PropFactory.get_prop_by_doc_name(self.parser.doc_type).values())
         keep_props = [p.name for p in props]
diff --git a/tube/etl/indexers/injection/new_translator.py b/tube/etl/indexers/injection/new_translator.py
index 1b5e7bc3..0e65f648 100644
--- a/tube/etl/indexers/injection/new_translator.py
+++ b/tube/etl/indexers/injection/new_translator.py
@@ -34,7 +34,7 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
                 hdfs_path,
                 writer,
                 {
-                    "root": mapping.get("root"),
+                    "root": mapping.get("doc_type"),
                     "doc_type": mapping.get("doc_type"),
                     "name": mapping.get("name"),
                     "nested_props": mapping.get("nested_props"),
@@ -53,6 +53,10 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
             PropFactory.add_additional_prop(self.parser.doc_type, "source_node", (str,))
         )
 
+    def update_types(self):
+        es_mapping = super(Translator, self).update_types()
+        return self.call_to_child_update_types(self.nested_translator, es_mapping)
+
     def collect_leaf(self, child, edge_df, collected_leaf_dfs):
         if isinstance(child, LeafNode):
             key_name = self.parser.get_key_prop().name