Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transform): some new features and improve #542

Merged
merged 51 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
8b7a3bb
some fix & refactor
shinyano Dec 19, 2024
b999680
restore config
shinyano Dec 19, 2024
55f5736
remove comment && better javadoc
shinyano Dec 19, 2024
a10d08e
Merge branch 'main' into feat/transform-polish
shinyano Dec 20, 2024
4af057a
Merge branch 'main' into feat/transform-polish
shinyano Dec 20, 2024
a7de04b
add test for show all/idle jobs
shinyano Dec 20, 2024
b6b9edf
add test for redundant cancellation
shinyano Dec 20, 2024
41c28f1
restore
shinyano Dec 20, 2024
fcd4836
add test for alter USF script & fix bug
shinyano Dec 20, 2024
58bce3e
Merge branch 'main' into feat/transform-improve
shinyano Dec 23, 2024
09e3c57
stopOnFailure switch
shinyano Dec 23, 2024
d897554
relative path to server
shinyano Dec 23, 2024
6d4d4bc
fix tests
shinyano Dec 24, 2024
0ea9b6c
fix tests
shinyano Dec 24, 2024
5b322f3
fix tests
shinyano Dec 24, 2024
bfaeba8
update python thrift
shinyano Dec 24, 2024
9000220
restore
shinyano Dec 25, 2024
b7651dd
Merge branch 'main' into feat/transform-improve
shinyano Dec 26, 2024
8af666b
fix bug && cancel notify
shinyano Dec 25, 2024
c074463
automatically restart(wip) & some fixs
shinyano Dec 26, 2024
9f72c96
fix bug
shinyano Dec 27, 2024
9e592c3
format
shinyano Dec 27, 2024
83edb2c
format
shinyano Dec 27, 2024
14c19c9
mk dir
shinyano Dec 27, 2024
1a739e5
add ip port
shinyano Dec 27, 2024
0e22b64
test relative path
shinyano Dec 29, 2024
b5847e3
insert data
shinyano Dec 29, 2024
cfe6bc0
show log
shinyano Dec 29, 2024
a7355c7
correct script
shinyano Dec 29, 2024
eceb2aa
correct script
shinyano Dec 29, 2024
b1b4516
correct script
shinyano Dec 29, 2024
fd43060
sleep longer
shinyano Dec 29, 2024
bc58209
query longer
shinyano Dec 29, 2024
354a614
session id = 0
shinyano Dec 29, 2024
604aa31
useless test removed
shinyano Dec 29, 2024
bad4fb1
restore tests
shinyano Dec 29, 2024
f895321
assembly
shinyano Dec 30, 2024
a2f3d0f
assembly
shinyano Dec 30, 2024
f3425de
restore tests
shinyano Dec 30, 2024
3e33e90
rf 2 r
shinyano Dec 30, 2024
c5351ab
restart test
shinyano Dec 30, 2024
b8d5edd
restart test
shinyano Dec 30, 2024
0abb07f
format
shinyano Dec 30, 2024
4711830
find path error
shinyano Dec 30, 2024
58c56b7
find path error
shinyano Dec 30, 2024
d3ace37
restore
shinyano Dec 30, 2024
bf108e2
Merge branch 'main' into feat/transform-improve
shinyano Dec 31, 2024
f64a884
clear dall data
shinyano Dec 31, 2024
f771af3
Merge branch 'main' into feat/transform-improve
zhuyuqing Jan 2, 2025
59b970b
read job info in client
shinyano Jan 2, 2025
65c800c
Merge branch 'main' into feat/transform-improve
zhuyuqing Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/scripts/test/cli/test_job_register.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash
#
# IGinX - the polystore system with high performance
# Copyright (C) Tsinghua University
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

# Commits a transform job through the IGinX CLI and checks that the client
# reports a job id.
#
# Usage: test_job_register.sh <iginx-version>
#   <iginx-version>  suffix of the client/target/iginx-client-<version> directory
#
# Exit status: 0 when the job was committed, 1 when the commit failed,
# 2 when the version argument is missing.

set -e

if [ -z "$1" ]; then
  echo 'Usage: test_job_register.sh <iginx-version>' >&2
  exit 2
fi

JOB_FILE=TransformMultiplePythonJobsWithExportToIginx.yaml
SBIN_DIR="client/target/iginx-client-$1/sbin"

# COMMAND names the job file without a directory, so the file is copied into
# the directory the CLI is started from.
cp -f "test/src/test/resources/transform/$JOB_FILE" "$SBIN_DIR/$JOB_FILE"

ls "$SBIN_DIR"

