From 02c8c07f55d053c558d6c54db8dd400a543ff926 Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Tue, 20 Apr 2021 08:48:32 -0500 Subject: [PATCH] [FLINK-22369][rocksdb] RocksDB state backend might occur ClassNotFoundException when deserializing on TM side --- .../state/EmbeddedRocksDBStateBackend.java | 42 +++++++------------ .../streaming/state/RocksDBStateBackend.java | 3 +- .../streaming/state/RocksDBInitTest.java | 2 +- 3 files changed, 16 insertions(+), 31 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index c2651f9d499b8..d7dc57880bfbd 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -18,7 +18,6 @@ package org.apache.flink.contrib.streaming.state; -import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -112,8 +111,6 @@ public enum PriorityQueueStateType { private static final long UNDEFINED_WRITE_BATCH_SIZE = -1; - private Logger logger = LOG; - // ------------------------------------------------------------------------ // -- configuration values, set in the application / configuration @@ -260,7 +257,7 @@ private EmbeddedRocksDBStateBackend( original.predefinedOptions == null ? PredefinedOptions.valueOf(config.get(RocksDBOptions.PREDEFINED_OPTIONS)) : original.predefinedOptions; - logger.info("Using predefined options: {}.", predefinedOptions.name()); + LOG.info("Using predefined options: {}.", predefinedOptions.name()); // configure RocksDB options factory try { @@ -278,16 +275,6 @@ private EmbeddedRocksDBStateBackend( latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config); } - /** - * Overrides the default logger for this class. It ensures users of the legacy {@link - * RocksDBStateBackend} see consistent logging. - */ - @Internal - @SuppressWarnings("SameParameterValue") - void setLogger(Logger logger) { - this.logger = logger; - } - // ------------------------------------------------------------------------ // Reconfiguration // ------------------------------------------------------------------------ @@ -334,7 +321,7 @@ private void lazyInitializeForJob( "Local DB files directory '" + f + "' does not exist and cannot be created. "; - logger.error(msg); + LOG.error(msg); errorMessage.append(msg); } else { dirs.add(f); @@ -415,7 +402,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; - ensureRocksDBIsLoaded(tempDir, logger); + ensureRocksDBIsLoaded(tempDir); // replace all characters that are not legal for filenames with underscore String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_"); @@ -437,10 +424,9 @@ public AbstractKeyedStateBackend createKeyedStateBackend( final OpaqueMemoryResource sharedResources = RocksDBOperationUtils.allocateSharedCachesIfConfigured( - memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, logger); + memoryConfiguration, env.getMemoryManager(), managedMemoryFraction, LOG); if (sharedResources != null) { - logger.info( - "Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); + LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); } final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources); @@ -512,7 +498,7 @@ private RocksDBOptionsFactory configureOptionsFactory( ((ConfigurableRocksDBOptionsFactory) originalOptionsFactory) .configure(config); } - logger.info("Using application-defined options factory: {}.", originalOptionsFactory); + LOG.info("Using application-defined options factory: {}.", originalOptionsFactory); return originalOptionsFactory; } @@ -523,7 +509,7 @@ private RocksDBOptionsFactory configureOptionsFactory( DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory(); optionsFactory.configure(config); - logger.info("Using default options factory: {}.", optionsFactory); + LOG.info("Using default options factory: {}.", optionsFactory); return optionsFactory; } else { @@ -537,7 +523,7 @@ private RocksDBOptionsFactory configureOptionsFactory( optionsFactory = ((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config); } - logger.info("Using configured options factory: {}.", optionsFactory); + LOG.info("Using configured options factory: {}.", optionsFactory); return optionsFactory; } catch (ClassNotFoundException e) { @@ -834,12 +820,12 @@ public String toString() { // ------------------------------------------------------------------------ @VisibleForTesting - static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IOException { + static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { synchronized (EmbeddedRocksDBStateBackend.class) { if (!rocksDbInitialized) { final File tempDirParent = new File(tempDirectory).getAbsoluteFile(); - logger.info( + LOG.info( "Attempting to load RocksDB native library and store it under '{}'", tempDirParent); @@ -863,7 +849,7 @@ static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IO rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID()); // make sure the temp path exists - logger.debug( + LOG.debug( "Attempting to create RocksDB native library folder {}", rocksLibFolder); // noinspection ResultOfMethodCallIgnored @@ -877,18 +863,18 @@ static void ensureRocksDBIsLoaded(String tempDirectory, Logger logger) throws IO RocksDB.loadLibrary(); // seems to have worked - logger.info("Successfully loaded RocksDB native library"); + LOG.info("Successfully loaded RocksDB native library"); rocksDbInitialized = true; return; } catch (Throwable t) { lastException = t; - logger.debug("RocksDB JNI library loading attempt {} failed", attempt, t); + LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t); // try to force RocksDB to attempt reloading the library try { resetRocksDBLoadedFlag(); } catch (Throwable tt) { - logger.debug( + LOG.debug( "Failed to reset 'initialized' flag in RocksDB native code loader", tt); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index f630ee24c3319..3b0a71d794fa4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -216,7 +216,6 @@ public RocksDBStateBackend( } this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.rocksDBStateBackend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); - this.rocksDBStateBackend.setLogger(LOG); } /** @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead. */ @@ -588,7 +587,7 @@ public String toString() { @VisibleForTesting static void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { - EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory, LOG); + EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempDirectory); } @VisibleForTesting diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java index ff68a37c33763..578507a085b8f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitTest.java @@ -62,7 +62,7 @@ public void testTempLibFolderDeletedOnFail() throws Exception { File tempFolder = temporaryFolder.newFolder(); try { - EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath(), LOG); + EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(tempFolder.getAbsolutePath()); fail("Not throwing expected exception."); } catch (IOException ignored) { // ignored