Skip to content

Commit

Permalink
Add watermark generator
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Oct 27, 2022
1 parent 91baab1 commit 2ac0fb5
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

import lombok.RequiredArgsConstructor;

/**
* Watermark generator that generates watermark with bounded out-of-order latency.
*/
@RequiredArgsConstructor
public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator {

/** The maximum out-of-order allowed. */
private final long maxOutOfOrderAllowed;

/** The maximum timestamp seen so far. */
private long maxTimestamp;

@Override
public long generate(long timestamp) {
maxTimestamp = Math.max(maxTimestamp, timestamp);
return (maxTimestamp - maxOutOfOrderAllowed - 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

/**
* A watermark generator generates watermark timestamp based on some strategy.
*/
public interface WatermarkGenerator {

/**
* Generate watermark timestamp on the given event timestamp.
*
* @param timestamp event timestamp.
* @return watermark timestamp
*/
long generate(long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.watermark;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class BoundedOutOfOrderWatermarkGeneratorTest {

@Test
void shouldAdvanceWatermarkIfLaterEvent() {
BoundedOutOfOrderWatermarkGenerator generator = new BoundedOutOfOrderWatermarkGenerator(100);
assertEquals(899, generator.generate(1000));
assertEquals(1899, generator.generate(2000));
}

@Test
void shouldNotChangeWatermarkByLateEvent() {
BoundedOutOfOrderWatermarkGenerator generator = new BoundedOutOfOrderWatermarkGenerator(100);
assertEquals(899, generator.generate(1000));
assertEquals(899, generator.generate(500));
assertEquals(899, generator.generate(700));
}
}

0 comments on commit 2ac0fb5

Please sign in to comment.