Skip to content

Commit

Permalink
use memory_manager for allocate memory, instead hugepage_create_*
Browse files Browse the repository at this point in the history
  • Loading branch information
Timur Aitov committed Mar 19, 2024
1 parent 027c350 commit 3270b7c
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 213 deletions.
103 changes: 45 additions & 58 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,6 @@

common::log::LogPriority common::log::logPriority = common::log::TLOG_INFO;

YADECAP_UNUSED
static void printHugepageMemory(const char* prefix, tSocketId socketId) ///< @todo
{
rte_malloc_socket_stats stats;
if (rte_malloc_get_socket_stats(socketId, &stats) == 0)
{
YADECAP_LOG_INFO("%sheap_totalsz_bytes: %lu MB\n", prefix, stats.heap_totalsz_bytes / (1024 * 1024));
YADECAP_LOG_INFO("%sheap_freesz_bytes: %lu MB\n", prefix, stats.heap_freesz_bytes / (1024 * 1024));
YADECAP_LOG_INFO("%sgreatest_free_size: %lu MB\n", prefix, stats.greatest_free_size / (1024 * 1024));
YADECAP_LOG_INFO("%sfree_count: %u\n", prefix, stats.free_count);
YADECAP_LOG_INFO("%salloc_count: %u\n", prefix, stats.alloc_count);
YADECAP_LOG_INFO("%sheap_allocsz_bytes: %lu MB\n", prefix, stats.heap_allocsz_bytes / (1024 * 1024));
}
}

cDataPlane::cDataPlane() :
currentGlobalBaseId(0),
globalBaseSerial(0),
Expand Down Expand Up @@ -95,6 +80,7 @@ cDataPlane::cDataPlane() :
{eConfigType::balancer_tcp_fin_timeout, YANET_CONFIG_BALANCER_STATE_TIMEOUT_DEFAULT},
{eConfigType::balancer_udp_timeout, YANET_CONFIG_BALANCER_STATE_TIMEOUT_DEFAULT},
{eConfigType::balancer_other_protocols_timeout, YANET_CONFIG_BALANCER_STATE_TIMEOUT_DEFAULT},
{eConfigType::neighbor_ht_size, 64 * 1024},
};
}

Expand Down Expand Up @@ -594,45 +580,64 @@ eResult cDataPlane::initGlobalBases()
auto create_globalbase_atomics = [this](const tSocketId& socket_id) -> eResult {
if (globalBaseAtomics.find(socket_id) == globalBaseAtomics.end())
{
auto* globalbase_atomic = hugepage_create_static<dataplane::globalBase::atomic>(socket_id,
this,
socket_id);
auto* globalbase_atomic = memory_manager.create_static<dataplane::globalBase::atomic>("globalbase.atomic",
socket_id,
this,
socket_id);
if (!globalbase_atomic)
{
return eResult::errorAllocatingMemory;
}

{
auto* ipv4_states_ht = hugepage_create_dynamic<dataplane::globalBase::acl::ipv4_states_ht>(socket_id, getConfigValue(eConfigType::acl_states4_ht_size), globalbase_atomic->updater.fw4_state);
using namespace dataplane::globalBase;

auto* ipv4_states_ht = memory_manager.create<acl::ipv4_states_ht>("acl.state.v4.ht",
socket_id,
acl::ipv4_states_ht::calculate_sizeof(getConfigValue(eConfigType::acl_states4_ht_size)));
if (!ipv4_states_ht)
{
return eResult::errorAllocatingMemory;
}

auto* ipv6_states_ht = hugepage_create_dynamic<dataplane::globalBase::acl::ipv6_states_ht>(socket_id, getConfigValue(eConfigType::acl_states6_ht_size), globalbase_atomic->updater.fw6_state);
auto* ipv6_states_ht = memory_manager.create<acl::ipv6_states_ht>("acl.state.v6.ht",
socket_id,
acl::ipv6_states_ht::calculate_sizeof(getConfigValue(eConfigType::acl_states6_ht_size)));
if (!ipv6_states_ht)
{
return eResult::errorAllocatingMemory;
}

auto* nat64stateful_lan_state = hugepage_create_dynamic<dataplane::globalBase::nat64stateful::lan_ht>(socket_id, getConfigValue(eConfigType::nat64stateful_states_size), globalbase_atomic->updater.nat64stateful_lan_state);
auto* nat64stateful_lan_state = memory_manager.create<nat64stateful::lan_ht>("nat64stateful.state.lan.ht",
socket_id,
nat64stateful::lan_ht::calculate_sizeof(getConfigValue(eConfigType::nat64stateful_states_size)));
if (!nat64stateful_lan_state)
{
return eResult::errorAllocatingMemory;
}

auto* nat64stateful_wan_state = hugepage_create_dynamic<dataplane::globalBase::nat64stateful::wan_ht>(socket_id, getConfigValue(eConfigType::nat64stateful_states_size), globalbase_atomic->updater.nat64stateful_wan_state);
auto* nat64stateful_wan_state = memory_manager.create<nat64stateful::wan_ht>("nat64stateful.state.wan.ht",
socket_id,
nat64stateful::wan_ht::calculate_sizeof(getConfigValue(eConfigType::nat64stateful_states_size)));
if (!nat64stateful_wan_state)
{
return eResult::errorAllocatingMemory;
}

auto* balancer_state = hugepage_create_dynamic<dataplane::globalBase::balancer::state_ht>(socket_id, getConfigValue(eConfigType::balancer_state_ht_size), globalbase_atomic->updater.balancer_state);
auto* balancer_state = memory_manager.create<dataplane::globalBase::balancer::state_ht>("balancer.state.ht",
socket_id,
dataplane::globalBase::balancer::state_ht::calculate_sizeof(getConfigValue(eConfigType::balancer_state_ht_size)));
if (!balancer_state)
{
return eResult::errorAllocatingMemory;
}

globalbase_atomic->updater.fw4_state.update_pointer(ipv4_states_ht, socket_id, getConfigValue(eConfigType::acl_states4_ht_size));
globalbase_atomic->updater.fw6_state.update_pointer(ipv6_states_ht, socket_id, getConfigValue(eConfigType::acl_states6_ht_size));
globalbase_atomic->updater.nat64stateful_lan_state.update_pointer(nat64stateful_lan_state, socket_id, getConfigValue(eConfigType::nat64stateful_states_size));
globalbase_atomic->updater.nat64stateful_wan_state.update_pointer(nat64stateful_wan_state, socket_id, getConfigValue(eConfigType::nat64stateful_states_size));
globalbase_atomic->updater.balancer_state.update_pointer(balancer_state, socket_id, getConfigValue(eConfigType::balancer_state_ht_size));

globalbase_atomic->fw4_state = ipv4_states_ht;
globalbase_atomic->fw6_state = ipv6_states_ht;
globalbase_atomic->nat64stateful_lan_state = nat64stateful_lan_state;
Expand All @@ -647,9 +652,10 @@ eResult cDataPlane::initGlobalBases()
};

auto create_globalbase = [this](const tSocketId& socket_id) -> dataplane::globalBase::generation* {
auto* globalbase = hugepage_create_static<dataplane::globalBase::generation>(socket_id,
this,
socket_id);
auto* globalbase = memory_manager.create_static<dataplane::globalBase::generation>("globalbase.generation",
socket_id,
this,
socket_id);
if (!globalbase)
{
return nullptr;
Expand Down Expand Up @@ -738,8 +744,9 @@ eResult cDataPlane::initWorkers()

YADECAP_LOG_INFO("initWorker. coreId: %u [slow worker]\n", coreId);

auto* worker = hugepage_create_static<cWorker>(socket_id,
this);
auto* worker = memory_manager.create_static<cWorker>("worker",
socket_id,
this);
if (!worker)
{
return eResult::errorAllocatingMemory;
Expand Down Expand Up @@ -783,8 +790,9 @@ eResult cDataPlane::initWorkers()

YADECAP_LOG_INFO("initWorker. coreId: %u\n", coreId);

auto* worker = hugepage_create_static<cWorker>(socket_id,
this);
auto* worker = memory_manager.create_static<cWorker>("worker",
socket_id,
this);
if (!worker)
{
return eResult::errorAllocatingMemory;
Expand Down Expand Up @@ -936,8 +944,9 @@ eResult cDataPlane::initWorkers()

YADECAP_LOG_INFO("initWorker. coreId: %u [worker_gc]\n", core_id);

auto* worker = hugepage_create_static<worker_gc_t>(socket_id,
this);
auto* worker = memory_manager.create_static<worker_gc_t>("worker_gc",
socket_id,
this);
if (!worker)
{
return eResult::errorAllocatingMemory;
Expand Down Expand Up @@ -1079,32 +1088,6 @@ void cDataPlane::init_worker_base()
neighbor.update_worker_base(base_nexts);
}

void cDataPlane::hugepage_destroy(void* pointer)
{
auto it = hugepage_pointers.find(pointer);
if (it == hugepage_pointers.end())
{
YADECAP_LOG_ERROR("unknown pointer: %p\n", pointer);
return;
}

hugepage_pointers.erase(it);
}

void cDataPlane::hugepage_debug(tSocketId socket_id)
{
rte_malloc_socket_stats stats;
if (rte_malloc_get_socket_stats(socket_id, &stats) == 0)
{
YADECAP_LOG_INFO("heap_totalsz_bytes: %lu MB\n", stats.heap_totalsz_bytes / (1024 * 1024));
YADECAP_LOG_INFO("heap_freesz_bytes: %lu MB\n", stats.heap_freesz_bytes / (1024 * 1024));
YADECAP_LOG_INFO("greatest_free_size: %lu MB\n", stats.greatest_free_size / (1024 * 1024));
YADECAP_LOG_INFO("free_count: %u\n", stats.free_count);
YADECAP_LOG_INFO("alloc_count: %u\n", stats.alloc_count);
YADECAP_LOG_INFO("heap_allocsz_bytes: %lu MB\n", stats.heap_allocsz_bytes / (1024 * 1024));
}
}

int cDataPlane::lcoreThread(void* args)
{
cDataPlane* dataPlane = (cDataPlane*)args;
Expand Down Expand Up @@ -1898,6 +1881,10 @@ eResult cDataPlane::parseConfigValues(const nlohmann::json& json)
{
configValues[eConfigType::balancer_other_protocols_timeout] = json["balancer_other_protocols_timeout"];
}
if (exist(json, "neighbor_ht_size"))
{
configValues[eConfigType::neighbor_ht_size] = json["neighbor_ht_size"];
}

return eResult::success;
}
Expand Down
140 changes: 1 addition & 139 deletions dataplane/dataplane.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ enum class eConfigType
balancer_tcp_syn_ack_timeout,
balancer_tcp_fin_timeout,
balancer_other_protocols_timeout,
neighbor_ht_size,
};

struct tDataPlaneConfig
Expand Down Expand Up @@ -139,142 +140,6 @@ class cDataPlane
return current_time;
}

template<typename type,
typename... args_t>
type* hugepage_create_static(int socket_id,
const args_t&... args)
{
size_t size = sizeof(type) + 2 * RTE_CACHE_LINE_SIZE;

YADECAP_LOG_INFO("yanet_alloc(size: %lu, align: %u, socket: %u)\n",
size,
RTE_CACHE_LINE_SIZE,
socket_id);

void* pointer = rte_zmalloc_socket(nullptr,
size,
RTE_CACHE_LINE_SIZE,
socket_id);
if (pointer == nullptr)
{
YADECAP_LOG_ERROR("yanet_alloc(size: %lu)\n", size);
hugepage_debug(socket_id);
return nullptr;
}

type* result = new ((type*)pointer) type(args...);

{
std::lock_guard<std::mutex> guard(hugepage_pointers_mutex);
hugepage_pointers.try_emplace(result, size, [result]() {
YADECAP_LOG_INFO("yanet_free()\n");
result->~type();
rte_free(result);
});
}

return result;
}

template<typename type,
typename... args_t>
type* hugepage_create_static_array(int socket_id, const size_t count, const args_t&... args)
{
size_t size = count * sizeof(type) + 2 * RTE_CACHE_LINE_SIZE;

YADECAP_LOG_INFO("yanet_alloc(size: %lu, align: %u, socket: %u)\n",
size,
RTE_CACHE_LINE_SIZE,
socket_id);

void* pointer = rte_zmalloc_socket(nullptr,
size,
RTE_CACHE_LINE_SIZE,
socket_id);
if (pointer == nullptr)
{
YADECAP_LOG_ERROR("yanet_alloc(size: %lu)\n", size);
hugepage_debug(socket_id);
return nullptr;
}

for (size_t i = 0;
i < count;
i++)
{
new (((type*)pointer) + i) type(args...);
}

{
std::lock_guard<std::mutex> guard(hugepage_pointers_mutex);
hugepage_pointers.try_emplace(pointer, size, [pointer, count]() {
YADECAP_LOG_INFO("yanet_free()\n");
for (size_t i = 0;
i < count;
i++)
{
type* result = ((type*)pointer) + i;
result->~type();
}
rte_free(pointer);
});
}

return (type*)pointer;
}

template<typename type,
typename elems_t,
typename updater_type,
typename... args_t>
type* hugepage_create_dynamic(int socket_id,
elems_t elems,
updater_type& updater,
const args_t&... args)
{
size_t size = type::calculate_sizeof(elems);
if (!size)
{
return nullptr;
}

size += 2 * RTE_CACHE_LINE_SIZE;

YADECAP_LOG_INFO("yanet_alloc(size: %lu, align: %u, socket: %u)\n",
size,
RTE_CACHE_LINE_SIZE,
socket_id);

void* pointer = rte_zmalloc_socket(nullptr,
size,
RTE_CACHE_LINE_SIZE,
socket_id);
if (pointer == nullptr)
{
YADECAP_LOG_ERROR("yanet_alloc(size: %lu)\n", size);
hugepage_debug(socket_id);
return nullptr;
}

type* result = new ((type*)pointer) type(args...);

{
std::lock_guard<std::mutex> guard(hugepage_pointers_mutex);
hugepage_pointers.try_emplace(result, size, [result]() {
YADECAP_LOG_INFO("yanet_free()\n");
result->~type();
rte_free(result);
});
}

updater.update_pointer(result, socket_id, elems, args...);

return result;
}

void hugepage_destroy(void* pointer);
void hugepage_debug(tSocketId socket_id);

protected:
eResult parseConfig(const std::string& configFilePath);
eResult parseJsonPorts(const nlohmann::json& json);
Expand Down Expand Up @@ -353,9 +218,6 @@ class cDataPlane

std::map<tSocketId, std::tuple<key_t, void*>> shm_by_socket_id;

std::mutex hugepage_pointers_mutex;
std::map<void*, hugepage_pointer> hugepage_pointers;

std::set<tSocketId> socket_ids;
std::map<tSocketId, worker_gc_t*> socket_worker_gcs;
std::vector<cWorker*> workers_vector;
Expand Down
18 changes: 18 additions & 0 deletions dataplane/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ class memory_manager
return new (reinterpret_cast<type*>(pointer)) type(args...);
}

template<typename type,
typename... args_t>
type* create_static(const char* name,
const tSocketId socket_id,
const args_t&... args)
{
void* pointer = alloc(name, socket_id, sizeof(type), [](void* pointer) {
reinterpret_cast<type*>(pointer)->~type();
});

if (pointer == nullptr)
{
return nullptr;
}

return new (reinterpret_cast<type*>(pointer)) type(args...);
}

template<typename type,
typename... args_t>
type* create_static_array(const char* name,
Expand Down
Loading

0 comments on commit 3270b7c

Please sign in to comment.