Skip to content

Commit

Permalink
[INLONG-10780][SDK] Optimize memory management for DataProxy CPP SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi committed Aug 14, 2024
1 parent 7148c1c commit 4120b6b
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ bool SdkConfig::ParseConfig(const std::string &config_path) {
// Guaranteed to only parse the configuration file once
if (!__sync_bool_compare_and_swap(&parsed_, false, true)) {
LOG_INFO("ParseConfig has parsed .");
if (++instance_num_ > max_instance_) {
return false;
}
return true;
}

Expand Down Expand Up @@ -92,7 +95,7 @@ void SdkConfig::defaultInit() {
load_balance_interval_ = constants::kLoadBalanceInterval;
heart_beat_interval_ = constants::kHeartBeatInterval;
enable_balance_ = constants::kEnableBalance;
isolation_level_=constants::IsolationLevel::kLevelSecond;
isolation_level_ = constants::IsolationLevel::kLevelSecond;

// cache parameter
send_buf_size_ = constants::kSendBufSize;
Expand Down Expand Up @@ -132,6 +135,10 @@ void SdkConfig::defaultInit() {
enable_setaffinity_ = constants::kEnableSetAffinity;
mask_cpu_affinity_ = constants::kMaskCPUAffinity;
extend_field_ = constants::kExtendField;

need_auth_ = constants::kNeedAuth;
max_instance_ = constants::kMaxInstance;
instance_num_ = 1;
}

void SdkConfig::InitThreadParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -212,6 +219,14 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
} else {
max_stream_id_num_ = constants::kMaxGroupIdNum;
}

// max_cache_num
if (doc.HasMember("max_cache_num") && doc["max_cache_num"].IsInt() && doc["max_cache_num"].GetInt() >= 0) {
const rapidjson::Value &obj = doc["max_cache_num"];
max_cache_num_ = obj.GetInt();
} else {
max_cache_num_ = constants::kMaxCacheNum;
}
}

void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -431,9 +446,10 @@ void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
} else {
need_auth_ = constants::kNeedAuth;
LOG_INFO("need_auth is not expect, then use default:%s" << need_auth_
? "true"
: "false");
? "true"
: "false");
}

}
void SdkConfig::OthersParam(const rapidjson::Value &doc) {
// ser_ip
Expand Down Expand Up @@ -475,12 +491,20 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
} else {
extend_field_ = constants::kExtendField;
}

// instance num
if (doc.HasMember("max_instance") && doc["max_instance"].IsInt() && doc["max_instance"].GetInt() > 0) {
const rapidjson::Value &obj = doc["max_instance"];
max_instance_ = obj.GetInt();
} else {
max_instance_ = constants::kMaxInstance;
}
}

bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhost) {
bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string &localhost) {
int32_t sockfd;
int32_t ip_num = 0;
char buf[1024] = {0};
char buf[1024] = {0};
struct ifreq *ifreq;
struct ifreq if_flag;
struct ifconf ifconf;
Expand All @@ -493,7 +517,7 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos
}

ioctl(sockfd, SIOCGIFCONF, &ifconf);
ifreq = (struct ifreq *)buf;
ifreq = (struct ifreq *) buf;
ip_num = ifconf.ifc_len / sizeof(struct ifreq);
for (int32_t i = 0; i < ip_num; i++, ifreq++) {
if (ifreq->ifr_flags != AF_INET) {
Expand All @@ -511,11 +535,11 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos
continue;
}

if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
if (!strncmp(inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr),
"127.0.0.1", 7)) {
continue;
}
localhost = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
localhost = inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr);
close(sockfd);
err_info = "Ok";
return true;
Expand Down Expand Up @@ -545,8 +569,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
LOG_INFO(
"enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
? "true"
: "false");
? "true"
: "false");
LOG_INFO("manager_update_interval: minutes" << manager_update_interval_);
LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
LOG_INFO("max_tcp_num: " << max_proxy_num_);
Expand All @@ -566,6 +590,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
LOG_INFO("isolation_level: " << isolation_level_);
LOG_INFO("max_instance: " << max_instance_);
LOG_INFO("max_cache_num: " << max_cache_num_);
}

} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class SdkConfig {
uint32_t send_buf_size_; // Send buf size, bid granularity
uint32_t max_group_id_num_; // Send buf size, bid granularity
uint32_t max_stream_id_num_; // Send buf size, bid granularity
uint32_t max_cache_num_;
uint32_t max_instance_;
uint32_t instance_num_;

// thread parameters
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <queue>

#include "../config/sdk_conf.h"
#include "../utils/send_buffer.h"

#ifndef INLONG_BUFFER_MANAGER_H
#define INLONG_BUFFER_MANAGER_H

namespace inlong {
class BufferManager {
private:
std::queue<SendBufferPtrT> buffer_queue_;
mutable std::mutex mutex_;
uint32_t queue_limit_;
BufferManager() {
uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
uint32_t buffer_num =
(SdkConfig::getInstance()->recv_buf_size_ / data_capacity_) *
SdkConfig::getInstance()->instance_num_;
queue_limit_ =
std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num);
LOG_INFO("Data capacity:"
<< data_capacity_ << ", buffer num: " << buffer_num
<< ", instance num: " << SdkConfig::getInstance()->instance_num_
<< ", limit: " << queue_limit_ << " ,max cache num: "
<< SdkConfig::getInstance()->max_cache_num_);
for ( uint32_t index = 0; index < queue_limit_; index++) {
std::shared_ptr<SendBuffer> send_buffer =
std::make_shared<SendBuffer>(data_capacity_);
if (send_buffer == nullptr) {
LOG_INFO("Buffer manager is null");
continue;
}
AddSendBuffer(send_buffer);
}
}

public:
static BufferManager *GetInstance() {
static BufferManager instance;
return &instance;
}
SendBufferPtrT GetSendBuffer() {
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.empty()) {
return nullptr;
}
SendBufferPtrT buf = buffer_queue_.front();
buffer_queue_.pop();
return buf;
}
void AddSendBuffer(const SendBufferPtrT &send_buffer) {
if (nullptr == send_buffer) {
return;
}
send_buffer->releaseBuf();
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.size() > queue_limit_) {
return;
}
buffer_queue_.emplace(send_buffer);
}
};
} // namespace inlong
#endif // INLONG_BUFFER_MANAGER_H
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ static const int32_t kSendBufSize = 10240000;
static const int32_t kRecvBufSize = 10240000;
static const uint32_t kMaxGroupIdNum = 50;
static const uint32_t kMaxStreamIdNum = 100;
static const uint32_t kMaxCacheNum = 10;
static const uint32_t kMaxInstance = 30;

static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;
Expand Down

0 comments on commit 4120b6b

Please sign in to comment.