Skip to content

Commit

Permalink
[Optimize](Row store) pick #37145, #38236 (#38932)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxr599 authored Aug 7, 2024
1 parent bc644cb commit 2543b56
Show file tree
Hide file tree
Showing 22 changed files with 183 additions and 13 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,6 @@ DEFINE_mInt64(max_tablet_io_errors, "-1");
DEFINE_Int32(tablet_path_check_interval_seconds, "-1");
DEFINE_mInt32(tablet_path_check_batch_size, "1000");

// Page size of row column, default 4KB
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt32(s3_write_buffer_size, "5242880");
// The timeout config for S3 buffer allocation
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,6 @@ DECLARE_mInt64(max_tablet_io_errors);
DECLARE_Int32(tablet_path_check_interval_seconds);
DECLARE_mInt32(tablet_path_check_batch_size);

// Page size of row column, default 4KB
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt32(s3_write_buffer_size);
// The timeout config for S3 buffer allocation
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace segment_v2 {

static constexpr size_t DEFAULT_PAGE_SIZE = 1024 * 1024; // default size: 1M

// Default page size, in bytes, used for the row store column: 16KB.
static constexpr long ROW_STORE_PAGE_SIZE_DEFAULT_VALUE = 16 * 1024;

struct PageBuilderOptions {
size_t data_page_size = DEFAULT_PAGE_SIZE;

Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,11 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {

if (column.is_row_store_column()) {
// smaller page size for row store column
opts.data_page_size = config::row_column_page_size;
auto page_size = _tablet_schema->row_store_page_size();
opts.data_page_size =
(page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
}

std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
RETURN_IF_ERROR(writer->init());
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo

if (column.is_row_store_column()) {
// smaller page size for row store column
opts.data_page_size = config::row_column_page_size;
auto page_size = _tablet_schema->row_store_page_size();
opts.data_page_size =
(page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
}

std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
RETURN_IF_ERROR(writer->init());
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
if (tablet_schema.__isset.store_row_column) {
schema->set_store_row_column(tablet_schema.store_row_column);
}
if (tablet_schema.__isset.row_store_page_size) {
schema->set_row_store_page_size(tablet_schema.row_store_page_size);
}
if (tablet_schema.__isset.skip_write_index_on_load) {
schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
_sort_type = schema.sort_type();
_sort_col_num = schema.sort_col_num();
_compression_type = schema.compression_type();
_row_store_page_size = schema.row_store_page_size();
_schema_version = schema.schema_version();
// Default to V1 inverted index storage format for backward compatibility if not specified in schema.
if (!schema.has_inverted_index_storage_format()) {
Expand Down Expand Up @@ -1051,6 +1052,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_skip_write_index_on_load = ori_tablet_schema.skip_write_index_on_load();
_sort_type = ori_tablet_schema.sort_type();
_sort_col_num = ori_tablet_schema.sort_col_num();
_row_store_page_size = ori_tablet_schema.row_store_page_size();

// copy from table_schema_param
_schema_version = version;
Expand Down Expand Up @@ -1204,6 +1206,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_sort_col_num(_sort_col_num);
tablet_schema_pb->set_schema_version(_schema_version);
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_row_store_page_size(_row_store_page_size);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
tablet_schema_pb->set_inverted_index_storage_format(_inverted_index_storage_format);
}
Expand Down Expand Up @@ -1505,6 +1508,7 @@ bool operator==(const TabletSchema& a, const TabletSchema& b) {
if (a._disable_auto_compaction != b._disable_auto_compaction) return false;
if (a._enable_single_replica_compaction != b._enable_single_replica_compaction) return false;
if (a._store_row_column != b._store_row_column) return false;
if (a._row_store_page_size != b._row_store_page_size) return false;
if (a._skip_write_index_on_load != b._skip_write_index_on_load) return false;
return true;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "common/status.h"
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/options.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -338,6 +339,8 @@ class TabletSchema {
void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = version_col_idx; }
int32_t version_col_idx() const { return _version_col_idx; }
segment_v2::CompressionTypePB compression_type() const { return _compression_type; }
void set_row_store_page_size(long page_size) { _row_store_page_size = page_size; }
long row_store_page_size() const { return _row_store_page_size; }

const std::vector<TabletIndex>& indexes() const { return _indexes; }
bool has_inverted_index() const {
Expand Down Expand Up @@ -482,6 +485,7 @@ class TabletSchema {
size_t _num_rows_per_row_block = 0;
CompressKind _compress_kind = COMPRESS_NONE;
segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F;
long _row_store_page_size = segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
size_t _next_column_unique_id = 0;
std::string _auto_increment_column;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig, objectPool);
binlogConfig, objectPool,
tbl.rowStorePageSize());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig, objectPool);
binlogConfig, objectPool,
tbl.rowStorePageSize());

createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,8 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig, objectPool);
binlogConfig, objectPool,
localTbl.rowStorePageSize());
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
task.setInRestoreMode(true);
batchTask.addTask(task);
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3384,6 +3384,10 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder
if (olapTable.storeRowColumn()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN).append("\" = \"");
sb.append(olapTable.storeRowColumn()).append("\"");

// row store page size
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE).append("\" = \"");
sb.append(olapTable.rowStorePageSize()).append("\"");
}

// skip inverted index on load
Expand Down
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2432,6 +2432,21 @@ public void setCompressionType(TCompressionType compressionType) {
tableProperty.buildCompressionType();
}

// Records the row store page size as a table property and refreshes the
// cached value held by the TableProperty object.
public void setRowStorePageSize(long pageSize) {
    TableProperty property = getOrCreatTableProperty();
    String pageSizeText = Long.toString(pageSize);
    property.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE, pageSizeText);
    // Re-derive the numeric field from the property string just written.
    property.buildRowStorePageSize();
}

