Skip to content

Commit

Permalink
feat:add test file
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxuan520 committed Oct 11, 2024
1 parent 048ea96 commit b7b9ba3
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 136 deletions.
2 changes: 1 addition & 1 deletion src/cppnet/server/io_multiplexing/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace cppnet {

Epoll::TriggerType Epoll::trigger_type_ = Epoll::TriggerType::kEdgeTrigger;
Epoll::TriggerType Epoll::trigger_type_ = Epoll::TriggerType::kLevelTrigger;

int Epoll::Init() {
epoll_fd_ = CreateEpoll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::shared_ptr<IOMultiplexingBase> IOMultiplexingFactory::CreateDefault() {
return std::make_shared<KQueue>();
#elif _WIN32
// windows use select
return nullptr;
return std::make_shared<Select>();
#endif
}

Expand Down
13 changes: 13 additions & 0 deletions src/cppnet/server/io_multiplexing/select.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

#ifdef _WIN32
#include "select.hpp"

namespace cppnet {

Select::Select() : IOMultiplexingBase() {}
Select::~Select() {}
// TODO: finish it in windows //

} // namespace cppnet

#endif
11 changes: 1 addition & 10 deletions src/cppnet/server/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,8 @@ void TcpServer::HandleAccept() {
return;
}

// ET mode need set to non block
auto rc = new_socket.SetNoBlock();
if (rc < 0) {
err_msg_ = "[syserr]:" + std::string(strerror(errno));
event_callback_(kEventError, *this, new_socket);
new_socket.Close();
return;
}

// add to epoll
rc = io_multiplexing_->MonitorSoc(new_socket);
auto rc = io_multiplexing_->MonitorSoc(new_socket);
if (rc < 0) {
err_msg_ = "[syserr]:" + std::string(strerror(errno));
event_callback_(kEventError, *this, new_socket);
Expand Down
3 changes: 3 additions & 0 deletions src/cppnet/socket/address.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Address {
Address() = default;
Address(const std::string &ip, uint16_t port);
Address(sockaddr_in addr) : addr_(addr){};
/**
* @brief: Get ip and port from addr_.
*/
void GetIPAndPort(std::string &ip, uint16_t &port);

public:
Expand Down
8 changes: 8 additions & 0 deletions src/cppnet/socket/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,12 @@ int Socket::SetReuseAddr() const {
sizeof(reuse_addr_on));
}

int Socket::GetAddr(Address &addr) const {
if (status_ != kInit) {
return -1;
}
socklen_t addr_len = sizeof(sockaddr);
return getpeername(fd_, addr.GetSockAddr(), addr.GetAddrLen());
}

} // namespace cppnet
6 changes: 5 additions & 1 deletion src/cppnet/socket/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ class Socket {
*/
int SetBlock() const;
/**
* set socket reuse addr
* @brief: set socket reuse addr
*/
int SetReuseAddr() const;
/**
* @brief: get addr from socket
*/
int GetAddr(Address &addr) const;

public:
inline int fd() const { return fd_; }
Expand Down
117 changes: 77 additions & 40 deletions src/cppnet/utils/threadpoll.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include "utils/const.hpp"
#include <atomic>
#include <condition_variable>
#include <functional>
Expand All @@ -9,81 +10,117 @@

namespace cppnet {

// Because of template,so cannot use cpp file
template <class T = void *> class ThreadPool {
public: // a struct for you to add task
public:
/**
* @brief: Task of ThreadPool
*/
struct Task {
std::function<void(T)> task_;
T arg_;
std::function<void(T)> task_func_;
T task_arg_;
};

private:
std::mutex que_lock_;
std::mutex user_lock_;
std::condition_variable cond_;
std::queue<Task> task_que_;
unsigned thread_num_;
std::atomic<unsigned> busy_now_;
std::atomic<bool> is_continue_;
std::vector<std::thread *> arr;
const char *error;

public:
ThreadPool(unsigned threadNum = 10) {
error = NULL;
this->thread_num_ = threadNum;
for (unsigned i = 0; i < threadNum; i++)
arr.push_back(new std::thread(worker, this));
is_continue_ = true;
busy_now_ = 0;
}
/**
* @brief: Constructor
* @param thread_num: The number of threads
*/
ThreadPool(unsigned thread_num) { Init(thread_num); }
ThreadPool() = default;
~ThreadPool() {
stopPool();
for (unsigned i = 0; i < arr.size(); i++) {
arr[i]->join();
delete arr[i];
Stop();
for (unsigned i = 0; i < threads_.size(); i++) {
threads_[i].join();
}
}
inline void mutexLock() { user_lock_.lock(); }
inline void mutexUnlock() { user_lock_.unlock(); }
void addTask(Task task) {
std::unique_lock<std::mutex> guard(this->que_lock_);
this->task_que_.push(task);
this->cond_.notify_one();
/**
* @brief: Init
* @param thread_num: The number of threads
*/
void Init(unsigned thread_num = 10) {
this->thread_num_ = thread_num;
for (unsigned i = 0; i < thread_num; i++) {
threads_.push_back(std::move(std::thread(Worker, this)));
}
}
/**
* @brief: Add task
* @param task: Task
*/
int AddTask(Task task) {
if (!is_continue_ || thread_num_ == 0) {
err_msg_ = "[logicerr]:threadpoll is not init";
return kLogicErr;
}
std::unique_lock<std::mutex> guard(que_lock_);
task_que_.push(task);
cond_.notify_one();
return kSuccess;
}
void stopPool() {
/**
* @brief: Stop
*/
void Stop() {
is_continue_ = false;
this->cond_.notify_all();
}
void getBusyAndTask(unsigned &busy, unsigned &task) {
/**
* @brief: Get busy and task number
* @param busy: busy number
* @param task: task number
*/
void GetBusyAndTaskNum(unsigned &busy, unsigned &task) {
std::unique_lock<std::mutex> guard(this->que_lock_);
task = this->task_que_.size();
busy = this->busy_now_;
}

public:
inline void MutexLock() { user_lock_.lock(); }
inline void MutexUnlock() { user_lock_.unlock(); }
inline void MutexTryLock() { user_lock_.try_lock(); }

private:
static void worker(void *arg) {
static void Worker(void *arg) {
ThreadPool &pool = *(ThreadPool *)arg;
Task task = {0, 0};
while (1) {
{
std::unique_lock<std::mutex> guard(pool.que_lock_);
if (pool.is_continue_ && pool.task_que_.size() == 0)
if (pool.is_continue_ && pool.task_que_.size() == 0) {
pool.cond_.wait(guard, [&]() -> bool {
return pool.task_que_.size() > 0 || pool.is_continue_ == false;
});
if (pool.is_continue_ == false)
}
if (pool.is_continue_ == false) {
return;
if (pool.task_que_.size() == 0)
}
if (pool.task_que_.size() == 0) {
continue;
}
task = pool.task_que_.front();
pool.task_que_.pop();
}

pool.busy_now_++;
if (task.task_ != NULL)
task.task_(task.arg_);
if (task.task_func_ != NULL) {
task.task_func_(task.task_arg_);
}
pool.busy_now_--;
}
}

private:
std::mutex que_lock_;
std::mutex user_lock_;
std::condition_variable cond_;
std::queue<Task> task_que_;
unsigned thread_num_{0};
std::atomic<unsigned> busy_now_{0};
std::atomic<bool> is_continue_{true};
std::vector<std::thread> threads_;
std::string err_msg_;
};

} // namespace cppnet
2 changes: 2 additions & 0 deletions src/test/main.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "server/tcp_server_test.hpp"
#include "socket/address_test.hpp"
#include "socket/socket_test.hpp"
#include "test.h"
#include "timer/timer_test.hpp"
#include "utils/host_test.hpp"
#include "utils/threadpoll_test.hpp"

INIT(Main) {
GO([&]() {
Expand Down
Loading

0 comments on commit b7b9ba3

Please sign in to comment.