Next: A Proposal to easily create Operators from Actors. #47

Closed
oliviermilla opened this issue Feb 15, 2024 · 7 comments

@oliviermilla

oliviermilla commented Feb 15, 2024

Hello,

At a high level, I need to be able to subscribe to Actors. I know this sounds nonsensical, but many actors do not act as pure sinks; they act as processing Operators in a data flow.

A theoretical example:

a1 = MyActor()
a2 = MyOtherActor()
subscribe!(a1, a2)

To my understanding, this isn't possible. The workaround I'm using is to pass a Subject and hack subscribe!()

struct MyActor{A} <: Actor{Int}
    # ...
    next::A
end

subject = Subject(Int)
a1 = MyActor(subject)
a2 = MyOtherActor()

Rocket.subscribe!(a::MyActor, b::Actor) = subscribe!(a.next, b)

subscribe!(a1, a2)

This is similar to what the Actors created through Proxies in Operators do. While I wish there were a clean pattern to declare an Actor as both an Actor and an Observable/Subject, I thought of an in-between step that matches the current design.

The idea occurred to me while looking at the CircularKeep Actor. I needed that Actor to actually next! its buffer every time new data is received. Since CircularKeep is a 'sink' Actor, the only way to do that would be to write a new Actor that copies the behavior of CircularKeep:

struct CircularKeepForOperators{A} <: Actor
    # ... Same stuff as in CircularKeep
    # Plus a receiving Actor
    actor::A
end

# Then define proxy and operator, on_call, operator_right, etc.

So this was painfully redundant, and there may be some pattern to turn any Actor into its Operator equivalent. (Though turning it into an Observable/Subject still makes sense to me.)

So I came up with a next() Operator. I would love to hear your thoughts as I think it would be a nice addition, possibly reduce the code base and allow for any actor to be turned into an operator easily:

export next

using Rocket

# Operator
next(actor::Actor{L1}, next::Actor{L2}) where {L1,L2} = NextOperator{L1,L2}(actor, next)

struct NextOperator{L1,L2} <: InferableOperator
    actor::Actor{L1}
    next::Actor{L2}
end

Rocket.operator_right(::NextOperator{L1,L2}, ::Type{L1}) where {L1,L2} = L2

function Rocket.on_call!(::Type{L1}, ::Type{L2}, operator::NextOperator, source) where {L1,L2}
    return proxy(L2, source, NextProxy(operator.actor, operator.next))
end

# Proxy
struct NextProxy{L1,L2} <: ActorProxy
    actor::Actor{L1}
    next::Actor{L2}
end

function Rocket.actor_proxy!(::Type{L2}, proxy::NextProxy{L1,L2}, actor::Actor{L2}) where {L1,L2}
    return NextActor{L1,L2}(proxy.actor, proxy.next)
end

# Actor
struct NextActor{L1,L2} <: Actor{L2}
    actor::Actor{L1}
    next::Actor{L2}
end

function Rocket.on_next!(actor::NextActor{L1,L2}, data::L1) where {L1,L2}
    next!(actor.actor, data)
    next!(actor.next, getrecent(actor.actor))
end

function Rocket.on_error!(actor::NextActor, error)
    error!(actor.actor, error)
    error!(actor.next, error)
end

function Rocket.on_complete!(actor::NextActor)
    complete!(actor.actor)
    complete!(actor.next)
end

Best! 🚀

@oliviermilla oliviermilla changed the title A Proposal for an operator to easily wrap actors into Operators. Next: A Proposal for a wrap Operator to easily wrap Actors into Operators. Feb 15, 2024
@oliviermilla oliviermilla changed the title Next: A Proposal for a wrap Operator to easily wrap Actors into Operators. Next: A Proposal to easily create Operators from Actors. Feb 15, 2024
@wouterwln
Member

I had the same idea before, and I fixed it by implementing an Actor which does some computation and then calls next! on a coupled subject. The idea is implemented in RxEnvironments.jl here:

https://github.com/biaslab/RxEnvironments.jl/blob/25f1217baea3a525aee95a08b72555d577a853af/src/markovblanket.jl#L17C1-L32C1

I hope this helps!
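
For readers following along, the pattern described above (an Actor that computes something and then calls next! on a coupled Subject) might look roughly like the sketch below. DoublingActor is a hypothetical name made up for illustration; it is not taken from RxEnvironments.jl or Rocket.jl, and the doubling stands in for whatever computation is actually needed.

```julia
using Rocket

# Hypothetical actor: doubles each incoming Int and forwards the result
# to a coupled Subject that other actors can subscribe to.
struct DoublingActor{S} <: Actor{Int}
    subject::S
end

Rocket.on_next!(actor::DoublingActor, data::Int) = next!(actor.subject, 2 * data)
Rocket.on_error!(actor::DoublingActor, err)      = error!(actor.subject, err)
Rocket.on_complete!(actor::DoublingActor)        = complete!(actor.subject)

subject = Subject(Int)
results = keep(Int)

subscribe!(subject, results)                   # downstream listens on the Subject
subscribe!(from(1:3), DoublingActor(subject))  # upstream feeds the Actor

# results.values now holds the doubled inputs
```

The Actor itself stays a sink; anything that wants its output subscribes to the Subject instead.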

@bvdmitri
Member

bvdmitri commented Feb 15, 2024

Hey! Thanks for using the package.

an Actor both an Actor and an Observable/Subject

An actor which is both an actor and an observable is by definition a Subject, so I do not understand what else exactly needs to be implemented. If you need a sort of "subscribable" actor, then the Subject is indeed the correct way to do that. Actors in proxies are just an implementation detail and may change in the future. They don't really produce any values either; the only purpose of an Actor in the operators is to relay values to other actors.
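
A minimal sketch of that dual role, assuming nothing beyond the exported Subject, keep, from and subscribe! functions: the same Subject is first subscribed to (as an observable) and then passed as the listener of another source (as an actor).

```julia
using Rocket

subject  = Subject(Int)
received = keep(Int)

subscribe!(subject, received)    # the Subject plays the observable role
subscribe!(from(1:3), subject)   # the same Subject plays the actor role

# received.values now holds everything the Subject relayed
```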

there may be some pattern to turn any Actor into its Operator equivalent

In my head there is none, because again an Actor by itself does not produce any values over time. An Observable does. You can create an actor that relays a value from one place to another, but that does not generalize to the concept of an actor.

On a high level, I need to be able to subscribe to Actors.

That is semantically wrong. It was a deliberate design choice that you cannot subscribe to Actors, and this is not going to change. Actors are meant to be single, independent computational units. Actors as a concept do not produce any values, so there is nothing to subscribe to.

So I came up with a next() Operator.

I'm not entirely sure I understand the purpose of this operator. As far as I can read from the code (I might be wrong), it multicasts the same messages to two actors, but there are already the multicast and share operators for this purpose.
https://reactivebayes.github.io/Rocket.jl/stable/operators/multicasting/about/ Maybe this is exactly what you need? Multicasting operators use the Subject under the hood.

Maybe if you explain your use case I can come up with a better solution to your problem.

P.S. The getrecent function is not really defined for all actors and is an ugly hack, that I would like to replace eventually with a better API.

@oliviermilla
Author

Hello @bvdmitri,

Thank you for your message. And thank you for Rocket.jl !

  • About Subjects: OK, I get your point. I haven't figured out how to implement a Subject that does computations before forwarding the results to its subscribers. That may be my missing piece. :)

  • About Actors: I totally agree with the concept that actors cannot be subscribed to.

  • About My code: Let me rephrase my question: How would you write a circularkeep() Operator that leverages the current CircularKeepActor? My code was an attempt to do just that.

source = from(1:42) |> circularkeep(3)
subscribe!(source, logger())

  • About getrecent(): yeah, I saw the comment in the code, but I don't understand the need for the pattern (yet). :)

@oliviermilla
Author

oliviermilla commented Feb 15, 2024

I had the same idea before, and I fixed it by implementing an Actor which does some computation and then calls next! on a coupled subject. The idea is implemented in RxEnvironments.jl here:

https://github.com/biaslab/RxEnvironments.jl/blob/25f1217baea3a525aee95a08b72555d577a853af/src/markovblanket.jl#L17C1-L32C1

I hope this helps!

Exactly! The pattern seems natural and redundant (at least for me).

I was thinking about how to have it as a clean pattern to reuse. Thanks for sharing.

@bvdmitri
Member

bvdmitri commented Feb 15, 2024

Well, I would do something like this probably

using Rocket, DataStructures

source = from(1:10) |> scan(CircularBuffer{Int}, (v, c) -> push!(c, v), CircularBuffer{Int}(3))
subscribe!(source, logger())

prints

[LogActor] Data: [1]
[LogActor] Data: [1, 2]
[LogActor] Data: [1, 2, 3]
[LogActor] Data: [2, 3, 4]
[LogActor] Data: [3, 4, 5]
[LogActor] Data: [4, 5, 6]
[LogActor] Data: [5, 6, 7]
[LogActor] Data: [6, 7, 8]
[LogActor] Data: [7, 8, 9]
[LogActor] Data: [8, 9, 10]
[LogActor] Completed

And you can do something like

operatorcircularkeep(n) = scan(CircularBuffer{Any}, (v, c) -> push!(c, v), CircularBuffer{Any}(n))

subscribe!(from(1:10) |> operatorcircularkeep(3), logger())
subscribe!(from(1:10) |> operatorcircularkeep(5), logger())

The second approach is convenient, but type-unstable (I used Any in the container eltype).
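
One way to keep the convenience without falling back to Any might be to pass the element type explicitly. The sketch below is only an illustration: circularkeep_typed is a hypothetical helper name, not part of Rocket.jl, and the extra map step copies each window into a Vector because the buffer itself is mutated in place between emissions.

```julia
using Rocket, DataStructures

# Hypothetical helper: same scan as above, but with a concrete element type T
circularkeep_typed(::Type{T}, n::Int) where {T} =
    scan(CircularBuffer{T}, (v, c) -> push!(c, v), CircularBuffer{T}(n))

windows = keep(Vector{Int})

# collect(c) snapshots the current window so that stored values do not alias the buffer
source = from(1:5) |> circularkeep_typed(Int, 3) |> map(Vector{Int}, c -> collect(c))
subscribe!(source, windows)

# windows.values now holds one Vector{Int} per emission, each at most 3 elements long
```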

Leveraging circularkeep actor is probably possible with somewhat the same approach, but will be overly specific to the concrete Actor and probably not generic, so if you just want to keep values in a circular manner I would just make a specific scan operation.

If you want to explore other possibilities, you can also look into the call_operator!(operator::O, source::S) implementation. Maybe you just want to implement call_operator!(actor::A, source::S), since source |> operator simply translates to call_operator!(operator, source). You may be able to come up with some logic for applying an actor to a source, but I doubt it will generalize beyond a few specific actors. As I've already mentioned, an actor does not necessarily produce or store any value (e.g. the logger() actor does not store anything, it just prints to the terminal).

@oliviermilla
Author

@bvdmitri thank you for that beautiful example! It's perfectly clear now.

For the rest, I'll stick to @wouterwln's pattern, which I already use, though I run into it a lot. :)

Thank you gentlemen!

@oliviermilla
Author

To comment further for anyone coming after:

Here is what I needed, following @bvdmitri's suggestion:

using Rocket, Statistics

function trail(vec::Vector, val::Int)
    circshift!(vec, -1)
    vec[end] = val
    return vec
end
movingaverage(n) = scan(Vector{Union{Missing, Int}}, (val, vec) -> trail(vec, val), Vector{Union{Missing, Int}}(missing, n)) |> map(Union{Missing, Float64}, vec -> Statistics.mean(vec)) |> filter(v -> !ismissing(v))
source = from(1:10) |> movingaverage(3)
subscribe!(source, logger())

[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Data: 4.0
[LogActor] Data: 5.0
[LogActor] Data: 6.0
[LogActor] Data: 7.0
[LogActor] Data: 8.0
[LogActor] Data: 9.0
[LogActor] Completed
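
The output starts at 2.0 rather than at the first input because the window begins filled with missing, and the mean of a vector containing missing is itself missing, which the final filter drops. The plain-Julia sketch below (no Rocket involved; trail is repeated from above so that it runs on its own) shows the same window logic step by step.

```julia
using Statistics

# Same sliding-window helper as above, repeated so this sketch is self-contained
function trail(vec::Vector, val::Int)
    circshift!(vec, -1)   # rotate left by one; the oldest entry wraps around to the end
    vec[end] = val        # overwrite the wrapped-around slot with the newest value
    return vec
end

window = Vector{Union{Missing, Int}}(missing, 3)
means  = [mean(trail(window, v)) for v in 1:5]

# The first two means are `missing` (window not yet full);
# the remaining three are 2.0, 3.0 and 4.0.
```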

Trying to remove the type instability, I came up with:

trail(n::Int) = TrailOperator(n)
struct TrailOperator <: InferableOperator
    length::Int
    function TrailOperator(length::Int)
        length > 0 || error("TrailOperator: $(length) must be positive to calculate a moving average.")
        return new(length)
    end
end
Rocket.operator_right(::TrailOperator, ::Type{L}) where {L} = Vector{Union{Missing, L}} #IndicatorType(SMAIndicator, length, L)

function trail(val::Int, vec::Vector)
    circshift!(vec, -1)
    vec[end] = val
    return vec
end
function Rocket.on_call!(::Type{L}, ::Type{R}, operator::TrailOperator, source) where {L,R}
    return proxy(R, source, Rocket.ScanProxy{L, R, Function}(trail, Vector{Union{Missing, L}}(missing, operator.length)))    
end

sma(n::Int) = trail(n) |> map(Union{Missing, Float64}, vec -> Statistics.mean(vec)) |> filter(v -> !ismissing(v))

[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Data: 4.0
[LogActor] Data: 5.0
[LogActor] Data: 6.0
[LogActor] Data: 7.0
[LogActor] Data: 8.0
[LogActor] Data: 9.0
[LogActor] Completed

Same thing, though leveraging on_call! and accessing ScanProxy already defined in Rocket.

Best! Thanks again.
