Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use instance of LockService instantiated in JobScheduler through Guice #677

Merged
merged 11 commits into from
Jan 21, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
### Documentation
### Maintenance
### Refactoring
- Use instance of LockService instantiated in JobScheduler through Guice ([#677](https://github.com/opensearch-project/geospatial/pull/677))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.OpenSearchException;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.spi.LockModel;
Expand All @@ -30,17 +29,19 @@ public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
private final ClusterService clusterService;
private final LockService lockService;
private LockService lockService;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to make Ip2GeoLockService to be LifecycleComponent and get injected both cluster service and lock service?

Copy link
Member Author

@cwperks cwperks Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still being returned from createComponents so that its injectable. The difference in this PR is that it now implements onNodeStarted to get the instance of LockService instantiated by the job scheduler from the GuiceHolder and then sets it on the Ip2GeoLockService (Lock Service Wrapper) instance.

Copy link
Collaborator

@heemin32 heemin32 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking like

public class Ip2GeoLockService implements LifecycleComponent { 
  private final static Ip2GeoLockService lockService;
  
  public static Ip2GeoLockService getLockService() {
    return lockService;
  }

  @Inject
  public Ip2GeoLockService(final ClusterService clusterService, final LockService lockService) {
  }
}


@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
    final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
    services.add(Ip2GeoListener.class);
    services.add(Ip2GeoLockService.class);
    return services;
}

This way, we don't need an additional class.
However, this approach opens up the possibility for the public static Ip2GeoLockService getLockService() method to be used across different parts of the code which is not desirable. Therefore, I think having a separate class might be better in that regard.

Copy link
Member Author

@cwperks cwperks Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks like a good solution, but how do you refer to the instance of the LifecycleComponent that's created?

For instance this line:

DatasourceRunner.getJobRunnerInstance()
            .initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);

It takes the ip2GeoLockService as the last arg. When using the service class, how do you get a hold of an instance?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can call Ip2GeoLockService.getLockService()

However, I think having a GuiceHolder class might be better not to expose that static method.


/**
* Constructor
*
* @param clusterService the cluster service
* @param client the client
*/
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
public Ip2GeoLockService(final ClusterService clusterService) {
this.clusterService = clusterService;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to add lockService not initialized check in every Ip2GeoLockService methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should never happen (i.e. onNodeStarted will always be called and the lock service will get set accordingly) but these checks would be good to put in none-the-less.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the checks

this.lockService = new LockService(client, clusterService);
}

public void initialize(final LockService lockService) {
this.lockService = lockService;
}

/**
Expand All @@ -54,6 +55,9 @@ public Ip2GeoLockService(final ClusterService clusterService, final Client clien
* @param listener the listener
*/
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
}

