Skip to content

Commit

Permalink
feat(istream): implement TCP using ofxTCPServer
Browse files Browse the repository at this point in the history
the ofxNetwork API is actually quite hard to use (as the API) but since
it is simplified, we are using it anyway
  • Loading branch information
nebgnahz committed Sep 8, 2016
1 parent f271f6d commit 3190114
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
57 changes: 57 additions & 0 deletions Xcode/ESP/src/istream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,60 @@ void FirmataStream::update() {
}
}
}

bool TcpInputStream::start() {
server_.setup(port_num_);
server_.setMessageDelimiter("\n");

reading_thread_.reset(new std::thread([this]() {
// Should sleep a bit here
int sleep_time = 10;
ofLog() << "TCP input streams are checked every " << sleep_time << " ms";
while (has_started_) {
while (true) {
std::this_thread::sleep_for(
std::chrono::milliseconds(sleep_time));
for (int i = 0; i < server_.getLastID(); i++) {
if (server_.isClientConnected(i)) {
string str = server_.receive(i);
if (str != "") {
parseInput(str);
}
}
}
}
}
}));
return true;
}

void TcpInputStream::parseInput(const string& buffer) {
if (data_ready_callback_ != nullptr) {
istringstream iss(buffer);
vector<double> data;
double d;

while (iss >> d)
data.push_back(d);

if (data.size() > 0) {
data = normalize(data);

GRT::MatrixDouble matrix;
matrix.push_back(data);

data_ready_callback_(matrix);
}
}
}

void TcpInputStream::stop() {
if (reading_thread_ != nullptr && reading_thread_->joinable()) {
reading_thread_->join();
}
server_.close();
}

int TcpInputStream::getNumInputDimensions() {
return dim_;
}
25 changes: 25 additions & 0 deletions Xcode/ESP/src/istream.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "GRT/GRT.h"
#include "ofMain.h"
#include "ofxNetwork.h"
#include "stream.h"

#include <cstdint>
Expand Down Expand Up @@ -253,3 +254,27 @@ class FirmataStream : public InputStream {
unique_ptr<std::thread> update_thread_;
void update();
};

// Forward declaration.
class ofxTCPServer;

/**
@brief Listening for data inputs over a TCP socket.
*/
class TcpInputStream : public InputStream {
public:
TcpInputStream(int port_num, int dimension)
: port_num_(port_num), dim_(dimension) {
}

virtual bool start() final;
virtual void stop() final;
virtual int getNumInputDimensions() final;

private:
void parseInput(const string& buffer);
ofxTCPServer server_;
unique_ptr<std::thread> reading_thread_;
int port_num_;
int dim_;
};

2 comments on commit 3190114

@damellis
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this may do unexpected things if more than one client connects and sends data at once. I'm not really sure what the right behavior is in that situation, though. To start, maybe it's fine to just say that the behavior is unexpected if more than one client connects.

@nebgnahz
Copy link
Collaborator Author

@nebgnahz nebgnahz commented on 3190114 Sep 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I was thinking that we should restrict it to a single client but couldn't find the API from ofxNetwork. Later I thought we could limit the loop for (int i = 0; i < server_.getLastID(); i++) but the flexibility of stopping the client and reconnecting seems crucial for prototyping (iteration). And for the next connection, you get a different client ID.

In the end I simply leave this as is and yes, when multiple clients connect, data will simply be merged and it's undefined behavior.

Please sign in to comment.