From 4120b6bed6e7e5f3dd919d7fc0f63491954c7d9a Mon Sep 17 00:00:00 2001 From: doleyzi Date: Wed, 14 Aug 2024 15:30:45 +0800 Subject: [PATCH] [INLONG-10780][SDK] Optimize memory management for DataProxy CPP SDK --- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 46 +++++++--- .../dataproxy-sdk-cpp/src/config/sdk_conf.h | 3 + .../src/manager/buffer_manager.h | 85 +++++++++++++++++++ .../src/utils/capi_constant.h | 2 + 4 files changed, 126 insertions(+), 10 deletions(-) create mode 100644 inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 319262adc2f..68cc122c4ce 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -38,6 +38,9 @@ bool SdkConfig::ParseConfig(const std::string &config_path) { // Guaranteed to only parse the configuration file once if (!__sync_bool_compare_and_swap(&parsed_, false, true)) { LOG_INFO("ParseConfig has parsed ."); + if (++instance_num_ > max_instance_) { + return false; + } return true; } @@ -92,7 +95,7 @@ void SdkConfig::defaultInit() { load_balance_interval_ = constants::kLoadBalanceInterval; heart_beat_interval_ = constants::kHeartBeatInterval; enable_balance_ = constants::kEnableBalance; - isolation_level_=constants::IsolationLevel::kLevelSecond; + isolation_level_ = constants::IsolationLevel::kLevelSecond; // cache parameter send_buf_size_ = constants::kSendBufSize; @@ -132,6 +135,10 @@ void SdkConfig::defaultInit() { enable_setaffinity_ = constants::kEnableSetAffinity; mask_cpu_affinity_ = constants::kMaskCPUAffinity; extend_field_ = constants::kExtendField; + + need_auth_ = constants::kNeedAuth; + max_instance_ = constants::kMaxInstance; + instance_num_ = 1; } void SdkConfig::InitThreadParam(const rapidjson::Value &doc) { @@ -212,6 +219,14 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) { } else { max_stream_id_num_ = constants::kMaxGroupIdNum; } + + // max_cache_num + if (doc.HasMember("max_cache_num") && doc["max_cache_num"].IsInt() && doc["max_cache_num"].GetInt() >= 0) { + const rapidjson::Value &obj = doc["max_cache_num"]; + max_cache_num_ = obj.GetInt(); + } else { + max_cache_num_ = constants::kMaxCacheNum; + } } void SdkConfig::InitZipParam(const rapidjson::Value &doc) { @@ -431,9 +446,10 @@ void SdkConfig::InitAuthParm(const rapidjson::Value &doc) { } else { need_auth_ = constants::kNeedAuth; LOG_INFO("need_auth is not expect, then use default:%s" << need_auth_ - ? "true" - : "false"); + ? "true" + : "false"); } + } void SdkConfig::OthersParam(const rapidjson::Value &doc) { // ser_ip @@ -475,12 +491,20 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) { } else { extend_field_ = constants::kExtendField; } + + // instance num + if (doc.HasMember("max_instance") && doc["max_instance"].IsInt() && doc["max_instance"].GetInt() > 0) { + const rapidjson::Value &obj = doc["max_instance"]; + max_instance_ = obj.GetInt(); + } else { + max_instance_ = constants::kMaxInstance; + } } -bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhost) { +bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string &localhost) { int32_t sockfd; int32_t ip_num = 0; - char buf[1024] = {0}; + char buf[1024] = {0}; struct ifreq *ifreq; struct ifreq if_flag; struct ifconf ifconf; @@ -493,7 +517,7 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos } ioctl(sockfd, SIOCGIFCONF, &ifconf); - ifreq = (struct ifreq *)buf; + ifreq = (struct ifreq *) buf; ip_num = ifconf.ifc_len / sizeof(struct ifreq); for (int32_t i = 0; i < ip_num; i++, ifreq++) { if (ifreq->ifr_flags != AF_INET) { @@ -511,11 +535,11 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos continue; } - if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr), + if (!strncmp(inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr), "127.0.0.1", 7)) { continue; } - localhost = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr); + localhost = inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr); close(sockfd); err_info = "Ok"; return true; @@ -545,8 +569,8 @@ void SdkConfig::ShowClientConfig() { LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str()); LOG_INFO( "enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_ - ? "true" - : "false"); + ? "true" + : "false"); LOG_INFO("manager_update_interval: minutes" << manager_update_interval_); LOG_INFO("manager_url_timeout: " << manager_url_timeout_); LOG_INFO("max_tcp_num: " << max_proxy_num_); @@ -566,6 +590,8 @@ void SdkConfig::ShowClientConfig() { LOG_INFO("max_group_id_num: " << max_group_id_num_); LOG_INFO("max_stream_id_num: " << max_stream_id_num_); LOG_INFO("isolation_level: " << isolation_level_); + LOG_INFO("max_instance: " << max_instance_); + LOG_INFO("max_cache_num: " << max_cache_num_); } } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h index 6d7b23dc218..f8581a1198a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h @@ -50,6 +50,9 @@ class SdkConfig { uint32_t send_buf_size_; // Send buf size, bid granularity uint32_t max_group_id_num_; // Send buf size, bid granularity uint32_t max_stream_id_num_; // Send buf size, bid granularity + uint32_t max_cache_num_; + uint32_t max_instance_; + uint32_t instance_num_; // thread parameters uint32_t per_groupid_thread_nums_; // Sending thread per groupid diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h new file mode 100644 index 00000000000..428188e38e7 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "../config/sdk_conf.h" +#include "../utils/send_buffer.h" + +#ifndef INLONG_BUFFER_MANAGER_H +#define INLONG_BUFFER_MANAGER_H + +namespace inlong { +class BufferManager { + private: + std::queue buffer_queue_; + mutable std::mutex mutex_; + uint32_t queue_limit_; + BufferManager() { + uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, + SdkConfig::getInstance()->pack_size_); + uint32_t buffer_num = + (SdkConfig::getInstance()->recv_buf_size_ / data_capacity_) * + SdkConfig::getInstance()->instance_num_; + queue_limit_ = + std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num); + LOG_INFO("Data capacity:" + << data_capacity_ << ", buffer num: " << buffer_num + << ", instance num: " << SdkConfig::getInstance()->instance_num_ + << ", limit: " << queue_limit_ << " ,max cache num: " + << SdkConfig::getInstance()->max_cache_num_); + for ( uint32_t index = 0; index < queue_limit_; index++) { + std::shared_ptr send_buffer = + std::make_shared(data_capacity_); + if (send_buffer == nullptr) { + LOG_INFO("Buffer manager is null"); + continue; + } + AddSendBuffer(send_buffer); + } + } + + public: + static BufferManager *GetInstance() { + static BufferManager instance; + return &instance; + } + SendBufferPtrT GetSendBuffer() { + std::lock_guard lck(mutex_); + if (buffer_queue_.empty()) { + return nullptr; + } + SendBufferPtrT buf = buffer_queue_.front(); + buffer_queue_.pop(); + return buf; + } + void AddSendBuffer(const SendBufferPtrT &send_buffer) { + if (nullptr == send_buffer) { + return; + } + send_buffer->releaseBuf(); + std::lock_guard lck(mutex_); + if (buffer_queue_.size() > queue_limit_) { + return; + } + buffer_queue_.emplace(send_buffer); + } +}; +} // namespace inlong +#endif // INLONG_BUFFER_MANAGER_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index 1dbebd03db6..399bd1b3487 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -41,6 +41,8 @@ static const int32_t kSendBufSize = 10240000; static const int32_t kRecvBufSize = 10240000; static const uint32_t kMaxGroupIdNum = 50; static const uint32_t kMaxStreamIdNum = 100; +static const uint32_t kMaxCacheNum = 10; +static const uint32_t kMaxInstance = 30; static const int32_t kDispatchIntervalZip = 8; static const int32_t kDispatchIntervalSend = 10;