diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac129fb79..d5d12b56d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,6 +22,7 @@ jobs: - '1.5' - '1.6' - '1.7' + - '1.8' - 'nightly' os: - ubuntu-latest @@ -57,7 +58,7 @@ jobs: - uses: actions/checkout@v2 - uses: julia-actions/setup-julia@v1 with: - version: '1.7' + version: '1.8' - run: | julia --project=docs -e ' using Pkg diff --git a/Project.toml b/Project.toml index adb35a226..234842216 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Rocket" uuid = "df971d30-c9d6-4b37-b8ff-e965b2cb3a40" authors = ["Dmitri Bagaev "] -version = "1.4.0" +version = "1.5.0" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/docs/make.jl b/docs/make.jl index 6735874fc..173bdc061 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -39,12 +39,14 @@ makedocs( "Network" => "observables/types/network.md", "Defer" => "observables/types/defer.md", "Zipped" => "observables/types/zipped.md", + "Labeled" => "observables/types/labeled.md", ], "Actors" => [ "Lambda" => "actors/types/lambda.md", "Logger" => "actors/types/logger.md", "Sync" => "actors/types/sync.md", "Keep" => "actors/types/keep.md", + "CircularKeep" => "actors/types/circularkeep.md", "Buffer" => "actors/types/buffer.md", "Void" => "actors/types/void.md", "Function" => "actors/types/function.md", diff --git a/docs/src/actors/types/circularkeep.md b/docs/src/actors/types/circularkeep.md new file mode 100644 index 000000000..05171b16b --- /dev/null +++ b/docs/src/actors/types/circularkeep.md @@ -0,0 +1,9 @@ +# [CircularKeep actor](@id actor_circularkeep) + +```@docs +circularkeep +``` + +```@docs +CircularKeepActor +``` diff --git a/docs/src/observables/types/labeled.md b/docs/src/observables/types/labeled.md new file mode 100644 index 000000000..31cc70059 --- /dev/null +++ b/docs/src/observables/types/labeled.md @@ -0,0 +1,9 @@ +# [Labeled Observable](@id observable_labeled) + +```@docs +labeled +``` + +```@docs +LabeledObservable +``` diff --git a/src/Rocket.jl b/src/Rocket.jl index 1943d474a..c79f59ebd 100644 --- a/src/Rocket.jl +++ b/src/Rocket.jl @@ -28,6 +28,7 @@ include("actor/logger.jl") include("actor/void.jl") include("actor/sync.jl") include("actor/keep.jl") +include("actor/circularkeep.jl") include("actor/buffer.jl") include("actor/server.jl") include("actor/storage.jl") @@ -46,6 +47,7 @@ include("subjects/recent.jl") @generate_subscribe! RecentSubjectInstance AbstractSubject include("observable/generate.jl") +include("observable/labeled.jl") include("observable/single.jl") include("observable/array.jl") include("observable/iterable.jl") diff --git a/src/actor/circularkeep.jl b/src/actor/circularkeep.jl new file mode 100644 index 000000000..d9df35739 --- /dev/null +++ b/src/actor/circularkeep.jl @@ -0,0 +1,83 @@ +export CircularKeepActor, circularkeep, getvalues + +import DataStructures: CircularBuffer + +""" + CirucalKeepActor{D}() where D + +Circual keep actor is similar to keep actor, but uses `CircularBuffer` as a storage. +It saves all incoming successful `next` events in a `values` circular buffer, throws an ErrorException on `error!` event and does nothing on completion event. + +# Examples +```jldoctest +using Rocket + +source = from(1:5) +actor = circularkeep(Int, 3) + +subscribe!(source, actor) +show(getvalues(actor)) + +# output +[3, 4, 5] +``` + +See also: [`Actor`](@ref), [`keep`](@ref), [`circularkeep`](@ref) +""" +struct CircularKeepActor{T} <: Actor{T} + values :: CircularBuffer{T} + + CircularKeepActor{T}(capacity::Int) where T = new{T}(CircularBuffer{T}(capacity)) +end + +getvalues(actor::CircularKeepActor) = actor.values + +on_next!(actor::CircularKeepActor{T}, data::T) where T = push!(actor.values, data) +on_error!(actor::CircularKeepActor, err) = error(err) +on_complete!(actor::CircularKeepActor) = begin end + +""" + circularkeep(::Type{T}, capacity::Int) where T + +# Arguments +- `::Type{T}`: Type of keep data +- `capacity::Int`: circular buffer capacity + +Creation operator for the `CircularKeepActor` actor. + +# Examples + +```jldoctest +using Rocket + +actor = circularkeep(Int, 3) +actor isa CircularKeepActor{Int} + +# output +true +``` + +See also: [`CircularKeepActor`](@ref), [`AbstractActor`](@ref) +""" +circularkeep(::Type{T}, capacity::Int) where T = CircularKeepActor{T}(capacity) + +# Julia iterable interface + +Base.IteratorSize(::Type{ <: CircularKeepActor }) = Base.HasLength() +Base.IteratorEltype(::Type{ <: CircularKeepActor }) = Base.HasEltype() + +Base.IndexStyle(::Type{ <: CircularKeepActor }) = Base.IndexLinear() + +Base.eltype(::Type{ <: CircularKeepActor{T} }) where T = T + +Base.iterate(actor::CircularKeepActor) = iterate(actor.values) +Base.iterate(actor::CircularKeepActor, state) = iterate(actor.values, state) + +Base.size(actor::CircularKeepActor) = (length(actor.values), ) +Base.length(actor::CircularKeepActor) = length(actor.values) +Base.getindex(actor::CircularKeepActor, I) = Base.getindex(actor.values, I) + +Base.getindex(actor::CircularKeepActor, ::Unrolled.FixedRange{A, B}) where { A, B } = getindex(actor, A:B) + +Base.firstindex(actor::CircularKeepActor) = firstindex(actor.values) +Base.lastindex(actor::CircularKeepActor) = lastindex(actor.values) diff --git a/src/actor/test.jl b/src/actor/test.jl index 1845d8b96..a9096d534 100644 --- a/src/actor/test.jl +++ b/src/actor/test.jl @@ -419,7 +419,13 @@ macro ts(expr) elseif arg.head === :vect push!(values, collect(arg.args)) elseif arg.head === :tuple - push!(values, (arg.args..., )) + if all(expr -> expr isa Expr && expr.head == :(=), arg.args) # NamedTuple case + names = map(d -> d.args[1], arg.args) + items = map(d -> d.args[2], arg.args) + push!(values, NamedTuple{tuple(names...)}(items)) + else # regular tuple case + push!(values, (arg.args..., )) + end elseif arg.head === :call if arg.args[1] === :e push!(tests, TestActorErrorTest(length(arg.args) === 2 ? arg.args[2] : nothing)) diff --git a/src/observable/labeled.jl b/src/observable/labeled.jl new file mode 100644 index 000000000..6074d7eeb --- /dev/null +++ b/src/observable/labeled.jl @@ -0,0 +1,59 @@ +export LabeledObservable, labeled + +import Base: show + +""" + labeled(names::Val, stream) + +Creation operator for the `LabeledObservable` that wraps given `stream`, that produces `Tuple` values into a `NamedTuple` with given `names`. + +# Arguments +- `names`: a `Val` object that contains a tuple of symbols +- `stream`: an observable that emits a `Tuple`, length of the `Tuple` events must be equal to the length of the `names` argument + +# Examples + +```jldoctest +using Rocket + +source = labeled(Val((:x, :y)), from([ (1, 2), (2, 3), (3, 4) ])) +subscribe!(source, logger()) +; + +# output + +[LogActor] Data: (x = 1, y = 2) +[LogActor] Data: (x = 2, y = 3) +[LogActor] Data: (x = 3, y = 4) +[LogActor] Completed +``` + +See also: [`ScheduledSubscribable`](@ref), [`subscribe!`](@ref), [`from`](@ref) +""" +labeled(::Val{Names}, stream::S) where { Names, S } = labeled(eltype(S), Val(Names), stream) +labeled(::Type{D}, ::Val{Names}, stream::S) where { D, Names, S } = LabeledObservable{NamedTuple{Names, D}, S}(stream) + +""" + LabeledObservable{D, S}() + +An Observable that emits `NamesTuple` items from a source `Observable` that emits `Tuple` items. + +See also: [`Subscribable`](@ref), [`labeled`](@ref) +""" +@subscribable struct LabeledObservable{D, S} <: Subscribable{D} + stream :: S +end + +struct LabeledActor{T, N, A} <: Actor{T} + actor :: A +end + +@inline on_next!(actor::LabeledActor{T, N}, data::T) where { T, N } = next!(actor.actor, NamedTuple{N, T}(data)) +@inline on_error!(actor::LabeledActor, err) = error!(actor.actor, err) +@inline on_complete!(actor::LabeledActor) = complete!(actor.actor) + +function on_subscribe!(observable::LabeledObservable{R}, actor::A) where { N, T, R <: NamedTuple{N, T}, A } + return subscribe!(observable.stream, LabeledActor{T, N, A}(actor)) +end + +Base.show(io::IO, ::LabeledObservable{D, S}) where { D, S } = print(io, "LabeledObservable($D, $S)") diff --git a/test/actor/test_circularkeep_actor.jl b/test/actor/test_circularkeep_actor.jl new file mode 100644 index 000000000..d65008b9b --- /dev/null +++ b/test/actor/test_circularkeep_actor.jl @@ -0,0 +1,81 @@ +module RocketKeepActorTest + +using Test +using Rocket +using DataStructures + +@testset "CircularKeepActor" begin + + println("Testing: actor CircularKeepActor") + + @testset begin + source = from([ 1, 2, 3 ]) + actor = CircularKeepActor{Int}(2) + + subscribe!(source, actor) + + @test actor.values == [ 2, 3 ] + @test getvalues(actor) == [ 2, 3 ] + end + + @testset begin + source = from(1:100) + actor = CircularKeepActor{Int}(2) + + subscribe!(source, actor) + + @test actor.values == [ 99, 100 ] + @test getvalues(actor) == [ 99, 100 ] + @test length(getvalues(actor)) == 2 + end + + @testset begin + source = from([ 1, 2, 3 ]) + actor = CircularKeepActor{Int}(2) + + subscribe!(source, actor) + + @test actor[1] === 2 + @test actor[2] === 3 + + @test collect(actor) == [ 2, 3 ] + end + + @testset begin + source = from([ 1, 2, 3 ]) + actor = CircularKeepActor{Int}(2) + + subscribe!(source, actor) + + i = 2 + for item in actor + @test item === i + i += 1 + end + end + + @testset begin + source = from([ 1, 2, 3 ]) + actor = CircularKeepActor{Int}(2) + + subscribe!(source, actor) + + @test actor[1:end] == [ 2, 3 ] + end + + @testset begin + source = faulted(Int, "Error") + actor = CircularKeepActor{Int}(2) + + @test_throws ErrorException subscribe!(source, actor) + @test actor.values == [] + @test getvalues(actor) == [] + end + + @testset begin + @test circularkeep(Int, 3) isa CircularKeepActor{Int} + @test capacity(getvalues(circularkeep(Int, 3))) === 3 + end +end + +end diff --git a/test/observable/test_observable_labeled.jl b/test/observable/test_observable_labeled.jl new file mode 100644 index 000000000..31fab73ed --- /dev/null +++ b/test/observable/test_observable_labeled.jl @@ -0,0 +1,42 @@ +module RocketLabeledObservableTest + +using Test +using Rocket + +include("../test_helpers.jl") + +@testset "LabeledObservable" begin + + @testset begin + @test labeled(Val((:x, )), from([ (1, ), (2, ), (3, ) ])) isa LabeledObservable{NamedTuple{(:x, ), Tuple{Int}}} + @test labeled(Val((:x, :y)), from([ (1, 1.0), (2, 2.0), (3, 3.0) ])) isa LabeledObservable{NamedTuple{(:x, :y), Tuple{Int, Float64}}} + end + + @testset begin + source = labeled(Val((:x, )), from([ (1, ), (2, ), (3, ) ])) + io = IOBuffer() + + show(io, source) + + printed = String(take!(io)) + + @test occursin("LabeledObservable", printed) + @test occursin(string(eltype(source)), printed) + end + + run_testset([ + ( + source = labeled(Val((:x, )), from([ (1, ), (2, ), (3, ) ])), + values = @ts([ (x = 1, ), (x = 2, ), (x = 3, ), c ]), + source_type = NamedTuple{(:x, ), Tuple{Int}} + ), + ( + source = labeled(Val((:x, :y)), from([ (1, 2.0), (2, 3.0), (3, 4.0) ])), + values = @ts([ (x = 1, y = 2.0), (x = 2, y = 3.0), (x = 3, y = 4.0), c ]), + source_type = NamedTuple{(:x, :y), Tuple{Int, Float64}} + ), + ]) + +end + +end diff --git a/test/runtests.jl b/test/runtests.jl index 2b136bda6..1d75ed75f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -18,6 +18,7 @@ doctest(Rocket) include("./actor/test_lambda_actor.jl") include("./actor/test_logger_actor.jl") include("./actor/test_keep_actor.jl") + include("./actor/test_circularkeep_actor.jl") include("./actor/test_buffer_actor.jl") include("./actor/test_sync_actor.jl") include("./actor/test_function_actor.jl") @@ -25,6 +26,7 @@ doctest(Rocket) include("./test_subscribable.jl") include("./observable/test_observable_function.jl") + include("./observable/test_observable_labeled.jl") include("./observable/test_observable_single.jl") include("./observable/test_observable_array.jl") include("./observable/test_observable_iterable.jl")