COMMAND='commit transform job "'"$JOB_FILE"'";'

cd "$SBIN_DIR"

chmod +x start_cli.sh

# Capture the exit status explicitly: under `set -e` a failing command
# substitution would abort the script before the CLI output could be printed.
status=0
result=$(bash -c "echo '$COMMAND' | xargs -0 -t -I '{}' bash start_cli.sh -e '{}'") || status=$?

# The client prints "job id: <n>" on success. Matching a bare "id" would also
# accept error text that happens to contain those letters (e.g. "invalid").
SUCCESS_PATTERN='job id: [0-9]+'

if [[ $status -eq 0 && $result =~ $SUCCESS_PATTERN ]]; then
  echo success
  sleep 3
else
  echo 'Error: failed to commit job.'
  # Quoted so the CLI output keeps its line breaks and is not glob-expanded.
  printf '%s\n' "$result"
  exit 1
fi
42 changes: 42 additions & 0 deletions .github/scripts/test/cli/test_job_register_macos.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/sh
#
# IGinX - the polystore system with high performance
# Copyright (C) Tsinghua University
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

# Commits a transform job through the IGinX CLI on macOS and checks that the
# client reports a job id. Written in POSIX sh to match the /bin/sh shebang
# (no `[[ ... ]]` or `=~`, which are bash extensions).
#
# Usage: test_job_register_macos.sh <iginx-version>
#   <iginx-version>  suffix of the client/target/iginx-client-<version> directory
#
# Exit status: 0 when the job was committed, 1 when the commit failed,
# 2 when the version argument is missing.

set -e

if [ -z "$1" ]; then
  echo 'Usage: test_job_register_macos.sh <iginx-version>' >&2
  exit 2
fi

JOB_FILE=TransformMultiplePythonJobsWithExportToIginx.yaml
SBIN_DIR="client/target/iginx-client-$1/sbin"

# COMMAND names the job file without a directory, so the file is copied into
# the directory the CLI is started from.
cp -f "test/src/test/resources/transform/$JOB_FILE" "$SBIN_DIR/$JOB_FILE"

ls "$SBIN_DIR"

COMMAND='commit transform job "'"$JOB_FILE"'";'

cd "$SBIN_DIR"

chmod +x start_cli.sh

# Capture the exit status explicitly: under `set -e` a failing command
# substitution would abort the script before the CLI output could be printed.
status=0
result=$(sh -c "echo '$COMMAND' | xargs -0 -t -I F sh start_cli.sh -e 'F'") || status=$?

# The client prints "job id: <n>" on success. Matching a bare "id" would also
# accept error text that happens to contain those letters (e.g. "invalid").
if [ "$status" -eq 0 ] && printf '%s\n' "$result" | grep -Eq 'job id: [0-9]+'; then
  echo success
  sleep 3
else
  echo 'Error: failed to commit job.'
  # Quoted so the CLI output keeps its line breaks and is not glob-expanded.
  printf '%s\n' "$result"
  exit 1
fi
40 changes: 40 additions & 0 deletions .github/scripts/test/cli/test_job_register_windows.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash
#
# IGinX - the polystore system with high performance
# Copyright (C) Tsinghua University
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

# Commits a transform job through the IGinX CLI batch launcher on Windows and
# checks that the client reports a job id.
#
# Usage: test_job_register_windows.sh <iginx-version>
#   <iginx-version>  suffix of the client/target/iginx-client-<version> directory
#
# Exit status: 0 when the job was committed, 1 when the commit failed,
# 2 when the version argument is missing.

set -e

if [ -z "$1" ]; then
  echo 'Usage: test_job_register_windows.sh <iginx-version>' >&2
  exit 2
fi

JOB_FILE=TransformMultiplePythonJobsWithExportToIginx.yaml
SBIN_DIR="client/target/iginx-client-$1/sbin"

# COMMAND names the job file without a directory, so the file is copied into
# the directory the CLI is started from.
cp -f "test/src/test/resources/transform/$JOB_FILE" "$SBIN_DIR/$JOB_FILE"

ls "$SBIN_DIR"

COMMAND='commit transform job "'"$JOB_FILE"'";'

cd "$SBIN_DIR"

# Capture the exit status explicitly: under `set -e` a failing command
# substitution would abort the script before the CLI output could be printed.
status=0
result=$(bash -c "./start_cli.bat -e '$COMMAND'") || status=$?

# The client prints "job id: <n>" on success. Matching a bare "id" would also
# accept error text that happens to contain those letters (e.g. "invalid").
SUCCESS_PATTERN='job id: [0-9]+'

