diff --git a/apps/srt-live-transmit.cpp b/apps/srt-live-transmit.cpp index 0a918e6b0..48f436e76 100644 --- a/apps/srt-live-transmit.cpp +++ b/apps/srt-live-transmit.cpp @@ -514,6 +514,7 @@ int main(int argc, char** argv) return 1; } int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; + switch (src->uri.type()) { case UriParser::SRT: @@ -537,14 +538,18 @@ int main(int argc, char** argv) } break; case UriParser::FILE: - if (srt_epoll_add_ssock(pollid, - src->GetSysSocket(), &events)) { - cerr << "Failed to add FILE source to poll, " - << src->GetSysSocket() << endl; - return 1; + const int con = src->GetSysSocket(); + // try to make the standard input non blocking + if (srt_epoll_add_ssock(pollid, con, &events)) + { + cerr << "Failed to add FILE source to poll, " + << src->GetSysSocket() << endl; + return 1; + } + break; + } - break; default: break; } @@ -589,6 +594,7 @@ int main(int argc, char** argv) SRTSOCKET srtrwfds[4] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK , SRT_INVALID_SOCK , SRT_INVALID_SOCK }; int sysrfdslen = 2; SYSSOCKET sysrfds[2]; + if (srt_epoll_wait(pollid, &srtrwfds[0], &srtrfdslen, &srtrwfds[2], &srtwfdslen, 100, @@ -771,12 +777,49 @@ int main(int argc, char** argv) break; } + + bool srcReady = false; + + if (src.get() && src->IsOpen()) + { + if (srtrfdslen > 0) + { + SRTSOCKET sock = src->GetSRTSocket(); + if (sock != SRT_INVALID_SOCK) + { + for (int n = 0; n < srtrfdslen; n ++) + { + if (sock == srtrwfds[n]) + { + srcReady = true; + break; + } + } + + } + } + if (!srcReady && sysrfdslen > 0) + { + int sock = src->GetSysSocket(); + if (sock != -1) + { + for (int n = 0; n < sysrfdslen; n++) + { + if (sock == sysrfds[n]) + { + srcReady = true; + break; + } + } + } + } + } // read a few chunks at a time in attempt to deplete // read buffers as much as possible on each read event // note that this implies live streams and does not // work for cached/file sources std::list> dataqueue; - if (src.get() && src->IsOpen() && (srtrfdslen || sysrfdslen)) + if (srcReady) { while (dataqueue.size() < cfg.buffering) { @@ -800,9 +843,10 @@ int main(int argc, char** argv) dataqueue.push_back(pkt); receivedBytes += pkt->payload.size(); + if (src->MayBlock()) + break; } } - // if there is no target, let the received data be lost while (!dataqueue.empty()) { diff --git a/apps/transmitbase.hpp b/apps/transmitbase.hpp index bd0af916d..0ebbf61e3 100644 --- a/apps/transmitbase.hpp +++ b/apps/transmitbase.hpp @@ -71,6 +71,7 @@ class Source: public Location virtual SRTSOCKET GetSRTSocket() const { return SRT_INVALID_SOCK; } virtual int GetSysSocket() const { return -1; } + virtual bool MayBlock() { return false; } virtual bool AcceptNewClient() { return false; } }; diff --git a/apps/transmitmedia.cpp b/apps/transmitmedia.cpp index 275295173..45a1e208b 100644 --- a/apps/transmitmedia.cpp +++ b/apps/transmitmedia.cpp @@ -704,6 +704,7 @@ Iface* CreateSrt(const string& host, int port, const map& par) { class ConsoleSource: public Source { + bool may_block = true; public: ConsoleSource() @@ -712,6 +713,9 @@ class ConsoleSource: public Source // The default stdin mode on windows is text. // We have to set it to the binary mode _setmode(_fileno(stdin), _O_BINARY); +#else + const int fd = fileno(stdin); + may_block = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) < 0; #endif } @@ -720,9 +724,8 @@ class ConsoleSource: public Source if (pkt.payload.size() < chunk) pkt.payload.resize(chunk); - bool st = cin.read(pkt.payload.data(), chunk).good(); - chunk = cin.gcount(); - if (chunk == 0 || !st) + int ret = ::read(GetSysSocket(), pkt.payload.data(), chunk); + if (ret <= 0) { pkt.payload.clear(); return 0; @@ -731,14 +734,15 @@ class ConsoleSource: public Source // Save this time to potentially use it for SRT target. pkt.time = srt_time_now(); if (chunk < pkt.payload.size()) - pkt.payload.resize(chunk); + pkt.payload.resize(ret); - return (int) chunk; + return ret; } - bool IsOpen() override { return cin.good(); } + bool IsOpen() override { return !cin.eof(); } + bool MayBlock() override { return may_block; } bool End() override { return cin.eof(); } - int GetSysSocket() const override { return 0; }; + int GetSysSocket() const override { return fileno(stdin); }; }; class ConsoleTarget: public Target @@ -767,7 +771,7 @@ class ConsoleTarget: public Target bool IsOpen() override { return cout.good(); } bool Broken() override { return cout.eof(); } - int GetSysSocket() const override { return 0; }; + int GetSysSocket() const override { return fileno(stdout); }; }; template struct Console;