Skip to content

Commit

Permalink
Re-define 'max size' of SEI queue to operate on unique timestamps
Browse files Browse the repository at this point in the history
This ensures it works correctly when there are multiple SEI messages per
sample and the max size is set from e.g. H.264's
`max_num_reorder_frames`.

PiperOrigin-RevId: 694526152
(cherry picked from commit 53953dd)
  • Loading branch information
icbaker committed Nov 19, 2024
1 parent dba3110 commit 0e37bd0
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 50 deletions.
6 changes: 6 additions & 0 deletions RELEASENOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release notes

### Unreleased changes

* Text:
* Fix garbled CEA-608 subtitles in content with more than one SEI message
per sample.

## 1.5

### 1.5.0-rc01 (2024-11-13)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
package androidx.media3.container;

import static androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP;
import static androidx.media3.common.util.Assertions.checkArgument;
import static androidx.media3.common.util.Assertions.checkState;
import static androidx.media3.common.util.Util.castNonNull;

import androidx.annotation.Nullable;
import androidx.annotation.RestrictTo;
import androidx.media3.common.C;
import androidx.media3.common.util.ParsableByteArray;
import androidx.media3.common.util.UnstableApi;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

/** A queue of SEI messages, ordered by presentation timestamp. */
@UnstableApi
Expand All @@ -40,18 +42,17 @@ public interface SeiConsumer {
}

private final SeiConsumer seiConsumer;
private final AtomicLong tieBreakGenerator = new AtomicLong();

/**
* Pool of re-usable {@link SeiMessage} objects to avoid repeated allocations. Elements should be
* added and removed from the 'tail' of the queue (with {@link Deque#push(Object)} and {@link
* Deque#pop()}), to avoid unnecessary array copying.
*/
private final ArrayDeque<SeiMessage> unusedSeiMessages;
/** Pool of re-usable {@link ParsableByteArray} objects to avoid repeated allocations. */
private final ArrayDeque<ParsableByteArray> unusedParsableByteArrays;

private final PriorityQueue<SeiMessage> pendingSeiMessages;
/** Pool of re-usable {@link SampleSeiMessages} objects to avoid repeated allocations. */
private final ArrayDeque<SampleSeiMessages> unusedSampleSeiMessages;

private final PriorityQueue<SampleSeiMessages> pendingSeiMessages;

private int reorderingQueueSize;
@Nullable private SampleSeiMessages lastQueuedMessage;

/**
* Creates an instance, initially with no max size.
Expand All @@ -62,16 +63,24 @@ public interface SeiConsumer {
*/
public ReorderingSeiMessageQueue(SeiConsumer seiConsumer) {
this.seiConsumer = seiConsumer;
unusedSeiMessages = new ArrayDeque<>();
unusedParsableByteArrays = new ArrayDeque<>();
unusedSampleSeiMessages = new ArrayDeque<>();
pendingSeiMessages = new PriorityQueue<>();
reorderingQueueSize = C.LENGTH_UNSET;
}

