Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade queue mechanism, to return more buffers #3

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 127 additions & 133 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ defmodule Membrane.TimestampQueue do
use Bunch.Access

alias Membrane.{Buffer, Event, Pad, StreamFormat}
alias Membrane.Buffer.Metric
alias Membrane.Element.Action

@type pad_queue :: %{
timestamp_offset: integer(),
qex: Qex.t(),
buffers_size: non_neg_integer(),
buffers_number: non_neg_integer(),
update_heap_on_buffer?: boolean(),
paused_demand?: boolean(),
end_of_stream?: boolean(),
use_pts?: boolean() | nil,
Expand All @@ -36,18 +34,20 @@ defmodule Membrane.TimestampQueue do
@opaque t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pause_demand_boundary_unit: :buffers | :bytes,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
waiting_on_buffer_from: MapSet.t()
registered_pads: MapSet.t(),
awaiting_pads: [Pad.ref()]
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric: Metric.Count,
pause_demand_boundary_unit: :buffers,
pad_queues: %{},
pads_heap: Heap.max(),
waiting_on_buffer_from: MapSet.new()
registered_pads: MapSet.new(),
awaiting_pads: []

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.
Expand All @@ -65,14 +65,17 @@ defmodule Membrane.TimestampQueue do

@spec new(options) :: t()
def new(options \\ []) do
metric =
[pause_demand_boundary: boundary, pause_demand_boundary_unit: unit] =
options
|> Keyword.get(:pause_demand_boundary_unit, :buffers)
|> Metric.from_unit()
|> Keyword.validate!(
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers
)
|> Enum.sort()

%__MODULE__{
metric: metric,
pause_demand_boundary: Keyword.get(options, :pause_demand_boundary, :infinity)
pause_demand_boundary: boundary,
pause_demand_boundary_unit: unit
}
end

Expand All @@ -87,7 +90,7 @@ defmodule Membrane.TimestampQueue do
@spec register_pad(t(), Pad.ref()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.put(&1, pad_ref))
|> Map.update!(:registered_pads, &MapSet.put(&1, pad_ref))
end

@doc """
Expand All @@ -110,15 +113,14 @@ defmodule Membrane.TimestampQueue do
end

def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, {:buffer, buffer})
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
pad_queue
|> Map.merge(%{
buffers_size: pad_queue.buffers_size + buffer_size,
buffers_size: pad_queue.buffers_size + buffer_size(timestamp_queue, buffer),
buffers_number: pad_queue.buffers_number + 1
})
|> Map.update!(:timestamp_offset, fn
Expand Down Expand Up @@ -187,39 +189,36 @@ defmodule Membrane.TimestampQueue do
@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, :end_of_stream)
|> put_in([:pad_queues, pad_ref, :end_of_stream?], true)
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
end

defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> maybe_handle_item_from_new_pad(item, pad_ref)
|> update_in(
[:pad_queues, pad_ref, :qex],
&Qex.push(&1, item)
)
|> maybe_push_pad_on_heap_on_new_item(pad_ref, item)
|> Map.update!(:pad_queues, &Map.put_new(&1, pad_ref, new_pad_queue()))
|> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item))
end

defp maybe_handle_item_from_new_pad(
%__MODULE__{pad_queues: pad_queues} = timestamp_queue,
_item,
pad_ref
)
when is_map_key(pad_queues, pad_ref) do
timestamp_queue
end
defp maybe_push_pad_on_heap_on_new_item(timestamp_queue, pad_ref, item) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)
empty_qex = Qex.new()

defp maybe_handle_item_from_new_pad(%__MODULE__{} = timestamp_queue, first_item, pad_ref) do
priority =
case first_item do
{:buffer, _buffer} -> -timestamp_queue.current_queue_time
_other -> :infinity
end
case {item, pad_queue} do
{{:buffer, _buffer}, nil} ->
push_pad_on_heap(timestamp_queue, pad_ref, -timestamp_queue.current_queue_time)

timestamp_queue
|> put_in([:pad_queues, pad_ref], new_pad_queue())
|> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref}))
{{:buffer, buffer}, pad_queue} when pad_queue.qex == empty_qex ->
push_pad_on_heap(timestamp_queue, pad_ref, -buffer_time(buffer, pad_queue))

{_non_buffer, pad_queue} when pad_queue == nil or pad_queue.qex == empty_qex ->
push_pad_on_heap(timestamp_queue, pad_ref, :infinity)

_else ->
timestamp_queue
end
end

defp new_pad_queue() do
Expand All @@ -228,7 +227,6 @@ defmodule Membrane.TimestampQueue do
qex: Qex.new(),
buffers_size: 0,
buffers_number: 0,
update_heap_on_buffer?: true,
paused_demand?: false,
end_of_stream?: false,
use_pts?: nil,
Expand All @@ -245,6 +243,12 @@ defmodule Membrane.TimestampQueue do
end
end

defp buffer_size(%__MODULE__{pause_demand_boundary_unit: :buffers}, _buffer),
do: 1

defp buffer_size(%__MODULE__{pause_demand_boundary_unit: :bytes}, %Buffer{payload: payload}),
do: byte_size(payload)

defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: timestamp_offset}),
do: dts + timestamp_offset

Expand All @@ -262,125 +266,115 @@ defmodule Membrane.TimestampQueue do
@doc """
Pops items from the queue while they are available.

An item that is not a buffer is always considered available. A buffer is
available when the following conditions are met:
- There is another buffer or `end_of_stream` enqueued on the pad
- On each other pad there is either `end_of_stream` or a buffer with a lower timestamp.
A buffer `b` from pad `p` is available, if all pads different than `p`
- either have a buffer in the queue, that is older than `b`
- or haven't ever had any buffer on the queue
- or have end of stream pushed on the queue.

An item other than a buffer is considered available if all newer buffers on the same pad are
available.

The returned value is a suggested actions list, a list of popped buffers and the updated queue.

If the amount of buffers associated with any pad in the queue falls below the
`pause_demand_boundary`, the suggested actions list contains `t:Action.resume_auto_demand()`
actions, otherwise it is an empty list.
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value()], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)
do_pop_batch(timestamp_queue, [], [])
end

