From 7b6a1e266f1c987a09ef0ef2b282e5881afa0953 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Tue, 1 Nov 2022 09:17:43 -0700
Subject: [PATCH 1/7] Add Streaming Source Impl

Signed-off-by: Peng Huo
---
 .../sql/executor/streaming/Batch.java         |  18 ++
 .../sql/executor/streaming/Offset.java        |  17 ++
 .../executor/streaming/StreamingSource.java   |  29 ++++
 .../opensearch/sql/storage/split/Split.java   |  21 +++
 filesystem/build.gradle                       |  69 ++++++++
 .../storage/split/FileSystemSplit.java        |  24 +++
 .../filesystem/streaming/FileMetaData.java    |  21 +++
 .../streaming/FileSystemStreamSource.java     | 103 ++++++++++++
 .../streaming/FileSystemStreamSourceTest.java | 155 ++++++++++++++++++
 settings.gradle                               |   1 +
 10 files changed, 458 insertions(+)
 create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
 create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java
 create mode 100644 core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java
 create mode 100644 core/src/main/java/org/opensearch/sql/storage/split/Split.java
 create mode 100644 filesystem/build.gradle
 create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
 create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
 create mode 100644 filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
 create mode 100644 filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java

diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
new file mode 100644
index 0000000000..7c27ab4622
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.streaming;
+
+import java.util.List;
+import lombok.Data;
+import org.opensearch.sql.storage.split.Split;
+
+/**
+ * A batch of streaming execution.
+ */
+@Data
+public class Batch {
+  private final List<Split> splits;
+}
diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java
new file mode 100644
index 0000000000..00f040e437
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Offset.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.streaming;
+
+import lombok.Data;
+
+/**
+ * Offset.
+ */
+@Data
+public class Offset {
+
+  private final Long offset;
+}
diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java
new file mode 100644
index 0000000000..ebd3fa714b
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/streaming/StreamingSource.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.streaming;
+
+import java.util.Optional;
+
+/**
+ * Streaming source.
+ */
+public interface StreamingSource {
+  /**
+   * Get current {@link Offset} of stream data.
+   *
+   * @return empty if the stream does not have new data.
+   */
+  Optional<Offset> getLatestOffset();
+
+  /**
+   * Get a {@link Batch} from source between (start, end].
+   *
+   * @param start start offset.
+   * @param end end offset.
+   * @return {@link Batch}.
+   */
+  Batch getBatch(Optional<Offset> start, Offset end);
+}
diff --git a/core/src/main/java/org/opensearch/sql/storage/split/Split.java b/core/src/main/java/org/opensearch/sql/storage/split/Split.java
new file mode 100644
index 0000000000..e9e0c6fcc1
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/storage/split/Split.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.storage.split;
+
+import org.opensearch.sql.storage.StorageEngine;
+
+/**
+ * Split is a section of a data set. Each {@link StorageEngine} should have a specific
+ * implementation of Split.
+ */
+public interface Split {
+
+  /**
+   * Get the split id.
+   * @return split id.
+   */
+  String getSplitId();
+}
diff --git a/filesystem/build.gradle b/filesystem/build.gradle
new file mode 100644
index 0000000000..64659d85d3
--- /dev/null
+++ b/filesystem/build.gradle
@@ -0,0 +1,69 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+plugins {
+    id 'java-library'
+    id "io.freefair.lombok"
+    id 'jacoco'
+}
+
+ext {
+    hadoop = "3.3.4"
+    aws = "1.12.330"
+}
+
+
+dependencies {
+    implementation project(':core')
+
+    testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}"
+    testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
+    testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
+    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
+}
+
+test {
+    useJUnitPlatform()
+    testLogging {
+        events "passed", "skipped", "failed"
+        exceptionFormat "full"
+    }
+}
+
+jacocoTestReport {
+    reports {
+        html.enabled true
+        xml.enabled true
+    }
+    afterEvaluate {
+        classDirectories.setFrom(files(classDirectories.files.collect {
+            fileTree(dir: it)
+        }))
+    }
+}
+test.finalizedBy(project.tasks.jacocoTestReport)
+
+jacocoTestCoverageVerification {
+    violationRules {
+        rule {
+            element = 'CLASS'
+            limit {
+                counter = 'LINE'
+                minimum = 1.0
+            }
+            limit {
+                counter = 'BRANCH'
+                minimum = 1.0
+            }
+        }
+    }
+    afterEvaluate {
+        classDirectories.setFrom(files(classDirectories.files.collect {
+            fileTree(dir: it)
+        }))
+    }
+}
+check.dependsOn jacocoTestCoverageVerification
+jacocoTestCoverageVerification.dependsOn jacocoTestReport
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
new file mode 100644
index 0000000000..695af94fe4
--- /dev/null
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.filesystem.storage.split;
+
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.opensearch.sql.storage.split.Split;
+
+@Data
+public class FileSystemSplit implements Split {
+
+  @Getter
+  @EqualsAndHashCode.Exclude
+  private final String splitId = UUID.randomUUID().toString();
+
+  private final Set<Path> paths;
+}
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
new file mode 100644
index 0000000000..24d2a822cd
--- /dev/null
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.filesystem.streaming;
+
+import java.nio.file.Path;
+import java.util.Set;
+import lombok.Data;
+
+/**
+ * File metadata. Batch id associated with the set of {@link Path}.
+ */
+@Data
+public class FileMetaData {
+
+  private final Long batchId;
+
+  private final Set<Path> paths;
+}
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
new file mode 100644
index 0000000000..9207583c5b
--- /dev/null
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.filesystem.streaming;
+
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.sql.executor.streaming.Batch;
+import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
+import org.opensearch.sql.executor.streaming.MetadataLog;
+import org.opensearch.sql.executor.streaming.Offset;
+import org.opensearch.sql.executor.streaming.StreamingSource;
+import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;
+
+/**
+ * FileSystem Streaming Source that uses the Hadoop FileSystem.
+ */
+public class FileSystemStreamSource implements StreamingSource {
+
+  private static final Logger log = LogManager.getLogger(FileSystemStreamSource.class);
+
+  private final MetadataLog<FileMetaData> fileMetaDataLog;
+
+  private Set<Path> seenFiles;
+
+  private final FileSystem fs;
+
+  private final String basePath;
+
+  /**
+   * Constructor of FileSystemStreamSource.
+   */
+  public FileSystemStreamSource(FileSystem fs, String basePath) {
+    this.fs = fs;
+    this.basePath = basePath;
+    // todo, need to add state recovery
+    this.fileMetaDataLog = new DefaultMetadataLog<>();
+    // todo, need to add state recovery
+    this.seenFiles = new HashSet<>();
+  }
+
+  @Override
+  public Optional<Offset> getLatestOffset() {
+    // list all files. TODO: improve list performance.
+    Set<Path> allFiles =
+        Arrays.stream(fs.getPath(basePath).toFile().listFiles())
+            .filter(file -> !file.isDirectory())
+            .map(File::toPath)
+            .collect(Collectors.toSet());
+
+    // find unread files.
+    log.debug("all files {}", allFiles);
+    Set<Path> unread = Sets.difference(allFiles, seenFiles);
+
+    // update seenFiles.
+    seenFiles = allFiles;
+    log.debug("seen files {}", seenFiles);
+
+    Optional<Long> latestBatchIdOptional = fileMetaDataLog.getLatest().map(Pair::getKey);
+    if (!unread.isEmpty()) {
+      long latestBatchId = latestBatchIdOptional.map(id -> id + 1).orElse(0L);
+      fileMetaDataLog.add(latestBatchId, new FileMetaData(latestBatchId, unread));
+      log.debug("latestBatchId {}", latestBatchId);
+      return Optional.of(new Offset(latestBatchId));
+    } else {
+      log.debug("no unread data");
+      Optional<Offset> offset =
+          latestBatchIdOptional.isEmpty()
+              ? Optional.empty()
+              : Optional.of(new Offset(latestBatchIdOptional.get()));
+      log.debug("return empty offset {}", offset);
+      return offset;
+    }
+  }
+
+  @Override
+  public Batch getBatch(Optional<Offset> start, Offset end) {
+    Long startBatchId = start.map(Offset::getOffset).map(id -> id + 1).orElse(0L);
+    Long endBatchId = end.getOffset();
+
+    Set<Path> paths =
+        fileMetaDataLog.get(Optional.of(startBatchId), Optional.of(endBatchId)).stream()
+            .map(FileMetaData::getPaths)
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+
+    log.debug("fetch files {} with id from: {} to: {}.", paths, start, end);
+    return new Batch(Collections.singletonList(new FileSystemSplit(paths)));
+  }
+}
diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
new file mode 100644
index 0000000000..537fd10c9f
--- /dev/null
+++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.filesystem.streaming;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.sql.executor.streaming.Batch;
+import org.opensearch.sql.executor.streaming.Offset;
+import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;
+
+@ExtendWith(MockitoExtension.class)
+class FileSystemStreamSourceTest {
+
+  @TempDir
+  Path perTestTempDir;
+
+  FileSystemStreamSource streamSource;
+
+  @BeforeEach
+  void setup() {
+    streamSource =
+        new FileSystemStreamSource(
+            FileSystems.getDefault(),
+            perTestTempDir.toString());
+  }
+
+  @Test
+  void getBatchFromFolder() throws IOException {
+    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file.toFile().exists());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    // fetch batch (empty, latestOffset]
+    assertEquals(
+        Collections.singletonList(
+            new FileSystemSplit(ImmutableSet.of(file))),
+        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  }
+
+  @Test
+  void latestOffsetShouldIncreaseIfNewFileAdded() throws IOException {
+    Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file1.toFile().exists());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    Path file2 = Files.createFile(perTestTempDir.resolve("log.2022.01.02"));
+    assertTrue(file2.toFile().exists());
+
+    latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(1L), latestOffset.get());
+
+    // fetch batch (empty, 1L]
+    assertBatchEquals(
+        ImmutableList.of(file1, file2),
+        streamSource.getBatch(Optional.empty(), latestOffset.get()));
+
+    // fetch batch (empty, 0L]
+    assertBatchEquals(
+        ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L)));
+
+    // fetch batch (0L, 1L]
+    assertBatchEquals(
+        ImmutableList.of(file2),
+        streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L)));
+  }
+
+  @Test
+  void latestOffsetShouldSameIfNoNewFileAdded() throws IOException {
+    Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file1.toFile().exists());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    // no new files.
+    latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+  }
+
+  @Test
+  void latestOffsetIsEmptyIfNoFilesInSource() {
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isEmpty());
+  }
+
+  @Test
+  void getBatchOutOfRange() throws IOException {
+    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file.toFile().exists());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    assertEquals(
+        Collections.singletonList(
+            new FileSystemSplit(ImmutableSet.of(file))),
+        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  }
+
+  @Test
+  void dirIsFiltered() throws IOException {
+    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file.toFile().exists());
+
+    Path dir = Files.createDirectory(perTestTempDir.resolve("logDir"));
+    assertTrue(dir.toFile().isDirectory());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    // fetch batch (empty, latestOffset]
+    assertEquals(
+        Collections.singletonList(
+            new FileSystemSplit(ImmutableSet.of(file))),
+        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  }
+
+  void assertBatchEquals(List<Path> expectedFiles, Batch batch) {
+    assertEquals(1, batch.getSplits().size());
+    assertThat(
+        ((FileSystemSplit) batch.getSplits().get(0)).getPaths(),
+        containsInAnyOrder(expectedFiles.toArray()));
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 2f850f422b..7650959451 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,4 +18,5 @@ include 'doctest'
 include 'legacy'
 include 'sql'
 include 'prometheus'
+include 'filesystem'
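Note: the StreamingSource contract introduced in PATCH 1/7 is pull-based. A caller polls
getLatestOffset() and, when the offset moves past the last committed one, fetches the delta
with getBatch(start, end), whose range is exclusive of start and inclusive of end. A minimal
driver sketch, not part of the patch; `running`, `process`, and the commit bookkeeping are
hypothetical names:

    // poll the source and process each new micro-batch exactly once (sketch).
    Optional<Offset> lastCommitted = Optional.empty();
    while (running) {
      Optional<Offset> latest = source.getLatestOffset();
      // fetch only when the source reports an offset beyond the last committed one.
      if (latest.isPresent()
          && (lastCommitted.isEmpty()
              || latest.get().getOffset() > lastCommitted.get().getOffset())) {
        Batch batch = source.getBatch(lastCommitted, latest.get()); // (start, end]
        process(batch);
        lastCommitted = latest;
      }
    }
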
From 3231dfe345adf8023005d2a7079532b403d8c830 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Tue, 1 Nov 2022 09:30:05 -0700
Subject: [PATCH 2/7] update build.gradle

Signed-off-by: Peng Huo
---
 filesystem/build.gradle | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/filesystem/build.gradle b/filesystem/build.gradle
index 64659d85d3..d7fc3e02a8 100644
--- a/filesystem/build.gradle
+++ b/filesystem/build.gradle
@@ -9,16 +9,10 @@ plugins {
     id 'jacoco'
 }
 
-ext {
-    hadoop = "3.3.4"
-    aws = "1.12.330"
-}
-
-
 dependencies {
     implementation project(':core')
 
-    testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}"
+    testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
     testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
     testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
     testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'

From 752168ff68092d0d37a78bc70db22142031dceb4 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 3 Nov 2022 10:24:36 -0700
Subject: [PATCH 3/7] change to hadoop-fs

Signed-off-by: Peng Huo
---
 .github/workflows/sql-test-and-build-workflow.yml |   4 +-
 doctest/build.gradle                              |   2 +-
 filesystem/build.gradle                           |  29 +++
 .../storage/split/FileSystemSplit.java            |   2 +-
 .../filesystem/streaming/FileMetaData.java        |   2 +-
 .../streaming/FileSystemStreamSource.java         |  19 +-
 .../streaming/FileSystemStreamSourceTest.java     | 203 ++++++++++--------
 7 files changed, 153 insertions(+), 108 deletions(-)

diff --git a/.github/workflows/sql-test-and-build-workflow.yml b/.github/workflows/sql-test-and-build-workflow.yml
index 3d063a2bfc..25e0387cf3 100644
--- a/.github/workflows/sql-test-and-build-workflow.yml
+++ b/.github/workflows/sql-test-and-build-workflow.yml
@@ -25,10 +25,10 @@ jobs:
       matrix:
         entry:
           - { os: ubuntu-latest, java: 11 }
-          - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc}
+          - { os: windows-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
           - { os: macos-latest, java: 11, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
           - { os: ubuntu-latest, java: 17 }
-          - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
+          - { os: windows-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc -PbuildPlatform=windows }
           - { os: macos-latest, java: 17, os_build_args: -x doctest -x integTest -x jacocoTestReport -x compileJdbc }
 
     runs-on: ${{ matrix.entry.os }}
diff --git a/doctest/build.gradle b/doctest/build.gradle
index 69fac44d95..cf2329d9d3 100644
--- a/doctest/build.gradle
+++ b/doctest/build.gradle
@@ -86,7 +86,7 @@ task stopOpenSearch(type: KillProcessTask) {
 doctest.dependsOn startOpenSearch
 startOpenSearch.dependsOn startPrometheus
 doctest.finalizedBy stopOpenSearch
-build.dependsOn doctest
+check.dependsOn doctest
 clean.dependsOn(cleanBootstrap)
 
 // 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT
diff --git a/filesystem/build.gradle b/filesystem/build.gradle
index d7fc3e02a8..b4be876507 100644
--- a/filesystem/build.gradle
+++ b/filesystem/build.gradle
@@ -9,8 +9,29 @@ plugins {
     id 'jacoco'
 }
 
+ext {
+    hadoop = "3.3.4"
+    aws = "1.12.330"
+}
+
 dependencies {
     implementation project(':core')
+    // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
+    implementation("org.apache.hadoop:hadoop-common:${hadoop}") {
+        exclude group: 'org.apache.zookeeper', module: 'zookeeper'
+        exclude group: 'com.sun.jersey', module: 'jersey-json'
+        exclude group: 'com.google.protobuf', module: 'protobuf-java'
+        exclude group: 'org.apache.avro', module: 'avro'
+        exclude group: 'org.eclipse.jetty', module: 'jetty-server'
+    }
+    constraints {
+        implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') {
+            because 'https://www.mend.io/vulnerability-database/CVE-2022-40156'
+        }
+    }
+    // required https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
+    implementation("org.apache.hadoop:hadoop-aws:${hadoop}")
+    implementation "com.amazonaws:aws-java-sdk-bundle:${aws}"
 
     testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
     testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
@@ -24,6 +45,14 @@ test {
         events "passed", "skipped", "failed"
         exceptionFormat "full"
     }
+
+    // hadoop-fs depends on a native library which is missing on windows.
+    // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
+    if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
+        excludes = [
+            '**/FileSystemStreamSourceTest.class'
+        ]
+    }
 }
 
 jacocoTestReport {
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
index 695af94fe4..7fefb11a85 100644
--- a/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/storage/split/FileSystemSplit.java
@@ -5,12 +5,12 @@
 
 package org.opensearch.sql.filesystem.storage.split;
 
-import java.nio.file.Path;
 import java.util.Set;
 import java.util.UUID;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import org.apache.hadoop.fs.Path;
 import org.opensearch.sql.storage.split.Split;
 
 @Data
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
index 24d2a822cd..6a8c90ee80 100644
--- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileMetaData.java
@@ -5,9 +5,9 @@
 
 package org.opensearch.sql.filesystem.streaming;
 
-import java.nio.file.Path;
 import java.util.Set;
 import lombok.Data;
+import org.apache.hadoop.fs.Path;
 
 /**
  * File metadata. Batch id associated with the set of {@link Path}.
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
index 9207583c5b..0a1d032c53 100644
--- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
@@ -6,16 +6,18 @@
 package org.opensearch.sql.filesystem.streaming;
 
 import com.google.common.collect.Sets;
-import java.io.File;
-import java.nio.file.FileSystem;
-import java.nio.file.Path;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import lombok.SneakyThrows;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.sql.executor.streaming.Batch;
@@ -38,12 +40,12 @@ public class FileSystemStreamSource implements StreamingSource {
 
   private final FileSystem fs;
 
-  private final String basePath;
+  private final Path basePath;
 
   /**
    * Constructor of FileSystemStreamSource.
    */
-  public FileSystemStreamSource(FileSystem fs, String basePath) {
+  public FileSystemStreamSource(FileSystem fs, Path basePath) {
     this.fs = fs;
     this.basePath = basePath;
     // todo, need to add state recovery
@@ -52,13 +54,14 @@ public FileSystemStreamSource(FileSystem fs, String basePath) {
     this.seenFiles = new HashSet<>();
   }
 
+  @SneakyThrows(value = IOException.class)
   @Override
   public Optional<Offset> getLatestOffset() {
     // list all files. TODO: improve list performance.
     Set<Path> allFiles =
-        Arrays.stream(fs.getPath(basePath).toFile().listFiles())
-            .filter(file -> !file.isDirectory())
-            .map(File::toPath)
+        Arrays.stream(fs.listStatus(basePath))
+            .filter(status -> !status.isDirectory())
+            .map(FileStatus::getPath)
             .collect(Collectors.toSet());
 
     // find unread files.
diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
index 537fd10c9f..75c494ec8c 100644
--- a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
+++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
@@ -8,148 +8,161 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
-import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.opensearch.sql.executor.streaming.Batch;
 import org.opensearch.sql.executor.streaming.Offset;
 import org.opensearch.sql.filesystem.storage.split.FileSystemSplit;
+import org.opensearch.sql.storage.split.Split;
 
 @ExtendWith(MockitoExtension.class)
 class FileSystemStreamSourceTest {
 
-  @TempDir
-  Path perTestTempDir;
+  @TempDir Path perTestTempDir;
 
   FileSystemStreamSource streamSource;
 
+  /**
+   * Use the hadoop default filesystem. It only works on unix-like systems. Running on
+   * windows requires the native hadoop library. Reference:
+   * https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html
+   */
   @BeforeEach
-  void setup() {
+  void setup() throws IOException {
     streamSource =
         new FileSystemStreamSource(
-            FileSystems.getDefault(),
-            perTestTempDir.toString());
+            FileSystem.get(new Configuration()),
+            new org.apache.hadoop.fs.Path(perTestTempDir.toUri()));
   }
 
   @Test
-  void getBatchFromFolder() throws IOException {
-    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
-    assertTrue(file.toFile().exists());
-
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
-
-    // fetch batch (empty, latestOffset]
-    assertEquals(
-        Collections.singletonList(
-            new FileSystemSplit(ImmutableSet.of(file))),
-        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  void addOneFileToSource() throws IOException {
+    emptySource().addFile("log1").latestOffsetShouldBe(0L).batchFromStart("log1");
   }
 
   @Test
-  void latestOffsetShouldIncreaseIfNewFileAdded() throws IOException {
-    Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
-    assertTrue(file1.toFile().exists());
-
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
-
-    Path file2 = Files.createFile(perTestTempDir.resolve("log.2022.01.02"));
-    assertTrue(file2.toFile().exists());
-
-    latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(1L), latestOffset.get());
-
-    // fetch batch (empty, 1L]
-    assertBatchEquals(
-        ImmutableList.of(file1, file2),
-        streamSource.getBatch(Optional.empty(), latestOffset.get()));
-
-    // fetch batch (empty, 0L]
-    assertBatchEquals(
-        ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L)));
-
-    // fetch batch (0L, 1L]
-    assertBatchEquals(
-        ImmutableList.of(file2),
-        streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L)));
+  void addMultipleFileInSequence() throws IOException {
+    emptySource()
+        .addFile("log1")
+        .latestOffsetShouldBe(0L)
+        .batchFromStart("log1")
+        .addFile("log2")
+        .latestOffsetShouldBe(1L)
+        .batchFromStart("log1", "log2")
+        .batchInBetween(0L, 1L, "log2");
   }
 
   @Test
   void latestOffsetShouldSameIfNoNewFileAdded() throws IOException {
-    Path file1 = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
-    assertTrue(file1.toFile().exists());
-
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
-
-    // no new files.
-    latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
+    emptySource()
+        .addFile("log1")
+        .latestOffsetShouldBe(0L)
+        .batchFromStart("log1")
+        .latestOffsetShouldBe(0L)
+        .batchFromStart("log1");
   }
 
   @Test
   void latestOffsetIsEmptyIfNoFilesInSource() {
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isEmpty());
+    emptySource().noOffset();
   }
 
   @Test
-  void getBatchOutOfRange() throws IOException {
-    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
-    assertTrue(file.toFile().exists());
-
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
-
-    assertEquals(
-        Collections.singletonList(
-            new FileSystemSplit(ImmutableSet.of(file))),
-        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  void dirIsFiltered() throws IOException {
+    emptySource()
+        .addFile("log1")
+        .latestOffsetShouldBe(0L)
+        .addDir("dir1")
+        .latestOffsetShouldBe(0L)
+        .batchFromStart("log1");
   }
 
   @Test
-  void dirIsFiltered() throws IOException {
-    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
-    assertTrue(file.toFile().exists());
-
-    Path dir = Files.createDirectory(perTestTempDir.resolve("logDir"));
-    assertTrue(dir.toFile().isDirectory());
+  void sneakThrowException() throws IOException {
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    doThrow(IOException.class).when(fs).listStatus(any(org.apache.hadoop.fs.Path.class));
 
-    Optional<Offset> latestOffset = streamSource.getLatestOffset();
-    assertTrue(latestOffset.isPresent());
-    assertEquals(new Offset(0L), latestOffset.get());
+    streamSource =
+        new FileSystemStreamSource(fs,
+            new org.apache.hadoop.fs.Path(perTestTempDir.toUri()));
+    assertThrows(IOException.class, () -> streamSource.getLatestOffset());
+  }
 
-    // fetch batch (empty, latestOffset]
-    assertEquals(
-        Collections.singletonList(
-            new FileSystemSplit(ImmutableSet.of(file))),
-        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
+  StreamSource emptySource() {
+    return new StreamSource();
   }
 
-  void assertBatchEquals(List<Path> expectedFiles, Batch batch) {
-    assertEquals(1, batch.getSplits().size());
-    assertThat(
-        ((FileSystemSplit) batch.getSplits().get(0)).getPaths(),
-        containsInAnyOrder(expectedFiles.toArray()));
+  private class StreamSource {
+
+    StreamSource addFile(String filename) throws IOException {
+      Path file = Files.createFile(perTestTempDir.resolve(filename));
+      assertTrue(file.toFile().exists());
+
+      return this;
+    }
+
+    StreamSource addDir(String dirname) throws IOException {
+      Path dir = Files.createDirectory(perTestTempDir.resolve(dirname));
+      assertTrue(dir.toFile().isDirectory());
+
+      return this;
+    }
+
+    StreamSource noOffset() {
+      assertFalse(streamSource.getLatestOffset().isPresent());
+
+      return this;
+    }
+
+    StreamSource latestOffsetShouldBe(Long offset) {
+      Optional<Offset> latestOffset = streamSource.getLatestOffset();
+      assertTrue(latestOffset.isPresent());
+      assertEquals(new Offset(offset), latestOffset.get());
+
+      return this;
+    }
+
+    StreamSource batchFromStart(String... uris) {
+      assertTrue(streamSource.getLatestOffset().isPresent());
+      internalBatchInBetween(Optional.empty(), streamSource.getLatestOffset().get(), uris);
+
+      return this;
+    }
+
+    StreamSource batchInBetween(Long start, Long end, String... uris) {
+      internalBatchInBetween(Optional.of(new Offset(start)), new Offset(end), uris);
+
+      return this;
+    }
+
+    private StreamSource internalBatchInBetween(
+        Optional<Offset> start, Offset end, String... uris) {
+      List<Split> splits = streamSource.getBatch(start, end).getSplits();
+      assertEquals(1, splits.size());
+      assertThat(
+          ((FileSystemSplit) splits.get(0)).getPaths(),
+          containsInAnyOrder(
+              Arrays.stream(uris)
+                  .map(name -> new org.apache.hadoop.fs.Path(perTestTempDir.resolve(name).toUri()))
+                  .toArray()));
+      return this;
+    }
   }
 }
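Note: after PATCH 3/7 the source lists files through the Hadoop FileSystem API instead of
java.nio. The listing call the patched getLatestOffset() relies on can be exercised
standalone; a sketch, assuming a local directory (the /tmp/stream path is hypothetical):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListFilesSketch {
      public static void main(String[] args) throws Exception {
        // with no scheme configured, FileSystem.get falls back to the local filesystem.
        FileSystem fs = FileSystem.get(new Configuration());
        Set<Path> files =
            Arrays.stream(fs.listStatus(new Path("/tmp/stream"))) // hypothetical dir
                .filter(status -> !status.isDirectory()) // directories are skipped, as in the patch
                .map(FileStatus::getPath)
                .collect(Collectors.toSet());
        files.forEach(System.out::println);
      }
    }
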
From 91ef9b36b024aa9f8a594e8352f2509024137353 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 3 Nov 2022 11:15:31 -0700
Subject: [PATCH 4/7] exclude FileSystemStreamSource from jacoco

Signed-off-by: Peng Huo
---
 filesystem/build.gradle | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/filesystem/build.gradle b/filesystem/build.gradle
index b4be876507..37baec959e 100644
--- a/filesystem/build.gradle
+++ b/filesystem/build.gradle
@@ -71,6 +71,13 @@ test.finalizedBy(project.tasks.jacocoTestReport)
 jacocoTestCoverageVerification {
     violationRules {
         rule {
+            // hadoop-fs depends on a native library which is missing on windows.
+            // https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-common/NativeLibraries.html#Native_Hadoop_Library
+            if ('windows' == project.getProperties().getOrDefault('buildPlatform', 'linux')) {
+                excludes = [
+                    'org.opensearch.sql.filesystem.streaming.FileSystemStreamSource'
+                ]
+            }
             element = 'CLASS'
             limit {
                 counter = 'LINE'

From 5fc1eb579101dc944e0abd77d1f2fced9dadbc91 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 3 Nov 2022 16:49:22 -0700
Subject: [PATCH 5/7] exclude unnecessary dependency

Signed-off-by: Peng Huo
---
 filesystem/build.gradle | 42 +++++++++++++++++++++++++++++++++++------
 plugin/build.gradle     |  5 +++++
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/filesystem/build.gradle b/filesystem/build.gradle
index 37baec959e..0571088132 100644
--- a/filesystem/build.gradle
+++ b/filesystem/build.gradle
@@ -14,24 +14,54 @@ ext {
     aws = "1.12.330"
 }
 
+configurations.all {
+    resolutionStrategy.force "commons-io:commons-io:2.8.0"
+}
+
 dependencies {
     implementation project(':core')
     // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
     implementation("org.apache.hadoop:hadoop-common:${hadoop}") {
-        exclude group: 'org.apache.zookeeper', module: 'zookeeper'
-        exclude group: 'com.sun.jersey', module: 'jersey-json'
+        exclude group: 'org.apache.zookeeper'
+        exclude group: 'org.eclipse.jetty'
+        exclude group: 'com.sun.jersey'
+        exclude group: 'javax.servlet.jsp'
+        exclude group: 'javax.servlet'
+        exclude group: 'org.apache.kerby'
+        exclude group: 'org.apache.curator'
         exclude group: 'com.google.protobuf', module: 'protobuf-java'
         exclude group: 'org.apache.avro', module: 'avro'
-        exclude group: 'org.eclipse.jetty', module: 'jetty-server'
+        exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt'
+        // enforce version.
+        exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core'
+        exclude group: 'commons-io', module: 'commons-io'
+        exclude group: 'ch.qos.reload4j', module: 'reload4j'
+        exclude group: 'org.apache.httpcomponents', module: 'httpcore'
     }
+    implementation('com.fasterxml.woodstox:woodstox-core')
     constraints {
         implementation('com.fasterxml.woodstox:woodstox-core:6.4.0') {
            because 'https://www.mend.io/vulnerability-database/CVE-2022-40156'
        }
    }
-    // required https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
-    implementation("org.apache.hadoop:hadoop-aws:${hadoop}")
-    implementation "com.amazonaws:aws-java-sdk-bundle:${aws}"
+    implementation('commons-io:commons-io')
+    constraints {
+        implementation('commons-io:commons-io:2.8.0') {
+            because 'between versions 2.8.0 and 2.5'
+        }
+    }
+    implementation('ch.qos.reload4j:reload4j')
+    constraints {
+        implementation('ch.qos.reload4j:reload4j:1.2.22') {
+            because 'between versions 1.2.22 and 1.2.19'
+        }
+    }
+    implementation('org.apache.httpcomponents:httpcore')
+    constraints {
+        implementation('org.apache.httpcomponents:httpcore:4.4.15') {
+            because 'between versions 4.4.15 and 4.4.13'
+        }
+    }
 
     testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
     testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
diff --git a/plugin/build.gradle b/plugin/build.gradle
index d170b72a95..4295854e37 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -91,6 +91,10 @@ configurations.all {
     resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}"
     resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0"
     resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0"
+    resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1"
+    resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0"
+    resolutionStrategy.force "joda-time:joda-time:2.10.12"
+    resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
 }
 compileJava {
     options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
@@ -110,6 +114,7 @@ dependencies {
     api project(':legacy')
     api project(':opensearch')
     api project(':prometheus')
+    api project(':filesystem')
 }
 
 test {

From c8b0118638633e2bb85f2132773c516aa18553a9 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 3 Nov 2022 20:02:09 -0700
Subject: [PATCH 6/7] Update integ-test dependency

Signed-off-by: Peng Huo
---
 integ-test/build.gradle | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/integ-test/build.gradle b/integ-test/build.gradle
index 5e0a53bf1a..66b3c1c94c 100644
--- a/integ-test/build.gradle
+++ b/integ-test/build.gradle
@@ -58,6 +58,10 @@ configurations.all {
     resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}"
     resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.6.0"
     resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.6.0"
+    resolutionStrategy.force "org.apache.commons:commons-math3:3.6.1"
+    resolutionStrategy.force "org.apache.commons:commons-lang3:3.12.0"
+    resolutionStrategy.force "joda-time:joda-time:2.10.12"
+    resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
 }
 
 dependencies {
From e2e3cd8efb9228bec8b8c5cb01c0f4016bd46839 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Mon, 7 Nov 2022 19:24:18 -0800
Subject: [PATCH 7/7] change from splits to split in batch

Signed-off-by: Peng Huo
---
 .../java/org/opensearch/sql/executor/streaming/Batch.java  | 3 +--
 .../sql/filesystem/streaming/FileSystemStreamSource.java   | 3 +--
 .../filesystem/streaming/FileSystemStreamSourceTest.java   | 6 ++----
 3 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
index 7c27ab4622..cd7d7dae5a 100644
--- a/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
+++ b/core/src/main/java/org/opensearch/sql/executor/streaming/Batch.java
@@ -5,7 +5,6 @@
 
 package org.opensearch.sql.executor.streaming;
 
-import java.util.List;
 import lombok.Data;
 import org.opensearch.sql.storage.split.Split;
 
@@ -14,5 +13,5 @@
  */
 @Data
 public class Batch {
-  private final List<Split> splits;
+  private final Split split;
 }
diff --git a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
index 0a1d032c53..6a9639bdcb 100644
--- a/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
+++ b/filesystem/src/main/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSource.java
@@ -8,7 +8,6 @@
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
@@ -101,6 +100,6 @@ public Batch getBatch(Optional<Offset> start, Offset end) {
         .collect(Collectors.toSet());
 
     log.debug("fetch files {} with id from: {} to: {}.", paths, start, end);
-    return new Batch(Collections.singletonList(new FileSystemSplit(paths)));
+    return new Batch(new FileSystemSplit(paths));
   }
 }
diff --git a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
index 75c494ec8c..fba038f6a3 100644
--- a/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
+++ b/filesystem/src/test/java/org/opensearch/sql/filesystem/streaming/FileSystemStreamSourceTest.java
@@ -18,7 +18,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -154,10 +153,9 @@ StreamSource batchInBetween(Long start, Long end, String... uris) {
 
     private StreamSource internalBatchInBetween(
         Optional<Offset> start, Offset end, String... uris) {
-      List<Split> splits = streamSource.getBatch(start, end).getSplits();
-      assertEquals(1, splits.size());
+      Split split = streamSource.getBatch(start, end).getSplit();
       assertThat(
-          ((FileSystemSplit) splits.get(0)).getPaths(),
+          ((FileSystemSplit) split).getPaths(),
           containsInAnyOrder(
               Arrays.stream(uris)
                   .map(name -> new org.apache.hadoop.fs.Path(perTestTempDir.resolve(name).toUri()))
                   .toArray()));
       return this;
     }
   }
 }
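Note: with PATCH 7/7 a Batch carries exactly one Split, so a consumer reads getSplit()
directly instead of iterating getSplits(). A sketch continuing the driver loop above
(`source`, `lastCommitted`, `latest`, and `log` are the same hypothetical names):

    Batch batch = source.getBatch(lastCommitted, latest.get());
    // a batch now wraps a single split rather than a list of splits.
    FileSystemSplit split = (FileSystemSplit) batch.getSplit();
    split.getPaths().forEach(path -> log.debug("path in batch: {}", path));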