Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filestore): support reading nested directory and various file #423

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
164 commits
Select commit Hold shift + click to select a range
d9d421b
build(dataSource): dedup maven configuration
aqni Jul 17, 2024
c5ae1b2
build: remove duplicate properties and deploy
aqni Jul 17, 2024
6ea589d
build(file): init module
aqni Jul 18, 2024
c7af45e
feat(file): add empty FileStorage implement
aqni Jul 18, 2024
93b426a
refactor(file): add legacy filesystem and parquet implement dependenc…
aqni Jul 18, 2024
90238a1
build(file): automatic compile thrift idl
aqni Jul 18, 2024
0051c54
refactor(dataSource): rename file to filestore
aqni Jul 18, 2024
26c124c
feat(filestore): init thrift rpc definition
aqni Jul 18, 2024
86ec8b7
feat(filestore): init rpc server
aqni Jul 18, 2024
cd3f1a5
store
aqni Jul 20, 2024
8a39dbd
refactor(filestore): delete legacy code
aqni Jul 23, 2024
0d60a1f
feat(filestore): wrapper parquet
aqni Jul 23, 2024
e140a95
feat(filestore): wrapper parquet
aqni Jul 23, 2024
d8a84fa
update StorageEngineType in thrift define
aqni Jul 23, 2024
fbb22ec
feat(filestore): wrap parquet
aqni Jul 23, 2024
e8e9545
feat(filestore): wrap parquet
aqni Jul 23, 2024
a85d078
feat(filestore): wrap parquet
aqni Jul 23, 2024
cb778d9
feat(filestore): wrap parquet
aqni Jul 23, 2024
41b2103
feat(filestore): wrap filesystem
aqni Jul 23, 2024
b4df977
feat(filestore): remove FileWriter.java
aqni Jul 23, 2024
3492e50
feat(filestore): wrap parquet
aqni Jul 23, 2024
9e9b7c4
format
aqni Jul 24, 2024
ab6559a
feat(filestore): wrap filesystem
aqni Jul 24, 2024
24d4da5
feat(filestore): update config.properties
aqni Jul 24, 2024
f11446c
feat(filestore): init
aqni Jul 24, 2024
3ede147
fix build
aqni Jul 24, 2024
663ced5
feat(filestore): wrap filestore
aqni Jul 24, 2024
ca19fed
feat(filestore): init
aqni Jul 24, 2024
638e2e8
feat(filestore): formats
aqni Jul 24, 2024
4070467
fix(core): use StorageEngineClassLoader load resources
aqni Jul 24, 2024
bb2947e
fix(filestore): fix server and ServiceLoader, refactor log
aqni Jul 24, 2024
4c3b5c1
fix(filestore): fix parquet wrapper
aqni Jul 24, 2024
20e069c
fix(filestore): fix parquet wrapper
aqni Jul 24, 2024
955e139
fix(filestore): fix ranges util
aqni Jul 24, 2024
0550d9c
fix(filestore): fix parquet wrapper
aqni Jul 24, 2024
1bdeb60
test(filestore): standalone-test-pushdown.yml
aqni Jul 24, 2024
08cb8b1
format
aqni Jul 24, 2024
77cda43
format
aqni Jul 24, 2024
1251ea4
rollback default conf
aqni Jul 24, 2024
af40576
rollback default conf
aqni Jul 24, 2024
e35ad66
test filestore
aqni Jul 24, 2024
2969ff3
test filestore
aqni Jul 24, 2024
5364630
Merge branch 'main' into feat/filestore/init
aqni Jul 24, 2024
37c6557
rollback default conf
aqni Jul 24, 2024
a3dcb8d
rollback default conf
aqni Jul 24, 2024
14ae15c
fix ci test
aqni Jul 24, 2024
e1917c5
fix log
aqni Jul 24, 2024
abdcdfa
fix log
aqni Jul 24, 2024
600e08f
fix class loader
aqni Jul 24, 2024
0d687aa
fix class loader
aqni Jul 24, 2024
41e7a21
fix class loader
aqni Jul 24, 2024
2500d9e
feat(core): reimplement StorageEngineClassLoader
aqni Jul 24, 2024
1614e5e
Merge branch 'main' into feat/filestore/init
aqni Jul 24, 2024
440fe6f
fix spi
aqni Jul 24, 2024
07962eb
fix spi
aqni Jul 24, 2024
3f8159d
fix spi
aqni Jul 24, 2024
9d70355
ci: cancel quiet build
aqni Jul 24, 2024
d39ef08
build: configure maven-compiler-plugin in pluginManagement
aqni Jul 24, 2024
31a046a
test: add testConfig
aqni Jul 24, 2024
c57977c
test: fix filestore test
aqni Jul 24, 2024
6b74fb5
build: fix plugins version
aqni Jul 24, 2024
eb883a0
ci: cancel quiet test
aqni Jul 24, 2024
25f1665
ci: fix test args
aqni Jul 24, 2024
90f40e5
filesystem wrapper
aqni Jul 25, 2024
7d99769
format class loader
aqni Jul 25, 2024
c1f80fe
fix count pushdown filter for parquet
aqni Jul 25, 2024
8f7810f
enable standalone-test.yml
aqni Jul 25, 2024
7b2a549
enable DB-CE
aqni Jul 25, 2024
c8dfa63
fix DB-CE for filestore
aqni Jul 25, 2024
865a2f2
feat(filestore): impl union dummy units with unit-test and fix
aqni Jul 25, 2024
af1c25d
format
aqni Jul 25, 2024
556f070
format test
aqni Jul 25, 2024
c3246b9
test db-ce
aqni Jul 26, 2024
46bd2b6
fix config in filestore
aqni Jul 26, 2024
e57f69f
fix classloader
aqni Jul 26, 2024
3f65761
test jdbc
aqni Jul 26, 2024
ab7a44b
fix codeql
aqni Jul 26, 2024
9dc99fd
fix spi
aqni Jul 26, 2024
f461599
fix embedded type
aqni Jul 26, 2024
c3749da
fix classloader
aqni Jul 26, 2024
df9e1db
fix dbce
aqni Jul 26, 2024
460e2bf
rollback classloader
aqni Jul 26, 2024
33dfdad
fix log
aqni Jul 26, 2024
1a3886c
fix classloader
aqni Jul 26, 2024
5da3295
fix log
aqni Jul 26, 2024
3609209
fix thrift
aqni Jul 26, 2024
03db2c7
fix thrift
aqni Jul 26, 2024
bfbf192
fix thrift
aqni Jul 26, 2024
7d2c488
add test and fix filestore server
aqni Jul 26, 2024
b9e1248
revert cli message changes
aqni Jul 26, 2024
984203c
revert test changes
aqni Jul 26, 2024
52c2dec
update test config
aqni Jul 27, 2024
9d0bb4a
rollback thrift storageEngineType
aqni Jul 27, 2024
855d6cd
rollback thrift storageEngineType
aqni Jul 27, 2024
fd981d9
update pysession
aqni Jul 27, 2024
0e4469a
update pysession
aqni Jul 27, 2024
83364ea
rollback poms
aqni Jul 27, 2024
910122c
add license
aqni Jul 27, 2024
15be27a
roll back
aqni Jul 27, 2024
40a05fc
fix sed in mac
aqni Jul 27, 2024
30d7808
test db-ce
aqni Jul 27, 2024
cb98b83
roll back server
aqni Jul 27, 2024
3a977c8
fix roll back server
aqni Jul 27, 2024
d4f5923
fix server
aqni Jul 27, 2024
4e8c080
format
aqni Jul 27, 2024
4669cf5
fix default dummy struct
aqni Jul 27, 2024
c9dac73
fix rpc rawFilter
aqni Jul 27, 2024
31837d1
fix server
aqni Jul 27, 2024
ba54f2d
update libthrift
aqni Jul 28, 2024
a1679fc
update libthrift
aqni Jul 28, 2024
6e85182
fix server
aqni Aug 5, 2024
d9d0678
rollback test
aqni Aug 5, 2024
d228864
fix codeql
aqni Aug 5, 2024
b929a2e
update test
aqni Aug 6, 2024
4c050a3
Merge branch 'refs/heads/main' into feat/filestore/init
aqni Aug 6, 2024
50e9d95
fix after merge
aqni Aug 6, 2024
474386b
fix after merge
aqni Aug 7, 2024
8566af5
store
aqni Aug 8, 2024
a160f78
Merge branch 'refs/heads/main' into feat/filestore/init
aqni Aug 8, 2024
ec14427
fix after merge
aqni Aug 8, 2024
8ff0522
update pysession thrift
aqni Aug 8, 2024
21c2324
fix after merge
aqni Aug 8, 2024
d4b9664
test dbce
aqni Aug 8, 2024
df5f1e4
debug
aqni Aug 8, 2024
dff8f9d
Revert "debug"
aqni Aug 8, 2024
f106af0
fix show columns
aqni Aug 8, 2024
0375e49
Revert "test dbce"
aqni Aug 8, 2024
61e4dff
format
aqni Aug 8, 2024
631525a
fix dbce
aqni Aug 8, 2024
fcd3eb7
fix server startup
aqni Aug 9, 2024
6826b13
fix server startup
aqni Aug 9, 2024
949a79f
store
aqni Aug 12, 2024
9d3df11
Merge branch 'refs/heads/main' into feat/filestore/init
aqni Aug 12, 2024
008693e
fix server startup
aqni Aug 9, 2024
5e07ef9
store
aqni Aug 13, 2024
6a23f26
store
aqni Aug 13, 2024
2874002
store
aqni Aug 13, 2024
902f4b8
store
aqni Aug 13, 2024
dfe72b8
store
aqni Aug 14, 2024
434a081
store
aqni Aug 14, 2024
b6b904f
Merge branch 'main' into feat/filestore/read-parquet-in-file-tree
aqni Aug 14, 2024
e268058
Merge branch 'refs/heads/feat/filestore/init' into feat/filestore/rea…
aqni Aug 14, 2024
0c2095b
fix dbce
aqni Aug 15, 2024
d07fa0a
test filestore dbce
aqni Aug 15, 2024
0ce4698
Revert "test filestore dbce"
aqni Aug 15, 2024
42c7968
store
aqni Aug 15, 2024
3f1614e
fix dbce
aqni Aug 15, 2024
0f56c8a
add license
aqni Aug 15, 2024
04a0f94
fix dbce
aqni Aug 15, 2024
137d946
read file by path
aqni Aug 16, 2024
aa25cf4
Merge branch 'refs/heads/main' into feat/filestore/init
aqni Aug 16, 2024
a7a00aa
Merge branch 'refs/heads/feat/filestore/init' into feat/filestore/rea…
aqni Aug 16, 2024
418473f
fix
aqni Aug 17, 2024
4683d6d
fix
aqni Aug 19, 2024
afa9424
Merge branch 'refs/heads/main' into feat/filestore/read-parquet-in-fi…
aqni Aug 19, 2024
774d4e4
Merge branch 'main' into feat/filestore/init
aqni Aug 20, 2024
5e8192d
fix
aqni Aug 20, 2024
7ae4cc8
test filestore
aqni Aug 20, 2024
c7726e0
Revert "test filestore"
aqni Aug 20, 2024
876fa34
Merge branch 'refs/heads/feat/filestore/init' into feat/filestore/rea…
aqni Aug 20, 2024
3b87076
Merge branch 'main' into feat/filestore/read-parquet-in-file-tree
aqni Aug 23, 2024
e9c10c4
Merge branch 'main' into feat/filestore/read-parquet-in-file-tree
aqni Sep 3, 2024
ec1552c
format
aqni Sep 3, 2024
0bf3ce6
format
aqni Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import java.util.Collections;
import java.util.Objects;

