Skip to content

Commit

Permalink
Enable parallel index scans on compressed chunk table (#7659)
Browse files Browse the repository at this point in the history
  • Loading branch information
akuzm authored Feb 11, 2025
1 parent ab3d247 commit 2d197a9
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 187 deletions.
50 changes: 36 additions & 14 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,29 @@ decompress_chunk_add_plannerinfo(PlannerInfo *root, CompressionInfo *info, const
compressed_rel_setup_equivalence_classes(root, info);
/* translate chunk_rel->joininfo for compressed_rel */
compressed_rel_setup_joininfo(compressed_rel, info);

/*
* Force parallel plan creation, see compute_parallel_worker().
* This is not compatible with ts_classify_relation(), but on the other hand
* the compressed chunk rel shouldn't exist anywhere outside of the
* decompression planning, it is removed at the end.
*
* This is not needed for direct select from a single chunk, in which case
* the chunk reloptkind will be RELOPT_BASEREL
*/
if (chunk_rel->reloptkind == RELOPT_OTHER_MEMBER_REL)
{
compressed_rel->reloptkind = RELOPT_OTHER_MEMBER_REL;

/*
* We have to minimally initialize the append relation info for the
* compressed chunks, so that the generate_implied_equalities() works.
* Only the parent hypertable relindex is needed.
*/
root->append_rel_array[compressed_rel->relid] = makeNode(AppendRelInfo);
root->append_rel_array[compressed_rel->relid]->parent_relid = info->ht_rel->relid;
compressed_rel->top_parent_relids = chunk_rel->top_parent_relids;
}
}

static DecompressChunkPath *
Expand Down Expand Up @@ -1916,27 +1939,26 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel,
compressed_path = create_seqscan_path(root, compressed_rel, NULL, 0);
add_path(compressed_rel, compressed_path);

/* create parallel scan path */
/*
* Create parallel seq scan path.
* We marked the compressed rel as RELOPT_OTHER_MEMBER_REL when creating it,
* so we should get a nonzero number of parallel workers even for small
* tables, so that they don't prevent parallelism in the entire append plan.
* See compute_parallel_workers(). This also applies to the creation of
* index paths below.
*/
if (compressed_rel->consider_parallel)
{
/* Almost the same functionality as ts_create_plain_partial_paths.
*
* However, we also create a partial path for small chunks to allow PostgreSQL to choose
* a parallel plan for decompression. If no partial path is present for a single chunk,
* PostgreSQL will not use a parallel plan and all chunks are decompressed by a
* non-parallel plan (even if there are a few bigger chunks).
*/
int parallel_workers = compute_parallel_worker(compressed_rel,
compressed_rel->pages,
-1,
max_parallel_workers_per_gather);

/* Use at least one worker */
parallel_workers = Max(parallel_workers, 1);

/* Add an unordered partial path based on a parallel sequential scan. */
add_partial_path(compressed_rel,
create_seqscan_path(root, compressed_rel, NULL, parallel_workers));
if (parallel_workers > 0)
{
add_partial_path(compressed_rel,
create_seqscan_path(root, compressed_rel, NULL, parallel_workers));
}
}

/*
Expand Down
205 changes: 110 additions & 95 deletions tsl/test/expected/compression_ddl.out
Original file line number Diff line number Diff line change
Expand Up @@ -1444,19 +1444,22 @@ EXPLAIN (costs off) SELECT device_id, count(*)
FROM compression_insert
GROUP BY device_id
ORDER BY device_id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Merge Append
Sort Key: _hyper_31_110_chunk.device_id
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Partial GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Index Only Scan using _hyper_31_110_chunk_compression_insert_device_id_time_idx on _hyper_31_110_chunk
(10 rows)
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: _hyper_31_110_chunk.device_id
-> Finalize HashAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Gather
Workers Planned: 1
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Parallel Index Only Scan using _hyper_31_110_chunk_compression_insert_device_id_time_idx on _hyper_31_110_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Parallel Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
(13 rows)

SELECT device_id, count(*)
FROM compression_insert
Expand Down Expand Up @@ -1527,22 +1530,25 @@ EXPLAIN (costs off) SELECT device_id, count(*)
FROM compression_insert
GROUP BY device_id
ORDER BY device_id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Merge Append
Sort Key: _hyper_31_110_chunk.device_id
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Partial GroupAggregate
Group Key: _hyper_31_112_chunk.device_id
-> Index Only Scan using _hyper_31_112_chunk_compression_insert_device_id_time_idx on _hyper_31_112_chunk
(13 rows)
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: _hyper_31_112_chunk.device_id
-> Finalize HashAggregate
Group Key: _hyper_31_112_chunk.device_id
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_31_112_chunk.device_id
-> Parallel Index Only Scan using _hyper_31_112_chunk_compression_insert_device_id_time_idx on _hyper_31_112_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Parallel Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Parallel Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
(16 rows)

SELECT device_id, count(*)
FROM compression_insert
Expand Down Expand Up @@ -1613,25 +1619,28 @@ EXPLAIN (costs off) SELECT device_id, count(*)
FROM compression_insert
GROUP BY device_id
ORDER BY device_id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Merge Append
Sort Key: _hyper_31_110_chunk.device_id
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
-> Partial GroupAggregate
Group Key: _hyper_31_114_chunk.device_id
-> Index Only Scan using _hyper_31_114_chunk_compression_insert_device_id_time_idx on _hyper_31_114_chunk
(16 rows)
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: _hyper_31_114_chunk.device_id
-> Finalize HashAggregate
Group Key: _hyper_31_114_chunk.device_id
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_31_114_chunk.device_id
-> Parallel Index Only Scan using _hyper_31_114_chunk_compression_insert_device_id_time_idx on _hyper_31_114_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Parallel Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Parallel Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Parallel Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
(19 rows)

SELECT device_id, count(*)
FROM compression_insert
Expand Down Expand Up @@ -1702,28 +1711,31 @@ EXPLAIN (costs off) SELECT device_id, count(*)
FROM compression_insert
GROUP BY device_id
ORDER BY device_id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Merge Append
Sort Key: _hyper_31_110_chunk.device_id
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_116_chunk
-> Index Scan using compress_hyper_32_117_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_117_chunk
-> Partial GroupAggregate
Group Key: _hyper_31_116_chunk.device_id
-> Index Only Scan using _hyper_31_116_chunk_compression_insert_device_id_time_idx on _hyper_31_116_chunk
(19 rows)
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: _hyper_31_116_chunk.device_id
-> Finalize HashAggregate
Group Key: _hyper_31_116_chunk.device_id
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_31_116_chunk.device_id
-> Parallel Index Only Scan using _hyper_31_116_chunk_compression_insert_device_id_time_idx on _hyper_31_116_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Parallel Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Parallel Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Parallel Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_116_chunk
-> Parallel Index Scan using compress_hyper_32_117_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_117_chunk
(22 rows)

SELECT device_id, count(*)
FROM compression_insert
Expand Down Expand Up @@ -1794,31 +1806,34 @@ EXPLAIN (costs off) SELECT device_id, count(*)
FROM compression_insert
GROUP BY device_id
ORDER BY device_id;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_31_110_chunk.device_id
-> Merge Append
Sort Key: _hyper_31_110_chunk.device_id
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_116_chunk
-> Index Scan using compress_hyper_32_117_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_117_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_118_chunk
-> Index Scan using compress_hyper_32_119_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_119_chunk
-> Partial GroupAggregate
Group Key: _hyper_31_118_chunk.device_id
-> Index Only Scan using _hyper_31_118_chunk_compression_insert_device_id_time_idx on _hyper_31_118_chunk
(22 rows)
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: _hyper_31_118_chunk.device_id
-> Finalize HashAggregate
Group Key: _hyper_31_118_chunk.device_id
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_31_118_chunk.device_id
-> Parallel Index Only Scan using _hyper_31_118_chunk_compression_insert_device_id_time_idx on _hyper_31_118_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_110_chunk
-> Parallel Index Scan using compress_hyper_32_111_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_111_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_112_chunk
-> Parallel Index Scan using compress_hyper_32_113_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_113_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_114_chunk
-> Parallel Index Scan using compress_hyper_32_115_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_115_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_116_chunk
-> Parallel Index Scan using compress_hyper_32_117_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_117_chunk
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_31_118_chunk
-> Parallel Index Scan using compress_hyper_32_119_chunk_device_id__ts_meta_min_1__ts_me_idx on compress_hyper_32_119_chunk
(25 rows)

SELECT device_id, count(*)
FROM compression_insert
Expand Down
Loading

0 comments on commit 2d197a9

Please sign in to comment.