Skip to content

Commit

Permalink
Stop filling monitoring queues when processor fails
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
  • Loading branch information
jbescos committed Jan 22, 2021
1 parent fc07c4a commit e5e32bf
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,14 +20,14 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.ws.rs.ProcessingException;

import javax.annotation.Priority;
import javax.inject.Inject;
import javax.ws.rs.ProcessingException;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.server.internal.LocalizationMessages;
Expand Down Expand Up @@ -68,6 +68,8 @@ public final class MonitoringEventListener implements ApplicationEventListener {
private final Queue<Integer> responseStatuses = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private final Queue<RequestEvent> exceptionMapperEvents = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private volatile MonitoringStatisticsProcessor monitoringStatisticsProcessor;
// By default new events can arrive before MonitoringStatisticsProcessor is running.
private final AtomicBoolean processorFailed = new AtomicBoolean(false);

/**
* Time statistics.
Expand Down Expand Up @@ -185,6 +187,7 @@ public void onEvent(final ApplicationEvent event) {
case RELOAD_FINISHED:
case INITIALIZATION_FINISHED:
this.monitoringStatisticsProcessor = new MonitoringStatisticsProcessor(injectionManager, this);
processorFailed.set(false);
this.monitoringStatisticsProcessor.startMonitoringWorker();
break;
case DESTROY_FINISHED:
Expand Down Expand Up @@ -238,13 +241,13 @@ public void onEvent(final RequestEvent event) {
methodStats = new MethodStats(method, methodTimeStart, now - methodTimeStart);
break;
case EXCEPTION_MAPPING_FINISHED:
if (!exceptionMapperEvents.offer(event)) {
if (!offer(exceptionMapperEvents, event)) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_MAPPER());
}
break;
case FINISHED:
if (event.isResponseWritten()) {
if (!responseStatuses.offer(event.getContainerResponse().getStatus())) {
if (!offer(responseStatuses, event.getContainerResponse().getStatus())) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_RESPONSE());
}
}
Expand All @@ -264,8 +267,7 @@ public void onEvent(final RequestEvent event) {
}
sb.setLength(sb.length() - 1);
}

if (!requestQueuedItems.offer(new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
if (!offer(requestQueuedItems, new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
methodStats, sb.toString()))) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_REQUEST());
}
Expand All @@ -274,6 +276,21 @@ public void onEvent(final RequestEvent event) {
}
}

private <T> boolean offer(Queue<T> queue, T event) {
if (!processorFailed.get()) {
return queue.offer(event);
}
// Don't need to warn that the event was not queued because an Exception was thrown by MonitoringStatisticsProcessor
return true;
}

/**
* Invoked by {@link MonitoringStatisticsProcessor} when there is one exception consuming from queues.
*/
void processorFailed() {
processorFailed.set(true);
}

/**
* Get the exception mapper event queue.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.glassfish.jersey.server.ExtendedResourceContext;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener.RequestStats;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.model.ResourceModel;
import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void run() {
processResponseCodeEvents();
processExceptionMapperEvents();
} catch (final Throwable t) {
monitoringEventListener.processorFailed();
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
// rethrowing exception stops further task execution
throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
Expand All @@ -120,11 +122,9 @@ public void run() {
private void processExceptionMapperEvents() {
final Queue<RequestEvent> eventQueue = monitoringEventListener.getExceptionMapperEvents();
final FloodingLogger floodingLogger = new FloodingLogger(eventQueue);

while (!eventQueue.isEmpty()) {
RequestEvent event = null;
while ((event = eventQueue.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final RequestEvent event = eventQueue.remove();
final ExceptionMapperStatisticsImpl.Builder mapperStats = statisticsBuilder.getExceptionMapperStatisticsBuilder();

if (event.getExceptionMapper() != null) {
Expand All @@ -138,12 +138,9 @@ private void processExceptionMapperEvents() {
private void processRequestItems() {
final Queue<MonitoringEventListener.RequestStats> requestQueuedItems = monitoringEventListener.getRequestQueuedItems();
final FloodingLogger floodingLogger = new FloodingLogger(requestQueuedItems);

while (!requestQueuedItems.isEmpty()) {
RequestStats event = null;
while ((event = requestQueuedItems.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final MonitoringEventListener.RequestStats event = requestQueuedItems.remove();

final MonitoringEventListener.TimeStats requestStats = event.getRequestStats();
statisticsBuilder.addRequestExecution(requestStats.getStartTime(), requestStats.getDuration());

Expand All @@ -160,11 +157,9 @@ private void processRequestItems() {
private void processResponseCodeEvents() {
final Queue<Integer> responseEvents = monitoringEventListener.getResponseStatuses();
final FloodingLogger floodingLogger = new FloodingLogger(responseEvents);

while (!responseEvents.isEmpty()) {
Integer code = null;
while ((code = responseEvents.poll()) != null) {
floodingLogger.conditionallyLogFlooding();

final Integer code = responseEvents.remove();
statisticsBuilder.addResponseCode(code);
}

Expand Down
8 changes: 7 additions & 1 deletion tests/e2e-server/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -189,6 +189,12 @@
<artifactId>xmlunit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.tests.e2e.server.monitoring;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;

import javax.inject.Inject;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeDataSupport;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.ExceptionMapperMXBean;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MonitoringEventListenerTest extends JerseyTest {

private static final long TIMEOUT = 500;
private static final String MBEAN_EXCEPTION =
"org.glassfish.jersey:type=MonitoringEventListenerTest,subType=Global,exceptions=ExceptionMapper";

@Path("/example")
public static class ExampleResource {
@Inject
private InjectionManager injectionManager;
@GET
@Path("/error")
public Response error() {
throw new RuntimeException("Any exception to be counted in ExceptionMapper");
}
@GET
@Path("/poison")
public Response poison() {
MonitoringEventListener monitoringEventListener = listener();
RequestEvent requestEvent = mock(RequestEvent.class);
when(requestEvent.getType()).thenReturn(RequestEvent.Type.START);
RequestEventListener eventListener = monitoringEventListener.onRequest(requestEvent);
RequestEvent poisonEvent = mock(RequestEvent.class);
when(poisonEvent.getType()).thenReturn(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
when(poisonEvent.getExceptionMapper())
.thenThrow(new IllegalStateException("This causes the scheduler to stop working"));
eventListener.onEvent(poisonEvent);
return Response.ok().build();
}
@GET
@Path("/queueSize")
public Response queueSize() throws Exception {
MonitoringEventListener monitoringEventListener = listener();
Method method = MonitoringEventListener.class.getDeclaredMethod("getExceptionMapperEvents");
method.setAccessible(true);
Collection<?> queue = (Collection<?>) method.invoke(monitoringEventListener);
return Response.ok(queue.size()).build();
}
private MonitoringEventListener listener() {
Iterable<ApplicationEventListener> listeners =
Providers.getAllProviders(injectionManager, ApplicationEventListener.class);
for (ApplicationEventListener listener : listeners) {
if (listener instanceof MonitoringEventListener) {
return (MonitoringEventListener) listener;
}
}
throw new IllegalStateException("MonitoringEventListener was not found");
}
}

@Provider
public static class RuntimeExceptionMapper implements ExceptionMapper<RuntimeException> {
@Override
public Response toResponse(RuntimeException e) {
return Response.status(500).entity("RuntimeExceptionMapper: " + e.getMessage()).build();
}
}

@Override
protected Application configure() {
ResourceConfig resourceConfig = new ResourceConfig(ExampleResource.class);
// Need to map the exception to be counted by ExceptionMapper
resourceConfig.register(RuntimeExceptionMapper.class);
resourceConfig.property(ServerProperties.MONITORING_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
resourceConfig.property(ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, 1);
resourceConfig.setApplicationName("MonitoringEventListenerTest");
return resourceConfig;
}

@Test
public void exceptionInScheduler() throws Exception {
final Long ERRORS_BEFORE_FAIL = 10L;
// Send some requests to process some statistics.
request(ERRORS_BEFORE_FAIL);
// Give some time to the scheduler to collect data.
Thread.sleep(TIMEOUT);
// All events were consumed by scheduler
queueIsEmpty();
// Make the scheduler to fail. No more statistics are collected.
makeFailure();
// Sending again requests
request(20);
Thread.sleep(TIMEOUT);
// No new events should be accepted because scheduler is not working.
queueIsEmpty();
Long monitoredErrors = mappedErrorsFromJMX(MBEAN_EXCEPTION);
assertEquals(ERRORS_BEFORE_FAIL, monitoredErrors);
}

private void makeFailure() {
Response response = target("/example/poison").request().get();
assertEquals(200, response.getStatus());
}

private void queueIsEmpty() {
Response response = target("/example/queueSize").request().get();
assertEquals(200, response.getStatus());
assertEquals(Integer.valueOf(0), response.readEntity(Integer.class));
}

private Long mappedErrorsFromJMX(String name) throws Exception {
Long monitoredErrors = null;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(name);
ExceptionMapperMXBean bean = JMX.newMBeanProxy(mbs, objectName, ExceptionMapperMXBean.class);
Map<?, ?> counter = bean.getExceptionMapperCount();
CompositeDataSupport value = (CompositeDataSupport) counter.entrySet().iterator().next().getValue();
for (Object obj : value.values()) {
if (obj instanceof Long) {
// Messy way to get the errors, but generic types doesn't match and there is no nice way
monitoredErrors = (Long) obj;
break;
}
}
return monitoredErrors;
}

private void request(long requests) {
for (long i = 0; i < requests; i++) {
Response response = target("/example/error").request().get();
assertEquals(500, response.getStatus());
}
}
}

0 comments on commit e5e32bf

Please sign in to comment.