{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions ++ actions_acc, timestamp_queue}
end)
defp do_pop_batch(%__MODULE__{} = timestamp_queue, actions_acc, items_acc) do
try_return_buffer? =
MapSet.size(timestamp_queue.registered_pads) == 0 and timestamp_queue.awaiting_pads == []

{actions, batch, timestamp_queue}
end
case Heap.root(timestamp_queue.pads_heap) do
{priority, pad_ref} when try_return_buffer? or priority == :infinity ->
{actions, items, timestamp_queue} =
timestamp_queue
|> Map.update!(:pads_heap, &Heap.pop/1)
|> pop_buffer_and_following_items(pad_ref)

do_pop_batch(timestamp_queue, actions ++ actions_acc, items ++ items_acc)

@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
_other ->
{actions_acc, Enum.reverse(items_acc), timestamp_queue}
end
end

@spec do_pop(t()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue) do
if MapSet.size(timestamp_queue.waiting_on_buffer_from) == 0 do
case Heap.root(timestamp_queue.pads_heap) do
{_priority, pad_ref} -> do_pop(timestamp_queue, pad_ref)
nil -> {:none, timestamp_queue}
end
@spec pop_buffer_and_following_items(t(), Pad.ref()) ::
{[Action.resume_auto_demand()], [popped_value()], t()}
defp pop_buffer_and_following_items(%__MODULE__{} = timestamp_queue, pad_ref) do
pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)

with {{:value, {:buffer, buffer}}, qex} <- Qex.pop(pad_queue.qex) do
old_buffers_size = pad_queue.buffers_size

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: old_buffers_size - buffer_size(timestamp_queue, buffer),
buffers_number: pad_queue.buffers_number - 1
}

