Skip to content

Commit

Permalink
Add refillToSync() into ConsumerBase to support warmboot. (#2866)
Browse files Browse the repository at this point in the history
Add refillToSync() into ConsumerBase to support warmboot.
Add warmboot support for zmq consumer.
  • Loading branch information
mint570 authored Oct 15, 2023
1 parent 755b260 commit f31ccd0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
27 changes: 18 additions & 9 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
}

// TODO: Table should be const
size_t Consumer::refillToSync(Table* table)
size_t ConsumerBase::refillToSync(Table* table)
{
std::deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
Expand All @@ -178,7 +178,7 @@ size_t Consumer::refillToSync(Table* table)
return addToSync(entries);
}

size_t Consumer::refillToSync()
size_t ConsumerBase::refillToSync()
{
auto subTable = dynamic_cast<SubscriberStateTable *>(getSelectable());
if (subTable != NULL)
Expand All @@ -194,14 +194,23 @@ size_t Consumer::refillToSync()
} while (update_size != 0);
return total_size;
}
else
string tableName = getTableName();
auto consumerTable = dynamic_cast<ConsumerTableBase *>(getSelectable());
if (consumerTable != NULL)
{
// consumerTable is either ConsumerStateTable or ConsumerTable
auto db = getDbConnector();
string tableName = getTableName();
auto db = consumerTable->getDbConnector();
auto table = Table(db, tableName);
return refillToSync(&table);
}
auto zmqTable = dynamic_cast<ZmqConsumerStateTable *>(getSelectable());
if (zmqTable != NULL)
{
auto db = zmqTable->getDbConnector();
auto table = Table(db, tableName);
return refillToSync(&table);
}
return 0;
}

string ConsumerBase::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
Expand Down Expand Up @@ -253,7 +262,7 @@ void Consumer::drain()

size_t Orch::addExistingData(const string& tableName)
{
auto consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
auto consumer = dynamic_cast<ConsumerBase *>(getExecutor(tableName));
if (consumer == NULL)
{
SWSS_LOG_ERROR("No consumer %s in Orch", tableName.c_str());
Expand All @@ -267,7 +276,7 @@ size_t Orch::addExistingData(const string& tableName)
size_t Orch::addExistingData(Table *table)
{
string tableName = table->getTableName();
Consumer* consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
ConsumerBase* consumer = dynamic_cast<ConsumerBase *>(getExecutor(tableName));
if (consumer == NULL)
{
SWSS_LOG_ERROR("No consumer %s in Orch", tableName.c_str());
Expand All @@ -285,7 +294,7 @@ bool Orch::bake()
{
string executorName = it.first;
auto executor = it.second;
auto consumer = dynamic_cast<Consumer *>(executor.get());
auto consumer = dynamic_cast<ConsumerBase *>(executor.get());
if (consumer == NULL)
{
continue;
Expand Down Expand Up @@ -537,7 +546,7 @@ void Orch::dumpPendingTasks(vector<string> &ts)
{
for (auto &it : m_consumerMap)
{
Consumer* consumer = dynamic_cast<Consumer *>(it.second.get());
ConsumerBase* consumer = dynamic_cast<ConsumerBase *>(it.second.get());
if (consumer == NULL)
{
SWSS_LOG_DEBUG("Executor is not a Consumer");
Expand Down
5 changes: 3 additions & 2 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class ConsumerBase : public Executor {

// Returns: the number of entries added to m_toSync
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);

size_t refillToSync();
size_t refillToSync(swss::Table* table);
};

class Consumer : public ConsumerBase {
Expand Down Expand Up @@ -194,8 +197,6 @@ class Consumer : public ConsumerBase {
return getDbConnector()->getDbName();
}

size_t refillToSync();
size_t refillToSync(swss::Table* table);
void execute() override;
void drain() override;
};
Expand Down

0 comments on commit f31ccd0

Please sign in to comment.