Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix build #406

Merged
merged 3 commits into from
Oct 26, 2020
Merged

Fix build #406

merged 3 commits into from
Oct 26, 2020

Conversation

LMnet
Copy link
Member

@LMnet LMnet commented Oct 15, 2020

The latest update of scala 2.12.10 to 2.12.12 makes all build red because of this error:

[error] Test suite fs2.kafka.KafkaConsumerSpec failed with java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7.
[error] This may be due to the ClassLoaderLayeringStrategy (ScalaLibrary) used by your task.
[error] To improve performance and reduce memory, sbt attempts to cache the class loaders used to load the project dependencies.
[error] The project class files are loaded in a separate class loader that is created for each test run.
[error] The test class loader accesses the project dependency classes using the cached project dependency classloader.
[error] With this approach, class loading may fail under the following conditions:
[error] 
[error]  * Dependencies use reflection to access classes in your project's classpath.
[error]    Java serialization/deserialization may cause this.
[error]  * An open package is accessed across layers. If the project's classes access or extend
[error]    jvm package private classes defined in a project dependency, it may cause an IllegalAccessError
[error]    because the jvm enforces package private at the classloader level.
[error] 
[error] These issues, along with others that were not enumerated above, may be resolved by changing the class loader layering strategy.
[error] The Flat and ScalaLibrary strategies bundle the full project classpath in the same class loader.
[error] To use one of these strategies, set the  ClassLoaderLayeringStrategy key
[error] in your configuration, for example:
[error] 
[error] set core / Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.ScalaLibrary
[error] set core / Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat
[error] 
[error] See ClassLoaderLayeringStrategy.scala for the full list of options.

I tried to investigate a bit and found this and this. It looks like some sbt plugins manipulate scala version and cause this class loader issue. I'm not 100% sure that this is the root cause, but it looks pretty similar.

I didn't find any valuable solution in a decent time, so I decided to just revert scala 2.12 update. Looks like this update is not necessary for the library and making all current builds green is very important for the pending pull requests.

Also, looks like updating kafka-avro-serializer to the 6.0.0 cause this issue. Fortunately, there is a workaround for it. I added this workaround in a second commit.

@LMnet LMnet mentioned this pull request Oct 15, 2020
@LMnet LMnet changed the title Revert scala 2.12 update to make build green again Fix build Oct 15, 2020
@LMnet
Copy link
Member Author

LMnet commented Oct 15, 2020

A build was fixed, but one of the tests is failed:

