Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed parameter and settings #332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
*/
@Log4j2
public class GeoIpDataFacade {
public static final int BUNDLE_SIZE = 128;
private static final String IP_RANGE_FIELD_NAME = "_cidr";
private static final String DATA_FIELD_NAME = "_data";
private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
Expand Down Expand Up @@ -279,28 +280,19 @@ public void onFailure(final Exception e) {
*
* @param indexName the index name
* @param ipIterator the iterator of ip addresses
* @param maxBundleSize number of ip address to pass in multi search
* @param maxConcurrentSearches the max concurrent search requests
* @param firstOnly return only the first matching result if true
* @param geoIpData collected geo data
* @param actionListener the action listener
*/
public void getGeoIpData(
final String indexName,
final Iterator<String> ipIterator,
final Integer maxBundleSize,
final Integer maxConcurrentSearches,
final boolean firstOnly,
final Map<String, Map<String, Object>> geoIpData,
final ActionListener<Map<String, Map<String, Object>>> actionListener
) {
MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch();
if (maxConcurrentSearches != 0) {
mRequestBuilder.setMaxConcurrentSearchRequests(maxConcurrentSearches);
}

List<String> ipsToSearch = new ArrayList<>(maxBundleSize);
while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) {
List<String> ipsToSearch = new ArrayList<>(BUNDLE_SIZE);
while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) {
String ip = ipIterator.next();
if (geoIpData.get(ip) == null) {
mRequestBuilder.add(
Expand Down Expand Up @@ -340,13 +332,8 @@ public void onResponse(final MultiSearchResponse items) {
).v2().get(DATA_FIELD_NAME);

geoIpData.put(ipsToSearch.get(i), data);

if (firstOnly) {
actionListener.onResponse(geoIpData);
return;
}
}
getGeoIpData(indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoIpData, actionListener);
getGeoIpData(indexName, ipIterator, geoIpData, actionListener);
}

@Override
Expand All @@ -362,20 +349,18 @@ public void onFailure(final Exception e) {
* @param indexName Index name to puts the GeoIP data
* @param fields Field name matching with data in CSVRecord in order
* @param iterator GeoIP data to insert
* @param bulkSize Bulk size of data to process
* @param renewLock Runnable to renew lock
*/
public void putGeoIpData(
@NonNull final String indexName,
@NonNull final String[] fields,
@NonNull final Iterator<CSVRecord> iterator,
final int bulkSize,
@NonNull final Runnable renewLock
) throws IOException {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest();
Queue<DocWriteRequest> requests = new LinkedList<>();
for (int i = 0; i < bulkSize; i++) {
for (int i = 0; i < BUNDLE_SIZE; i++) {
requests.add(Requests.indexRequest(indexName));
}
while (iterator.hasNext()) {
Expand All @@ -385,7 +370,7 @@ public void putGeoIpData(
indexRequest.source(document);
indexRequest.id(record.get(0));
bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,6 @@ public class Ip2GeoSettings {
Setting.Property.Dynamic
);

/**
* Bulk size for indexing GeoIP data
*/
public static final Setting<Integer> INDEXING_BULK_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.datasource.indexing_bulk_size",
10000,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Multi search bundle size for GeoIP data
*
* Multi search is used only when a field contains a list of ip addresses.
*/
public static final Setting<Integer> MAX_BUNDLE_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.processor.max_bundle_size",
100,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Multi search max concurrent searches
*
Expand All @@ -96,14 +72,7 @@ public class Ip2GeoSettings {
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(
DATASOURCE_ENDPOINT,
DATASOURCE_UPDATE_INTERVAL,
TIMEOUT,
INDEXING_BULK_SIZE,
MAX_BUNDLE_SIZE,
MAX_CONCURRENT_SEARCHES
);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

@Log4j2
Expand Down Expand Up @@ -83,13 +82,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
datasource.getDatabase().getFields().toString()
);
}
geoIpDataFacade.putGeoIpData(
indexName,
header,
reader.iterator(),
clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE),
renewLock
);
geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), renewLock);
}

Instant endTime = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
Expand All @@ -49,7 +48,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
public static final String CONFIG_DATASOURCE = "datasource";
public static final String CONFIG_PROPERTIES = "properties";
public static final String CONFIG_IGNORE_MISSING = "ignore_missing";
public static final String CONFIG_FIRST_ONLY = "first_only";

private final String field;
private final String targetField;
Expand All @@ -60,7 +58,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final String datasourceName;
private final Set<String> properties;
private final boolean ignoreMissing;
private final boolean firstOnly;
private final ClusterSettings clusterSettings;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
Expand All @@ -79,7 +76,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param datasourceName the datasourceName
* @param properties the properties
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param firstOnly true if only first result should be returned in case of array
* @param clusterSettings the cluster settings
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
Expand All @@ -92,7 +88,6 @@ public Ip2GeoProcessor(
final String datasourceName,
final Set<String> properties,
final boolean ignoreMissing,
final boolean firstOnly,
final ClusterSettings clusterSettings,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
Expand All @@ -103,7 +98,6 @@ public Ip2GeoProcessor(
this.datasourceName = datasourceName;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.firstOnly = firstOnly;
this.clusterSettings = clusterSettings;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
Expand Down Expand Up @@ -252,9 +246,6 @@ public void onResponse(final Datasource datasource) {
geoIpDataFacade.getGeoIpData(
indexName,
ipList.iterator(),
clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE),
clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES),
firstOnly,
data,
listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
);
Expand All @@ -277,33 +268,21 @@ protected ActionListener<Map<String, Map<String, Object>>> listenerToAppendDataT
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Map<String, Object>> response) {
if (firstOnly) {
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(geoData, ipAddr));
handler.accept(ingestDocument, null);
return;
}
}
} else {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
}
handler.accept(ingestDocument, null);
}