if [[ $status -eq 0 && $result =~ $SUCCESS_PATTERN ]]; then
  echo success
  sleep 3
else
  echo 'Error: failed to commit job.'
  # Quoted so the CLI output keeps its line breaks and is not glob-expanded.
  printf '%s\n' "$result"
  exit 1
fi
2 changes: 2 additions & 0 deletions .github/workflows/assembly-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ jobs:
cd assembly/target/iginx-assembly-${{ env.VERSION }}-include
./stopIGinX.${{ steps.platform.outputs.suffix }}
./clearAllData.${{ steps.platform.outputs.suffix }}
ls sbin
rm -r sbin/transform_jobs

- name: Save Final Workspace Tree
run: tree assembly/target/iginx-assembly-${{ env.VERSION }}-include >final-tree.txt
Expand Down
42 changes: 41 additions & 1 deletion .github/workflows/standalone-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,47 @@ jobs:
"${GITHUB_WORKSPACE}/.github/scripts/test/cli/test_py_register_macos.sh" ${VERSION}
fi

# Exercises transform jobs in two phases:
#   1. commit a job through the CLI with a relative YAML path and verify its result
#      (TransformJobPathIT#prepare -> OS-specific register script -> #verifyResult)
#   2. prepare the state for the restart test (TransformJobRestartIT#prepare)
- name: Run Transform Job tests
  shell: bash
  run: |
    mvn test -q -Dtest=TransformJobPathIT#prepare -DfailIfNoTests=false -P-format
    # Pick the register script for the runner's OS; unknown runners run none.
    case "$RUNNER_OS" in
      Linux) REGISTER_SCRIPT=test_job_register.sh ;;
      Windows) REGISTER_SCRIPT=test_job_register_windows.sh ;;
      macOS) REGISTER_SCRIPT=test_job_register_macos.sh ;;
      *) REGISTER_SCRIPT= ;;
    esac
    if [ -n "$REGISTER_SCRIPT" ]; then
      chmod +x "${GITHUB_WORKSPACE}/.github/scripts/test/cli/${REGISTER_SCRIPT}"
      "${GITHUB_WORKSPACE}/.github/scripts/test/cli/${REGISTER_SCRIPT}" ${VERSION}
    fi
    mvn test -q -Dtest=TransformJobPathIT#verifyResult -DfailIfNoTests=false -P-format
    mvn test -q -Dtest=TransformJobRestartIT#prepare -DfailIfNoTests=false -P-format

# Invokes the repository's iginxRunner action with if-stop enabled; presumably this
# shuts down the running IGinX instance ahead of the restart step - confirm in the action.
- name: Stop IGinX
uses: ./.github/actions/iginxRunner
with:
if-stop: true

# Invokes the iginxRunner action again (without if-stop) so that IGinX is running for the
# restart verification that follows.
# NOTE(review): the meaning of if-test-udf is defined by the action, not visible here.
- name: Restart IGinX
uses: ./.github/actions/iginxRunner
with:
if-test-udf: "true"

# Runs TransformJobRestartIT#verifyJobExists after the restart; judging by the test name it
# checks that the job prepared before the restart is still registered - confirm in the test.
- name: Verify Transform Job
shell: bash
run: |
mvn test -q -Dtest=TransformJobRestartIT#verifyJobExists -DfailIfNoTests=false -P-format

# Prints every iginx-*.log file in the working directory. `if: always()` makes the step run
# even when an earlier step failed, so the log is available for diagnosing that failure.
- name: Show IGinX log
if: always()
shell: bash
run: |
cat iginx-*.log

- name: set client test context
uses: ./.github/actions/context
with:
Expand All @@ -113,7 +154,6 @@ jobs:
# TODO: extract it to a separate job to test
# large image export only tested in FileSystem
- name: Test Client Export File
if: always()
shell: bash
run: |
if [[ "${{ matrix.DB-name }}" == "FileSystem" ]]; then
Expand Down
10 changes: 10 additions & 0 deletions antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ jobStatus
| CREATED
| IDLE
| RUNNING
| PARTIALLY_FAILING
| PARTIALLY_FAILED
| FAILING
| FAILED
| CLOSING
Expand Down Expand Up @@ -930,6 +932,14 @@ RUNNING
: R U N N I N G
;

// Keyword token for the PARTIALLY_FAILING job status; it is one of the alternatives of the
// jobStatus parser rule. Built from the per-letter fragments and UNDERLINE defined elsewhere
// in this grammar.
PARTIALLY_FAILING
: P A R T I A L L Y UNDERLINE F A I L I N G
;

