Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP to show Geospatial plugin using LockService instance from JS #1

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
### Documentation
### Maintenance
### Refactoring
- Use instance of LockService instantiated in JobScheduler through Guice ([#677](https://github.com/opensearch-project/geospatial/pull/677))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.OpenSearchException;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.spi.LockModel;
Expand All @@ -30,17 +29,19 @@ public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
private final ClusterService clusterService;
private final LockService lockService;
private LockService lockService;

/**
* Constructor
*
* @param clusterService the cluster service
* @param client the client
*/
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
public Ip2GeoLockService(final ClusterService clusterService) {
this.clusterService = clusterService;
this.lockService = new LockService(client, clusterService);
}

public void initialize(final LockService lockService) {
this.lockService = lockService;
}

/**
Expand All @@ -54,6 +55,9 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien
* @param listener the listener
*/
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
}

Expand All @@ -65,6 +69,9 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
* @return lock model
*/
public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() {
Expand Down Expand Up @@ -95,6 +102,9 @@ public void onFailure(final Exception e) {
* @param lockModel the lock model
*/
public void releaseLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
Expand All @@ -108,6 +118,9 @@ public void releaseLock(final LockModel lockModel) {
* @return renewed lock if renew succeed and null otherwise
*/
public LockModel renewLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.renewLock(lockModel, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -75,7 +79,9 @@
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.ingest.Processor;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -96,11 +102,22 @@
* to interact with Cluster.
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
public class GeospatialPlugin extends Plugin
implements
IngestPlugin,
ActionPlugin,
MapperPlugin,
SearchPlugin,
SystemIndexPlugin,
ClusterPlugin {
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;
private URLDenyListChecker urlDenyListChecker;
private ClusterService clusterService;
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceUpdateService datasourceUpdateService;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Expand Down Expand Up @@ -129,7 +146,10 @@ public void onIndexModule(IndexModule indexModule) {

@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
return List.of(Ip2GeoListener.class);
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
services.add(Ip2GeoListener.class);
services.add(GuiceHolder.class);
return services;
}

@Override
Expand Down Expand Up @@ -158,20 +178,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
datasourceDao,
geoIpDataDao,
urlDenyListChecker
);
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService);
this.clusterService = clusterService;
this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker);
this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
this.ip2GeoLockService = new Ip2GeoLockService(clusterService);

return List.of(
UploadStats.getInstance(),
Expand Down Expand Up @@ -265,4 +275,48 @@ public List<AggregationSpec> getAggregations() {

return List.of(geoHexGridSpec);
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
LockService lockService = GuiceHolder.getLockService();
ip2GeoLockService.initialize(lockService);

DatasourceRunner.getJobRunnerInstance()
.initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);
}

public static class GuiceHolder implements LifecycleComponent {

private static LockService lockService;

@Inject
public GuiceHolder(final LockService lockService) {
GuiceHolder.lockService = lockService;
}

static LockService getLockService() {
return lockService;
}

@Override
public void close() {}

@Override
public Lifecycle.State lifecycleState() {
return null;
}

@Override
public void addLifecycleListener(LifecycleListener listener) {}

@Override
public void removeLifecycleListener(LifecycleListener listener) {}

@Override
public void start() {}

@Override
public void stop() {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.geospatial.plugin.GeospatialPlugin;
import org.opensearch.node.MockNode;
import org.opensearch.node.Node;
import org.opensearch.plugins.Plugin;
Expand All @@ -49,7 +48,7 @@ private List<Class<? extends Plugin>> basePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(getTestTransportPlugin());
plugins.add(MockHttpTransport.TestPlugin.class);
plugins.add(GeospatialPlugin.class);
plugins.add(TestGeospatialPlugin.class);
return plugins;
}

Expand Down
29 changes: 29 additions & 0 deletions src/test/java/org/opensearch/geospatial/TestGeospatialPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
import org.opensearch.geospatial.plugin.GeospatialPlugin;

/**
* This class is needed for ClusterSettingsHelper.createMockNode to instantiate a test instance of the
* GeospatialPlugin without the JobSchedulerPlugin installed. Without overriding this class, the
* GeospatialPlugin would try to Inject JobScheduler's LockService in the GuiceHolder which will
* fail because JobScheduler is not installed
*/
public class TestGeospatialPlugin extends GeospatialPlugin {
@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
services.add(Ip2GeoListener.class);
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;

public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoLockService noOpsLockService;

@Before
public void init() {
ip2GeoLockService = new Ip2GeoLockService(clusterService, verifyingClient);
noOpsLockService = new Ip2GeoLockService(clusterService, client);
ip2GeoLockService = new Ip2GeoLockService(clusterService);
noOpsLockService = new Ip2GeoLockService(clusterService);
// TODO Remove direct instantiation and offer a TestLockService class to plugins
ip2GeoLockService.initialize(new LockService(verifyingClient, clusterService));
noOpsLockService.initialize(new LockService(client, clusterService));
}

public void testAcquireLock_whenValidInput_thenSucceed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testCreateComponents() {
}

public void testGetGuiceServiceClasses() {
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class);
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class, GeospatialPlugin.GuiceHolder.class);
assertEquals(classes, plugin.getGuiceServiceClasses());
}

Expand Down
Loading