Expand All @@ -65,6 +69,9 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
* @return lock model
*/
public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() {
Expand Down Expand Up @@ -95,6 +102,9 @@ public void onFailure(final Exception e) {
* @param lockModel the lock model
*/
public void releaseLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
Expand All @@ -108,6 +118,9 @@ public void releaseLock(final LockModel lockModel) {
* @return renewed lock if renew succeed and null otherwise
*/
public LockModel renewLock(final LockModel lockModel) {
if (lockService == null) {
throw new OpenSearchException("Ip2GeoLockService is not initialized");
}
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.renewLock(lockModel, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -75,7 +79,9 @@
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.ingest.Processor;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -96,11 +102,22 @@
* to interact with Cluster.
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
public class GeospatialPlugin extends Plugin
implements
IngestPlugin,
ActionPlugin,
MapperPlugin,
SearchPlugin,
SystemIndexPlugin,
ClusterPlugin {
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;
private URLDenyListChecker urlDenyListChecker;
private ClusterService clusterService;
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceUpdateService datasourceUpdateService;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Expand Down Expand Up @@ -129,7 +146,10 @@ public void onIndexModule(IndexModule indexModule) {

@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
return List.of(Ip2GeoListener.class);
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
services.add(Ip2GeoListener.class);
services.add(GuiceHolder.class);
return services;
}

@Override
Expand Down Expand Up @@ -158,20 +178,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
datasourceDao,
geoIpDataDao,
urlDenyListChecker
);
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService);
this.clusterService = clusterService;
this.datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceDao, geoIpDataDao, urlDenyListChecker);
this.ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
this.ip2GeoLockService = new Ip2GeoLockService(clusterService);

return List.of(
UploadStats.getInstance(),
Expand Down Expand Up @@ -265,4 +275,48 @@ public List<AggregationSpec> getAggregations() {

return List.of(geoHexGridSpec);
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
LockService lockService = GuiceHolder.getLockService();
ip2GeoLockService.initialize(lockService);

DatasourceRunner.getJobRunnerInstance()
.initialize(this.clusterService, this.datasourceUpdateService, this.ip2GeoExecutor, this.datasourceDao, this.ip2GeoLockService);
}

public static class GuiceHolder implements LifecycleComponent {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this class to its own file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a static inner class so I think it needs to be defined in this file. This is following a similar pattern present in the security plugin. This is a way to do dependency injection outside of the transport layer.

Copy link
Collaborator

@heemin32 heemin32 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not matter where and how the class is defined. As long as you return your class in getGuiceServiceClasses(), the dependency will get injected.

   @Override
    public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
        final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(2);
        services.add(Ip2GeoListener.class);
        services.add(GuiceHolder.class);
        return services;
    }

Anyway, I think having this class inside Plugin class might make sense if we want it to be used only inside this Plugin class.
private or package private might be better instead of public then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to package-private

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it needs to be public:

ref: https://github.com/opensearch-project/geospatial/actions/runs/12895082709/job/35955219215?pr=677

?  java.lang.AssertionError: Wrong access modifiers on public org.opensearch.geospatial.plugin.GeospatialPlugin$GuiceHolder(org.opensearch.jobscheduler.spi.utils.LockService)
?  	at org.opensearch.common.inject.DefaultConstructionProxyFactory$1.newInstance(DefaultConstructionProxyFactory.java:69)
?  	at org.opensearch.common.inject.ConstructorInjector.construct(ConstructorInjector.java:101)
?  	at org.opensearch.common.inject.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:140)
?  	at org.opensearch.common.inject.InjectorImpl$4$1.call(InjectorImpl.java:753)
?  	at org.opensearch.common.inject.InjectorImpl.callInContext(InjectorImpl.java:809)
?  	at org.opensearch.common.inject.InjectorImpl$4.get(InjectorImpl.java:748)
?  	at org.opensearch.common.inject.InjectorImpl.getInstance(InjectorImpl.java:[792](https://github.com/opensearch-project/geospatial/actions/runs/12895082709/job/35955219215?pr=677#step:6:793))
?  	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
?  	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
?  	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
?  	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
?  	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
?  	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
?  	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
?  	at org.opensearch.node.Node.<init>(Node.java:1548)
?  	at org.opensearch.node.Node.<init>(Node.java:452)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such case, making methods as package private might be enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any of the overridden methods need to be marked public. I marked getLockService as package-private.

Copy link
Collaborator

@heemin32 heemin32 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I was talking only for those static methods :) Sorry for the confusion.


private static LockService lockService;

@Inject
public GuiceHolder(final LockService lockService) {
GuiceHolder.lockService = lockService;
}

static LockService getLockService() {
return lockService;
}

@Override
public void close() {}

@Override
public Lifecycle.State lifecycleState() {
return null;
}

@Override
public void addLifecycleListener(LifecycleListener listener) {}

@Override
public void removeLifecycleListener(LifecycleListener listener) {}

@Override
public void start() {}

@Override
public void stop() {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.geospatial.plugin.GeospatialPlugin;
import org.opensearch.node.MockNode;
import org.opensearch.node.Node;
import org.opensearch.plugins.Plugin;
Expand All @@ -49,7 +48,7 @@ private List<Class<? extends Plugin>> basePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(getTestTransportPlugin());
plugins.add(MockHttpTransport.TestPlugin.class);
plugins.add(GeospatialPlugin.class);
plugins.add(TestGeospatialPlugin.class);
return plugins;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
import org.opensearch.geospatial.plugin.GeospatialPlugin;

/**
* This class is needed for ClusterSettingsHelper.createMockNode to instantiate a test instance of the
* GeospatialPlugin without the JobSchedulerPlugin installed. Without overriding this class, the
* GeospatialPlugin would try to Inject JobScheduler's LockService in the GuiceHolder which will
* fail because JobScheduler is not installed
*/
public class TestGeospatialPlugin extends GeospatialPlugin {
@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
services.add(Ip2GeoListener.class);
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;

public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
private Ip2GeoLockService ip2GeoLockService;
private Ip2GeoLockService noOpsLockService;

@Before
public void init() {
ip2GeoLockService = new Ip2GeoLockService(clusterService, verifyingClient);
noOpsLockService = new Ip2GeoLockService(clusterService, client);
ip2GeoLockService = new Ip2GeoLockService(clusterService);
noOpsLockService = new Ip2GeoLockService(clusterService);
// TODO Remove direct instantiation and offer a TestLockService class to plugins
ip2GeoLockService.initialize(new LockService(verifyingClient, clusterService));
noOpsLockService.initialize(new LockService(client, clusterService));
}

public void testAcquireLock_whenValidInput_thenSucceed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testCreateComponents() {
}

public void testGetGuiceServiceClasses() {
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class);
Collection<Class<? extends LifecycleComponent>> classes = List.of(Ip2GeoListener.class, GeospatialPlugin.GuiceHolder.class);
assertEquals(classes, plugin.getGuiceServiceClasses());
}

Expand Down
Loading