Skip to content

Commit

Permalink
refactor: 升级依赖版本,替换过时的API. (#14)
Browse files Browse the repository at this point in the history
* build: 升级guava到32.1.2-jre

* build: 升级依赖

* refactor: 升级依赖版本,替换过时的API.
  • Loading branch information
zhou-hao authored Oct 24, 2023
1 parent ec215a3 commit 6769cef
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<reactor.version>2020.0.36</reactor.version>
<reactor.version>2020.0.31</reactor.version>
</properties>

<profiles>
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/org/jetlinks/reactor/ql/DefaultReactorQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void prepare() {
)
)
))
.subscriberContext(context -> context.put(ReactorQLContext.class, ctx));
.contextWrite(context -> context.put(ReactorQLContext.class, ctx));
} else {
builder = ctx ->
limit.apply(ctx,
Expand All @@ -117,7 +117,7 @@ protected void prepare() {
)
)
)
).subscriberContext(context -> context.put(ReactorQLContext.class, ctx));
).contextWrite(context -> context.put(ReactorQLContext.class, ctx));
}
}

Expand Down Expand Up @@ -308,15 +308,15 @@ else if (groupByExpression instanceof BinaryExpression) {
//过滤分组结果
.filterWhen(ctx -> filter.apply(ctx, ctx.getRecord()))
//分组命名放到上下文里
.subscriberContext(Context.of(GROUP_NAME_CONTEXT_KEY, group.getT2())),
.contextWrite(Context.of(GROUP_NAME_CONTEXT_KEY, group.getT2())),
Integer.MAX_VALUE
);
}
return flux -> groupMapper
.apply(flux)
.flatMap(group -> columnMapper
.apply(group.getT1())
.subscriberContext(Context.of(GROUP_NAME_CONTEXT_KEY, group.getT2())),
.contextWrite(Context.of(GROUP_NAME_CONTEXT_KEY, group.getT2())),
Integer.MAX_VALUE
);
}
Expand Down Expand Up @@ -449,8 +449,7 @@ public void visit(AllTableColumns columns) {
.doOnNext(cursor::set)
.as(oneMapper)
.flatMap(val -> Mono
.subscriberContext()
.flatMap(ctx -> {
.deferContextual(ctx -> {
ReactorQLRecord newCtx = cursor.get();
if (newCtx == null) {
newCtx = ReactorQLRecord
Expand Down Expand Up @@ -516,8 +515,7 @@ public void visit(AllTableColumns columns) {
});
})
.flatMap(map -> Mono
.subscriberContext()
.flatMap(ctx -> {
.deferContextual(ctx -> {
ReactorQLRecord newCtx = lastRecordRef.get();
//上游没有数据则创建一个新数据
if (newCtx == null) {
Expand Down

0 comments on commit 6769cef

Please sign in to comment.