Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for abstracting file storage #206

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class AppProperties {
@Value("${app.uploadPath}")
private String uploadPath;

@Value("${app.cachePath}")
private String cachePath;

@Value("${app.key}")
private String appKey;

Expand Down Expand Up @@ -82,6 +85,10 @@ public String getUploadPath() {
return uploadPath;
}

public String getCachePath() {
return cachePath;
}

public String getAppKey() {
return appKey;
}
Expand Down Expand Up @@ -123,6 +130,7 @@ public String toString() {
return "AppProperties{"
+ "name='" + name + '\''
+ ", uploadPath='" + uploadPath + '\''
+ ", cachePath='" + cachePath + '\''
+ ", appKey='XXXXXX'"
+ ", maxConcurrentWebSocketConsumers=" + maxConcurrentWebSocketConsumers
+ ", consumerIdPrefix='" + consumerIdPrefix + '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.file.FileManager;
import org.sourcelab.kafka.webview.ui.manager.file.FileStorageService;
import org.sourcelab.kafka.webview.ui.manager.file.FileType;
import org.sourcelab.kafka.webview.ui.manager.file.LocalDiskStorage;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaAdminFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaConsumerFactory;
Expand All @@ -57,34 +61,32 @@ public class PluginConfig {

/**
* Upload manager, for handling uploads of Plugins and Keystores.
* @param appProperties Definition of app properties.
* @param fileManager for managing file persistence.
* @return UploadManager for Plugins
*/
@Bean
public UploadManager getPluginUploadManager(final AppProperties appProperties) {
return new UploadManager(appProperties.getUploadPath());
public UploadManager getPluginUploadManager(final FileManager fileManager) {
return new UploadManager(fileManager);
}

/**
* PluginFactory for creating instances of Deserializers.
* @param appProperties Definition of app properties.
* @param fileManager for managing file persistence.
* @return PluginFactory for Deserializers.
*/
@Bean
public PluginFactory<Deserializer> getDeserializerPluginFactory(final AppProperties appProperties) {
final String jarDirectory = appProperties.getUploadPath() + "/deserializers";
return new PluginFactory<>(jarDirectory, Deserializer.class);
@Bean("PluginFactoryForDeserializer")
public PluginFactory<Deserializer> getDeserializerPluginFactory(final FileManager fileManager) {
return new PluginFactory<>(FileType.DESERIALIZER, Deserializer.class, fileManager);
}

/**
* PluginFactory for creating instances of Record Filters.
* @param appProperties Definition of app properties.
* @param fileManager for managing file persistence.
* @return PluginFactory for Record Filters.
*/
@Bean
public PluginFactory<RecordFilter> getRecordFilterPluginFactory(final AppProperties appProperties) {
final String jarDirectory = appProperties.getUploadPath() + "/filters";
return new PluginFactory<>(jarDirectory, RecordFilter.class);
@Bean("PluginFactoryForRecordFilter")
public PluginFactory<RecordFilter> getRecordFilterPluginFactory(final FileManager fileManager) {
return new PluginFactory<>(FileType.FILTER, RecordFilter.class, fileManager);
}

/**
Expand All @@ -103,7 +105,13 @@ public SecretManager getSecretManager(final AppProperties appProperties) {
* @return Web Kafka Consumer Factory instance.
*/
@Bean
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(
final AppProperties appProperties,
final KafkaClientConfigUtil configUtil,
final PluginFactory<Deserializer> deserializerPluginFactory,
final PluginFactory<RecordFilter> recordFilterPluginFactory,
final SecretManager secretManager
) {
final ExecutorService executorService;

// If we have multi-threaded consumer option enabled
Expand All @@ -123,24 +131,26 @@ public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties ap
}

return new WebKafkaConsumerFactory(
getDeserializerPluginFactory(appProperties),
getRecordFilterPluginFactory(appProperties),
getSecretManager(appProperties),
deserializerPluginFactory,
recordFilterPluginFactory,
secretManager,
getKafkaConsumerFactory(configUtil),
executorService
);
}

/**
* For creating Kafka operational consumers.
* @param appProperties Definition of app properties.
* @param configUtil Utility for configuring kafka clients.
* @return Web Kafka Operations Client Factory instance.
*/
@Bean
public KafkaOperationsFactory getKafkaOperationsFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
public KafkaOperationsFactory getKafkaOperationsFactory(
final KafkaClientConfigUtil configUtil,
final SecretManager secretManager
) {
return new KafkaOperationsFactory(
getSecretManager(appProperties),
secretManager,
getKafkaAdminFactory(configUtil)
);
}
Expand Down Expand Up @@ -202,4 +212,21 @@ public KafkaClientConfigUtil getKafkaClientConfigUtil(final AppProperties appPro
public SaslUtility getSaslUtility(final SecretManager secretManager) {
return new SaslUtility(secretManager);
}

@Bean
public FileStorageService fileStorageService(final AppProperties appProperties) {
return new LocalDiskStorage(appProperties.getUploadPath());
}

/**
* Utility for managing file storage operations.
* @param fileStorageService Where to back filestorage.
* @param appProperties Definition of app properties.
* @return FileManager instance.
*/
@Bean
public FileManager fileManager(final FileStorageService fileStorageService, final AppProperties appProperties) {
final LocalDiskStorage localCacheStorage = new LocalDiskStorage(appProperties.getCachePath());
return new FileManager(fileStorageService, localCacheStorage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,12 @@ public String clusterUpdate(
if (!clusterForm.exists() || (clusterForm.getTrustStoreFile() != null && !clusterForm.getTrustStoreFile().isEmpty())) {
// Delete previous trust store if updating
if (cluster.getTrustStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
try {
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
} catch (final IOException exception) {
// TODO handle
throw new RuntimeException(exception.getMessage(), exception);
}
cluster.setTrustStoreFile(null);
cluster.setTrustStorePassword(null);
}
Expand All @@ -257,7 +262,7 @@ public String clusterUpdate(
// Persist in model.
cluster.setTrustStoreFile(filename);
cluster.setTrustStorePassword(encrypted);
} catch (IOException exception) {
} catch (final IOException exception) {
// TODO handle
throw new RuntimeException(exception.getMessage(), exception);
}
Expand All @@ -266,7 +271,12 @@ public String clusterUpdate(
if (!clusterForm.exists() || (clusterForm.getKeyStoreFile() != null && !clusterForm.getKeyStoreFile().isEmpty())) {
// Delete previous key store if updating, or if SASL is enabled.
if (clusterForm.getSasl() || cluster.getKeyStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
try {
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
} catch (final IOException exception) {
// TODO handle
throw new RuntimeException(exception.getMessage(), exception);
}
cluster.setKeyStoreFile(null);
cluster.setKeyStorePassword(null);
}
Expand Down Expand Up @@ -298,8 +308,13 @@ public String clusterUpdate(
cluster.setSslEnabled(false);

// Remove from disk
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
try {
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
} catch (final IOException exception) {
// TODO handle
throw new RuntimeException(exception.getMessage(), exception);
}

// Null out fields
cluster.setKeyStoreFile(null);
Expand Down Expand Up @@ -376,11 +391,16 @@ public String deleteCluster(@PathVariable final Long id, final RedirectAttribute
final Cluster cluster = clusterOptional.get();

// Delete KeyStores
if (cluster.getTrustStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
}
if (cluster.getKeyStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
try {
if (cluster.getTrustStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getTrustStoreFile());
}
if (cluster.getKeyStoreFile() != null) {
uploadManager.deleteKeyStore(cluster.getKeyStoreFile());
}
} catch (final IOException exception) {
// TODO handle
throw new RuntimeException(exception.getMessage(), exception);
}

// Delete it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import org.sourcelab.kafka.webview.ui.controller.BaseController;
import org.sourcelab.kafka.webview.ui.controller.configuration.filter.forms.FilterForm;
import org.sourcelab.kafka.webview.ui.manager.file.FileManager;
import org.sourcelab.kafka.webview.ui.manager.file.FileType;
import org.sourcelab.kafka.webview.ui.manager.plugin.PluginFactory;
import org.sourcelab.kafka.webview.ui.manager.plugin.UploadManager;
import org.sourcelab.kafka.webview.ui.manager.plugin.exception.LoaderException;
Expand Down Expand Up @@ -69,6 +71,9 @@ public class FilterConfigController extends BaseController {
@Autowired
private UploadManager uploadManager;

@Autowired
private FileManager fileManager;

@Autowired
private PluginFactory<RecordFilter> recordFilterPluginFactory;

Expand Down Expand Up @@ -218,7 +223,6 @@ public String update(

// Persist jar on filesystem into temp location
final String tmpJarLocation = uploadManager.handleFilterUpload(file, tmpFilename);
final String finalJarLocation = tmpJarLocation.substring(0, tmpJarLocation.lastIndexOf(".tmp"));

// Attempt to load jar?
final String filterOptionNames;
Expand All @@ -230,7 +234,7 @@ public String update(
filterOptionNames = filterOptions.stream().collect(Collectors.joining(","));
} catch (final LoaderException exception) {
// Remove jar
Files.delete(new File(tmpJarLocation).toPath());
fileManager.deleteFile(tmpJarLocation, FileType.FILTER);

bindingResult.addError(new FieldError(
"filterForm", "file", "", true, null, null, exception.getMessage())
Expand All @@ -239,10 +243,7 @@ public String update(
}

// If successful overwrite original jar
final Path tmpJarPath = new File(tmpJarLocation).toPath();
final Path finalJarPath = new File(finalJarLocation).toPath();
Files.deleteIfExists(finalJarPath);
Files.move(tmpJarPath, finalJarPath);
fileManager.moveFile(tmpFilename, filename, FileType.FILTER);

// Set properties
filter.setClasspath(filterForm.getClasspath());
Expand Down Expand Up @@ -289,7 +290,7 @@ public String delete(@PathVariable final Long id, final RedirectAttributes redir
filterRepository.deleteById(id);

// Delete jar from disk
Files.delete(recordFilterPluginFactory.getPathForJar(filter.getJar()));
fileManager.deleteFile(filter.getJar(), FileType.FILTER);
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newSuccess("Deleted filter!"));
} catch (IOException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.sourcelab.kafka.webview.ui.controller.BaseController;
import org.sourcelab.kafka.webview.ui.controller.configuration.messageformat.forms.MessageFormatForm;
import org.sourcelab.kafka.webview.ui.manager.file.FileManager;
import org.sourcelab.kafka.webview.ui.manager.file.FileType;
import org.sourcelab.kafka.webview.ui.manager.plugin.PluginFactory;
import org.sourcelab.kafka.webview.ui.manager.plugin.UploadManager;
import org.sourcelab.kafka.webview.ui.manager.plugin.exception.LoaderException;
Expand Down Expand Up @@ -71,6 +73,9 @@ public class MessageFormatController extends BaseController {
@Autowired
private UploadManager uploadManager;

@Autowired
private FileManager fileManager;

@Autowired
private PluginFactory<Deserializer> deserializerLoader;

Expand Down Expand Up @@ -229,7 +234,7 @@ public String create(
deserializerLoader.checkPlugin(tempFilename, messageFormatForm.getClasspath());
} catch (final LoaderException exception) {
// If we had issues, remove the temp location
Files.delete(Paths.get(jarPath));
fileManager.deleteFile(jarPath, FileType.DESERIALIZER);

// Add an error
bindingResult.addError(new FieldError(
Expand All @@ -242,13 +247,11 @@ public String create(
// 1 - remove pre-existing jar if it exists
if (messageFormat.getJar() != null && !messageFormat.getJar().isEmpty()) {
// Delete pre-existing jar.
Files.deleteIfExists(deserializerLoader.getPathForJar(messageFormat.getJar()));
fileManager.deleteFile(messageFormat.getJar(), FileType.DESERIALIZER);
}

// 2 - move tempFilename => filename.
// Lets just delete the temp path and re-handle the upload.
Files.deleteIfExists(Paths.get(jarPath));
uploadManager.handleDeserializerUpload(file, newFilename);
fileManager.moveFile(tempFilename, newFilename, FileType.DESERIALIZER);

// 3 - Update the jar and class path properties.
messageFormat.setJar(newFilename);
Expand Down Expand Up @@ -336,7 +339,7 @@ public String deleteCluster(@PathVariable final Long id, final RedirectAttribute

// Delete jar from disk
try {
Files.deleteIfExists(deserializerLoader.getPathForJar(messageFormat.getJar()));
fileManager.deleteFile(messageFormat.getJar(), FileType.DESERIALIZER);
} catch (final NoSuchFileException exception) {
// swallow.
}
Expand Down
Loading