Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump version, add new functions, change pause demand boundary API #6

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_timestamp_queue` to your list o
```elixir
def deps do
[
{:membrane_timestamp_queue, "~> 0.1.0"}
{:membrane_timestamp_queue, "~> 0.2.0"}
]
end
```
Expand Down
70 changes: 53 additions & 17 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ defmodule Membrane.TimestampQueue do
metric_unit: :buffers | :bytes | :time,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
blocking_registered_pads: MapSet.t(),
blocking_registered_pads: MapSet.t(Pad.ref()),
registered_pads_offsets: %{optional(Pad.ref()) => integer()},
# :awaiting_pads contain at most one element at the time
awaiting_pads: [Pad.ref()],
closed?: boolean(),
chunk_duration: nil | Membrane.Time.t(),
chunk_full?: boolean(),
next_chunk_boundary: nil | Membrane.Time.t(),
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets,
known_pads: MapSet.t(Pad.ref())
}

defstruct current_queue_time: Membrane.Time.seconds(0),
Expand All @@ -60,18 +61,28 @@ defmodule Membrane.TimestampQueue do
chunk_duration: nil,
chunk_full?: false,
next_chunk_boundary: nil,
synchronization_strategy: :synchronize_on_arrival
synchronization_strategy: :synchronize_on_arrival,
known_pads: MapSet.new()

@typedoc """
Value passed to `:pause_demand_boundary` option in `new/1`.

Specyfies, what amount of buffers associated with a specific pad must be stored in the queue, to pause auto demand.

Is a two-element tuple, which
- the first element specifies metric, in which boundary is expressed (default to `:buffers`)
- the second element is the boundary (default to `1000`).
"""
@type pause_demand_boundary ::
{:buffer | :bytes, pos_integer() | :infinity} | {:time, Membrane.Time.t()}

@typedoc """
Options passed to `#{inspect(__MODULE__)}.new/1`.

Following options are allowed:
- `:pause_demand_boundary` - positive integer, `t:Membrane.Time.t()` or `:infinity` (default to `:infinity`). Tells,
what amount of buffers associated with specific pad must be stored in the queue, to pause auto demand.
- `:pause_demand_boundary_unit` - `:buffers`, `:bytes` or `:time` (deafult to `:buffers`). Tells, in which metric
`:pause_demand_boundary` is specified.
- `:pause_demand_boundary` - `t:pause_demand_boundary()`. Default to `{:buffers, 1000}`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `:pause_demand_boundary` - `t:pause_demand_boundary()`. Default to `{:buffers, 1000}`.
- `:pause_demand_boundary` - `t:pause_demand_boundary/0`. Defaults to `{:buffers, 1000}`.

- `:chunk_duration` - `Membrane.Time.t()`. Specifies how long the fragments returned by
`#{inspect(__MODULE__)}.pop_chunked/1` will be approximately.
`#{inspect(__MODULE__)}.pop_chunked/1` will be approximately. If not set, popping chunks will not be available.
- `:synchronization_strategy` - `:synchronize_on_arrival` or `:exact_timestamps` (default to `:synchronize_on_arrival`).
Specyfies, how items from different pads will be synchronized with each other. If it is set to:
* `:synchronize_on_arrival` - in the moment of the arrival of the first buffer from a specific pad, there will be
Expand All @@ -84,7 +95,6 @@ defmodule Membrane.TimestampQueue do
"""
@type options :: [
pause_demand_boundary: pos_integer() | Membrane.Time.t() | :infinity,
pause_demand_boundary_unit: :buffers | :bytes | :time,
chunk_duration: Membrane.Time.t(),
synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
]
Expand All @@ -93,15 +103,13 @@ defmodule Membrane.TimestampQueue do
def new(options \\ []) do
[
chunk_duration: chunk_duration,
pause_demand_boundary: boundary,
pause_demand_boundary_unit: unit,
pause_demand_boundary: {unit, boundary},
synchronization_strategy: synchronization_strategy
] =
options
|> Keyword.validate!(
chunk_duration: nil,
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers,
pause_demand_boundary: {:buffers, 1000},
synchronization_strategy: :synchronize_on_arrival
)
|> Enum.sort()
Expand Down Expand Up @@ -159,9 +167,10 @@ defmodule Membrane.TimestampQueue do
end

timestamp_queue =
if wait?,
do: Map.update!(timestamp_queue, :blocking_registered_pads, &MapSet.put(&1, pad_ref)),
else: timestamp_queue
if(wait?, do: [:blocking_registered_pads, :known_pads], else: [:known_pads])
|> Enum.reduce(timestamp_queue, fn field_name, timestamp_queue ->
Map.update!(timestamp_queue, field_name, &MapSet.put(&1, pad_ref))
end)

if offset != nil,
do: put_in(timestamp_queue, [:registered_pads_offsets, pad_ref], offset),
Expand Down Expand Up @@ -223,6 +232,7 @@ defmodule Membrane.TimestampQueue do
other ->
other
end)
|> Map.update!(:known_pads, &MapSet.put(&1, pad_ref))
|> remove_pad_from_registered_and_awaiting_pads(pad_ref)
|> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue)
|> push_item_on_qex(pad_ref, {:buffer, buffer})
Expand Down Expand Up @@ -320,6 +330,7 @@ defmodule Membrane.TimestampQueue do
defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end))
|> Map.update!(:known_pads, &MapSet.put(&1, pad_ref))
|> push_pad_on_heap_if_qex_empty(pad_ref, :infinity)
|> push_item_on_qex(pad_ref, item)
end
Expand Down Expand Up @@ -585,7 +596,13 @@ defmodule Membrane.TimestampQueue do
{actions_acc, items_acc, timestamp_queue}

{:empty, _empty_qex} ->
{_pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref])
{pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref])

timestamp_queue =
if pad_queue.end_of_stream?,
do: Map.update!(timestamp_queue, :known_pads, &MapSet.delete(&1, pad_ref)),
else: timestamp_queue

{actions_acc, items_acc, timestamp_queue}
end
end
Expand Down Expand Up @@ -651,6 +668,25 @@ defmodule Membrane.TimestampQueue do
|> pop_available_items()
end

@doc """
Returns true, if the pad has been registered in the queue or item from it has been pushed to the queue
and moreover, end of stream of this pad hasn't been popped from the queue.
"""
@spec has_pad?(t(), Pad.ref()) :: boolean()
def has_pad?(%__MODULE__{} = timestamp_queue, pad_ref) do
MapSet.member?(timestamp_queue.known_pads, pad_ref)
end

@doc """
Returns list of all pads, that:
1) have been ever registered in the queue or item from them has been pushed to the queue
2) their end of stream hasn't been popped from the queue.
"""
@spec pads(t()) :: [Pad.ref()]
def pads(%__MODULE__{} = timestamp_queue) do
MapSet.to_list(timestamp_queue.known_pads)
end

defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do
inspected_item =
case item do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.TimestampQueue.Mixfile do
use Mix.Project

@version "0.1.0"
@version "0.2.0"
@github_url "https://github.com/membraneframework/membrane_timestamp_queue"

def project do
Expand Down
Loading
Loading