Skip to content

Commit

Permalink
teste..
Browse files Browse the repository at this point in the history
  • Loading branch information
beats-dh committed Jan 18, 2025
1 parent ea3d93e commit d41e712
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 117 deletions.
21 changes: 10 additions & 11 deletions src/canary_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,14 @@ void CanaryServer::loadConfigLua() {

#ifdef _WIN32
const std::string &defaultPriority = g_configManager().getString(DEFAULT_PRIORITY);
if (strcasecmp(defaultPriority.c_str(), "high") == 0) {
if (strcasecmp(defaultPriority.c_str(), "real-time") == 0) {
SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
} else if (strcasecmp(defaultPriority.c_str(), "high") == 0) {
SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS);
} else if (strcasecmp(defaultPriority.c_str(), "above-normal") == 0) {
SetPriorityClass(GetCurrentProcess(), ABOVE_NORMAL_PRIORITY_CLASS);
} else {
SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS);
}
#endif
}
Expand All @@ -310,10 +314,7 @@ void CanaryServer::initializeDatabase() {

logger.debug("Running database manager...");
if (!DatabaseManager::isDatabaseSetup()) {
throw FailedToInitializeCanary(fmt::format(
"The database you have specified in {} is empty, please import the schema.sql to your database.",
g_configManager().getConfigFileLua()
));
throw FailedToInitializeCanary(fmt::format("The database you have specified in {} is empty, please import the schema.sql to your database.", g_configManager().getConfigFileLua()));
}

DatabaseManager::updateDatabase();
Expand All @@ -330,12 +331,10 @@ void CanaryServer::loadModules() {
const auto useAnyDatapack = g_configManager().getBoolean(USE_ANY_DATAPACK_FOLDER);
auto datapackName = g_configManager().getString(DATA_DIRECTORY);
if (!useAnyDatapack && datapackName != "data-canary" && datapackName != "data-otservbr-global") {
throw FailedToInitializeCanary(fmt::format(
"The datapack folder name '{}' is wrong, please select valid "
"datapack name 'data-canary' or 'data-otservbr-global "
"or enable in config.lua to use any datapack folder",
datapackName
));
throw FailedToInitializeCanary(fmt::format("The datapack folder name '{}' is wrong, please select valid "
"datapack name 'data-canary' or 'data-otservbr-global "
"or enable in config.lua to use any datapack folder",
datapackName));
}

logger.debug("Initializing lua environment...");
Expand Down
152 changes: 47 additions & 105 deletions src/server/network/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void Connection::accept(Protocol_ptr protocolPtr) {
}

void Connection::acceptInternal(bool toggleParseHeader) {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
readTimer.expires_after(std::chrono::seconds(CONNECTION_READ_TIMEOUT));

auto weakSelf = std::weak_ptr(shared_from_this());

Expand All @@ -158,8 +158,8 @@ void Connection::acceptInternal(bool toggleParseHeader) {
}
});

try {
asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf, toggleParseHeader](const std::error_code &error, std::size_t) {
executeWithCatch([&]() {
async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf, toggleParseHeader](const std::error_code &error, std::size_t) {
if (const auto self = weakSelf.lock()) {
if (toggleParseHeader) {
self->parseHeader(error);
Expand All @@ -170,16 +170,8 @@ void Connection::acceptInternal(bool toggleParseHeader) {
g_logger().warn("[Connection::acceptInternal] - Connection no longer exists during async_read");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::acceptInternal] - System error in async_read: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::acceptInternal] - Unexpected error in async_read: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::acceptInternal] - Unknown error in async_read");
close(true);
}
},
"Connection::acceptInternal");
}

