From c1fe7095c300b247a2739c561ee5e733994197e4 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Thu, 28 Apr 2022 09:16:03 +0200 Subject: [PATCH 1/3] Implements DLQ storage policy (#13923) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes the dead_letter_queue.storage_policy configuration setting to explicitly enable the drop_older behavior in DLQs. Moving from a drop_newer to a drop_older behavior has an impact on both the writer side and the reader side. The implementation leverages the fact that a complete DLQ segment can be removed to free up space: on the writer side, when the dead_letter_queue.max_bytes limit is reached, the oldest segments have to be removed. On the reader side, consumption has to be adapted to not expect a continuous flow of segments, since it could encounter a hole left by the removal of tail segments. Co-authored-by: João Duarte Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> --- config/logstash.yml | 11 +- config/pipelines.yml | 8 + docker/data/logstash/env2yaml/env2yaml.go | 1 + docs/static/dead-letter-queues.asciidoc | 5 +- docs/static/settings-file.asciidoc | 4 + logstash-core/lib/logstash/environment.rb | 1 + logstash-core/lib/logstash/settings.rb | 1 + .../spec/logstash/java_pipeline_spec.rb | 1 + .../common/DeadLetterQueueFactory.java | 11 +- .../common/io/DeadLetterQueueReader.java | 124 +++++++++--- .../common/io/DeadLetterQueueUtils.java | 28 +++ .../common/io/DeadLetterQueueWriter.java | 45 ++++- .../logstash/common/io/QueueStorageType.java | 35 ++++ .../execution/AbstractPipelineExt.java | 13 +- .../common/DeadLetterQueueFactoryTest.java | 9 +- .../common/io/DeadLetterQueueReaderTest.java | 188 ++++++++++++++++-- .../common/io/DeadLetterQueueTestUtils.java | 27 +++ .../common/io/DeadLetterQueueWriterTest.java | 104 ++++++++++ 18 files changed, 554 insertions(+), 62 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java create mode 100644 logstash-core/src/main/java/org/logstash/common/io/QueueStorageType.java create mode 100644 logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueTestUtils.java diff --git a/config/logstash.yml b/config/logstash.yml index 0e07005b0f6..c01bff0c840 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -256,11 +256,18 @@ # If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ # have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files # may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and -# being available to be read by the dead_letter_queue input when items are are written infrequently. +# being available to be read by the dead_letter_queue input when items are written infrequently. # Default is 5000. # # dead_letter_queue.flush_interval: 5000 +# If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes limit is reached, +# which can be "drop_newer" or "drop_older". +# With drop_newer, messages that were inserted most recently are dropped, logging an error line. +# With drop_older, the oldest messages are dropped as new ones are inserted. +# Default value is "drop_newer". +# dead_letter_queue.storage_policy: drop_newer + # If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue # @@ -279,6 +286,8 @@ # log.level: info # path.logs: # + + # ------------ Other Settings -------------- # # Where to find custom plugins diff --git a/config/pipelines.yml b/config/pipelines.yml index 3dd60017a6c..a71519e30e1 100644 --- a/config/pipelines.yml +++ b/config/pipelines.yml @@ -88,6 +88,14 @@ # # dead_letter_queue.flush_interval: 5000 +# If using dead_letter_queue.enable: true, defines the action to take when the dead_letter_queue.max_bytes limit is reached, +# which can be "drop_newer" or "drop_older". +# With drop_newer, messages that were inserted most recently are dropped, logging an error line. +# With drop_older, the oldest messages are dropped as new ones are inserted. +# Default value is "drop_newer". +# +# dead_letter_queue.storage_policy: drop_newer + # # If using dead_letter_queue.enable: true, the directory path where the data files will be stored. # Default is path.data/dead_letter_queue diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index 2643bb65555..433686c60dc 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -83,6 +83,7 @@ func normalizeSetting(setting string) (string, error) { "dead_letter_queue.enable", "dead_letter_queue.max_bytes", "dead_letter_queue.flush_interval", + "dead_letter_queue.storage_policy", "path.dead_letter_queue", "http.enabled", // DEPRECATED: prefer `api.enabled` "http.environment", // DEPRECATED: prefer `api.environment` diff --git a/docs/static/dead-letter-queues.asciidoc b/docs/static/dead-letter-queues.asciidoc index 58a4f1cc54f..7a95cbe9416 100644 --- a/docs/static/dead-letter-queues.asciidoc +++ b/docs/static/dead-letter-queues.asciidoc @@ -118,7 +118,10 @@ file is created automatically. By default, the maximum size of each dead letter queue is set to 1024mb. To change this setting, use the `dead_letter_queue.max_bytes` option. Entries will be dropped if they would increase the size of the dead letter queue beyond -this setting. +this setting. +Use the `dead_letter_queue.storage_policy` option to control which entries should be dropped to avoid exceeding the size limit. +Set the value to `drop_newer` (default) to stop accepting new values that would push the file size over the limit. +Set the value to `drop_older` to remove the oldest events to make space for new ones. [[processing-dlq-events]] ==== Processing events in the dead letter queue diff --git a/docs/static/settings-file.asciidoc b/docs/static/settings-file.asciidoc index 694d548e5b7..ac038e541d5 100644 --- a/docs/static/settings-file.asciidoc +++ b/docs/static/settings-file.asciidoc @@ -227,6 +227,10 @@ Values other than `disabled` are currently considered BETA, and may produce unin would increase the size of the dead letter queue beyond this setting. | `1024mb` +| `dead_letter_queue.storage_policy` +| Defines the action to take when the dead_letter_queue.max_bytes setting is reached: `drop_newer` stops accepting new values that would push the file size over the limit, and `drop_older` removes the oldest events to make space for new ones. +| `drop_newer` + | `path.dead_letter_queue` | The directory path where the data files will be stored for the dead-letter queue.
| `path.data/dead_letter_queue` diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 0147a1cf61f..5ad6a78a9c8 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -91,6 +91,7 @@ module Environment Setting::Boolean.new("dead_letter_queue.enable", false), Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"), Setting::Numeric.new("dead_letter_queue.flush_interval", 5000), + Setting::String.new("dead_letter_queue.storage_policy", "drop_newer", true, ["drop_newer", "drop_older"]), Setting::TimeValue.new("slowlog.threshold.warn", "-1"), Setting::TimeValue.new("slowlog.threshold.info", "-1"), Setting::TimeValue.new("slowlog.threshold.debug", "-1"), diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index 1df5315ca01..7d4b2b606d1 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -49,6 +49,7 @@ def self.included(base) "dead_letter_queue.enable", "dead_letter_queue.flush_interval", "dead_letter_queue.max_bytes", + "dead_letter_queue.storage_policy", "metric.collect", "pipeline.plugin_classloaders", "path.config", diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 5b406ad05a1..daec7162b54 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -1235,6 +1235,7 @@ def flush(options) collect_stats # A newly created dead letter queue with no entries will have a size of 1 (the version 'header') expect(collected_stats[:queue_size_in_bytes].value).to eq(1) + expect(collected_stats[:storage_policy].value).to eq("drop_newer") end end end diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 88e8175936c..9c16faacdd5 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -41,6 +41,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.common.io.DeadLetterQueueWriter; +import org.logstash.common.io.QueueStorageType; import java.io.IOException; import java.nio.file.Paths; @@ -75,19 +76,21 @@ private DeadLetterQueueFactory() { * @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written * that would make the size of this dlq greater than this value * @param flushInterval Maximum duration between flushes of dead letter queue files if no data is sent. + * @param storageType the storage policy to apply when the queue is full: drop_older or drop_newer.
* @return The write manager for the specific id's dead-letter-queue context */ - public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval) { - return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval)); + public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize, Duration flushInterval, QueueStorageType storageType) { + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize, flushInterval, storageType)); } public static DeadLetterQueueWriter release(String id) { return REGISTRY.remove(id); } - private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, final Duration flushInterval) { + private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize, + final Duration flushInterval, final QueueStorageType storageType) { try { - return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval); + return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize, flushInterval, storageType); } catch (IOException e) { logger.error("unable to create dead letter queue writer", e); } diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java index 75110249a36..50e0c2f2e53 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java @@ -52,9 +52,10 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.Comparator; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; @@ -73,46 +74,68 @@ public DeadLetterQueueReader(Path queuePath) throws IOException { this.queuePath = queuePath; this.watchService = FileSystems.getDefault().newWatchService(); this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE); - this.segments = new ConcurrentSkipListSet<>((p1, p2) -> { - Function<Path, Integer> id = (p) -> Integer.parseInt(p.getFileName().toString().split("\\.")[0]); - return id.apply(p1).compareTo(id.apply(p2)); - }); - + this.segments = new ConcurrentSkipListSet<>( + Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId) + ); segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); } public void seekToNextEvent(Timestamp timestamp) throws IOException { for (Path segment : segments) { + if (!Files.exists(segment)) { + segments.remove(segment); + continue; + } currentReader = new RecordIOReader(segment); - byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> { - try { - return DLQEntry.deserialize(b).getEntryTime(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - }, Timestamp::compareTo); + byte[] event = currentReader.seekToNextEventPosition(timestamp, DeadLetterQueueReader::extractEntryTimestamp, Timestamp::compareTo); if (event != null) { return; } } - currentReader.close(); - currentReader = null; + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + } + + private static Timestamp
extractEntryTimestamp(byte[] serialized) { + try { + return DLQEntry.deserialize(serialized).getEntryTime(); + } catch (IOException e) { + throw new IllegalStateException(e); + } } private long pollNewSegments(long timeout) throws IOException, InterruptedException { long startTime = System.currentTimeMillis(); WatchKey key = watchService.poll(timeout, TimeUnit.MILLISECONDS); if (key != null) { - for (WatchEvent<?> watchEvent : key.pollEvents()) { - if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) { - segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); - } - key.reset(); - } + pollSegmentsOnWatch(key); } return System.currentTimeMillis() - startTime; } + private void pollNewSegments() throws IOException { + WatchKey key = watchService.poll(); + if (key != null) { + pollSegmentsOnWatch(key); + } + } + + private void pollSegmentsOnWatch(WatchKey key) throws IOException { + for (WatchEvent<?> watchEvent : key.pollEvents()) { + if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) { + segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); + } else if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_DELETE) { + final int oldSize = segments.size(); + segments.clear(); + segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); + logger.debug("Notified of segment removal, switched from {} to {} segments", oldSize, segments.size()); + } + key.reset(); + } + } + public DLQEntry pollEntry(long timeout) throws IOException, InterruptedException { byte[] bytes = pollEntryBytes(timeout); if (bytes == null) { @@ -134,34 +157,79 @@ byte[] pollEntryBytes(long timeout) throws IOException, InterruptedException { logger.debug("No entries found: no segment files found in dead-letter-queue directory"); return null; } - currentReader = new RecordIOReader(segments.first()); + try { + final Path firstSegment = segments.first(); + currentReader = new RecordIOReader(firstSegment); + } catch (NoSuchElementException ex) { + // all elements were removed after the empty check + logger.debug("No entries found: no segment files found in dead-letter-queue directory"); + return null; + } } byte[] event = currentReader.readEvent(); if (event == null && currentReader.isEndOfStream()) { if (consumedAllSegments()) { pollNewSegments(timeoutRemaining); } else { currentReader.close(); - currentReader = new RecordIOReader(segments.higher(currentReader.getPath())); - return pollEntryBytes(timeoutRemaining); + final Path nextSegment = nextExistingSegmentFile(currentReader.getPath()); + if (nextSegment == null) { + // the expected next segments were all already deleted, poll for new ones + pollNewSegments(timeoutRemaining); + } else { + currentReader = new RecordIOReader(nextSegment); + return pollEntryBytes(timeoutRemaining); + } } } return event; } + private boolean consumedAllSegments() { + try { + return currentReader.getPath().equals(segments.last()); + } catch (NoSuchElementException ex) { + // last segment was removed while processing + logger.debug("No last segment found, poll for new segments"); + return true; + } + } + + private Path nextExistingSegmentFile(Path currentSegmentPath) { + Path nextExpectedSegment; + boolean skip; + do { + nextExpectedSegment = segments.higher(currentSegmentPath); + if (nextExpectedSegment != null && !Files.exists(nextExpectedSegment)) { + segments.remove(nextExpectedSegment); + skip = true; + } else { + skip = false; + } + } while (skip); + return nextExpectedSegment; + } + public
void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException { // If the provided segment Path exists, then set the reader to start from the supplied position if (Files.exists(segmentPath)) { currentReader = new RecordIOReader(segmentPath); currentReader.seekToOffset(position); - }else{ + } else { // Otherwise, set the current reader to be at the beginning of the next // segment. - Path next = segments.higher(segmentPath); - if (next != null){ + Path next = nextExistingSegmentFile(segmentPath); + if (next != null) { currentReader = new RecordIOReader(next); + } else { + pollNewSegments(); + // give a second try after a reload of segments from the filesystem + next = nextExistingSegmentFile(segmentPath); + if (next != null) { + currentReader = new RecordIOReader(next); + } } } } diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java new file mode 100644 index 00000000000..16146feebed --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueUtils.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.logstash.common.io; + +import java.nio.file.Path; + +class DeadLetterQueueUtils { + + static int extractSegmentId(Path p) { + return Integer.parseInt(p.getFileName().toString().split("\\.log")[0]); + } +} diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java index b1d2e2cea62..90082ef4948 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueWriter.java @@ -46,6 +46,9 @@ import java.nio.file.StandardCopyOption; import java.time.Duration; import java.time.Instant; +import java.util.Comparator; +import java.util.Locale; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,6 +83,7 @@ private enum FinalizeWhen {ALWAYS, ONLY_IF_STALE}; FieldReference.from(String.format("%s[dead_letter_queue]", Event.METADATA_BRACKETS)); private final long maxSegmentSize; private final long maxQueueSize; + private final QueueStorageType storageType; private LongAdder currentQueueSize; private final Path queuePath; private final FileLock fileLock; @@ -91,11 +95,18 @@ private enum FinalizeWhen {ALWAYS, ONLY_IF_STALE}; private final AtomicBoolean open = new AtomicBoolean(true); private ScheduledExecutorService flushScheduler; - public DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, final Duration flushInterval) throws IOException { + public DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, + final Duration flushInterval) throws IOException { + this(queuePath, maxSegmentSize, maxQueueSize, flushInterval, QueueStorageType.DROP_NEWER); + } + + public DeadLetterQueueWriter(final Path queuePath, final long maxSegmentSize, final long maxQueueSize, + final Duration flushInterval, final QueueStorageType storageType) throws IOException { this.fileLock = FileLockFactory.obtainLock(queuePath, LOCK_FILE); this.queuePath = queuePath; this.maxSegmentSize = maxSegmentSize; this.maxQueueSize = maxQueueSize; + this.storageType = storageType; this.flushInterval = flushInterval; this.currentQueueSize = new LongAdder(); this.currentQueueSize.add(getStartupQueueSize()); @@ -122,6 +133,10 @@ public long getCurrentQueueSize() { return currentQueueSize.longValue(); } + public String getStoragePolicy() { + return storageType.name().toLowerCase(Locale.ROOT); + } + public void writeEntry(Event event, String pluginName, String pluginId, String reason) throws IOException { writeEntry(new DLQEntry(event, pluginName, pluginId, reason)); } @@ -177,15 +192,37 @@ private void innerWriteEntry(DLQEntry entry) throws IOException { byte[] record = entry.serialize(); int eventPayloadSize = RECORD_HEADER_SIZE + record.length; if (currentQueueSize.longValue() + eventPayloadSize > maxQueueSize) { - logger.error("cannot write event to DLQ(path: " + this.queuePath + "): reached maxQueueSize of " + maxQueueSize); - return; - } else if (currentWriter.getPosition() + eventPayloadSize > maxSegmentSize) { + if (storageType == QueueStorageType.DROP_NEWER) { + logger.error("cannot write event to DLQ(path: " + this.queuePath + "): reached maxQueueSize of " + maxQueueSize); + return; + } else { + do { + dropTailSegment(); + } while (currentQueueSize.longValue() + eventPayloadSize > maxQueueSize); + } + } + if 
(currentWriter.getPosition() + eventPayloadSize > maxSegmentSize) { finalizeSegment(FinalizeWhen.ALWAYS); } currentQueueSize.add(currentWriter.writeEvent(record)); lastWrite = Instant.now(); } + // package-private for testing + void dropTailSegment() throws IOException { + // remove oldest segment + final Optional<Path> oldestSegment = getSegmentPaths(queuePath) + .min(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)); + if (!oldestSegment.isPresent()) { + throw new IllegalStateException("Listing of DLQ segments resulted in an empty set during the storage policy size (" + maxQueueSize + ") check"); + } + final Path beheadedSegment = oldestSegment.get(); + final long segmentSize = Files.size(beheadedSegment); + currentQueueSize.add(-segmentSize); + Files.delete(beheadedSegment); + logger.debug("Deleted segment file {} to respect the retained size limit", beheadedSegment); + } + /** * Method to determine whether the event has already been processed by the DLQ - currently this * just checks the metadata to see if metadata has been added to the event that indicates that diff --git a/logstash-core/src/main/java/org/logstash/common/io/QueueStorageType.java b/logstash-core/src/main/java/org/logstash/common/io/QueueStorageType.java new file mode 100644 index 00000000000..08dec1d5dcf --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/common/io/QueueStorageType.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.logstash.common.io; + +public enum QueueStorageType { + DROP_NEWER, DROP_OLDER; + + public static QueueStorageType parse(String value) { + switch (value) { + case "drop_newer": + return DROP_NEWER; + case "drop_older": + return DROP_OLDER; + default: + throw new IllegalArgumentException("Invalid queue type: " + value + ", admitted values are drop_newer or drop_older"); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 139648b72ff..be91d3dd174 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -53,6 +53,7 @@ import org.logstash.common.DeadLetterQueueFactory; import org.logstash.common.EnvironmentVariableProvider; import org.logstash.common.SourceWithMetadata; +import org.logstash.common.io.QueueStorageType; import org.logstash.config.ir.ConfigCompiler; import org.logstash.config.ir.InvalidIRException; import org.logstash.config.ir.PipelineConfig; @@ -108,6 +109,9 @@ public class AbstractPipelineExt extends RubyBasicObject { private static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq"); + private static final RubySymbol STORAGE_POLICY = + RubyUtil.RUBY.newSymbol("storage_policy"); + private static final @SuppressWarnings("rawtypes") RubyArray EVENTS_METRIC_NAMESPACE = RubyArray.newArray( RubyUtil.RUBY, new IRubyObject[]{MetricKeys.STATS_KEY, MetricKeys.EVENTS_KEY} ); @@ -278,13 +282,16 @@ public final IRubyObject lir(final ThreadContext context) { public final IRubyObject dlqWriter(final ThreadContext context) { if (dlqWriter == null) { if (dlqEnabled(context).isTrue()) { + final QueueStorageType storageType = QueueStorageType.parse(getSetting(context, "dead_letter_queue.storage_policy").asJavaString()); + dlqWriter = JavaUtil.convertJavaToUsableRubyObject( context.runtime, DeadLetterQueueFactory.getWriter( pipelineId.asJavaString(), getSetting(context, "path.dead_letter_queue").asJavaString(), getSetting(context, "dead_letter_queue.max_bytes").convertToInteger().getLongValue(), - Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger().getLongValue())) + Duration.ofMillis(getSetting(context, "dead_letter_queue.flush_interval").convertToInteger().getLongValue()), + storageType) ); } else { dlqWriter = RubyUtil.DUMMY_DLQ_WRITER_CLASS.callMethod(context, "new"); @@ -319,6 +326,10 @@ public final IRubyObject collectDlqStats(final ThreadContext context) { context, QUEUE_SIZE_IN_BYTES, dlqWriter(context).callMethod(context, "get_current_queue_size") ); + getDlqMetric(context).gauge( + context, STORAGE_POLICY, + dlqWriter(context).callMethod(context, "get_storage_policy") + ); } return context.nil; } diff --git a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java index 12df8049b41..cf85824d7a9 100644 --- a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java @@ -44,6 +44,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.logstash.common.io.DeadLetterQueueWriter; +import org.logstash.common.io.QueueStorageType; import java.io.IOException; import java.nio.file.Path; @@ -68,9 +69,9 @@ public void setUp() throws Exception { public void
testSameBeforeRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1)); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); - DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1)); + DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertSame(writer, writer2); writer.close(); } finally { @@ -82,11 +83,11 @@ public void testSameBeforeRelease() throws IOException { public void testOpenableAfterRelease() throws IOException { try { Path pipelineA = dir.resolve(PIPELINE_NAME); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1)); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); DeadLetterQueueFactory.release(PIPELINE_NAME); - writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1)); + writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000, Duration.ofSeconds(1), QueueStorageType.DROP_NEWER); assertTrue(writer.isOpen()); writer.close(); }finally{ diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index f1a0d1af747..000981388c2 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -31,15 +31,20 @@ import org.logstash.ackedqueue.StringElement; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.logstash.common.io.DeadLetterQueueTestUtils.MB; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; @@ -186,29 +191,21 @@ private void writeSegmentSizeEntries(int count) throws IOException { long startTime = System.currentTimeMillis(); DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0), constantSerializationLengthTimestamp(startTime)); int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE; - DeadLetterQueueWriter writeManager = null; - try { - writeManager = new DeadLetterQueueWriter(dir, size, defaultDlqSize, Duration.ofSeconds(1)); + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, size, defaultDlqSize, Duration.ofSeconds(1))) { for (int i = 1; i <= count; i++) { writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i), 
constantSerializationLengthTimestamp(startTime++))); } - } finally { - writeManager.close(); } } private void validateEntries(Path firstLog, int startEntry, int endEntry, int startPosition) throws IOException, InterruptedException { - DeadLetterQueueReader readManager = null; - try { - readManager = new DeadLetterQueueReader(dir); + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { readManager.setCurrentReaderAndPosition(firstLog, startPosition); for (int i = startEntry; i <= endEntry; i++) { DLQEntry readEntry = readManager.pollEntry(100); assertThat(readEntry.getReason(), equalTo(String.valueOf(i))); } - } finally { - readManager.close(); } } @@ -219,7 +216,6 @@ private void validateEntries(Path firstLog, int startEntry, int endEntry, int st // This test tests for a single event that ends on a block boundary @Test public void testBlockBoundary() throws Exception { - final int PAD_FOR_BLOCK_SIZE_EVENT = 32490; Event event = createEventWithConstantSerializationOverhead(); char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT]; @@ -227,7 +223,7 @@ public void testBlockBoundary() throws Exception { event.setField("T", new String(field)); Timestamp timestamp = constantSerializationLengthTimestamp(); - try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1))) { + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1))) { for (int i = 0; i < 2; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE)); @@ -250,7 +246,7 @@ public void testBlockBoundaryMultiple() throws Exception { event.setField("message", new String(field)); long startTime = System.currentTimeMillis(); int messageSize = 0; - try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1))) { + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, defaultDlqSize, Duration.ofSeconds(1))) { for (int i = 1; i <= 5; i++) { DLQEntry entry = new DLQEntry(event, "", "", "", constantSerializationLengthTimestamp(startTime++)); messageSize += entry.serialize().length; @@ -273,7 +269,7 @@ public void testFlushAfterWriterClose() throws Exception { event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT/8)); Timestamp timestamp = new Timestamp(); - try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1))) { + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, defaultDlqSize, Duration.ofSeconds(1))) { for (int i = 0; i < 6; i++) { DLQEntry entry = new DLQEntry(event, "", "", Integer.toString(i), timestamp); writeManager.writeEntry(entry); @@ -468,6 +464,47 @@ public void testWriteStopBigWriteSeekByTimestamp() throws Exception { String.valueOf(FIRST_WRITE_EVENT_COUNT)); } + @Test + public void testSeekByTimestampMoveAfterDeletedSegment() throws IOException, InterruptedException { + final long startTime = 1646296760000L; + final int eventsPerSegment = prepareFilledSegmentFiles(2, startTime); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + // remove the first segment + Files.delete(dir.resolve(segmentFileName(1))); + + //Exercise, seek in the middle of first segment + final Timestamp seekTarget = new Timestamp(startTime + (eventsPerSegment / 2)); + 
reader.seekToNextEvent(seekTarget); + + // Verify, hit the first event of the second segment + DLQEntry readEntry = reader.pollEntry(100); + assertEquals("Must load first event of next available segment", String.format("%05d", eventsPerSegment), readEntry.getReason()); + final Timestamp firstEventSecondSegmentTimestamp = new Timestamp(startTime + eventsPerSegment); + assertEquals(firstEventSecondSegmentTimestamp, readEntry.getEntryTime()); + } + } + + @Test + public void testSeekByTimestampWhenAllSegmentsAreDeleted() throws IOException, InterruptedException { + final long startTime = System.currentTimeMillis(); + final int eventsPerSegment = prepareFilledSegmentFiles(2, startTime); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + // remove both segments + Files.delete(dir.resolve(segmentFileName(1))); + Files.delete(dir.resolve(segmentFileName(2))); + + //Exercise, seek in the middle of first segment + final Timestamp seekTarget = new Timestamp(startTime + (eventsPerSegment / 2)); + reader.seekToNextEvent(seekTarget); + + // Verify, no entry should be available + DLQEntry readEntry = reader.pollEntry(100); + assertNull("No entry is available after all segments are deleted", readEntry); + } + } + /** * Tests concurrently reading and writing from the DLQ. * @throws Exception On Failure */ @@ -519,6 +556,94 @@ public void testConcurrentWriteReadRandomEventSize() throws Exception { } } + @Test + public void testReaderFindSegmentHoleAfterSimulatingRetentionPolicyClean() throws IOException, InterruptedException { + final int eventsPerSegment = prepareFilledSegmentFiles(3); + assertEquals(319, eventsPerSegment); + + int remainingEventsInSegment = eventsPerSegment; + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + // read the first event to initialize reader structures + final DLQEntry dlqEntry = reader.pollEntry(1_000); + assertEquals("00000", dlqEntry.getReason()); + remainingEventsInSegment--; + + // simulate a storage policy clean, drop the first two segment files + final List<Path> allSegments = Files.list(dir) + .sorted(Comparator.comparingInt(DeadLetterQueueUtils::extractSegmentId)) + .collect(Collectors.toList()); + assertThat(allSegments.size(), greaterThanOrEqualTo(2)); + Files.delete(allSegments.remove(0)); // tail segment + Files.delete(allSegments.remove(0)); // the segment after + + // consume the first segment + for (int i = 0; i < remainingEventsInSegment; i++) { + reader.pollEntry(1_000); + } + + // Exercise + // consume the first event after the hole + final DLQEntry entryAfterHole = reader.pollEntry(1_000); + + assertEquals(String.format("%05d", eventsPerSegment * 2), entryAfterHole.getReason()); + } + } + + @Test + public void testReaderWhenAllRemainingSegmentsAreRemoved() throws IOException, InterruptedException { + int remainingEventsInSegment = prepareFilledSegmentFiles(3); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + // read the first event to initialize reader structures + final DLQEntry dlqEntry = reader.pollEntry(1_000); + assertEquals("00000", dlqEntry.getReason()); + remainingEventsInSegment--; + + // simulate a retention policy clean that drops the remaining segments + Files.list(dir) + .sorted() + .skip(1) + .forEach(DeadLetterQueueReaderTest::deleteSegment); + + // consume the first segment + for (int i = 0; i < remainingEventsInSegment; i++) { + reader.pollEntry(1_000); + } + + // Exercise + // consume the first event after the hole + final DLQEntry entryAfterHole =
reader.pollEntry(1_000); + assertNull(entryAfterHole); + } + } + + private static void deleteSegment(Path file) { + try { + Files.delete(file); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testSeekToMiddleWhileTheLogIsRemoved() throws IOException, InterruptedException { + writeSegmentSizeEntries(3); + + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + + // removes 2 segments simulating a retention policy action + Files.delete(dir.resolve("1.log")); + Files.delete(dir.resolve("2.log")); + + readManager.setCurrentReaderAndPosition(dir.resolve("1.log"), 1); + + DLQEntry readEntry = readManager.pollEntry(100); + assertThat(readEntry.getReason(), equalTo(String.valueOf(3))); + } + } + + /** * Produces a {@link Timestamp} whose epoch milliseconds is _near_ the provided value * such that the result will have a constant serialization length of 24 bytes. @@ -529,7 +654,7 @@ public void testConcurrentWriteReadRandomEventSize() throws Exception { * @param millis * @return */ - private Timestamp constantSerializationLengthTimestamp(long millis) { + static Timestamp constantSerializationLengthTimestamp(long millis) { if ( millis % 1000 == 0) { millis += 1; } final Timestamp timestamp = new Timestamp(millis); @@ -542,7 +667,7 @@ private Timestamp constantSerializationLengthTimestamp() { return constantSerializationLengthTimestamp(System.currentTimeMillis()); } - private Timestamp constantSerializationLengthTimestamp(final Timestamp basis) { + private static Timestamp constantSerializationLengthTimestamp(final Timestamp basis) { return constantSerializationLengthTimestamp(basis.toEpochMilli()); } @@ -559,7 +684,7 @@ private Timestamp constantSerializationLengthTimestamp(final Timestamp basis) { * @param data * @return */ - private Event createEventWithConstantSerializationOverhead(final Map<String, Object> data) { + static Event createEventWithConstantSerializationOverhead(final Map<String, Object> data) { final Event event = new Event(data); final Timestamp existingTimestamp = event.getTimestamp(); @@ -570,7 +695,7 @@ private Event createEventWithConstantSerializationOverhead(final Map p.toFile().length()).sum(); } } + + static String generateASCIIMessageContent(int size, byte fillChar) { + byte[] fillArray = new byte[size]; + Arrays.fill(fillArray, fillChar); + return new String(fillArray); + } + + @Test + public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsEnabled() throws IOException { + Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap()); + event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479)); + long startTime = System.currentTimeMillis(); + + int messageSize = 0; + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1))) { + + // 320 events generate 10 Mb of data + for (int i = 0; i < (320 * 2) - 1; i++) { + DLQEntry entry = new DLQEntry(event, "", "", String.format("%05d", i), DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(startTime)); + final int serializationLength = entry.serialize().length; + assertEquals("Serialized entry fills block payload", BLOCK_SIZE - RECORD_HEADER_SIZE, serializationLength); + messageSize += serializationLength; + writeManager.writeEntry(entry); + } + assertThat(messageSize, Matchers.is(greaterThan(BLOCK_SIZE))); + } + + // but every segment file has a 1 byte header, so 639 messages of 32Kb generate 3 files + // 1.log with 319 + // 2.log with 319 + // 3.log with
1 + List<String> segmentFileNames = Files.list(dir) + .map(Path::getFileName) + .map(Path::toString) + .sorted() + .collect(Collectors.toList()); + assertEquals(3, segmentFileNames.size()); + final String fileToBeRemoved = segmentFileNames.get(0); + + // Exercise + // writing another 32Kb message goes into the head segment file and pushes the retained store over its 20Mb limit + final long prevQueueSize; + final long beheadedQueueSize; + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * MB, 20 * MB, + Duration.ofSeconds(1), QueueStorageType.DROP_OLDER)) { + prevQueueSize = writeManager.getCurrentQueueSize(); + final int expectedQueueSize = 2 * // number of full segment files + FULL_SEGMENT_FILE_SIZE + // size of a segment file + VERSION_SIZE + BLOCK_SIZE + // the third segment file with just one message + VERSION_SIZE; // the header of the head tmp file created in opening + assertEquals("Queue size is composed of 2 full segment files plus one with an event plus another with just the header byte", + expectedQueueSize, prevQueueSize); + DLQEntry entry = new DLQEntry(event, "", "", String.format("%05d", (320 * 2) - 1), DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(startTime)); + writeManager.writeEntry(entry); + beheadedQueueSize = writeManager.getCurrentQueueSize(); + } + + // 2.log with 319 + // 3.log with 1 + // 4.log with 1, created when the close flushes and seals the head file; the tail file 1.log was beheaded by the write. + Set<String> afterBeheadSegmentFileNames = Files.list(dir) + .map(Path::getFileName) + .map(Path::toString) + .collect(Collectors.toSet()); + assertEquals(3, afterBeheadSegmentFileNames.size()); + assertThat(afterBeheadSegmentFileNames, Matchers.not(Matchers.contains(fileToBeRemoved))); + final long expectedQueueSize = prevQueueSize + + BLOCK_SIZE - // the space of the newly inserted message + FULL_SEGMENT_FILE_SIZE; //the size of the removed segment file + assertEquals("Total queue size must be decremented by the size of the first segment file", + expectedQueueSize, beheadedQueueSize); + } + + @Test + public void testRemoveSegmentsOrder() throws IOException { + try (DeadLetterQueueWriter sut = new DeadLetterQueueWriter(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1))) { + Files.delete(dir.resolve("1.log.tmp")); + + // create some segments files + Files.createFile(dir.resolve("9.log")); + Files.createFile(dir.resolve("10.log")); + + // Exercise + sut.dropTailSegment(); + + // Verify + final Set<String> segments = Files.list(dir) + .map(Path::getFileName) + .map(Path::toString) + .filter(s -> !".lock".equals(s)) // skip .lock file created by writer + .collect(Collectors.toSet()); + assertEquals(Collections.singleton("10.log"), segments); + } + } } From 2c5cc00e0bbc020e161f276368a6f04833bcf099 Mon Sep 17 00:00:00 2001 From: Karen Metts <35154725+karenzone@users.noreply.github.com> Date: Thu, 28 Apr 2022 18:33:37 -0400 Subject: [PATCH 2/3] Doc: Group central mgmt and configuring central mgmt topics (#14050) --- docs/index.asciidoc | 4 ++-- .../management/configuring-centralized-pipelines.asciidoc | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index ead78d2ae22..76df496cdbe 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -97,8 +97,6 @@ include::static/glob-support.asciidoc[] include::static/ingest-convert.asciidoc[] -include::static/management/configuring-centralized-pipelines.asciidoc[] - include::static/field-reference.asciidoc[] //The `field-reference.asciidoc` file (included above) contains a @@ -114,6 +112,8 @@
include::static/ls-ls-config.asciidoc[] // Centralized configuration managements include::static/config-management.asciidoc[] +include::static/management/configuring-centralized-pipelines.asciidoc[] + // Working with Logstash Modules include::static/modules.asciidoc[] diff --git a/docs/static/management/configuring-centralized-pipelines.asciidoc b/docs/static/management/configuring-centralized-pipelines.asciidoc index fe3f3570376..d8b6f20a47f 100644 --- a/docs/static/management/configuring-centralized-pipelines.asciidoc +++ b/docs/static/management/configuring-centralized-pipelines.asciidoc @@ -1,8 +1,5 @@ [[configuring-centralized-pipelines]] -=== Configuring Centralized Pipeline Management -++++ -Centralized Pipeline Management -++++ +=== Configure Centralized Pipeline Management To configure {logstash-ref}/logstash-centralized-pipeline-management.html[centralized pipeline management]: From 1c851bb15c6d8651be591f3c9389116536d22770 Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Fri, 29 Apr 2022 15:25:28 +0100 Subject: [PATCH 3/3] Fix geoip database download not respecting the http_proxy setting (#14048) This commit adds the `http_proxy` environment variable to the geoip database download options so that downloads respect the proxy setting. Fixed: #14047 --- x-pack/lib/filters/geoip/download_manager.rb | 4 +++- x-pack/spec/filters/geoip/download_manager_spec.rb | 11 ++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/x-pack/lib/filters/geoip/download_manager.rb b/x-pack/lib/filters/geoip/download_manager.rb index c3d6fee604a..eb7586155d9 100644 --- a/x-pack/lib/filters/geoip/download_manager.rb +++ b/x-pack/lib/filters/geoip/download_manager.rb @@ -93,7 +93,9 @@ def download_database(database_type, dirname, db_info) actual_url = download_url(db_info['url']) logger.debug? && logger.debug("download #{actual_url}") - Down.download(actual_url, destination: zip_path) + options = { destination: zip_path } + options.merge!({proxy: ENV['http_proxy']}) if ENV.include?('http_proxy') + Down.download(actual_url, options) raise "the new download has wrong checksum" if md5(zip_path) != db_info['md5_hash'] logger.debug("new database downloaded in ", :path => zip_path) diff --git a/x-pack/spec/filters/geoip/download_manager_spec.rb b/x-pack/spec/filters/geoip/download_manager_spec.rb index af0412b80b3..a245eb847d8 100644 --- a/x-pack/spec/filters/geoip/download_manager_spec.rb +++ b/x-pack/spec/filters/geoip/download_manager_spec.rb @@ -41,15 +41,24 @@ end context "when ENV['http_proxy'] is set" do + let(:mock_resp) { JSON.parse(::File.read(::File.expand_path("./fixtures/normal_resp.json", ::File.dirname(__FILE__)))) } + let(:db_info) { mock_resp[1] } let(:proxy_url) { 'http://user:pass@example.com:1234' } around(:each) { |example| with_environment('http_proxy' => proxy_url, &example) } - it "initializes the client with the proxy" do + it "initializes the rest client with the proxy" do expect(::Manticore::Client).to receive(:new).with(a_hash_including(:proxy => proxy_url)).and_call_original download_manager.send(:rest_client) end + + it "downloads the database with the proxy" do + expect(download_manager).to receive(:md5).and_return(db_info['md5_hash']) + expect(::Down).to receive(:download).with(db_info['url'], a_hash_including(:proxy => proxy_url)).and_return(true) + + download_manager.send(:download_database, database_type, second_dirname, db_info) + end end end
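For reference, a minimal logstash.yml sketch exercising the storage policy introduced by the first patch in this series; the drop_older value is an illustrative choice (the default is drop_newer), and the other two settings simply restate the documented defaults:

# Enable the DLQ and cap it at 1024mb; once the cap is reached, delete the
# oldest segment files to make room for new events instead of refusing them.
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1024mb
dead_letter_queue.storage_policy: drop_older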