Skip to content

Commit

Permalink
allow to set row_column_page_size for tables
Browse files Browse the repository at this point in the history
  • Loading branch information
lxr599 committed Jul 4, 2024
1 parent ec59d28 commit 76a78da
Show file tree
Hide file tree
Showing 24 changed files with 127 additions and 23 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,8 +1040,6 @@ DEFINE_mInt64(max_tablet_io_errors, "-1");
DEFINE_Int32(tablet_path_check_interval_seconds, "-1");
DEFINE_mInt32(tablet_path_check_batch_size, "1000");

// Page size of row column, default 16KB
DEFINE_mInt64(row_column_page_size, "16384");
// it must be larger than or equal to 5MB
DEFINE_mInt64(s3_write_buffer_size, "5242880");
// Log interval when doing s3 upload task
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,6 @@ DECLARE_mInt64(max_tablet_io_errors);
DECLARE_Int32(tablet_path_check_interval_seconds);
DECLARE_mInt32(tablet_path_check_batch_size);

// Page size of row column, default 16KB
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt64(s3_write_buffer_size);
// Log interval when doing s3 upload task
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace segment_v2 {

static constexpr size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M

// Default data page size, in bytes, for the row store column: 16KB.
constexpr long ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE = 16 * 1024;

struct PageBuilderOptions {
size_t data_page_size = DEFAULT_PAGE_SIZE;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {

if (column.is_row_store_column()) {
// smaller page size for row store column
opts.data_page_size = config::row_column_page_size;
opts.data_page_size = _tablet_schema->row_column_page_size();
}
std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo

if (column.is_row_store_column()) {
// smaller page size for row store column
opts.data_page_size = config::row_column_page_size;
opts.data_page_size = _tablet_schema->row_column_page_size();
}
std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
if (tablet_schema.__isset.store_row_column) {
schema->set_store_row_column(tablet_schema.store_row_column);
}
if (tablet_schema.__isset.row_column_page_size) {
schema->set_row_column_page_size(tablet_schema.row_column_page_size);
}
if (tablet_schema.__isset.skip_write_index_on_load) {
schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
_sort_type = schema.sort_type();
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
_row_column_page_size = schema.row_column_page_size();
_schema_version = schema.schema_version();
// Default to V1 inverted index storage format for backward compatibility if not specified in schema.
if (!schema.has_inverted_index_storage_format()) {
Expand Down Expand Up @@ -1009,6 +1010,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_skip_write_index_on_load = ori_tablet_schema.skip_write_index_on_load();
_sort_type = ori_tablet_schema.sort_type();
_sort_col_num = ori_tablet_schema.sort_col_num();
_row_column_page_size = ori_tablet_schema.row_column_page_size();

// copy from table_schema_param
_schema_version = version;
Expand Down Expand Up @@ -1162,6 +1164,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_sort_col_num(_sort_col_num);
tablet_schema_pb->set_schema_version(_schema_version);
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_row_column_page_size(_row_column_page_size);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
tablet_schema_pb->set_inverted_index_storage_format(_inverted_index_storage_format);
}
Expand Down Expand Up @@ -1464,6 +1467,7 @@ bool operator==(const TabletSchema& a, const TabletSchema& b) {
if (a._disable_auto_compaction != b._disable_auto_compaction) return false;
if (a._enable_single_replica_compaction != b._enable_single_replica_compaction) return false;
if (a._store_row_column != b._store_row_column) return false;
if (a._row_column_page_size != b._row_column_page_size) return false;
if (a._skip_write_index_on_load != b._skip_write_index_on_load) return false;
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "common/status.h"
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/options.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -334,6 +335,10 @@ class TabletSchema {
void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = version_col_idx; }
int32_t version_col_idx() const { return _version_col_idx; }
segment_v2::CompressionTypePB compression_type() const { return _compression_type; }
void set_row_column_page_size(long page_size) {
_row_column_page_size = page_size;
}
long row_column_page_size() const { return _row_column_page_size; }

const std::vector<TabletIndex>& indexes() const { return _indexes; }
bool has_inverted_index() const {
Expand Down Expand Up @@ -478,6 +483,7 @@ class TabletSchema {
size_t _num_rows_per_row_block = 0;
CompressKind _compress_kind = COMPRESS_NONE;
segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F;
long _row_column_page_size = segment_v2::ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE;
size_t _next_column_unique_id = 0;
std::string _auto_increment_column;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ private void createRollupReplicaForPartition(OlapTable tbl) throws Exception {
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.disableAutoCompaction());
tbl.disableAutoCompaction(),
tbl.rowColumnPageSize());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ private void createShadowIndexReplicaForPartition(OlapTable tbl) throws Exceptio
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.disableAutoCompaction());
tbl.disableAutoCompaction(),
tbl.rowColumnPageSize());
requestBuilder.addTabletMetas(builder);
} // end for rollupTablets
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ protected void createRollupReplica() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig,
tbl.rowColumnPageSize());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ protected void createShadowIndexReplica() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig,
tbl.rowColumnPageSize());

createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,8 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig);
binlogConfig,
localTbl.rowColumnPageSize());