void Connection::parseProxyIdentification(const std::error_code &error) {
Expand All @@ -198,7 +190,7 @@ void Connection::parseProxyIdentification(const std::error_code &error) {
}

uint8_t* msgBuffer = m_msg.getBuffer();
auto charData = static_cast<char*>(static_cast<void*>(msgBuffer));
const auto charData = static_cast<char*>(static_cast<void*>(msgBuffer));
const std::string serverName = g_configManager().getString(SERVER_NAME) + "\n";

if (connectionState.load() == CONNECTION_STATE_IDENTIFYING) {
Expand All @@ -210,8 +202,8 @@ void Connection::parseProxyIdentification(const std::error_code &error) {
const size_t remainder = serverName.length() - 2;
if (remainder > 0) {
connectionState.store(CONNECTION_STATE_READINGS);
try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
executeWithCatch([&]() {
readTimer.expires_after(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
auto weakSelf = std::weak_ptr<Connection>(shared_from_this());

readTimer.async_wait([weakSelf](const std::error_code &error) {
Expand All @@ -220,23 +212,15 @@ void Connection::parseProxyIdentification(const std::error_code &error) {
}
});

asio::async_read(socket, asio::buffer(m_msg.getBuffer(), remainder), [weakSelf](const std::error_code &error, std::size_t) {
async_read(socket, asio::buffer(m_msg.getBuffer(), remainder), [weakSelf](const std::error_code &error, std::size_t) {
if (const auto self = weakSelf.lock()) {
self->parseProxyIdentification(error);
} else {
g_logger().warn("[Connection::parseProxyIdentification] - Connection no longer exists during async_read");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::parseProxyIdentification] - System error in async_read: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::parseProxyIdentification] - Unexpected error in async_read: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::parseProxyIdentification] - Unknown error in async_read");
close(true);
}
},
"Connection::parseProxyIdentification");
return;
} else {
connectionState.store(CONNECTION_STATE_OPEN);
Expand Down Expand Up @@ -273,7 +257,7 @@ void Connection::parseHeader(const std::error_code &error) {
return;
}

uint32_t timePassed = std::max<uint32_t>(1, (time(nullptr) - timeConnected) + 1);
const uint32_t timePassed = std::max<uint32_t>(1, (time(nullptr) - timeConnected) + 1);
if ((++packetsSent / timePassed) > static_cast<uint32_t>(g_configManager().getNumber(MAX_PACKETS_PER_SECOND))) {
g_logger().warn("[Connection::parseHeader] - {} disconnected for exceeding packet per second limit.", convertIPToString(getIP()));
close();
Expand All @@ -285,41 +269,33 @@ void Connection::parseHeader(const std::error_code &error) {
packetsSent = 0;
}

uint16_t size = m_msg.getLengthHeader();
const uint16_t size = m_msg.getLengthHeader();
if (size == 0 || size > INPUTMESSAGE_MAXSIZE) {
close(true);
return;
}

try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
executeWithCatch([&]() {
readTimer.expires_after(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
auto weakSelf = std::weak_ptr<Connection>(shared_from_this());

readTimer.async_wait([weakSelf](const std::error_code &error) {
if (auto self = weakSelf.lock()) {
if (const auto self = weakSelf.lock()) {
handleTimeout(self, error);
}
});

m_msg.setLength(size + HEADER_LENGTH);

asio::async_read(socket, asio::buffer(m_msg.getBodyBuffer(), size), [weakSelf](const std::error_code &error, std::size_t) {
if (auto self = weakSelf.lock()) {
async_read(socket, asio::buffer(m_msg.getBodyBuffer(), size), [weakSelf](const std::error_code &error, std::size_t) {
if (const auto self = weakSelf.lock()) {
self->parsePacket(error);
} else {
g_logger().warn("[Connection::parseHeader] - Connection no longer exists during async_read");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::parseHeader] - System error in async_read: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::parseHeader] - Unexpected error in async_read: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::parseHeader] - Unknown error in async_read");
close(true);
}
},
"Connection::parseHeader");
}

void Connection::parsePacket(const std::error_code &error) {
Expand All @@ -343,13 +319,13 @@ void Connection::parsePacket(const std::error_code &error) {

if (!protocol) {
uint32_t checksum;
if (int32_t len = m_msg.getLength() - m_msg.getBufferPosition() - CHECKSUM_LENGTH; len > 0) {
if (const int32_t len = m_msg.getLength() - m_msg.getBufferPosition() - CHECKSUM_LENGTH; len > 0) {
checksum = adlerChecksum(m_msg.getBuffer() + m_msg.getBufferPosition() + CHECKSUM_LENGTH, len);
} else {
checksum = 0;
}

uint32_t recvChecksum = m_msg.get<uint32_t>();
const uint32_t recvChecksum = m_msg.get<uint32_t>();
if (recvChecksum != checksum) {
m_msg.skipBytes(-CHECKSUM_LENGTH);
}
Expand All @@ -369,8 +345,8 @@ void Connection::parsePacket(const std::error_code &error) {
skipReadingNextPacket = protocol->onRecvMessage(m_msg);
}

try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
executeWithCatch([&]() {
readTimer.expires_after(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
auto weakSelf = std::weak_ptr<Connection>(shared_from_this());

readTimer.async_wait([weakSelf](const std::error_code &error) {
Expand All @@ -380,55 +356,37 @@ void Connection::parsePacket(const std::error_code &error) {
});

if (!skipReadingNextPacket) {
asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t) {
async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t) {
if (const auto self = weakSelf.lock()) {
self->parseHeader(error);
} else {
g_logger().warn("[Connection::parsePacket] - Connection no longer exists during async_read");
}
});
}
} catch (const std::system_error &e) {
g_logger().error("[Connection::parsePacket] - System error in async_read: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::parsePacket] - Unexpected error in async_read: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::parsePacket] - Unknown error in async_read");
close(true);
}
},
"Connection::parsePacket");
}

void Connection::resumeWork() {
executeWithCatch([&]() {
readTimer.expires_after(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
auto weakSelf = std::weak_ptr<Connection>(shared_from_this());

readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
auto weakSelf = std::weak_ptr<Connection>(shared_from_this());

readTimer.async_wait([weakSelf](const std::error_code &error) {
if (const auto self = weakSelf.lock()) {
handleTimeout(self, error);
}
});

try {
asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) {
readTimer.async_wait([weakSelf](const std::error_code &error) {
if (const auto self = weakSelf.lock()) {
handleTimeout(self, error);
}
});
async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) {
if (const auto self = weakSelf.lock()) {
self->parseHeader(error);
} else {
g_logger().warn("[Connection::resumeWork] - Connection no longer exists during async_read");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::resumeWork] - System error in async_read: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::resumeWork] - Unexpected error in async_read: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::resumeWork] - Unknown error in async_read");
close(true);
}
},
"Connection::resumeWork");
}

void Connection::send(const OutputMessage_ptr &outputMessage) {
Expand All @@ -440,24 +398,16 @@ void Connection::send(const OutputMessage_ptr &outputMessage) {
if (socket.is_open()) {
auto weakSelf = std::weak_ptr(shared_from_this());

try {
asio::post(socket.get_executor(), [weakSelf] {
executeWithCatch([&]() {
post(socket.get_executor(), [weakSelf] {
if (const auto self = weakSelf.lock()) {
self->internalWorker();
} else {
g_logger().warn("[Connection::send] - Connection no longer exists during posting write operation");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::send] - System error in posting write operation: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::send] - Unexpected error in posting write operation: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::send] - Unknown error in posting write operation");
close(true);
}
},
"Connection::send");
} else {
g_logger().error("[Connection::send] - Socket is not open for writing.");
close(true);
Expand Down Expand Up @@ -510,7 +460,7 @@ uint32_t Connection::getIP() {
}

void Connection::internalSend(const OutputMessage_ptr &outputMessage) {
writeTimer.expires_from_now(std::chrono::seconds(CONNECTION_WRITE_TIMEOUT));
writeTimer.expires_after(std::chrono::seconds(CONNECTION_WRITE_TIMEOUT));
auto weakSelf = std::weak_ptr(shared_from_this());

writeTimer.async_wait([weakSelf](const std::error_code &error) {
Expand All @@ -519,24 +469,16 @@ void Connection::internalSend(const OutputMessage_ptr &outputMessage) {
}
});

try {
asio::async_write(socket, asio::buffer(outputMessage->getOutputBuffer(), outputMessage->getLength()), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) {
executeWithCatch([&]() {
async_write(socket, asio::buffer(outputMessage->getOutputBuffer(), outputMessage->getLength()), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) {
if (const auto self = weakSelf.lock()) {
self->onWriteOperation(error);
} else {
g_logger().warn("[Connection::internalSend] - Connection no longer exists during async_write");
}
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::internalSend] - System error in async_write: {}", e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[Connection::internalSend] - Unexpected error in async_write: {}", e.what());
close(true);
} catch (...) {
g_logger().error("[Connection::internalSend] - Unknown error in async_write");
close(true);
}
},
"Connection::internalSend");
}

void Connection::onWriteOperation(const std::error_code &error) {
Expand Down
16 changes: 16 additions & 0 deletions src/server/network/connection/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,20 @@ class Connection : public std::enable_shared_from_this<Connection> {

friend class ServicePort;
friend class ConnectionManager;

template <typename Func>
void executeWithCatch(Func &&func, const std::string &context) {
try {
func();
} catch (const std::system_error &e) {
g_logger().error("[{}] - System error: {}", context, e.what());
close(true);
} catch (const std::exception &e) {
g_logger().error("[{}] - Unexpected error: {}", context, e.what());
close(true);
} catch (...) {
g_logger().error("[{}] - Unknown error occurred", context);
close(true);
}
}
};
2 changes: 1 addition & 1 deletion src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void ServiceManager::stop() {

acceptors.clear();

death_timer.expires_from_now(std::chrono::seconds(3));
death_timer.expires_after(std::chrono::seconds(3));
death_timer.async_wait([this](const std::error_code &err) {
if (!err) {
die();
Expand Down

0 comments on commit d41e712

Please sign in to comment.