Expand Down Expand Up @@ -363,7 +342,6 @@ public Ip2GeoProcessor create(
String datasourceName = readStringProperty(TYPE, processorTag, config, CONFIG_DATASOURCE);
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES);
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false);
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, CONFIG_FIRST_ONLY, true);

// Skip validation for the call by cluster applier service
if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false) {
Expand All @@ -378,7 +356,6 @@ public Ip2GeoProcessor create(
datasourceName,
propertyNames == null ? null : new HashSet<>(propertyNames),
ignoreMissing,
firstOnly,
ingestService.getClusterService().getClusterSettings(),
datasourceFacade,
geoIpDataFacade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
datasourceName,
properties,
true,
true,
clusterSettings,
datasourceFacade,
geoIpDataFacade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade.BUNDLE_SIZE;
import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.io.File;
Expand Down Expand Up @@ -193,7 +194,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
if (actionRequest instanceof BulkRequest) {
BulkRequest request = (BulkRequest) actionRequest;
assertEquals(1, request.numberOfActions());
assertEquals(2, request.numberOfActions());
BulkResponse response = mock(BulkResponse.class);
when(response.hasFailures()).thenReturn(false);
return response;
Expand Down Expand Up @@ -224,7 +225,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) {
Iterator<CSVRecord> iterator = csvParser.iterator();
String[] fields = iterator.next().values();
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1, renewLock);
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, renewLock);
verify(renewLock, times(2)).run();
}
}
Expand Down Expand Up @@ -261,53 +262,37 @@ public void testGetSingleGeoIpData() {
assertEquals("seattle", captor.getValue().get("city"));
}

public void testGetMultipleGeoIpDataNoSearchRequired() {
public void testGetGeoIpData_whenAllDataIsGathered_thenNoMoreSearch() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
String ip1 = randomIpAddress();
String ip2 = randomIpAddress();
Iterator<String> ipIterator = Arrays.asList(ip1, ip2).iterator();
int maxBundleSize = 1;
int maxConcurrentSearches = 1;
boolean firstOnly = true;
Map<String, Map<String, Object>> geoData = new HashMap<>();
geoData.put(ip1, Map.of("city", "Seattle"));
geoData.put(ip2, Map.of("city", "Hawaii"));
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);

// Run
verifyingGeoIpDataFacade.getGeoIpData(
indexName,
ipIterator,
maxBundleSize,
maxConcurrentSearches,
firstOnly,
geoData,
actionListener
);
verifyingGeoIpDataFacade.getGeoIpData(indexName, ipIterator, geoData, actionListener);

// Verify
verify(actionListener).onResponse(geoData);
}

public void testGetMultipleGeoIpData() {
public void testGetGeoIpData_whenCalled_thenGetGeoIpData() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
int dataSize = Randomness.get().nextInt(10) + 1;
List<String> ips = new ArrayList<>();
for (int i = 0; i < dataSize; i++) {
ips.add(randomIpAddress());
}
int maxBundleSize = Randomness.get().nextInt(11) + 1;
int maxConcurrentSearches = 1;
boolean firstOnly = false;
Map<String, Map<String, Object>> geoData = new HashMap<>();
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);

List<String> cities = new ArrayList<>();
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof MultiSearchRequest;
MultiSearchRequest request = (MultiSearchRequest) actionRequest;
assertEquals(maxConcurrentSearches, request.maxConcurrentSearchRequests());
assertTrue(request.requests().size() == maxBundleSize || request.requests().size() == dataSize % maxBundleSize);
for (SearchRequest searchRequest : request.requests()) {
assertEquals("_local", searchRequest.preference());
assertEquals(1, searchRequest.source().size());
Expand Down Expand Up @@ -341,18 +326,10 @@ public void testGetMultipleGeoIpData() {
});

// Run
verifyingGeoIpDataFacade.getGeoIpData(
indexName,
ips.iterator(),
maxBundleSize,
maxConcurrentSearches,
firstOnly,
geoData,
actionListener
);
verifyingGeoIpDataFacade.getGeoIpData(indexName, ips.iterator(), geoData, actionListener);

// Verify
verify(verifyingClient, times((dataSize + maxBundleSize - 1) / maxBundleSize)).execute(
verify(verifyingClient, times((dataSize + BUNDLE_SIZE - 1) / BUNDLE_SIZE)).execute(
any(ActionType.class),
any(ActionRequest.class),
any(ActionListener.class)
Expand Down
Loading