Merge branch 'main' into PR-14046
* main:
  Fix geoip database download does not respect http_proxy setting (elastic#14048)
  Doc: Group central mgmt and configuring central mgmt topics (elastic#14050)
  Implements DLQ storage policy (elastic#13923)
kares committed May 2, 2022
2 parents 65408fc + 1c851bb commit 20c1384
Showing 22 changed files with 570 additions and 70 deletions.
11 changes: 10 additions & 1 deletion config/logstash.yml
@@ -256,11 +256,18 @@
# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
-# being available to be read by the dead_letter_queue input when items are are written infrequently.
+# being available to be read by the dead_letter_queue input when items are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

+# If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes is reached,
+# could be "drop_newer" or "drop_older".
+# With drop_newer, messages that were inserted most recently are dropped, logging an error line.
+# With drop_older setting, the oldest messages are dropped as new ones are inserted.
+# Default value is "drop_newer".
+# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
@@ -279,6 +286,8 @@
# log.level: info
# path.logs:
#


# ------------ Other Settings --------------
#
# Run Logstash with superuser
8 changes: 8 additions & 0 deletions config/pipelines.yml
@@ -88,6 +88,14 @@
#
# dead_letter_queue.flush_interval: 5000

+# If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes is reached,
+# could be "drop_newer" or "drop_older".
+# With drop_newer, messages that were inserted most recently are dropped, logging an error line.
+# With drop_older setting, the oldest messages are dropped as new ones are inserted.
+# Default value is "drop_newer".
+#
+# dead_letter_queue.storage_policy: drop_newer

#
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
1 change: 1 addition & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
@@ -83,6 +83,7 @@ func normalizeSetting(setting string) (string, error) {
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
"dead_letter_queue.flush_interval",
"dead_letter_queue.storage_policy",
"path.dead_letter_queue",
"http.enabled", // DEPRECATED: prefer `api.enabled`
"http.environment", // DEPRECATED: prefer `api.environment`
4 changes: 2 additions & 2 deletions docs/index.asciidoc
@@ -97,8 +97,6 @@ include::static/glob-support.asciidoc[]

include::static/ingest-convert.asciidoc[]

-include::static/management/configuring-centralized-pipelines.asciidoc[]

include::static/field-reference.asciidoc[]

//The `field-reference.asciidoc` file (included above) contains a
@@ -114,6 +112,8 @@ include::static/ls-ls-config.asciidoc[]
// Centralized configuration managements
include::static/config-management.asciidoc[]

+include::static/management/configuring-centralized-pipelines.asciidoc[]

// Working with Logstash Modules
include::static/modules.asciidoc[]

5 changes: 4 additions & 1 deletion docs/static/dead-letter-queues.asciidoc
@@ -118,7 +118,10 @@ file is created automatically.
By default, the maximum size of each dead letter queue is set to 1024mb. To
change this setting, use the `dead_letter_queue.max_bytes` option. Entries
will be dropped if they would increase the size of the dead letter queue beyond
-this setting.
+this setting.
+Use the `dead_letter_queue.storage_policy` option to control which entries should be dropped to avoid exceeding the size limit.
+Set the value to `drop_newer` (default) to stop accepting new values that would push the file size over the limit.
+Set the value to `drop_older` to remove the oldest events to make space for new ones.

[[processing-dlq-events]]
==== Processing events in the dead letter queue
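The storage policy described above amounts to a size check at the moment an entry is written. A minimal sketch of that decision logic, with every name invented for illustration; this is not the actual `DeadLetterQueueWriter` change, which sits outside the hunks shown on this page:

```java
// Hypothetical sketch of the storage-policy decision; all names here are
// invented and do not come from the Logstash sources.
import java.io.IOException;

enum StoragePolicy { DROP_NEWER, DROP_OLDER }

class StoragePolicySketch {
    private final long maxQueueSize;    // dead_letter_queue.max_bytes
    private final StoragePolicy policy; // dead_letter_queue.storage_policy
    private long currentQueueSize;      // bytes currently stored on disk

    StoragePolicySketch(long maxQueueSize, StoragePolicy policy) {
        this.maxQueueSize = maxQueueSize;
        this.policy = policy;
    }

    void write(byte[] entry) throws IOException {
        if (currentQueueSize + entry.length > maxQueueSize) {
            if (policy == StoragePolicy.DROP_NEWER) {
                // drop_newer: refuse the incoming entry and log an error line
                System.err.println("cannot write event to DLQ: would exceed dead_letter_queue.max_bytes");
                return;
            }
            // drop_older: reclaim space by deleting the oldest segment files
            currentQueueSize -= dropOldestSegments(entry.length);
        }
        appendToCurrentSegment(entry);
        currentQueueSize += entry.length;
    }

    // Stubs standing in for real segment-file management.
    private long dropOldestSegments(long bytesNeeded) { return bytesNeeded; }
    private void appendToCurrentSegment(byte[] entry) { }
}
```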
5 changes: 1 addition & 4 deletions docs/static/management/configuring-centralized-pipelines.asciidoc
@@ -1,8 +1,5 @@
[[configuring-centralized-pipelines]]
-=== Configuring Centralized Pipeline Management
-++++
-<titleabbrev>Centralized Pipeline Management</titleabbrev>
-++++
+=== Configure Centralized Pipeline Management

To configure
{logstash-ref}/logstash-centralized-pipeline-management.html[centralized pipeline management]:
4 changes: 4 additions & 0 deletions docs/static/settings-file.asciidoc
@@ -227,6 +227,10 @@ Values other than `disabled` are currently considered BETA, and may produce unin
would increase the size of the dead letter queue beyond this setting.
| `1024mb`

+| `dead_letter_queue.storage_policy`
+| Defines the action to take when the dead_letter_queue.max_bytes setting is reached: `drop_newer` stops accepting new values that would push the file size over the limit, and `drop_older` removes the oldest events to make space for new ones.
+| `drop_newer`

| `path.dead_letter_queue`
| The directory path where the data files will be stored for the dead-letter queue.
| `path.data/dead_letter_queue`
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -92,6 +92,7 @@ module Environment
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::Numeric.new("dead_letter_queue.flush_interval", 5000),
+Setting::String.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]),
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Setting::TimeValue.new("slowlog.threshold.info", "-1"),
Setting::TimeValue.new("slowlog.threshold.debug", "-1"),
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
@@ -49,6 +49,7 @@ def self.included(base)
"dead_letter_queue.enable",
"dead_letter_queue.flush_interval",
"dead_letter_queue.max_bytes",
"dead_letter_queue.storage_policy",
"metric.collect",
"pipeline.plugin_classloaders",
"path.config",
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
@@ -1235,6 +1235,7 @@ def flush(options)
collect_stats
# A newly created dead letter queue with no entries will have a size of 1 (the version 'header')
expect(collected_stats[:queue_size_in_bytes].value).to eq(1)
+expect(collected_stats[:storage_policy].value).to eq("drop_newer")
end
end
end
logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java
@@ -41,6 +41,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;
+import org.logstash.common.io.QueueStorageType;

import java.io.IOException;
import java.nio.file.Paths;
@@ -75,19 +76,21 @@ private DeadLetterQueueFactory() {
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
* that would make the size of this dlq greater than this value
* @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent.
+* @param storageType overwriting type in case of queue full: drop_older or drop_newer.
* @return The write manager for the specific id's dead-letter-queue context
*/
-public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval) {
-return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval));
+public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) {
+return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType));
}

public static DeadLetterQueueWriter release(String id) {
return REGISTRY.remove(id);
}

-private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, final Duration flushInterval) {
+private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize,
+final Duration flushInterval, final QueueStorageType storageType) {
try {
-return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval);
+return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, storageType);
} catch (IOException e) {
logger.error("unable to create dead letter queue writer", e);
}
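The `QueueStorageType` source itself is not part of the hunks shown on this page; only the import is visible above. A minimal sketch of what such an enum might look like, assuming a hypothetical `parse` helper that maps the `dead_letter_queue.storage_policy` strings onto constants:

```java
// Hypothetical reconstruction for illustration; the real QueueStorageType
// implementation is not included in this page's hunks.
package org.logstash.common.io;

public enum QueueStorageType {
    DROP_NEWER, DROP_OLDER;

    public static QueueStorageType parse(String value) {
        switch (value) {
            case "drop_newer": return DROP_NEWER;
            case "drop_older": return DROP_OLDER;
            default:
                throw new IllegalArgumentException("invalid storage policy: " + value);
        }
    }
}
```

Under that assumption, a caller would obtain a writer with something like `DeadLetterQueueFactory.getWriter("main", dlqPath, 1024L * 1024 * 1024, Duration.ofMillis(5000), QueueStorageType.parse("drop_newer"))`.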
logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java
@@ -52,9 +52,10 @@
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.Comparator;
+import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Collectors;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
@@ -73,46 +74,68 @@ public DeadLetterQueueReader(Path queuePath) throws IOException {
this.queuePath = queuePath;
this.watchService = FileSystems.getDefault().newWatchService();
this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
-this.segments = new ConcurrentSkipListSet<>((p1, p2) -> {
-Function<Path, Integer> id = (p) -> Integer.parseInt(p.getFileName().toString().split("\\.")[0]);
-return id.apply(p1).compareTo(id.apply(p2));
-});
+this.segments = new ConcurrentSkipListSet<>(
+Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)
+);
segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
}

public void seekToNextEvent(Timestamp timestamp) throws IOException {
for (Path segment : segments) {
+if (!Files.exists(segment)) {
+segments.remove(segment);
+continue;
+}
currentReader = new RecordIOReader(segment);
-byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> {
-try {
-return DLQEntry.deserialize(b).getEntryTime();
-} catch (IOException e) {
-throw new IllegalStateException(e);
-}
-}, Timestamp::compareTo);
+byte[] event = currentReader.seekToNextEventPosition(timestamp, DeadLetterQueueReader::extractEntryTimestamp, Timestamp::compareTo);
if (event != null) {
return;
}
}
-currentReader.close();
-currentReader = null;
+if (currentReader != null) {
+currentReader.close();
+currentReader = null;
+}
}

+private static Timestamp extractEntryTimestamp(byte[] serialized) {
+try {
+return DLQEntry.deserialize(serialized).getEntryTime();
+} catch (IOException e) {
+throw new IllegalStateException(e);
+}
+}

private long pollNewSegments(long timeout) throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
WatchKey key = watchService.poll(timeout, TimeUnit.MILLISECONDS);
if (key != null) {
-for (WatchEvent<?> watchEvent : key.pollEvents()) {
-if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
-segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
-}
-key.reset();
-}
+pollSegmentsOnWatch(key);
}
return System.currentTimeMillis() - startTime;
}

+private void pollNewSegments() throws IOException {
+WatchKey key = watchService.poll();
+if (key != null) {
+pollSegmentsOnWatch(key);
+}
+}

+private void pollSegmentsOnWatch(WatchKey key) throws IOException {
+for (WatchEvent<?> watchEvent : key.pollEvents()) {
+if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
+segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
+} else if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
+final int oldSize = segments.size();
+segments.clear();
+segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
+logger.debug("Notified of segment removal, switched from {} to {} segments", oldSize, segments.size());
+}
+key.reset();
+}
+}

public DLQEntry pollEntry(long timeout) throws IOException, InterruptedException {
byte[] bytes = pollEntryBytes(timeout);
if (bytes == null) {
@@ -134,34 +157,79 @@ byte[] pollEntryBytes(long timeout) throws IOException, InterruptedException {
logger.debug("No entries found: no segment files found in dead-letter-queue directory");
return null;
}
-currentReader = new RecordIOReader(segments.first());
+try {
+final Path firstSegment = segments.first();
+currentReader = new RecordIOReader(firstSegment);
+} catch (NoSuchElementException ex) {
+// all elements were removed after the empty check
+logger.debug("No entries found: no segment files found in dead-letter-queue directory");
+return null;
+}
}

byte[] event = currentReader.readEvent();
if (event == null && currentReader.isEndOfStream()) {
-if (currentReader.getPath().equals(segments.last())) {
+if (consumedAllSegments()) {
pollNewSegments(timeoutRemaining);
} else {
currentReader.close();
-currentReader = new RecordIOReader(segments.higher(currentReader.getPath()));
-return pollEntryBytes(timeoutRemaining);
+final Path nextSegment = nextExistingSegmentFile(currentReader.getPath());
+if (nextSegment == null) {
+// segments were all already deleted files, do a poll
+pollNewSegments(timeoutRemaining);
+} else {
+currentReader = new RecordIOReader(nextSegment);
+return pollEntryBytes(timeoutRemaining);
+}
}
}

return event;
}

+private boolean consumedAllSegments() {
+try {
+return currentReader.getPath().equals(segments.last());
+} catch (NoSuchElementException ex) {
+// last segment was removed while processing
+logger.debug("No last segment found, poll for new segments");
+return true;
+}
+}

+private Path nextExistingSegmentFile(Path currentSegmentPath) {
+Path nextExpectedSegment;
+boolean skip;
+do {
+nextExpectedSegment = segments.higher(currentSegmentPath);
+if (nextExpectedSegment != null && !Files.exists(nextExpectedSegment)) {
+segments.remove(nextExpectedSegment);
+skip = true;
+} else {
+skip = false;
+}
+} while (skip);
+return nextExpectedSegment;
+}

public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException {
// If the provided segment Path exist, then set the reader to start from the supplied position
if (Files.exists(segmentPath)) {
currentReader = new RecordIOReader(segmentPath);
currentReader.seekToOffset(position);
-}else{
+} else {
// Otherwise, set the current reader to be at the beginning of the next
// segment.
-Path next = segments.higher(segmentPath);
-if (next != null){
+Path next = nextExistingSegmentFile(segmentPath);
+if (next != null) {
currentReader = new RecordIOReader(next);
+} else {
+pollNewSegments();
+// give a second try after a re-load of segments from filesystem
+next = nextExistingSegmentFile(segmentPath);
+if (next != null) {
+currentReader = new RecordIOReader(next);
+}
}
}
}
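The constructor change above replaces the inline lambda comparator with `Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)`, so segment files sort by their numeric id rather than lexicographically. A standalone illustration of the difference, with the class name and paths invented for the demo:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

public class SegmentOrderingDemo {
    // Same parsing as DeadLetterQueueUtils.extractSegmentId: "10.log" -> 10
    static int extractSegmentId(Path p) {
        return Integer.parseInt(p.getFileName().toString().split("\\.log")[0]);
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Path> segments = new ConcurrentSkipListSet<>(
                Comparator.comparingInt(SegmentOrderingDemo::extractSegmentId));
        segments.add(Paths.get("dlq", "main", "10.log"));
        segments.add(Paths.get("dlq", "main", "2.log"));
        segments.add(Paths.get("dlq", "main", "1.log"));
        // Numeric order: 1.log, 2.log, 10.log.
        // Lexicographic ordering would give 1.log, 10.log, 2.log.
        segments.forEach(p -> System.out.println(p.getFileName()));
    }
}
```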
28 changes: 28 additions & 0 deletions logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java
@@ -0,0 +1,28 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.common.io;

import java.nio.file.Path;

class DeadLetterQueueUtils {

static int extractSegmentId(Path p) {
return Integer.parseInt(p.getFileName().toString().split("\\.log")[0]);
}
}