public class EmptyRowStream implements RowStream {

private final Header header;

public EmptyRowStream() {
this.header = new Header(Field.KEY, Collections.emptyList());
this(new Header(Field.KEY, Collections.emptyList()));
}

public EmptyRowStream(Header header) {
this.header = Objects.requireNonNull(header);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filestore.common.AbstractConfig;
import cn.edu.tsinghua.iginx.filestore.common.Configs;
import cn.edu.tsinghua.iginx.filestore.common.FileStoreException;
import cn.edu.tsinghua.iginx.filestore.common.Filters;
import cn.edu.tsinghua.iginx.filestore.service.FileStoreConfig;
Expand All @@ -48,6 +49,7 @@
import cn.edu.tsinghua.iginx.filestore.struct.FileStructureManager;
import cn.edu.tsinghua.iginx.filestore.struct.legacy.filesystem.LegacyFilesystem;
import cn.edu.tsinghua.iginx.filestore.struct.legacy.parquet.LegacyParquet;
import cn.edu.tsinghua.iginx.filestore.struct.tree.FileTreeConfig;
import cn.edu.tsinghua.iginx.filestore.thrift.DataBoundary;
import cn.edu.tsinghua.iginx.filestore.thrift.DataUnit;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
Expand All @@ -56,8 +58,8 @@
import cn.edu.tsinghua.iginx.thrift.AggregateType;
import cn.edu.tsinghua.iginx.thrift.StorageEngineType;
import cn.edu.tsinghua.iginx.utils.Pair;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigBeanFactory;
import com.typesafe.config.ConfigFactory;
import java.net.InetSocketAddress;
import java.util.*;
Expand Down Expand Up @@ -102,7 +104,7 @@ static FileStoreConfig toFileStoreConfig(StorageEngineMeta meta)
throws StorageInitializationException {
Config rawConfig = toConfig(meta);
LOGGER.debug("storage of {} config: {}", meta, rawConfig);
FileStoreConfig fileStoreConfig = ConfigBeanFactory.create(rawConfig, FileStoreConfig.class);
FileStoreConfig fileStoreConfig = FileStoreConfig.of(rawConfig);
LOGGER.debug("storage of {} will be initialized with {}", meta, fileStoreConfig);
List<AbstractConfig.ValidationProblem> problems = fileStoreConfig.validate();
if (!problems.isEmpty()) {
Expand All @@ -112,26 +114,41 @@ static FileStoreConfig toFileStoreConfig(StorageEngineMeta meta)
}

static Config toConfig(StorageEngineMeta meta) throws StorageInitializationException {
HashMap<String, String> reshapedParams = new HashMap<>();
HashMap<String, String> reshaped = new HashMap<>();

for (Map.Entry<String, String> param : meta.getExtraParams().entrySet()) {
String key = param.getKey();
String value = param.getValue();
if (key.contains(".")) {
reshapedParams.put(key, value);
reshaped.put(key, value);
}
}

reshapedParams.put("data.root", meta.getExtraParams().get("dir"));
reshapedParams.put("dummy.root", meta.getExtraParams().get("dummy_dir"));
reshapedParams.putIfAbsent("data.struct", LegacyParquet.NAME);
reshapedParams.putIfAbsent("dummy.struct", LegacyFilesystem.NAME);
Configs.put(
reshaped,
meta.getExtraParams().get("dir"),
FileStoreConfig.Fields.data,
StorageConfig.Fields.root);
Configs.put(
reshaped,
meta.getExtraParams().get("dummy_dir"),
FileStoreConfig.Fields.dummy,
StorageConfig.Fields.root);
Configs.put(
reshaped,
meta.getExtraParams().get("embedded_prefix"),
FileStoreConfig.Fields.dummy,
StorageConfig.Fields.config,
FileTreeConfig.Fields.prefix);
Configs.putIfAbsent(
reshaped, LegacyParquet.NAME, FileStoreConfig.Fields.data, StorageConfig.Fields.struct);
Configs.putIfAbsent(
reshaped, LegacyFilesystem.NAME, FileStoreConfig.Fields.dummy, StorageConfig.Fields.struct);

boolean local = isLocal(meta);
reshapedParams.put("server", String.valueOf(local));
reshaped.put(FileStoreConfig.Fields.serve, String.valueOf(local));

Config config =
ConfigFactory.parseMap(reshapedParams, "storage engine initialization parameters");
Config config = ConfigFactory.parseMap(reshaped, "storage engine initialization parameters");

if (local) {
LOGGER.debug("storage of {} is local, ignore config for remote", meta);
Expand Down Expand Up @@ -308,7 +325,7 @@ public List<Column> getColumns(Set<String> patterns, TagFilter tagFilter)
@Override
public Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix)
throws PhysicalException {
Map<DataUnit, DataBoundary> units = service.getUnits(prefix);
Map<DataUnit, DataBoundary> units = service.getUnits(Strings.emptyToNull(prefix));
DataBoundary boundary = units.get(unitOfDummy());
if (Objects.equals(boundary, new DataBoundary())) {
throw new PhysicalTaskExecuteFailureException("no data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package cn.edu.tsinghua.iginx.filestore.common;

import com.google.common.collect.Range;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigBeanFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,6 +31,10 @@ public abstract class AbstractConfig {

public abstract List<ValidationProblem> validate();

public static <C extends AbstractConfig> C of(Config raw, Class<C> clazz) {
return ConfigBeanFactory.create(raw, clazz);
}

public static class ValidationProblem {
private final List<String> reversedPath;
private final String problem;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.edu.tsinghua.iginx.filestore.common;

import java.io.Closeable;
import java.io.IOException;

public class Closeables {

private Closeables() {}

public static void close(Iterable<? extends Closeable> ac) throws IOException {
if (ac == null) {
return;
} else if (ac instanceof Closeable) {
((Closeable) ac).close();
return;
}

IOException exception = null;
for (Closeable closeable : ac) {
try {
if (closeable != null) {
closeable.close();
}
} catch (IOException e) {
if (exception == null) {
exception = e;
} else if (e != exception) {
exception.addSuppressed(e);
}
}
}
if (exception != null) {
throw exception;
}
}

public static Closeable closeAsIOException(AutoCloseable ac) {
return () -> {
try {
ac.close();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.edu.tsinghua.iginx.filestore.common;

import java.util.Map;

public class Configs {

private Configs() {}

public static Object put(Map<String, Object> map, Object value, String... path) {
String joinedPath = String.join(".", path);
return map.put(joinedPath, value);
}

public static String put(Map<String, String> map, String value, String... path) {
String joinedPath = String.join(".", path);
return map.put(joinedPath, value);
}

public static String putIfAbsent(Map<String, String> map, String value, String... path) {
String joinedPath = String.join(".", path);
return map.putIfAbsent(joinedPath, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.thrift.DataType;
import java.util.Map;
import javax.annotation.Nullable;

public class Fields {

Expand All @@ -32,4 +33,8 @@ public static Field of(Column column) {
DataType dataType = column.getDataType();
return new Field(name, dataType, tags);
}

public static Field addPrefix(Field field, @Nullable String prefix) {
return new Field(IginxPaths.join(prefix, field.getName()), field.getType(), field.getTags());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.edu.tsinghua.iginx.filestore.common;

import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;

public abstract class FileStoreRowStream implements RowStream {

@Override
public abstract Header getHeader() throws FileStoreException;

@Override
public abstract void close() throws FileStoreException;

@Override
public abstract boolean hasNext() throws FileStoreException;

@Override
public abstract Row next() throws FileStoreException;
}
Loading
Loading