timestamp_queue =
with %{buffers_number: 0, end_of_stream?: false} <- pad_queue do
timestamp_queue
|> Map.update!(:awaiting_pads, &[pad_ref | &1])
else
_pad_queue -> timestamp_queue
end

timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref], pad_queue)
|> Map.put(:current_queue_time, buffer_time(buffer, pad_queue))

boundary = timestamp_queue.pause_demand_boundary

actions =
if pad_queue.buffers_size < boundary and old_buffers_size >= boundary,
do: [resume_auto_demand: pad_ref],
else: []

items = [{pad_ref, {:buffer, buffer}}]

pop_following_items(timestamp_queue, pad_ref, actions, items)
else
case Heap.root(timestamp_queue.pads_heap) do
# priority :infinity cannot be associated with a buffer
{:infinity, pad_ref} -> do_pop(timestamp_queue, pad_ref)
_other -> {:none, timestamp_queue}
end
_other -> pop_following_items(timestamp_queue, pad_ref, [], [])
end
end

@spec do_pop(t(), Pad.ref()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue, pad_ref) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)
@spec pop_following_items(t(), Pad.ref(), [Action.resume_auto_demand()], [popped_value()]) ::
{[Action.resume_auto_demand()], [popped_value()], t()}
defp pop_following_items(%__MODULE__{} = timestamp_queue, pad_ref, actions_acc, items_acc) do
pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)

case Qex.pop(pad_queue.qex) do
{{:value, {:buffer, buffer}}, qex} ->
buffer_time = buffer_time(buffer, pad_queue)

case pad_queue do
%{update_heap_on_buffer?: true} ->
timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref})))
|> put_in([:pad_queues, pad_ref, :update_heap_on_buffer?], false)
|> do_pop()

%{buffers_number: 1, end_of_stream?: false} ->
# last buffer on pad queue without end of stream
{:none, timestamp_queue}

pad_queue ->
buffer_size = timestamp_queue.metric.buffers_size([buffer])

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: pad_queue.buffers_size - buffer_size,
buffers_number: pad_queue.buffers_number - 1,
update_heap_on_buffer?: true
}

timestamp_queue =
timestamp_queue
|> Map.put(:current_queue_time, buffer_time)
|> put_in([:pad_queues, pad_ref], pad_queue)

{{pad_ref, {:buffer, buffer}}, timestamp_queue}
end
{{:value, {:buffer, buffer}}, _qex} ->
new_priority = -buffer_time(buffer, pad_queue)
timestamp_queue = push_pad_on_heap(timestamp_queue, pad_ref, new_priority)

{actions_acc, items_acc, timestamp_queue}

{{:value, item}, qex} ->
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :qex], qex)
timestamp_queue = put_in(timestamp_queue, [:pad_queues, pad_ref, :qex], qex)
items_acc = [{pad_ref, item}] ++ items_acc

{{pad_ref, item}, timestamp_queue}
pop_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)

{:empty, _qex} ->
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> do_pop()
{:empty, _empty_qex} when timestamp_queue.awaiting_pads == [pad_ref] ->
{actions_acc, items_acc, timestamp_queue}

{:empty, _empty_qex} ->
{_pad_queue, timestamp_queue} = pop_in(timestamp_queue, [:pad_queues, pad_ref])
{actions_acc, items_acc, timestamp_queue}
end
end

defp actions_after_popping_buffer(
%__MODULE__{pause_demand_boundary: boundary} = timestamp_queue,
pad_ref
) do
with %{paused_demand?: true, buffers_size: size} when size < boundary <-
get_in(timestamp_queue, [:pad_queues, pad_ref]) do
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :paused_demand?], false)

{[resume_auto_demand: pad_ref], timestamp_queue}
else
_other -> {[], timestamp_queue}
end
defp push_pad_on_heap(timestamp_queue, pad_ref, priority) do
heap_item = {priority, pad_ref}
Map.update!(timestamp_queue, :pads_heap, &Heap.push(&1, heap_item))
end
end
Loading