-
Notifications
You must be signed in to change notification settings - Fork 866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[apps] FIX srt-live-transmit stops listening after SRT disconnect #2997... #3108
base: master
Are you sure you want to change the base?
Changes from all commits
dfa9a99
c9e9425
3289d96
33ffc41
726f66e
94cac83
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -514,6 +514,7 @@ int main(int argc, char** argv) | |
return 1; | ||
} | ||
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; | ||
|
||
switch (src->uri.type()) | ||
{ | ||
case UriParser::SRT: | ||
|
@@ -537,14 +538,18 @@ int main(int argc, char** argv) | |
} | ||
break; | ||
case UriParser::FILE: | ||
if (srt_epoll_add_ssock(pollid, | ||
src->GetSysSocket(), &events)) | ||
{ | ||
cerr << "Failed to add FILE source to poll, " | ||
<< src->GetSysSocket() << endl; | ||
return 1; | ||
const int con = src->GetSysSocket(); | ||
// try to make the standard input non blocking | ||
if (srt_epoll_add_ssock(pollid, con, &events)) | ||
{ | ||
cerr << "Failed to add FILE source to poll, " | ||
<< src->GetSysSocket() << endl; | ||
return 1; | ||
} | ||
break; | ||
|
||
} | ||
break; | ||
default: | ||
break; | ||
} | ||
|
@@ -589,6 +594,7 @@ int main(int argc, char** argv) | |
SRTSOCKET srtrwfds[4] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK , SRT_INVALID_SOCK , SRT_INVALID_SOCK }; | ||
int sysrfdslen = 2; | ||
SYSSOCKET sysrfds[2]; | ||
|
||
if (srt_epoll_wait(pollid, | ||
&srtrwfds[0], &srtrfdslen, &srtrwfds[2], &srtwfdslen, | ||
100, | ||
|
@@ -771,12 +777,49 @@ int main(int argc, char** argv) | |
break; | ||
} | ||
|
||
|
||
bool srcReady = false; | ||
|
||
if (src.get() && src->IsOpen()) | ||
{ | ||
if (srtrfdslen > 0) | ||
{ | ||
SRTSOCKET sock = src->GetSRTSocket(); | ||
if (sock != SRT_INVALID_SOCK) | ||
{ | ||
for (int n = 0; n < srtrfdslen; n ++) | ||
{ | ||
if (sock == srtrwfds[n]) | ||
{ | ||
srcReady = true; | ||
break; | ||
} | ||
} | ||
|
||
} | ||
} | ||
if (!srcReady && sysrfdslen > 0) | ||
{ | ||
int sock = src->GetSysSocket(); | ||
if (sock != -1) | ||
{ | ||
for (int n = 0; n < sysrfdslen; n++) | ||
{ | ||
if (sock == sysrfds[n]) | ||
{ | ||
srcReady = true; | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
// read a few chunks at a time in attempt to deplete | ||
// read buffers as much as possible on each read event | ||
// note that this implies live streams and does not | ||
// work for cached/file sources | ||
std::list<std::shared_ptr<MediaPacket>> dataqueue; | ||
if (src.get() && src->IsOpen() && (srtrfdslen || sysrfdslen)) | ||
if (srcReady) | ||
{ | ||
while (dataqueue.size() < cfg.buffering) | ||
{ | ||
|
@@ -800,9 +843,10 @@ int main(int argc, char** argv) | |
|
||
dataqueue.push_back(pkt); | ||
receivedBytes += pkt->payload.size(); | ||
if (src->MayBlock()) | ||
break; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that actually reading should be done this way: either you have the device read-ready, so you read, and then after you read, you don't know, and have to recheck. Or, you can resolve to reading multiple times, counting on that when particular time reading isn't ready, then the Read call should report an error. Might be, I think, a good idea, to keep the "blocked" state in the fields, which will be written to, in case when particular Read implementation finds out that the call failed due to not being ready. This way it won't need to see if this is SRT and this way we use that function to get the error and maybe check for an SRT-specific readiness failure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From what I understand, the current implementation already follows this approach. For file descriptors in blocking mode (case 1), it checks if the device is ready before attempting to read, performs the read operation, and rechecks the state afterward since readiness isn’t guaranteed. For file descriptors in non-blocking mode (case 2), it handles multiple read attempts and relies on the error returned by the Read call to detect if the device isn’t ready. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This here is unclear. First, The only thing I'm referring to is the approach to multiple reading calls, which should follow one of two methods:
Note that the second approach isn't possible to be used reliably in case of blocking mode, that's why it should not be taken into account. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the benefits of enforcing non-blocking mode for consistency and simplifying the architecture. However, I think it might be preferable to maintain support for both blocking and non-blocking modes. This flexibility ensures compatibility with a broader range of sources and use cases, particularly for systems where blocking mode is either required or more practical. The current implementation can differentiate the handling of each mode: Would you be open to maintaining support for both modes, or do you see specific challenges in doing so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The initial specific about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly. This is built in into the architecture of this application to only support the nonblocking mode and it should not even take the blocking mode into account - unlike the |
||
} | ||
} | ||
|
||
// if there is no target, let the received data be lost | ||
while (!dataqueue.empty()) | ||
{ | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -71,6 +71,7 @@ class Source: public Location | |||||
|
||||||
virtual SRTSOCKET GetSRTSocket() const { return SRT_INVALID_SOCK; } | ||||||
virtual int GetSysSocket() const { return -1; } | ||||||
virtual bool MayBlock() { return false; } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is not expected to change object's state, at least so far. Hence can be const.
Suggested change
|
||||||
virtual bool AcceptNewClient() { return false; } | ||||||
}; | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -704,6 +704,7 @@ Iface* CreateSrt(const string& host, int port, const map<string,string>& par) { | |||||
|
||||||
class ConsoleSource: public Source | ||||||
{ | ||||||
bool may_block = true; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potentially can be ConsoleSource()
#ifdef _WIN32
: may_block(true)
#else
: may_block(fcntl(fileno(stdin), F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0)
#endif There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would eliminate it altogether and just report an error and exit in case when nonblocking mode can't be enforced. |
||||||
public: | ||||||
|
||||||
ConsoleSource() | ||||||
|
@@ -712,6 +713,9 @@ class ConsoleSource: public Source | |||||
// The default stdin mode on windows is text. | ||||||
// We have to set it to the binary mode | ||||||
_setmode(_fileno(stdin), _O_BINARY); | ||||||
#else | ||||||
const int fd = fileno(stdin); | ||||||
may_block = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That doesn't make any sense. If this fails (best from the constructor), it should make the application exit. Error from this call is unlikely. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typically, when implementing an event loop for reading, the pattern is to use non-blocking file descriptors and continue reading until |
||||||
#endif | ||||||
} | ||||||
|
||||||
|
@@ -720,9 +724,8 @@ class ConsoleSource: public Source | |||||
if (pkt.payload.size() < chunk) | ||||||
pkt.payload.resize(chunk); | ||||||
|
||||||
bool st = cin.read(pkt.payload.data(), chunk).good(); | ||||||
chunk = cin.gcount(); | ||||||
if (chunk == 0 || !st) | ||||||
int ret = ::read(GetSysSocket(), pkt.payload.data(), chunk); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if (ret <= 0) | ||||||
{ | ||||||
pkt.payload.clear(); | ||||||
return 0; | ||||||
|
@@ -731,14 +734,15 @@ class ConsoleSource: public Source | |||||
// Save this time to potentially use it for SRT target. | ||||||
pkt.time = srt_time_now(); | ||||||
if (chunk < pkt.payload.size()) | ||||||
pkt.payload.resize(chunk); | ||||||
pkt.payload.resize(ret); | ||||||
|
||||||
return (int) chunk; | ||||||
return ret; | ||||||
} | ||||||
|
||||||
bool IsOpen() override { return cin.good(); } | ||||||
bool IsOpen() override { return !cin.eof(); } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't feel good about this, because the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually the only use case when |
||||||
bool MayBlock() override { return may_block; } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
bool End() override { return cin.eof(); } | ||||||
int GetSysSocket() const override { return 0; }; | ||||||
int GetSysSocket() const override { return fileno(stdin); }; | ||||||
}; | ||||||
|
||||||
class ConsoleTarget: public Target | ||||||
|
@@ -767,7 +771,7 @@ class ConsoleTarget: public Target | |||||
|
||||||
bool IsOpen() override { return cout.good(); } | ||||||
bool Broken() override { return cout.eof(); } | ||||||
int GetSysSocket() const override { return 0; }; | ||||||
int GetSysSocket() const override { return fileno(stdout); }; | ||||||
}; | ||||||
|
||||||
template <class Iface> struct Console; | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider packing this into a helper function than can be used both for
srtrwfds
andsysrfds
, thus making the code more compact and easier to read the intent.