Added unit tests with some refactoring of code
Signed-off-by: Heemin Kim <heemin@amazon.com>
heemin32 committed Apr 27, 2023
1 parent 4b29b4e commit 6306861
Showing 34 changed files with 2,340 additions and 505 deletions.
File: VisibleForTesting.java (new file)
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.annotation;

public @interface VisibleForTesting {
}
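
The annotation above is a bare marker: with no @Retention or @Target declared it defaults to CLASS retention and can be placed on any declaration, and it only documents members whose visibility was widened so unit tests can reach them. A hypothetical usage sketch (ExampleService and parseRetryCount are illustrative, not part of this commit):

import org.opensearch.geospatial.annotation.VisibleForTesting;

public class ExampleService {
    // Widened to protected only so a unit test can invoke it directly.
    @VisibleForTesting
    protected int parseRetryCount(final String raw) {
        return Integer.parseInt(raw);
    }
}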
File: PutDatasourceRequest.java
@@ -14,6 +14,7 @@
import java.net.URL;
import java.util.Locale;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
@@ -28,11 +29,12 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;

/**
* GeoIP datasource creation request
* Ip2Geo datasource creation request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
@@ -147,7 +149,7 @@ private void validateManifestFile(final URL url, final ActionRequestValidationEx
errors.addValidationError(
String.format(
Locale.ROOT,
"updateInterval %d is should be smaller than %d",
"updateInterval %d should be smaller than %d",
updateIntervalInDays.days(),
manifest.getValidForInDays()
)
@@ -156,12 +158,12 @@
}

/**
* Validate updateIntervalInDays is larger than 0
 * Validate updateIntervalInDays is equal to or larger than 1
*
* @param errors the errors to add error messages
*/
private void validateUpdateInterval(final ActionRequestValidationException errors) {
if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) > 0) {
if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) < 0) {
errors.addValidationError("Update interval should be equal to or larger than 1 day");
}
}
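
The flipped comparison above is the actual fix: the old check rejected intervals longer than one day, while the intent is to reject intervals shorter than one day. A standalone illustration of the corrected bound check (not part of the commit); TimeValue is Comparable, so a negative compareTo result means the interval is below the one-day minimum:

import org.opensearch.common.unit.TimeValue;

public class UpdateIntervalCheckExample {
    public static void main(String[] args) {
        TimeValue oneDay = TimeValue.timeValueDays(1);
        // Shorter than one day -> compareTo < 0 -> validation error is added.
        System.out.println(TimeValue.timeValueHours(12).compareTo(oneDay) < 0); // true, rejected
        // One day or longer -> compareTo >= 0 -> request passes validation.
        System.out.println(TimeValue.timeValueDays(3).compareTo(oneDay) < 0);   // false, accepted
    }
}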
File: PutDatasourceTransportAction.java
@@ -12,7 +12,9 @@
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import lombok.extern.log4j.Log4j2;

@@ -34,6 +36,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceHelper;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
@@ -54,6 +57,8 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DatasourceHelper datasourceHelper;
private final GeoIpDataHelper geoIpDataHelper;

private TimeValue timeout;
private int indexingBulkSize;
@@ -76,12 +81,16 @@ public PutDatasourceTransportAction(
final ClusterService clusterService,
final ThreadPool threadPool,
final Settings settings,
final ClusterSettings clusterSettings
final ClusterSettings clusterSettings,
final DatasourceHelper datasourceHelper,
final GeoIpDataHelper geoIpDataHelper
) {
super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.datasourceHelper = datasourceHelper;
this.geoIpDataHelper = geoIpDataHelper;
timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> timeout = newValue);
indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(settings);
@@ -91,99 +100,106 @@ public PutDatasourceTransportAction(
@Override
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
try {
Datasource jobParameter = Datasource.Builder.build(request);
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(jobParameter.getId())
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
.id(datasource.getId())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
// This is a user-initiated request. Therefore, we want to handle the first datasource update task in a generic thread
// pool.
threadPool.generic().submit(() -> {
try {
createDatasource(jobParameter);
} catch (Exception e) {
log.error("Failed to create datasource for {}", jobParameter.getId(), e);
jobParameter.getUpdateStats().setLastFailedAt(Instant.now());
jobParameter.setState(DatasourceState.FAILED);
try {
DatasourceHelper.updateDatasource(client, jobParameter, timeout);
} catch (Exception ex) {
log.error("Failed to mark datasource state as FAILED for {}", jobParameter.getId(), ex);
}
}
});
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(final Exception e) {
if (e instanceof VersionConflictEngineException) {
listener.onFailure(
new ResourceAlreadyExistsException("datasource [{}] already exists", request.getDatasourceName())
);
} else {
listener.onFailure(e);
}
}
});
client.index(indexRequest, getIndexResponseListener(datasource, listener));
} catch (Exception e) {
listener.onFailure(e);
}
}

private void createDatasource(final Datasource jobParameter) throws Exception {
if (!DatasourceState.PREPARING.equals(jobParameter.getState())) {
log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState());
jobParameter.setState(DatasourceState.FAILED);
jobParameter.getUpdateStats().setLastFailedAt(Instant.now());
DatasourceHelper.updateDatasource(client, jobParameter, timeout);
@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
final ActionListener<AcknowledgedResponse> listener
) {
return new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
// This is a user-initiated request. Therefore, we want to handle the first datasource update task in a generic thread
// pool.
threadPool.generic().submit(() -> { createDatasource(datasource); });
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(final Exception e) {
if (e instanceof VersionConflictEngineException) {
listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getId()));
} else {
listener.onFailure(e);
}
}
};
}
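
Because the listener is now built by a protected @VisibleForTesting method, a unit test can obtain it directly and drive its callbacks without stubbing client.index(). A rough Mockito-based sketch, assuming `action` is a PutDatasourceTransportAction constructed with mocked collaborators and `datasource` is a test datasource (both, like the index name below, are assumptions of this example):

// Sketch only: `action` and `datasource` come from the surrounding test fixture.
ActionListener<AcknowledgedResponse> caller = mock(ActionListener.class);
ActionListener<IndexResponse> listener = action.getIndexResponseListener(datasource, caller);

// A version conflict on the job index should be reported back to the caller
// as "datasource [...] already exists".
listener.onFailure(new VersionConflictEngineException(new ShardId("test-index", "_na_", 0), "id", "conflict"));
verify(caller).onFailure(any(ResourceAlreadyExistsException.class));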

@VisibleForTesting
protected void createDatasource(final Datasource datasource) {
if (!DatasourceState.CREATING.equals(datasource.getState())) {
log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, datasource.getState());
markDatasourceAsCreateFailed(datasource);
return;
}

URL url = new URL(jobParameter.getEndpoint());
DatasourceManifest manifest = DatasourceManifest.Builder.build(url);
String indexName = setupIndex(manifest, jobParameter);
Instant startTime = Instant.now();
String[] fields = putIp2GeoData(indexName, manifest);
Instant endTime = Instant.now();
updateJobParameterAsSucceeded(jobParameter, manifest, fields, startTime, endTime);
log.info("GeoIP database[{}] creation succeeded after {} seconds", jobParameter.getId(), Duration.between(startTime, endTime));
try {
URL url = new URL(datasource.getEndpoint());
DatasourceManifest manifest = DatasourceManifest.Builder.build(url);
String indexName = setupIndex(manifest, datasource);
Instant startTime = Instant.now();
List<String> fields = putGeoIpData(indexName, manifest);
Instant endTime = Instant.now();
updateJobParameterAsSucceeded(datasource, manifest, fields, startTime, endTime);
log.info("GeoIP database[{}] creation succeeded after {} seconds", datasource.getId(), Duration.between(startTime, endTime));
} catch (Exception e) {
log.error("Failed to create datasource for {}", datasource.getId(), e);
markDatasourceAsCreateFailed(datasource);
}
}

private void markDatasourceAsCreateFailed(final Datasource datasource) {
datasource.getUpdateStats().setLastFailedAt(Instant.now());
datasource.setState(DatasourceState.CREATE_FAILED);
try {
datasourceHelper.updateDatasource(datasource);
} catch (Exception e) {
log.error("Failed to mark datasource state as CREATE_FAILED for {}", datasource.getId(), e);
}
}

private void updateJobParameterAsSucceeded(
final Datasource jobParameter,
final Datasource datasource,
final DatasourceManifest manifest,
final String[] fields,
final List<String> fields,
final Instant startTime,
final Instant endTime
) throws IOException {
jobParameter.setDatabase(manifest, fields);
jobParameter.getUpdateStats().setLastSucceededAt(endTime);
jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
jobParameter.enable();
jobParameter.setState(DatasourceState.AVAILABLE);
DatasourceHelper.updateDatasource(client, jobParameter, timeout);
datasource.setDatabase(manifest, fields);
datasource.getUpdateStats().setLastSucceededAt(endTime);
datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
datasource.enable();
datasource.setState(DatasourceState.AVAILABLE);
datasourceHelper.updateDatasource(datasource);
}

private String setupIndex(final DatasourceManifest manifest, final Datasource jobParameter) throws IOException {
String indexName = jobParameter.indexNameFor(manifest);
jobParameter.getIndices().add(indexName);
DatasourceHelper.updateDatasource(client, jobParameter, timeout);
GeoIpDataHelper.createIndexIfNotExists(clusterService, client, indexName, timeout);
private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException {
String indexName = datasource.indexNameFor(manifest);
datasource.getIndices().add(indexName);
datasourceHelper.updateDatasource(datasource);
geoIpDataHelper.createIndexIfNotExists(indexName);
return indexName;
}

private String[] putIp2GeoData(final String indexName, final DatasourceManifest manifest) throws IOException {
String[] fields;
try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) {
private List<String> putGeoIpData(final String indexName, final DatasourceManifest manifest) throws IOException {
String[] header;
try (CSVParser reader = geoIpDataHelper.getDatabaseReader(manifest)) {
Iterator<CSVRecord> iter = reader.iterator();
fields = iter.next().values();
GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout);
header = iter.next().values();
geoIpDataHelper.putGeoIpData(indexName, header, iter, indexingBulkSize);
}
return fields;
return Arrays.asList(header).subList(1, header.length);
}
}
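
putGeoIpData returns everything after the first CSV column, presumably because the first column is the IP range key rather than a GeoIP field. A self-contained illustration of the subList call (the column names here are made up for the example):

import java.util.Arrays;
import java.util.List;

public class HeaderFieldsExample {
    public static void main(String[] args) {
        // Hypothetical header row; real database column names may differ.
        String[] header = { "cidr", "country_iso_code", "country_name" };
        List<String> fields = Arrays.asList(header).subList(1, header.length);
        System.out.println(fields); // [country_iso_code, country_name]
    }
}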
File: DatasourceHelper.java
@@ -20,6 +20,8 @@
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
@@ -36,17 +38,22 @@
*/
@Log4j2
public class DatasourceHelper {
private final Client client;
private TimeValue timeout;

public DatasourceHelper(final Client client, final Settings settings, final ClusterSettings clusterSettings) {
this.client = client;
this.timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> this.timeout = newValue);
}

/**
* Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param client the client
* @param datasource the datasource
* @param timeout the timeout
* @return index response
* @throws IOException exception
*/
public static IndexResponse updateDatasource(final Client client, final Datasource datasource, final TimeValue timeout)
throws IOException {
public IndexResponse updateDatasource(final Datasource datasource) throws IOException {
datasource.setLastUpdateTime(Instant.now());
IndexRequestBuilder requestBuilder = client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME);
requestBuilder.setId(datasource.getId());
@@ -57,13 +64,11 @@ public static IndexResponse updateDatasource(final Client client, final Datasour

/**
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param client the client
* @param id the name of a datasource
* @param timeout the timeout
* @return datasource
* @throws IOException exception
*/
public static Datasource getDatasource(final Client client, final String id, final TimeValue timeout) throws IOException {
public Datasource getDatasource(final String id) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id);
GetResponse response;
try {
@@ -87,11 +92,10 @@

/**
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param client the client
* @param id the name of a datasource
* @param actionListener the action listener
*/
public static void getDatasource(final Client client, final String id, final ActionListener<Datasource> actionListener) {
public void getDatasource(final String id, final ActionListener<Datasource> actionListener) {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id);
client.get(request, new ActionListener<GetResponse>() {
@Override
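
With updateDatasource and getDatasource now instance methods, the client and timeout are wired once through the constructor, so callers no longer pass them on every call and tests can hand in mocks. A rough construction sketch; the settings registration shown is an assumption based on the constructor above, and `client` is a Mockito mock:

// Sketch only: registering just TIMEOUT_IN_SECONDS with ClusterSettings is
// assumed to be enough for this helper's update consumer.
Client client = mock(Client.class);
Settings settings = Settings.EMPTY;
Set<Setting<?>> registered = Set.of(Ip2GeoSettings.TIMEOUT_IN_SECONDS);
ClusterSettings clusterSettings = new ClusterSettings(settings, registered);

DatasourceHelper datasourceHelper = new DatasourceHelper(client, settings, clusterSettings);
Datasource datasource = datasourceHelper.getDatasource("my-datasource"); // hypothetical datasource id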
File: DatasourceState.java
@@ -11,28 +11,28 @@
/**
* Ip2Geo datasource state
*
* When data source is created, it starts with PREPARING state. Once the first GeoIP data is generated, the state changes to AVAILABLE.
* Only when the first GeoIP data generation failed, the state changes to FAILED.
* Subsequent GeoIP data failure won't change data source state from AVAILABLE to FAILED.
 * When a data source is created, it starts in the CREATING state. Once the first GeoIP data is generated, the state changes to AVAILABLE.
 * Only when the first GeoIP data generation fails does the state change to CREATE_FAILED.
 * A subsequent GeoIP data failure won't change the data source state from AVAILABLE to CREATE_FAILED.
* When delete request is received, the data source state changes to DELETING.
*
 * The state changes from left to right over the entire lifecycle of a datasource
* (PREPARING) to (FAILED or AVAILABLE) to (DELETING)
* (CREATING) to (CREATE_FAILED or AVAILABLE) to (DELETING)
*
*/
public enum DatasourceState {
/**
* Data source is being prepared
* Data source is being created
*/
PREPARING,
CREATING,
/**
* Data source is ready to be used
*/
AVAILABLE,
/**
* Data source preparation failed
* Data source creation failed
*/
FAILED,
CREATE_FAILED,
/**
* Data source is being deleted
*/
