From 6306861a2b203f25524683693f23484ae40bf361 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Mon, 24 Apr 2023 21:44:30 -0700 Subject: [PATCH] Added unit tests with some refactoring of codes Signed-off-by: Heemin Kim --- .../annotation/VisibleForTesting.java | 12 + .../ip2geo/action/PutDatasourceRequest.java | 10 +- .../action/PutDatasourceTransportAction.java | 156 ++++---- .../ip2geo/common/DatasourceHelper.java | 22 +- .../ip2geo/common/DatasourceState.java | 16 +- .../ip2geo/common/GeoIpDataHelper.java | 89 +++-- .../ip2geo/common/Ip2GeoExecutorHelper.java | 8 +- .../ip2geo/jobscheduler/Datasource.java | 44 ++- .../jobscheduler/DatasourceExtension.java | 2 +- .../ip2geo/jobscheduler/DatasourceRunner.java | 220 +++-------- .../jobscheduler/DatasourceUpdateService.java | 181 +++++++++ .../ip2geo/processor/Ip2GeoCache.java | 2 +- .../ip2geo/processor/Ip2GeoProcessor.java | 232 ++++++------ .../geospatial/plugin/GeospatialPlugin.java | 32 +- .../plugin-metadata/plugin-security.policy | 3 + .../geospatial/ip2geo/Ip2GeoTestCase.java | 227 ++++++++++++ .../action/PutDatasourceRequestTests.java | 96 ++++- .../PutDatasourceTransportActionTests.java | 139 +++++++ .../action/RestPutDatasourceHandlerIT.java | 54 +++ .../action/RestPutDatasourceHandlerTests.java | 45 ++- .../ip2geo/common/DatasourceHelperTests.java | 109 ++++-- .../ip2geo/common/GeoIpDataHelperTests.java | 299 ++++++++++++++- .../DatasourceExtensionTests.java | 47 +++ .../jobscheduler/DatasourceRunnerTests.java | 95 +++++ .../ip2geo/jobscheduler/DatasourceTests.java | 54 ++- .../DatasourceUpdateServiceTests.java | 163 +++++++++ .../processor/Ip2GeoProcessorTests.java | 343 ++++++++++++++++++ .../plugin/GeospatialPluginTests.java | 116 +++++- src/test/resources/ip2geo/manifest.json | 8 + .../ip2geo/manifest_invalid_url.json | 8 + .../resources/ip2geo/manifest_template.json | 8 + .../sample_invalid_less_than_two_fields.csv | 2 + src/test/resources/ip2geo/sample_valid.csv | 3 + src/test/resources/ip2geo/sample_valid.zip | Bin 0 -> 250 bytes 34 files changed, 2340 insertions(+), 505 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/annotation/VisibleForTesting.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerIT.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java create mode 100644 src/test/resources/ip2geo/manifest.json create mode 100644 src/test/resources/ip2geo/manifest_invalid_url.json create mode 100644 src/test/resources/ip2geo/manifest_template.json create mode 100644 src/test/resources/ip2geo/sample_invalid_less_than_two_fields.csv create mode 100644 src/test/resources/ip2geo/sample_valid.csv create mode 100644 src/test/resources/ip2geo/sample_valid.zip diff --git a/src/main/java/org/opensearch/geospatial/annotation/VisibleForTesting.java b/src/main/java/org/opensearch/geospatial/annotation/VisibleForTesting.java new file mode 100644 index 00000000..d48c6dc2 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/annotation/VisibleForTesting.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.annotation; + +public @interface VisibleForTesting { +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java index 24266b0d..bbe97630 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -14,6 +14,7 @@ import java.net.URL; import java.util.Locale; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; @@ -28,11 +29,12 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; /** - * GeoIP datasource creation request + * Ip2Geo datasource creation request */ @Getter @Setter @Log4j2 +@EqualsAndHashCode public class PutDatasourceRequest extends AcknowledgedRequest { private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); @@ -147,7 +149,7 @@ private void validateManifestFile(final URL url, final ActionRequestValidationEx errors.addValidationError( String.format( Locale.ROOT, - "updateInterval %d is should be smaller than %d", + "updateInterval %d should be smaller than %d", updateIntervalInDays.days(), manifest.getValidForInDays() ) @@ -156,12 +158,12 @@ private void validateManifestFile(final URL url, final ActionRequestValidationEx } /** - * Validate updateIntervalInDays is larger than 0 + * Validate updateIntervalInDays is equal or larger than 1 * * @param errors the errors to add error messages */ private void validateUpdateInterval(final ActionRequestValidationException errors) { - if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) > 0) { + if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) < 0) { errors.addValidationError("Update interval should be equal to or larger than 1 day"); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index 291e1087..4e5bae74 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -12,7 +12,9 @@ import java.net.URL; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; import lombok.extern.log4j.Log4j2; @@ -34,6 +36,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -54,6 +57,8 @@ public class PutDatasourceTransportAction extends HandledTransportAction timeout = newValue); indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(settings); @@ -91,99 +100,106 @@ public PutDatasourceTransportAction( @Override protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { try { - Datasource jobParameter = Datasource.Builder.build(request); + Datasource datasource = Datasource.Builder.build(request); IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME) - .id(jobParameter.getId()) - .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .id(datasource.getId()) + .source(datasource.toXContent(JsonXContent.contentBuilder(), null)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .opType(DocWriteRequest.OpType.CREATE); - client.index(indexRequest, new ActionListener<>() { - @Override - public void onResponse(final IndexResponse indexResponse) { - // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread - // pool. - threadPool.generic().submit(() -> { - try { - createDatasource(jobParameter); - } catch (Exception e) { - log.error("Failed to create datasource for {}", jobParameter.getId(), e); - jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobParameter.setState(DatasourceState.FAILED); - try { - DatasourceHelper.updateDatasource(client, jobParameter, timeout); - } catch (Exception ex) { - log.error("Failed to mark datasource state as FAILED for {}", jobParameter.getId(), ex); - } - } - }); - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onFailure(final Exception e) { - if (e instanceof VersionConflictEngineException) { - listener.onFailure( - new ResourceAlreadyExistsException("datasource [{}] already exists", request.getDatasourceName()) - ); - } else { - listener.onFailure(e); - } - } - }); + client.index(indexRequest, getIndexResponseListener(datasource, listener)); } catch (Exception e) { listener.onFailure(e); } } - private void createDatasource(final Datasource jobParameter) throws Exception { - if (!DatasourceState.PREPARING.equals(jobParameter.getState())) { - log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState()); - jobParameter.setState(DatasourceState.FAILED); - jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); + @VisibleForTesting + protected ActionListener getIndexResponseListener( + final Datasource datasource, + final ActionListener listener + ) { + return new ActionListener<>() { + @Override + public void onResponse(final IndexResponse indexResponse) { + // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread + // pool. + threadPool.generic().submit(() -> { createDatasource(datasource); }); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(final Exception e) { + if (e instanceof VersionConflictEngineException) { + listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getId())); + } else { + listener.onFailure(e); + } + } + }; + } + + @VisibleForTesting + protected void createDatasource(final Datasource datasource) { + if (!DatasourceState.CREATING.equals(datasource.getState())) { + log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, datasource.getState()); + markDatasourceAsCreateFailed(datasource); return; } - URL url = new URL(jobParameter.getEndpoint()); - DatasourceManifest manifest = DatasourceManifest.Builder.build(url); - String indexName = setupIndex(manifest, jobParameter); - Instant startTime = Instant.now(); - String[] fields = putIp2GeoData(indexName, manifest); - Instant endTime = Instant.now(); - updateJobParameterAsSucceeded(jobParameter, manifest, fields, startTime, endTime); - log.info("GeoIP database[{}] creation succeeded after {} seconds", jobParameter.getId(), Duration.between(startTime, endTime)); + try { + URL url = new URL(datasource.getEndpoint()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + String indexName = setupIndex(manifest, datasource); + Instant startTime = Instant.now(); + List fields = putGeoIpData(indexName, manifest); + Instant endTime = Instant.now(); + updateJobParameterAsSucceeded(datasource, manifest, fields, startTime, endTime); + log.info("GeoIP database[{}] creation succeeded after {} seconds", datasource.getId(), Duration.between(startTime, endTime)); + } catch (Exception e) { + log.error("Failed to create datasource for {}", datasource.getId(), e); + markDatasourceAsCreateFailed(datasource); + } + } + + private void markDatasourceAsCreateFailed(final Datasource datasource) { + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + datasource.setState(DatasourceState.CREATE_FAILED); + try { + datasourceHelper.updateDatasource(datasource); + } catch (Exception e) { + log.error("Failed to mark datasource state as CREATE_FAILED for {}", datasource.getId(), e); + } } private void updateJobParameterAsSucceeded( - final Datasource jobParameter, + final Datasource datasource, final DatasourceManifest manifest, - final String[] fields, + final List fields, final Instant startTime, final Instant endTime ) throws IOException { - jobParameter.setDatabase(manifest, fields); - jobParameter.getUpdateStats().setLastSucceededAt(endTime); - jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); - jobParameter.enable(); - jobParameter.setState(DatasourceState.AVAILABLE); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); + datasource.setDatabase(manifest, fields); + datasource.getUpdateStats().setLastSucceededAt(endTime); + datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); + datasource.enable(); + datasource.setState(DatasourceState.AVAILABLE); + datasourceHelper.updateDatasource(datasource); } - private String setupIndex(final DatasourceManifest manifest, final Datasource jobParameter) throws IOException { - String indexName = jobParameter.indexNameFor(manifest); - jobParameter.getIndices().add(indexName); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); - GeoIpDataHelper.createIndexIfNotExists(clusterService, client, indexName, timeout); + private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException { + String indexName = datasource.indexNameFor(manifest); + datasource.getIndices().add(indexName); + datasourceHelper.updateDatasource(datasource); + geoIpDataHelper.createIndexIfNotExists(indexName); return indexName; } - private String[] putIp2GeoData(final String indexName, final DatasourceManifest manifest) throws IOException { - String[] fields; - try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) { + private List putGeoIpData(final String indexName, final DatasourceManifest manifest) throws IOException { + String[] header; + try (CSVParser reader = geoIpDataHelper.getDatabaseReader(manifest)) { Iterator iter = reader.iterator(); - fields = iter.next().values(); - GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout); + header = iter.next().values(); + geoIpDataHelper.putGeoIpData(indexName, header, iter, indexingBulkSize); } - return fields; + return Arrays.asList(header).subList(1, header.length); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java index ea89e585..e9fbafe2 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java @@ -20,6 +20,8 @@ import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; @@ -36,17 +38,22 @@ */ @Log4j2 public class DatasourceHelper { + private final Client client; + private TimeValue timeout; + + public DatasourceHelper(final Client client, final Settings settings, final ClusterSettings clusterSettings) { + this.client = client; + this.timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(settings); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> this.timeout = newValue); + } /** * Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} - * @param client the client * @param datasource the datasource - * @param timeout the timeout * @return index response * @throws IOException exception */ - public static IndexResponse updateDatasource(final Client client, final Datasource datasource, final TimeValue timeout) - throws IOException { + public IndexResponse updateDatasource(final Datasource datasource) throws IOException { datasource.setLastUpdateTime(Instant.now()); IndexRequestBuilder requestBuilder = client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME); requestBuilder.setId(datasource.getId()); @@ -57,13 +64,11 @@ public static IndexResponse updateDatasource(final Client client, final Datasour /** * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} - * @param client the client * @param id the name of a datasource - * @param timeout the timeout * @return datasource * @throws IOException exception */ - public static Datasource getDatasource(final Client client, final String id, final TimeValue timeout) throws IOException { + public Datasource getDatasource(final String id) throws IOException { GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id); GetResponse response; try { @@ -87,11 +92,10 @@ public static Datasource getDatasource(final Client client, final String id, fin /** * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} - * @param client the client * @param id the name of a datasource * @param actionListener the action listener */ - public static void getDatasource(final Client client, final String id, final ActionListener actionListener) { + public void getDatasource(final String id, final ActionListener actionListener) { GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id); client.get(request, new ActionListener() { @Override diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java index 27523bda..85b0aecf 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java @@ -11,28 +11,28 @@ /** * Ip2Geo datasource state * - * When data source is created, it starts with PREPARING state. Once the first GeoIP data is generated, the state changes to AVAILABLE. - * Only when the first GeoIP data generation failed, the state changes to FAILED. - * Subsequent GeoIP data failure won't change data source state from AVAILABLE to FAILED. + * When data source is created, it starts with CREATING state. Once the first GeoIP data is generated, the state changes to AVAILABLE. + * Only when the first GeoIP data generation failed, the state changes to CREATE_FAILED. + * Subsequent GeoIP data failure won't change data source state from AVAILABLE to CREATE_FAILED. * When delete request is received, the data source state changes to DELETING. * * State changed from left to right for the entire lifecycle of a datasource - * (PREPARING) to (FAILED or AVAILABLE) to (DELETING) + * (CREATING) to (CREATE_FAILED or AVAILABLE) to (DELETING) * */ public enum DatasourceState { /** - * Data source is being prepared + * Data source is being created */ - PREPARING, + CREATING, /** * Data source is ready to be used */ AVAILABLE, /** - * Data source preparation failed + * Data source creation failed */ - FAILED, + CREATE_FAILED, /** * Data source is being deleted */ diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java index 4543ce7f..b3f0bfaa 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java @@ -41,7 +41,9 @@ import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.cluster.service.ClusterService; @@ -61,21 +63,24 @@ public class GeoIpDataHelper { private static final String DATA_FIELD_NAME = "_data"; private static final Tuple INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1); private static final Tuple INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all"); + private final ClusterService clusterService; + private final Client client; + private TimeValue timeout; + + public GeoIpDataHelper(final ClusterService clusterService, final Client client) { + this.clusterService = clusterService; + this.client = client; + this.timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> this.timeout = newValue); + } /** * Create an index of single shard with auto expand replicas to all nodes * - * @param clusterService cluster service - * @param client client * @param indexName index name - * @param timeout timeout */ - public static void createIndexIfNotExists( - final ClusterService clusterService, - final Client client, - final String indexName, - final TimeValue timeout - ) { + public void createIndexIfNotExists(final String indexName) { if (clusterService.state().metadata().hasIndex(indexName) == true) { return; } @@ -101,11 +106,11 @@ public static void createIndexIfNotExists( * * @return String representing datasource database index mapping */ - private static String getIndexMapping() { + private String getIndexMapping() { try { try (InputStream is = DatasourceHelper.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) { try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { - return reader.lines().collect(Collectors.joining()); + return reader.lines().map(line -> line.trim()).collect(Collectors.joining()); } } } catch (IOException e) { @@ -120,7 +125,7 @@ private static String getIndexMapping() { * @return CSVParser for GeoIP data */ @SuppressForbidden(reason = "Need to connect to http endpoint to read GeoIP database file") - public static CSVParser getDatabaseReader(final DatasourceManifest manifest) { + public CSVParser getDatabaseReader(final DatasourceManifest manifest) { SpecialPermission.check(); return AccessController.doPrivileged((PrivilegedAction) () -> { try { @@ -164,7 +169,7 @@ public static CSVParser getDatabaseReader(final DatasourceManifest manifest) { * @param values a list of values * @return Document in json string format */ - public static String createDocument(final String[] fields, final String[] values) { + public String createDocument(final String[] fields, final String[] values) { StringBuilder sb = new StringBuilder(); sb.append("{\""); sb.append(IP_RANGE_FIELD_NAME); @@ -188,19 +193,13 @@ public static String createDocument(final String[] fields, final String[] values } /** - * Query a given index using a given ip address to get geo data + * Query a given index using a given ip address to get geoip data * - * @param client client * @param indexName index * @param ip ip address * @param actionListener action listener */ - public static void getGeoData( - final Client client, - final String indexName, - final String ip, - final ActionListener> actionListener - ) { + public void getGeoIpData(final String indexName, final String ip, final ActionListener> actionListener) { client.prepareSearch(indexName) .setSize(1) .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) @@ -211,12 +210,12 @@ public void onResponse(final SearchResponse searchResponse) { if (searchResponse.getHits().getHits().length == 0) { actionListener.onResponse(Collections.emptyMap()); } else { - Map geoData = (Map) XContentHelper.convertToMap( + Map geoIpData = (Map) XContentHelper.convertToMap( searchResponse.getHits().getAt(0).getSourceRef(), false, XContentType.JSON ).v2().get(DATA_FIELD_NAME); - actionListener.onResponse(geoData); + actionListener.onResponse(geoIpData); } } @@ -228,28 +227,26 @@ public void onFailure(final Exception e) { } /** - * Query a given index using a given ip address iterator to get geo data + * Query a given index using a given ip address iterator to get geoip data * * This method calls itself recursively until it processes all ip addresses in bulk of {@code bulkSize}. * - * @param client the client * @param indexName the index name * @param ipIterator the iterator of ip addresses * @param maxBundleSize number of ip address to pass in multi search * @param maxConcurrentSearches the max concurrent search requests * @param firstOnly return only the first matching result if true - * @param geoData collected geo data + * @param geoIpData collected geo data * @param actionListener the action listener */ - public static void getGeoData( - final Client client, + public void getGeoIpData( final String indexName, final Iterator ipIterator, final Integer maxBundleSize, final Integer maxConcurrentSearches, final boolean firstOnly, - final Map> geoData, - final ActionListener actionListener + final Map> geoIpData, + final ActionListener>> actionListener ) { MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch(); if (maxConcurrentSearches != 0) { @@ -259,7 +256,7 @@ public static void getGeoData( List ipsToSearch = new ArrayList<>(maxBundleSize); while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) { String ip = ipIterator.next(); - if (geoData.get(ip) == null) { + if (geoIpData.get(ip) == null) { mRequestBuilder.add( client.prepareSearch(indexName) .setSize(1) @@ -271,7 +268,7 @@ public static void getGeoData( } if (ipsToSearch.isEmpty()) { - actionListener.onResponse(null); + actionListener.onResponse(geoIpData); return; } @@ -285,7 +282,7 @@ public void onResponse(final MultiSearchResponse items) { } if (items.getResponses()[i].getResponse().getHits().getHits().length == 0) { - geoData.put(ipsToSearch.get(i), Collections.emptyMap()); + geoIpData.put(ipsToSearch.get(i), Collections.emptyMap()); continue; } @@ -295,14 +292,14 @@ public void onResponse(final MultiSearchResponse items) { XContentType.JSON ).v2().get(DATA_FIELD_NAME); - geoData.put(ipsToSearch.get(i), data); + geoIpData.put(ipsToSearch.get(i), data); if (firstOnly) { - actionListener.onResponse(null); + actionListener.onResponse(geoIpData); return; } } - getGeoData(client, indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoData, actionListener); + getGeoIpData(indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoIpData, actionListener); } @Override @@ -315,21 +312,12 @@ public void onFailure(final Exception e) { /** * Puts GeoIP data from CSVRecord iterator into a given index in bulk * - * @param client OpenSearch client * @param indexName Index name to puts the GeoIP data * @param fields Field name matching with data in CSVRecord in order * @param iterator GeoIP data to insert * @param bulkSize Bulk size of data to process - * @param timeout Timeout */ - public static void putGeoData( - final Client client, - final String indexName, - final String[] fields, - final Iterator iterator, - final int bulkSize, - final TimeValue timeout - ) { + public void putGeoIpData(final String indexName, final String[] fields, final Iterator iterator, final int bulkSize) { BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); while (iterator.hasNext()) { CSVRecord record = iterator.next(); @@ -351,4 +339,13 @@ public static void putGeoData( client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); } + + public AcknowledgedResponse deleteIndex(final String index) { + return client.admin() + .indices() + .prepareDelete(index) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .execute() + .actionGet(timeout); + } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoExecutorHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoExecutorHelper.java index 7c5d77f9..d620af6d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoExecutorHelper.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoExecutorHelper.java @@ -20,6 +20,11 @@ */ public class Ip2GeoExecutorHelper { private static final String THREAD_POOL_NAME = "_plugin_geospatial_ip2geo_datasource_update"; + private final ThreadPool threadPool; + + public Ip2GeoExecutorHelper(final ThreadPool threadPool) { + this.threadPool = threadPool; + } /** * We use fixed thread count of 1 for updating datasource as updating datasource is running background @@ -35,10 +40,9 @@ public static ExecutorBuilder executorBuilder(final Settings settings) { /** * Return an executor service for datasource update task * - * @param threadPool the thread pool * @return the executor service */ - public static ExecutorService forDatasourceUpdate(final ThreadPool threadPool) { + public ExecutorService forDatasourceUpdate() { return threadPool.executor(THREAD_POOL_NAME); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 4d42f9ad..c0209916 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -12,20 +12,24 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; import lombok.AccessLevel; import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import lombok.ToString; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -38,6 +42,8 @@ */ @Getter @Setter +@ToString +@EqualsAndHashCode @AllArgsConstructor public class Datasource implements ScheduledJobParameter { /** @@ -173,22 +179,20 @@ public class Datasource implements ScheduledJobParameter { } - /** - * Visible for testing - */ - protected Datasource() { + @VisibleForTesting + public Datasource() { this(null, null, null); } public Datasource(final String id, final IntervalSchedule schedule, final String endpoint) { this( id, - Instant.now(), + Instant.now().truncatedTo(ChronoUnit.MILLIS), null, false, schedule, endpoint, - DatasourceState.PREPARING, + DatasourceState.CREATING, new ArrayList<>(), new Database(), new UpdateStats() @@ -264,14 +268,14 @@ public Long getLockDurationSeconds() { */ @Override public Double getJitter() { - return 5.0 / (schedule.getInterval() * 24 * 60); + return 5.0 / (schedule.getInterval() * 24.0 * 60.0); } /** * Enable auto update of GeoIP data */ public void enable() { - enabledTime = Instant.now(); + enabledTime = Instant.now().truncatedTo(ChronoUnit.MILLIS); isEnabled = true; } @@ -322,12 +326,22 @@ public boolean isExpired() { return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS)); } - public void setDatabase(final DatasourceManifest datasourceManifest, final String[] fields) { + public void setDatabase(final DatasourceManifest datasourceManifest, final List fields) { this.database.setProvider(datasourceManifest.getProvider()); this.database.setMd5Hash(datasourceManifest.getMd5Hash()); this.database.setUpdatedAt(Instant.ofEpochMilli(datasourceManifest.getUpdatedAt())); - this.database.setValidForInDays(database.validForInDays); - this.database.setFields(Arrays.asList(fields)); + this.database.setValidForInDays(datasourceManifest.getValidForInDays()); + this.database.setFields(fields); + } + + public boolean isCompatible(final List fields) { + Set fieldsSet = new HashSet<>(fields); + for (String field : database.fields) { + if (!fieldsSet.contains(field)) { + return false; + } + } + return true; } /** @@ -335,6 +349,8 @@ public void setDatabase(final DatasourceManifest datasourceManifest, final Strin */ @Getter @Setter + @ToString + @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class Database implements ToXContent { @@ -427,6 +443,8 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa */ @Getter @Setter + @ToString + @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class UpdateStats implements ToXContent { @@ -517,7 +535,7 @@ public static class Builder { public static Datasource build(final PutDatasourceRequest request) { String id = request.getDatasourceName(); IntervalSchedule schedule = new IntervalSchedule( - Instant.now(), + Instant.now().truncatedTo(ChronoUnit.MILLIS), (int) request.getUpdateIntervalInDays().days(), ChronoUnit.DAYS ); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java index ed94f6fe..6def5295 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java @@ -15,7 +15,7 @@ /** * Datasource job scheduler extension * - * This extension is responsible for scheduling Ip2Geo data update task + * This extension is responsible for scheduling GeoIp data update task * * See https://github.com/opensearch-project/job-scheduler/blob/main/README.md#getting-started */ diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index c408ff3b..508d958f 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -8,40 +8,26 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; -import java.net.URL; -import java.time.Duration; +import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; import lombok.extern.log4j.Log4j2; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; -import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; -import org.opensearch.geospatial.ip2geo.common.DatasourceState; -import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutorHelper; -import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.jobscheduler.spi.utils.LockService; -import org.opensearch.threadpool.ThreadPool; /** * Datasource update task * - * This is a background task which is responsible for updating Ip2Geo datasource + * This is a background task which is responsible for updating GeoIp data */ @Log4j2 public class DatasourceRunner implements ScheduledJobRunner { @@ -66,10 +52,10 @@ public static DatasourceRunner getJobRunnerInstance() { } private ClusterService clusterService; - private ThreadPool threadPool; private Client client; - private TimeValue timeout; - private Integer indexingBulkSize; + private DatasourceUpdateService datasourceUpdateService; + private Ip2GeoExecutorHelper ip2GeoExecutorHelper; + private DatasourceHelper datasourceHelper; private boolean initialized; private DatasourceRunner() { @@ -79,17 +65,18 @@ private DatasourceRunner() { /** * Initialize timeout and indexingBulkSize from settings */ - public void initialize(final ClusterService clusterService, final ThreadPool threadPool, final Client client) { + public void initialize( + final ClusterService clusterService, + final Client client, + final DatasourceUpdateService datasourceUpdateService, + final Ip2GeoExecutorHelper ip2GeoExecutorHelper, + final DatasourceHelper datasourceHelper + ) { this.clusterService = clusterService; - this.threadPool = threadPool; this.client = client; - - this.timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(clusterService.getSettings()); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> this.timeout = newValue); - this.indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(clusterService.getSettings()); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer(Ip2GeoSettings.INDEXING_BULK_SIZE, newValue -> this.indexingBulkSize = newValue); + this.datasourceUpdateService = datasourceUpdateService; + this.ip2GeoExecutorHelper = ip2GeoExecutorHelper; + this.datasourceHelper = datasourceHelper; this.initialized = true; } @@ -102,11 +89,11 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC log.info("Update job started for a datasource[{}]", jobParameter.getName()); if (jobParameter instanceof Datasource == false) { throw new IllegalStateException( - "job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName() + "job parameter is not instance of Datasource, type: " + jobParameter.getClass().getCanonicalName() ); } - Ip2GeoExecutorHelper.forDatasourceUpdate(threadPool).submit(updateDatasourceRunner(jobParameter, context)); + ip2GeoExecutorHelper.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter, context)); } /** @@ -118,161 +105,42 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC * @param jobParameter job parameter * @param context context */ - private Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter, final JobExecutionContext context) { + @VisibleForTesting + protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter, final JobExecutionContext context) { final LockService lockService = context.getLockService(); return () -> { - if (jobParameter.getLockDurationSeconds() != null) { - lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { - if (lock == null) { - return; - } - try { - Datasource datasource = DatasourceHelper.getDatasource(client, jobParameter.getName(), timeout); - if (datasource == null) { - log.info("Datasource[{}] is already deleted", jobParameter.getName()); - return; - } - - try { - deleteUnusedIndices(datasource); - updateDatasource(datasource); - deleteUnusedIndices(datasource); - } catch (Exception e) { - log.error("Failed to update datasource for {}", datasource.getId(), e); - datasource.getUpdateStats().setLastFailedAt(Instant.now()); - DatasourceHelper.updateDatasource(client, datasource, timeout); - } - } finally { - lockService.release( - lock, - ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock); }) - ); - } - }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName()); })); - } - }; - - } - - /** - * Delete all indices except the one which are being used - * - * @param parameter - */ - private void deleteUnusedIndices(final Datasource parameter) { - try { - List deletedIndices = new ArrayList<>(); - for (String index : parameter.getIndices()) { - if (index.equals(parameter.currentIndexName())) { - continue; - } - - if (!clusterService.state().metadata().hasIndex(index)) { - deletedIndices.add(index); - continue; + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; } - try { - if (client.admin() - .indices() - .prepareDelete(index) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .execute() - .actionGet(timeout) - .isAcknowledged()) { - deletedIndices.add(index); - } else { - log.error("Failed to delete an index [{}]", index); - } - } catch (Exception e) { - log.error("Failed to delete an index [{}]", index, e); + updateDatasource(jobParameter); + } finally { + lockService.release( + lock, + ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); }) + ); } - } - if (!deletedIndices.isEmpty()) { - parameter.getIndices().removeAll(deletedIndices); - DatasourceHelper.updateDatasource(client, parameter, timeout); - } - } catch (Exception e) { - log.error("Failed to delete old indices for {}", parameter.getId(), e); - } + }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); })); + }; } - /** - * Update GeoIP data internal - * - * @param jobParameter - * @throws Exception - */ - private void updateDatasource(final Datasource jobParameter) throws Exception { - if (!DatasourceState.AVAILABLE.equals(jobParameter.getState())) { - log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState()); - jobParameter.disable(); - jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); - return; - } - - URL url = new URL(jobParameter.getEndpoint()); - DatasourceManifest manifest = DatasourceManifest.Builder.build(url); - - if (skipUpdate(jobParameter, manifest)) { - log.info("Skipping GeoIP database update. Update is not required for {}", jobParameter.getId()); - jobParameter.getUpdateStats().setLastSkippedAt(Instant.now()); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); + @VisibleForTesting + protected void updateDatasource(final ScheduledJobParameter jobParameter) throws IOException { + Datasource datasource = datasourceHelper.getDatasource(jobParameter.getName()); + if (datasource == null) { + log.info("Datasource[{}] is already deleted", jobParameter.getName()); return; } - Instant startTime = Instant.now(); - String indexName = jobParameter.indexNameFor(manifest); - jobParameter.getIndices().add(indexName); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); - GeoIpDataHelper.createIndexIfNotExists(clusterService, client, indexName, timeout); - String[] fields; - try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) { - Iterator iter = reader.iterator(); - fields = iter.next().values(); - if (!jobParameter.getDatabase().getFields().equals(Arrays.asList(fields))) { - throw new OpenSearchException( - "fields does not match between old [{}] and new [{}]", - jobParameter.getDatabase().getFields().toString(), - Arrays.asList(fields).toString() - ); - } - GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout); - } - - Instant endTime = Instant.now(); - jobParameter.getDatabase().setProvider(manifest.getProvider()); - jobParameter.getDatabase().setMd5Hash(manifest.getMd5Hash()); - jobParameter.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); - jobParameter.getDatabase().setValidForInDays(manifest.getValidForInDays()); - jobParameter.getDatabase().setFields(Arrays.asList(fields)); - jobParameter.getUpdateStats().setLastSucceededAt(endTime); - jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); - DatasourceHelper.updateDatasource(client, jobParameter, timeout); - log.info( - "GeoIP database creation succeeded for {} and took {} seconds", - jobParameter.getId(), - Duration.between(startTime, endTime) - ); - } - - /** - * Determine if update is needed or not - * - * Update is needed when all following conditions are met - * 1. MD5 hash value in datasource is different with MD5 hash value in manifest - * 2. updatedAt value in datasource is before updateAt value in manifest - * - * @param parameter - * @param manifest - * @return - */ - private boolean skipUpdate(final Datasource parameter, final DatasourceManifest manifest) { - if (manifest.getMd5Hash().equals(parameter.getDatabase().getMd5Hash())) { - return true; + try { + datasourceUpdateService.deleteUnusedIndices(datasource); + datasourceUpdateService.updateGeoIpData(datasource); + datasourceUpdateService.deleteUnusedIndices(datasource); + } catch (Exception e) { + log.error("Failed to update datasource for {}", datasource.getId(), e); + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + datasourceHelper.updateDatasource(datasource); } - - return parameter.getDatabase().getUpdatedAt().toEpochMilli() >= manifest.getUpdatedAt(); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java new file mode 100644 index 00000000..8293a032 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -0,0 +1,181 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.opensearch.OpenSearchException; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; + +@Log4j2 +public class DatasourceUpdateService { + private final ClusterService clusterService; + private final Client client; + private final DatasourceHelper datasourceHelper; + private final GeoIpDataHelper geoIpDataHelper; + private TimeValue timeout; + private Integer indexingBulkSize; + + public DatasourceUpdateService( + final ClusterService clusterService, + final Client client, + final DatasourceHelper datasourceHelper, + final GeoIpDataHelper geoIpDataHelper + ) { + this.clusterService = clusterService; + this.client = client; + this.datasourceHelper = datasourceHelper; + this.geoIpDataHelper = geoIpDataHelper; + + this.timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> this.timeout = newValue); + this.indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(Ip2GeoSettings.INDEXING_BULK_SIZE, newValue -> this.indexingBulkSize = newValue); + } + + /** + * Update GeoIp data internal + * + * @param jobParameter + * @throws Exception + */ + public void updateGeoIpData(final Datasource jobParameter) throws Exception { + if (!DatasourceState.AVAILABLE.equals(jobParameter.getState())) { + log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState()); + jobParameter.disable(); + jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); + datasourceHelper.updateDatasource(jobParameter); + return; + } + + URL url = new URL(jobParameter.getEndpoint()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + if (skipUpdate(jobParameter, manifest)) { + log.info("Skipping GeoIP database update. Update is not required for {}", jobParameter.getId()); + jobParameter.getUpdateStats().setLastSkippedAt(Instant.now()); + datasourceHelper.updateDatasource(jobParameter); + return; + } + + Instant startTime = Instant.now(); + String indexName = jobParameter.indexNameFor(manifest); + jobParameter.getIndices().add(indexName); + datasourceHelper.updateDatasource(jobParameter); + geoIpDataHelper.createIndexIfNotExists(indexName); + String[] fields; + List fieldsToStore; + try (CSVParser reader = geoIpDataHelper.getDatabaseReader(manifest)) { + Iterator iter = reader.iterator(); + fields = iter.next().values(); + if (fields.length < 2) { + throw new OpenSearchException("geoip database should have at least two fields"); + } + fieldsToStore = Arrays.asList(fields).subList(1, fields.length); + if (!jobParameter.isCompatible(fieldsToStore)) { + throw new OpenSearchException( + "new fields [{}] does not contain all old fields [{}]", + fieldsToStore.toString(), + jobParameter.getDatabase().getFields().toString() + ); + } + geoIpDataHelper.putGeoIpData(indexName, fields, iter, indexingBulkSize); + } + + Instant endTime = Instant.now(); + jobParameter.getDatabase().setProvider(manifest.getProvider()); + jobParameter.getDatabase().setMd5Hash(manifest.getMd5Hash()); + jobParameter.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); + jobParameter.getDatabase().setValidForInDays(manifest.getValidForInDays()); + jobParameter.getDatabase().setFields(fieldsToStore); + jobParameter.getUpdateStats().setLastSucceededAt(endTime); + jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); + datasourceHelper.updateDatasource(jobParameter); + log.info( + "GeoIP database creation succeeded for {} and took {} seconds", + jobParameter.getId(), + Duration.between(startTime, endTime) + ); + } + + /** + * Delete all indices except the one which are being used + * + * @param parameter + */ + public void deleteUnusedIndices(final Datasource parameter) { + try { + List deletedIndices = new ArrayList<>(); + for (String index : parameter.getIndices()) { + if (index.equals(parameter.currentIndexName())) { + continue; + } + + if (!clusterService.state().metadata().hasIndex(index)) { + deletedIndices.add(index); + continue; + } + + try { + if (geoIpDataHelper.deleteIndex(index).isAcknowledged()) { + deletedIndices.add(index); + } else { + log.error("Failed to delete an index [{}]", index); + } + } catch (Exception e) { + log.error("Failed to delete an index [{}]", index, e); + } + } + if (!deletedIndices.isEmpty()) { + parameter.getIndices().removeAll(deletedIndices); + datasourceHelper.updateDatasource(parameter); + } + } catch (Exception e) { + log.error("Failed to delete old indices for {}", parameter.getId(), e); + } + } + + /** + * Determine if update is needed or not + * + * Update is needed when all following conditions are met + * 1. MD5 hash value in datasource is different with MD5 hash value in manifest + * 2. updatedAt value in datasource is before updateAt value in manifest + * + * @param parameter + * @param manifest + * @return + */ + private boolean skipUpdate(final Datasource parameter, final DatasourceManifest manifest) { + if (manifest.getMd5Hash().equals(parameter.getDatabase().getMd5Hash())) { + return true; + } + + return parameter.getDatabase().getUpdatedAt().toEpochMilli() >= manifest.getUpdatedAt(); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java index 2bcc1ed5..a9a09b2f 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java @@ -18,7 +18,7 @@ import org.opensearch.common.unit.TimeValue; /** - * The in-memory cache for the ip2geo data. There should only be 1 instance of this class. + * The in-memory cache for the GeoIp data. There should only be 1 instance of this class. */ public class Ip2GeoCache { private static final TimeValue CACHING_PERIOD = TimeValue.timeValueMinutes(1); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 0d6ddd8c..d2f7e1c8 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -27,8 +27,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; @@ -55,6 +54,8 @@ public final class Ip2GeoProcessor extends AbstractProcessor { private final Ip2GeoCache cache; private final Client client; private final ClusterService clusterService; + private final DatasourceHelper datasourceHelper; + private final GeoIpDataHelper geoIpDataHelper; private int maxBundleSize; private int maxConcurrentSearches; @@ -89,7 +90,9 @@ public Ip2GeoProcessor( final boolean firstOnly, final Ip2GeoCache cache, final Client client, - final ClusterService clusterService + final ClusterService clusterService, + final DatasourceHelper datasourceHelper, + final GeoIpDataHelper geoIpDataHelper ) { super(tag, description); this.field = field; @@ -101,6 +104,8 @@ public Ip2GeoProcessor( this.cache = cache; this.client = client; this.clusterService = clusterService; + this.datasourceHelper = datasourceHelper; + this.geoIpDataHelper = geoIpDataHelper; maxBundleSize = clusterService.getClusterSettings().get(Ip2GeoSettings.MAX_BUNDLE_SIZE); clusterService.getClusterSettings().addSettingsUpdateConsumer(Ip2GeoSettings.MAX_BUNDLE_SIZE, newValue -> maxBundleSize = newValue); @@ -119,12 +124,9 @@ public Ip2GeoProcessor( public void execute(IngestDocument ingestDocument, BiConsumer handler) { Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); - if (ip == null && ignoreMissing) { + if (ip == null) { handler.accept(ingestDocument, null); return; - } else if (ip == null) { - handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot extract geo information.")); - return; } if (ip instanceof String) { @@ -147,14 +149,8 @@ public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException("Not implemented"); } - /** - * Handle single ip - * - * @param ingestDocument the document - * @param handler the handler - * @param ip the ip - */ - private void executeInternal( + @VisibleForTesting + protected void executeInternal( final IngestDocument ingestDocument, final BiConsumer handler, final String ip @@ -168,28 +164,17 @@ private void executeInternal( return; } - DatasourceHelper.getDatasource(client, datasourceName, new ActionListener<>() { + datasourceHelper.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { - if (datasource == null) { - handler.accept(null, new IllegalStateException("datasource does not exist")); + if (handleInvalidDatasource(ingestDocument, datasource, handler)) { return; } - if (datasource.isExpired()) { - ingestDocument.setFieldValue(targetField, DATA_EXPIRED); - handler.accept(ingestDocument, null); - return; - } - - GeoIpDataHelper.getGeoData(client, datasource.currentIndexName(), ip, new ActionListener<>() { + geoIpDataHelper.getGeoIpData(datasource.currentIndexName(), ip, new ActionListener<>() { @Override public void onResponse(final Map stringObjectMap) { - cache.put(ip, datasourceName, stringObjectMap); - if (!stringObjectMap.isEmpty()) { - ingestDocument.setFieldValue(targetField, filteredGeoData(stringObjectMap, ip)); - } - handler.accept(ingestDocument, null); + handleSingleIp(ip, stringObjectMap, ingestDocument, handler); } @Override @@ -206,6 +191,37 @@ public void onFailure(final Exception e) { }); } + @VisibleForTesting + protected void handleSingleIp( + final String ip, + final Map stringObjectMap, + final IngestDocument ingestDocument, + final BiConsumer handler + ) { + cache.put(ip, datasourceName, stringObjectMap); + if (!stringObjectMap.isEmpty()) { + ingestDocument.setFieldValue(targetField, filteredGeoData(stringObjectMap, ip)); + } + handler.accept(ingestDocument, null); + } + + private Map filteredGeoData(final Map geoData, final String ip) { + Map filteredGeoData; + if (properties == null) { + filteredGeoData = geoData; + } else { + filteredGeoData = new HashMap<>(); + for (String property : this.properties) { + if (property.equals(PROPERTY_IP)) { + filteredGeoData.put(PROPERTY_IP, ip); + } else { + filteredGeoData.put(property, geoData.get(property)); + } + } + } + return filteredGeoData; + } + /** * Handle multiple ips * @@ -213,7 +229,8 @@ public void onFailure(final Exception e) { * @param handler the handler * @param ips the ip list */ - private void executeInternal( + @VisibleForTesting + protected void executeInternal( final IngestDocument ingestDocument, final BiConsumer handler, final List ips @@ -227,68 +244,21 @@ private void executeInternal( data.put(ipAddr, cache.get(ipAddr, datasourceName)); } List ipList = (List) ips; - DatasourceHelper.getDatasource(client, datasourceName, new ActionListener<>() { + datasourceHelper.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { - if (datasource == null) { - handler.accept(null, new IllegalStateException("datasource does not exist")); + if (handleInvalidDatasource(ingestDocument, datasource, handler)) { return; } - if (datasource.isExpired()) { - ingestDocument.setFieldValue(targetField, DATA_EXPIRED); - handler.accept(ingestDocument, null); - return; - } - GeoIpDataHelper.getGeoData( - client, + geoIpDataHelper.getGeoIpData( datasource.currentIndexName(), ipList.iterator(), maxBundleSize, maxConcurrentSearches, firstOnly, data, - new ActionListener<>() { - @Override - public void onResponse(final Object obj) { - for (Map.Entry> entry : data.entrySet()) { - cache.put(entry.getKey(), datasourceName, entry.getValue()); - } - - if (firstOnly) { - for (String ipAddr : ipList) { - Map geoData = data.get(ipAddr); - // GeoData for ipAddr won't be null - if (!geoData.isEmpty()) { - ingestDocument.setFieldValue(targetField, geoData); - handler.accept(ingestDocument, null); - return; - } - } - handler.accept(ingestDocument, null); - } else { - boolean match = false; - List> geoDataList = new ArrayList<>(ipList.size()); - for (String ipAddr : ipList) { - Map geoData = data.get(ipAddr); - // GeoData for ipAddr won't be null - geoDataList.add(geoData.isEmpty() ? null : geoData); - if (!geoData.isEmpty()) { - match = true; - } - } - if (match) { - ingestDocument.setFieldValue(targetField, geoDataList); - } - handler.accept(ingestDocument, null); - } - } - - @Override - public void onFailure(final Exception e) { - handler.accept(null, e); - } - } + listenerToAppendDataToDocument(data, ipList, ingestDocument, handler) ); } @@ -299,21 +269,74 @@ public void onFailure(final Exception e) { }); } - private Map filteredGeoData(final Map geoData, final String ip) { - Map filteredGeoData; - if (properties == null) { - filteredGeoData = geoData; - } else { - filteredGeoData = new HashMap<>(); - for (String property : this.properties) { - if (property.equals(PROPERTY_IP)) { - filteredGeoData.put(PROPERTY_IP, ip); + @VisibleForTesting + protected ActionListener>> listenerToAppendDataToDocument( + final Map> data, + final List ipList, + final IngestDocument ingestDocument, + final BiConsumer handler + ) { + return new ActionListener<>() { + @Override + public void onResponse(final Map> response) { + for (Map.Entry> entry : data.entrySet()) { + cache.put(entry.getKey(), datasourceName, entry.getValue()); + } + + if (firstOnly) { + for (String ipAddr : ipList) { + Map geoData = data.get(ipAddr); + // GeoData for ipAddr won't be null + if (!geoData.isEmpty()) { + ingestDocument.setFieldValue(targetField, geoData); + handler.accept(ingestDocument, null); + return; + } + } } else { - filteredGeoData.put(property, geoData.get(property)); + boolean match = false; + List> geoDataList = new ArrayList<>(ipList.size()); + for (String ipAddr : ipList) { + Map geoData = data.get(ipAddr); + // GeoData for ipAddr won't be null + geoDataList.add(geoData.isEmpty() ? null : geoData); + if (!geoData.isEmpty()) { + match = true; + } + } + if (match) { + ingestDocument.setFieldValue(targetField, geoDataList); + handler.accept(ingestDocument, null); + return; + } } + handler.accept(ingestDocument, null); + } + + @Override + public void onFailure(final Exception e) { + handler.accept(null, e); } + }; + } + + @VisibleForTesting + protected boolean handleInvalidDatasource( + final IngestDocument ingestDocument, + final Datasource datasource, + final BiConsumer handler + ) { + if (datasource == null) { + handler.accept(null, new IllegalStateException("datasource does not exist")); + return true; } - return filteredGeoData; + + if (datasource.isExpired()) { + ingestDocument.setFieldValue(targetField, DATA_EXPIRED); + handler.accept(ingestDocument, null); + return true; + } + return false; } @Override @@ -328,7 +351,8 @@ public static final class Factory implements Processor.Factory { private final Ip2GeoCache cache; private final Client client; private final IngestService ingestService; - private TimeValue timeout; + private final DatasourceHelper datasourceHelper; + private final GeoIpDataHelper geoIpDataHelper; /** * Default constructor @@ -337,14 +361,18 @@ public static final class Factory implements Processor.Factory { * @param client the client * @param ingestService the ingest service */ - public Factory(final Ip2GeoCache cache, final Client client, final IngestService ingestService) { + public Factory( + final Ip2GeoCache cache, + final Client client, + final IngestService ingestService, + final DatasourceHelper datasourceHelper, + final GeoIpDataHelper geoIpDataHelper + ) { this.cache = cache; this.client = client; this.ingestService = ingestService; - - timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(client.settings()); - ClusterSettings clusterSettings = ingestService.getClusterService().getClusterSettings(); - clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> timeout = newValue); + this.datasourceHelper = datasourceHelper; + this.geoIpDataHelper = geoIpDataHelper; } /** @@ -388,12 +416,14 @@ public Ip2GeoProcessor create( firstOnly, cache, client, - ingestService.getClusterService() + ingestService.getClusterService(), + datasourceHelper, + geoIpDataHelper ); } private void validate(final String processorTag, final String datasourceName, final List propertyNames) throws IOException { - Datasource datasource = DatasourceHelper.getDatasource(client, datasourceName, timeout); + Datasource datasource = datasourceHelper.getDatasource(datasourceName); if (datasource == null) { throw newConfigurationException(TYPE, processorTag, "datasource", "datasource [" + datasourceName + "] doesn't exist"); diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index bc6ce32d..3a77b300 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -39,9 +39,12 @@ import org.opensearch.geospatial.ip2geo.action.PutDatasourceAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutorHelper; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.geospatial.ip2geo.processor.Ip2GeoCache; import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; import org.opensearch.geospatial.processor.FeatureProcessor; @@ -89,7 +92,13 @@ public Map getProcessors(Processor.Parameters paramet new Ip2GeoProcessor.Factory( new Ip2GeoCache(Ip2GeoSettings.CACHE_SIZE.get(parameters.client.settings())), parameters.client, - parameters.ingestService + parameters.ingestService, + new DatasourceHelper( + parameters.client, + parameters.ingestService.getClusterService().getSettings(), + parameters.ingestService.getClusterService().getClusterSettings() + ), + new GeoIpDataHelper(parameters.ingestService.getClusterService(), parameters.client) ) ) .immutableMap(); @@ -121,10 +130,23 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - // Initialize DatasourceUpdateRunner - DatasourceRunner.getJobRunnerInstance().initialize(clusterService, threadPool, client); - - return List.of(UploadStats.getInstance()); + GeoIpDataHelper geoIpDataHelper = new GeoIpDataHelper(clusterService, client); + DatasourceHelper datasourceHelper = new DatasourceHelper(client, clusterService.getSettings(), clusterService.getClusterSettings()); + DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService( + clusterService, + client, + datasourceHelper, + geoIpDataHelper + ); + Ip2GeoExecutorHelper ip2GeoExecutorHelper = new Ip2GeoExecutorHelper(threadPool); + /** + * We don't need to return datasource runner because it is used only by job scheduler and job scheduler + * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. + */ + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + + return List.of(UploadStats.getInstance(), datasourceUpdateService, datasourceHelper, ip2GeoExecutorHelper, geoIpDataHelper); } @Override diff --git a/src/main/plugin-metadata/plugin-security.policy b/src/main/plugin-metadata/plugin-security.policy index 6e9e1030..2deecc06 100644 --- a/src/main/plugin-metadata/plugin-security.policy +++ b/src/main/plugin-metadata/plugin-security.policy @@ -9,4 +9,7 @@ grant { // needed by Ip2Geo datasource to get GeoIP database permission java.net.SocketPermission "*", "connect,resolve"; + + // needed for unit test + permission java.io.FilePermission "<>", "read"; }; diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java new file mode 100644 index 00000000..0c59a7df --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -0,0 +1,227 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Randomness; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutorHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.ingest.IngestService; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskListener; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.rest.RestActionTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +public abstract class Ip2GeoTestCase extends RestActionTestCase { + @Mock + protected ClusterService clusterService; + @Mock + protected DatasourceUpdateService datasourceUpdateService; + @Mock + protected DatasourceHelper datasourceHelper; + @Mock + protected Ip2GeoExecutorHelper ip2GeoExecutorHelper; + @Mock + protected ExecutorService executorService; + @Mock + protected GeoIpDataHelper geoIpDataHelper; + @Mock + protected ClusterState clusterState; + @Mock + protected Metadata metadata; + @Mock + protected IngestService ingestService; + @Mock + protected ActionFilters actionFilters; + @Mock + protected ThreadPool threadPool; + @Mock + protected TransportService transportService; + protected NoOpNodeClient client; + protected VerifyingClient verifyingClient; + protected LockService lockService; + protected ClusterSettings clusterSettings; + protected Settings settings; + private AutoCloseable openMocks; + + @Before + public void prepareIp2GeoTestCase() { + openMocks = MockitoAnnotations.openMocks(this); + settings = Settings.EMPTY; + client = new NoOpNodeClient(this.getTestName()); + verifyingClient = spy(new VerifyingClient(this.getTestName())); + clusterSettings = new ClusterSettings(settings, new HashSet<>(Ip2GeoSettings.settings())); + lockService = new LockService(client, clusterService); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE); + when(ip2GeoExecutorHelper.forDatasourceUpdate()).thenReturn(executorService); + when(ingestService.getClusterService()).thenReturn(clusterService); + when(threadPool.generic()).thenReturn(OpenSearchExecutors.newDirectExecutorService()); + } + + @After + public void clean() throws Exception { + openMocks.close(); + client.close(); + verifyingClient.close(); + } + + public DatasourceState randomStateExcept(DatasourceState state) { + assertNotNull(state); + return Arrays.stream(DatasourceState.values()) + .sequential() + .filter(s -> !s.equals(state)) + .collect(Collectors.toList()) + .get(Randomness.createSecure().nextInt(DatasourceState.values().length - 2)); + } + + public String randomIpAddress() { + return String.format( + Locale.ROOT, + "%d.%d.%d.%d", + Randomness.get().nextInt(255), + Randomness.get().nextInt(255), + Randomness.get().nextInt(255), + Randomness.get().nextInt(255) + ); + } + + @SuppressForbidden(reason = "unit test") + public String sampleManifestUrl() throws Exception { + return Paths.get(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").toURI()).toUri().toURL().toExternalForm(); + } + + @SuppressForbidden(reason = "unit test") + public String sampleManifestUrlWithInvalidUrl() throws Exception { + return Paths.get(this.getClass().getClassLoader().getResource("ip2geo/manifest_invalid_url.json").toURI()) + .toUri() + .toURL() + .toExternalForm(); + } + + @SuppressForbidden(reason = "unit test") + public File sampleIp2GeoFile() { + return new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + } + + /** + * Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167) + * is merged in OpenSearch core + */ + public static class VerifyingClient extends NoOpNodeClient { + AtomicReference executeVerifier = new AtomicReference<>(); + AtomicReference executeLocallyVerifier = new AtomicReference<>(); + + public VerifyingClient(String testName) { + super(testName); + reset(); + } + + /** + * Clears any previously set verifier functions set by {@link #setExecuteVerifier(BiFunction)} and/or + * {@link #setExecuteLocallyVerifier(BiFunction)}. These functions are replaced with functions which will throw an + * {@link AssertionError} if called. + */ + public void reset() { + executeVerifier.set((arg1, arg2) -> { throw new AssertionError(); }); + executeLocallyVerifier.set((arg1, arg2) -> { throw new AssertionError(); }); + } + + /** + * Sets the function that will be called when {@link #doExecute(ActionType, ActionRequest, ActionListener)} is called. The given + * function should return either a subclass of {@link ActionResponse} or {@code null}. + * @param verifier A function which is called in place of {@link #doExecute(ActionType, ActionRequest, ActionListener)} + */ + public void setExecuteVerifier( + BiFunction, Request, Response> verifier + ) { + executeVerifier.set(verifier); + } + + @Override + public void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + listener.onResponse((Response) executeVerifier.get().apply(action, request)); + } + + /** + * Sets the function that will be called when {@link #executeLocally(ActionType, ActionRequest, TaskListener)}is called. The given + * function should return either a subclass of {@link ActionResponse} or {@code null}. + * @param verifier A function which is called in place of {@link #executeLocally(ActionType, ActionRequest, TaskListener)} + */ + public void setExecuteLocallyVerifier( + BiFunction, Request, Response> verifier + ) { + executeLocallyVerifier.set(verifier); + } + + @Override + public Task executeLocally( + ActionType action, + Request request, + ActionListener listener + ) { + listener.onResponse((Response) executeLocallyVerifier.get().apply(action, request)); + return null; + } + + @Override + public Task executeLocally( + ActionType action, + Request request, + TaskListener listener + ) { + listener.onResponse(null, (Response) executeLocallyVerifier.get().apply(action, request)); + return null; + } + + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index 3198f26a..0d5a8b22 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -11,25 +11,31 @@ import java.util.Locale; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.Randomness; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.unit.TimeValue; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; -public class PutDatasourceRequestTests extends OpenSearchTestCase { +public class PutDatasourceRequestTests extends Ip2GeoTestCase { - public void testValidateInvalidUrl() { - PutDatasourceRequest request = new PutDatasourceRequest("test"); + public void testValidateWithInvalidUrl() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); request.setEndpoint("invalidUrl"); - request.setUpdateIntervalInDays(TimeValue.ZERO); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(1)); ActionRequestValidationException exception = request.validate(); assertEquals(1, exception.validationErrors().size()); assertEquals("Invalid URL format is provided", exception.validationErrors().get(0)); } - public void testValidateInvalidManifestFile() { - PutDatasourceRequest request = new PutDatasourceRequest("test"); - request.setDatasourceName("test"); - request.setEndpoint("https://hi.com"); - request.setUpdateIntervalInDays(TimeValue.ZERO); + public void testValidateWithInvalidManifestFile() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String domain = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(1)); ActionRequestValidationException exception = request.validate(); assertEquals(1, exception.validationErrors().size()); assertEquals( @@ -37,4 +43,74 @@ public void testValidateInvalidManifestFile() { exception.validationErrors().get(0) ); } + + public void testValidate() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrl()); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(1)); + assertNull(request.validate()); + } + + public void testValidateWithZeroUpdateInterval() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrl()); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(0)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertEquals( + String.format(Locale.ROOT, "Update interval should be equal to or larger than 1 day"), + exception.validationErrors().get(0) + ); + } + + public void testValidateWithLargeUpdateInterval() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrl()); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(30)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertTrue(exception.validationErrors().get(0).contains("should be smaller")); + } + + public void testValidateWithInvalidUrlInsideManifest() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrlWithInvalidUrl()); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(1)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertTrue(exception.validationErrors().get(0).contains("Invalid URL format")); + } + + public void testStreamInOut() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String domain = GeospatialTestHelper.randomLowerCaseString(); + PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1)); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + PutDatasourceRequest copiedRequest = new PutDatasourceRequest(input); + + // Verify + assertEquals(request, copiedRequest); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java new file mode 100644 index 00000000..2990ef90 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.junit.Before; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.tasks.Task; + +public class PutDatasourceTransportActionTests extends Ip2GeoTestCase { + private PutDatasourceTransportAction action; + + @Before + public void init() { + action = new PutDatasourceTransportAction( + transportService, + actionFilters, + verifyingClient, + clusterService, + threadPool, + settings, + clusterSettings, + datasourceHelper, + geoIpDataHelper + ); + } + + public void testDoExecute() throws Exception { + Task task = mock(Task.class); + PutDatasourceRequest request = new PutDatasourceRequest("test"); + request.setEndpoint(sampleManifestUrl()); + request.setUpdateIntervalInDays(TimeValue.timeValueDays(1)); + ActionListener listener = mock(ActionListener.class); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof IndexRequest); + IndexRequest indexRequest = (IndexRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, indexRequest.index()); + assertEquals(request.getDatasourceName(), indexRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, indexRequest.getRefreshPolicy()); + assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType()); + return null; + }); + action.doExecute(task, request, listener); + verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class)); + verify(listener).onResponse(new AcknowledgedResponse(true)); + } + + public void testIndexResponseListenerFailure() { + Datasource datasource = new Datasource(); + ActionListener listener = mock(ActionListener.class); + action.getIndexResponseListener(datasource, listener) + .onFailure( + new VersionConflictEngineException( + null, + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString() + ) + ); + verify(listener).onFailure(any(ResourceAlreadyExistsException.class)); + } + + public void testCreateDatasourceInvalidState() throws Exception { + Datasource datasource = new Datasource(); + datasource.setState(randomStateExcept(DatasourceState.CREATING)); + datasource.getUpdateStats().setLastFailedAt(null); + action.createDatasource(datasource); + + assertEquals(DatasourceState.CREATE_FAILED, datasource.getState()); + assertNotNull(datasource.getUpdateStats().getLastFailedAt()); + verify(datasourceHelper).updateDatasource(datasource); + } + + public void testCreateDatasourceWithException() throws Exception { + Datasource datasource = new Datasource(); + action.createDatasource(datasource); + + assertEquals(DatasourceState.CREATE_FAILED, datasource.getState()); + assertNotNull(datasource.getUpdateStats().getLastFailedAt()); + verify(datasourceHelper).updateDatasource(datasource); + } + + public void testCreateDatasource() throws Exception { + try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) { + when(geoIpDataHelper.getDatabaseReader(any(DatasourceManifest.class))).thenReturn(csvParser); + Datasource datasource = new Datasource(); + datasource.getUpdateStats().setLastSucceededAt(null); + datasource.getUpdateStats().setLastProcessingTimeInMillis(null); + datasource.disable(); + datasource.setEndpoint(sampleManifestUrl()); + + DatasourceManifest manifest = DatasourceManifest.Builder.build(new URL(sampleManifestUrl())); + + action.createDatasource(datasource); + + assertEquals(DatasourceState.AVAILABLE, datasource.getState()); + assertEquals(Arrays.asList("country_name"), datasource.getDatabase().getFields()); + assertEquals(manifest.getProvider(), datasource.getDatabase().getProvider()); + assertEquals(manifest.getMd5Hash(), datasource.getDatabase().getMd5Hash()); + assertEquals(manifest.getValidForInDays(), datasource.getDatabase().getValidForInDays()); + assertEquals(Instant.ofEpochMilli(manifest.getUpdatedAt()), datasource.getDatabase().getUpdatedAt()); + assertEquals(1, datasource.getIndices().size()); + assertNotNull(datasource.getUpdateStats().getLastSucceededAt()); + assertNotNull(datasource.getUpdateStats().getLastProcessingTimeInMillis()); + assertTrue(datasource.isEnabled()); + verify(datasourceHelper, times(2)).updateDatasource(datasource); + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerIT.java new file mode 100644 index 00000000..2c49ceaa --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerIT.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Locale; + +import org.junit.BeforeClass; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.geospatial.GeospatialRestTestCase; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.rest.RestStatus; + +@SuppressForbidden(reason = "integ test") +public class RestPutDatasourceHandlerIT extends GeospatialRestTestCase { + private static Path manifestFile; + + @BeforeClass + public static void init() throws Exception { + manifestFile = createManifestFile(); + } + + public void testCreateDatasource() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String url = String.join("/", "_plugins", "geospatial", "ip2geo", "datasource", datasourceName); + Request request = new Request("PUT", url); + request.setJsonEntity(String.format(Locale.ROOT, "{\"endpoint\":\"%s\"}", manifestFile.toUri().toURL().toExternalForm())); + Response response = client().performRequest(request); + assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + } + + private static Path createManifestFile() throws Exception { + Path zipFile = Paths.get(RestPutDatasourceHandlerIT.class.getClassLoader().getResource("ip2geo/sample_valid.zip").toURI()); + Path tempManifestFile = Paths.get( + RestPutDatasourceHandlerIT.class.getClassLoader().getResource("ip2geo/manifest_template.json").toURI() + ); + String tempManifest = new String(Files.readAllBytes(tempManifestFile), StandardCharsets.UTF_8); + tempManifest = tempManifest.replaceFirst("URL", zipFile.toUri().toURL().toExternalForm()); + Path path = createTempFile("ip2geo", ".json"); + Files.write(path, tempManifest.getBytes(StandardCharsets.UTF_8)); + return path; + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java index 82cae4ad..0460cdf5 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java @@ -8,67 +8,78 @@ package org.opensearch.geospatial.ip2geo.action; +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; + import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; +import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.rest.RestRequest; import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.test.rest.RestActionTestCase; +@SuppressForbidden(reason = "unit test") public class RestPutDatasourceHandlerTests extends RestActionTestCase { + private String path; private RestPutDatasourceHandler action; @Before public void setupAction() { action = new RestPutDatasourceHandler(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, new HashSet(Ip2GeoSettings.settings()))); controller().registerHandler(action); + path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/%s"); } public void testPrepareRequest() { - String content = "{\"endpoint\":\"https://test.com\", \"update_interval\":1}"; - RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) - .withPath("/_geoip/datasource/test") + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String content = "{\"endpoint\":\"https://test.com\", \"update_interval_in_days\":1}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(String.format(path, datasourceName)) .withContent(new BytesArray(content), XContentType.JSON) .build(); + AtomicBoolean isExecuted = new AtomicBoolean(false); - verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { assertTrue(actionRequest instanceof PutDatasourceRequest); PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; assertEquals("https://test.com", putDatasourceRequest.getEndpoint()); assertEquals(TimeValue.timeValueDays(1), putDatasourceRequest.getUpdateIntervalInDays()); - assertEquals("test", putDatasourceRequest.getDatasourceName()); + assertEquals(datasourceName, putDatasourceRequest.getDatasourceName()); + isExecuted.set(true); return null; }); - dispatchRequest(restRequest); + dispatchRequest(request); + assertTrue(isExecuted.get()); } public void testPrepareRequestDefaultValue() { - RestRequest restRequestWithEmptyContent = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) - .withPath("/_geoip/datasource/test") + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(String.format(path, datasourceName)) .withContent(new BytesArray("{}"), XContentType.JSON) .build(); - - RestRequest restRequestWithoutContent = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) - .withPath("/_geoip/datasource/test") - .build(); - - verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + AtomicBoolean isExecuted = new AtomicBoolean(false); + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { assertTrue(actionRequest instanceof PutDatasourceRequest); PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; assertEquals("https://geoip.maps.opensearch.org/v1/geolite-2/manifest.json", putDatasourceRequest.getEndpoint()); assertEquals(TimeValue.timeValueDays(3), putDatasourceRequest.getUpdateIntervalInDays()); - assertEquals("test", putDatasourceRequest.getDatasourceName()); + assertEquals(datasourceName, putDatasourceRequest.getDatasourceName()); + isExecuted.set(true); return null; }); - dispatchRequest(restRequestWithEmptyContent); - dispatchRequest(restRequestWithoutContent); + dispatchRequest(request); + assertTrue(isExecuted.get()); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java index 02ff8d58..4d9e3663 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java @@ -8,35 +8,54 @@ package org.opensearch.geospatial.ip2geo.common; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import org.junit.Before; +import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; -import org.opensearch.test.rest.RestActionTestCase; -public class DatasourceHelperTests extends RestActionTestCase { +public class DatasourceHelperTests extends Ip2GeoTestCase { + private DatasourceHelper datasourceHelper; + + @Before + public void init() { + datasourceHelper = new DatasourceHelper( + verifyingClient, + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, new HashSet<>(Ip2GeoSettings.settings())) + ); + } public void testUpdateDatasource() throws Exception { - Instant previousTime = Instant.now().minusMillis(1); + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource( - "testId", - previousTime, - null, - false, - null, - null, - DatasourceState.PREPARING, - null, - null, - null + datasourceName, + new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS), + "https://test.com" ); + Instant previousTime = Instant.now().minusMillis(1); + datasource.setLastUpdateTime(previousTime); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assertTrue(actionRequest instanceof IndexRequest); @@ -47,22 +66,45 @@ public void testUpdateDatasource() throws Exception { return null; }); - DatasourceHelper.updateDatasource(verifyingClient, datasource, TimeValue.timeValueSeconds(30)); + datasourceHelper.updateDatasource(datasource); assertTrue(previousTime.isBefore(datasource.getLastUpdateTime())); } public void testGetDatasourceException() throws Exception { + Datasource datasource = setupClientForGetRequest(true, new IndexNotFoundException(DatasourceExtension.JOB_INDEX_NAME)); + assertNull(datasourceHelper.getDatasource(datasource.getId())); + } + + public void testGetDatasourceExist() throws Exception { + Datasource datasource = setupClientForGetRequest(true, null); + assertEquals(datasource, datasourceHelper.getDatasource(datasource.getId())); + } + + public void testGetDatasourceNotExist() throws Exception { + Datasource datasource = setupClientForGetRequest(false, null); + assertNull(datasourceHelper.getDatasource(datasource.getId())); + } + + public void testGetDatasourceExistWithListener() { + Datasource datasource = setupClientForGetRequest(true, null); + ActionListener listener = mock(ActionListener.class); + datasourceHelper.getDatasource(datasource.getId(), listener); + verify(listener).onResponse(eq(datasource)); + } + + public void testGetDatasourceNotExistWithListener() { + Datasource datasource = setupClientForGetRequest(false, null); + ActionListener listener = mock(ActionListener.class); + datasourceHelper.getDatasource(datasource.getId(), listener); + verify(listener).onResponse(null); + } + + private Datasource setupClientForGetRequest(final boolean isExist, final RuntimeException exception) { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource( - "testId", - Instant.now(), - null, - false, - new IntervalSchedule(Instant.now(), 1, ChronoUnit.DAYS), - "https://test.com", - DatasourceState.PREPARING, - null, - null, - null + datasourceName, + new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS), + "https://test.com" ); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { @@ -70,9 +112,20 @@ public void testGetDatasourceException() throws Exception { GetRequest request = (GetRequest) actionRequest; assertEquals(datasource.getId(), request.id()); assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); - throw new IndexNotFoundException(DatasourceExtension.JOB_INDEX_NAME); + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(isExist); + try { + when(response.getSourceAsBytesRef()).thenReturn( + BytesReference.bytes(datasource.toXContent(JsonXContent.contentBuilder(), null)) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (exception != null) { + throw exception; + } + return response; }); - - assertNull(DatasourceHelper.getDatasource(verifyingClient, datasource.getId(), TimeValue.timeValueSeconds(30))); + return datasource; } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java index 6b65026d..49369d78 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java @@ -8,15 +8,308 @@ package org.opensearch.geospatial.ip2geo.common; -import org.opensearch.test.OpenSearchTestCase; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionType; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.opensearch.action.admin.indices.refresh.RefreshRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.Randomness; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; + +@SuppressForbidden(reason = "unit test") +public class GeoIpDataHelperTests extends Ip2GeoTestCase { + private static final String IP_RANGE_FIELD_NAME = "_cidr"; + private static final String DATA_FIELD_NAME = "_data"; + private GeoIpDataHelper noOpsGeoIpDataHelper; + private GeoIpDataHelper verifyingGeoIpDataHelper; + + @Before + public void init() { + noOpsGeoIpDataHelper = new GeoIpDataHelper(clusterService, client); + verifyingGeoIpDataHelper = new GeoIpDataHelper(clusterService, verifyingClient); + } + + public void testCreateIndexIfNotExistsWithExistingIndex() { + String index = GeospatialTestHelper.randomLowerCaseString(); + when(metadata.hasIndex(index)).thenReturn(true); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { throw new RuntimeException("Shouldn't get called"); }); + verifyingGeoIpDataHelper.createIndexIfNotExists(index); + } + + public void testCreateIndexIfNotExistsWithoutExistingIndex() { + String index = GeospatialTestHelper.randomLowerCaseString(); + when(metadata.hasIndex(index)).thenReturn(false); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof CreateIndexRequest); + CreateIndexRequest request = (CreateIndexRequest) actionRequest; + assertEquals(index, request.index()); + assertEquals("1", request.settings().get("index.number_of_shards")); + assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); + assertEquals( + "{\"dynamic\": false,\"properties\": {\"_cidr\": {\"type\": \"ip_range\",\"doc_values\": false}}}", + request.mappings() + ); + return null; + }); + verifyingGeoIpDataHelper.createIndexIfNotExists(index); + } -public class GeoIpDataHelperTests extends OpenSearchTestCase { public void testCreateDocument() { String[] names = { "ip", "country", "city" }; String[] values = { "1.0.0.0/25", "USA", "Seattle" }; assertEquals( "{\"_cidr\":\"1.0.0.0/25\",\"_data\":{\"country\":\"USA\",\"city\":\"Seattle\"}}", - GeoIpDataHelper.createDocument(names, values) + noOpsGeoIpDataHelper.createDocument(names, values) + ); + } + + public void testGetDatabaseReader() throws Exception { + File zipFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.zip").getFile()); + DatasourceManifest manifest = new DatasourceManifest( + zipFile.toURI().toURL().toExternalForm(), + "sample_valid.csv", + "fake_md5", + 1l, + Instant.now().toEpochMilli(), + "tester" + ); + CSVParser parser = noOpsGeoIpDataHelper.getDatabaseReader(manifest); + String[] expectedHeader = { "network", "country_name" }; + assertArrayEquals(expectedHeader, parser.iterator().next().values()); + String[] expectedValues = { "1.0.0.0/24", "Australia" }; + assertArrayEquals(expectedValues, parser.iterator().next().values()); + } + + public void testGetDatabaseReaderNoFile() throws Exception { + File zipFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.zip").getFile()); + DatasourceManifest manifest = new DatasourceManifest( + zipFile.toURI().toURL().toExternalForm(), + "no_file.csv", + "fake_md5", + 1l, + Instant.now().toEpochMilli(), + "tester" + ); + OpenSearchException exception = expectThrows(OpenSearchException.class, () -> noOpsGeoIpDataHelper.getDatabaseReader(manifest)); + assertTrue(exception.getMessage().contains("does not exist")); + } + + public void testDeleteIndex() { + String index = GeospatialTestHelper.randomLowerCaseString(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof DeleteIndexRequest); + DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; + assertEquals(1, request.indices().length); + assertEquals(index, request.indices()[0]); + assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); + return null; + }); + verifyingGeoIpDataHelper.deleteIndex(index); + } + + public void testPutGeoIpData() throws Exception { + String index = GeospatialTestHelper.randomLowerCaseString(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + if (actionRequest instanceof BulkRequest) { + BulkRequest request = (BulkRequest) actionRequest; + assertEquals(1, request.numberOfActions()); + assertEquals(WriteRequest.RefreshPolicy.WAIT_UNTIL, request.getRefreshPolicy()); + BulkResponse response = mock(BulkResponse.class); + when(response.hasFailures()).thenReturn(false); + return response; + } else if (actionRequest instanceof RefreshRequest) { + RefreshRequest request = (RefreshRequest) actionRequest; + assertEquals(1, request.indices().length); + assertEquals(index, request.indices()[0]); + return null; + } else if (actionRequest instanceof ForceMergeRequest) { + ForceMergeRequest request = (ForceMergeRequest) actionRequest; + assertEquals(1, request.indices().length); + assertEquals(index, request.indices()[0]); + assertEquals(1, request.maxNumSegments()); + return null; + } else { + throw new RuntimeException("invalid request is called"); + } + }); + try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) { + Iterator iterator = csvParser.iterator(); + String[] fields = iterator.next().values(); + verifyingGeoIpDataHelper.putGeoIpData(index, fields, iterator, 1); + } + } + + public void testGetSingleGeoIpData() { + String indexName = GeospatialTestHelper.randomLowerCaseString(); + String ip = randomIpAddress(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assert actionRequest instanceof SearchRequest; + SearchRequest request = (SearchRequest) actionRequest; + assertEquals("_local", request.preference()); + assertEquals(1, request.source().size()); + assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query()); + + String data = String.format( + Locale.ROOT, + "{\"%s\":\"1.0.0.1/16\",\"%s\":{\"city\":\"seattle\"}}", + IP_RANGE_FIELD_NAME, + DATA_FIELD_NAME + ); + SearchHit searchHit = new SearchHit(1); + searchHit.sourceRef(BytesReference.fromByteBuffer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))); + SearchHit[] searchHitArray = { searchHit }; + SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); + + SearchResponse response = mock(SearchResponse.class); + when(response.getHits()).thenReturn(searchHits); + return response; + }); + ActionListener> listener = mock(ActionListener.class); + verifyingGeoIpDataHelper.getGeoIpData(indexName, ip, listener); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Map.class); + verify(listener).onResponse(captor.capture()); + assertEquals("seattle", captor.getValue().get("city")); + } + + public void testGetMultipleGeoIpDataNoSearchRequired() { + String indexName = GeospatialTestHelper.randomLowerCaseString(); + String ip1 = randomIpAddress(); + String ip2 = randomIpAddress(); + Iterator ipIterator = Arrays.asList(ip1, ip2).iterator(); + int maxBundleSize = 1; + int maxConcurrentSearches = 1; + boolean firstOnly = true; + Map> geoData = new HashMap<>(); + geoData.put(ip1, Map.of("city", "Seattle")); + geoData.put(ip2, Map.of("city", "Hawaii")); + ActionListener>> actionListener = mock(ActionListener.class); + + // Run + verifyingGeoIpDataHelper.getGeoIpData( + indexName, + ipIterator, + maxBundleSize, + maxConcurrentSearches, + firstOnly, + geoData, + actionListener + ); + + // Verify + verify(actionListener).onResponse(geoData); + } + + public void testGetMultipleGeoIpData() { + String indexName = GeospatialTestHelper.randomLowerCaseString(); + int dataSize = Randomness.get().nextInt(10) + 1; + List ips = new ArrayList<>(); + for (int i = 0; i < dataSize; i++) { + ips.add(randomIpAddress()); + } + int maxBundleSize = Randomness.get().nextInt(11) + 1; + int maxConcurrentSearches = 1; + boolean firstOnly = false; + Map> geoData = new HashMap<>(); + ActionListener>> actionListener = mock(ActionListener.class); + + List cities = new ArrayList<>(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assert actionRequest instanceof MultiSearchRequest; + MultiSearchRequest request = (MultiSearchRequest) actionRequest; + assertEquals(maxConcurrentSearches, request.maxConcurrentSearchRequests()); + assertTrue(request.requests().size() == maxBundleSize || request.requests().size() == dataSize % maxBundleSize); + for (SearchRequest searchRequest : request.requests()) { + assertEquals("_local", searchRequest.preference()); + assertEquals(1, searchRequest.source().size()); + } + + MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[request.requests().size()]; + for (int i = 0; i < request.requests().size(); i++) { + String city = GeospatialTestHelper.randomLowerCaseString(); + cities.add(city); + String data = String.format( + Locale.ROOT, + "{\"%s\":\"1.0.0.1/16\",\"%s\":{\"city\":\"%s\"}}", + IP_RANGE_FIELD_NAME, + DATA_FIELD_NAME, + city + ); + SearchHit searchHit = new SearchHit(1); + searchHit.sourceRef(BytesReference.fromByteBuffer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))); + SearchHit[] searchHitArray = { searchHit }; + SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + MultiSearchResponse.Item item = mock(MultiSearchResponse.Item.class); + when(item.isFailure()).thenReturn(false); + when(item.getResponse()).thenReturn(searchResponse); + items[i] = item; + } + MultiSearchResponse response = mock(MultiSearchResponse.class); + when(response.getResponses()).thenReturn(items); + return response; + }); + + // Run + verifyingGeoIpDataHelper.getGeoIpData( + indexName, + ips.iterator(), + maxBundleSize, + maxConcurrentSearches, + firstOnly, + geoData, + actionListener + ); + + // Verify + verify(verifyingClient, times((dataSize + maxBundleSize - 1) / maxBundleSize)).execute( + any(ActionType.class), + any(ActionRequest.class), + any(ActionListener.class) ); + for (int i = 0; i < dataSize; i++) { + assertEquals(cities.get(i), geoData.get(ips.get(i)).get("city")); + } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java new file mode 100644 index 00000000..3632d9e9 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import org.opensearch.common.Randomness; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.jobscheduler.spi.JobDocVersion; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + +public class DatasourceExtensionTests extends Ip2GeoTestCase { + public void testBasic() throws Exception { + DatasourceExtension extension = new DatasourceExtension(); + assertEquals("scheduler_geospatial_ip2geo_datasource", extension.getJobType()); + assertEquals(JOB_INDEX_NAME, extension.getJobIndex()); + assertEquals(DatasourceRunner.getJobRunnerInstance(), extension.getJobRunner()); + } + + public void testParser() throws Exception { + DatasourceExtension extension = new DatasourceExtension(); + String id = GeospatialTestHelper.randomLowerCaseString(); + IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); + String endpoint = GeospatialTestHelper.randomLowerCaseString(); + Datasource datasource = new Datasource(id, schedule, endpoint); + + Datasource anotherDatasource = (Datasource) extension.getJobParser() + .parse( + createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)), + GeospatialTestHelper.randomLowerCaseString(), + new JobDocVersion(Randomness.get().nextLong(), Randomness.get().nextLong(), Randomness.get().nextLong()) + ); + + assertTrue(datasource.equals(anotherDatasource)); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java new file mode 100644 index 00000000..953b49a7 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString; + +import java.time.Instant; + +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.jobscheduler.spi.JobDocVersion; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; + +public class DatasourceRunnerTests extends Ip2GeoTestCase { + public void testRunJobInvalidClass() { + JobExecutionContext jobExecutionContext = mock(JobExecutionContext.class); + ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext)); + } + + public void testRunJob() { + JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); + String jobIndexName = randomLowerCaseString(); + String jobId = randomLowerCaseString(); + JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); + Datasource datasource = new Datasource(); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + + // Run + DatasourceRunner.getJobRunnerInstance().runJob(datasource, jobExecutionContext); + + // Verify + verify(executorService).submit(any(Runnable.class)); + } + + public void testUpdateDatasourceNull() throws Exception { + Datasource datasource = new Datasource(); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + + // Verify + verify(datasourceUpdateService, never()).deleteUnusedIndices(any()); + } + + public void testUpdateDatasource() throws Exception { + Datasource datasource = new Datasource(); + datasource.setId(randomLowerCaseString()); + when(datasourceHelper.getDatasource(datasource.getId())).thenReturn(datasource); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + + // Verify + verify(datasourceUpdateService, times(2)).deleteUnusedIndices(datasource); + verify(datasourceUpdateService).updateGeoIpData(datasource); + } + + public void testUpdateDatasourceExceptionHandling() throws Exception { + Datasource datasource = new Datasource(); + datasource.setId(randomLowerCaseString()); + datasource.getUpdateStats().setLastFailedAt(null); + when(datasourceHelper.getDatasource(datasource.getId())).thenReturn(datasource); + doThrow(new RuntimeException("test failure")).when(datasourceUpdateService).deleteUnusedIndices(any()); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutorHelper, datasourceHelper); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + + // Verify + assertNotNull(datasource.getUpdateStats().getLastFailedAt()); + verify(datasourceHelper).updateDatasource(datasource); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 0faaa3e2..0353f0fa 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -15,15 +15,41 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Locale; import org.opensearch.common.Randomness; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.test.OpenSearchTestCase; public class DatasourceTests extends OpenSearchTestCase { + + public void testParser() throws Exception { + String id = GeospatialTestHelper.randomLowerCaseString(); + IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); + String endpoint = GeospatialTestHelper.randomLowerCaseString(); + Datasource datasource = new Datasource(id, schedule, endpoint); + datasource.enable(); + datasource.getDatabase().setFields(Arrays.asList("field1", "field2")); + datasource.getDatabase().setProvider("test_provider"); + datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getDatabase().setMd5Hash(GeospatialTestHelper.randomLowerCaseString()); + datasource.getDatabase().setValidForInDays(1l); + datasource.getUpdateStats().setLastProcessingTimeInMillis(Randomness.get().nextLong()); + datasource.getUpdateStats().setLastSucceededAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getUpdateStats().setLastSkippedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getUpdateStats().setLastFailedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + + Datasource anotherDatasource = Datasource.PARSER.parse( + createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)), + null + ); + assertTrue(datasource.equals(anotherDatasource)); + } + public void testCurrentIndexName() { String id = GeospatialTestHelper.randomLowerCaseString(); Instant now = Instant.now(); @@ -56,9 +82,33 @@ public void testGetIndexNameFor() { public void testGetJitter() { Datasource datasource = new Datasource(); - datasource.setSchedule(new IntervalSchedule(Instant.now(), Randomness.get().nextInt(31), ChronoUnit.DAYS)); - long intervalInMinutes = datasource.getSchedule().getInterval() * 60 * 24; + datasource.setSchedule(new IntervalSchedule(Instant.now(), Randomness.get().ints(1, 31).findFirst().getAsInt(), ChronoUnit.DAYS)); + long intervalInMinutes = datasource.getSchedule().getInterval() * 60l * 24l; double sixMinutes = 6; assertTrue(datasource.getJitter() * intervalInMinutes <= sixMinutes); } + + public void testIsExpired() { + Datasource datasource = new Datasource(); + // never expire if validForInDays is null + assertFalse(datasource.isExpired()); + + datasource.getDatabase().setValidForInDays(1l); + + // if last skipped date is null, use only last succeeded date to determine + datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(1, ChronoUnit.DAYS)); + assertTrue(datasource.isExpired()); + + // use the latest date between last skipped date and last succeeded date to determine + datasource.getUpdateStats().setLastSkippedAt(Instant.now()); + assertFalse(datasource.isExpired()); + datasource.getUpdateStats().setLastSkippedAt(Instant.now().minus(1, ChronoUnit.DAYS)); + datasource.getUpdateStats().setLastSucceededAt(Instant.now()); + assertFalse(datasource.isExpired()); + } + + public void testLockDurationSeconds() { + Datasource datasource = new Datasource(); + assertNotNull(datasource.getLockDurationSeconds()); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java new file mode 100644 index 00000000..dfd3ce0d --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.junit.Before; +import org.opensearch.OpenSearchException; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; + +@SuppressForbidden(reason = "unit test") +public class DatasourceUpdateServiceTests extends Ip2GeoTestCase { + private DatasourceUpdateService datasourceUpdateService; + + @Before + public void init() { + datasourceUpdateService = new DatasourceUpdateService(clusterService, client, datasourceHelper, geoIpDataHelper); + } + + public void testUpdateDatasourceInvalidState() throws Exception { + Datasource datasource = new Datasource(); + datasource.enable(); + datasource.getUpdateStats().setLastFailedAt(null); + datasource.setState(randomStateExcept(DatasourceState.AVAILABLE)); + datasourceUpdateService.updateGeoIpData(datasource); + assertFalse(datasource.isEnabled()); + assertNotNull(datasource.getUpdateStats().getLastFailedAt()); + verify(datasourceHelper).updateDatasource(datasource); + } + + public void testUpdateDatasourceSkip() throws Exception { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); + + Datasource datasource = new Datasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getUpdateStats().setLastSkippedAt(null); + datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); + datasource.getDatabase().setMd5Hash(manifest.getMd5Hash()); + datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); + + // Run + datasourceUpdateService.updateGeoIpData(datasource); + + // Verify + assertNotNull(datasource.getUpdateStats().getLastSkippedAt()); + verify(datasourceHelper).updateDatasource(datasource); + } + + public void testUpdateDatasourceInvalidFile() throws Exception { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); + + File sampleFile = new File( + this.getClass().getClassLoader().getResource("ip2geo/sample_invalid_less_than_two_fields.csv").getFile() + ); + when(geoIpDataHelper.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + Datasource datasource = new Datasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt() - 1)); + datasource.getDatabase().setMd5Hash(manifest.getMd5Hash().substring(1)); + datasource.getDatabase().setFields(Arrays.asList("country_name")); + datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); + + // Run + expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateGeoIpData(datasource)); + } + + public void testUpdateDatasourceIncompatibleFields() throws Exception { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); + + File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + when(geoIpDataHelper.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + Datasource datasource = new Datasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt() - 1)); + datasource.getDatabase().setMd5Hash(manifest.getMd5Hash().substring(1)); + datasource.getDatabase().setFields(Arrays.asList("country_name", "additional_field")); + datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); + + // Run + expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateGeoIpData(datasource)); + } + + public void testUpdateDatasource() throws Exception { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); + + File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + when(geoIpDataHelper.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + Datasource datasource = new Datasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt() - 1)); + datasource.getDatabase().setMd5Hash(manifest.getMd5Hash().substring(1)); + datasource.getDatabase().setFields(Arrays.asList("country_name")); + datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); + datasource.getUpdateStats().setLastSucceededAt(null); + datasource.getUpdateStats().setLastProcessingTimeInMillis(null); + + // Run + datasourceUpdateService.updateGeoIpData(datasource); + + // Verify + assertEquals(manifest.getProvider(), datasource.getDatabase().getProvider()); + assertEquals(manifest.getMd5Hash(), datasource.getDatabase().getMd5Hash()); + assertEquals(Instant.ofEpochMilli(manifest.getUpdatedAt()), datasource.getDatabase().getUpdatedAt()); + assertEquals(manifest.getValidForInDays(), datasource.getDatabase().getValidForInDays()); + assertNotNull(datasource.getUpdateStats().getLastSucceededAt()); + assertNotNull(datasource.getUpdateStats().getLastProcessingTimeInMillis()); + verify(datasourceHelper, times(2)).updateDatasource(datasource); + } + + public void testDeleteUnusedIndices() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String indexPrefix = String.format(".ip2geo-data.%s.", datasourceName); + Instant now = Instant.now(); + String currentIndex = indexPrefix + now.toEpochMilli(); + String oldIndex = indexPrefix + now.minusMillis(1).toEpochMilli(); + String lingeringIndex = indexPrefix + now.minusMillis(2).toEpochMilli(); + Datasource datasource = new Datasource(); + datasource.setId(datasourceName); + datasource.getIndices().add(currentIndex); + datasource.getIndices().add(oldIndex); + datasource.getIndices().add(lingeringIndex); + datasource.getDatabase().setUpdatedAt(now); + + when(metadata.hasIndex(currentIndex)).thenReturn(true); + when(metadata.hasIndex(oldIndex)).thenReturn(true); + when(metadata.hasIndex(lingeringIndex)).thenReturn(false); + when(geoIpDataHelper.deleteIndex(any())).thenReturn(new AcknowledgedResponse(true)); + + datasourceUpdateService.deleteUnusedIndices(datasource); + + assertEquals(1, datasource.getIndices().size()); + assertEquals(currentIndex, datasource.getIndices().get(0)); + verify(datasourceHelper).updateDatasource(datasource); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java new file mode 100644 index 00000000..0e8158aa --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -0,0 +1,343 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.processor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.Randomness; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.ingest.IngestDocument; + +public class Ip2GeoProcessorTests extends Ip2GeoTestCase { + private static final String DEFAULT_TARGET_FIELD = "ip2geo"; + private static final String CONFIG_DATASOURCE_KEY = "datasource"; + private static final String CONFIG_FIELD_KEY = "field"; + private static final List SUPPORTED_FIELDS = Arrays.asList("city", "country"); + private Ip2GeoProcessor.Factory factory; + private Ip2GeoCache ip2GeoCache; + + @Before + public void init() { + ip2GeoCache = new Ip2GeoCache(1); + factory = new Ip2GeoProcessor.Factory(ip2GeoCache, client, ingestService, datasourceHelper, geoIpDataHelper); + } + + public void testCreateWithNoDatasource() { + Map config = new HashMap<>(); + config.put("field", "ip"); + config.put(CONFIG_DATASOURCE_KEY, "no_datasource"); + OpenSearchException exception = expectThrows( + OpenSearchException.class, + () -> factory.create( + Collections.emptyMap(), + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString(), + config + ) + ); + assertTrue(exception.getDetailedMessage().contains("doesn't exist")); + } + + public void testCreateWithInvalidDatasourceState() { + Datasource datasource = new Datasource(); + datasource.setId(GeospatialTestHelper.randomLowerCaseString()); + datasource.setState(randomStateExcept(DatasourceState.AVAILABLE)); + OpenSearchException exception = expectThrows(OpenSearchException.class, () -> createProcessor(datasource, Collections.emptyMap())); + assertTrue(exception.getDetailedMessage().contains("available state")); + } + + public void testCreateWithInvalidProperties() { + Map config = new HashMap<>(); + config.put("properties", Arrays.asList("ip", "invalid_property")); + OpenSearchException exception = expectThrows( + OpenSearchException.class, + () -> createProcessor(GeospatialTestHelper.randomLowerCaseString(), config) + ); + assertTrue(exception.getDetailedMessage().contains("property")); + } + + public void testExecuteWithNoIpAndIgnoreMissing() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Map config = new HashMap<>(); + config.put("ignore_missing", true); + Ip2GeoProcessor processor = createProcessor(datasourceName, config); + IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); + BiConsumer handler = (doc, e) -> { + assertEquals(document, doc); + assertNull(e); + }; + processor.execute(document, handler); + } + + public void testExecuteWithNoIp() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Map config = new HashMap<>(); + Ip2GeoProcessor processor = createProcessor(datasourceName, config); + IngestDocument document = new IngestDocument(new HashMap<>(), new HashMap<>()); + BiConsumer handler = (doc, e) -> {}; + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(document, handler)); + assertTrue(exception.getMessage().contains("not present")); + } + + public void testExecuteWithNonStringValue() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + Map source = new HashMap<>(); + source.put("ip", Randomness.get().nextInt()); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + BiConsumer handler = (doc, e) -> {}; + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(document, handler)); + assertTrue(exception.getMessage().contains("string")); + } + + public void testExecuteWithCacheHit() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Map config = new HashMap<>(); + Ip2GeoProcessor processor = createProcessor(datasourceName, config); + + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + Map cachedData = new HashMap<>(); + String cityName = GeospatialTestHelper.randomLowerCaseString(); + cachedData.put("city", cityName); + ip2GeoCache.put(ip, datasourceName, cachedData); + + BiConsumer handler = (doc, e) -> { + assertEquals(cityName, doc.getFieldValue(DEFAULT_TARGET_FIELD + ".city", String.class)); + }; + processor.execute(document, handler); + } + + public void testExecuteWithNullDatasource() throws Exception { + BiConsumer handler = (doc, e) -> { + assertNull(doc); + assertTrue(e.getMessage().contains("datasource does not exist")); + }; + getActionListener(Collections.emptyMap(), handler).onResponse(null); + } + + public void testExecuteWithExpiredDatasource() throws Exception { + Datasource datasource = mock(Datasource.class); + when(datasource.isExpired()).thenReturn(true); + BiConsumer handler = (doc, e) -> { + assertEquals("ip2geo_data_expired", doc.getFieldValue(DEFAULT_TARGET_FIELD + ".error", String.class)); + assertNull(e); + }; + getActionListener(Collections.emptyMap(), handler).onResponse(datasource); + } + + public void testExecute() throws Exception { + Map ip2geoData = new HashMap<>(); + for (String field : SUPPORTED_FIELDS) { + ip2geoData.put(field, GeospatialTestHelper.randomLowerCaseString()); + } + + Datasource datasource = mock(Datasource.class); + when(datasource.isExpired()).thenReturn(false); + when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + BiConsumer handler = (doc, e) -> { + assertEquals( + ip2geoData.get(SUPPORTED_FIELDS.get(0)), + doc.getFieldValue(DEFAULT_TARGET_FIELD + "." + SUPPORTED_FIELDS.get(0), String.class) + ); + for (int i = 1; i < SUPPORTED_FIELDS.size(); i++) { + assertNull(doc.getFieldValue(DEFAULT_TARGET_FIELD + "." + SUPPORTED_FIELDS.get(i), String.class, true)); + } + assertNull(e); + }; + Map config = Map.of("properties", Arrays.asList(SUPPORTED_FIELDS.get(0))); + getActionListener(config, handler).onResponse(datasource); + + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(geoIpDataHelper).getGeoIpData(anyString(), anyString(), captor.capture()); + captor.getValue().onResponse(ip2geoData); + } + + private ActionListener getActionListener( + final Map config, + final BiConsumer handler + ) throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, config); + + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + processor.execute(document, handler); + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceHelper).getDatasource(eq(datasourceName), captor.capture()); + return captor.getValue(); + } + + public void testExecuteNotImplemented() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); + Exception e = expectThrows(IllegalStateException.class, () -> processor.execute(document)); + assertTrue(e.getMessage().contains("Not implemented")); + } + + public void testGenerateDataToAppendWithFirstOnlyOption() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("first_only", true)); + List ips = new ArrayList<>(); + Map> data = new HashMap<>(); + for (int i = 0; i < 3; i++) { + String ip = randomIpAddress(); + ips.add(ip); + data.put(ip, i == 0 ? Collections.emptyMap() : Map.of("city", GeospatialTestHelper.randomLowerCaseString())); + } + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + BiConsumer handler = mock(BiConsumer.class); + processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); + verify(handler).accept(document, null); + assertEquals(data.get(ips.get(1)), document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class)); + } + + public void testGenerateDataToAppendWithOutFirstOnlyOption() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("first_only", false)); + List ips = new ArrayList<>(); + Map> data = new HashMap<>(); + for (int i = 0; i < 3; i++) { + String ip = randomIpAddress(); + ips.add(ip); + data.put(ip, i == 0 ? Collections.emptyMap() : Map.of("city", GeospatialTestHelper.randomLowerCaseString())); + } + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + BiConsumer handler = mock(BiConsumer.class); + processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); + verify(handler).accept(document, null); + List> expectedResult = Arrays.asList(null, data.get(ips.get(1)), data.get(ips.get(2))); + assertEquals(expectedResult, document.getFieldValue(DEFAULT_TARGET_FIELD, List.class)); + } + + public void testGenerateDataToAppendWithNoData() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("first_only", Randomness.get().nextInt() % 2 == 0)); + List ips = new ArrayList<>(); + Map> data = new HashMap<>(); + for (int i = 0; i < 3; i++) { + String ip = randomIpAddress(); + ips.add(ip); + data.put(ip, Collections.emptyMap()); + } + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + BiConsumer handler = mock(BiConsumer.class); + processor.listenerToAppendDataToDocument(data, ips, document, handler).onResponse(data); + verify(handler).accept(document, null); + List> expectedResult = Arrays.asList(null, data.get(ips.get(1)), data.get(ips.get(2))); + Exception e = expectThrows(IllegalArgumentException.class, () -> document.getFieldValue(DEFAULT_TARGET_FIELD, Map.class)); + assertTrue(e.getMessage().contains("not present")); + } + + public void testExecuteInternalNonStringIp() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + List ips = Arrays.asList(randomIpAddress(), 1); + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + BiConsumer handler = mock(BiConsumer.class); + Exception e = expectThrows(IllegalArgumentException.class, () -> processor.executeInternal(document, handler, ips)); + assertTrue(e.getMessage().contains("should only contain strings")); + } + + public void testExecuteInternal() throws Exception { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + Map source = new HashMap<>(); + String ip = randomIpAddress(); + source.put("ip", ip); + IngestDocument document = new IngestDocument(source, new HashMap<>()); + + BiConsumer handler = mock(BiConsumer.class); + processor.executeInternal(document, handler, ips); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceHelper).getDatasource(eq(datasourceName), captor.capture()); + Datasource datasource = mock(Datasource.class); + when(datasource.isExpired()).thenReturn(false); + when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + captor.getValue().onResponse(datasource); + verify(geoIpDataHelper).getGeoIpData( + anyString(), + any(Iterator.class), + anyInt(), + anyInt(), + anyBoolean(), + anyMap(), + any(ActionListener.class) + ); + } + + private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { + Datasource datasource = new Datasource(); + datasource.setId(datasourceName); + datasource.setState(DatasourceState.AVAILABLE); + datasource.getDatabase().setFields(SUPPORTED_FIELDS); + return createProcessor(datasource, config); + } + + private Ip2GeoProcessor createProcessor(final Datasource datasource, final Map config) throws Exception { + when(datasourceHelper.getDatasource(datasource.getId())).thenReturn(datasource); + Map baseConfig = new HashMap<>(); + baseConfig.put(CONFIG_FIELD_KEY, "ip"); + baseConfig.put(CONFIG_DATASOURCE_KEY, datasource.getName()); + baseConfig.putAll(config); + + return factory.create( + Collections.emptyMap(), + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString(), + baseConfig + ); + } +} diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index d7ce5594..194ff732 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -7,31 +7,52 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutorHelper; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.geospatial.processor.FeatureProcessor; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; import org.opensearch.geospatial.stats.upload.RestUploadStatsAction; +import org.opensearch.geospatial.stats.upload.UploadStats; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.ingest.IngestService; import org.opensearch.ingest.Processor; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.IngestPlugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; public class GeospatialPluginTests extends OpenSearchTestCase { private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet(Ip2GeoSettings.settings())); @@ -40,17 +61,94 @@ public class GeospatialPluginTests extends OpenSearchTestCase { new RestUploadStatsAction(), new RestPutDatasourceHandler(Settings.EMPTY, clusterSettings) ); - private final Client client; - private final ClusterService clusterService; - private final IngestService ingestService; - - public GeospatialPluginTests() { - client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - clusterService = mock(ClusterService.class); + + private final Set SUPPORTED_SYSTEM_INDEX_PATTERN = Set.of(IP2GEO_DATA_INDEX_NAME_PREFIX); + + private final Set SUPPORTED_COMPONENTS = Set.of( + UploadStats.class, + DatasourceUpdateService.class, + DatasourceHelper.class, + Ip2GeoExecutorHelper.class, + GeoIpDataHelper.class + ); + + @Mock + private Client client; + @Mock + private ClusterService clusterService; + @Mock + private IngestService ingestService; + @Mock + private ThreadPool threadPool; + @Mock + private ResourceWatcherService resourceWatcherService; + @Mock + private ScriptService scriptService; + @Mock + private NamedXContentRegistry xContentRegistry; + @Mock + private Environment environment; + @Mock + private NamedWriteableRegistry namedWriteableRegistry; + @Mock + private IndexNameExpressionResolver indexNameExpressionResolver; + @Mock + private Supplier repositoriesServiceSupplier; + private NodeEnvironment nodeEnvironment; + private Settings settings; + private AutoCloseable openMocks; + + @Before + public void init() { + openMocks = MockitoAnnotations.openMocks(this); + settings = Settings.EMPTY; + when(client.settings()).thenReturn(settings); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - ingestService = mock(IngestService.class); + when(clusterService.getSettings()).thenReturn(settings); when(ingestService.getClusterService()).thenReturn(clusterService); + nodeEnvironment = null; + } + + @After + public void close() throws Exception { + openMocks.close(); + } + + public void testSystemIndexDescriptors() { + GeospatialPlugin plugin = new GeospatialPlugin(); + Set registeredSystemIndexPatterns = new HashSet<>(); + for (SystemIndexDescriptor descriptor : plugin.getSystemIndexDescriptors(Settings.EMPTY)) { + registeredSystemIndexPatterns.add(descriptor.getIndexPattern()); + } + assertEquals(SUPPORTED_SYSTEM_INDEX_PATTERN, registeredSystemIndexPatterns); + + } + + public void testExecutorBuilders() { + GeospatialPlugin plugin = new GeospatialPlugin(); + assertEquals(1, plugin.getExecutorBuilders(Settings.EMPTY).size()); + } + + public void testCreateComponents() { + GeospatialPlugin plugin = new GeospatialPlugin(); + Set registeredComponents = new HashSet<>(); + Collection components = plugin.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + indexNameExpressionResolver, + repositoriesServiceSupplier + ); + for (Object component : components) { + registeredComponents.add(component.getClass()); + } + assertEquals(SUPPORTED_COMPONENTS, registeredComponents); } public void testIsAnIngestPlugin() { diff --git a/src/test/resources/ip2geo/manifest.json b/src/test/resources/ip2geo/manifest.json new file mode 100644 index 00000000..652bc9d8 --- /dev/null +++ b/src/test/resources/ip2geo/manifest.json @@ -0,0 +1,8 @@ +{ + "url": "https://test.com/db.zip", + "db_name": "sample_valid.csv", + "md5_hash": "safasdfaskkkesadfasdf", + "valid_for_in_days": 30, + "updated_at": 3134012341236, + "provider": "sample_provider" +} \ No newline at end of file diff --git a/src/test/resources/ip2geo/manifest_invalid_url.json b/src/test/resources/ip2geo/manifest_invalid_url.json new file mode 100644 index 00000000..77d68aaf --- /dev/null +++ b/src/test/resources/ip2geo/manifest_invalid_url.json @@ -0,0 +1,8 @@ +{ + "url": "invalid://test.com/db.zip", + "db_name": "sample_valid.csv", + "md5_hash": "safasdfaskkkesadfasdf", + "valid_for_in_days": 30, + "updated_at": 3134012341236, + "provider": "sample_provider" +} \ No newline at end of file diff --git a/src/test/resources/ip2geo/manifest_template.json b/src/test/resources/ip2geo/manifest_template.json new file mode 100644 index 00000000..4c273fa4 --- /dev/null +++ b/src/test/resources/ip2geo/manifest_template.json @@ -0,0 +1,8 @@ +{ + "url": "URL", + "db_name": "sample_valid.csv", + "md5_hash": "safasdfaskkkesadfasdf", + "valid_for_in_days": 30, + "updated_at": 3134012341236, + "provider": "maxmind" +} \ No newline at end of file diff --git a/src/test/resources/ip2geo/sample_invalid_less_than_two_fields.csv b/src/test/resources/ip2geo/sample_invalid_less_than_two_fields.csv new file mode 100644 index 00000000..08670061 --- /dev/null +++ b/src/test/resources/ip2geo/sample_invalid_less_than_two_fields.csv @@ -0,0 +1,2 @@ +network +1.0.0.0/24 \ No newline at end of file diff --git a/src/test/resources/ip2geo/sample_valid.csv b/src/test/resources/ip2geo/sample_valid.csv new file mode 100644 index 00000000..a6d08935 --- /dev/null +++ b/src/test/resources/ip2geo/sample_valid.csv @@ -0,0 +1,3 @@ +network,country_name +1.0.0.0/24,Australia +10.0.0.0/24,USA \ No newline at end of file diff --git a/src/test/resources/ip2geo/sample_valid.zip b/src/test/resources/ip2geo/sample_valid.zip new file mode 100644 index 0000000000000000000000000000000000000000..0bdeeadbf1f9d2c9c7d542cde8694556b4e055fa GIT binary patch literal 250 zcmWIWW@Zs#-~d9W>KS1SP@oB<1sD_E%D&A`a=gOPy&XbJVQ4^ZKW*d7jfhtF5`J=e+kB-={`bj2Rqjm4#)G zUwSlY(UCc4To?kp**Vz%&wil+)C00Bz?+dtgc;!uWI2#KU|>ljh()Ta0=!w-K>8Sg LFb+t!fjA5RP5V9S literal 0 HcmV?d00001