Skip to content

Commit

Permalink
Make openbmp support redis subscription/population on sonic.
Browse files Browse the repository at this point in the history
  • Loading branch information
FengPan-Frank committed Apr 9, 2024
1 parent 1a615a3 commit 283d443
Show file tree
Hide file tree
Showing 10 changed files with 819 additions and 51 deletions.
72 changes: 57 additions & 15 deletions Server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,51 @@ find_library(LIBYAML_CPP_LIBRARY
lib64
lib)

find_path(LIBRDKAFKA_INCLUDE_DIR
librdkafka/rdkafkacpp.h
#find_path(LIBRDKAFKA_INCLUDE_DIR
# librdkafka/rdkafkacpp.h
# HINTS
# ${HINT_ROOT_DIR}
# PATH_SUFFIXES
# include)

#find_library(LIBRDKAFKA_LIBRARY
# NAMES
# librdkafka.a rdkafka
# HINTS
# ${HINT_ROOT_DIR}
# PATH_SUFFIXES
# lib64
# lib)

#find_library(LIBRDKAFKA_CPP_LIBRARY
# NAMES
# librdkafka++.a rdkafka++
# HINTS
# ${HINT_ROOT_DIR}
# PATH_SUFFIXES
# lib64
# lib)


find_path(LIBSWSSCOMMON_INCLUDE_DIR
sonic-swss-common/common/dbconnector.h
HINTS
${HINT_ROOT_DIR}
PATH_SUFFIXES
include)

find_library(LIBRDKAFKA_LIBRARY
find_library(LIBHIREDIS_LIBRARY
NAMES
librdkafka.a rdkafka
libhiredis.a hiredis
HINTS
${HINT_ROOT_DIR}
PATH_SUFFIXES
lib64
lib)

find_library(LIBRDKAFKA_CPP_LIBRARY
find_library(LIBSWSSCOMMON_LIBRARY
NAMES
librdkafka++.a rdkafka++
libswsscommon.a libswsscommon
HINTS
${HINT_ROOT_DIR}
PATH_SUFFIXES
Expand All @@ -67,10 +93,22 @@ find_library(LIBRT_LIBRARY
lib64
lib)

if (NOT LIBRDKAFKA_INCLUDE_DIR OR NOT LIBRDKAFKA_LIBRARY OR NOT LIBRDKAFKA_CPP_LIBRARY)
Message (FATAL_ERROR "Librdkafka was not found, cannot proceed. Visit https://github.com/edenhill/librdkafka for details on how to install it.")
#if (NOT LIBRDKAFKA_INCLUDE_DIR OR NOT LIBRDKAFKA_LIBRARY OR NOT LIBRDKAFKA_CPP_LIBRARY)
# Message (FATAL_ERROR "Librdkafka was not found, cannot proceed. Visit https://github.com/edenhill/librdkafka for details on how to install it.")
#else ()
# Message ("lib = " ${LIBRDKAFKA_LIBRARY})
#endif()

if (NOT LIBHIREDIS_LIBRARY)
Message (FATAL_ERROR "Libhiredis was not found, cannot proceed. Visit https://github.com/redis/hiredis for details on how to install it.")
#else ()
# Message ("lib = " ${LIBHIREDIS_LIBRARY})
endif()

if (NOT LIBSWSSCOMMON_INCLUDE_DIR OR NOT LIBSWSSCOMMON_LIBRARY)
Message (FATAL_ERROR "swsscommon was not found, cannot proceed. Visit https://github.com/sonic-net/sonic-swss-common for details on how to install it.")
#else ()
# Message ("lib = " ${LIBSWSSCOMMON_LIBRARY})
endif()

if (NOT LIBYAML_CPP_INCLUDE_DIR OR NOT LIBYAML_CPP_LIBRARY)
Expand All @@ -82,19 +120,20 @@ if (NOT LIBRT_LIBRARY AND NOT MACOSX)
endif()

# Update the include dir
include_directories(${LIBRDKAFKA_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/kafka)
#include_directories(${LIBRDKAFKA_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/kafka)
include_directories(${LIBSWSSCOMMON_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate src/redis)
#link_directories(${LIBRDKAFKA_LIBRARY})


# Define the source files to compile
set (SRC_FILES
src/bmp/BMPListener.cpp
src/bmp/BMPReader.cpp
src/kafka/MsgBusImpl_kafka.cpp
src/kafka/KafkaEventCallback.cpp
src/kafka/KafkaDeliveryReportCallback.cpp
src/kafka/KafkaTopicSelector.cpp
src/kafka/KafkaPeerPartitionerCallback.cpp
#src/kafka/MsgBusImpl_kafka.cpp
#src/kafka/KafkaEventCallback.cpp
#src/kafka/KafkaDeliveryReportCallback.cpp
#src/kafka/KafkaTopicSelector.cpp
#src/kafka/KafkaPeerPartitionerCallback.cpp
src/openbmp.cpp
src/bmp/parseBMP.cpp
src/md5.cpp
Expand All @@ -112,6 +151,8 @@ set (SRC_FILES
src/bgp/EVPN.cpp
src/bgp/linkstate/MPLinkState.cpp
src/bgp/linkstate/MPLinkStateAttr.cpp
src/RedisManager.cpp
src/redis/MsgBusImpl_redis.cpp
)

# Disable warnings
Expand All @@ -134,7 +175,8 @@ if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" OR CMAKE_COMPILER_IS_GNUCXX)
endif()

# Set the libs to link
set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBRDKAFKA_CPP_LIBRARY} ${LIBRDKAFKA_LIBRARY} z ${SSL_LIBS} dl)
#set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBRDKAFKA_CPP_LIBRARY} ${LIBRDKAFKA_LIBRARY} z ${SSL_LIBS} dl)
set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBHIREDIS_LIBRARY} ${LIBSWSSCOMMON_LIBRARY} z ${SSL_LIBS} dl)

# Set the binary
add_executable (openbmpd ${SRC_FILES})
Expand Down
4 changes: 3 additions & 1 deletion Server/src/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <boost/algorithm/string/replace.hpp>

#include "Config.h"
#include "kafka/KafkaTopicSelector.h"
//#include "kafka/KafkaTopicSelector.h"

/*********************************************************************//**
* Constructor for class
Expand Down Expand Up @@ -62,6 +62,7 @@ Config::Config() {
* The keys match the configuration node/vars. Topic name nodes will be ignored if
* not initialized here.
*/
/*
topic_names_map[MSGBUS_TOPIC_VAR_COLLECTOR] = MSGBUS_TOPIC_COLLECTOR;
topic_names_map[MSGBUS_TOPIC_VAR_ROUTER] = MSGBUS_TOPIC_ROUTER;
topic_names_map[MSGBUS_TOPIC_VAR_PEER] = MSGBUS_TOPIC_PEER;
Expand All @@ -74,6 +75,7 @@ Config::Config() {
topic_names_map[MSGBUS_TOPIC_VAR_LS_PREFIX] = MSGBUS_TOPIC_LS_PREFIX;
topic_names_map[MSGBUS_TOPIC_VAR_L3VPN] = MSGBUS_TOPIC_L3VPN;
topic_names_map[MSGBUS_TOPIC_VAR_EVPN] = MSGBUS_TOPIC_EVPN;
*/
}

/*********************************************************************//**
Expand Down
236 changes: 236 additions & 0 deletions Server/src/RedisManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright (c) 2024 Microsoft, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*
*/

#include "RedisManager.h"


/*********************************************************************//**
* Constructor for class
***********************************************************************/
RedisManager::RedisManager() : stateDb_(BMP_DB_NAME, 0, true) {
swss::SonicDBConfig::initialize();
swss::SonicDBConfig::initializeGlobalConfig();
separator_ = swss::SonicDBConfig::getSeparator(BMP_DB_NAME);
exit_ = false;
}

/*********************************************************************//**
* Constructor for class
***********************************************************************/
RedisManager::~RedisManager() {
if (!exit_) {
exit_ = true;
for (auto& threadPtr : threadList_) {
threadPtr->join();
}
}
}


/*********************************************************************
* Setup logger for this class
*
* \param [in] logPtr logger pointer
***********************************************************************/
void RedisManager::Setup(Logger *logPtr, BMPListener::ClientInfo *client) {
logger = logPtr;
client_ = client;
}



/**
* Get Key separator for deletion
*
* \param [in] N/A
*/
std::string RedisManager::GetKeySeparator() {
return separator_;
}


/**
* WriteBMPTable
*
* \param [in] table Reference to table name
* \param [in] key Reference to various keys list
* \param [in] fieldValues Reference to field-value pairs
*/
bool RedisManager::WriteBMPTable(const std::string& table, const std::vector<std::string>& keys, const std::vector<swss::FieldValueTuple> fieldValues) {

if (enabledTables_.find(table) == enabledTables_.end()) {
LOG_INFO("RedisManager %s is disabled", table.c_str());
return false;
}

swss::Table stateBMPTable(&stateDb_, table);
std::string fullKey;
for (const auto& key : keys) {
fullKey += key;
fullKey += separator_;
}
fullKey.erase(fullKey.size() - 1);

LOG_INFO("RedisManager WriteBMPTable key = %s", fullKey.c_str());

stateBMPTable.set(fullKey, fieldValues);
return true;
}


/**
* RemoveBMPTable
*
* \param [in] keys Reference to various keys
*/
bool RedisManager::RemoveBMPTable(const std::vector<std::string>& keys) {

stateDb_.del(keys);
return true;
}

/**
* DisconnectBMP
*
* \param [in] N/A
*/
void RedisManager::DisconnectBMP() {
LOG_INFO("RedisManager DisconnectBMP");
close(client_->c_sock);
client_->c_sock = 0;
}

/**
* ExitRedisManager
*
* \param [in] N/A
*/
void RedisManager::ExitRedisManager() {
exit_ = true;
for (auto& threadPtr : threadList_) {
threadPtr->join();
}
}

/**
* ReadBMPTable, there will be dedicated thread be launched inside and monitor corresponding redis table.
*
* \param [in] tables table names to be subscribed.
*/
void RedisManager::SubscriberWorker(const std::string& table) {
try {
swss::DBConnector cfgDb("CONFIG_DB", 0, true);

swss::SubscriberStateTable conf_table(&cfgDb, table);
swss::Select s;
s.addSelectable(&conf_table);

while (!exit_) {
swss::Selectable *sel;
int ret;

ret = s.select(&sel, BMP_CFG_TABLE_SELECT_TIMEOUT);
if (ret == swss::Select::ERROR) {
SWSS_LOG_NOTICE("Error: %s!", strerror(errno));
continue;
}
if (ret == swss::Select::TIMEOUT) {
continue;
}

swss::KeyOpFieldsValuesTuple kco;
conf_table.pop(kco);

if (std::get<0>(kco) == "SET") {
if (std::get<1>(kco) == "true") {
EnableTable(table);
}
else {
DisableTable(table);
DisconnectBMP();
}
}
else if (std::get<0>(kco) == "DEL")
{
LOG_ERR("Config should not be deleted");
}
}
}
catch (const exception &e) {
LOG_ERR("Runtime error: %s", e.what());
}
}



/**
* ReadBMPTable, there will be dedicated thread be launched inside and monitor corresponding redis table.
*
* \param [in] tables table names to be subscribed.
*/
bool RedisManager::ReadBMPTable(const std::vector<std::string>& tables) {
for (const auto& table : tables) {
std::shared_ptr<std::thread> threadPtr = std::make_shared<std::thread>(
std::bind(&RedisManager::SubscriberWorker, this, table));
threadList_.push_back(threadPtr);
}
return true;
}


/**
* Enable specific Table
*
* \param [in] table Reference to table name, like BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE
*/
bool RedisManager::EnableTable(const std::string & table) {
enabledTables_.insert(table);
return true;
}

/**
* Enable BGP_Neighbor* Table
*
* \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE
*/
bool RedisManager::DisableTable(const std::string & table) {
enabledTables_.erase(table);
return ResetBMPTable(table);
}


/**
* Reset ResetBMPTable, this will flush redis
*
* \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE
*/
bool RedisManager::ResetBMPTable(const std::string & table) {

LOG_INFO("RedisManager ResetBMPTable %s", table.c_str());
swss::Table stateBMPTable(&stateDb_, table);
std::vector<std::string> keys;
stateBMPTable.getKeys(keys);
stateDb_.del(keys);

return true;
}



/**
* Reset all Tables once FRR reconnects to BMP, this will not disable table population
*
* \param [in] N/A
*/
void RedisManager::ResetAllTables() {
LOG_INFO("RedisManager ResetAllTables");
for (const auto& enabledTable : enabledTables_) {
ResetBMPTable(enabledTable);
}
}
Loading

0 comments on commit 283d443

Please sign in to comment.