Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/disable_mtmv_list_rollup' into d…
Browse files Browse the repository at this point in the history
…isable_mtmv_list_rollup
  • Loading branch information
zddr committed Jul 22, 2024
2 parents 66c767e + b770f0c commit 23d69c7
Show file tree
Hide file tree
Showing 2,363 changed files with 113,836 additions and 2,830 deletions.
6 changes: 5 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -1384,9 +1385,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.endpoint {},
.region {},
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.bucket {},
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
Expand Down Expand Up @@ -1789,7 +1793,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
<< ", published=" << published_count << " : " << st;
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/thrift_server.h"

Expand Down Expand Up @@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
response.status = t_status;
}

// Cloud-mode implementation of get_stream_load_record.
// Delegates to BaseBackendService::get_stream_load_record, passing through
// `result` and `last_stream_record_time` unchanged and supplying the stream
// load recorder obtained from the cloud storage engine (`_engine`).
void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) {
BaseBackendService::get_stream_load_record(result, last_stream_record_time,
_engine.get_stream_load_recorder());
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class CloudBackendService final : public BaseBackendService {
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

void get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {

_tablet_hotspot = std::make_unique<TabletHotspot>();

RETURN_NOT_OK_STATUS_WITH_WARN(
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");

return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

#include <atomic>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
Expand All @@ -42,8 +44,10 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_policy.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "vec/common/schema_util.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -131,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

// Returns the schema produced by merging the schemas of all rowsets in
// _rs_version_map.
//
// _meta_lock is held in shared mode for the whole call. The per-rowset schemas
// are collected and handed to schema_util::get_least_common_schema; the status
// returned by that helper is intentionally discarded, so the result is simply
// whatever the helper stored in its output argument.
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
    std::shared_lock rdlock(_meta_lock);

    std::vector<TabletSchemaSPtr> rowset_schemas;
    for (const auto& [ver, rs] : _rs_version_map) {
        rowset_schemas.push_back(rs->tablet_schema());
    }

    TabletSchemaSPtr merged;
    // Take the max-version schema and merge every rowset schema into it.
    static_cast<void>(
            vectorized::schema_util::get_least_common_schema(rowset_schemas, nullptr, merged));
    return merged;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
Expand Down Expand Up @@ -227,6 +244,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
{
.expiration_time = expiration_time,
},
.download_done {},
});
}
#endif
Expand Down Expand Up @@ -463,6 +481,7 @@ int64_t CloudTablet::get_cloud_base_compaction_score() const {
if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
bool has_delete = false;
int64_t point = cumulative_layer_point();
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (rs_meta->start_version() >= point) {
continue;
Expand Down
15 changes: 3 additions & 12 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,6 @@ class CloudTablet final : public BaseTablet {

std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();

// Calls `visitor` once for every rowset in _rs_version_map and, when
// `include_stale` is true, also for every rowset in _stale_rs_version_map.
// _meta_lock is held in shared mode for the entire traversal, including the
// visitor invocations.
void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
                      bool include_stale = false) {
    std::shared_lock rlock(_meta_lock);
    for (auto& entry : _rs_version_map) {
        visitor(entry.second);
    }
    if (include_stale) {
        for (auto& entry : _stale_rs_version_map) {
            visitor(entry.second);
        }
    }
}

inline Version max_version() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->max_version();
Expand Down Expand Up @@ -206,6 +194,9 @@ class CloudTablet final : public BaseTablet {
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;

// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
}
return true;
});
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
Expand Down Expand Up @@ -484,6 +485,8 @@ DEFINE_mInt32(migration_remaining_size_threshold_mb, "10");
// If the task runs longer than this time, the task will be terminated, in seconds.
// timeout = std::max(migration_task_timeout_secs, tablet size / 1MB/s)
DEFINE_mInt32(migration_task_timeout_secs, "300");
// timeout for try_lock migration lock
DEFINE_Int64(migration_lock_timeout_ms, "1000");

// Port to start debug webserver on
DEFINE_Int32(webserver_port, "8040");
Expand Down Expand Up @@ -1341,6 +1344,8 @@ DEFINE_mBool(ignore_not_found_file_in_external_table, "true");

DEFINE_mBool(enable_hdfs_mem_limiter, "true");

DEFINE_mInt16(topn_agg_limit_multiplier, "2");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
// number of batch size to fetch the remote split source
DECLARE_mInt32(remote_split_source_batch_size);
// max number of remote scanner thread pool size
Expand Down Expand Up @@ -534,6 +535,8 @@ DECLARE_mInt32(migration_remaining_size_threshold_mb);
// If the task runs longer than this time, the task will be terminated, in seconds.
// timeout = std::max(migration_task_timeout_secs, tablet size / 1MB/s)
DECLARE_mInt32(migration_task_timeout_secs);
// timeout for try_lock migration lock
DECLARE_Int64(migration_lock_timeout_ms);

// Port to start debug webserver on
DECLARE_Int32(webserver_port);
Expand Down Expand Up @@ -1433,6 +1436,10 @@ DECLARE_mBool(ignore_not_found_file_in_external_table);

DECLARE_mBool(enable_hdfs_mem_limiter);

// Defines how much larger than the limit the data in the hash table must be
// before the agg limit optimization is applied.
DECLARE_mInt16(topn_agg_limit_multiplier);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
27 changes: 17 additions & 10 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ class RuntimePredicateWrapper {
RuntimeFilterType _filter_type;
int32_t _max_in_num = -1;

SharedRuntimeFilterContext _context;
RuntimeFilterContextSPtr _context;
uint32_t _filter_id;
};

Expand All @@ -965,7 +965,7 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* poo
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}

SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() {
RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
return _wrapper->_context;
}

Expand Down Expand Up @@ -1033,34 +1033,38 @@ Status IRuntimeFilter::publish(bool publish_local) {
class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>> {
std::shared_ptr<pipeline::Dependency> _dependency;
IRuntimeFilter* _filter;
RuntimeFilterContextSPtr _rf_context;
std::string _rf_debug_info;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);

void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
LOG(WARNING) << "sync filter size meet rpc error, filter=" << _filter->debug_string();
LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info;
Base::_process_if_rpc_failed();
}

void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
if (status.is<ErrorCode::END_OF_FILE>()) {
// The rf merger backend may have finished before the rf's send_filter_size; in that case we just ignore the filter.
_filter->set_ignored();
_rf_context->ignored = true;
} else {
LOG(WARNING) << "sync filter size meet error status, filter="
<< _filter->debug_string();
LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info;
Base::_process_if_meet_error_status(status);
}
}

public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency, IRuntimeFilter* filter)
: Base(req, callback), _dependency(std::move(dependency)), _filter(filter) {}
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info)
: Base(req, callback),
_dependency(std::move(dependency)),
_rf_context(rf_context),
_rf_debug_info(rf_debug_info) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1103,7 +1107,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt

auto request = std::make_shared<PSendFilterSizeRequest>();
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, this);
// The IRuntimeFilter may be destroyed before the RPC finishes, so the closure must not
// hold a raw pointer to it. It holds the context's shared pointer instead.
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency,
_wrapper->_context, this->debug_string());
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TQueryOptions;
namespace vectorized {
class VExpr;
class VExprContext;
struct SharedRuntimeFilterContext;
struct RuntimeFilterContextSPtr;
} // namespace vectorized

namespace pipeline {
Expand Down Expand Up @@ -220,7 +220,7 @@ class IRuntimeFilter {
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false, bool need_local_merge = false);

SharedRuntimeFilterContext& get_shared_context_ref();
RuntimeFilterContextSPtr& get_shared_context_ref();

// insert data to build filter
void insert_batch(vectorized::ColumnPtr column, size_t start);
Expand Down
25 changes: 19 additions & 6 deletions be/src/http/action/calc_file_crc_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <exception>
#include <string>

#include "cloud/cloud_storage_engine.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_channel.h"
Expand All @@ -38,7 +39,7 @@
namespace doris {
using namespace ErrorCode;

CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine,
CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engine,
TPrivilegeHier::type hier, TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine) {}

Expand All @@ -58,16 +59,28 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value
return Status::InternalError("convert tablet id or failed, {}", e.what());
}

TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
BaseTabletSPtr tablet = nullptr;

if (auto cloudEngine = dynamic_cast<CloudStorageEngine*>(&_engine)) {
tablet = DORIS_TRY(cloudEngine->get_tablet(tablet_id));
// sync all rowsets
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(-1));
} else if (auto storageEngine = dynamic_cast<StorageEngine*>(&_engine)) {
auto tabletPtr = storageEngine->tablet_manager()->get_tablet(tablet_id);
tablet = std::dynamic_pointer_cast<Tablet>(tabletPtr);
} else {
return Status::InternalError("convert _engine failed");
}

if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
return Status::NotFound("failed to get tablet {}", tablet_id);
}

const auto& req_start_version = req->param(PARAM_START_VERSION);
const auto& req_end_version = req->param(PARAM_END_VERSION);

*start_version = 0;
*end_version = tablet->max_version().second;
*end_version = tablet->max_version_unlocked();

if (!req_start_version.empty()) {
try {
Expand All @@ -85,8 +98,8 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value
}
}

auto st = tablet->calc_local_file_crc(crc_value, *start_version, *end_version, rowset_count,
file_count);
auto st = tablet->calc_file_crc(crc_value, *start_version, *end_version, rowset_count,
file_count);
if (!st.ok()) {
return st;
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/http/action/calc_file_crc_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace doris {
class HttpRequest;
class StorageEngine;
class BaseStorageEngine;
class ExecEnv;

const std::string PARAM_START_VERSION = "start_version";
Expand All @@ -35,7 +35,7 @@ const std::string PARAM_END_VERSION = "end_version";
// This action is used to calculate the crc value of the files in the tablet.
class CalcFileCrcAction : public HttpHandlerWithAuth {
public:
CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine, TPrivilegeHier::type hier,
CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);

~CalcFileCrcAction() override = default;
Expand All @@ -47,7 +47,7 @@ class CalcFileCrcAction : public HttpHandlerWithAuth {
int64_t* end_version, int32_t* rowset_count, int64_t* file_count);

private:
StorageEngine& _engine;
BaseStorageEngine& _engine;
};

} // end namespace doris
Loading

0 comments on commit 23d69c7

Please sign in to comment.