From 0fd101724bf312f2854cdc0f98974bfc3a19e1a5 Mon Sep 17 00:00:00 2001 From: harture <31417989+harture@users.noreply.github.com> Date: Mon, 9 Sep 2019 15:00:38 +0200 Subject: [PATCH] [CLOUDTRUST-1635] Fix concurrency issue * Fix concurrency issue, * Exit loop if any failure * Improve logging --- .../eventemitter/ConcurrentEvictingQueue.java | 45 -------------- .../eventemitter/EventEmitterProvider.java | 58 +++++++++++++++++-- .../EventEmitterProviderFactory.java | 9 +-- .../ConcurrentEvictingQueueTest.java | 37 ------------ .../EventEmitterProviderTest.java | 21 +++---- 5 files changed, 68 insertions(+), 102 deletions(-) delete mode 100644 src/main/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueue.java delete mode 100644 src/test/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueueTest.java diff --git a/src/main/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueue.java b/src/main/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueue.java deleted file mode 100644 index a28863a..0000000 --- a/src/main/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueue.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.cloudtrust.keycloak.eventemitter; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Concurrency safe non blocking queue. - * This class is a lazy modification of the LinkedBlockingQueue with a modification of the behavior - * of the offer method to evict the oldest leement is the queue is full. - * This implementation only expects the usage of offer and poll methods to manage elements in the queue. - * - */ -public class ConcurrentEvictingQueue extends LinkedBlockingQueue { - private static final long serialVersionUID = 2926742518543453893L; - - public ConcurrentEvictingQueue(int capacity){ - super(capacity); - } - - - /** - * Inserts the specified element at the tail of this queue. - * If the queue is full, the oldest element is dropped. - * - * @param t the element to add - * @return true in any case as element addition is always possible du to described strategy. - */ - @Override - public boolean offer(T t) { - // Dummy implementation, multiple iterations may be needed to be able to add the element - while(!super.offer(t)){ - super.poll(); - } - - return true; - } - - /** - * @throw UnsupportedOperationException - */ - @Override - public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } -} diff --git a/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProvider.java b/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProvider.java index 44eaa26..fc41b74 100644 --- a/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProvider.java +++ b/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProvider.java @@ -1,5 +1,6 @@ package io.cloudtrust.keycloak.eventemitter; +import com.fasterxml.jackson.core.JsonProcessingException; import io.cloudtrust.keycloak.snowflake.IdGenerator; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -14,6 +15,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.concurrent.LinkedBlockingQueue; /** @@ -36,8 +38,8 @@ public class EventEmitterProvider implements EventListenerProvider{ private CloseableHttpClient httpClient; private HttpClientContext httpContext; private IdGenerator idGenerator; - private ConcurrentEvictingQueue pendingEvents; - private ConcurrentEvictingQueue pendingAdminEvents; + private LinkedBlockingQueue pendingEvents; + private LinkedBlockingQueue pendingAdminEvents; private String targetUri; private String username; private String secretToken; @@ -55,8 +57,8 @@ public class EventEmitterProvider implements EventListenerProvider{ */ EventEmitterProvider(CloseableHttpClient httpClient, IdGenerator idGenerator, String targetUri, SerialisationFormat format, - ConcurrentEvictingQueue pendingEvents, - ConcurrentEvictingQueue pendingAdminEvents){ + LinkedBlockingQueue pendingEvents, + LinkedBlockingQueue pendingAdminEvents){ logger.debug("EventEmitterProvider contructor call"); this.httpClient = httpClient; this.httpContext = HttpClientContext.create(); @@ -86,7 +88,19 @@ public void onEvent(Event event) { logger.debugf("Pending Events to send stored in buffer: %d", pendingEvents.size()); long uid = idGenerator.nextValidId(); IdentifiedEvent identifiedEvent = new IdentifiedEvent(uid, event); - pendingEvents.offer(identifiedEvent); + + while(!pendingEvents.offer(identifiedEvent)){ + Event skippedEvent = pendingEvents.poll(); + if(skippedEvent != null) { + String strEvent = null; + try { + strEvent = SerialisationUtils.toJson(skippedEvent); + } catch (JsonProcessingException e) { + strEvent = "SerializationFailure"; + } + logger.errorf("Event dropped(%s) due to full queue", strEvent); + } + } sendEvents(); } @@ -97,7 +111,19 @@ public void onEvent(AdminEvent adminEvent, boolean b) { logger.debugf("Pending AdminEvents to send stored in buffer: %d", pendingAdminEvents.size()); long uid = idGenerator.nextValidId(); IdentifiedAdminEvent identifiedAdminEvent = new IdentifiedAdminEvent(uid, adminEvent); - pendingAdminEvents.offer(identifiedAdminEvent); + + while(!pendingAdminEvents.offer(identifiedAdminEvent)){ + AdminEvent skippedAdminEvent = pendingAdminEvents.poll(); + if(skippedAdminEvent != null) { + String strAdminEvent = null; + try { + strAdminEvent = SerialisationUtils.toJson(skippedAdminEvent); + } catch (JsonProcessingException e) { + strAdminEvent = "SerializationFailure"; + } + logger.errorf("AdminEvent dropped(%s) due to full queue", strAdminEvent); + } + } sendEvents(); } @@ -125,6 +151,10 @@ private void sendEventsWithJsonFormat() { for (int i=0; i < pendingEventsSize; i++){ IdentifiedEvent event = pendingEvents.poll(); + if(event == null){ + break; + } + try { String json = SerialisationUtils.toJson(event); sendJson(json); @@ -132,6 +162,7 @@ private void sendEventsWithJsonFormat() { pendingEvents.offer(event); logger.infof("Failed to send event(ID=%s), try again later.", event.getUid()); logger.debug("Failed to serialize or send event", e); + break; } } @@ -139,6 +170,10 @@ private void sendEventsWithJsonFormat() { for (int i=0; i < pendingAdminEventsSize; i++){ IdentifiedAdminEvent event = pendingAdminEvents.poll(); + if(event == null){ + break; + } + try { String json = SerialisationUtils.toJson(event); sendJson(json); @@ -146,6 +181,7 @@ private void sendEventsWithJsonFormat() { pendingAdminEvents.offer(event); logger.infof("Failed to send adminEvent(ID=%s), try again later.", event.getUid()); logger.debug("Failed to serialize or send adminEvent", e); + break; } } } @@ -156,6 +192,10 @@ private void sendEventsWithFlatbufferFormat() { for (int i=0; i < pendingEventsSize; i++){ IdentifiedEvent event = pendingEvents.poll(); + if(event == null){ + break; + } + try { ByteBuffer buffer = SerialisationUtils.toFlat(event); sendBytes(buffer, Event.class.getSimpleName()); @@ -163,6 +203,7 @@ private void sendEventsWithFlatbufferFormat() { pendingEvents.offer(event); logger.infof("Failed to send event(ID=%s), try again later.", event.getUid()); logger.debug("Failed to serialize or send event", e); + break; } } @@ -170,6 +211,10 @@ private void sendEventsWithFlatbufferFormat() { for (int i=0; i < pendingAdminEventsSize; i++){ IdentifiedAdminEvent event = pendingAdminEvents.poll(); + if(event == null){ + break; + } + try { ByteBuffer buffer = SerialisationUtils.toFlat(event); sendBytes(buffer, AdminEvent.class.getSimpleName()); @@ -177,6 +222,7 @@ private void sendEventsWithFlatbufferFormat() { pendingAdminEvents.offer(event); logger.infof("Failed to send adminEvent(ID=%s), try again later.", event.getUid()); logger.debug("Failed to serialize or send adminEvent", e); + break; } } } diff --git a/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderFactory.java b/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderFactory.java index d32558e..2d9ebbf 100644 --- a/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderFactory.java +++ b/src/main/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderFactory.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; /** * Factory for EventEmitterProvider. @@ -50,8 +51,8 @@ public class EventEmitterProviderFactory implements EventListenerProviderFactory private CloseableHttpClient httpClient; private IdGenerator idGenerator; - private ConcurrentEvictingQueue pendingEventsToSend; - private ConcurrentEvictingQueue pendingAdminEventsToSend; + private LinkedBlockingQueue pendingEventsToSend; + private LinkedBlockingQueue pendingAdminEventsToSend; @@ -132,8 +133,8 @@ public void init(Config.Scope config) { // Initialisation httpClient = HttpClients.createDefault(); idGenerator = new IdGenerator(keycloakId, datacenterId); - pendingEventsToSend = new ConcurrentEvictingQueue<>(bufferCapacity); - pendingAdminEventsToSend = new ConcurrentEvictingQueue<>(bufferCapacity); + pendingEventsToSend = new LinkedBlockingQueue<>(bufferCapacity); + pendingAdminEventsToSend = new LinkedBlockingQueue<>(bufferCapacity); } diff --git a/src/test/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueueTest.java b/src/test/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueueTest.java deleted file mode 100644 index 989c1ac..0000000 --- a/src/test/java/io/cloudtrust/keycloak/eventemitter/ConcurrentEvictingQueueTest.java +++ /dev/null @@ -1,37 +0,0 @@ -package io.cloudtrust.keycloak.eventemitter; - -import org.junit.Assert; -import org.junit.Test; - -public class ConcurrentEvictingQueueTest { - - @Test - public void testOffer() { - int capacity = 2; - ConcurrentEvictingQueue fifo = new ConcurrentEvictingQueue<>(capacity); - - for (int i = 0; i < 2 * capacity; i++) { - fifo.offer(i); - } - - Assert.assertEquals(capacity, fifo.size()); - } - - - @Test - public void testPoll() { - int capacity = 5; - ConcurrentEvictingQueue fifo = new ConcurrentEvictingQueue<>(capacity); - - for (int i = 0; i < capacity; i++) { - fifo.offer(i); - } - - for (int i = capacity; i > 0; i--) { - Assert.assertEquals(i, fifo.size()); - fifo.poll(); - } - - Assert.assertEquals(null, fifo.poll()); - } -} diff --git a/src/test/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderTest.java b/src/test/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderTest.java index 39b0a5e..e5fc3a8 100644 --- a/src/test/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderTest.java +++ b/src/test/java/io/cloudtrust/keycloak/eventemitter/EventEmitterProviderTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.concurrent.LinkedBlockingQueue; import org.xnio.streams.ChannelInputStream; @@ -58,8 +59,8 @@ public void testFlatbufferFormatOutput() throws IOException, InterruptedExceptio Undertow server = startHttpServer(handler); CloseableHttpClient httpClient = HttpClients.createDefault(); IdGenerator idGenerator = new IdGenerator(1,1); - ConcurrentEvictingQueue pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); - ConcurrentEvictingQueue pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient, idGenerator, TARGET, SerialisationFormat.FLATBUFFER, pendingEvents,pendingAdminEvents); @@ -88,8 +89,8 @@ public void testJsonFormatOutput() throws IOException, InterruptedException { Undertow server = startHttpServer(handler); CloseableHttpClient httpClient = HttpClients.createDefault(); IdGenerator idGenerator = new IdGenerator(1,1); - ConcurrentEvictingQueue pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); - ConcurrentEvictingQueue pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient, idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents); @@ -111,8 +112,8 @@ public void testJsonFormatOutput() throws IOException, InterruptedException { public void testNoConnection() throws IOException { CloseableHttpClient httpClient = HttpClients.createDefault(); IdGenerator idGenerator = new IdGenerator(1,1); - ConcurrentEvictingQueue pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); - ConcurrentEvictingQueue pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient, idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents); @@ -138,8 +139,8 @@ public void testServerError() throws IOException, InterruptedException { Undertow server = startHttpServer(handler); CloseableHttpClient httpClient = HttpClients.createDefault(); IdGenerator idGenerator = new IdGenerator(1,1); - ConcurrentEvictingQueue pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); - ConcurrentEvictingQueue pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient, idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents); @@ -166,8 +167,8 @@ public void testBufferAndSend() throws IOException, InterruptedException { CloseableHttpClient httpClient = HttpClients.createDefault(); IdGenerator idGenerator = new IdGenerator(1,1); - ConcurrentEvictingQueue pendingEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); - ConcurrentEvictingQueue pendingAdminEvents = new ConcurrentEvictingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); + LinkedBlockingQueue pendingAdminEvents = new LinkedBlockingQueue<>(BUFFER_CAPACITY); EventEmitterProvider eventEmitterProvider = new EventEmitterProvider(httpClient, idGenerator, TARGET, SerialisationFormat.JSON, pendingEvents,pendingAdminEvents);