Skip to content

Commit

Permalink
Merge pull request #6 from membraneframework/release-v0.2.0
Browse files Browse the repository at this point in the history
Bump version, add new functions, change pause demand boundary API
  • Loading branch information
FelonEkonom authored May 6, 2024
2 parents 98baf76 + aa26c2e commit f47cc94
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_timestamp_queue` to your list o
```elixir
def deps do
[
{:membrane_timestamp_queue, "~> 0.1.0"}
{:membrane_timestamp_queue, "~> 0.2.0"}
]
end
```
Expand Down
72 changes: 54 additions & 18 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ defmodule Membrane.TimestampQueue do
metric_unit: :buffers | :bytes | :time,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
blocking_registered_pads: MapSet.t(),
blocking_registered_pads: MapSet.t(Pad.ref()),
registered_pads_offsets: %{optional(Pad.ref()) => integer()},
# :awaiting_pads contain at most one element at the time
awaiting_pads: [Pad.ref()],
closed?: boolean(),
chunk_duration: nil | Membrane.Time.t(),
chunk_full?: boolean(),
next_chunk_boundary: nil | Membrane.Time.t(),
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets,
known_pads: MapSet.t(Pad.ref())
}

defstruct current_queue_time: Membrane.Time.seconds(0),
Expand All @@ -60,18 +61,28 @@ defmodule Membrane.TimestampQueue do
chunk_duration: nil,
chunk_full?: false,
next_chunk_boundary: nil,
synchronization_strategy: :synchronize_on_arrival
synchronization_strategy: :synchronize_on_arrival,
known_pads: MapSet.new()

@typedoc """
Value passed to `:pause_demand_boundary` option in `new/1`.
Specyfies, what amount of buffers associated with a specific pad must be stored in the queue, to pause auto demand.
Is a two-element tuple, which
- the first element specifies metric, in which boundary is expressed (default to `:buffers`)
- the second element is the boundary (default to `1000`).
"""
@type pause_demand_boundary ::
{:buffer | :bytes, pos_integer() | :infinity} | {:time, Membrane.Time.t()}

@typedoc """
Options passed to `#{inspect(__MODULE__)}.new/1`.
Following options are allowed:
- `:pause_demand_boundary` - positive integer, `t:Membrane.Time.t()` or `:infinity` (default to `:infinity`). Tells,
what amount of buffers associated with specific pad must be stored in the queue, to pause auto demand.
- `:pause_demand_boundary_unit` - `:buffers`, `:bytes` or `:time` (deafult to `:buffers`). Tells, in which metric
`:pause_demand_boundary` is specified.
- `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by
`#{inspect(__MODULE__)}.pop_chunked/1` will be approximately.
- `:pause_demand_boundary` - `t:pause_demand_boundary/0`. Defaults to `{:buffers, 1000}`.
- `:chunk_duration` - `t:Membrane.Time.t/0`. Specifies how long the fragments returned by
`#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. If not set, popping chunks will not be available.
- `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`).
Specyfies, how items from different pads will be synchronized with each other. If it is set to:
* `:synchronize_on_arrival` - in the moment of the arrival of the first buffer from a specific pad, there will be
Expand All @@ -84,7 +95,6 @@ defmodule Membrane.TimestampQueue do
"""
@type options :: [
pause_demand_boundary: pos_integer() | Membrane.Time.t() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes | :time,
chunk_duration: Membrane.Time.t(),
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
]
Expand All @@ -93,15 +103,13 @@ defmodule Membrane.TimestampQueue do
def new(options \\ []) do
[
chunk_duration: chunk_duration,
pause_demand_boundary: boundary,
pause_demand_boundary_unit: unit,
pause_demand_boundary: {unit, boundary},
synchronization_strategy: synchronization_strategy
] =
options
|> Keyword.validate!(
chunk_duration: nil,
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers,
pause_demand_boundary: {:buffers, 1000},
synchronization_strategy: :synchronize_on_arrival
)
|> Enum.sort()
Expand Down Expand Up @@ -159,9 +167,10 @@ defmodule Membrane.TimestampQueue do
end

timestamp_queue =
if wait?,
do: Map.update!(timestamp_queue, :blocking_registered_pads, &MapSet.put(&1, pad_ref)),
else: timestamp_queue
if(wait?, do: [:blocking_registered_pads, :known_pads], else: [:known_pads])
|> Enum.reduce(timestamp_queue, fn field_name, timestamp_queue ->
Map.update!(timestamp_queue, field_name, &MapSet.put(&1, pad_ref))
end)

if offset != nil,
do: put_in(timestamp_queue, [:registered_pads_offsets, pad_ref], offset),
Expand Down Expand Up @@ -223,6 +232,7 @@ defmodule Membrane.TimestampQueue do
other ->
other
end)
|> Map.update!(:known_pads, &MapSet.put(&1, pad_ref))
|> remove_pad_from_registered_and_awaiting_pads(pad_ref)
|> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue)
|> push_item_on_qex(pad_ref, {:buffer, buffer})
Expand Down Expand Up @@ -320,6 +330,7 @@ defmodule Membrane.TimestampQueue do
defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end))
|> Map.update!(:known_pads, &MapSet.put(&1, pad_ref))
|> push_pad_on_heap_if_qex_empty(pad_ref, :infinity)
|> push_item_on_qex(pad_ref, item)
end
Expand Down Expand Up @@ -585,7 +596,13 @@ defmodule Membrane.TimestampQueue do
{actions_acc, items_acc, timestamp_queue}

{:empty, _empty_qex} ->
{_pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref])
{pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref])

timestamp_queue =
if pad_queue.end_of_stream?,
do: Map.update!(timestamp_queue, :known_pads, &MapSet.delete(&1, pad_ref)),
else: timestamp_queue

{actions_acc, items_acc, timestamp_queue}
end
end
Expand Down Expand Up @@ -651,6 +668,25 @@ defmodule Membrane.TimestampQueue do
|> pop_available_items()
end

@doc """
Returns true, if the pad has been registered in the queue or item from it has been pushed to the queue
and moreover, end of stream of this pad hasn't been popped from the queue.
"""
@spec has_pad?(t(), Pad.ref()) :: boolean()
def has_pad?(%__MODULE__{} = timestamp_queue, pad_ref) do
MapSet.member?(timestamp_queue.known_pads, pad_ref)
end

@doc """
Returns list of all pads, that:
1) have been ever registered in the queue or item from them has been pushed to the queue
2) their end of stream hasn't been popped from the queue.
"""
@spec pads(t()) :: [Pad.ref()]
def pads(%__MODULE__{} = timestamp_queue) do
MapSet.to_list(timestamp_queue.known_pads)
end

defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do
inspected_item =
case item do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.TimestampQueue.Mixfile do
use Mix.Project

@version "0.1.0"
@version "0.2.0"
@github_url "https://github.com/membraneframework/membrane_timestamp_queue"

def project do
Expand Down
Loading

0 comments on commit f47cc94

Please sign in to comment.