Skip to content

Commit

Permalink
fix(reducer): correct the agg reducers
Browse files Browse the repository at this point in the history
  • Loading branch information
thanh-nguyen-dang committed Oct 30, 2023
1 parent 728bfb2 commit c722bb7
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions tube/etl/indexers/aggregation/new_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,12 @@ def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df)

select_expr = [get_node_id_name(node_name)]
for n in child_df.schema.names:
if n in self.parser.reducer_by_prop and self.parser.reducer_by_prop.get(
n
) in ["list", "set"]:
select_expr.append(
self.reducer_to_agg_func_expr(
self.parser.reducer_by_prop.get(n), n, is_merging=True
)
)
if n in self.parser.reducer_by_prop:
select_expr.append(n)
tmp_df = tmp_df.select(*select_expr)
return self.return_dataframe(
tmp_df,
f"{Translator.aggregate_intermediate_data_frame.__qualname__}__{node_name}__{child.name}"
f"{Translator.aggregate_intermediate_data_frame.__qualname__}__{node_name}__{child.name}",
)

def aggregate_with_count_on_edge_tbl(self, node_name, df, edge_df, child):
Expand Down Expand Up @@ -136,7 +130,7 @@ def aggregate_with_count_on_edge_tbl(self, node_name, df, edge_df, child):
count_df = (
edge_df.groupBy(node_id)
.count()
.select(node_id, col("count").alias(child.name))
.select(node_id, col("count").alias(count_reducer.prop.name))
)
count_reducer.done = True
# combine value lists new counted dataframe to existing one
Expand All @@ -147,7 +141,7 @@ def aggregate_with_count_on_edge_tbl(self, node_name, df, edge_df, child):
)
return self.return_dataframe(
result,
f"{Translator.aggregate_with_count_on_edge_tbl.__qualname__}__{node_name}__{child.name}"
f"{Translator.aggregate_with_count_on_edge_tbl.__qualname__}__{node_name}__{child.name}",
)

def aggregate_with_child_tbl(self, df, parent_name, edge_df, child):
Expand All @@ -172,7 +166,7 @@ def aggregate_with_child_tbl(self, df, parent_name, edge_df, child):
result = self.join_two_dataframe(df, temp_df, how="left_outer")
return self.return_dataframe(
result,
f"{Translator.aggregate_with_child_tbl.__qualname__}__{parent_name}__{child.name}"
f"{Translator.aggregate_with_child_tbl.__qualname__}__{parent_name}__{child.name}",
)

def aggregate_nested_properties(self):
Expand Down Expand Up @@ -231,7 +225,7 @@ def aggregate_nested_properties(self):
return None
return self.return_dataframe(
aggregated_dfs[self.parser.root.name],
Translator.aggregate_nested_properties.__qualname__
Translator.aggregate_nested_properties.__qualname__,
)

def get_direct_children(self, root_df):
Expand Down Expand Up @@ -287,7 +281,9 @@ def get_direct_children(self, root_df):
root_df = self.join_two_dataframe(root_df, child_by_root, how="left_outer")
child_df.unpersist()
child_by_root.unpersist()
return self.return_dataframe(root_df, Translator.get_direct_children.__qualname__)
return self.return_dataframe(
root_df, Translator.get_direct_children.__qualname__
)

def get_joining_props(self, translator, joining_index):
"""
Expand Down Expand Up @@ -380,7 +376,9 @@ def ensure_project_id_exist(self, df):
project_id_prop.name,
concat_ws("-", col(PROGRAM_NAME), col(PROJECT_CODE)),
)
return self.return_dataframe(df, Translator.ensure_project_id_exist.__qualname__)
return self.return_dataframe(
df, Translator.ensure_project_id_exist.__qualname__
)

def translate(self):
root_df = self.translate_table_to_dataframe(
Expand Down Expand Up @@ -424,7 +422,9 @@ def translate_joining_props(self, translators):
joining_index_translator = translators[j.joining_index]
if joining_index_translator.current_step > 0:
df = self.join_to_an_index(df, joining_index_translator, j)
return self.return_dataframe(df, Translator.translate_joining_props.__qualname__)
return self.return_dataframe(
df, Translator.translate_joining_props.__qualname__
)

def walk_through_graph(self, df, root_id, p):
src = self.parser.root
Expand Down Expand Up @@ -481,4 +481,6 @@ def translate_final(self):
execute_filter(df, self.parser.filter) if self.parser.filter else df
)

return self.return_dataframe(filtered_df, Translator.translate_final.__qualname__)
return self.return_dataframe(
filtered_df, Translator.translate_final.__qualname__
)

0 comments on commit c722bb7

Please sign in to comment.