// Returns the table's row store page size, or the analyzer default when the
// table has no TableProperty object yet.
public long rowStorePageSize() {
    return tableProperty == null
            ? PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE
            : tableProperty.rowStorePageSize();
}


public void setStorageFormat(TStorageFormat storageFormat) {
TableProperty tableProperty = getOrCreatTableProperty();
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, storageFormat.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class TableProperty implements Writable {

private boolean skipWriteIndexOnLoad = false;

private long rowStorePageSize = PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;

private String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;

private long timeSeriesCompactionGoalSizeMbytes
Expand Down Expand Up @@ -238,6 +240,17 @@ public boolean storeRowColumn() {
return storeRowColumn;
}

// Refreshes rowStorePageSize from the "row_store_page_size" entry of the
// property map, using the analyzer default when the entry is absent.
// Long.parseLong throws NumberFormatException if the stored text is not a
// valid long. Returns this so build* calls can be chained.
public TableProperty buildRowStorePageSize() {
    final String fallbackText = Long.toString(PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE);
    final String storedText = properties.getOrDefault(
            PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE, fallbackText);
    rowStorePageSize = Long.parseLong(storedText);
    return this;
}

// Returns the row store page size last computed by buildRowStorePageSize(),
// or the field's initial default if that was never called.
public long rowStorePageSize() {
    return this.rowStorePageSize;
}

public TableProperty buildSkipWriteIndexOnLoad() {
skipWriteIndexOnLoad = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD, "false"));
Expand Down Expand Up @@ -606,6 +619,7 @@ public static TableProperty read(DataInput in) throws IOException {
.buildBinlogConfig()
.buildEnableLightSchemaChange()
.buildStoreRowColumn()
.buildRowStorePageSize()
.buildSkipWriteIndexOnLoad()
.buildCompactionPolicy()
.buildTimeSeriesCompactionGoalSizeMbytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_TIMEOUT = "timeout";
public static final String PROPERTIES_COMPRESSION = "compression";

// row store page size, default 16KB
public static final String PROPERTIES_ROW_STORE_PAGE_SIZE = "row_store_page_size";
public static final long ROW_STORE_PAGE_SIZE_DEFAULT_VALUE = 16384;

public static final String PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE = "light_schema_change";

public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
Expand Down Expand Up @@ -887,6 +891,31 @@ public static TCompressionType analyzeCompressionType(Map<String, String> proper
}
}

// Rounds size up to the next multiple of 4096; exact multiples are returned
// unchanged. No overflow check is performed: inputs above
// Long.MAX_VALUE - 4095 wrap around to a negative result, and any size <= 0
// yields a result <= 0.
public static long alignTo4K(long size) {
    final long lowBitsMask = 4096L - 1;
    return (size + lowBitsMask) & ~lowBitsMask;
}

// analyzeRowStorePageSize reads "row_store_page_size" from properties.
// - properties is null or lacks the key: returns the default and leaves the
//   map untouched.
// - otherwise the value is parsed as a long and rounded up with alignTo4K;
//   a non-numeric value or an aligned result <= 0 raises AnalysisException.
// The key is removed from properties only after the value has been accepted.
public static long analyzeRowStorePageSize(Map<String, String> properties) throws AnalysisException {
    if (properties == null || !properties.containsKey(PROPERTIES_ROW_STORE_PAGE_SIZE)) {
        return ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
    }

    final String rawValue = properties.get(PROPERTIES_ROW_STORE_PAGE_SIZE);
    final long alignedSize;
    try {
        alignedSize = alignTo4K(Long.parseLong(rawValue));
    } catch (NumberFormatException e) {
        throw new AnalysisException("Invalid row store page size: " + rawValue);
    }

    // The check runs on the aligned value, so zero, negative and overflowing
    // inputs are all rejected here.
    if (alignedSize <= 0) {
        throw new AnalysisException("Row store page size should larger than 0.");
    }

    // Consume the property so later analysis does not see it again.
    properties.remove(PROPERTIES_ROW_STORE_PAGE_SIZE);
    return alignedSize;
}

// analyzeStorageFormat will parse the storage format from properties
// sql: alter table tablet_name set ("storage_format" = "v2")
// Use this sql to convert all tablets(base and rollup index) to a new format segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,10 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa
properties.put(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN,
olapTable.storeRowColumn().toString());
}
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE)) {
properties.put(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE,
Long.toString(olapTable.rowStorePageSize()));
}
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
properties.put(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD,
olapTable.skipWriteIndexOnLoad().toString());
Expand Down Expand Up @@ -1988,7 +1992,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(), binlogConfig, objectPool);
tbl.storeRowColumn(), binlogConfig, objectPool, tbl.rowStorePageSize());

