From 319011486ec32fef34d0212bce1560073daca4bf Mon Sep 17 00:00:00 2001 From: Ben Zhang Date: Thu, 8 Sep 2016 14:01:16 -0700 Subject: [PATCH] feat(istream): implement TCP using ofxTCPServer the ofxNetwork API is actually quite hard to use (as the API) but since it is simplified, we are using it anyway --- Xcode/ESP/src/istream.cpp | 57 +++++++++++++++++++++++++++++++++++++++ Xcode/ESP/src/istream.h | 25 +++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/Xcode/ESP/src/istream.cpp b/Xcode/ESP/src/istream.cpp index 4c72754..8e2ac10 100644 --- a/Xcode/ESP/src/istream.cpp +++ b/Xcode/ESP/src/istream.cpp @@ -324,3 +324,60 @@ void FirmataStream::update() { } } } + +bool TcpInputStream::start() { + server_.setup(port_num_); + server_.setMessageDelimiter("\n"); + + reading_thread_.reset(new std::thread([this]() { + // Should sleep a bit here + int sleep_time = 10; + ofLog() << "TCP input streams are checked every " << sleep_time << " ms"; + while (has_started_) { + while (true) { + std::this_thread::sleep_for( + std::chrono::milliseconds(sleep_time)); + for (int i = 0; i < server_.getLastID(); i++) { + if (server_.isClientConnected(i)) { + string str = server_.receive(i); + if (str != "") { + parseInput(str); + } + } + } + } + } + })); + return true; +} + +void TcpInputStream::parseInput(const string& buffer) { + if (data_ready_callback_ != nullptr) { + istringstream iss(buffer); + vector data; + double d; + + while (iss >> d) + data.push_back(d); + + if (data.size() > 0) { + data = normalize(data); + + GRT::MatrixDouble matrix; + matrix.push_back(data); + + data_ready_callback_(matrix); + } + } +} + +void TcpInputStream::stop() { + if (reading_thread_ != nullptr && reading_thread_->joinable()) { + reading_thread_->join(); + } + server_.close(); +} + +int TcpInputStream::getNumInputDimensions() { + return dim_; +} diff --git a/Xcode/ESP/src/istream.h b/Xcode/ESP/src/istream.h index 5d4c919..79659c0 100644 --- a/Xcode/ESP/src/istream.h +++ b/Xcode/ESP/src/istream.h @@ -11,6 +11,7 @@ #include "GRT/GRT.h" #include "ofMain.h" +#include "ofxNetwork.h" #include "stream.h" #include @@ -253,3 +254,27 @@ class FirmataStream : public InputStream { unique_ptr update_thread_; void update(); }; + +// Forward declaration. +class ofxTCPServer; + +/** + @brief Listening for data inputs over a TCP socket. + */ +class TcpInputStream : public InputStream { + public: + TcpInputStream(int port_num, int dimension) + : port_num_(port_num), dim_(dimension) { + } + + virtual bool start() final; + virtual void stop() final; + virtual int getNumInputDimensions() final; + + private: + void parseInput(const string& buffer); + ofxTCPServer server_; + unique_ptr reading_thread_; + int port_num_; + int dim_; +};