Skip to content

Commit

Permalink
Licensing controls for logsdb routing on sort fields (elastic#120276)
Browse files Browse the repository at this point in the history
* Restrict routing on sort fields to enterprise license

* sync

* bypass checking for serverless

* Node deprecation warning for indexes and component templates with source mode in mapping

* Revert "Node deprecation warning for indexes and component templates with source mode in mapping"

This reverts commit 0fd4ca7.

* address comments
  • Loading branch information
kkrik-es committed Feb 14, 2025
1 parent 8418b85 commit 43234d0
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void testLogsdbRouteOnSortFields() throws IOException {
var settings = (Map<?, ?>) ((Map<?, ?>) getIndexSettings(index).get(index)).get("settings");
assertEquals("logsdb", settings.get("index.mode"));
assertEquals(SourceFieldMapper.Mode.STORED.toString(), settings.get("index.mapping.source.mode"));
assertEquals("true", settings.get(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey()));
assertEquals(List.of("host.name", "message"), settings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey()));
assertEquals("false", settings.get(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey()));
assertNull(settings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

public class LogsdbRestIT extends ESRestTestCase {
Expand Down Expand Up @@ -66,9 +67,15 @@ public void testFeatureUsageWithLogsdbIndex() throws IOException {
List<Map<?, ?>> features = (List<Map<?, ?>>) response.get("features");
logger.info("response's features: {}", features);
assertThat(features, Matchers.not(Matchers.empty()));
Map<?, ?> feature = features.stream().filter(map -> "mappings".equals(map.get("family"))).findFirst().get();
assertThat(feature.get("name"), equalTo("synthetic-source"));
assertThat(feature.get("license_level"), equalTo("enterprise"));
boolean found = false;
for (var feature : features) {
if (feature.get("family") != null) {
assertThat(feature.get("name"), anyOf(equalTo("synthetic-source"), equalTo("logsdb-routing-on-sort-fields")));
assertThat(feature.get("license_level"), equalTo("enterprise"));
found = true;
}
}
assertTrue(found);

var indexResponse = (Map<?, ?>) getIndexSettings("test-index", true).get("test-index");
logger.info("indexResponse: {}", indexResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;
import static org.elasticsearch.xpack.logsdb.LogsdbLicenseService.FALLBACK_SETTING;

public class LogsDBPlugin extends Plugin implements ActionPlugin {

private final Settings settings;
private final SyntheticSourceLicenseService licenseService;
private final LogsdbLicenseService licenseService;
private static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
"logsdb.prior_logs_usage",
false,
Expand All @@ -63,7 +63,7 @@ public class LogsDBPlugin extends Plugin implements ActionPlugin {

public LogsDBPlugin(Settings settings) {
this.settings = settings;
this.licenseService = new SyntheticSourceLicenseService(settings);
this.licenseService = new LogsdbLicenseService(settings);
this.logsdbIndexModeSettingsProvider = new LogsdbIndexModeSettingsProvider(licenseService, settings);
}

Expand All @@ -82,7 +82,6 @@ public Collection<?> createComponents(PluginServices services) {
CLUSTER_LOGSDB_ENABLED,
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
);

// Nothing to share here:
return super.createComponents(services);
}
Expand All @@ -95,6 +94,7 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(Index
IndexVersion.current(),
parameters.clusterService().state().nodes().getMaxDataNodeCompatibleIndexVersion()
),
DiscoveryNode.isStateless(settings) == false,
DiscoveryNode.isStateless(settings) == false
);
return List.of(logsdbIndexModeSettingsProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected void masterOperation(
}
}
final boolean enabled = LogsDBPlugin.CLUSTER_LOGSDB_ENABLED.get(clusterService.getSettings());
final boolean hasCustomCutoffDate = System.getProperty(SyntheticSourceLicenseService.CUTOFF_DATE_SYS_PROP_NAME) != null;
final boolean hasCustomCutoffDate = System.getProperty(LogsdbLicenseService.CUTOFF_DATE_SYS_PROP_NAME) != null;
final DiscoveryNode[] nodes = state.nodes().getDataNodes().values().toArray(DiscoveryNode[]::new);
final var statsRequest = new IndexModeStatsActionType.StatsRequest(nodes);
final int finalNumIndices = numIndices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider {
static final String LOGS_PATTERN = "logs-*-*";
private static final Set<String> MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects");

private final SyntheticSourceLicenseService syntheticSourceLicenseService;
private final LogsdbLicenseService licenseService;
private final SetOnce<CheckedFunction<IndexMetadata, MapperService, IOException>> mapperServiceFactory = new SetOnce<>();
private final SetOnce<Supplier<IndexVersion>> createdIndexVersion = new SetOnce<>();
private final SetOnce<Boolean> supportFallbackToStoredSource = new SetOnce<>();
private final SetOnce<Boolean> supportFallbackLogsdbRouting = new SetOnce<>();

private volatile boolean isLogsdbEnabled;

LogsdbIndexModeSettingsProvider(SyntheticSourceLicenseService syntheticSourceLicenseService, final Settings settings) {
this.syntheticSourceLicenseService = syntheticSourceLicenseService;
LogsdbIndexModeSettingsProvider(LogsdbLicenseService licenseService, final Settings settings) {
this.licenseService = licenseService;
this.isLogsdbEnabled = CLUSTER_LOGSDB_ENABLED.get(settings);
}

Expand All @@ -68,11 +69,13 @@ void updateClusterIndexModeLogsdbEnabled(boolean isLogsdbEnabled) {
void init(
CheckedFunction<IndexMetadata, MapperService, IOException> factory,
Supplier<IndexVersion> indexVersion,
boolean supportFallbackToStoredSource
boolean supportFallbackToStoredSource,
boolean supportFallbackLogsdbRouting
) {
this.mapperServiceFactory.set(factory);
this.createdIndexVersion.set(indexVersion);
this.supportFallbackToStoredSource.set(supportFallbackToStoredSource);
this.supportFallbackLogsdbRouting.set(supportFallbackLogsdbRouting);
}

@Override
Expand All @@ -93,6 +96,7 @@ public Settings getAdditionalIndexSettings(
) {
Settings.Builder settingsBuilder = null;
boolean isLogsDB = templateIndexMode == IndexMode.LOGSDB;
boolean isTemplateValidation = "validate-index-name".equals(indexName);

// Inject logsdb index mode, based on the logs pattern.
if (isLogsdbEnabled
Expand All @@ -110,76 +114,74 @@ && matchesLogsPattern(dataStreamName)) {
if (mappingHints.hasSyntheticSourceUsage && supportFallbackToStoredSource.get()) {
// This index name is used when validating component and index templates, we should skip this check in that case.
// (See MetadataIndexTemplateService#validateIndexTemplateV2(...) method)
boolean isTemplateValidation = "validate-index-name".equals(indexName);
boolean legacyLicensedUsageOfSyntheticSourceAllowed = isLegacyLicensedUsageOfSyntheticSourceAllowed(
templateIndexMode,
indexName,
dataStreamName
);
if (syntheticSourceLicenseService.fallbackToStoredSource(isTemplateValidation, legacyLicensedUsageOfSyntheticSourceAllowed)) {
if (licenseService.fallbackToStoredSource(isTemplateValidation, legacyLicensedUsageOfSyntheticSourceAllowed)) {
LOGGER.debug("creation of index [{}] with synthetic source without it being allowed", indexName);
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
}
settingsBuilder.put(IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), SourceFieldMapper.Mode.STORED.toString());
settingsBuilder = getBuilder(settingsBuilder).put(
IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(),
SourceFieldMapper.Mode.STORED.toString()
);
}
}

if (isLogsDB) {
// Inject sorting on [host.name], in addition to [@timestamp].
if (mappingHints.sortOnHostName) {
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
}
if (mappingHints.addHostNameField) {
// Inject keyword field [host.name] too.
settingsBuilder.put(IndexSettings.LOGSDB_ADD_HOST_NAME_FIELD.getKey(), true);
settingsBuilder = getBuilder(settingsBuilder).put(IndexSettings.LOGSDB_ADD_HOST_NAME_FIELD.getKey(), true);
}
settingsBuilder.put(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey(), true);
settingsBuilder = getBuilder(settingsBuilder).put(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey(), true);
}

// Inject routing path matching sort fields.
if (settings.getAsBoolean(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), false)) {
List<String> sortFields = new ArrayList<>(settings.getAsList(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()));
sortFields.removeIf(s -> s.equals(DataStreamTimestampFieldMapper.DEFAULT_PATH));
if (sortFields.size() < 2) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"data stream [%s] in logsdb mode and with [%s] index setting has only %d sort fields "
+ "(excluding timestamp), needs at least 2",
dataStreamName,
IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(),
sortFields.size()
)
);
}
if (settings.hasValue(IndexMetadata.INDEX_ROUTING_PATH.getKey())) {
List<String> routingPaths = settings.getAsList(IndexMetadata.INDEX_ROUTING_PATH.getKey());
if (routingPaths.equals(sortFields) == false) {
if (supportFallbackLogsdbRouting.get() == false || licenseService.allowLogsdbRoutingOnSortField(isTemplateValidation)) {
List<String> sortFields = new ArrayList<>(settings.getAsList(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey()));
sortFields.removeIf(s -> s.equals(DataStreamTimestampFieldMapper.DEFAULT_PATH));
if (sortFields.size() < 2) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"data stream [%s] in logsdb mode and with [%s] index setting has mismatching sort "
+ "and routing fields, [index.routing_path:%s], [index.sort.fields:%s]",
"data stream [%s] in logsdb mode and with [%s] index setting has only %d sort fields "
+ "(excluding timestamp), needs at least 2",
dataStreamName,
IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(),
routingPaths,
sortFields
sortFields.size()
)
);
}
} else {
if (settingsBuilder == null) {
settingsBuilder = Settings.builder();
if (settings.hasValue(IndexMetadata.INDEX_ROUTING_PATH.getKey())) {
List<String> routingPaths = settings.getAsList(IndexMetadata.INDEX_ROUTING_PATH.getKey());
if (routingPaths.equals(sortFields) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"data stream [%s] in logsdb mode and with [%s] index setting has mismatching sort "
+ "and routing fields, [index.routing_path:%s], [index.sort.fields:%s]",
dataStreamName,
IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(),
routingPaths,
sortFields
)
);
}
} else {
settingsBuilder = getBuilder(settingsBuilder).putList(INDEX_ROUTING_PATH.getKey(), sortFields);
}
settingsBuilder.putList(INDEX_ROUTING_PATH.getKey(), sortFields);
} else {
// Routing on sort fields is not allowed, reset the corresponding index setting.
LOGGER.debug("creation of index [{}] with logsdb mode and routing on sort fields without it being allowed", indexName);
settingsBuilder = getBuilder(settingsBuilder).put(IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS.getKey(), false);
}
}
}

return settingsBuilder == null ? Settings.EMPTY : settingsBuilder.build();

}

record MappingHints(boolean hasSyntheticSourceUsage, boolean sortOnHostName, boolean addHostNameField) {
Expand All @@ -194,6 +196,14 @@ private static IndexMode resolveIndexMode(final String mode) {
return mode != null ? Enum.valueOf(IndexMode.class, mode.toUpperCase(Locale.ROOT)) : null;
}

// Returned value needs to be reassigned to the passed arg, to track the created builder.
private static Settings.Builder getBuilder(Settings.Builder builder) {
if (builder == null) {
return Settings.builder();
}
return builder;
}

MappingHints getMappingHints(
String indexName,
IndexMode templateIndexMode,
Expand Down Expand Up @@ -260,8 +270,8 @@ MappingHints getMappingHints(
|| mapperService.mappingLookup().getMapping().getRoot().subobjects() == ObjectMapper.Subobjects.DISABLED));
boolean sortOnHostName = IndexSettings.LOGSDB_SORT_ON_HOST_NAME.get(indexTemplateAndCreateRequestSettings)
|| addHostNameField
|| ((hostName instanceof NumberFieldMapper nfm && nfm.fieldType().hasDocValues())
|| (hostName instanceof KeywordFieldMapper kfm && kfm.fieldType().hasDocValues()));
|| (hostName instanceof NumberFieldMapper nfm && nfm.fieldType().hasDocValues())
|| (hostName instanceof KeywordFieldMapper kfm && kfm.fieldType().hasDocValues());
return new MappingHints(hasSyntheticSourceUsage, sortOnHostName, addHostNameField);
}
} catch (AssertionError | Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
/**
* Determines based on license and fallback setting whether synthetic source usages should fallback to stored source.
*/
final class SyntheticSourceLicenseService {
final class LogsdbLicenseService {

static final String MAPPINGS_FEATURE_FAMILY = "mappings";
// You can only override this property if you received explicit approval from Elastic.
static final String CUTOFF_DATE_SYS_PROP_NAME = "es.mapping.synthetic_source_fallback_to_stored_source.cutoff_date_restricted_override";
private static final Logger LOGGER = LogManager.getLogger(SyntheticSourceLicenseService.class);
private static final Logger LOGGER = LogManager.getLogger(LogsdbLicenseService.class);
static final long DEFAULT_CUTOFF_DATE = LocalDateTime.of(2025, 2, 4, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli();

/**
Expand All @@ -53,16 +53,22 @@ final class SyntheticSourceLicenseService {
License.OperationMode.GOLD
);

static final LicensedFeature.Momentary LOGSDB_ROUTING_ON_SORT_FIELDS_FEATURE = LicensedFeature.momentary(
MAPPINGS_FEATURE_FAMILY,
"logsdb-routing-on-sort-fields",
License.OperationMode.ENTERPRISE
);

private final long cutoffDate;
private LicenseService licenseService;
private XPackLicenseState licenseState;
private volatile boolean syntheticSourceFallback;

SyntheticSourceLicenseService(Settings settings) {
LogsdbLicenseService(Settings settings) {
this(settings, System.getProperty(CUTOFF_DATE_SYS_PROP_NAME));
}

SyntheticSourceLicenseService(Settings settings, String cutoffDate) {
LogsdbLicenseService(Settings settings, String cutoffDate) {
this.syntheticSourceFallback = FALLBACK_SETTING.get(settings);
this.cutoffDate = getCutoffDate(cutoffDate);
}
Expand Down Expand Up @@ -97,6 +103,13 @@ && checkFeature(SYNTHETIC_SOURCE_FEATURE_LEGACY, licenseStateSnapshot, isTemplat
return true;
}

/**
* @return whether indexes in logsdb mode can use routing on sort fields.
*/
public boolean allowLogsdbRoutingOnSortField(boolean isTemplateValidation) {
return checkFeature(LOGSDB_ROUTING_ON_SORT_FIELDS_FEATURE, licenseState.copyCurrentLicenseState(), isTemplateValidation);
}

private static boolean checkFeature(
LicensedFeature.Momentary licensedFeature,
XPackLicenseState licenseStateSnapshot,
Expand Down
Loading

0 comments on commit 43234d0

Please sign in to comment.