task.setStorageFormat(tbl.getStorageFormat());
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());
Expand Down Expand Up @@ -2347,6 +2351,16 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
}
olapTable.setCompressionType(compressionType);

// get row_store_page_size
long rowStorePageSize = PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
try {
rowStorePageSize = PropertyAnalyzer.analyzeRowStorePageSize(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}

olapTable.setRowStorePageSize(rowStorePageSize);

// check data sort properties
int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() :
keysDesc.getClusterKeysColumnIds().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
binlogConfig, objectPool);
binlogConfig, objectPool,
olapTable.rowStorePageSize());

createReplicaTask.setIsRecoverTask(true);
createReplicaTask.setInvertedIndexStorageFormat(olapTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class CreateReplicaTask extends AgentTask {
private TStorageType storageType;
private TStorageMedium storageMedium;
private TCompressionType compressionType;
private long rowStorePageSize;

private List<Column> columns;

Expand Down Expand Up @@ -148,7 +149,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
BinlogConfig binlogConfig,
Map<Object, Object> objectPool) {
Map<Object, Object> objectPool,
long rowStorePageSize) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);

this.replicaId = replicaId;
Expand Down Expand Up @@ -193,6 +195,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
this.objectPool = objectPool;
this.rowStorePageSize = rowStorePageSize;

}

public void setIsRecoverTask(boolean isRecoverTask) {
Expand Down Expand Up @@ -334,6 +338,7 @@ public TCreateTabletReq toThrift() {
tSchema.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
tSchema.setStoreRowColumn(storeRowColumn);
tSchema.setRowStorePageSize(rowStorePageSize);
createTabletReq.setTabletSchema(tSchema);

createTabletReq.setVersion(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class AgentTaskTest {
private long version = 1L;

private TStorageType storageType = TStorageType.COLUMN;
private long rowStorePageSize = 16384L;
private List<Column> columns;
private MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(3);

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() throws AnalysisException {
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool);
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool, rowStorePageSize);

// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/olap_file.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ message TabletSchemaPB {
optional InvertedIndexStorageFormatPB inverted_index_storage_format = 25 [default=V1];
// column unique ids for row store columns
repeated int32 row_store_column_unique_ids = 26;
optional int64 row_store_page_size = 27 [default=16384];
}

enum TabletStatePB {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct TTabletSchema {
17: optional bool enable_single_replica_compaction = false
18: optional bool skip_write_index_on_load = false
19: optional list<i32> cluster_key_idxes
21: optional i64 row_store_page_size = 16384
}

// this enum stands for different storage format in src_backends
Expand Down
Loading

0 comments on commit 2543b56

Please sign in to comment.