task.setInRestoreMode(true);
batchTask.addTask(task);
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3499,6 +3499,10 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis
sb.append(olapTable.getCompressionType()).append("\"");
}

// row column page size
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ROW_COLUMN_PAGE_SIZE).append("\" = \"");
sb.append(olapTable.rowColumnPageSize()).append("\"");

// estimate_partition_size
if (!olapTable.getEstimatePartitionSize().equals("")) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \"");
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2464,6 +2464,20 @@ public void setCompressionType(TCompressionType compressionType) {
tableProperty.buildCompressionType();
}

/**
 * Writes the row column page size into the table properties and asks the
 * table property object to re-read its cached page size from them.
 *
 * @param pageSize the row column page size to record
 */
public void setRowColumnPageSize(long pageSize) {
    TableProperty props = getOrCreatTableProperty();
    final String pageSizeText = Long.toString(pageSize);
    props.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ROW_COLUMN_PAGE_SIZE, pageSizeText);
    props.buildRowColumnPageSize();
}

/**
 * Returns the row column page size of this table, or the default value when
 * the table has no table property object.
 */
public long rowColumnPageSize() {
    return tableProperty == null
            ? PropertyAnalyzer.ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE
            : tableProperty.rowColumnPageSize();
}

public void setStorageFormat(TStorageFormat storageFormat) {
TableProperty tableProperty = getOrCreatTableProperty();
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, storageFormat.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class TableProperty implements Writable {

private boolean skipWriteIndexOnLoad = false;

private long rowColumnPageSize = PropertyAnalyzer.ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE;

private String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;

private long timeSeriesCompactionGoalSizeMbytes
Expand Down Expand Up @@ -250,6 +252,17 @@ public boolean storeRowColumn() {
return storeRowColumn;
}

/**
 * Refreshes the cached row column page size from the property map, using the
 * default value when the property is not set.
 *
 * @return this object, so that build calls can be chained
 */
public TableProperty buildRowColumnPageSize() {
    final String fallback = Long.toString(PropertyAnalyzer.ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE);
    final String configured = properties.getOrDefault(
            PropertyAnalyzer.PROPERTIES_ROW_COLUMN_PAGE_SIZE, fallback);
    rowColumnPageSize = Long.parseLong(configured);
    return this;
}

/** Returns the row column page size most recently loaded by buildRowColumnPageSize(). */
public long rowColumnPageSize() {
    return this.rowColumnPageSize;
}

public TableProperty buildSkipWriteIndexOnLoad() {
skipWriteIndexOnLoad = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD, "false"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer,
BinlogConfig binlogConfig,
boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes)
boolean isStorageMediumSpecified,
List<Integer> clusterKeyIndexes, long pageSize)
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
Expand Down Expand Up @@ -170,7 +171,8 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.disableAutoCompaction());
tbl.disableAutoCompaction(),
tbl.rowColumnPageSize());
requestBuilder.addTabletMetas(builder);
}
if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) {
Expand Down Expand Up @@ -216,7 +218,7 @@ public OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long tableId,
boolean storeRowColumn, int schemaVersion, String compactionPolicy,
Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold,
Long timeSeriesCompactionTimeThresholdSeconds, Long timeSeriesCompactionEmptyRowsetsThreshold,
Long timeSeriesCompactionLevelThreshold, boolean disableAutoCompaction) throws DdlException {
Long timeSeriesCompactionLevelThreshold, boolean disableAutoCompaction, long pageSize) throws DdlException {
OlapFile.TabletMetaCloudPB.Builder builder = OlapFile.TabletMetaCloudPB.newBuilder();
builder.setTableId(tableId);
builder.setIndexId(indexId);
Expand Down Expand Up @@ -331,6 +333,8 @@ public OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long tableId,
}
schemaBuilder.setDisableAutoCompaction(disableAutoCompaction);

