From c5bbd674c4ffbb9f90e869466f9aa4fa0ff7065e Mon Sep 17 00:00:00 2001 From: mxsm Date: Wed, 7 Jun 2023 10:27:38 +0800 Subject: [PATCH] [ISSUE #3983] Optimize MessageQueue (#3984) * [ISSUE #3983] Optimize MessageQueue * modify public of attribute items to private * optimize code readability * optimize code logic --- .../standalone/broker/MessageQueue.java | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java index c887bf693b..a7951cb384 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java @@ -24,18 +24,21 @@ import com.google.common.base.Preconditions; +import lombok.Getter; + /** * This is a block queue, can get entity by offset. The queue is a FIFO data structure. */ public class MessageQueue { + @Getter private final MessageEntity[] items; - private int takeIndex; + private volatile int takeIndex; - private int putIndex; + private volatile int putIndex; - private int count; + private volatile int count; private final ReentrantLock lock; @@ -59,9 +62,10 @@ public MessageQueue(int capacity) { } /** - * Insert the message at the tail of this queue, waiting for space to become available if the queue is full + * Inserts the specified MessageEntity object into the queue. * - * @param messageEntity + * @param messageEntity The MessageEntity object to be inserted into the queue. + * @throws InterruptedException if the current thread is interrupted while waiting for space to become available in the queue. */ public void put(MessageEntity messageEntity) throws InterruptedException { Preconditions.checkNotNull(messageEntity); @@ -80,8 +84,8 @@ public void put(MessageEntity messageEntity) throws InterruptedException { /** * Get the first message at this queue, waiting for the message is available if the queue is empty, this method will not remove the message * - * @return MessageEntity - * @throws InterruptedException + * @return The MessageEntity object at the head of the queue. + * @throws InterruptedException if the current thread is interrupted while waiting for an element to become available in the queue. */ public MessageEntity take() throws InterruptedException { ReentrantLock reentrantLock = this.lock; @@ -145,21 +149,26 @@ public MessageEntity getTail() { /** * Get the message by offset, since the offset is increment, so we can get the first message in this queue and calculate the index of this offset * - * @param offset - * @return MessageEntity + * @param offset The offset of the MessageEntity object to be retrieved. + * @return The MessageEntity object with the specified offset, or null if no such object exists in the queue. + * @throws RuntimeException if the specified offset is less than the offset of the head MessageEntity object. */ public MessageEntity getByOffset(long offset) { ReentrantLock reentrantLock = this.lock; reentrantLock.lock(); try { - MessageEntity head = getHead(); - if (head == null) { + if (count == 0) { return null; } + int tailIndex = putIndex - 1; + MessageEntity head = itemAt(takeIndex); if (head.getOffset() > offset) { throw new RuntimeException(String.format("The message has been deleted, offset: %s", offset)); } - MessageEntity tail = getTail(); + if (tailIndex < 0) { + tailIndex += items.length; + } + MessageEntity tail = itemAt(tailIndex); if (tail == null || tail.getOffset() < offset) { return null; } @@ -174,6 +183,9 @@ public MessageEntity getByOffset(long offset) { } } + /** + * Removes the MessageEntity object at the head of the queue. + */ public void removeHead() { ReentrantLock reentrantLock = this.lock; reentrantLock.lock(); @@ -195,11 +207,21 @@ public int getSize() { return count; } - + /** + * Returns the MessageEntity object at the specified index. + * + * @param index The index of the MessageEntity object to be returned. + * @return The MessageEntity object at the specified index. + */ private MessageEntity itemAt(int index) { return items[index]; } + /** + * Insert the message at the tail of this queue, waiting for space to become available if the queue is full + * + * @param messageEntity The MessageEntity object to be inserted into the queue. + */ private void enqueue(MessageEntity messageEntity) { items[putIndex++] = messageEntity; if (putIndex == items.length) { @@ -209,6 +231,11 @@ private void enqueue(MessageEntity messageEntity) { notEmpty.signalAll(); } + /** + * Removes and returns the MessageEntity object at the head of the queue. + * + * @return The MessageEntity object at the head of the queue. + */ private MessageEntity dequeue() { MessageEntity item = items[takeIndex++]; if (takeIndex == items.length) { @@ -225,8 +252,4 @@ public int getTakeIndex() { public int getPutIndex() { return putIndex; } - - public MessageEntity[] getItems() { - return items; - } }