From 978c1b1550c1c27c8064a45e1168f039f3bc7fdc Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Fri, 26 Apr 2024 16:06:41 +0200 Subject: [PATCH 1/2] Bump version, add new functions, change pause demand boundary API --- README.md | 2 +- lib/membrane/timestamp_queue.ex | 70 +++++++--- mix.exs | 2 +- test/membrane_timestamp_queue/unit_test.exs | 147 +++++++++++++------- 4 files changed, 150 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index e5f9b24..201e697 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_timestamp_queue` to your list o ```elixir def deps do [ - {:membrane_timestamp_queue, "~> 0.1.0"} + {:membrane_timestamp_queue, "~> 0.2.0"} ] end ``` diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 648463a..0ed6248 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -37,7 +37,7 @@ defmodule Membrane.TimestampQueue do metric_unit: :buffers | :bytes | :time, pad_queues: %{optional(Pad.ref()) => pad_queue()}, pads_heap: Heap.t(), - blocking_registered_pads: MapSet.t(), + blocking_registered_pads: MapSet.t(Pad.ref()), registered_pads_offsets: %{optional(Pad.ref()) => integer()}, # :awaiting_pads contain at most one element at the time awaiting_pads: [Pad.ref()], @@ -45,7 +45,8 @@ defmodule Membrane.TimestampQueue do chunk_duration: nil | Membrane.Time.t(), chunk_full?: boolean(), next_chunk_boundary: nil | Membrane.Time.t(), - synchronization_strategy: :synchronize_on_arrival | :explicit_offsets + synchronization_strategy: :synchronize_on_arrival | :explicit_offsets, + known_pads: MapSet.t(Pad.ref()) } defstruct current_queue_time: Membrane.Time.seconds(0), @@ -60,18 +61,28 @@ defmodule Membrane.TimestampQueue do chunk_duration: nil, chunk_full?: false, next_chunk_boundary: nil, - synchronization_strategy: :synchronize_on_arrival + synchronization_strategy: :synchronize_on_arrival, + known_pads: MapSet.new() + + @typedoc """ + Value passed to `:pause_demand_boundary` option in `new/1`. + + Specyfies, what amount of buffers associated with a specific pad must be stored in the queue, to pause auto demand. + + Is a two-element tuple, which + - the first element specifies metric, in which boundary is expressed (default to `:buffers`) + - the second element is the boundary (default to `1000`). + """ + @type pause_demand_boundary :: + {:buffer | :bytes, pos_integer() | :infinity} | {:time, Membrane.Time.t()} @typedoc """ Options passed to `#{inspect(__MODULE__)}.new/1`. Following options are allowed: - - `:pause_demand_boundary` - positive integer, `t:Membrane.Time.t()` or `:infinity` (default to `:infinity`). Tells, - what amount of buffers associated with specific pad must be stored in the queue, to pause auto demand. - - `:pause_demand_boundary_unit` - `:buffers`, `:bytes` or `:time` (deafult to `:buffers`). Tells, in which metric - `:pause_demand_boundary` is specified. + - `:pause_demand_boundary` - `t:pause_demand_boundary()`. Default to `{:buffers, 1000}`. - `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by - `#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. + `#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. If not set, popping chunks will not be available. - `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`). Specyfies, how items from different pads will be synchronized with each other. If it is set to: * `:synchronize_on_arrival` - in the moment of the arrival of the first buffer from a specific pad, there will be @@ -84,7 +95,6 @@ defmodule Membrane.TimestampQueue do """ @type options :: [ pause_demand_boundary: pos_integer() | Membrane.Time.t() | :infinity, - pause_demand_boundary_unit: :buffers | :bytes | :time, chunk_duration: Membrane.Time.t(), synchronization_strategy: :synchronize_on_arrival | :explicit_offsets ] @@ -93,15 +103,13 @@ defmodule Membrane.TimestampQueue do def new(options \\ []) do [ chunk_duration: chunk_duration, - pause_demand_boundary: boundary, - pause_demand_boundary_unit: unit, + pause_demand_boundary: {unit, boundary}, synchronization_strategy: synchronization_strategy ] = options |> Keyword.validate!( chunk_duration: nil, - pause_demand_boundary: :infinity, - pause_demand_boundary_unit: :buffers, + pause_demand_boundary: {:buffers, 1000}, synchronization_strategy: :synchronize_on_arrival ) |> Enum.sort() @@ -159,9 +167,10 @@ defmodule Membrane.TimestampQueue do end timestamp_queue = - if wait?, - do: Map.update!(timestamp_queue, :blocking_registered_pads, &MapSet.put(&1, pad_ref)), - else: timestamp_queue + if(wait?, do: [:blocking_registered_pads, :known_pads], else: [:known_pads]) + |> Enum.reduce(timestamp_queue, fn field_name, timestamp_queue -> + Map.update!(timestamp_queue, field_name, &MapSet.put(&1, pad_ref)) + end) if offset != nil, do: put_in(timestamp_queue, [:registered_pads_offsets, pad_ref], offset), @@ -223,6 +232,7 @@ defmodule Membrane.TimestampQueue do other -> other end) + |> Map.update!(:known_pads, &MapSet.put(&1, pad_ref)) |> remove_pad_from_registered_and_awaiting_pads(pad_ref) |> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue) |> push_item_on_qex(pad_ref, {:buffer, buffer}) @@ -320,6 +330,7 @@ defmodule Membrane.TimestampQueue do defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do timestamp_queue |> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end)) + |> Map.update!(:known_pads, &MapSet.put(&1, pad_ref)) |> push_pad_on_heap_if_qex_empty(pad_ref, :infinity) |> push_item_on_qex(pad_ref, item) end @@ -585,7 +596,13 @@ defmodule Membrane.TimestampQueue do {actions_acc, items_acc, timestamp_queue} {:empty, _empty_qex} -> - {_pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref]) + {pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref]) + + timestamp_queue = + if pad_queue.end_of_stream?, + do: Map.update!(timestamp_queue, :known_pads, &MapSet.delete(&1, pad_ref)), + else: timestamp_queue + {actions_acc, items_acc, timestamp_queue} end end @@ -651,6 +668,25 @@ defmodule Membrane.TimestampQueue do |> pop_available_items() end + @doc """ + Returns true, if the pad has been registered in the queue or item from it has been pushed to the queue + and moreover, end of stream of this pad hasn't been popped from the queue. + """ + @spec has_pad?(t(), Pad.ref()) :: boolean() + def has_pad?(%__MODULE__{} = timestamp_queue, pad_ref) do + MapSet.member?(timestamp_queue.known_pads, pad_ref) + end + + @doc """ + Returns list of all pads, that: + 1) have been ever registered in the queue or item from them has been pushed to the queue + 2) their end of stream hasn't been popped from the queue. + """ + @spec pads(t()) :: [Pad.ref()] + def pads(%__MODULE__{} = timestamp_queue) do + MapSet.to_list(timestamp_queue.known_pads) + end + defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do inspected_item = case item do diff --git a/mix.exs b/mix.exs index 0750beb..391b517 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.TimestampQueue.Mixfile do use Mix.Project - @version "0.1.0" + @version "0.2.0" @github_url "https://github.com/membraneframework/membrane_timestamp_queue" def project do diff --git a/test/membrane_timestamp_queue/unit_test.exs b/test/membrane_timestamp_queue/unit_test.exs index d14f81b..56c0a4e 100644 --- a/test/membrane_timestamp_queue/unit_test.exs +++ b/test/membrane_timestamp_queue/unit_test.exs @@ -30,7 +30,7 @@ defmodule Membrane.TimestampQueue.UnitTest do queue = input_order |> Enum.reduce(TimestampQueue.new(), fn i, queue -> - assert {[], queue} = + assert {_actions, queue} = TimestampQueue.push_buffer(queue, pad_generator.(i), %Buffer{ dts: 0, payload: <<>> @@ -42,14 +42,14 @@ defmodule Membrane.TimestampQueue.UnitTest do queue = input_order |> Enum.reduce(queue, fn i, queue -> - assert {[], queue} = + assert {_actions, queue} = TimestampQueue.push_buffer(queue, pad_generator.(i), buffer_generator.(i)) queue end) # assert that queue won't pop last buffer from pad queue, if it hasn't recevied EoS on this pad - assert {[], batch, queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) batch_length = length(batch) batch @@ -65,7 +65,7 @@ defmodule Membrane.TimestampQueue.UnitTest do TimestampQueue.push_end_of_stream(queue, pad_generator.(i)) end) - assert {[], batch, queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) # assert batch expected_batch = @@ -137,7 +137,7 @@ defmodule Membrane.TimestampQueue.UnitTest do queue = case apply(TimestampQueue, fun_name, [queue, pad_ref, item]) do # if buffer - {[], queue} -> queue + {_actions, queue} -> queue # if event or stream_format queue -> queue end @@ -153,7 +153,7 @@ defmodule Membrane.TimestampQueue.UnitTest do # sanity check, that test is written correctly assert %{} = pads_items - assert {[], batch, _queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, batch, _queue} = TimestampQueue.pop_available_items(queue) assert length(batch) == pads_number * pad_items_number + pads_number batch_without_eos = Enum.reject(batch, &match?({_pad_ref, :end_of_stream}, &1)) @@ -168,19 +168,19 @@ defmodule Membrane.TimestampQueue.UnitTest do test "queue prioritizes stream formats and buffers not preceded by a buffer" do queue = TimestampQueue.new() - {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 1, payload: <<>>}) - {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 2, payload: <<>>}) + {_actions, queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 1, payload: <<>>}) + {_actions, queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 2, payload: <<>>}) expected_batch = for i <- [1, 2], do: {:a, {:buffer, %Buffer{dts: i, payload: <<>>}}} - assert {[], ^expected_batch, queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, ^expected_batch, queue} = TimestampQueue.pop_available_items(queue) - {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 3, payload: <<>>}) + {_actions, queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 3, payload: <<>>}) queue = TimestampQueue.push_end_of_stream(queue, :a) queue = TimestampQueue.push_stream_format(queue, :b, %StreamFormat{}) queue = TimestampQueue.push_event(queue, :b, %Event{}) - assert {[], batch, queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) assert batch == [ b: {:stream_format, %StreamFormat{}}, @@ -189,7 +189,7 @@ defmodule Membrane.TimestampQueue.UnitTest do a: :end_of_stream ] - assert {[], [], ^queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, [], ^queue} = TimestampQueue.pop_available_items(queue) end [ @@ -205,10 +205,7 @@ defmodule Membrane.TimestampQueue.UnitTest do boundary = buffer_size * boundary_in_buff_no queue = - TimestampQueue.new( - pause_demand_boundary: boundary, - pause_demand_boundary_unit: unit - ) + TimestampQueue.new(pause_demand_boundary: {unit, boundary}) Enum.reduce(1..10, queue, fn _iteration, queue -> queue = @@ -241,11 +238,7 @@ defmodule Membrane.TimestampQueue.UnitTest do end) test "queue returns proper suggested actions when boundary unit is :time" do - queue = - TimestampQueue.new( - pause_demand_boundary: 100, - pause_demand_boundary_unit: :time - ) + queue = TimestampQueue.new(pause_demand_boundary: {:time, 100}) Enum.reduce(1..10, queue, fn iteration, queue -> pts_offset = iteration * 100_000 @@ -295,7 +288,7 @@ defmodule Membrane.TimestampQueue.UnitTest do %Buffer{payload: <<>>} |> Map.put(new_pad_timestamp_field, 0) - {[], queue} = TimestampQueue.push_buffer(queue, new_pad, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, new_pad, buffer) queue = pads @@ -310,12 +303,12 @@ defmodule Membrane.TimestampQueue.UnitTest do %Buffer{payload: <<>>} |> Map.put(timestamp_field, timestamp) - {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) queue end) end) - {[], batch, queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) sorted_batch = batch @@ -359,25 +352,25 @@ defmodule Membrane.TimestampQueue.UnitTest do queue = buffers |> Enum.reduce(queue, fn buffer, queue -> - {[], queue} = TimestampQueue.push_buffer(queue, :a, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, :a, buffer) queue end) - {[], batch, queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1))) assert grouped_batch == %{a: events, b: events} - assert {[], [], queue} = TimestampQueue.pop_available_items(queue) + assert {_actions, [], queue} = TimestampQueue.pop_available_items(queue) queue = buffers |> Enum.reduce(queue, fn buffer, queue -> - {[], queue} = TimestampQueue.push_buffer(queue, :b, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, :b, buffer) queue end) - {[], batch, _queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, _queue} = TimestampQueue.pop_available_items(queue) sorted_batch = Enum.sort_by(batch, fn {_pad_ref, {:buffer, buffer}} -> buffer.dts end) assert batch == sorted_batch @@ -393,7 +386,7 @@ defmodule Membrane.TimestampQueue.UnitTest do test "queue doesn't wait on buffers from pad registered with option wait_on_buffers?: false" do buffer = %Buffer{dts: 0, payload: ""} - assert {[], [a: {:buffer, ^buffer}], _queue} = + assert {_actions, [a: {:buffer, ^buffer}], _queue} = TimestampQueue.new() |> TimestampQueue.register_pad(:b, wait_on_buffers?: false) |> TimestampQueue.push_buffer_and_pop_available_items(:a, buffer) @@ -414,7 +407,7 @@ defmodule Membrane.TimestampQueue.UnitTest do pad_ref = Enum.random([:a, :b, :c, :d]) buffer = %Buffer{pts: i, payload: ""} - {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) {{pad_ref, {:buffer, buffer}}, queue} end) @@ -424,7 +417,7 @@ defmodule Membrane.TimestampQueue.UnitTest do buffer.pts - offsets[pad_ref] end) - {[], given_batch, _queue} = TimestampQueue.flush_and_close(queue) + {_actions, given_batch, _queue} = TimestampQueue.flush_and_close(queue) assert given_batch == expected_batch end @@ -432,10 +425,10 @@ defmodule Membrane.TimestampQueue.UnitTest do test "queue returns events and stream formats, even if it cannot return next buffer" do queue = TimestampQueue.new() - {[], queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 0, payload: ""}) - {[], queue} = TimestampQueue.push_buffer(queue, :b, %Buffer{dts: 0, payload: ""}) + {_actions, queue} = TimestampQueue.push_buffer(queue, :a, %Buffer{dts: 0, payload: ""}) + {_actions, queue} = TimestampQueue.push_buffer(queue, :b, %Buffer{dts: 0, payload: ""}) - {[], batch, queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) assert [{pad_ref, {:buffer, _buffer}}] = batch queue = @@ -443,7 +436,7 @@ defmodule Membrane.TimestampQueue.UnitTest do TimestampQueue.push_event(queue, pad_ref, %Event{}) end) - {[], batch, queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, queue} = TimestampQueue.pop_available_items(queue) assert batch == for(_i <- 1..10, do: {pad_ref, {:event, %Event{}}}) @@ -455,13 +448,13 @@ defmodule Membrane.TimestampQueue.UnitTest do queue = Enum.reduce(buffers, queue, fn {pad, buffer}, queue -> - {[], queue} = TimestampQueue.push_buffer(queue, pad, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, pad, buffer) queue end) |> TimestampQueue.push_end_of_stream(:a) |> TimestampQueue.push_end_of_stream(:b) - {[], batch, _queue} = TimestampQueue.pop_available_items(queue) + {_actions, batch, _queue} = TimestampQueue.pop_available_items(queue) expected_batch = Enum.map(0..9, &{opposite_pad_ref, {:buffer, %Buffer{dts: &1, payload: ""}}}) @@ -481,7 +474,9 @@ defmodule Membrane.TimestampQueue.UnitTest do TimestampQueue.push_stream_format(queue, pad_ref, %StreamFormat{}) end, fn queue, pad_ref, i -> - {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, %Buffer{dts: i, payload: ""}) + {_actions, queue} = + TimestampQueue.push_buffer(queue, pad_ref, %Buffer{dts: i, payload: ""}) + queue end ] @@ -493,9 +488,9 @@ defmodule Membrane.TimestampQueue.UnitTest do Enum.random(push_functions) |> apply([queue, pad_ref, i]) end) - {[], flush_batch, closed_queue} = TimestampQueue.flush_and_close(full_queue) + {_actions, flush_batch, closed_queue} = TimestampQueue.flush_and_close(full_queue) - {[], pop_available_items, popped_queue} = + {_actions, pop_available_items, popped_queue} = Enum.reduce(1..100, full_queue, fn i, queue -> TimestampQueue.push_end_of_stream(queue, Pad.ref(:input, i)) end) @@ -530,10 +525,10 @@ defmodule Membrane.TimestampQueue.UnitTest do 1..upperbound//step |> Enum.map(&%Buffer{pts: &1, payload: ""}) - {[], queue} = + {_actions, queue} = buffers |> Enum.reduce(queue, fn buffer, queue -> - {[], queue} = TimestampQueue.push_buffer(queue, :input, buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, :input, buffer) queue end) |> TimestampQueue.push_buffer(:input, %Buffer{ @@ -541,7 +536,7 @@ defmodule Membrane.TimestampQueue.UnitTest do payload: "" }) - {[], given_chunks, _queue} = TimestampQueue.pop_chunked(queue) + {_actions, given_chunks, _queue} = TimestampQueue.pop_chunked(queue) expected_chunks = buffers @@ -563,9 +558,9 @@ defmodule Membrane.TimestampQueue.UnitTest do 1..100 |> Enum.reduce(queue, fn i, queue -> pad_ref = Pad.ref(:input, i) - {[], queue} = TimestampQueue.push_buffer(queue, pad_ref, zero_buffer) + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, zero_buffer) - {[], queue} = + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, %Buffer{ dts: Membrane.Time.seconds(i), payload: "" @@ -575,7 +570,7 @@ defmodule Membrane.TimestampQueue.UnitTest do end) |> TimestampQueue.push_end_of_stream(Pad.ref(:input, 1)) - {[], first_batch, queue} = TimestampQueue.pop_chunked(queue) + {_actions, first_batch, queue} = TimestampQueue.pop_chunked(queue) assert [first_chunk, second_chunk] = first_batch @@ -595,7 +590,7 @@ defmodule Membrane.TimestampQueue.UnitTest do 2..100 |> Enum.reduce(queue, &TimestampQueue.push_end_of_stream(&2, Pad.ref(:input, &1))) - {[], second_batch, _queue} = TimestampQueue.pop_chunked(queue) + {_actions, second_batch, _queue} = TimestampQueue.pop_chunked(queue) expected_second_batch = Enum.map(2..100, fn i -> @@ -611,8 +606,7 @@ defmodule Membrane.TimestampQueue.UnitTest do test "push_buffer_and_pop_* functions work as composition of push and pop functions" do queue = TimestampQueue.new( - pause_demand_boundary_unit: :buffers, - pause_demand_boundary: 100, + pause_demand_boundary: {:buffers, 100}, chunk_duration: Membrane.Time.milliseconds(10) ) @@ -661,8 +655,7 @@ defmodule Membrane.TimestampQueue.UnitTest do test "push_buffer_and_pop_* functions don't return pause and resume demand actions for the same pad" do queue = TimestampQueue.new( - pause_demand_boundary_unit: :buffers, - pause_demand_boundary: 100, + pause_demand_boundary: {:buffers, 100}, chunk_duration: Membrane.Time.milliseconds(10) ) @@ -689,4 +682,54 @@ defmodule Membrane.TimestampQueue.UnitTest do assert items == expected_items end) end + + test "has_pad?/2 and pads/1 work correctly" do + queue = TimestampQueue.new() + + queue = + Enum.reduce([:b, :f], queue, fn pad_ref, queue -> + TimestampQueue.register_pad(queue, pad_ref) + end) + + queue = + Enum.reduce([:a, :b], queue, fn pad_ref, queue -> + TimestampQueue.push_event(queue, pad_ref, %Event{}) + end) + + queue = + Enum.reduce([:c, :d, :h], queue, fn pad_ref, queue -> + buffer = %Buffer{dts: 0, payload: ""} + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + queue + end) + + queue = + Enum.reduce([:c, :h], queue, fn pad_ref, queue -> + buffer = %Buffer{dts: 10, payload: ""} + {_actions, queue} = TimestampQueue.push_buffer(queue, pad_ref, buffer) + queue + end) + + queue = + Enum.reduce([:b, :d, :f, :g], queue, fn pad_ref, queue -> + TimestampQueue.push_end_of_stream(queue, pad_ref) + end) + + {_actions, _items, queue} = TimestampQueue.pop_available_items(queue) + + queue = TimestampQueue.register_pad(queue, :e) + + pads_in_queue = [:a, :c, :e, :h] + pads_beyond_queue = [:b, :d, :f, :g] + + assert TimestampQueue.pads(queue) |> MapSet.new() == MapSet.new(pads_in_queue) + + for pad_ref <- pads_in_queue do + assert TimestampQueue.has_pad?(queue, pad_ref) + end + + for pad_ref <- pads_beyond_queue do + assert not TimestampQueue.has_pad?(queue, pad_ref) + end + end end From aa26c2eedabc03e15d68f09c764f908cd5a3cc6d Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Mon, 6 May 2024 15:43:43 +0200 Subject: [PATCH 2/2] Implement suggestion from CR --- lib/membrane/timestamp_queue.ex | 4 ++-- test/membrane_timestamp_queue/unit_test.exs | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/membrane/timestamp_queue.ex b/lib/membrane/timestamp_queue.ex index 0ed6248..ce68062 100644 --- a/lib/membrane/timestamp_queue.ex +++ b/lib/membrane/timestamp_queue.ex @@ -80,8 +80,8 @@ defmodule Membrane.TimestampQueue do Options passed to `#{inspect(__MODULE__)}.new/1`. Following options are allowed: - - `:pause_demand_boundary` - `t:pause_demand_boundary()`. Default to `{:buffers, 1000}`. - - `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by + - `:pause_demand_boundary` - `t:pause_demand_boundary/0`. Defaults to `{:buffers, 1000}`. + - `:chunk_duration` - `t:Membrane.Time.t/0`. Specifies how long the fragments returned by `#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. If not set, popping chunks will not be available. - `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`). Specyfies, how items from different pads will be synchronized with each other. If it is set to: diff --git a/test/membrane_timestamp_queue/unit_test.exs b/test/membrane_timestamp_queue/unit_test.exs index 56c0a4e..88dfadc 100644 --- a/test/membrane_timestamp_queue/unit_test.exs +++ b/test/membrane_timestamp_queue/unit_test.exs @@ -377,7 +377,7 @@ defmodule Membrane.TimestampQueue.UnitTest do grouped_batch = Enum.group_by(batch, &elem(&1, 0), &(elem(&1, 1) |> elem(1))) - assert grouped_batch |> Map.keys() |> MapSet.new() == MapSet.new([:a, :b]) + assert grouped_batch |> Map.keys() |> Enum.sort() == [:a, :b] assert grouped_batch |> Map.values() |> MapSet.new() == MapSet.new([buffers, List.delete_at(buffers, 999)]) @@ -574,10 +574,11 @@ defmodule Membrane.TimestampQueue.UnitTest do assert [first_chunk, second_chunk] = first_batch - expected_first_chunk_data = - MapSet.new(1..100, &{Pad.ref(:input, &1), {:buffer, zero_buffer}}) + sorted_expected_first_chunk = + Enum.map(1..100, &{Pad.ref(:input, &1), {:buffer, zero_buffer}}) + |> Enum.sort() - assert MapSet.new(first_chunk) == expected_first_chunk_data + assert Enum.sort(first_chunk) == sorted_expected_first_chunk expected_second_chunk = [ {Pad.ref(:input, 1), {:buffer, %Buffer{dts: Membrane.Time.second(), payload: ""}}}, @@ -722,7 +723,7 @@ defmodule Membrane.TimestampQueue.UnitTest do pads_in_queue = [:a, :c, :e, :h] pads_beyond_queue = [:b, :d, :f, :g] - assert TimestampQueue.pads(queue) |> MapSet.new() == MapSet.new(pads_in_queue) + assert TimestampQueue.pads(queue) |> Enum.sort() == pads_in_queue for pad_ref <- pads_in_queue do assert TimestampQueue.has_pad?(queue, pad_ref)