// Keyword token for the PARTIALLY_FAILED job status; it is one of the alternatives of the
// jobStatus parser rule. Built from the per-letter fragments and UNDERLINE defined elsewhere
// in this grammar.
PARTIALLY_FAILED
: P A R T I A L L Y UNDERLINE F A I L E D
;

FAILING
: F A I L I N G
;
Expand Down
15 changes: 15 additions & 0 deletions client/src/main/java/cn/edu/tsinghua/iginx/client/IginxClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ private static OperationResult handleInputStatement(String statement)
processSetTimeUnit(statement);
} else if (isRegisterPy(trimedStatement)) {
processPythonRegister(statement);
} else if (isCommitTransformJob(trimedStatement)) {
processCommitTransformJob(statement);
} else {
processSql(statement);
}
Expand All @@ -326,6 +328,10 @@ private static boolean isSetTimeUnit(String sql) {
return sql.startsWith("set time unit in");
}

/**
 * Tells whether a statement is a {@code commit transform job} statement.
 *
 * <p>The three keywords must open the statement, in this order, separated by whitespace.
 * Checking only that "transform" and "job" occur somewhere after a leading "commit" would
 * also accept statements with the keywords in the wrong order or with those words appearing
 * only inside a later path or identifier.
 *
 * @param sql the statement to classify; it is compared against lower-case keywords, so it is
 *     presumably already trimmed and lower-cased by the caller (confirm at the call site)
 * @return {@code true} if {@code sql} begins with {@code commit transform job}
 */
private static boolean isCommitTransformJob(String sql) {
  // (?s) lets the trailing ".*" span line breaks in multi-line statements.
  return sql.matches("(?s)commit\\s+transform\\s+job\\b.*");
}

