RejectedExecution exception after call closed #9547

Closed
YifeiZhuang opened this issue Sep 15, 2022 · 1 comment · Fixed by #9626

@YifeiZhuang

Sep 14, 2022 11:55:46 PM io.netty.util.concurrent.DefaultPromise notifyListener0
WARNING: An exception was thrown by io.grpc.netty.NettyClientHandler$4.operationComplete()
java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:904)
        at io.grpc.internal.DelayedStream$DelayedStreamListener$4.run(DelayedStream.java:510)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.delayOrExecute(DelayedStream.java:462)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.closed(DelayedStream.java:507)
        at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34)
        at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:693)
        at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:459)
        at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221)
        at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:442)
        at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278)
        at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
        at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:233)
        at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:198)
        at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:445)
        at io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:646)
        at io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:610)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
        at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:109)
        at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$Frame.release(StreamBufferingEncoder.java:327)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$PendingStream.close(StreamBufferingEncoder.java:308)
        at io.netty.handler.codec.http2.StreamBufferingEncoder.cancelGoAwayStreams(StreamBufferingEncoder.java:274)
        at io.netty.handler.codec.http2.StreamBufferingEncoder.access$400(StreamBufferingEncoder.java:59)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$1.onGoAwayReceived(StreamBufferingEncoder.java:138)
        at io.netty.handler.codec.http2.DefaultHttp2Connection.goAwayReceived(DefaultHttp2Connection.java:237)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.onGoAwayRead0(DefaultHttp2ConnectionDecoder.java:217)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onGoAwayRead(DefaultHttp2ConnectionDecoder.java:583)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onGoAwayRead(Http2InboundFrameLogger.java:119)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readGoAwayFrame(DefaultHttp2FrameReader.java:580)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:271)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
Sep 14, 2022 11:55:46 PM io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<1>: (localhost:8080)] Uncaught exception in the SynchronizationContext. Panic!
java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:311)
        at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:899)
        at io.grpc.internal.ManagedChannelImpl.access$5300(ManagedChannelImpl.java:118)
        at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1482)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:547)
        at io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:44)
        at io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:920)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:515)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:735)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:531)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:262)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

Exception in thread "main" io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
        at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
        at io.grpc.testing.integration.TestServiceGrpc$TestServiceBlockingStub.emptyCall(TestServiceGrpc.java:615)
        at io.grpc.testing.integration.AbstractInteropTest.emptyUnary(AbstractInteropTest.java:421)
        at io.grpc.testing.integration.TestServiceClient.runTest(TestServiceClient.java:264)
        at io.grpc.testing.integration.TestServiceClient.run(TestServiceClient.java:252)
        at io.grpc.testing.integration.TestServiceClient.main(TestServiceClient.java:66)
Caused by: java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:311)
        at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:899)
        at io.grpc.internal.ManagedChannelImpl.access$5300(ManagedChannelImpl.java:118)
        at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1482)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:547)
        at io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:44)
        at io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:920)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:515)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:735)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:531)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:262)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
@YifeiZhuang YifeiZhuang self-assigned this Sep 15, 2022
@YifeiZhuang YifeiZhuang added this to the Next milestone Sep 15, 2022
@ejona86 ejona86 added the bug label Oct 4, 2022
@YifeiZhuang

Background

ThreadlessExecutor wraps the client-supplied executor in order to manage the executor's lifecycle. Application methods normally run on this executor. ClientCall injects it into CallOptions so that it reaches the transports, e.g. during call buffering or retry processing. ThreadlessExecutor queues the runnables and executes them sequentially. Once the transport notifies that the call is closed, it should be safe to shut down the executor. A subsystem is not allowed to schedule runnables on the executor after the call has terminated. To obey this rule, subsystems such as call buffering or retrying may need internal synchronization to keep themselves from using the executor after the call is closed.
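The contract above can be illustrated with a minimal sketch. This is not gRPC's ThreadlessExecutor; the class name `QueueingExecutorSketch` and its `drain()` and `shutdown()` methods are hypothetical stand-ins that only model "queue, run sequentially, reject after shutdown".

```java
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical stand-in for the behaviour described above; not gRPC's class. */
final class QueueingExecutorSketch implements Executor {
  private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  private volatile boolean shutdown;

  /** Queues the task, or rejects it if the call has already been closed. */
  @Override
  public void execute(Runnable task) {
    if (shutdown) {
      throw new RejectedExecutionException();
    }
    queue.add(task);
  }

  /** Runs everything queued so far on the calling thread, in submission order. */
  int drain() {
    int ran = 0;
    Runnable task;
    while ((task = queue.poll()) != null) {
      task.run();
      ran++;
    }
    return ran;
  }

  /** Models the stub layer shutting the executor down once the call is closed. */
  void shutdown() {
    shutdown = true;
  }
}
```

Any subsystem that calls `execute()` after `shutdown()` in this sketch gets a `RejectedExecutionException`, which is the same exception type that appears at the top of the stack traces above.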

ManagedChannelImpl

#8978

Retriable Stream

Retriable stream intercepts the stream seen from ClientCall. It talks to the transport and may create multiple real streams until one of them receives a successful response from the wire or it decides not to retry any more. It then notifies the application of the result. Once RetriableStream returns to the application, retry processing has ended, and the Retriable Stream should respect the life cycle of the call. Failing to do so violates the contract mentioned above and may cause a channel panic.
In Retriable Stream, the calls the application makes on the stream are saved and replayed for each retried sub-stream. When a previous stream is closed with an error status code, a new sub-stream may be created, e.g. because of transparent retry or because the retry strategy decides to retry. The replay tasks run on the executor supplied to the call, i.e. through CallOptions.

Problem

There is a race condition between cancel() closing the masterListener and the callExecutor being used to drain the retry streams: the executor from CallOptions is still used (in Sublistener.closed()) even after the application has been notified of the close (retriableStream.cancel()). Once the close happens, the ClientCall in the stub layer shuts down the ThreadlessExecutor immediately and does not allow new runnables to be scheduled on it. However, Retriable Stream lacks the necessary synchronization and still schedules drain tasks on the executor, so RejectedExecutionException is thrown. Note that the underlying DelayedStream also uses the executor, so it needs to be considered as well; this is discussed in the solution section.

Reproduction

The bug can be reproduced with the interop test setup. At the server, fail the RPC immediately (by setting max connection age to 0). At the client, set a small (100 ms) deadline. Run the interop test empty call 100 times on a GCE VM. Reproduction rate: 100%.

Solution: Use an in-flight retriable stream counter to synchronize call close and call executor

Use a new atomic integer field called InFlightSubStreams.
A negative value of the counter indicates that a cancellation has been committed.
A positive value of the counter indicates that there are outstanding new streams and it is not safe to close.
In cancel(), add Integer.MIN_VALUE to the counter.
When creating a new subStream, increment the counter first. If the result is positive, cancel() has not been called, so it is safe to create the stream. In addition, cancel() will not close the masterListener, because the counter is positive.
Decrement the counter when the stream is drained. (This might be wrong: "drained" here has to mean that the stream is closed. Returning from drain() does not mean draining has finished, because other subsystems, e.g. DelayedStream, might still be using the executor. Therefore, we do this check in the listener's closed().)
Between cancel() and the drain() of the last stream: if cancel() happens first, the number of in-flight sub-streams reaches 0 in the last stream's closed(), so notify masterListener.closed() there. If drain() happens first, the counter equals Integer.MIN_VALUE in cancel(), so notify the masterListener there.
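The counter protocol above can be sketched roughly as follows. This is an illustration under assumptions, not the actual RetriableStream change: the class `InFlightGateSketch` and its method names are hypothetical, the master listener is reduced to a `Runnable`, and `cancel()` is assumed to be called at most once. The sketch also swaps the plain increment for a compare-and-set loop, so that an attempt rejected after cancellation never modifies the counter. Because cancel() adds Integer.MIN_VALUE, "cancelled with nothing in flight" reads as exactly Integer.MIN_VALUE, which is the value both closing paths test for.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the in-flight counter idea; names are illustrative only. */
final class InFlightGateSketch {
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
  private final Runnable closeMasterListener;

  InFlightGateSketch(Runnable closeMasterListener) {
    this.closeMasterListener = closeMasterListener;
  }

  /** Returns true if a new sub-stream may be created, false once cancel() has committed. */
  boolean tryStartSubStream() {
    while (true) {
      int current = inFlightSubStreams.get();
      if (current < 0) {
        return false; // negative: cancellation already committed
      }
      if (inFlightSubStreams.compareAndSet(current, current + 1)) {
        return true;
      }
    }
  }

  /** Called from the sub-stream listener's closed(), not right after drain() returns. */
  void subStreamClosed() {
    if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
      // cancel() happened first and this was the last in-flight sub-stream.
      closeMasterListener.run();
    }
  }

  /** Assumed to be called at most once; a second call would overflow the counter. */
  void cancel() {
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
      // Nothing in flight, so the master listener can be closed here.
      closeMasterListener.run();
    }
  }
}
```

In either ordering of cancel() and the last sub-stream close, exactly one of the two paths observes Integer.MIN_VALUE, so the master listener is closed once and only after the last sub-stream has stopped using the executor.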

Alternative Solution:

Use a new SettableFuture field called delayedClose.
Delay notifying the masterListener in cancel(). Use ListenerExecutor to synchronize between cancel() and creating a new stream. Once cancel() is committed, block and wait for the Future to be done. Creating a new sub-stream should check whether the call is already closed. If stream creation wins, it creates a new SettableFuture and sets a null value on it once drained.

cc. @ejona86

@ejona86 ejona86 modified the milestones: Next, 1.52 Dec 20, 2022
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Mar 21, 2023