Skip to content

Commit

Permalink
fix(reducer): correct the agg reducers
Browse files (browse the repository at this point in the history)
  • Loading branch information
thanh-nguyen-dang committed Oct 30, 2023
1 parent 728bfb2 commit f6465af
Showing 1 changed file with 3 additions and 9 deletions.
12 changes: 3 additions & 9 deletions tube/etl/indexers/aggregation/new_translator.py
Original file line number | Diff line number | Diff line change
[Expand Up]
@@ -97,14 +97,8 @@ def aggregate_intermediate_data_frame(self, node_name, child, child_df, edge_df)

select_expr = [get_node_id_name(node_name)]
for n in child_df.schema.names:
if n in self.parser.reducer_by_prop and self.parser.reducer_by_prop.get(
n
) in ["list", "set"]:
select_expr.append(
self.reducer_to_agg_func_expr(
self.parser.reducer_by_prop.get(n), n, is_merging=True
)
)
if n in self.parser.reducer_by_prop:
select_expr.append(n)
tmp_df = tmp_df.select(*select_expr)
return self.return_dataframe(
tmp_df,
[Expand Down] [Expand Up]
@@ -136,7 +130,7 @@ def aggregate_with_count_on_edge_tbl(self, node_name, df, edge_df, child):
count_df = (
edge_df.groupBy(node_id)
.count()
.select(node_id, col("count").alias(child.name))
.select(node_id, col("count").alias(count_reducer.prop.name))
)
count_reducer.done = True
# combine value lists new counted dataframe to existing one
Expand Down

0 comments on commit f6465af

Please sign in to comment.