private static void processPythonRegister(String sql) {
try {
String parseErrorMsg;
Expand Down Expand Up @@ -357,6 +363,15 @@ private static void processSetTimeUnit(String sql) {
System.out.printf("Current time unit: %s\n", timestampPrecision);
}

/**
 * Commits a transform job through the current session and prints the outcome.
 *
 * <p>On success the assigned id is printed as {@code job id: <id>}; if the session reports a
 * {@link SessionException}, only its message is printed.
 *
 * @param sql the {@code commit transform job} statement to submit
 */
private static void processCommitTransformJob(String sql) {
  final long jobId;
  try {
    jobId = session.commitTransformJob(sql);
  } catch (SessionException e) {
    System.out.println(e.getMessage());
    return;
  }
  System.out.println("job id: " + jobId);
}

private static boolean isSetTimeUnit() {
return !timestampPrecision.equals("");
}
Expand Down
3 changes: 3 additions & 0 deletions conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ transformTaskThreadPoolSize=10
# Transform最大重试次数
transformMaxRetryTimes=3

# Directory in which the YAML scripts of scheduled transform jobs are saved
defaultScheduledTransformJobDir=transform_jobs

####################
### MQTT 配置
####################
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public class IginxWorker implements IService.Iface {

private final StatementExecutor executor = StatementExecutor.getInstance();

// to init scheduled jobs
private final TransformJobManager transformJobManager = TransformJobManager.getInstance();

private static final Config config = ConfigDescriptor.getInstance().getConfig();

private IginxWorker() {
Expand Down Expand Up @@ -655,6 +658,9 @@ public GetReplicaNumResp getReplicaNum(GetReplicaNumReq req) {
/**
 * Executes the SQL statement carried by the request and returns its response.
 *
 * <p>When the request explicitly sets the remote-session flag, the flag is copied onto the
 * request context before execution; otherwise the context keeps its own default.
 *
 * @param req the SQL execution request
 * @return the response produced by the statement executor for this request
 */
public ExecuteSqlResp executeSql(ExecuteSqlReq req) {
  RequestContext ctx = contextBuilder.build(req);
  if (req.isSetRemoteSession()) {
    ctx.setRemoteSession(req.isRemoteSession());
  }
  StatementExecutor.getInstance().execute(ctx);
  return ctx.getResult().getExecuteSqlResp();
}
Expand Down Expand Up @@ -847,7 +853,7 @@ public LoadUDFResp loadUDF(LoadUDFReq req) {
StatementExecutor executor = StatementExecutor.getInstance();
RequestContext ctx = contextBuilder.build(req);
ctx.setUDFModuleByteBuffer(req.udfFile);
ctx.setRemoteUDF(req.isRemote);
ctx.setRemoteSession(req.isRemote);
executor.execute(ctx);
return ctx.getResult().getLoadUDFResp();
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public class Config {

private int transformMaxRetryTimes = 3;

private String defaultScheduledTransformJobDir = "transform_jobs";

private boolean needInitBasicUDFFunctions = true;

private List<String> udfList = new ArrayList<>();
Expand Down Expand Up @@ -903,4 +905,12 @@ public String getRuleBasedOptimizer() {
public void setRuleBasedOptimizer(String ruleBasedOptimizer) {
this.ruleBasedOptimizer = ruleBasedOptimizer;
}

/**
 * Returns the directory in which YAML scripts of scheduled transform jobs are saved.
 *
 * @return the configured directory; {@code "transform_jobs"} unless overridden
 */
public String getDefaultScheduledTransformJobDir() {
  return this.defaultScheduledTransformJobDir;
}

/**
 * Sets the directory in which YAML scripts of scheduled transform jobs are saved.
 *
 * @param defaultScheduledTransformJobDir the directory to use; stored as given
 */
public void setDefaultScheduledTransformJobDir(String defaultScheduledTransformJobDir) {
this.defaultScheduledTransformJobDir = defaultScheduledTransformJobDir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ private void loadPropsFromFile() {
Integer.parseInt(properties.getProperty("transformTaskThreadPoolSize", "10")));
config.setTransformMaxRetryTimes(
Integer.parseInt(properties.getProperty("transformMaxRetryTimes", "3")));
config.setDefaultScheduledTransformJobDir(
properties.getProperty("defaultScheduledTransformJobDir", "transform_jobs"));
config.setNeedInitBasicUDFFunctions(
Boolean.parseBoolean(properties.getProperty("needInitBasicUDFFunctions", "false")));
config.setHistoricalPrefixList(properties.getProperty("historicalPrefixList", ""));
Expand Down Expand Up @@ -333,6 +335,9 @@ private void loadPropsFromEnv() {
EnvUtils.loadEnv("transformTaskThreadPoolSize", config.getTransformTaskThreadPoolSize()));
config.setTransformMaxRetryTimes(
EnvUtils.loadEnv("transformMaxRetryTimes", config.getTransformMaxRetryTimes()));
config.setDefaultScheduledTransformJobDir(
EnvUtils.loadEnv(
"defaultScheduledTransformJobDir", config.getDefaultScheduledTransformJobDir()));
config.setNeedInitBasicUDFFunctions(
EnvUtils.loadEnv("needInitBasicUDFFunctions", config.isNeedInitBasicUDFFunctions()));
config.setHistoricalPrefixList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@

import cn.edu.tsinghua.iginx.engine.shared.function.manager.FunctionManager;
import cn.edu.tsinghua.iginx.engine.shared.function.manager.ThreadInterpreterManager;
import java.util.Map;
import java.util.concurrent.*;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pemja.core.PythonInterpreter;
import pemja.core.PythonInterpreterConfig;

/** wrapped thread pool for memory/storage tasks */
public class AbstractTaskThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractTaskThreadPoolExecutor.class);

private final Map<Thread, PythonInterpreter> threadInterpreterMap = new ConcurrentHashMap<>();

// 默认使用FunctionManager中的config
protected final PythonInterpreterConfig config;

Expand All @@ -47,9 +43,7 @@ public Thread newThread(@NotNull Runnable r) {
Thread thread = defaultFactory.newThread(r);
thread.setUncaughtExceptionHandler(
(t, e) -> {
// 线程异常退出时,手动清除threadlocal里的interpreter但暂不关闭,留待线程池完全关闭时一起回收
LOGGER.error("Uncaught exception in thread: {}", t.getName(), e);
ThreadInterpreterManager.cleanupInterpreter();
});
return thread;
}
Expand Down Expand Up @@ -94,22 +88,20 @@ public AbstractTaskThreadPoolExecutor(
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
threadInterpreterMap.computeIfAbsent(t, k -> new PythonInterpreter(config));
ThreadInterpreterManager.setInterpreter(threadInterpreterMap.get(t));
ThreadInterpreterManager.setConfig(this.config);
}

/** 每次执行后,线程清除threadlocal,但interpreter保存在map中以备下次使用 */
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
ThreadInterpreterManager.cleanupInterpreter();
}

/** 所有线程已被关闭,线程池退出前关闭所有interpreter资源 */
@Override
protected void terminated() {
super.terminated();
threadInterpreterMap.values().forEach(PythonInterpreter::close);
threadInterpreterMap.clear();
if (ThreadInterpreterManager.isInterpreterSet()) {
ThreadInterpreterManager.getInterpreter().close();
}
}
}
Loading
Loading