Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade queue mechanism, to return more buffers #3

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 137 additions & 130 deletions lib/membrane/timestamp_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ defmodule Membrane.TimestampQueue do
use Bunch.Access

alias Membrane.{Buffer, Event, Pad, StreamFormat}
alias Membrane.Buffer.Metric
alias Membrane.Element.Action

@type pad_queue :: %{
timestamp_offset: integer(),
qex: Qex.t(),
buffers_size: non_neg_integer(),
buffers_number: non_neg_integer(),
update_heap_on_buffer?: boolean(),
paused_demand?: boolean(),
end_of_stream?: boolean(),
use_pts?: boolean() | nil,
Expand All @@ -36,18 +34,20 @@ defmodule Membrane.TimestampQueue do
@opaque t :: %__MODULE__{
current_queue_time: Membrane.Time.t(),
pause_demand_boundary: pos_integer() | :infinity,
metric: Metric.ByteSize | Metric.Count,
pause_demand_boundary_unit: :buffers | :bytes,
pad_queues: %{optional(Pad.ref()) => pad_queue()},
pads_heap: Heap.t(),
waiting_on_buffer_from: MapSet.t()
registered_pads: MapSet.t(),
awaiting_pads: [Pad.ref()]
}

defstruct current_queue_time: Membrane.Time.seconds(0),
pause_demand_boundary: :infinity,
metric: Metric.Count,
pause_demand_boundary_unit: :buffers,
pad_queues: %{},
pads_heap: Heap.max(),
waiting_on_buffer_from: MapSet.new()
registered_pads: MapSet.new(),
awaiting_pads: []

@typedoc """
Options passed to #{inspect(__MODULE__)}.new/1.
Expand All @@ -65,14 +65,17 @@ defmodule Membrane.TimestampQueue do

@spec new(options) :: t()
def new(options \\ []) do
metric =
[pause_demand_boundary: boundary, pause_demand_boundary_unit: unit] =
options
|> Keyword.get(:pause_demand_boundary_unit, :buffers)
|> Metric.from_unit()
|> Keyword.validate!(
pause_demand_boundary: :infinity,
pause_demand_boundary_unit: :buffers
)
|> Enum.sort()

%__MODULE__{
metric: metric,
pause_demand_boundary: Keyword.get(options, :pause_demand_boundary, :infinity)
pause_demand_boundary: boundary,
pause_demand_boundary_unit: unit
}
end

Expand All @@ -87,7 +90,7 @@ defmodule Membrane.TimestampQueue do
@spec register_pad(t(), Pad.ref()) :: t()
def register_pad(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.put(&1, pad_ref))
|> Map.update!(:registered_pads, &MapSet.put(&1, pad_ref))
end

@doc """
Expand All @@ -110,15 +113,14 @@ defmodule Membrane.TimestampQueue do
end

def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
buffer_size = timestamp_queue.metric.buffers_size([buffer])

timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, {:buffer, buffer})
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
|> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
pad_queue
|> Map.merge(%{
buffers_size: pad_queue.buffers_size + buffer_size,
buffers_size: pad_queue.buffers_size + buffer_size(timestamp_queue, buffer),
buffers_number: pad_queue.buffers_number + 1
})
|> Map.update!(:timestamp_offset, fn
Expand Down Expand Up @@ -187,39 +189,44 @@ defmodule Membrane.TimestampQueue do
@spec push_end_of_stream(t(), Pad.ref()) :: t()
def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
timestamp_queue
|> Map.update!(:waiting_on_buffer_from, &MapSet.delete(&1, pad_ref))
|> push_item(pad_ref, :end_of_stream)
|> put_in([:pad_queues, pad_ref, :end_of_stream?], true)
|> Map.update!(:registered_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:awaiting_pads, &List.delete(&1, pad_ref))
end

defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
timestamp_queue
|> maybe_handle_item_from_new_pad(item, pad_ref)
|> update_in(
[:pad_queues, pad_ref, :qex],
&Qex.push(&1, item)
)
|> maybe_update_heap_on_pushing_item(pad_ref, item)
|> Map.update!(:pad_queues, &Map.put_new(&1, pad_ref, new_pad_queue()))
|> update_in([:pad_queues, pad_ref, :qex], &Qex.push(&1, item))
end

defp maybe_handle_item_from_new_pad(
%__MODULE__{pad_queues: pad_queues} = timestamp_queue,
_item,
pad_ref
)
when is_map_key(pad_queues, pad_ref) do
timestamp_queue
end
defp maybe_update_heap_on_pushing_item(timestamp_queue, pad_ref, item) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)

defp maybe_handle_item_from_new_pad(%__MODULE__{} = timestamp_queue, first_item, pad_ref) do
priority =
case first_item do
{:buffer, _buffer} -> -timestamp_queue.current_queue_time
_other -> :infinity
end
first_item_on_awaiting_pad_qex? =
pad_ref in timestamp_queue.awaiting_pads and pad_queue != nil and pad_queue.qex == Qex.new()

