Skip to content

Commit

Permalink
{EPOLL} New API functions (#872)
Browse files Browse the repository at this point in the history
* Added new API functions for epoll
  • Loading branch information
ethouris authored and rndi committed Sep 12, 2019
1 parent e59a8bd commit 808a1fe
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 0 deletions.
65 changes: 65 additions & 0 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,20 @@ int CUDTUnited::epoll_wait(
return m_EPoll.wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
}

int CUDTUnited::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut)
{
return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
}

int32_t CUDTUnited::epoll_set(int eid, int32_t flags)
{
return m_EPoll.setflags(eid, flags);
}

int CUDTUnited::epoll_release(const int eid)
{
return m_EPoll.release(eid);
Expand Down Expand Up @@ -2663,6 +2677,52 @@ int CUDT::epoll_wait(
}
}

int CUDT::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut)
{
try
{
return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
}
catch (CUDTException e)
{
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
}
catch (std::exception& ee)
{
LOGC(mglog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0));
return ERROR;
}
}

int32_t CUDT::epoll_set(
const int eid,
int32_t flags)
{
try
{
return s_UDTUnited.epoll_set(eid, flags);
}
catch (CUDTException e)
{
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
}
catch (std::exception& ee)
{
LOGC(mglog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0));
return ERROR;
}
}

int CUDT::epoll_release(const int eid)
{
try
Expand Down Expand Up @@ -3102,6 +3162,11 @@ int epoll_wait2(
return ret;
}

int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
return CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
}

int epoll_release(int eid)
{
return CUDT::epoll_release(eid);
Expand Down
2 changes: 2 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ friend class CRendezvousQueue;
int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* lwfds = NULL);
int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
int32_t epoll_set(const int eid, int32_t flags);
int epoll_release(const int eid);

/// record the UDT exception.
Expand Down
2 changes: 2 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class CUDT
static int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
static int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
static int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
static int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
static int32_t epoll_set(const int eid, int32_t flags);
static int epoll_release(const int eid);
static CUDTException& getlasterror();
static int perfmon(SRTSOCKET u, CPerfMon* perf, bool clear = true);
Expand Down
89 changes: 89 additions & 0 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,95 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
return 0;
}

int CEPoll::setflags(const int eid, int32_t flags)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
CEPollDesc& ed = p->second;

int32_t oflags = ed.flags();

if (flags == -1)
return oflags;

if (flags == 0)
{
ed.clr_flags(~int32_t());
}
else
{
ed.set_flags(flags);
}

return oflags;
}

int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
// It is allowed to call this function witn fdsSize == 0
// and therefore also NULL fdsSet. This will then only report
// the number of ready sockets, just without information which.
if (fdsSize < 0 || (fdsSize > 0 && !fdsSet))
throw CUDTException(MJ_NOTSUP, MN_INVAL);

int64_t entertime = CTimer::getTime();

while (true)
{
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
CEPollDesc& ed = p->second;

if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty())
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0))
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

if (!ed.m_sLocals.empty())
{
// XXX Add error log
// uwait should not be used with EIDs subscribed to system sockets
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

int total = 0; // This is a list, so count it during iteration
CEPollDesc::enotice_t::iterator i = ed.enotice_begin();
while (i != ed.enotice_end())
{
int pos = total; // previous past-the-end position
++total;

if (total > fdsSize)
break;

fdsSet[pos] = *i;

ed.checkEdge(i++); // NOTE: potentially deletes `i`
}
if (total)
return total;
}

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
break; // official wait does: throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);

CTimer::waitForEvent();
}

return 0;
}

int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
{
Expand Down
11 changes: 11 additions & 0 deletions srtcore/epoll.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,15 @@ friend class CRendezvousQueue;

int wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds, std::set<SYSSOCKET>* lwfds);

/// wait for EPoll events or timeout optimized with explicit EPOLL_ERR event and the edge mode option.
/// @param [in] eid EPoll ID.
/// @param [out] fdsSet array of user socket events (SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR).
/// @param [int] fdsSize of fds array
/// @param [in] msTimeOut timeout threshold, in milliseconds.
/// @return total of available events in the epoll system (can be greater than fdsSize)

int uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);

/// close and release an EPoll.
/// @param [in] eid EPoll ID.
/// @return 0 if success, otherwise an error number.
Expand All @@ -358,6 +367,8 @@ friend class CRendezvousQueue;

int update_events(const SRTSOCKET& uid, std::set<int>& eids, int events, bool enable);

int setflags(const int eid, int32_t flags);

private:
int m_iIDSeed; // seed to generate a new ID
pthread_mutex_t m_SeedLock;
Expand Down
3 changes: 3 additions & 0 deletions srtcore/srt.h
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ typedef struct SRT_EPOLL_EVENT_
SRTSOCKET fd;
int events; // SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR
} SRT_EPOLL_EVENT;
SRT_API int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);

SRT_API int32_t srt_epoll_set(int eid, int32_t flags);
SRT_API int srt_epoll_release(int eid);

// Logging control
Expand Down
14 changes: 14 additions & 0 deletions srtcore/srt_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,20 @@ int srt_epoll_wait(
lrfds, lrnum, lwfds, lwnum);
}

int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
return UDT::epoll_uwait(
eid,
fdsSet,
fdsSize,
msTimeOut);
}

// use this function to set flags. Default flags are always "everything unset".
// Pass 0 here to clear everything, or nonzero to set a desired flag.
// Pass -1 to not change anything (but still get the current flag value).
int32_t srt_epoll_set(int eid, int32_t flags) { return CUDT::epoll_set(eid, flags); }

int srt_epoll_release(int eid) { return CUDT::epoll_release(eid); }

void srt_setloglevel(int ll)
Expand Down
1 change: 1 addition & 0 deletions srtcore/udt.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ UDT_API int epoll_wait(int eid, std::set<UDTSOCKET>* readfds, std::set<UDTSOCKET
std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
UDT_API int epoll_wait2(int eid, UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, int64_t msTimeOut,
SYSSOCKET* lrfds = NULL, int* lrnum = NULL, SYSSOCKET* lwfds = NULL, int* lwnum = NULL);
UDT_API int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
UDT_API int epoll_release(int eid);
UDT_API ERRORINFO& getlasterror();
UDT_API int getlasterror_code();
Expand Down

0 comments on commit 808a1fe

Please sign in to comment.