Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait until GeoIP data is replicated to all data nodes #348

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
Expand All @@ -30,6 +31,8 @@

@Log4j2
public class DatasourceUpdateService {
private static final int SLEEP_TIME_IN_MILLIS = 5000; // 5 seconds
private static final int MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS = 10 * 60 * 60 * 1000; // 10 hours
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final DatasourceDao datasourceDao;
Expand Down Expand Up @@ -86,10 +89,36 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
geoIpDataDao.putGeoIpData(indexName, header, reader.iterator(), renewLock);
}

waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS);
Instant endTime = Instant.now();
updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime);
}

/**
* We wait until all shards are ready to serve search requests before updating datasource metadata to
* point to a new index so that there won't be latency degradation during GeoIP data update
*
* @param indexName the indexName
*/
@VisibleForTesting
protected void waitUntilAllShardsStarted(final String indexName, final int timeout) {
Instant start = Instant.now();
try {
while (Instant.now().toEpochMilli() - start.toEpochMilli() < timeout) {
if (clusterService.state().routingTable().allShards(indexName).stream().allMatch(shard -> shard.started())) {
return;
}
Thread.sleep(SLEEP_TIME_IN_MILLIS);
}
throw new OpenSearchException(
"index[{}] replication did not complete after {} millis",
MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a test case for scenario when thread execution is interrupted, if that is possible

}
}

/**
* Return header fields of geo data with given url of a manifest file
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
protected Ip2GeoLockService ip2GeoLockService;
@Mock
protected Ip2GeoProcessorDao ip2GeoProcessorDao;
@Mock
protected RoutingTable routingTable;
protected IngestMetadata ingestMetadata;
protected NoOpNodeClient client;
protected VerifyingClient verifyingClient;
Expand All @@ -119,7 +121,7 @@ public void prepareIp2GeoTestCase() {
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.metadata()).thenReturn(metadata);
when(clusterState.getMetadata()).thenReturn(metadata);
when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE);
when(clusterState.routingTable()).thenReturn(routingTable);
when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(OpenSearchExecutors.newDirectExecutorService());
when(ingestService.getClusterService()).thenReturn(clusterService);
when(threadPool.generic()).thenReturn(OpenSearchExecutors.newDirectExecutorService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.geospatial.ip2geo.jobscheduler;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
Expand All @@ -28,6 +29,7 @@
import org.apache.commons.csv.CSVParser;
import org.junit.Before;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -134,6 +136,9 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() {

File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile());
when(geoIpDataDao.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180));
ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.started()).thenReturn(true);
when(routingTable.allShards(anyString())).thenReturn(Arrays.asList(shardRouting));

Datasource datasource = new Datasource();
datasource.setState(DatasourceState.AVAILABLE);
Expand All @@ -158,6 +163,34 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() {
verify(geoIpDataDao).putGeoIpData(eq(datasource.currentIndexName()), isA(String[].class), any(Iterator.class), any(Runnable.class));
}

/**
 * Expects an {@link OpenSearchException} whose message contains "did not complete" when the only
 * shard of the index keeps reporting that it has not started and a 10 ms timeout is used.
 */
public void testWaitUntilAllShardsStarted_whenTimedOut_thenThrowException() {
    // A single shard that never reports itself as started
    final ShardRouting pendingShard = mock(ShardRouting.class);
    when(pendingShard.started()).thenReturn(false);
    final String index = GeospatialTestHelper.randomLowerCaseString();
    when(routingTable.allShards(index)).thenReturn(Arrays.asList(pendingShard));

    // Run
    final OpenSearchException thrown = expectThrows(
        OpenSearchException.class,
        () -> datasourceUpdateService.waitUntilAllShardsStarted(index, 10)
    );

    // Verify
    final String message = thrown.getMessage();
    assertTrue(message.contains("did not complete"));
}

/**
 * Expects a {@link RuntimeException} caused by an {@link InterruptedException} when the calling
 * thread is already interrupted while the only shard of the index has not started.
 */
@SneakyThrows
public void testWaitUntilAllShardsStarted_whenInterrupted_thenThrowException() {
    String indexName = GeospatialTestHelper.randomLowerCaseString();
    ShardRouting shardRouting = mock(ShardRouting.class);
    when(shardRouting.started()).thenReturn(false);
    when(routingTable.allShards(indexName)).thenReturn(Arrays.asList(shardRouting));

    try {
        // Run
        Thread.currentThread().interrupt();
        Exception e = expectThrows(RuntimeException.class, () -> datasourceUpdateService.waitUntilAllShardsStarted(indexName, 10));

        // Verify: instanceof also fails cleanly (instead of throwing an NPE) when there is no cause
        assertTrue(e.getCause() instanceof InterruptedException);
    } finally {
        // Clear the interrupt flag so it can never leak into tests that run later on this thread
        Thread.interrupted();
    }
}

@SneakyThrows
public void testGetHeaderFields_whenValidInput_thenReturnCorrectValue() {
File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
Expand Down