schemaBuilder.setRowColumnPageSize(pageSize);

OlapFile.TabletSchemaCloudPB schema = schemaBuilder.build();
builder.setSchema(schema);
// rowset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_TIMEOUT = "timeout";
public static final String PROPERTIES_COMPRESSION = "compression";

// Table property that sets the row column page size.
// The default is 16KB (16384); analyzeRowColumnPageSize rounds user-supplied
// values up to a multiple of 4096.
public static final String PROPERTIES_ROW_COLUMN_PAGE_SIZE = "row_column_page_size";
public static final long ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE = 16384;

public static final String PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE = "light_schema_change";

public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
Expand Down Expand Up @@ -974,6 +978,31 @@ public static TCompressionType analyzeCompressionType(Map<String, String> proper
}
}

/**
 * Rounds {@code size} up to the next multiple of 4096; values that are
 * already a multiple of 4096 are returned unchanged.
 */
public static long alignTo4K(long size) {
    final long mask = 4096L - 1;
    final long bumped = size + mask;
    return bumped & ~mask;
}

/**
 * Parses and validates the "row_column_page_size" table property.
 *
 * <p>When the property is present its value must parse as a positive long. The value is
 * rounded up to a multiple of 4096 and the key is removed from {@code properties}.
 * When the property is absent, or {@code properties} is null, the default page size is
 * returned and the map is left untouched.
 *
 * @param properties table properties, may be null; the key is removed only on success
 * @return the row column page size, aligned to 4096
 * @throws AnalysisException if the value is not a valid long, is not larger than 0,
 *                           or is so large that aligning it overflows
 */
public static long analyzeRowColumnPageSize(Map<String, String> properties) throws AnalysisException {
    if (properties == null || !properties.containsKey(PROPERTIES_ROW_COLUMN_PAGE_SIZE)) {
        // Use the shared constant so the default cannot drift from the declared one.
        return ROW_COLUMN_PAGE_SIZE_DEFAULT_VALUE;
    }

    String rowColumnPageSizeStr = properties.get(PROPERTIES_ROW_COLUMN_PAGE_SIZE);
    long requestedSize;
    try {
        requestedSize = Long.parseLong(rowColumnPageSizeStr);
    } catch (NumberFormatException e) {
        throw new AnalysisException("Invalid row column page size: " + rowColumnPageSizeStr);
    }

    // Check the raw value before aligning, so the error describes what the user typed
    // rather than the rounded result.
    if (requestedSize <= 0) {
        throw new AnalysisException("Row column page size should larger than 0.");
    }

    long rowColumnPageSize = alignTo4K(requestedSize);
    // A positive input can only align to a non-positive value if the addition overflowed.
    if (rowColumnPageSize <= 0) {
        throw new AnalysisException("Invalid row column page size: " + rowColumnPageSizeStr);
    }

    properties.remove(PROPERTIES_ROW_COLUMN_PAGE_SIZE);
    return rowColumnPageSize;
}

// analyzeStorageFormat will parse the storage format from properties
// sql: alter table tablet_name set ("storage_format" = "v2")
// Use this sql to convert all tablets(base and rollup index) to a new format segment
Expand Down
Loading

0 comments on commit 76a78da

Please sign in to comment.