Skip to content

Commit

Permalink
feat:add kqueue for mac
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxuan520 committed Oct 10, 2024
1 parent 666e46d commit af81ef9
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 8 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## TODO
- [ ] 支持设置LT和ET模式下的事件触发方式
- [ ] 支持设置连接的超时时间
- [ ] 支持UDP协议
- [ ] 支持UDP协议(添加测试)
- [ ] 支持Http协议(彻底迁移 cppweb -> cppnet)
- [ ] 抽象出epoll层
- [ ] 支持SSL
2 changes: 1 addition & 1 deletion src/cppnet/server/io_multiplexing/io_multiplexing_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class IOMultiplexingBase {
/**
* @brief: Stop loop.
*/
void Stop() { loop_flag_ = false; }
virtual void Stop() { loop_flag_ = false; }

public:
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "io_multiplexing_factory.hpp"

#ifdef __linux__
#include "epoll.hpp"
#elif __APPLE__
#include "kqueue.hpp"
#elif _WIN32
#include "select.hpp"
#endif

namespace cppnet {

Expand All @@ -9,7 +16,7 @@ std::shared_ptr<IOMultiplexingBase> IOMultiplexingFactory::CreateDefault() {
return std::make_shared<Epoll>();
#elif __APPLE__
// macos use kqueue
return nullptr;
return std::make_shared<KQueue>();
#elif _WIN32
// windows use select
return nullptr;
Expand Down
63 changes: 63 additions & 0 deletions src/cppnet/server/io_multiplexing/kqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "kqueue.hpp"
#include "utils/const.hpp"
#include <sys/event.h>

namespace cppnet {

KQueue::KQueue() : IOMultiplexingBase() {}

KQueue::~KQueue() { Close(); }

int KQueue::Init() {
kq_fd_ = kqueue();
if (kq_fd_.status() == Socket::kInit) {
err_msg_ = "[syserr]:" + std::string(strerror(errno));
return kSysErr;
}
return kSuccess;
}

int KQueue::MonitorSoc(const Socket &fd) {
struct kevent ke;
EV_SET(&ke, fd.fd(), EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr);
if (kevent(kq_fd_.fd(), &ke, 1, nullptr, 0, nullptr) == -1) {
err_msg_ = "[syserr]:" + std::string(strerror(errno));
return kSysErr;
}
return kSuccess;
}

int KQueue::RemoveSoc(const Socket &fd) {
struct kevent ke;
EV_SET(&ke, fd.fd(), EVFILT_READ, EV_DELETE, 0, 0, nullptr);
if (kevent(kq_fd_.fd(), &ke, 1, nullptr, 0, nullptr) == -1) {
err_msg_ = "[syserr]:" + std::string(strerror(errno));
return kSysErr;
}
return kSuccess;
}

int KQueue::Loop(NotifyCallBack callback) {
while (loop_flag_) {
struct kevent evs[1024];
int nfds = kevent(kq_fd_.fd(), nullptr, 0, evs, 1024, nullptr);
if (nfds < 0) {
if (errno == EINTR) {
}
err_msg_ = "[syserr]:" + std::string(strerror(errno));
return kSysErr;
}
for (int i = 0; i < nfds; ++i) {
if (evs[i].filter == EVFILT_READ) {
if (callback != nullptr) {
callback(*this, evs[i].ident);
}
}
}
}
return 0;
}

void KQueue::Close() { kq_fd_.Close(); }

} // namespace cppnet
22 changes: 22 additions & 0 deletions src/cppnet/server/io_multiplexing/kqueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once
#include "io_multiplexing_base.hpp"

namespace cppnet {

class KQueue : public IOMultiplexingBase {
public:
KQueue();
virtual ~KQueue();
int Init() override;
int MonitorSoc(const Socket &fd) override;
int RemoveSoc(const Socket &fd) override;
int Loop(NotifyCallBack callback) override;
void Close() override;

private:
Socket kq_fd_;

};

} // namespace cppnet

5 changes: 0 additions & 5 deletions src/cppnet/server/tcp_server.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
#pragma once

#include "../socket/socket.hpp"
#include "./io_multiplexing/io_multiplexing_factory.hpp"
#include <errno.h>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
namespace cppnet {

class TcpServer {
Expand Down
55 changes: 55 additions & 0 deletions src/cppnet/socket/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ int Socket::Init() {
return 0;
}

int Socket::InitUdp() {
fd_ = ::socket(AF_INET, SOCK_DGRAM, 0);
if (fd_ < 0) {
return -1;
}
status_ = kInit;
return 0;
}

Socket Socket::Accept(Address &addr, socklen_t *plen) const {
if (status_ != kInit) {
return Socket(-1);
Expand Down Expand Up @@ -66,6 +75,14 @@ int Socket::SetNoBlock() const {
return fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
}

int Socket::SetBlock() const {
int flags = fcntl(fd_, F_GETFL, 0);
if (flags == -1) {
return -1;
}
return fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
}

int Socket::Read(std::string &buf, size_t len) const {
if (status_ != kInit) {
return -1;
Expand All @@ -85,6 +102,29 @@ int Socket::Read(void *buf, size_t len) const {
return ::read(fd_, buf, len);
}

int Socket::ReadUdp(std::string &buf, size_t len, Address &addr) const {
if (status_ != kInit) {
return -1;
}
int addr_len = sizeof(sockaddr);
char *data = new char[len + 1];
memset(data, 0, len + 1);
auto rc =
::recvfrom(fd_, data, len, 0, addr.GetSockAddr(), (socklen_t *)&addr_len);
buf = data;
delete[] data;
return rc;
}

int Socket::ReadUdp(void *buf, size_t len, Address &addr) const {
if (status_ != kInit) {
return -1;
}
int addr_len = sizeof(sockaddr);
return ::recvfrom(fd_, buf, len, 0, addr.GetSockAddr(),
(socklen_t *)&addr_len);
}

int Socket::Write(const std::string &buf) const {
if (status_ != kInit) {
return -1;
Expand All @@ -99,6 +139,21 @@ int Socket::Write(const void *buf, size_t len) const {
return ::write(fd_, buf, len);
}

int Socket::WriteUdp(const std::string &buf, Address &addr) const {
if (status_ != kInit) {
return -1;
}
return ::sendto(fd_, buf.c_str(), buf.size(), 0, addr.GetSockAddr(),
sizeof(sockaddr));
}

int Socket::WriteUdp(const void *buf, size_t len, Address &addr) const {
if (status_ != kInit) {
return -1;
}
return ::sendto(fd_, buf, len, 0, addr.GetSockAddr(), sizeof(sockaddr));
}

int Socket::SetReuseAddr() const {
int reuse_addr_on = 1;
return ::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &reuse_addr_on,
Expand Down
41 changes: 41 additions & 0 deletions src/cppnet/socket/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class Socket {
* @return: 0 if success, -1 if failed.
*/
int Init();
/**
* @brief: Init socket with udp
* @return: 0 if success, -1 if failed.
*/
int InitUdp();
/**
* @brief: try to connect to server.
* @param addr: server address.
Expand Down Expand Up @@ -61,6 +66,22 @@ class Socket {
* @return: read length.
*/
int Read(void *buf, size_t len) const;
/**
* @brief: Read data from socket.
* @param buf: buffer to store data.
* @param len: buffer length.
* @param address: udp read from
* @return: read length.
*/
int ReadUdp(std::string &buf, size_t len,Address& addr) const;
/**
* @brief: Read data from udp socket.
* @param buf: buffer to store data.
* @param len: buffer length.
* @param address: udp read from
* @return: read length.
*/
int ReadUdp(void *buf, size_t len,Address& addr) const;
/**
* @brief: Write data to socket.
* @param buf: buffer to store data.
Expand All @@ -75,10 +96,30 @@ class Socket {
* @return: write length.
*/
int Write(const void *buf, size_t len) const;
/**
* @brief: Write data to udp socket.
* @param buf: buffer to store data.
* @param len: buffer length.
* @param addr: udp write to
* @return: write length.
*/
int WriteUdp(const std::string &buf, Address &addr) const;
/**
* @brief: Write data to udp socket.
* @param buf: buffer to store data.
* @param len: buffer length.
* @param addr: udp write to
* @return: write length.
*/
int WriteUdp(const void *buf, size_t len, Address &addr) const;
/**
* @brief: Set socket nonblock.
*/
int SetNoBlock() const;
/**
* @brief: Set socket block
*/
int SetBlock() const;
/**
* set socket reuse addr
*/
Expand Down
11 changes: 11 additions & 0 deletions src/cppnet/timer/timer.cpp
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@

#include "timer.hpp"

#ifdef __linux__
#include <sys/timerfd.h>
#endif

#include <unistd.h>

namespace cppnet {

int Timer::CreateTimer(int sec, int nsec) {
// only linux support this feature
#ifdef __linux__
int timerfd = timerfd_create(CLOCK_REALTIME, 0);
if (timerfd <= 0) {
return -1;
}

ResetTimer(timerfd, sec, nsec);
return timerfd;
#else
return -1;
#endif
}

int Timer::ResetTimer(int timerfd, int sec, int nsec) {
#ifdef __linux__
struct itimerspec ts;
ts.it_value.tv_sec = sec;
ts.it_value.tv_nsec = nsec;
ts.it_interval.tv_sec = sec;
ts.it_interval.tv_nsec = nsec;

timerfd_settime(timerfd, 0, &ts, NULL);
#endif

return 0;
}

Expand Down

0 comments on commit af81ef9

Please sign in to comment.