[info] - should read from the given offset *** FAILED *** (5 seconds, 75 milliseconds)
[info]   org.apache.kafka.common.KafkaException: Failed to close kafka consumer
[info]   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2392)
[info]   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340)
[info]   at fs2.kafka.internal.WithConsumer$.$anonfun$apply$8(WithConsumer.scala:47)
[info]   at unsafeRunSync @ fs2.kafka.KafkaSpec.$anonfun$new$2(KafkaSpec.scala:25)
[info]   at blockOn @ fs2.kafka.internal.WithConsumer$$anon$1.$anonfun$apply$6(WithConsumer.scala:40)
[info]   at flatMap @ fs2.kafka.internal.Synchronized$$anon$1.$anonfun$use$2(Synchronized.scala:41)
[info]   at unsafeRunSync @ fs2.kafka.KafkaSpec.$anonfun$new$2(KafkaSpec.scala:25)
[info]   at bracket @ fs2.kafka.internal.Synchronized$$anon$1.$anonfun$use$1(Synchronized.scala:42)
[info]   at flatMap @ fs2.kafka.internal.Synchronized$$anon$1.use(Synchronized.scala:39)
[info]   at flatMap @ fs2.internal.ScopedResource$$anon$1.release(ScopedResource.scala:121)
[info]   at flatTraverse @ fs2.internal.CompileScope.$anonfun$lease$1(CompileScope.scala:317)
[info]   at as @ fs2.Stream$.$anonfun$evalTap$1(Stream.scala:1096)
[info]   at flatMap @ fs2.internal.ScopedResource$$anon$1.release(ScopedResource.scala:121)
[info]   at flatTraverse @ fs2.internal.CompileScope.$anonfun$lease$1(CompileScope.scala:317)
[info]   at as @ fs2.Stream$.$anonfun$evalTap$1(Stream.scala:1096)
[info]   at flatMap @ fs2.internal.ScopedResource$$anon$1.release(ScopedResource.scala:121)
[info]   at flatTraverse @ fs2.internal.CompileScope.$anonfun$lease$1(CompileScope.scala:317)
[info]   Cause: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
[info]   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:757)
[info]   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
[info]   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895)
[info]   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373)
[info]   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340)
[info]   at fs2.kafka.internal.WithConsumer$.$anonfun$apply$8(WithConsumer.scala:47)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:103)
[info]   at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:50)
[info]   at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
[info]   at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
[info]   at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
[info]   at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
[info]   at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
[info]   at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
[info]   at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
[info]   at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
[info]   at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
[info]   at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
[info]   at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:177)
[info]   at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:455)
[info]   at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:476)
[info]   at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:414)
[info]   at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:834)
[info]   Cause: java.lang.IllegalStateException: Attempting to complete a Deferred that has already been completed
[info]   at flatTraverse @ fs2.internal.CompileScope.$anonfun$lease$1(CompileScope.scala:317)
[info]   at traverse @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:191)
[info]   at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$6(KafkaConsumerActor.scala:433)
[info]   at traverse @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:191)
[info]   at traverse @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:191)
[info]   at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$6(KafkaConsumerActor.scala:433)
[info]   at $greater$greater$extension @ fs2.Stream$.$anonfun$groupWithin$3(Stream.scala:1446)
[info]   at $greater$greater$extension @ fs2.Stream$.$anonfun$groupWithin$3(Stream.scala:1446)
[info]   at $greater$greater$extension @ fs2.Stream$.$anonfun$groupWithin$3(Stream.scala:1446)
[info]   at $greater$greater$extension @ fs2.Stream$.$anonfun$groupWithin$3(Stream.scala:1446)
[info]   at $greater$greater$extension @ fs2.Stream$.$anonfun$groupWithin$3(Stream.scala:1446)
[info]   at flatMap @ fs2.kafka.internal.KafkaConsumerActor.fs2$kafka$internal$KafkaConsumerActor$$revoked(KafkaConsumerActor.scala:252)

Looks like there is some race condition, I will try to investigate.

@LMnet
Copy link
Member Author

LMnet commented Oct 15, 2020

I found a few race conditions in the KafkaConsumerActor and fixed them. The main problem point was interacting with the Ref[F, State[F, K, V]]. Some of these interactions were not atomic and not synchronized. At first, I thought, that it was ok because all interactions inside actor are made through the requests queue. But then I found, that consumerRebalanceListener runs bypassing this queue. So, there are possible race conditions in this actor.

To fix race conditions I made all interactions with state atomic through a modify method. Because of this, some code became a bit more difficult.

The last commit is pretty complex. If it helps, I can create a separate pull request with it.

@codecov-io
Copy link

Codecov Report

Merging #406 into master will decrease coverage by 1.02%.
The diff coverage is 76.19%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #406      +/-   ##
==========================================
- Coverage   92.01%   90.99%   -1.03%     
==========================================
  Files          62       62              
  Lines        1466     1499      +33     
  Branches       49       46       -3     
==========================================
+ Hits         1349     1364      +15     
- Misses        117      135      +18     
Impacted Files Coverage Δ
.../scala/fs2/kafka/internal/KafkaConsumerActor.scala 83.77% <76.19%> (-4.33%) ⬇️
...e/src/main/scala/fs2/kafka/internal/LogEntry.scala 32.50% <0.00%> (-2.50%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d1a5b56...54e666c. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants