Skip to content

Commit

Permalink
update types for nested translator in collector etl
Browse files Browse the repository at this point in the history
  • Loading branch information
thanh-nguyen-dang committed Aug 21, 2024
1 parent 3ab1b54 commit 64f1287
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 22 deletions.
5 changes: 4 additions & 1 deletion tube/etl/indexers/aggregation/nested/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, mapping, model, dictionary, root_names):
root_name,
get_node_table_name(self.model, root_name),
root_name,
root_name,
mapping.get("doc_type"),
props=[],
)
self.get_nested_props(mapping)
Expand All @@ -35,6 +35,9 @@ def get_nested_props(self, mapping):
new_nested_child = self.parse_nested_props(n_idx, root_node, root_name)
if new_nested_child is not None:
root_node.children.add(new_nested_child)
for root_name, root_node in self.root_nodes.items():
if len(root_node.children) == 0:
self.root_nodes.pop(root_name)

def parse_nested_props(self, mapping, nested_parent_node, parent_label):
path = mapping.get("path")
Expand Down
11 changes: 6 additions & 5 deletions tube/etl/indexers/aggregation/nested/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ def collect_node(self, node, queue):
node_df, node_name, child_df, child
)
current_node_name = node_name
for parent_label, edge_up_tbl in node.parent_edge_up_tbl:
edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
if edge_df is not None:
node_df = self.join_two_dataframe(edge_df, node_df)
current_node_name = parent_label
for edge in node.parent_edge_up_tbls:
for parent_label, edge_up_tbl in edge:
edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
if edge_df is not None:
node_df = self.join_two_dataframe(edge_df, node_df)
current_node_name = parent_label
for p in node.parent_nodes:
if p is not None:
p.children_ready_to_join.append(node)
Expand Down
13 changes: 3 additions & 10 deletions tube/etl/indexers/aggregation/new_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,22 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
hdfs_path,
writer,
{
"root": mapping.get("root"),
"root": mapping.get("doc_type"),
"doc_type": mapping.get("doc_type"),
"name": mapping.get("name"),
"nested_props": mapping.get("nested_props"),
},
model,
dictionary,
root_names=[mapping.get('root')]
root_names=[mapping.get('doc_type')]
)
if nest_props is not None
else None
)

def update_types(self):
es_mapping = super(Translator, self).update_types()
properties = es_mapping.get(self.parser.doc_type).get("properties")
if self.nested_translator is not None:
nested_types = self.nested_translator.update_types()
for a in self.nested_translator.parser.array_types:
if a not in self.parser.array_types:
self.parser.array_types.append(a)
properties.update(nested_types[self.parser.root.name]["properties"])
return es_mapping
return self.call_to_child_update_types(self.nested_translator, es_mapping)

def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df):
"""
Expand Down
11 changes: 6 additions & 5 deletions tube/etl/indexers/aggregation/nodes/nested_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def __init__(
self.level = level
self.path = path
self.display_name = display_name
self.parent_edge_up_tbl = (
[] if parent_edge_up_tbl is None else parent_edge_up_tbl
self.parent_edge_up_tbls = (
[] if parent_edge_up_tbl is None else [parent_edge_up_tbl]
)
self.parent_nodes = [parent_node]
self.non_leaf_children_count = 0
Expand All @@ -33,12 +33,13 @@ def __init__(
self.filter = create_filter_from_json(json_filter)

def __key__(self):
if self.parent_edge_up_tbl is not None and len(self.parent_edge_up_tbl) > 0:
return self.name, self.parent_edge_up_tbl[0]
if self.parent_edge_up_tbls is not None and len(self.parent_edge_up_tbls) > 0:
return self.name, [p[0] for p in self.parent_edge_up_tbls]
return self.name

def add_parent(self, parent_node):
def add_parent(self, parent_node, parent_edge_up_tbl):
self.parent_nodes.append(parent_node)
self.parent_edge_up_tbls.append(parent_edge_up_tbl)

def __hash__(self):
return hash(self.__key__())
Expand Down
10 changes: 10 additions & 0 deletions tube/etl/indexers/base/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ def update_types(self):
def add_some_additional_props(self, keep_props):
keep_props.append(self.parser.get_key_prop().name)

def call_to_child_update_types(self, child_translator, es_mapping):
properties = es_mapping.get(self.parser.doc_type).get("properties")
if child_translator is not None:
nested_types = child_translator.update_types()
for a in child_translator.parser.array_types:
if a not in self.parser.array_types:
self.parser.array_types.append(a)
properties.update(nested_types[self.parser.root.name]["properties"])
return es_mapping

def remove_unnecessary_columns(self, df):
props = list(PropFactory.get_prop_by_doc_name(self.parser.doc_type).values())
keep_props = [p.name for p in props]
Expand Down
6 changes: 5 additions & 1 deletion tube/etl/indexers/injection/new_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
hdfs_path,
writer,
{
"root": mapping.get("root"),
"root": mapping.get("doc_type"),
"doc_type": mapping.get("doc_type"),
"name": mapping.get("name"),
"nested_props": mapping.get("nested_props"),
Expand All @@ -53,6 +53,10 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
PropFactory.add_additional_prop(self.parser.doc_type, "source_node", (str,))
)

def update_types(self):
es_mapping = super(Translator, self).update_types()
return self.call_to_child_update_types(self.nested_translator, es_mapping)

def collect_leaf(self, child, edge_df, collected_leaf_dfs):
if isinstance(child, LeafNode):
key_name = self.parser.get_key_prop().name
Expand Down

0 comments on commit 64f1287

Please sign in to comment.