Skip to content

Commit

Permalink
update types for nested translator in collector etl
Browse the repository at this point in the history
  • Loading branch information
thanh-nguyen-dang committed Aug 23, 2024
1 parent 3ab1b54 commit d3abf18
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 33 deletions.
28 changes: 21 additions & 7 deletions tube/etl/indexers/aggregation/nested/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
from tube.etl.indexers.base.parser import Parser as BaseParser, ES_TYPES


def is_node_in_set(node_set, node_to_check: NestedNode):
    """Return True if a node with the same identity key as *node_to_check*
    is already present in *node_set*.

    Membership is decided by comparing the nodes' ``__key__()`` tuples rather
    than object identity, so two distinct instances describing the same
    logical node compare as equal.
    """
    # Hoist the invariant key computation out of the scan.
    target_key = node_to_check.__key__()
    return any(node.__key__() == target_key for node in node_set)


class Parser(BaseParser):
"""
The main entry point into the index export process for the mutation indices
Expand All @@ -20,7 +27,7 @@ def __init__(self, mapping, model, dictionary, root_names):
root_name,
get_node_table_name(self.model, root_name),
root_name,
root_name,
mapping.get("doc_type"),
props=[],
)
self.get_nested_props(mapping)
Expand All @@ -35,6 +42,11 @@ def get_nested_props(self, mapping):
new_nested_child = self.parse_nested_props(n_idx, root_node, root_name)
if new_nested_child is not None:
root_node.children.add(new_nested_child)
self.root_nodes = {
root_name: root_node
for root_name, root_node in self.root_nodes.items()
if len(root_node.children) > 0.
}

def parse_nested_props(self, mapping, nested_parent_node, parent_label):
path = mapping.get("path")
Expand Down Expand Up @@ -62,7 +74,7 @@ def parse_nested_props(self, mapping, nested_parent_node, parent_label):
self.doc_type, mapping.get("props"), node_label=current_node_label
)

if current_node_label not in self.all_nested_nodes:
if path not in self.all_nested_nodes:
current_nested_node = NestedNode(
current_node_label,
tbl_name,
Expand All @@ -73,19 +85,21 @@ def parse_nested_props(self, mapping, nested_parent_node, parent_label):
parent_edge_up_tbl=parent_edge_up_tbls,
json_filter=mapping.get("filter"),
)
self.all_nested_nodes[current_node_label] = current_nested_node
self.all_nested_nodes[path] = current_nested_node
else:
current_nested_node = self.all_nested_nodes[current_node_label]
current_nested_node.add_parent(nested_parent_node)
current_nested_node = self.all_nested_nodes[path]
current_nested_node.add_parent(nested_parent_node, parent_edge_up_tbls)
nested_idxes = mapping.get("nested_props", [])
for n_idx in nested_idxes:
new_nested_child = self.parse_nested_props(n_idx, current_nested_node, current_node_label)
if new_nested_child is not None:
current_nested_node.children.add(new_nested_child)

if len(current_nested_node.children) == 0:
if (not is_node_in_set(self.leaves, current_nested_node)
and len(current_nested_node.children) == 0):
self.leaves.append(current_nested_node)
else:
elif (not is_node_in_set(self.collectors, current_nested_node)
and len(current_nested_node.children) > 0):
self.collectors.append(current_nested_node)

return current_nested_node
Expand Down
39 changes: 30 additions & 9 deletions tube/etl/indexers/aggregation/nested/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ def collect_tree(self):
df = execute_filter(df, queue[i].filter)
self.collected_node_dfs[queue[i].name] = df
i += 1
return self.collected_node_dfs[queue[len(queue) - 1].name]
latest_queue_item = queue[-1]
final_df = self.collected_node_dfs[latest_queue_item.name]
root_id_field = get_node_id_name(self.parser.doc_type)
nested_props = [c_child.display_name for c_child in latest_queue_item.children]
cols = self.get_cols_from_node(latest_queue_item.name, latest_queue_item.props,
nested_props, final_df)
final_df = final_df.groupBy(root_id_field).agg(
collect_list(struct(*cols)).alias(latest_queue_item.display_name)
)
return final_df

def collect_node(self, node, queue):
node_df = self.translate_table_to_dataframe(node, node.props)
Expand All @@ -50,18 +59,30 @@ def collect_node(self, node, queue):
node_df = self.collect_structural_df(
node_df, node_name, child_df, child
)
current_node_name = node_name
for parent_label, edge_up_tbl in node.parent_edge_up_tbl:
edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
if edge_df is not None:
node_df = self.join_two_dataframe(edge_df, node_df)
current_node_name = parent_label
up_df = None
for edge in node.parent_edge_up_tbls:
it_df = node_df
current_node_name = node_name
for parent_label, edge_up_tbl in edge:
edge_df = self.collect_edge(edge_up_tbl, current_node_name, parent_label)
if edge_df is not None:
it_df = self.join_two_dataframe(edge_df, it_df)
current_node_name = parent_label
root_id_field = get_node_id_name(current_node_name)
if node.level is None or node.level == 1:
it_df = it_df.withColumnRenamed(root_id_field, get_node_id_name(self.parser.doc_type))
up_df = (
it_df
if up_df is None
else up_df.unionByName(it_df).distinct()
)

for p in node.parent_nodes:
if p is not None:
p.children_ready_to_join.append(node)
if len(p.children_ready_to_join) == len(p.children):
if p.level is not None and len(p.children_ready_to_join) == len(p.children):
queue.append(p)
return node_df
return up_df

def collect_structural_df(self, node_df, node_name, child_df, child):
id_field = get_node_id_name(node_name)
Expand Down
13 changes: 3 additions & 10 deletions tube/etl/indexers/aggregation/new_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,22 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
hdfs_path,
writer,
{
"root": mapping.get("root"),
"root": mapping.get("doc_type"),
"doc_type": mapping.get("doc_type"),
"name": mapping.get("name"),
"nested_props": mapping.get("nested_props"),
},
model,
dictionary,
root_names=[mapping.get('root')]
root_names=[mapping.get('doc_type')]
)
if nest_props is not None
else None
)

def update_types(self):
    """Build the ES type mapping for this doc type, then merge in the
    nested translator's types.

    Returns the (possibly augmented) ``es_mapping`` dict.
    """
    es_mapping = super(Translator, self).update_types()
    # Array-type bookkeeping and nested-property merging are centralized in
    # the base-class helper; keeping a local copy of that logic here left an
    # unreachable duplicate body after `return es_mapping`.
    return self.call_to_child_update_types(self.nested_translator, es_mapping)

def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df):
"""
Expand Down
11 changes: 6 additions & 5 deletions tube/etl/indexers/aggregation/nodes/nested_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def __init__(
self.level = level
self.path = path
self.display_name = display_name
self.parent_edge_up_tbl = (
[] if parent_edge_up_tbl is None else parent_edge_up_tbl
self.parent_edge_up_tbls = (
[] if parent_edge_up_tbl is None else [parent_edge_up_tbl]
)
self.parent_nodes = [parent_node]
self.non_leaf_children_count = 0
Expand All @@ -33,12 +33,13 @@ def __init__(
self.filter = create_filter_from_json(json_filter)

def __key__(self):
    """Identity key used for hashing/equality of nested nodes.

    When the node has parent edge chains, the key combines the node name,
    a comma-joined string of the first table of each parent edge chain, and
    the node's path; otherwise the bare name alone identifies the node.
    """
    # NOTE: the attribute is `parent_edge_up_tbls` (plural, set in __init__);
    # the old branch reading `parent_edge_up_tbl` would raise AttributeError.
    if self.parent_edge_up_tbls is not None and len(self.parent_edge_up_tbls) > 0:
        return self.name, ",".join([str(p[0]) for p in self.parent_edge_up_tbls]), self.path
    return self.name

def add_parent(self, parent_node, parent_edge_up_tbl):
    """Register an additional parent, together with the chain of edge
    tables leading up from this node to that parent."""
    self.parent_edge_up_tbls.append(parent_edge_up_tbl)
    self.parent_nodes.append(parent_node)

def __hash__(self):
    """Hash nested nodes by their identity key so set/dict membership
    agrees with ``__key__``-based equality."""
    identity = self.__key__()
    return hash(identity)
Expand Down
4 changes: 2 additions & 2 deletions tube/etl/indexers/base/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ def get_python_type_of_prop(self, p, array_types):
if array_type_condition:
if array_types is not None and p.name not in array_types:
array_types.append(p.name)
return (self.select_widest_type(p.type), 1)
return self.select_widest_type(p.type), 1
else:
return (self.select_widest_type(p.type), 0)
return self.select_widest_type(p.type), 0

def get_es_types(self):
types = {}
Expand Down
10 changes: 10 additions & 0 deletions tube/etl/indexers/base/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ def update_types(self):
def add_some_additional_props(self, keep_props):
keep_props.append(self.parser.get_key_prop().name)

def call_to_child_update_types(self, child_translator, es_mapping):
    """Merge a child (nested) translator's ES types into *es_mapping*.

    Calls ``child_translator.update_types()``, folds any newly discovered
    array-typed property names into this parser's ``array_types`` (order
    preserved, duplicates skipped), then updates this doc type's
    ``properties`` with the child's properties for the same doc type.

    Returns *es_mapping*; it is returned unchanged when *child_translator*
    is None.
    """
    if child_translator is not None:
        nested_types = child_translator.update_types()
        for array_prop in child_translator.parser.array_types:
            if array_prop not in self.parser.array_types:
                self.parser.array_types.append(array_prop)
        # Resolve `properties` only when there is something to merge: the
        # previous unconditional lookup could raise AttributeError on a
        # missing doc type even when no child translator was configured.
        properties = es_mapping.get(self.parser.doc_type).get("properties")
        properties.update(nested_types[self.parser.doc_type]["properties"])
    return es_mapping

def remove_unnecessary_columns(self, df):
props = list(PropFactory.get_prop_by_doc_name(self.parser.doc_type).values())
keep_props = [p.name for p in props]
Expand Down
4 changes: 4 additions & 0 deletions tube/etl/indexers/injection/new_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def __init__(self, sc, hdfs_path, writer, mapping, model, dictionary):
PropFactory.add_additional_prop(self.parser.doc_type, "source_node", (str,))
)

def update_types(self):
    """Compose the base ES type mapping, then merge the nested
    translator's types through the shared base-class helper."""
    base_mapping = super(Translator, self).update_types()
    return self.call_to_child_update_types(self.nested_translator, base_mapping)

def collect_leaf(self, child, edge_df, collected_leaf_dfs):
if isinstance(child, LeafNode):
key_name = self.parser.get_key_prop().name
Expand Down

0 comments on commit d3abf18

Please sign in to comment.