Using Java FileSystems
penghuo committed Nov 1, 2022 (commit ad27cfc, 1 parent: abb824f)
Signed-off-by: Peng Huo <penghuo@gmail.com>
Showing 5 changed files with 37 additions and 57 deletions.

filesystem/build.gradle (0 additions, 17 deletions)

@@ -19,23 +19,6 @@ dependencies {
     implementation project(':core')
     implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
 
-    // required by hadoop filesystem https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/index.html.
-    implementation "org.apache.hadoop:hadoop-aws:${hadoop}"
-    implementation("org.apache.hadoop:hadoop-common:${hadoop}") {
-        exclude group: 'ch.qos.reload4j', module: 'reload4j'
-        exclude group: 'org.slf4j', module: 'slf4j-reload4j'
-        exclude group: 'org.apache.zookeeper', module: 'zookeeper'
-        exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
-        exclude group: 'org.eclipse.jetty', module: 'jetty-server'
-        exclude group: 'org.codehaus.jettison', module: 'jettison'
-        exclude group: 'com.google.protobuf', module: 'protobuf-java'
-        exclude group: 'com.sun.jersey', module: 'jersey-json'
-
-        // exclude group: 'com.fasterxml.woodstox', module: 'woodstox-core'
-    }
-    // required https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
-    implementation "com.amazonaws:aws-java-sdk-bundle:${aws}"
-
    testImplementation "org.junit.jupiter:junit-jupiter:${junit_jupiter}"
    testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
    testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
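
The deleted lines existed only to satisfy org.apache.hadoop.fs.FileSystem: hadoop-common (with its long exclude list) per the Hadoop filesystem docs, plus hadoop-aws and the AWS SDK bundle per the S3A docs. The replacement, java.nio.file, ships with the JDK, so nothing is added in their place. A minimal sketch (hypothetical class name, not part of the commit) showing the default java.nio filesystem works with no extra classpath entries:

import java.nio.file.FileSystem;
import java.nio.file.FileSystems;

// Hypothetical check, not part of the commit: the default java.nio
// filesystem is part of the JDK, so no hadoop-common, hadoop-aws, or
// aws-java-sdk-bundle is needed on the classpath for local file access.
public class NioAvailabilityCheck {
  public static void main(String[] args) {
    FileSystem fs = FileSystems.getDefault();
    System.out.println("provider: " + fs.provider());
    System.out.println("separator: " + fs.getSeparator());
  }
}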

FileSystemSplit.java (package org.opensearch.sql.filesystem.storage.split)

@@ -5,12 +5,12 @@
 
 package org.opensearch.sql.filesystem.storage.split;
 
+import java.nio.file.Path;
 import java.util.Set;
 import java.util.UUID;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
-import org.apache.hadoop.fs.Path;
 import org.opensearch.sql.storage.split.Split;
 
 @Data

File metadata class (package org.opensearch.sql.filesystem.streaming)

@@ -5,9 +5,9 @@
 
 package org.opensearch.sql.filesystem.streaming;
 
+import java.nio.file.Path;
 import java.util.Set;
 import lombok.Data;
-import org.apache.hadoop.fs.Path;
 
 /**
  * File metadata. Batch id associated with the set of {@link Path}.

FileSystemStreamSource.java (package org.opensearch.sql.filesystem.streaming)

@@ -6,18 +6,16 @@
 package org.opensearch.sql.filesystem.streaming;
 
 import com.google.common.collect.Sets;
-import java.io.IOException;
+import java.io.File;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-import lombok.SneakyThrows;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.sql.executor.streaming.Batch;

@@ -40,12 +38,12 @@ public class FileSystemStreamSource implements StreamingSource {
 
   private final FileSystem fs;
 
-  private final Path basePath;
+  private final String basePath;
 
   /**
    * Constructor of FileSystemStreamSource.
    */
-  public FileSystemStreamSource(FileSystem fs, Path basePath) {
+  public FileSystemStreamSource(FileSystem fs, String basePath) {
     this.fs = fs;
     this.basePath = basePath;
     // todo, need to add state recovery

@@ -54,12 +52,14 @@ public FileSystemStreamSource(FileSystem fs, Path basePath) {
     this.seenFiles = new HashSet<>();
   }
 
-  @SneakyThrows({IOException.class})
   @Override
   public Optional<Offset> getLatestOffset() {
     // list all files. todo: improve listing performance.
     Set<Path> allFiles =
-        Arrays.stream(fs.listStatus(basePath)).map(FileStatus::getPath).collect(Collectors.toSet());
+        Arrays.stream(fs.getPath(basePath).toFile().listFiles())
+            .filter(file -> !file.isDirectory())
+            .map(File::toPath)
+            .collect(Collectors.toSet());
 
     // find unread files.
     log.debug("all files {}", allFiles);
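
Below is a minimal standalone sketch of the listing logic getLatestOffset() now uses (the class name and main method are hypothetical, not part of the commit): resolve the base path through a java.nio.file.FileSystem, enumerate entries with File.listFiles(), and keep only regular files. Unlike Hadoop's FileSystem.listStatus(), File.listFiles() does not throw IOException, which is why @SneakyThrows({IOException.class}) is dropped above; it returns null when the path is missing or unreadable. The sketch guards that case explicitly, whereas the commit's version assumes basePath is a readable directory.

import java.io.File;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the commit.
public class ListFilesDemo {
  public static void main(String[] args) {
    // Resolve the base path against the default java.nio FileSystem,
    // the same way the injected fs is used in getLatestOffset() above.
    FileSystem fs = FileSystems.getDefault();
    String basePath = args.length > 0 ? args[0] : ".";

    // File.listFiles() returns null (it does not throw IOException)
    // when basePath does not exist or is not a readable directory.
    File[] entries = fs.getPath(basePath).toFile().listFiles();

    Set<Path> allFiles = entries == null
        ? Collections.emptySet()
        : Arrays.stream(entries)
            .filter(file -> !file.isDirectory())
            .map(File::toPath)
            .collect(Collectors.toSet());

    allFiles.forEach(System.out::println);
  }
}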

FileSystemStreamSourceTest.java

@@ -8,27 +8,21 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.sql.executor.streaming.Batch;
 import org.opensearch.sql.executor.streaming.Offset;

@@ -43,11 +37,11 @@ class FileSystemStreamSourceTest {
   FileSystemStreamSource streamSource;
 
   @BeforeEach
-  void setup() throws IOException {
+  void setup() {
     streamSource =
         new FileSystemStreamSource(
-            FileSystem.get(new Configuration()),
-            new org.apache.hadoop.fs.Path(perTestTempDir.toUri()));
+            FileSystems.getDefault(),
+            perTestTempDir.toString());
   }
 
   @Test

@@ -62,7 +56,7 @@ void getBatchFromFolder() throws IOException {
     // fetch batch (empty, latestOffset]
     assertEquals(
         Collections.singletonList(
-            new FileSystemSplit(ImmutableSet.of(new org.apache.hadoop.fs.Path(file.toUri())))),
+            new FileSystemSplit(ImmutableSet.of(file))),
         streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
   }

@@ -84,19 +78,16 @@ void latestOffsetShouldIncreaseIfNoNewFileAdded() throws IOException {
 
     // fetch batch (empty, 1L]
     assertBatchEquals(
-        ImmutableList.of(
-            new org.apache.hadoop.fs.Path(file1.toUri()),
-            new org.apache.hadoop.fs.Path(file2.toUri())),
+        ImmutableList.of(file1, file2),
         streamSource.getBatch(Optional.empty(), latestOffset.get()));
 
     // fetch batch (empty, 0L]
     assertBatchEquals(
-        ImmutableList.of(new org.apache.hadoop.fs.Path(file1.toUri())),
-        streamSource.getBatch(Optional.empty(), new Offset(0L)));
+        ImmutableList.of(file1), streamSource.getBatch(Optional.empty(), new Offset(0L)));
 
     // fetch batch (0L, 1L]
     assertBatchEquals(
-        ImmutableList.of(new org.apache.hadoop.fs.Path(file2.toUri())),
+        ImmutableList.of(file2),
         streamSource.getBatch(Optional.of(new Offset(0L)), new Offset(1L)));
   }

@@ -132,24 +123,30 @@ void getBatchOutOfRange() throws IOException {
 
     assertEquals(
         Collections.singletonList(
-            new FileSystemSplit(ImmutableSet.of(new org.apache.hadoop.fs.Path(file.toUri())))),
+            new FileSystemSplit(ImmutableSet.of(file))),
         streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
   }
 
   @Test
-  void fileNotFoundThrownException() throws IOException {
-    FileSystem fs = mock(FileSystem.class);
-    doThrow(IOException.class).when(fs).listStatus(any(org.apache.hadoop.fs.Path.class));
-    assertThrows(
-        IOException.class,
-        () -> {
-          streamSource =
-              new FileSystemStreamSource(fs, new org.apache.hadoop.fs.Path(perTestTempDir.toUri()));
-          streamSource.getLatestOffset();
-        });
+  void dirIsFiltered() throws IOException {
+    Path file = Files.createFile(perTestTempDir.resolve("log.2022.01.01"));
+    assertTrue(file.toFile().exists());
+
+    Path dir = Files.createDirectory(perTestTempDir.resolve("logDir"));
+    assertTrue(dir.toFile().isDirectory());
+
+    Optional<Offset> latestOffset = streamSource.getLatestOffset();
+    assertTrue(latestOffset.isPresent());
+    assertEquals(new Offset(0L), latestOffset.get());
+
+    // fetch batch (empty, latestOffset]
+    assertEquals(
+        Collections.singletonList(
+            new FileSystemSplit(ImmutableSet.of(file))),
+        streamSource.getBatch(Optional.empty(), latestOffset.get()).getSplits());
   }
 
-  void assertBatchEquals(List<org.apache.hadoop.fs.Path> expectedFiles, Batch batch) {
+  void assertBatchEquals(List<Path> expectedFiles, Batch batch) {
     assertEquals(1, batch.getSplits().size());
     assertThat(
         ((FileSystemSplit) batch.getSplits().get(0)).getPaths(),
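
Note that the mock-based fileNotFoundThrownException test is removed rather than ported: File.listFiles() reports failure by returning null instead of throwing IOException, so the old mock-and-assertThrows setup no longer applies, and the new dirIsFiltered test pins down the directory-filtering behavior instead. A quick usage sketch of the changed constructor signature, mirroring the test setup (the caller class and the /tmp/stream-input path are hypothetical):

import java.nio.file.FileSystems;
import java.util.Optional;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.filesystem.streaming.FileSystemStreamSource;

// Hypothetical caller, not part of the commit: poll the source once and
// fetch everything up to the latest offset, as the tests above do.
public class StreamPollDemo {
  public static void main(String[] args) {
    // basePath is now a plain String, resolved against the injected
    // java.nio FileSystem ("/tmp/stream-input" is a made-up example).
    FileSystemStreamSource source =
        new FileSystemStreamSource(FileSystems.getDefault(), "/tmp/stream-input");

    Optional<Offset> latestOffset = source.getLatestOffset();
    if (latestOffset.isPresent()) {
      Batch batch = source.getBatch(Optional.empty(), latestOffset.get());
      System.out.println("splits: " + batch.getSplits());
    }
  }
}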