timestamp_queue
|> put_in([:pad_queues, pad_ref], new_pad_queue())
|> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref}))
if first_item_on_awaiting_pad_qex? or not is_map_key(timestamp_queue.pad_queues, pad_ref) do
priority =
case item do
{:buffer, buffer} when first_item_on_awaiting_pad_qex? ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These conditions are hard to read, can you try to refactor that?

pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)
-buffer_time(buffer, pad_queue)

{:buffer, _buffer} ->
-timestamp_queue.current_queue_time

_other ->
:infinity
end

timestamp_queue
|> Map.update!(:pads_heap, &Heap.push(&1, {priority, pad_ref}))
else
timestamp_queue
end
end

defp new_pad_queue() do
Expand All @@ -228,7 +235,6 @@ defmodule Membrane.TimestampQueue do
qex: Qex.new(),
buffers_size: 0,
buffers_number: 0,
update_heap_on_buffer?: true,
paused_demand?: false,
end_of_stream?: false,
use_pts?: nil,
Expand All @@ -245,6 +251,12 @@ defmodule Membrane.TimestampQueue do
end
end

defp buffer_size(%__MODULE__{pause_demand_boundary_unit: :buffers}, _buffer),
do: 1

defp buffer_size(%__MODULE__{pause_demand_boundary_unit: :bytes}, %Buffer{payload: payload}),
do: byte_size(payload)

defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: timestamp_offset}),
do: dts + timestamp_offset

Expand All @@ -262,10 +274,13 @@ defmodule Membrane.TimestampQueue do
@doc """
Pops items from the queue while they are available.

An item that is not a buffer is always considered available. A buffer is
available when the following conditions are met:
- There is another buffer or `end_of_stream` enqueued on the pad
- On each other pad there is either `end_of_stream` or a buffer with a lower timestamp.
A buffer `b` from pad `p` is available, if all pads different than `p`
- either have a buffer in the queue, that is older than `b`
- or haven't ever had any buffer on the queue
- or have end of stream pushed on the queue.

An item that is not a buffer is considered available if all buffers from the same pad,
which are newer than the item are available.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
An item that is not a buffer is considered available if all buffers from the same pad,
which are newer than the item are available.
An item other than a buffer is considered available if all newer buffers on the same pad are available.


The returned value is a suggested actions list, a list of popped buffers and the updated queue.

Expand All @@ -275,112 +290,104 @@ defmodule Membrane.TimestampQueue do
"""
@spec pop_batch(t()) :: {[Action.resume_auto_demand()], [popped_value() | :none], t()}
def pop_batch(%__MODULE__{} = timestamp_queue) do
{batch, timestamp_queue} = do_pop_batch(timestamp_queue)
do_pop_batch(timestamp_queue)
end

{actions, timestamp_queue} =
batch
|> Enum.reduce(MapSet.new(), fn
{pad_ref, {:buffer, _buffer}}, map_set -> MapSet.put(map_set, pad_ref)
_other, map_set -> map_set
end)
|> Enum.reduce({[], timestamp_queue}, fn pad_ref, {actions_acc, timestamp_queue} ->
{actions, timestamp_queue} = actions_after_popping_buffer(timestamp_queue, pad_ref)
{actions ++ actions_acc, timestamp_queue}
end)
defp do_pop_batch(%__MODULE__{} = timestamp_queue, actions_acc \\ [], items_acc \\ []) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd avoid default values in recursive functions, as it's easy to reset an accumulator by accident

try_return_buffer? =
MapSet.size(timestamp_queue.registered_pads) == 0 and timestamp_queue.awaiting_pads == []

{actions, batch, timestamp_queue}
end
case Heap.root(timestamp_queue.pads_heap) do
{_priority, pad_ref} when try_return_buffer? ->
pop_buffer_and_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)

@spec do_pop_batch(t(), [popped_value()]) :: {[popped_value() | :none], t()}
defp do_pop_batch(timestamp_queue, acc \\ []) do
case do_pop(timestamp_queue) do
{:none, timestamp_queue} -> {Enum.reverse(acc), timestamp_queue}
{value, timestamp_queue} -> do_pop_batch(timestamp_queue, [value | acc])
{:infinity, pad_ref} ->
pop_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)

_other ->
{actions_acc, Enum.reverse(items_acc), timestamp_queue}
end
end

@spec do_pop(t()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue) do
if MapSet.size(timestamp_queue.waiting_on_buffer_from) == 0 do
case Heap.root(timestamp_queue.pads_heap) do
{_priority, pad_ref} -> do_pop(timestamp_queue, pad_ref)
nil -> {:none, timestamp_queue}
end
defp pop_buffer_and_following_items(
%__MODULE__{} = timestamp_queue,
pad_ref,
actions_acc,
items_acc
) do
pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)

