-
Notifications
You must be signed in to change notification settings - Fork 283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[logger] Make map access thread safe and proper terminate thread #510
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
#pragma once | ||
|
||
#include <map> | ||
#include <string> | ||
#include <mutex> | ||
|
||
namespace swss | ||
{ | ||
template <typename K, typename V> | ||
class ConcurentMap | ||
{ | ||
public: | ||
|
||
ConcurentMap() = default; | ||
|
||
public: | ||
|
||
size_t size() | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
return m_map.size(); | ||
} | ||
|
||
bool contains(const K& key) | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
return m_map.find(key) != m_map.end(); | ||
} | ||
|
||
void insert(const std::pair<K,V>& pair) | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
m_map.insert(pair); | ||
} | ||
|
||
void set(const K& key, const V& value) | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
m_map[key] = value; | ||
} | ||
|
||
// return copy | ||
V get(const K& key) | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
return m_map[key]; | ||
} | ||
|
||
// return copy | ||
std::map<K,V> getCopy() | ||
{ | ||
std::lock_guard<std::mutex> _lock(m_mutex); | ||
|
||
return m_map; | ||
} | ||
|
||
private: | ||
|
||
ConcurentMap(const ConcurentMap&); | ||
ConcurentMap& operator=(const ConcurentMap&); | ||
|
||
private: | ||
|
||
std::map<K,V> m_map; | ||
|
||
std::mutex m_mutex; | ||
}; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,9 +14,11 @@ | |
#include "consumerstatetable.h" | ||
#include "producerstatetable.h" | ||
|
||
namespace swss { | ||
using namespace swss; | ||
|
||
void err_exit(const char *fn, int ln, int e, const char *fmt, ...) | ||
#define MUTEX std::lock_guard<std::mutex> _lock(getInstance().m_mutex); | ||
|
||
void swss::err_exit(const char *fn, int ln, int e, const char *fmt, ...) | ||
{ | ||
va_list ap; | ||
char buff[1024]; | ||
|
@@ -31,13 +33,34 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...) | |
abort(); | ||
} | ||
|
||
Logger::~Logger() { | ||
if (m_settingThread) { | ||
terminateSettingThread = true; | ||
Logger::~Logger() | ||
{ | ||
terminateSettingThread(); | ||
} | ||
|
||
void Logger::terminateSettingThread() | ||
{ | ||
// can't be executed under mutex, since it can cause deadlock | ||
|
||
if (m_settingThread) | ||
{ | ||
m_runSettingThread = false; | ||
|
||
m_settingThread->join(); | ||
|
||
m_settingThread = nullptr; | ||
} | ||
} | ||
|
||
void Logger::restartSettingThread() | ||
{ | ||
terminateSettingThread(); | ||
|
||
m_runSettingThread = true; | ||
|
||
m_settingThread.reset(new std::thread(&Logger::settingThread, this)); | ||
} | ||
|
||
const Logger::PriorityStringMap Logger::priorityStringMap = { | ||
{ "EMERG", SWSS_EMERG }, | ||
{ "ALERT", SWSS_ALERT }, | ||
|
@@ -49,7 +72,7 @@ const Logger::PriorityStringMap Logger::priorityStringMap = { | |
{ "DEBUG", SWSS_DEBUG } | ||
}; | ||
|
||
void Logger::swssPrioNotify(const std::string &component, const std::string &prioStr) | ||
void Logger::swssPrioNotify(const std::string& component, const std::string& prioStr) | ||
{ | ||
auto& logger = getInstance(); | ||
|
||
|
@@ -70,7 +93,7 @@ const Logger::OutputStringMap Logger::outputStringMap = { | |
{ "STDERR", SWSS_STDERR } | ||
}; | ||
|
||
void Logger::swssOutputNotify(const std::string &component, const std::string &outputStr) | ||
void Logger::swssOutputNotify(const std::string& component, const std::string& outputStr) | ||
{ | ||
auto& logger = getInstance(); | ||
|
||
|
@@ -85,22 +108,27 @@ void Logger::swssOutputNotify(const std::string &component, const std::string &o | |
} | ||
} | ||
|
||
void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput) | ||
void Logger::linkToDbWithOutput( | ||
const std::string& dbName, | ||
const PriorityChangeNotify& prioNotify, | ||
const std::string& defPrio, | ||
const OutputChangeNotify& outputNotify, | ||
const std::string& defOutput) | ||
{ | ||
auto& logger = getInstance(); | ||
|
||
// Initialize internal DB with observer | ||
logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify))); | ||
|
||
DBConnector db("LOGLEVEL_DB", 0); | ||
auto keys = db.keys("*"); | ||
|
||
std::string key = dbName + ":" + dbName; | ||
std::string prio, output; | ||
bool doUpdate = false; | ||
auto prioPtr = db.hget(key, DAEMON_LOGLEVEL); | ||
auto outputPtr = db.hget(key, DAEMON_LOGOUTPUT); | ||
|
||
if ( prioPtr == nullptr ) | ||
if (prioPtr == nullptr) | ||
{ | ||
prio = defPrio; | ||
doUpdate = true; | ||
|
@@ -110,7 +138,7 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN | |
prio = *prioPtr; | ||
} | ||
|
||
if ( outputPtr == nullptr ) | ||
if (outputPtr == nullptr) | ||
{ | ||
output = defOutput; | ||
doUpdate = true; | ||
|
@@ -130,26 +158,26 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN | |
table.set(dbName, fieldValues); | ||
} | ||
|
||
logger.m_currentPrios[dbName] = prio; | ||
logger.m_currentOutputs[dbName] = output; | ||
logger.m_currentPrios.set(dbName, prio); | ||
logger.m_currentOutputs.set(dbName, output); | ||
|
||
prioNotify(dbName, prio); | ||
outputNotify(dbName, output); | ||
} | ||
|
||
void Logger::linkToDb(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio) | ||
void Logger::linkToDb(const std::string& dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio) | ||
{ | ||
linkToDbWithOutput(dbName, prioNotify, defPrio, swssOutputNotify, "SYSLOG"); | ||
} | ||
|
||
void Logger::linkToDbNative(const std::string &dbName, const char * defPrio) | ||
void Logger::linkToDbNative(const std::string& dbName, const char * defPrio) | ||
{ | ||
auto& logger = getInstance(); | ||
|
||
linkToDb(dbName, swssPrioNotify, defPrio); | ||
logger.m_settingThread.reset(new std::thread(&Logger::settingThread, &logger)); | ||
|
||
getInstance().restartSettingThread(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is restarting the thread actually required if the thread is already started and the data structures are thread-safe? Put another way: is there any reason to maintain a separate linkToDbNative and linkToDb vs consolidating them into one function which starts the thread if needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thats a good question, im just fixing this instance here, since previously if you call linkToDbNative twice, it will crash, and whether we should have only 1 thread, not sure about that, this PR is just to make those maps thread safe + plus crahs bug fix not changing behavior, @qiluo-msft can you have opinion? |
||
} | ||
|
||
Logger &Logger::getInstance() | ||
Logger& Logger::getInstance() | ||
{ | ||
static Logger m_logger; | ||
return m_logger; | ||
|
@@ -171,13 +199,13 @@ void Logger::settingThread() | |
DBConnector db("LOGLEVEL_DB", 0); | ||
std::map<std::string, std::shared_ptr<ConsumerStateTable>> selectables; | ||
|
||
while (!terminateSettingThread) | ||
while (m_runSettingThread) | ||
{ | ||
if (selectables.size() < m_settingChangeObservers.size()) | ||
{ | ||
for (const auto& i : m_settingChangeObservers) | ||
for (const auto& i : m_settingChangeObservers.getCopy()) | ||
{ | ||
const std::string &dbName = i.first; | ||
const std::string& dbName = i.first; | ||
if (selectables.find(dbName) == selectables.end()) | ||
{ | ||
auto table = std::make_shared<ConsumerStateTable>(&db, dbName); | ||
|
@@ -208,27 +236,28 @@ void Logger::settingThread() | |
dynamic_cast<ConsumerStateTable *>(selectable)->pop(koValues); | ||
std::string key = kfvKey(koValues), op = kfvOp(koValues); | ||
|
||
if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end())) | ||
if (op != SET_COMMAND || !m_settingChangeObservers.contains(key)) | ||
{ | ||
continue; | ||
} | ||
|
||
auto values = kfvFieldsValues(koValues); | ||
for (const auto& i : values) | ||
const auto& values = kfvFieldsValues(koValues); | ||
|
||
for (auto& i : values) | ||
{ | ||
const std::string &field = fvField(i), &value = fvValue(i); | ||
if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios[key])) | ||
auto& field = fvField(i); | ||
auto& value = fvValue(i); | ||
|
||
if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios.get(key))) | ||
{ | ||
m_currentPrios[key] = value; | ||
m_settingChangeObservers[key].first(key, value); | ||
m_currentPrios.set(key, value); | ||
m_settingChangeObservers.get(key).first(key, value); | ||
} | ||
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs[key])) | ||
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs.get(key))) | ||
{ | ||
m_currentOutputs[key] = value; | ||
m_settingChangeObservers[key].second(key, value); | ||
m_currentOutputs.set(key, value); | ||
m_settingChangeObservers.get(key).second(key, value); | ||
} | ||
|
||
break; | ||
} | ||
} | ||
} | ||
|
@@ -246,14 +275,16 @@ void Logger::write(Priority prio, const char *fmt, ...) | |
|
||
if (m_output == SWSS_SYSLOG) | ||
{ | ||
vsyslog(prio, fmt, ap); | ||
vsyslog(prio, fmt, ap); | ||
} | ||
else | ||
{ | ||
std::stringstream ss; | ||
ss << std::setw(6) << std::right << priorityToString(prio); | ||
ss << fmt << std::endl; | ||
std::lock_guard<std::mutex> lock(m_mutex); | ||
|
||
MUTEX; | ||
|
||
if (m_output == SWSS_STDOUT) | ||
{ | ||
vprintf(ss.str().c_str(), ap); | ||
|
@@ -283,7 +314,9 @@ void Logger::wthrow(Priority prio, const char *fmt, ...) | |
std::stringstream ss; | ||
ss << std::setw(6) << std::right << priorityToString(prio); | ||
ss << fmt << std::endl; | ||
std::lock_guard<std::mutex> lock(m_mutex); | ||
|
||
MUTEX; | ||
|
||
if (m_output == SWSS_STDOUT) | ||
{ | ||
vprintf(ss.str().c_str(), ap); | ||
|
@@ -363,5 +396,3 @@ Logger::ScopeTimer::~ScopeTimer() | |
|
||
Logger::getInstance().write(swss::Logger::SWSS_NOTICE, ":- %s: %s took %lf sec", m_fun, m_msg.c_str(), duration); | ||
} | ||
|
||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo (ConcurrentMap)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed