Long-lived Rust code that sends data continuously back to Dart #347

Closed
caesaneer opened this issue Feb 17, 2022 · 14 comments

Comments

@caesaneer

Hey Guys,

I thought I'd start a new thread for the question that I did not get an adequate answer to.

Here is a scenario. A Flutter/Dart app needs to start a Rust layer that runs for the life of the application. The Rust layer creates a TCP connection with a remote server and maintains the connection for the life of the app being opened.

The Rust layer needs to stream data back to the Flutter/Dart layer.

The current StreamSink design involves making a call to Rust via FFI, doing some work, and streaming the data back to Dart/Flutter; once the function call has completed, the Rust layer closes.

Would it be possible to start a Rust layer, have it run for the life of an application, streaming data back to a Dart/Flutter layer? Something like a channel?

@fzyzcjy
Owner

fzyzcjy commented Feb 18, 2022

once the function call has completed, the Rust layer closes.

No, that is not the case.

I think just using StreamSink is enough. My own use case for StreamSink is logging, and it works as follows:

// For simplicity, say `logger` is a global variable. (In real code it would
// have to be a static behind a lock such as a Mutex.)
let logger = MyLogger;

// api.rs, called from Dart
pub fn create_log_stream(s: StreamSink<LogEntry>) -> Result<()> {
    // Store the sink; it stays usable after this function returns.
    logger.stream_sink = s;
    Ok(())
}

// any_other_file.rs, whenever I want to log
logger.stream_sink.add("my_log");

// Dart code
api.createLogStream().listen((logEntry) => print('log from rust: $logEntry'));
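
The pattern in this comment (the API function only stores the sink and returns, while other code keeps using it afterwards) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions: `std::sync::mpsc::Sender` stands in for flutter_rust_bridge's `StreamSink`, the names `LOG_SINK` and `log` are hypothetical, and a `const` `Mutex::new` requires Rust 1.63 or later.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Global slot holding the sink once the UI side has subscribed.
/// `Sender<String>` is a stand-in, not flutter_rust_bridge's StreamSink.
static LOG_SINK: Mutex<Option<Sender<String>>> = Mutex::new(None);

/// Counterpart of `create_log_stream`: store the sink and return right away.
/// The returned receiver plays the role of the Dart-side stream.
pub fn create_log_stream() -> Receiver<String> {
    let (tx, rx) = channel();
    *LOG_SINK.lock().unwrap() = Some(tx);
    rx
}

/// Callable from anywhere, long after `create_log_stream` has returned.
/// Returns false if nobody has subscribed yet or the listener has gone away.
pub fn log(message: &str) -> bool {
    match LOG_SINK.lock().unwrap().as_ref() {
        Some(sink) => sink.send(message.to_owned()).is_ok(),
        None => false,
    }
}
```

The point of the sketch is only that the sink outlives the call that registered it; nothing "closes" when `create_log_stream` returns.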

@fzyzcjy changed the title from "Long-lived process" to "Long-lived Rust code that sends data continuously back to Dart" on Feb 18, 2022
@Desdaemon
Contributor

Desdaemon commented Feb 18, 2022

This is how I imagine a long-running Rust function would look: in this gist, tick runs in a loop on a separate thread so that it does not block the Flutter thread. Flutter only knows that the API exposes a Stream of ints, not that the stream might be implemented using threads or async.

I did uncover new questions and problems with this example, however:

  • What happens when the stream goes out of scope? Is there already a mechanism in place to let Rust know to stop running after the stream becomes inaccessible/GC'd? Otherwise, the Rust function may add to a stale StreamSink and Rust cannot reclaim that thread anymore.
  • We should be able to omit the Result<()> return type to make it fit with other functions.
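
For readers without access to the gist, here is a rough std-only sketch of the shape described above, including one possible answer to the first question. It is not the gist's code: `FakeSink` and `fake_stream` are hypothetical stand-ins built on `std::sync::mpsc`, chosen because a channel's `send` fails once the receiver is dropped, which lets the loop notice that nobody is listening.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a stream sink; NOT flutter_rust_bridge's type.
pub struct FakeSink<T> {
    tx: Sender<T>,
}

impl<T> FakeSink<T> {
    /// Returns false once the listening side has gone away.
    pub fn add(&self, value: T) -> bool {
        self.tx.send(value).is_ok()
    }
}

/// Creates a connected sink/listener pair.
pub fn fake_stream<T>() -> (FakeSink<T>, Receiver<T>) {
    let (tx, rx) = channel();
    (FakeSink { tx }, rx)
}

/// Long-running producer: ticks on its own thread so the caller is never
/// blocked, and exits as soon as `add` reports that nobody is listening.
/// The thread's return value is the number of ticks the sink accepted.
pub fn tick(sink: FakeSink<u32>) -> JoinHandle<u32> {
    thread::spawn(move || {
        let mut ticks = 0;
        while sink.add(ticks) {
            ticks += 1;
            thread::sleep(Duration::from_millis(5));
        }
        ticks
    })
}
```

With this design the thread is reclaimed on the first failed `add`, so the cost of a stale sink is at most one extra tick period. Whether the real StreamSink can report this condition is exactly what the thread goes on to discuss.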

@fzyzcjy
Owner

fzyzcjy commented Feb 18, 2022

@Desdaemon Thanks for the sample!

What happens when the stream goes out of scope? Is there already a mechanism in place to let Rust know to stop running after the stream becomes inaccessible/GC'd? Otherwise, the Rust function may add to a stale StreamSink and Rust cannot reclaim that thread anymore.

I agree. Streams have some rough edges. There does seem to be a stream.close, though. Looking forward to PRs!

We should be able to omit the Result<()> return type to make it fit with other functions.

Agree as well ;)

@fzyzcjy
Owner

fzyzcjy commented Feb 18, 2022

Doc updated. https://fzyzcjy.github.io/flutter_rust_bridge/feature/stream.html now has a logger example and an example copied from @Desdaemon's gist.

@caesaneer
Author

Hey guys, thanks for the replies. Super exhausted and will review in the morning. I should have time to work on the benchmarks tomorrow as well. Cheers

@caesaneer
Author

caesaneer commented Feb 18, 2022

This is how I imagine a long-running Rust function would look: in this gist, tick runs in a loop on a separate thread so that it does not block the Flutter thread. Flutter only knows that the API exposes a Stream of ints, not that the stream might be implemented using threads or async.

I did uncover new questions and problems with this example however:

* What happens when the stream goes out of scope? Is there already a mechanism in place to let Rust know to stop running after the stream becomes inaccessible/GC'd? Otherwise, the Rust function may add to a stale StreamSink and Rust cannot reclaim that thread anymore.

* We should be able to omit the `Result<()>` return type to make it fit with other functions.

Thanks for the Gist @Desdaemon. Having the StreamSink go out of scope on the Dart side is certainly an issue - not just with a long-lived Rust process, but in theory, it could happen with a short-lived execution as well. This is definitely a hole that needs to be fixed.

Is there any way to send data to Rust from Dart? The StreamSinks are one direction, no?

This feels like a scenario where some sort of context could be useful. For example, the Rust side could check whether a context is still valid before writing to the StreamSink. Alternatively:

I have a lot of experience with Go. In Go, we have select blocks. A select block allows a Goroutine to wait on messages received via channels. https://go.dev/tour/concurrency/5 Selects can go in a loop.

When I was learning Rust, I cloned one of our Go projects using Tokio. Using Tokio and Tokio's MPSC/Oneshot channels in conjunction with Tokio's select blocks, I was able to create a perfect clone. https://tokio.rs/tokio/tutorial/select

Based on what I know so far about FRB, I feel like @Desdaemon's example could be expanded so that tick used Tokio and Tokio's select. Select in Tokio can wait on any async operation, so if there was a way to wrap a cancellation event in a Future, tick could automatically cancel when the Future completed (that is, when the stream was closed on the Dart side).

Again, not sure if something like this could actually work, but it's an idea.
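
The cancellation idea can be approximated without Tokio. The sketch below is a std-only substitute, not `tokio::select!`: a blocking `recv_timeout` on a cancel channel plays the role of selecting between "timer fired" and "cancelled". The function name `spawn_cancellable_tick` and the use of a plain channel as the cancellation signal are assumptions made for illustration; how Dart would actually trigger the cancel is left open, as in the comment above.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Runs until a cancel signal arrives or the cancel handle is dropped.
/// `recv_timeout` doubles as the tick timer, so each loop iteration waits
/// on two outcomes at once: the period elapsing, or cancellation.
/// The thread's return value is the number of ticks emitted.
pub fn spawn_cancellable_tick(
    out: Sender<u64>,
    cancel: Receiver<()>,
    period: Duration,
) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut ticks = 0u64;
        loop {
            match cancel.recv_timeout(period) {
                // Timer branch: nobody cancelled us, so emit the next tick.
                Err(RecvTimeoutError::Timeout) => {
                    if out.send(ticks).is_err() {
                        break; // the listener is gone
                    }
                    ticks += 1;
                }
                // Cancel branch: explicit signal, or the handle was dropped.
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        ticks
    })
}
```

Treating a dropped cancel handle as cancellation means the worker also stops if the owner simply forgets about it, which is the behaviour the thread is asking for when a stream goes out of scope.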

@Desdaemon
Contributor

Based on what I know so far about FRB, I feel like @Desdaemon's example could be expanded so that tick used Tokio and Tokio's select. Select in Tokio can wait on any async operation, so if there was a way to wrap a cancellation event in a Future, tick could automatically cancel when the Future completed (that is, when the stream was closed on the Dart side).

I really like this idea! It would be awesome to see a snippet of this in action.

Is there any way to send data to Rust from Dart? The StreamSinks are one direction, no?

Dart has its own StreamSink, perhaps we can use it or at least the idea?

@fzyzcjy
Owner

fzyzcjy commented Feb 19, 2022

Is there any way to send data to Rust from Dart? The StreamSinks are one direction, no?

What about directly calling a Rust function? In that Rust function you can do anything you like, such as enqueueing the data in a Vec, or using an Iterator, a Stream, etc.
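
A minimal sketch of that suggestion, assuming the simplest possible hand-off: each call from Dart pushes onto a shared queue, and the long-lived Rust side drains it when convenient. The names `INBOX`, `send_to_rust`, and `drain_inbox` are hypothetical, and the bridge glue that would expose `send_to_rust` to Dart is omitted. A `const` `Mutex::new` requires Rust 1.63 or later.

```rust
use std::sync::Mutex;

/// Messages handed over from the Dart side, waiting for the Rust worker.
static INBOX: Mutex<Vec<String>> = Mutex::new(Vec::new());

/// Hypothetical API function that Dart would call once per outgoing message.
pub fn send_to_rust(message: String) {
    INBOX.lock().unwrap().push(message);
}

/// Called by the long-lived Rust side: takes everything queued so far,
/// in arrival order, and leaves the inbox empty.
pub fn drain_inbox() -> Vec<String> {
    std::mem::take(&mut *INBOX.lock().unwrap())
}
```

Combined with a sink for the Rust-to-Dart direction, this gives two-way communication without any new bridge feature; a channel could replace the `Vec` if the worker should block until a message arrives.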

I have a lot of experience with Go. In Go, we have select blocks. A select block allows a Goroutine to wait on messages received via channels. https://go.dev/tour/concurrency/5 Selects can go in a loop.
Based on what I know so far about FRB, I feel like @Desdaemon's example could be expanded so that tick used Tokio and Tokio's select. Select in Tokio can wait on any async operation, so if there was a way to wrap a cancellation event in a Future, tick could automatically cancel when the Future completed (that is, when the stream was closed on the Dart side).

Sounds interesting!

@fzyzcjy
Owner

fzyzcjy commented Feb 19, 2022

  • What happens when the stream goes out of scope? Is there already a mechanism in place to let Rust know to stop running after the stream becomes inaccessible/GC'd? Otherwise, the Rust function may add to a stale StreamSink and Rust cannot reclaim that thread anymore.
    Thanks for the Gist @Desdaemon. Having the StreamSink go out of scope on the Dart side is certainly an issue - not just with a long-lived Rust process, but in theory, it could happen with a short-lived execution as well. This is definitely a hole that needs to be fixed.

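Can we do something like this: When the StreamSink is dropped, it is automatically closed. In this case:

  • The logger example (https://fzyzcjy.github.io/flutter_rust_bridge/feature/stream.html#example-logger) continues working, since we still allow StreamSink to live much longer than the frb function.
  • No worries about forgetting to close the stream, or sending data to a closed stream (guaranteed by Rust's very powerful checkers).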

@caesaneer
Author

  • What happens when the stream goes out of scope? Is there already a mechanism in place to let Rust know to stop running after the stream becomes inaccessible/GC'd? Otherwise, the Rust function may add to a stale StreamSink and Rust cannot reclaim that thread anymore.
    Thanks for the Gist @Desdaemon. Having the StreamSink go out of scope on the Dart side is certainly an issue - not just with a long-lived Rust process, but in theory, it could happen with a short-lived execution as well. This is definitely a hole that needs to be fixed.

Can we do something like this: When the StreamSink is dropped, it is automatically closed. In this case:

* The logger example (https://fzyzcjy.github.io/flutter_rust_bridge/feature/stream.html#example-logger) continues working, since we still allow `StreamSink` to live much longer than the frb function.

* No worries about forgetting to close the stream, or sending data to a closed stream (guaranteed by Rust's very powerful checkers).

What happens when the Dart side closes (terminates an isolate waiting on a sink) the StreamSink but the Rust side is still running, as @Desdaemon has pointed out? It's entirely possible for the Rust side to add data to a StreamSink that is no longer valid.
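
The close-on-drop proposal quoted above can be illustrated with a small std-only sketch. `AutoCloseSink` and `Event` are hypothetical types, not flutter_rust_bridge's: the sink emits a final `Closed` event from its `Drop` implementation, and because `add` borrows the sink, the compiler rejects any attempt to add after the sink has been dropped. It covers only the Rust-initiated close; the Dart-side question raised here is handled separately by `add` returning `false`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// What the listening side observes.
#[derive(Debug, PartialEq)]
pub enum Event<T> {
    Data(T),
    Closed,
}

/// Hypothetical sink that closes itself when dropped; NOT FRB's StreamSink.
pub struct AutoCloseSink<T> {
    tx: Sender<Event<T>>,
}

impl<T> AutoCloseSink<T> {
    /// Creates a sink together with the receiver that listens to it.
    pub fn new() -> (Self, Receiver<Event<T>>) {
        let (tx, rx) = channel();
        (AutoCloseSink { tx }, rx)
    }

    /// Borrowing `&self` means a sink can only be used while it is alive,
    /// so "add after close" cannot be written in safe code.
    /// Returns false if the listener has already gone away.
    pub fn add(&self, value: T) -> bool {
        self.tx.send(Event::Data(value)).is_ok()
    }
}

impl<T> Drop for AutoCloseSink<T> {
    fn drop(&mut self) {
        // Best effort: the listener may already be gone, so ignore the error.
        let _ = self.tx.send(Event::Closed);
    }
}
```

A global logger sink keeps working under this scheme because a sink stored in a static is never dropped until it is replaced or the program exits.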

@caesaneer
Author

I feel like this was closed far from any reasonable resolution being reached.

@Desdaemon
Contributor

Desdaemon commented Feb 19, 2022

I feel like a slight tweak of the Rust StreamSink API is needed (maybe have sink.add return a #[must_use] Result<()>?), but I'm a bit busy right now so looking forward to contributions from other folks.

The sink.add function already returns a bool; perhaps we need documentation for these items as well.
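
As a rough illustration of the proposed tweak, the sketch below shows what an `add` returning a `#[must_use]` `Result` could look like and how a producer might use `?` to stop at the first failure. `CheckedSink`, `SinkClosed`, and `produce` are hypothetical names, and the sink is a stand-in built on `std::sync::mpsc`, so the error here means "the channel's receiver was dropped" and says nothing about what the real `sink.add` can detect.

```rust
use std::fmt;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Error returned when the other end of the stream no longer exists.
#[derive(Debug, PartialEq)]
pub struct SinkClosed;

impl fmt::Display for SinkClosed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "stream sink is closed")
    }
}

impl std::error::Error for SinkClosed {}

/// Hypothetical sink whose `add` reports failure as an error, not a bool.
pub struct CheckedSink<T> {
    tx: Sender<T>,
}

impl<T> CheckedSink<T> {
    /// Creates a sink together with the receiver that listens to it.
    pub fn new() -> (Self, Receiver<T>) {
        let (tx, rx) = channel();
        (CheckedSink { tx }, rx)
    }

    #[must_use = "a failed add means the listener is gone; stop producing"]
    pub fn add(&self, value: T) -> Result<(), SinkClosed> {
        self.tx.send(value).map_err(|_| SinkClosed)
    }
}

/// Producer that stops at the first failed add thanks to `?`.
pub fn produce(sink: &CheckedSink<u32>, count: u32) -> Result<(), SinkClosed> {
    for i in 0..count {
        sink.add(i)?;
    }
    Ok(())
}
```

Compared with a bare `bool`, the compiler warns when the result is ignored, which makes it harder to keep writing to a dead sink by accident.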

@fzyzcjy
Owner

fzyzcjy commented Feb 20, 2022

What happens when the Dart side closes (terminates an isolate waiting on a sink) the StreamSink but the Rust side is still running, as @Desdaemon has pointed out? It's entirely possible for the Rust side to add data to a StreamSink that is no longer valid.

Maybe no problem? Dart cannot close the stream. Only Rust triggers closing.

Btw, why would Dart create isolates? In my use case everything is on the main isolate.

The sink.add function already returns a bool, perhaps we need documentation for these items as well.

Agreed. Btw, that bool indicates whether the data was successfully sent to the SendPort, not whether the stream is closed... Anyway, error handling is hard...

@github-actions
Contributor

github-actions bot commented Mar 6, 2022

This thread has been automatically locked since there has not been any recent activity after it was closed. If you are still experiencing a similar issue, please open a new issue.

@github-actions bot locked as resolved and limited conversation to collaborators on Mar 6, 2022