with {{:value, {:buffer, buffer}}, qex} <- Qex.pop(pad_queue.qex) do
old_buffers_size = pad_queue.buffers_size

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: old_buffers_size - buffer_size(timestamp_queue, buffer),
buffers_number: pad_queue.buffers_number - 1
}

timestamp_queue =
with %{buffers_number: 0, end_of_stream?: false} <- pad_queue do
timestamp_queue
|> Map.update!(:awaiting_pads, &[pad_ref | &1])
else
_pad_queue -> timestamp_queue
end

timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref], pad_queue)
|> Map.put(:current_queue_time, buffer_time(buffer, pad_queue))

boundary = timestamp_queue.pause_demand_boundary

actions_acc =
if pad_queue.buffers_size < boundary and old_buffers_size >= boundary,
do: [resume_auto_demand: pad_ref] ++ actions_acc,
else: actions_acc

items_acc = [{pad_ref, {:buffer, buffer}}] ++ items_acc

pop_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)
else
case Heap.root(timestamp_queue.pads_heap) do
# priority :infinity cannot be associated with a buffer
{:infinity, pad_ref} -> do_pop(timestamp_queue, pad_ref)
_other -> {:none, timestamp_queue}
end
_other -> pop_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)
end
end

@spec do_pop(t(), Pad.ref()) :: {popped_value() | :none, t()}
defp do_pop(timestamp_queue, pad_ref) do
pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)
defp pop_following_items(%__MODULE__{} = timestamp_queue, pad_ref, actions_acc, items_acc) do
pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)

case Qex.pop(pad_queue.qex) do
{{:value, {:buffer, buffer}}, qex} ->
buffer_time = buffer_time(buffer, pad_queue)

case pad_queue do
%{update_heap_on_buffer?: true} ->
timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push({-buffer_time, pad_ref})))
|> put_in([:pad_queues, pad_ref, :update_heap_on_buffer?], false)
|> do_pop()

%{buffers_number: 1, end_of_stream?: false} ->
# last buffer on pad queue without end of stream
{:none, timestamp_queue}

pad_queue ->
buffer_size = timestamp_queue.metric.buffers_size([buffer])

pad_queue = %{
pad_queue
| qex: qex,
buffers_size: pad_queue.buffers_size - buffer_size,
buffers_number: pad_queue.buffers_number - 1,
update_heap_on_buffer?: true
}

timestamp_queue =
timestamp_queue
|> Map.put(:current_queue_time, buffer_time)
|> put_in([:pad_queues, pad_ref], pad_queue)

{{pad_ref, {:buffer, buffer}}, timestamp_queue}
end
{{:value, {:buffer, buffer}}, _qex} ->
# update heap, ignore buffer, recursion to pop_batch
new_priority = -buffer_time(buffer, pad_queue)
heap_item = {new_priority, pad_ref}

timestamp_queue
|> Map.update!(:pads_heap, &(&1 |> Heap.pop() |> Heap.push(heap_item)))
|> do_pop_batch(actions_acc, items_acc)

{{:value, item}, qex} ->
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :qex], qex)
# add to acc, recursion self
timestamp_queue = put_in(timestamp_queue, [:pad_queues, pad_ref, :qex], qex)
items_acc = [{pad_ref, item}] ++ items_acc

{{pad_ref, item}, timestamp_queue}
pop_following_items(timestamp_queue, pad_ref, actions_acc, items_acc)

{:empty, _qex} ->
{:empty, _empty_qex} when timestamp_queue.awaiting_pads == [pad_ref] ->
# pop heap, recursion to pop_batch
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> do_pop()
end
end
|> do_pop_batch(actions_acc, items_acc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the flow harder to follow, can we return the accumulators here to do_pop_batch instead, so that each function only calls itself recursively?


defp actions_after_popping_buffer(
%__MODULE__{pause_demand_boundary: boundary} = timestamp_queue,
pad_ref
) do
with %{paused_demand?: true, buffers_size: size} when size < boundary <-
get_in(timestamp_queue, [:pad_queues, pad_ref]) do
timestamp_queue =
timestamp_queue
|> put_in([:pad_queues, pad_ref, :paused_demand?], false)
{:empty, _empty_qex} ->
# cleanup, recursion to pop_batch

{[resume_auto_demand: pad_ref], timestamp_queue}
else
_other -> {[], timestamp_queue}
timestamp_queue
|> Map.update!(:pad_queues, &Map.delete(&1, pad_ref))
|> Map.update!(:pads_heap, &Heap.pop/1)
|> do_pop_batch(actions_acc, items_acc)
end
end
end
Loading