/**
* Sets the max size of the re-ordering queue.
*
* <p>The size is defined in terms of the number of unique presentation timestamps, rather than
* the number of messages. This ensures that properties like H.264's {@code
* max_number_reorder_frames} can be used to set this max size in the case of multiple SEI
* messages per sample (where multiple SEI messages therefore have the same presentation
* timestamp).
*
* <p>When the queue exceeds this size during a call to {@link #add(long, ParsableByteArray)}, the
* least message is passed to the {@link SeiConsumer} provided during construction.
* messages associated with the least timestamp are passed to the {@link SeiConsumer} provided
* during construction.
*
* <p>If the new size is larger than the number of elements currently in the queue, items are
* removed from the head of the queue (least first) and passed to the {@link SeiConsumer} provided
Expand All @@ -86,7 +95,7 @@ public void setMaxSize(int reorderingQueueSize) {
/**
* Returns the maximum size of this queue, or {@link C#LENGTH_UNSET} if it is unbounded.
*
* <p>See {@link #setMaxSize(int)}.
* <p>See {@link #setMaxSize(int)} for details on how size is defined.
*/
public int getMaxSize() {
return reorderingQueueSize;
Expand All @@ -95,12 +104,16 @@ public int getMaxSize() {
/**
* Adds a message to the queue.
*
* <p>If this causes the queue to exceed its {@linkplain #setMaxSize(int) max size}, the least
* message (which may be the one passed to this method) is passed to the {@link SeiConsumer}
* provided during construction.
* <p>If this causes the queue to exceed its {@linkplain #setMaxSize(int) max size}, messages
* associated with the least timestamp (which may be the message passed to this method) are passed
* to the {@link SeiConsumer} provided during construction.
*
* <p>Messages with matching timestamps must be added consecutively (this will naturally happen
* when parsing messages from a container).
*
* @param presentationTimeUs The presentation time of the SEI message.
* @param seiBuffer The SEI data. The data will be copied, so the provided object can be re-used.
* @param seiBuffer The SEI data. The data will be copied, so the provided object can be re-used
* after this method returns.
*/
public void add(long presentationTimeUs, ParsableByteArray seiBuffer) {
if (reorderingQueueSize == 0
Expand All @@ -110,15 +123,42 @@ && presentationTimeUs < castNonNull(pendingSeiMessages.peek()).presentationTimeU
seiConsumer.consume(presentationTimeUs, seiBuffer);
return;
}
SeiMessage seiMessage =
unusedSeiMessages.isEmpty() ? new SeiMessage() : unusedSeiMessages.poll();
seiMessage.reset(presentationTimeUs, tieBreakGenerator.getAndIncrement(), seiBuffer);
pendingSeiMessages.add(seiMessage);
// Make a local copy of the SEI data so we can store it in the queue and allow the seiBuffer
// parameter to be safely re-used after this add() method returns.
ParsableByteArray seiBufferCopy = copy(seiBuffer);
if (lastQueuedMessage != null && presentationTimeUs == lastQueuedMessage.presentationTimeUs) {
lastQueuedMessage.nalBuffers.add(seiBufferCopy);
return;
}
SampleSeiMessages sampleSeiMessages =
unusedSampleSeiMessages.isEmpty() ? new SampleSeiMessages() : unusedSampleSeiMessages.pop();
sampleSeiMessages.init(presentationTimeUs, seiBufferCopy);
pendingSeiMessages.add(sampleSeiMessages);
lastQueuedMessage = sampleSeiMessages;
if (reorderingQueueSize != C.LENGTH_UNSET) {
flushQueueDownToSize(reorderingQueueSize);
}
}

/**
* Copies {@code input} into a {@link ParsableByteArray} instance from {@link
* #unusedParsableByteArrays}, or a new instance if that is empty.
*/
private ParsableByteArray copy(ParsableByteArray input) {
ParsableByteArray result =
unusedParsableByteArrays.isEmpty()
? new ParsableByteArray()
: unusedParsableByteArrays.pop();
result.reset(input.bytesLeft());
System.arraycopy(
/* src= */ input.getData(),
/* srcPos= */ input.getPosition(),
/* dest= */ result.getData(),
/* destPos= */ 0,
/* length= */ result.bytesLeft());
return result;
}

/**
* Empties the queue, passing all messages (least first) to the {@link SeiConsumer} provided
* during construction.
Expand All @@ -129,47 +169,42 @@ public void flush() {

private void flushQueueDownToSize(int targetSize) {
while (pendingSeiMessages.size() > targetSize) {
SeiMessage seiMessage = castNonNull(pendingSeiMessages.poll());
seiConsumer.consume(seiMessage.presentationTimeUs, seiMessage.data);
unusedSeiMessages.push(seiMessage);
SampleSeiMessages sampleSeiMessages = castNonNull(pendingSeiMessages.poll());
for (int i = 0; i < sampleSeiMessages.nalBuffers.size(); i++) {
seiConsumer.consume(
sampleSeiMessages.presentationTimeUs, sampleSeiMessages.nalBuffers.get(i));
unusedParsableByteArrays.push(sampleSeiMessages.nalBuffers.get(i));
}
sampleSeiMessages.nalBuffers.clear();
if (lastQueuedMessage != null
&& lastQueuedMessage.presentationTimeUs == sampleSeiMessages.presentationTimeUs) {
lastQueuedMessage = null;
}
unusedSampleSeiMessages.push(sampleSeiMessages);
}
}

/** Holds data from a SEI sample with its presentation timestamp. */
private static final class SeiMessage implements Comparable<SeiMessage> {

private final ParsableByteArray data;

private long presentationTimeUs;
/** Holds the presentation timestamp of a sample and the data from associated SEI messages. */
private static final class SampleSeiMessages implements Comparable<SampleSeiMessages> {

/**
* {@link PriorityQueue} breaks ties arbitrarily. This field ensures that insertion order is
* preserved when messages have the same {@link #presentationTimeUs}.
*/
private long tieBreak;
public final List<ParsableByteArray> nalBuffers;
public long presentationTimeUs;

public SeiMessage() {
public SampleSeiMessages() {
presentationTimeUs = C.TIME_UNSET;
data = new ParsableByteArray();
nalBuffers = new ArrayList<>();
}

public void reset(long presentationTimeUs, long tieBreak, ParsableByteArray nalBuffer) {
checkState(presentationTimeUs != C.TIME_UNSET);
public void init(long presentationTimeUs, ParsableByteArray nalBuffer) {
checkArgument(presentationTimeUs != C.TIME_UNSET);
checkState(this.nalBuffers.isEmpty());
this.presentationTimeUs = presentationTimeUs;
this.tieBreak = tieBreak;
this.data.reset(nalBuffer.bytesLeft());
System.arraycopy(
/* src= */ nalBuffer.getData(),
/* srcPos= */ nalBuffer.getPosition(),
/* dest= */ data.getData(),
/* destPos= */ 0,
/* length= */ nalBuffer.bytesLeft());
this.nalBuffers.add(nalBuffer);
}

@Override
public int compareTo(SeiMessage other) {
int timeComparison = Long.compare(this.presentationTimeUs, other.presentationTimeUs);
return timeComparison != 0 ? timeComparison : Long.compare(this.tieBreak, other.tieBreak);
public int compareTo(SampleSeiMessages other) {
return Long.compare(this.presentationTimeUs, other.presentationTimeUs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,41 @@ public void withMaxSize_addEmitsWhenQueueIsFull() {
.containsExactly(new SeiMessage(/* presentationTimeUs= */ -123, data2));
}

@Test
public void withMaxSize_addEmitsWhenQueueIsFull_handlesDuplicateTimestamps() {
ArrayList<SeiMessage> emittedMessages = new ArrayList<>();
ReorderingSeiMessageQueue reorderingQueue =
new ReorderingSeiMessageQueue(
(presentationTimeUs, seiBuffer) ->
emittedMessages.add(new SeiMessage(presentationTimeUs, seiBuffer)));
reorderingQueue.setMaxSize(1);

// Deliberately re-use a single ParsableByteArray instance to ensure the implementation is
// copying as required.
ParsableByteArray scratchData = new ParsableByteArray();
byte[] data1 = TestUtil.buildTestData(20);
scratchData.reset(data1);
reorderingQueue.add(/* presentationTimeUs= */ 345, scratchData);
// Add a message with a repeated timestamp which should not trigger the max size.
byte[] data2 = TestUtil.buildTestData(15);
scratchData.reset(data2);
reorderingQueue.add(/* presentationTimeUs= */ 345, scratchData);
byte[] data3 = TestUtil.buildTestData(10);
scratchData.reset(data3);
reorderingQueue.add(/* presentationTimeUs= */ -123, scratchData);
// Add another message to flush out the two t=345 messages.
byte[] data4 = TestUtil.buildTestData(5);
scratchData.reset(data4);
reorderingQueue.add(/* presentationTimeUs= */ 456, scratchData);

assertThat(emittedMessages)
.containsExactly(
new SeiMessage(/* presentationTimeUs= */ -123, data3),
new SeiMessage(/* presentationTimeUs= */ 345, data1),
new SeiMessage(/* presentationTimeUs= */ 345, data2))
.inOrder();
}

/**
* Tests that if a message smaller than all current queue items is added when the queue is full,
* the same {@link ParsableByteArray} instance is passed straight to the output to avoid
Expand Down

0 comments on commit 0e37bd0

Please sign in to comment.