From a00cfcb7fadd24e53e1eb57b8082050b8013ba48 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Mon, 7 Aug 2023 17:10:36 +0800 Subject: [PATCH] [Feature][CDC][Zeta] Support schema evolution framework(DDL) (#5125) --- pom.xml | 3 + .../apache/seatunnel/api/sink/SinkWriter.java | 9 + .../seatunnel/api/source/Collector.java | 8 + .../table/event/AlterTableAddColumnEvent.java | 53 ++++ .../event/AlterTableChangeColumnEvent.java | 55 ++++ .../table/event/AlterTableColumnEvent.java | 21 +- .../table/event/AlterTableColumnsEvent.java | 46 +++ .../event/AlterTableDropColumnEvent.java | 34 ++ .../api/table/event/AlterTableEvent.java | 29 ++ .../event/AlterTableModifyColumnEvent.java | 46 +++ .../api/table/event/AlterTableNameEvent.java | 34 ++ .../api/table/event/SchemaChangeEvent.java | 33 ++ .../seatunnel/api/table/event/TableEvent.java | 36 +++ .../event/handler/AlterTableEventHandler.java | 196 ++++++++++++ .../DataTypeChangeEventDispatcher.java | 79 +++++ .../handler/DataTypeChangeEventHandler.java | 45 +++ .../handler/SchemaChangeEventHandler.java | 27 ++ .../cdc/base/schema/SchemaChangeResolver.java | 32 ++ .../cdc/base/source/IncrementalSource.java | 10 +- .../enumerator/IncrementalSplitAssigner.java | 6 +- .../reader/IncrementalSourceReader.java | 42 ++- .../IncrementalSourceRecordEmitter.java | 27 +- .../reader/IncrementalSourceSplitReader.java | 12 +- .../IncrementalSourceStreamFetcher.java | 116 ++++++- .../base/source/split/IncrementalSplit.java | 24 ++ .../split/wartermark/WatermarkEvent.java | 34 ++ .../split/wartermark/WatermarkKind.java | 6 + .../cdc/base/utils/SourceRecordUtils.java | 16 + .../DebeziumDeserializationSchema.java | 7 + ...SeaTunnelRowDebeziumDeserializeSchema.java | 294 +++++++++++------- seatunnel-connectors-v2/connector-cdc/pom.xml | 37 +++ .../console/sink/ConsoleSinkWriter.java | 18 +- .../YamlSeaTunnelDomConfigProcessor.java | 5 + .../config/server/CheckpointConfig.java | 9 + .../config/server/ServerConfigOptions.java | 7 + .../core/checkpoint/CheckpointType.java | 42 +++ .../InternalCheckpointListener.java | 10 + .../actions/ShuffleMultipleRowStrategy.java | 10 +- .../server/checkpoint/CheckpointBarrier.java | 2 +- .../checkpoint/CheckpointCoordinator.java | 140 +++++++-- .../server/checkpoint/CheckpointManager.java | 34 ++ .../checkpoint/CompletedCheckpoint.java | 5 + .../operation/CheckpointEndOperation.java | 107 +++++++ ...rSchemaChangeAfterCheckpointOperation.java | 74 +++++ ...SchemaChangeBeforeCheckpointOperation.java | 74 +++++ .../CheckpointDataSerializerHook.java | 14 +- .../server/task/SeaTunnelSourceCollector.java | 56 ++++ .../engine/server/task/SeaTunnelTask.java | 27 ++ .../task/SinkAggregatedCommitterTask.java | 8 + .../task/SourceSplitEnumeratorTask.java | 8 + .../task/flow/ShuffleSinkFlowLifeCycle.java | 15 + .../task/flow/ShuffleSourceFlowLifeCycle.java | 7 + .../server/task/flow/SinkFlowLifeCycle.java | 17 + .../server/task/flow/SourceFlowLifeCycle.java | 125 ++++++++ 54 files changed, 2056 insertions(+), 175 deletions(-) create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableAddColumnEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableChangeColumnEvent.java rename seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/SchemaBarrier.java => seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java (69%) create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnsEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableDropColumnEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableModifyColumnEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableNameEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/SchemaChangeEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/AlterTableEventHandler.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventDispatcher.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventHandler.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/SchemaChangeEventHandler.java create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeAfterCheckpointOperation.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeBeforeCheckpointOperation.java diff --git a/pom.xml b/pom.xml index 3d619644952b..224a339fc6e4 100644 --- a/pom.xml +++ b/pom.xml @@ -768,6 +768,9 @@ ${spotless.version} + + src/main/java/org/apache/seatunnel/antlr4/generated/*.* + 1.7 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java index c0fbe2c0299c..3b1e715ebee7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.api.sink; import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import java.io.IOException; import java.io.Serializable; @@ -44,6 +45,14 @@ public interface SinkWriter { */ void write(T element) throws IOException; + /** + * apply schema change to third party data receiver. + * + * @param event + * @throws IOException + */ + default void applySchemaChange(SchemaChangeEvent event) throws IOException {} + /** * prepare the commit, will be called before {@link #snapshotState(long checkpointId)}. If you * need to use 2pc, you can return the commit info in this method, and receive the commit info diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java index 0b924bb570a9..85435880c63b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.api.source; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; + /** * A {@link Collector} is used to collect data from {@link SourceReader}. * @@ -26,6 +28,12 @@ public interface Collector { void collect(T record); + default void markSchemaChangeBeforeCheckpoint() {} + + default void collect(SchemaChangeEvent event) {} + + default void markSchemaChangeAfterCheckpoint() {} + /** * Returns the checkpoint lock. * diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableAddColumnEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableAddColumnEvent.java new file mode 100644 index 000000000000..967452545265 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableAddColumnEvent.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString(callSuper = true) +public class AlterTableAddColumnEvent extends AlterTableColumnEvent { + private final Column column; + private final boolean first; + private final String afterColumn; + + public AlterTableAddColumnEvent( + TablePath tablePath, Column column, boolean first, String afterColumn) { + super(tablePath); + this.column = column; + this.first = first; + this.afterColumn = afterColumn; + } + + public static AlterTableAddColumnEvent addFirst(TablePath tablePath, Column column) { + return new AlterTableAddColumnEvent(tablePath, column, true, null); + } + + public static AlterTableAddColumnEvent add(TablePath tablePath, Column column) { + return new AlterTableAddColumnEvent(tablePath, column, false, null); + } + + public static AlterTableAddColumnEvent addAfter( + TablePath tablePath, Column column, String afterColumn) { + return new AlterTableAddColumnEvent(tablePath, column, false, afterColumn); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableChangeColumnEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableChangeColumnEvent.java new file mode 100644 index 000000000000..2a8ba71846c3 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableChangeColumnEvent.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString(callSuper = true) +public class AlterTableChangeColumnEvent extends AlterTableAddColumnEvent { + private final String oldColumn; + + public AlterTableChangeColumnEvent( + TablePath tablePath, + String oldColumn, + Column column, + boolean first, + String afterColumn) { + super(tablePath, column, first, afterColumn); + this.oldColumn = oldColumn; + } + + public static AlterTableChangeColumnEvent changeFirst( + TablePath tablePath, String oldColumn, Column column) { + return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, true, null); + } + + public static AlterTableChangeColumnEvent change( + TablePath tablePath, String oldColumn, Column column) { + return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, false, null); + } + + public static AlterTableChangeColumnEvent changeAfter( + TablePath tablePath, String oldColumn, Column column, String afterColumn) { + return new AlterTableChangeColumnEvent(tablePath, oldColumn, column, false, afterColumn); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/SchemaBarrier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java similarity index 69% rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/SchemaBarrier.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java index 4268acd86726..a61dccc08d15 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/SchemaBarrier.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnEvent.java @@ -15,22 +15,15 @@ * limitations under the License. */ -package org.apache.seatunnel.engine.server.task.record; +package org.apache.seatunnel.api.table.event; -/** Change the schema of the task and flow. */ -public class SchemaBarrier implements Barrier { - @Override - public long getId() { - return -1; - } +import org.apache.seatunnel.api.table.catalog.TablePath; - @Override - public boolean snapshot() { - return false; - } +import lombok.ToString; - @Override - public boolean prepareClose() { - return false; +@ToString(callSuper = true) +public abstract class AlterTableColumnEvent extends AlterTableEvent { + public AlterTableColumnEvent(TablePath tablePath) { + super(tablePath); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnsEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnsEvent.java new file mode 100644 index 000000000000..eb81c67dd193 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableColumnsEvent.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.List; + +@Getter +@ToString(callSuper = true) +public class AlterTableColumnsEvent extends AlterTableEvent { + private final List events; + + public AlterTableColumnsEvent(TablePath tablePath) { + this(tablePath, new ArrayList<>()); + } + + public AlterTableColumnsEvent(TablePath tablePath, List events) { + super(tablePath); + this.events = events; + } + + public AlterTableColumnsEvent addEvent(AlterTableColumnEvent event) { + events.add(event); + return this; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableDropColumnEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableDropColumnEvent.java new file mode 100644 index 000000000000..3dbf5294594f --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableDropColumnEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString(callSuper = true) +public class AlterTableDropColumnEvent extends AlterTableColumnEvent { + private final String column; + + public AlterTableDropColumnEvent(TablePath tablePath, String column) { + super(tablePath); + this.column = column; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableEvent.java new file mode 100644 index 000000000000..0bf268dc210e --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.ToString; + +@ToString(callSuper = true) +public abstract class AlterTableEvent extends TableEvent { + public AlterTableEvent(TablePath tablePath) { + super(tablePath); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableModifyColumnEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableModifyColumnEvent.java new file mode 100644 index 000000000000..97be83f719eb --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableModifyColumnEvent.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString(callSuper = true) +public class AlterTableModifyColumnEvent extends AlterTableAddColumnEvent { + public AlterTableModifyColumnEvent( + TablePath tablePath, Column column, boolean first, String afterColumn) { + super(tablePath, column, first, afterColumn); + } + + public static AlterTableModifyColumnEvent modifyFirst(TablePath tablePath, Column column) { + return new AlterTableModifyColumnEvent(tablePath, column, true, null); + } + + public static AlterTableModifyColumnEvent modify(TablePath tablePath, Column column) { + return new AlterTableModifyColumnEvent(tablePath, column, false, null); + } + + public static AlterTableModifyColumnEvent modifyAfter( + TablePath tablePath, Column column, String afterColumn) { + return new AlterTableModifyColumnEvent(tablePath, column, false, afterColumn); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableNameEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableNameEvent.java new file mode 100644 index 000000000000..cc01a916031b --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/AlterTableNameEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString(callSuper = true) +public class AlterTableNameEvent extends AlterTableColumnEvent { + private final TablePath newTablePath; + + public AlterTableNameEvent(TablePath tablePath, TablePath newTablePath) { + super(tablePath); + this.newTablePath = newTablePath; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/SchemaChangeEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/SchemaChangeEvent.java new file mode 100644 index 000000000000..3f01d8f867fd --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/SchemaChangeEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.io.Serializable; + +/** Represents a structural change to a table schema. */ +public interface SchemaChangeEvent extends Serializable { + + /** + * Path of the change table object + * + * @return + */ + TablePath tablePath(); +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java new file mode 100644 index 000000000000..b81f18f88763 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/TableEvent.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +@Getter +@ToString +@RequiredArgsConstructor +public abstract class TableEvent implements SchemaChangeEvent { + protected final TablePath tablePath; + + @Override + public TablePath tablePath() { + return tablePath; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/AlterTableEventHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/AlterTableEventHandler.java new file mode 100644 index 000000000000..b020e66a2a3e --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/AlterTableEventHandler.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event.handler; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableNameEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +public class AlterTableEventHandler implements DataTypeChangeEventHandler { + private SeaTunnelRowType dataType; + + @Override + public SeaTunnelRowType get() { + return dataType; + } + + @Override + public DataTypeChangeEventHandler reset(SeaTunnelRowType dataType) { + this.dataType = dataType; + return this; + } + + @Override + public SeaTunnelRowType apply(SchemaChangeEvent event) { + AlterTableEvent alterTableEvent = (AlterTableEvent) event; + return apply(dataType, alterTableEvent); + } + + private SeaTunnelRowType apply(SeaTunnelRowType dataType, AlterTableEvent alterTableEvent) { + if (alterTableEvent instanceof AlterTableNameEvent) { + return dataType; + } + if (alterTableEvent instanceof AlterTableDropColumnEvent) { + return applyDropColumn(dataType, (AlterTableDropColumnEvent) alterTableEvent); + } + if (alterTableEvent instanceof AlterTableModifyColumnEvent) { + return applyModifyColumn(dataType, (AlterTableModifyColumnEvent) alterTableEvent); + } + if (alterTableEvent instanceof AlterTableChangeColumnEvent) { + return applyChangeColumn(dataType, (AlterTableChangeColumnEvent) alterTableEvent); + } + if (alterTableEvent instanceof AlterTableAddColumnEvent) { + return applyAddColumn(dataType, (AlterTableAddColumnEvent) alterTableEvent); + } + if (alterTableEvent instanceof AlterTableColumnsEvent) { + SeaTunnelRowType newType = dataType; + for (AlterTableColumnEvent columnEvent : + ((AlterTableColumnsEvent) alterTableEvent).getEvents()) { + newType = apply(newType, columnEvent); + } + return newType; + } + + throw new UnsupportedOperationException( + "Unsupported alter table event: " + alterTableEvent); + } + + private SeaTunnelRowType applyAddColumn( + SeaTunnelRowType dataType, AlterTableAddColumnEvent addColumnEvent) { + LinkedList originFields = new LinkedList<>(Arrays.asList(dataType.getFieldNames())); + LinkedList> originFieldTypes = + new LinkedList<>(Arrays.asList(dataType.getFieldTypes())); + Column column = addColumnEvent.getColumn(); + if (originFields.contains(column.getName())) { + return applyModifyColumn( + dataType, + new AlterTableModifyColumnEvent( + addColumnEvent.tablePath(), + addColumnEvent.getColumn(), + addColumnEvent.isFirst(), + addColumnEvent.getAfterColumn())); + } + + if (addColumnEvent.isFirst()) { + originFields.addFirst(column.getName()); + originFieldTypes.addFirst(column.getDataType()); + } else if (addColumnEvent.getAfterColumn() != null) { + int index = originFields.indexOf(addColumnEvent.getAfterColumn()); + originFields.add(index + 1, column.getName()); + originFieldTypes.add(index + 1, column.getDataType()); + } else { + originFields.addLast(column.getName()); + originFieldTypes.addLast(column.getDataType()); + } + + return new SeaTunnelRowType( + originFields.toArray(new String[0]), + originFieldTypes.toArray(new SeaTunnelDataType[0])); + } + + private SeaTunnelRowType applyDropColumn( + SeaTunnelRowType dataType, AlterTableDropColumnEvent dropColumnEvent) { + List fieldNames = new ArrayList<>(); + List fieldTypes = new ArrayList<>(); + for (int i = 0; i < dataType.getTotalFields(); i++) { + if (dataType.getFieldName(i).equals(dropColumnEvent.getColumn())) { + continue; + } + fieldNames.add(dataType.getFieldName(i)); + fieldTypes.add(dataType.getFieldType(i)); + } + return new SeaTunnelRowType( + fieldNames.toArray(new String[0]), fieldTypes.toArray(new SeaTunnelDataType[0])); + } + + private SeaTunnelRowType applyModifyColumn( + SeaTunnelRowType dataType, AlterTableModifyColumnEvent modifyColumnEvent) { + List fieldNames = Arrays.asList(dataType.getFieldNames()); + if (!fieldNames.contains(modifyColumnEvent.getColumn().getName())) { + return dataType; + } + + String modifyColumnName = modifyColumnEvent.getColumn().getName(); + int modifyColumnIndex = dataType.indexOf(modifyColumnName); + return applyModifyColumn( + dataType, + modifyColumnIndex, + modifyColumnEvent.getColumn(), + modifyColumnEvent.isFirst(), + modifyColumnEvent.getAfterColumn()); + } + + private SeaTunnelRowType applyChangeColumn( + SeaTunnelRowType dataType, AlterTableChangeColumnEvent changeColumnEvent) { + String oldColumn = changeColumnEvent.getOldColumn(); + int oldColumnIndex = dataType.indexOf(oldColumn); + + return applyModifyColumn( + dataType, + oldColumnIndex, + changeColumnEvent.getColumn(), + changeColumnEvent.isFirst(), + changeColumnEvent.getAfterColumn()); + } + + private SeaTunnelRowType applyModifyColumn( + SeaTunnelRowType dataType, + int columnIndex, + Column column, + boolean first, + String afterColumn) { + LinkedList originFields = new LinkedList<>(Arrays.asList(dataType.getFieldNames())); + LinkedList> originFieldTypes = + new LinkedList<>(Arrays.asList(dataType.getFieldTypes())); + + if (first) { + originFields.remove(columnIndex); + originFieldTypes.remove(columnIndex); + + originFields.addFirst(column.getName()); + originFieldTypes.addFirst(column.getDataType()); + } else if (afterColumn != null) { + originFields.remove(columnIndex); + originFieldTypes.remove(columnIndex); + + int index = originFields.indexOf(afterColumn); + originFields.add(index + 1, column.getName()); + originFieldTypes.add(index + 1, column.getDataType()); + } else { + originFields.set(columnIndex, column.getName()); + originFieldTypes.set(columnIndex, column.getDataType()); + } + return new SeaTunnelRowType( + originFields.toArray(new String[0]), + originFieldTypes.toArray(new SeaTunnelDataType[0])); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventDispatcher.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventDispatcher.java new file mode 100644 index 000000000000..ec4f69334f7a --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventDispatcher.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event.handler; + +import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent; +import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableEvent; +import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent; +import org.apache.seatunnel.api.table.event.AlterTableNameEvent; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class DataTypeChangeEventDispatcher implements DataTypeChangeEventHandler { + + private final Map handlers; + private SeaTunnelRowType dataType; + + public DataTypeChangeEventDispatcher() { + this.handlers = createHandlers(); + } + + @Override + public SeaTunnelRowType get() { + return dataType; + } + + @Override + public DataTypeChangeEventHandler reset(SeaTunnelRowType dataType) { + this.dataType = dataType; + return this; + } + + @Override + public SeaTunnelRowType apply(SchemaChangeEvent event) { + DataTypeChangeEventHandler handler = handlers.get(event.getClass()); + if (handler == null) { + log.warn("No DataTypeChangeEventHandler for event: {}", event.getClass()); + return dataType; + } + return handler.reset(dataType).apply(event); + } + + private static Map createHandlers() { + Map handlers = new HashMap<>(); + + AlterTableEventHandler alterTableEventHandler = new AlterTableEventHandler(); + handlers.put(AlterTableEvent.class, alterTableEventHandler); + handlers.put(AlterTableNameEvent.class, alterTableEventHandler); + handlers.put(AlterTableColumnsEvent.class, alterTableEventHandler); + handlers.put(AlterTableAddColumnEvent.class, alterTableEventHandler); + handlers.put(AlterTableModifyColumnEvent.class, alterTableEventHandler); + handlers.put(AlterTableDropColumnEvent.class, alterTableEventHandler); + handlers.put(AlterTableChangeColumnEvent.class, alterTableEventHandler); + return handlers; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventHandler.java new file mode 100644 index 000000000000..01d8924d531b --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/DataTypeChangeEventHandler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event.handler; + +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +public interface DataTypeChangeEventHandler extends SchemaChangeEventHandler { + + SeaTunnelRowType get(); + + DataTypeChangeEventHandler reset(SeaTunnelRowType dataType); + + default SeaTunnelRowType handle(SchemaChangeEvent event) { + if (get() == null) { + throw new IllegalStateException("DataTypeChanger not reset"); + } + + try { + return apply(event); + } finally { + reset(null); + if (get() != null) { + throw new IllegalStateException("DataTypeChanger not reset"); + } + } + } + + SeaTunnelRowType apply(SchemaChangeEvent event); +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/SchemaChangeEventHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/SchemaChangeEventHandler.java new file mode 100644 index 000000000000..167dc6cc315e --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/event/handler/SchemaChangeEventHandler.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.event.handler; + +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; + +import java.io.Serializable; + +public interface SchemaChangeEventHandler extends Serializable { + + T handle(SchemaChangeEvent event); +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java new file mode 100644 index 000000000000..ee3ef08f7d22 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.cdc.base.schema; + +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.Serializable; + +public interface SchemaChangeResolver extends Serializable { + + boolean support(SourceRecord record); + + SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType); +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java index 965d3fd27fe8..c10ab3e06138 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java @@ -36,6 +36,7 @@ import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.option.StopMode; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner; @@ -167,17 +168,22 @@ public SourceReader createReader(SourceReader.Context reader BlockingQueue> elementsQueue = new LinkedBlockingQueue<>(2); + SchemaChangeResolver schemaChangeResolver = deserializationSchema.getSchemaChangeResolver(); Supplier> splitReaderSupplier = () -> new IncrementalSourceSplitReader<>( - readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig); + readerContext.getIndexOfSubtask(), + dataSourceDialect, + sourceConfig, + schemaChangeResolver); return new IncrementalSourceReader<>( elementsQueue, splitReaderSupplier, createRecordEmitter(sourceConfig, readerContext.getMetricsContext()), new SourceReaderOptions(readonlyConfig), readerContext, - sourceConfig); + sourceConfig, + deserializationSchema); } protected RecordEmitter createRecordEmitter( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java index d000d505363b..fe8204f6cd2f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.cdc.base.source.enumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState; import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; @@ -70,6 +71,7 @@ public class IncrementalSplitAssigner implements SplitAs private final Map assignedSplits = new HashMap<>(); private boolean startWithSnapshotMinimumOffset = true; + private SeaTunnelDataType checkpointDataType; public IncrementalSplitAssigner( SplitAssigner.Context context, @@ -152,6 +154,7 @@ public void addSplits(Collection splits) { } tableWatermarks.put(tableId, startupOffset); } + checkpointDataType = incrementalSplit.getCheckpointDataType(); }); if (!tableWatermarks.isEmpty()) { this.startWithSnapshotMinimumOffset = false; @@ -249,6 +252,7 @@ private IncrementalSplit createIncrementalSplit( capturedTables, incrementalSplitStartOffset, sourceConfig.getStopConfig().getStopOffset(offsetFactory), - completedSnapshotSplitInfos); + completedSnapshotSplitInfos, + checkpointDataType); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java index b251759ff7c2..ceb6215f41d4 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent; import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark; @@ -29,6 +30,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState; import org.apache.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState; import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase; +import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema; import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter; import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds; import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase; @@ -38,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,6 +65,7 @@ public class IncrementalSourceReader private final int subtaskId; private final C sourceConfig; + private final DebeziumDeserializationSchema debeziumDeserializationSchema; public IncrementalSourceReader( BlockingQueue> elementsQueue, @@ -69,7 +73,8 @@ public IncrementalSourceReader( RecordEmitter recordEmitter, SourceReaderOptions options, SourceReader.Context context, - C sourceConfig) { + C sourceConfig, + DebeziumDeserializationSchema debeziumDeserializationSchema) { super( elementsQueue, new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get), @@ -79,6 +84,7 @@ public IncrementalSourceReader( this.sourceConfig = sourceConfig; this.finishedUnackedSplits = new HashMap<>(); this.subtaskId = context.getIndexOfSubtask(); + this.debeziumDeserializationSchema = debeziumDeserializationSchema; } @Override @@ -163,6 +169,15 @@ protected SourceSplitStateBase initializedState(SourceSplitBase split) { if (split.isSnapshotSplit()) { return new SnapshotSplitState(split.asSnapshotSplit()); } else { + IncrementalSplit incrementalSplit = split.asIncrementalSplit(); + if (incrementalSplit.getCheckpointDataType() != null) { + log.info( + "The incremental split[{}] has checkpoint datatype {} for restore.", + incrementalSplit.splitId(), + incrementalSplit.getCheckpointDataType()); + debeziumDeserializationSchema.restoreCheckpointProducedType( + incrementalSplit.getCheckpointDataType()); + } return new IncrementalSplitState(split.asIncrementalSplit()); } } @@ -180,6 +195,10 @@ public List snapshotState(long checkpointId) { // add finished snapshot splits that didn't receive ack yet unfinishedSplits.addAll(finishedUnackedSplits.values()); + if (isIncrementalSplitPhase(unfinishedSplits)) { + return snapshotCheckpointDataType(unfinishedSplits); + } + return unfinishedSplits; } @@ -187,4 +206,25 @@ public List snapshotState(long checkpointId) { protected SourceSplitBase toSplitType(String splitId, SourceSplitStateBase splitState) { return splitState.toSourceSplit(); } + + private boolean isIncrementalSplitPhase(List stateSplits) { + return stateSplits.size() == 1 && stateSplits.get(0).isIncrementalSplit(); + } + + private List snapshotCheckpointDataType(List stateSplits) { + if (!isIncrementalSplitPhase(stateSplits)) { + throw new IllegalStateException( + "The splits should be incremental split when snapshot checkpoint datatype"); + } + IncrementalSplit incrementalSplit = stateSplits.get(0).asIncrementalSplit(); + // Snapshot current datatype to checkpoint + SeaTunnelDataType checkpointDataType = debeziumDeserializationSchema.getProducedType(); + IncrementalSplit newIncrementalSplit = + new IncrementalSplit(incrementalSplit, checkpointDataType); + log.debug( + "Snapshot checkpoint datatype {} into split[{}] state.", + checkpointDataType, + incrementalSplit.splitId()); + return Arrays.asList(newIncrementalSplit); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java index 2f8409b99a3a..eacb427acbcb 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.common.metrics.Counter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords; @@ -37,6 +38,8 @@ import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isHighWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isLowWatermarkEvent; +import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isSchemaChangeAfterWatermarkEvent; +import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isSchemaChangeBeforeWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp; @@ -110,9 +113,12 @@ protected void processElement( Offset watermark = getWatermark(element); if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) { splitState.asSnapshotSplitState().setLowWatermark(watermark); - } - if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + } else if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { splitState.asSnapshotSplitState().setHighWatermark(watermark); + } else if ((isSchemaChangeBeforeWatermarkEvent(element) + || isSchemaChangeAfterWatermarkEvent(element)) + && splitState.isIncrementalSplitState()) { + emitElement(element, output); } } else if (isSchemaChangeEvent(element) && splitState.isIncrementalSplitState()) { emitElement(element, output); @@ -157,9 +163,24 @@ public void collect(T record) { output.collect(record); } + @Override + public void collect(SchemaChangeEvent event) { + output.collect(event); + } + + @Override + public void markSchemaChangeBeforeCheckpoint() { + output.markSchemaChangeBeforeCheckpoint(); + } + + @Override + public void markSchemaChangeAfterCheckpoint() { + output.markSchemaChangeAfterCheckpoint(); + } + @Override public Object getCheckpointLock() { - return null; + return output.getCheckpointLock(); } } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java index 932b5f0e4e96..53f97362734f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher; @@ -50,13 +51,18 @@ public class IncrementalSourceSplitReader private String currentSplitId; private final DataSourceDialect dataSourceDialect; private final C sourceConfig; + private final SchemaChangeResolver schemaChangeResolver; public IncrementalSourceSplitReader( - int subtaskId, DataSourceDialect dataSourceDialect, C sourceConfig) { + int subtaskId, + DataSourceDialect dataSourceDialect, + C sourceConfig, + SchemaChangeResolver schemaChangeResolver) { this.subtaskId = subtaskId; this.splits = new ArrayDeque<>(); this.dataSourceDialect = dataSourceDialect; this.sourceConfig = sourceConfig; + this.schemaChangeResolver = schemaChangeResolver; } @Override @@ -133,7 +139,9 @@ protected void checkSplitOrStartNext() throws IOException { } final FetchTask.Context taskContext = dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig); - currentFetcher = new IncrementalSourceStreamFetcher(taskContext, subtaskId); + currentFetcher = + new IncrementalSourceStreamFetcher( + taskContext, subtaskId, schemaChangeResolver); log.info("Stream fetcher is created."); } currentFetcher.submitTask(dataSourceDialect.createFetchTask(nextSplit)); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 2b8e9f7725fd..31fdaaf2e50a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -18,11 +18,14 @@ package org.apache.seatunnel.connectors.cdc.base.source.reader.external; import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo; import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; +import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent; +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; import org.apache.kafka.connect.source.SourceRecord; @@ -33,6 +36,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -53,6 +57,7 @@ @Slf4j public class IncrementalSourceStreamFetcher implements Fetcher { private final FetchTask.Context taskContext; + private final SchemaChangeResolver schemaChangeResolver; private final ExecutorService executorService; // has entered pure binlog mode private final Set pureBinlogPhaseTables; @@ -72,8 +77,12 @@ public class IncrementalSourceStreamFetcher implements Fetcher pollSplitRecords() throws InterruptedException, SeaTunnelException { checkReadException(); - final List sourceRecords = new ArrayList<>(); + + Iterator sourceRecordsIterator = Collections.emptyIterator(); if (streamFetchTask.isRunning()) { List batch = queue.poll(); - for (DataChangeEvent event : batch) { + if (!batch.isEmpty()) { + if (schemaChangeResolver != null) { + sourceRecordsIterator = splitSchemaChangeStream(batch); + } else { + sourceRecordsIterator = splitNormalStream(batch); + } + } + } + return sourceRecordsIterator; + } + + private Iterator splitNormalStream(List batchEvents) { + List sourceRecords = new ArrayList<>(); + if (streamFetchTask.isRunning()) { + for (DataChangeEvent event : batchEvents) { if (shouldEmit(event.getRecord())) { sourceRecords.add(event.getRecord()); } @@ -125,6 +149,92 @@ public Iterator pollSplitRecords() return sourceRecordsSet.iterator(); } + /** + * Split schema change stream. + * + *

For example 1: + * + *

Before event batch: [a, b, c, SchemaChangeEvent-1, SchemaChangeEvent-2, d, e] + * + *

After event batch: [a, b, c, checkpoint-before] [SchemaChangeEvent-1, SchemaChangeEvent-2, + * checkpoint-after] [d, e] + * + *

For example 2: + * + *

Before event batch: [SchemaChangeEvent-1, SchemaChangeEvent-2, a, b, c, d, e] + * + *

After event batch: [checkpoint-before] [SchemaChangeEvent-1, SchemaChangeEvent-2, + * checkpoint-after] [a, b, c, d, e] + */ + private Iterator splitSchemaChangeStream(List batchEvents) { + List sourceRecordsSet = new ArrayList<>(); + + List sourceRecordList = new ArrayList<>(); + SourceRecord previousRecord = null; + for (int i = 0; i < batchEvents.size(); i++) { + DataChangeEvent event = batchEvents.get(i); + SourceRecord currentRecord = event.getRecord(); + if (!shouldEmit(currentRecord)) { + continue; + } + if (!SourceRecordUtils.isDataChangeRecord(currentRecord) + && !SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { + sourceRecordList.add(currentRecord); + continue; + } + + if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { + if (!schemaChangeResolver.support(currentRecord)) { + continue; + } + + if (previousRecord == null) { + // add schema-change-before to first + sourceRecordList.add( + WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); + sourceRecordsSet.add(new SourceRecords(sourceRecordList)); + sourceRecordList = new ArrayList<>(); + sourceRecordList.add(currentRecord); + } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) { + sourceRecordList.add(currentRecord); + } else { + sourceRecordList.add( + WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); + sourceRecordsSet.add(new SourceRecords(sourceRecordList)); + sourceRecordList = new ArrayList<>(); + sourceRecordList.add(currentRecord); + } + } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)) { + if (previousRecord == null + || SourceRecordUtils.isDataChangeRecord(previousRecord)) { + sourceRecordList.add(currentRecord); + } else { + sourceRecordList.add( + WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord)); + sourceRecordsSet.add(new SourceRecords(sourceRecordList)); + sourceRecordList = new ArrayList<>(); + sourceRecordList.add(currentRecord); + } + } + previousRecord = currentRecord; + if (i == batchEvents.size() - 1) { + if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { + sourceRecordList.add( + WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord)); + } + sourceRecordsSet.add(new SourceRecords(sourceRecordList)); + } + } + + if (sourceRecordsSet.size() > 1) { + log.debug( + "Split events stream into {} batches and mark schema checkpoint before/after", + sourceRecordsSet.size()); + } + + return sourceRecordsSet.iterator(); + } + private void checkReadException() { if (readException != null) { throw new SeaTunnelException( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java index a5e6a9cbacdb..640e173682ac 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.cdc.base.source.split; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import io.debezium.relational.TableId; @@ -43,16 +44,39 @@ public class IncrementalSplit extends SourceSplitBase { */ private final List completedSnapshotSplitInfos; + private final SeaTunnelDataType checkpointDataType; + public IncrementalSplit( String splitId, List capturedTables, Offset startupOffset, Offset stopOffset, List completedSnapshotSplitInfos) { + this(splitId, capturedTables, startupOffset, stopOffset, completedSnapshotSplitInfos, null); + } + + public IncrementalSplit(IncrementalSplit split, SeaTunnelDataType checkpointDataType) { + this( + split.splitId(), + split.getTableIds(), + split.getStartupOffset(), + split.getStopOffset(), + split.getCompletedSnapshotSplitInfos(), + checkpointDataType); + } + + public IncrementalSplit( + String splitId, + List capturedTables, + Offset startupOffset, + Offset stopOffset, + List completedSnapshotSplitInfos, + SeaTunnelDataType checkpointDataType) { super(splitId); this.tableIds = capturedTables; this.startupOffset = startupOffset; this.stopOffset = stopOffset; this.completedSnapshotSplitInfos = completedSnapshotSplitInfos; + this.checkpointDataType = checkpointDataType; } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkEvent.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkEvent.java index 1a120cb3b5c3..4e2a81f3f5c9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkEvent.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkEvent.java @@ -72,6 +72,28 @@ public static SourceRecord create( signalRecordValue(splitId, watermarkKind)); } + public static SourceRecord createSchemaChangeBeforeWatermark(SourceRecord record) { + return new SourceRecord( + record.sourcePartition(), + record.sourceOffset(), + record.topic(), + SIGNAL_EVENT_KEY_SCHEMA, + signalRecordKey("schema-change-before"), + SIGNAL_EVENT_VALUE_SCHEMA, + signalRecordValue("schema-change-before", WatermarkKind.SCHEMA_CHANGE_BEFORE)); + } + + public static SourceRecord createSchemaChangeAfterWatermark(SourceRecord record) { + return new SourceRecord( + record.sourcePartition(), + record.sourceOffset(), + record.topic(), + SIGNAL_EVENT_KEY_SCHEMA, + signalRecordKey("schema-change-after"), + SIGNAL_EVENT_VALUE_SCHEMA, + signalRecordValue("schema-change-after", WatermarkKind.SCHEMA_CHANGE_AFTER)); + } + public static boolean isWatermarkEvent(SourceRecord record) { Optional watermarkKind = getWatermarkKind(record); return watermarkKind.isPresent(); @@ -92,6 +114,18 @@ public static boolean isEndWatermarkEvent(SourceRecord record) { return watermarkKind.isPresent() && watermarkKind.get() == WatermarkKind.END; } + public static boolean isSchemaChangeBeforeWatermarkEvent(SourceRecord record) { + Optional watermarkKind = getWatermarkKind(record); + return watermarkKind.isPresent() + && watermarkKind.get() == WatermarkKind.SCHEMA_CHANGE_BEFORE; + } + + public static boolean isSchemaChangeAfterWatermarkEvent(SourceRecord record) { + Optional watermarkKind = getWatermarkKind(record); + return watermarkKind.isPresent() + && watermarkKind.get() == WatermarkKind.SCHEMA_CHANGE_AFTER; + } + private static Optional getWatermarkKind(SourceRecord record) { if (record.valueSchema() != null && SIGNAL_EVENT_VALUE_SCHEMA_NAME.equals(record.valueSchema().name())) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkKind.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkKind.java index 334bd2c0e41f..cc6a2ee14acf 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkKind.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/wartermark/WatermarkKind.java @@ -21,6 +21,8 @@ public enum WatermarkKind { LOW, HIGH, + SCHEMA_CHANGE_BEFORE, + SCHEMA_CHANGE_AFTER, END; public WatermarkKind fromString(String kindString) { @@ -28,6 +30,10 @@ public WatermarkKind fromString(String kindString) { return LOW; } else if (HIGH.name().equalsIgnoreCase(kindString)) { return HIGH; + } else if (SCHEMA_CHANGE_BEFORE.name().equalsIgnoreCase(kindString)) { + return SCHEMA_CHANGE_BEFORE; + } else if (SCHEMA_CHANGE_AFTER.name().equalsIgnoreCase(kindString)) { + return SCHEMA_CHANGE_AFTER; } else { return END; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index e172b389b4a9..872669eacd33 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -17,12 +17,14 @@ package org.apache.seatunnel.connectors.cdc.base.utils; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import io.debezium.connector.AbstractSourceInfo; import io.debezium.data.Envelope; import io.debezium.document.DocumentReader; import io.debezium.relational.TableId; @@ -193,4 +195,18 @@ private static boolean isNumericObject(Object obj) { private static BigDecimal toBigDecimal(Object numericObj) { return new BigDecimal(numericObj.toString()); } + + public static TablePath getTablePath(SourceRecord record) { + Struct messageStruct = (Struct) record.value(); + Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE); + String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY); + String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY); + String schemaName = null; + try { + schemaName = sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY); + } catch (Throwable e) { + // ignore + } + return TablePath.of(databaseName, schemaName, tableName); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationSchema.java index 8cf300376550..8e8cb3c09c26 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationSchema.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; import org.apache.kafka.connect.source.SourceRecord; @@ -36,4 +37,10 @@ public interface DebeziumDeserializationSchema extends Serializable { void deserialize(SourceRecord record, Collector out) throws Exception; SeaTunnelDataType getProducedType(); + + default void restoreCheckpointProducedType(SeaTunnelDataType checkpointDataType) {} + + default SchemaChangeResolver getSchemaChangeResolver() { + return null; + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index 2997fc78d625..ea0a3fc13e74 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -19,11 +19,16 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; +import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory; import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema; import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter; @@ -32,135 +37,160 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import io.debezium.connector.AbstractSourceInfo; import io.debezium.data.Envelope; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import java.io.Serializable; import java.time.ZoneId; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isSchemaChangeAfterWatermarkEvent; +import static org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent.isSchemaChangeBeforeWatermarkEvent; import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord; +import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isSchemaChangeEvent; /** Deserialization schema from Debezium object to {@link SeaTunnelRow}. */ @Slf4j public final class SeaTunnelRowDebeziumDeserializeSchema implements DebeziumDeserializationSchema { private static final long serialVersionUID = 1L; + private static final String DEFAULT_TABLE_NAME_KEY = null; - /** TypeInformation of the produced {@link SeaTunnelRow}. * */ - private final SeaTunnelDataType resultTypeInfo; - - /** - * Runtime converter that converts Kafka {@link SourceRecord}s into {@link SeaTunnelRow} - * consisted of - */ - private final SeaTunnelRowDebeziumDeserializationConverters singleTableRowConverter; - - private final Map - multipleTableRowConverters; - - /** Validator to validate the row value. */ - private final ValueValidator validator; - - /** Returns a builder to build {@link SeaTunnelRowDebeziumDeserializeSchema}. */ - public static Builder builder() { - return new Builder(); - } + private final MetadataConverter[] metadataConverters; + private final ZoneId serverTimeZone; + private final DebeziumDeserializationConverterFactory userDefinedConverterFactory; + private final SchemaChangeResolver schemaChangeResolver; + private final DataTypeChangeEventHandler dataTypeChangeEventHandler; + private SeaTunnelDataType resultTypeInfo; + private Map tableRowConverters; SeaTunnelRowDebeziumDeserializeSchema( SeaTunnelDataType physicalDataType, MetadataConverter[] metadataConverters, SeaTunnelDataType resultType, - ValueValidator validator, ZoneId serverTimeZone, - DebeziumDeserializationConverterFactory userDefinedConverterFactory) { - - SeaTunnelRowDebeziumDeserializationConverters singleTableRowConverter = null; - Map multipleTableRowConverters = - Collections.emptyMap(); - if (physicalDataType instanceof MultipleRowType) { - multipleTableRowConverters = new HashMap<>(); - for (Map.Entry item : (MultipleRowType) physicalDataType) { - SeaTunnelRowDebeziumDeserializationConverters itemRowConverter = - new SeaTunnelRowDebeziumDeserializationConverters( - item.getValue(), - metadataConverters, - serverTimeZone, - userDefinedConverterFactory); - multipleTableRowConverters.put(item.getKey(), itemRowConverter); - } - } else { - singleTableRowConverter = - new SeaTunnelRowDebeziumDeserializationConverters( - (SeaTunnelRowType) physicalDataType, - metadataConverters, - serverTimeZone, - userDefinedConverterFactory); - } - this.singleTableRowConverter = singleTableRowConverter; - this.multipleTableRowConverters = multipleTableRowConverters; + DebeziumDeserializationConverterFactory userDefinedConverterFactory, + SchemaChangeResolver schemaChangeResolver) { + this.metadataConverters = metadataConverters; + this.serverTimeZone = serverTimeZone; + this.userDefinedConverterFactory = userDefinedConverterFactory; this.resultTypeInfo = checkNotNull(resultType); - this.validator = checkNotNull(validator); + this.schemaChangeResolver = schemaChangeResolver; + this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher(); + this.tableRowConverters = + createTableRowConverters( + resultType, + metadataConverters, + serverTimeZone, + userDefinedConverterFactory); } @Override public void deserialize(SourceRecord record, Collector collector) throws Exception { - if (!isDataChangeRecord(record)) { - log.debug("Unsupported record {}, just skip.", record); + if (isSchemaChangeBeforeWatermarkEvent(record)) { + collector.markSchemaChangeBeforeCheckpoint(); + return; + } + if (isSchemaChangeAfterWatermarkEvent(record)) { + collector.markSchemaChangeAfterCheckpoint(); + return; + } + if (isSchemaChangeEvent(record)) { + deserializeSchemaChangeRecord(record, collector); + return; + } + + if (isDataChangeRecord(record)) { + deserializeDataChangeRecord(record, collector); + return; + } + + log.debug("Unsupported record {}, just skip.", record); + } + + private void deserializeSchemaChangeRecord( + SourceRecord record, Collector collector) { + SchemaChangeEvent schemaChangeEvent = schemaChangeResolver.resolve(record, resultTypeInfo); + if (schemaChangeEvent == null) { + log.info("Unsupported resolve schemaChangeEvent {}, just skip.", record); return; } + if (resultTypeInfo instanceof MultipleRowType) { + Map newRowTypeMap = new HashMap<>(); + for (Map.Entry entry : (MultipleRowType) resultTypeInfo) { + if (!entry.getKey().equals(schemaChangeEvent.tablePath().toString())) { + newRowTypeMap.put(entry.getKey(), entry.getValue()); + continue; + } + + log.debug("Table[{}] datatype change before: {}", entry.getKey(), entry.getValue()); + SeaTunnelRowType newRowType = + dataTypeChangeEventHandler.reset(entry.getValue()).apply(schemaChangeEvent); + newRowTypeMap.put(entry.getKey(), newRowType); + log.debug("Table[{}] datatype change after: {}", entry.getKey(), newRowType); + } + resultTypeInfo = new MultipleRowType(newRowTypeMap); + } else { + log.debug("Table datatype change before: {}", resultTypeInfo); + resultTypeInfo = + dataTypeChangeEventHandler + .reset((SeaTunnelRowType) resultTypeInfo) + .apply(schemaChangeEvent); + log.debug("table datatype change after: {}", resultTypeInfo); + } + + tableRowConverters = + createTableRowConverters( + resultTypeInfo, + metadataConverters, + serverTimeZone, + userDefinedConverterFactory); + + collector.collect(schemaChangeEvent); + } + + private void deserializeDataChangeRecord(SourceRecord record, Collector collector) + throws Exception { Envelope.Operation operation = Envelope.operationFor(record); Struct messageStruct = (Struct) record.value(); Schema valueSchema = record.valueSchema(); - - Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE); - String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY); - String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY); - String schemaName = null; - try { - schemaName = sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY); - } catch (Throwable e) { - // ignore - } - String tableId = TablePath.of(databaseName, schemaName, tableName).toString(); + TablePath tablePath = SourceRecordUtils.getTablePath(record); + String tableId = tablePath.toString(); SeaTunnelRowDebeziumDeserializationConverters converters; - if (!multipleTableRowConverters.isEmpty()) { - converters = multipleTableRowConverters.get(tableId); + if (resultTypeInfo instanceof MultipleRowType) { + converters = tableRowConverters.get(tableId); if (converters == null) { log.debug("Ignore newly added table {}", tableId); return; } } else { - converters = singleTableRowConverter; + converters = tableRowConverters.get(DEFAULT_TABLE_NAME_KEY); } if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) { SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema); insert.setRowKind(RowKind.INSERT); insert.setTableId(tableId); - validator.validate(insert, RowKind.INSERT); collector.collect(insert); } else if (operation == Envelope.Operation.DELETE) { SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema); - validator.validate(delete, RowKind.DELETE); delete.setRowKind(RowKind.DELETE); delete.setTableId(tableId); collector.collect(delete); } else { SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema); - validator.validate(before, RowKind.UPDATE_BEFORE); before.setRowKind(RowKind.UPDATE_BEFORE); before.setTableId(tableId); collector.collect(before); SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema); - validator.validate(after, RowKind.UPDATE_AFTER); after.setRowKind(RowKind.UPDATE_AFTER); after.setTableId(tableId); collector.collect(after); @@ -196,64 +226,106 @@ public SeaTunnelDataType getProducedType() { return resultTypeInfo; } - // ------------------------------------------------------------------------------------- - // Builder - // ------------------------------------------------------------------------------------- + @Override + public SchemaChangeResolver getSchemaChangeResolver() { + return schemaChangeResolver; + } + + @Override + public void restoreCheckpointProducedType(SeaTunnelDataType checkpointDataType) { + if (!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) { + throw new IllegalStateException( + String.format( + "The produced type %s of the SeaTunnel deserialization schema " + + "doesn't match the type %s of the restored snapshot.", + resultTypeInfo.getSqlType(), checkpointDataType.getSqlType())); + } + if (checkpointDataType instanceof MultipleRowType) { + MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo; + Map newRowTypeMap = new HashMap<>(); + for (Map.Entry entry : latestDataType) { + newRowTypeMap.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : (MultipleRowType) checkpointDataType) { + SeaTunnelRowType oldDataType = latestDataType.getRowType(entry.getKey()); + if (oldDataType == null) { + log.info("Ignore restore table[{}] datatype has been deleted.", entry.getKey()); + continue; + } + + log.info("Table[{}] datatype restore before: {}", entry.getKey(), oldDataType); + newRowTypeMap.put(entry.getKey(), entry.getValue()); + log.info("Table[{}] datatype restore after: {}", entry.getKey(), entry.getValue()); + } + resultTypeInfo = new MultipleRowType(newRowTypeMap); + } else { + log.info("Table datatype restore before: {}", resultTypeInfo); + resultTypeInfo = checkpointDataType; + log.info("Table datatype restore after: {}", checkpointDataType); + } + tableRowConverters = + createTableRowConverters( + resultTypeInfo, + metadataConverters, + serverTimeZone, + userDefinedConverterFactory); + } + + private static Map + createTableRowConverters( + SeaTunnelDataType inputDataType, + MetadataConverter[] metadataConverters, + ZoneId serverTimeZone, + DebeziumDeserializationConverterFactory userDefinedConverterFactory) { + Map tableRowConverters = + new HashMap<>(); + if (inputDataType instanceof MultipleRowType) { + for (Map.Entry item : (MultipleRowType) inputDataType) { + SeaTunnelRowDebeziumDeserializationConverters itemRowConverter = + new SeaTunnelRowDebeziumDeserializationConverters( + item.getValue(), + metadataConverters, + serverTimeZone, + userDefinedConverterFactory); + tableRowConverters.put(item.getKey(), itemRowConverter); + } + return tableRowConverters; + } + + SeaTunnelRowDebeziumDeserializationConverters tableRowConverter = + new SeaTunnelRowDebeziumDeserializationConverters( + (SeaTunnelRowType) inputDataType, + metadataConverters, + serverTimeZone, + userDefinedConverterFactory); + tableRowConverters.put(DEFAULT_TABLE_NAME_KEY, tableRowConverter); + return tableRowConverters; + } - /** Custom validator to validate the row value. */ - public interface ValueValidator extends Serializable { - void validate(SeaTunnelRow rowData, RowKind rowKind) throws Exception; + public static Builder builder() { + return new Builder(); } - /** Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}. */ + @Setter + @Accessors(chain = true) + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class Builder { private SeaTunnelDataType physicalRowType; private SeaTunnelDataType resultTypeInfo; private MetadataConverter[] metadataConverters = new MetadataConverter[0]; - private ValueValidator validator = (rowData, rowKind) -> {}; private ZoneId serverTimeZone = ZoneId.systemDefault(); private DebeziumDeserializationConverterFactory userDefinedConverterFactory = DebeziumDeserializationConverterFactory.DEFAULT; - - public Builder setPhysicalRowType(SeaTunnelDataType physicalRowType) { - this.physicalRowType = physicalRowType; - return this; - } - - public Builder setMetadataConverters(MetadataConverter[] metadataConverters) { - this.metadataConverters = metadataConverters; - return this; - } - - public Builder setResultTypeInfo(SeaTunnelDataType resultTypeInfo) { - this.resultTypeInfo = resultTypeInfo; - return this; - } - - public Builder setValueValidator(ValueValidator validator) { - this.validator = validator; - return this; - } - - public Builder setServerTimeZone(ZoneId serverTimeZone) { - this.serverTimeZone = serverTimeZone; - return this; - } - - public Builder setUserDefinedConverterFactory( - DebeziumDeserializationConverterFactory userDefinedConverterFactory) { - this.userDefinedConverterFactory = userDefinedConverterFactory; - return this; - } + private SchemaChangeResolver schemaChangeResolver; public SeaTunnelRowDebeziumDeserializeSchema build() { return new SeaTunnelRowDebeziumDeserializeSchema( physicalRowType, metadataConverters, resultTypeInfo, - validator, serverTimeZone, - userDefinedConverterFactory); + userDefinedConverterFactory, + schemaChangeResolver); } } } diff --git a/seatunnel-connectors-v2/connector-cdc/pom.xml b/seatunnel-connectors-v2/connector-cdc/pom.xml index 38f52b1680d7..3519b1c51c08 100644 --- a/seatunnel-connectors-v2/connector-cdc/pom.xml +++ b/seatunnel-connectors-v2/connector-cdc/pom.xml @@ -38,5 +38,42 @@ 1.6.4.Final + 4.8 + + + + + org.antlr + antlr4 + ${antlr.version} + + + + + + + + + org.antlr + antlr4-maven-plugin + ${antlr.version} + + src/main/antlr4 + src/main/java + true + true + true + + + + + antlr4 + + + + + + + diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java index debd6284ec15..422a85b40d14 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java @@ -18,6 +18,9 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -36,16 +39,25 @@ @Slf4j public class ConsoleSinkWriter extends AbstractSinkWriter { - private final SeaTunnelRowType seaTunnelRowType; - public final AtomicLong rowCounter = new AtomicLong(0); - public SinkWriter.Context context; + private SeaTunnelRowType seaTunnelRowType; + private final AtomicLong rowCounter = new AtomicLong(0); + private final SinkWriter.Context context; + private final DataTypeChangeEventHandler dataTypeChangeEventHandler; public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) { this.seaTunnelRowType = seaTunnelRowType; this.context = context; + this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher(); log.info("output rowType: {}", fieldsInfo(seaTunnelRowType)); } + @Override + public void applySchemaChange(SchemaChangeEvent event) { + log.info("changed rowType before: {}", fieldsInfo(seaTunnelRowType)); + seaTunnelRowType = dataTypeChangeEventHandler.reset(seaTunnelRowType).apply(event); + log.info("changed rowType after: {}", fieldsInfo(seaTunnelRowType)); + } + @Override @SuppressWarnings("checkstyle:RegexpSingleline") public void write(SeaTunnelRow element) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 718e915a0c34..a901fbb5e6a9 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -151,6 +151,11 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) { getIntegerValue( ServerConfigOptions.CHECKPOINT_TIMEOUT.key(), getTextContent(node))); + } else if (ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key().equals(name)) { + checkpointConfig.setSchemaChangeCheckpointTimeout( + getIntegerValue( + ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(), + getTextContent(node))); } else if (ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) { checkpointConfig.setMaxConcurrentCheckpoints( getIntegerValue( diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 78a5ff4d0558..7038a65b422d 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -31,6 +31,8 @@ public class CheckpointConfig implements Serializable { private long checkpointInterval = ServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue(); private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue(); + private long schemaChangeCheckpointTimeout = + ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue(); private int maxConcurrentCheckpoints = ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue(); private int tolerableFailureCheckpoints = @@ -52,6 +54,13 @@ public void setCheckpointTimeout(long checkpointTimeout) { this.checkpointTimeout = checkpointTimeout; } + public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) { + checkArgument( + checkpointTimeout >= MINIMAL_CHECKPOINT_TIME, + "The minimum checkpoint timeout is 10 ms."); + this.schemaChangeCheckpointTimeout = checkpointTimeout; + } + public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { checkArgument( maxConcurrentCheckpoints >= 1, diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index b5d02c03443c..2de8acad012f 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -85,6 +85,13 @@ public class ServerConfigOptions { .defaultValue(30000) .withDescription("The timeout (in milliseconds) for a checkpoint."); + public static final Option SCHEMA_CHANGE_CHECKPOINT_TIMEOUT = + Options.key("schema-change-timeout") + .intType() + .defaultValue(30000) + .withDescription( + "The timeout (in milliseconds) for a schema change checkpoint."); + public static final Option CHECKPOINT_MAX_CONCURRENT = Options.key("max-concurrent") .intType() diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java index ab012ed87f7a..aa057a2e888e 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java @@ -22,6 +22,12 @@ public enum CheckpointType { /** Automatically triggered by the CheckpointCoordinator. */ CHECKPOINT_TYPE(true, "checkpoint"), + /** Automatically triggered by the schema change. */ + SCHEMA_CHANGE_BEFORE_POINT_TYPE(true, "schema-change-before-point"), + + /** Automatically triggered by the schema change. */ + SCHEMA_CHANGE_AFTER_POINT_TYPE(true, "schema-change-after-point"), + /** Triggered by the user. */ SAVEPOINT_TYPE(false, "savepoint"), @@ -52,4 +58,40 @@ public boolean isAuto() { public String getName() { return name; } + + public boolean isFinalCheckpoint() { + return this == COMPLETED_POINT_TYPE || this == SAVEPOINT_TYPE; + } + + public boolean isSchemaChangeCheckpoint() { + return isSchemaChangeBeforeCheckpoint() || isSchemaChangeAfterCheckpoint(); + } + + public boolean isSchemaChangeBeforeCheckpoint() { + return this == SCHEMA_CHANGE_BEFORE_POINT_TYPE; + } + + public boolean isSchemaChangeAfterCheckpoint() { + return this == SCHEMA_CHANGE_AFTER_POINT_TYPE; + } + + public boolean isSavepoint() { + return this == SAVEPOINT_TYPE; + } + + public boolean isGeneralCheckpoint() { + return this == CHECKPOINT_TYPE; + } + + public boolean notFinalCheckpoint() { + return isGeneralCheckpoint() || isSchemaChangeCheckpoint(); + } + + public boolean notSchemaChangeCheckpoint() { + return !isSchemaChangeCheckpoint(); + } + + public boolean notCompletedCheckpoint() { + return this != COMPLETED_POINT_TYPE; + } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java index 6b6750be308e..137d5dbc5882 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java @@ -41,4 +41,14 @@ default void notifyCheckpointComplete(long checkpointId) throws Exception {} */ @Override default void notifyCheckpointAborted(long checkpointId) throws Exception {} + + /** + * The notification that the checkpoint has ended means that the notifyCheckpointComplete method + * has been called for all tasks. + * + * @param checkpointId The ID of the checkpoint . + * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for + * the task or job. + */ + default void notifyCheckpointEnd(long checkpointId) throws Exception {} } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java index 37477189d79c..b5dcdf0534c6 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.core.dag.actions; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -73,7 +74,14 @@ public Map>> createShuffles( @Override public String createShuffleKey(Record record, int pipelineId, int inputIndex) { - String tableId = ((SeaTunnelRow) record.getData()).getTableId(); + String tableId; + if (record.getData() instanceof SeaTunnelRow) { + tableId = ((SeaTunnelRow) record.getData()).getTableId(); + } else if (record.getData() instanceof SchemaChangeEvent) { + tableId = ((SchemaChangeEvent) record.getData()).tablePath().toString(); + } else { + throw new UnsupportedOperationException("Unsupported record: " + record); + } return generateQueueName(pipelineId, inputIndex, tableId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java index 3ebd672551b3..7179cc8cb354 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java @@ -48,7 +48,7 @@ public boolean snapshot() { @Override public boolean prepareClose() { - return checkpointType != CheckpointType.CHECKPOINT_TYPE; + return checkpointType.isFinalCheckpoint(); } public long getTimestamp() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index e6b3c3d27ac7..1b6bc6b68719 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.serializer.api.Serializer; import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer; import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointEndOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskStartOperation; @@ -64,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -71,7 +73,6 @@ import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow; import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.CHECKPOINT_TYPE; -import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE; import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.SAVEPOINT_TYPE; import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX; import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START; @@ -123,6 +124,8 @@ public class CheckpointCoordinator { private final AtomicInteger pendingCounter = new AtomicInteger(0); + private final AtomicBoolean schemaChanging = new AtomicBoolean(false); + private final Object lock = new Object(); /** Flag marking the coordinator as shut down (not accepting any messages anymore). */ @@ -183,19 +186,9 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkpointIdCounter; this.readyToCloseStartingTask = new CopyOnWriteArraySet<>(); if (pipelineState != null) { - // fix after the savepoint job is restored, the checkpoint file cannot be generated - CompletedCheckpoint tmpCheckpoint = - serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class); this.latestCompletedCheckpoint = - new CompletedCheckpoint( - tmpCheckpoint.getJobId(), - tmpCheckpoint.getPipelineId(), - tmpCheckpoint.getCheckpointId(), - tmpCheckpoint.getCheckpointTimestamp(), - CheckpointType.CHECKPOINT_TYPE, - tmpCheckpoint.getCompletedTimestamp(), - tmpCheckpoint.getTaskStates(), - tmpCheckpoint.getTaskStatistics()); + serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class); + this.latestCompletedCheckpoint.setRestored(true); } this.checkpointCoordinatorFuture = new CompletableFuture(); @@ -326,8 +319,13 @@ private void notifyCompleted(CompletedCheckpoint completedCheckpoint) { try { LOG.info("start notify checkpoint completed, checkpoint:{}", completedCheckpoint); InvocationFuture[] invocationFutures = - notifyCheckpointCompleted(completedCheckpoint.getCheckpointId()); + notifyCheckpointCompleted(completedCheckpoint); CompletableFuture.allOf(invocationFutures).join(); + // Execution to this point means that all notifyCheckpointCompleted have been + // completed + InvocationFuture[] invocationFuturesForEnd = + notifyCheckpointEnd(completedCheckpoint); + CompletableFuture.allOf(invocationFuturesForEnd).join(); } catch (Throwable e) { handleCoordinatorError( "notify checkpoint completed failed", @@ -391,7 +389,7 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { return; } final long currentTimestamp = Instant.now().toEpochMilli(); - if (notFinalCheckpoint(checkpointType)) { + if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) { if (currentTimestamp - latestTriggerTimestamp.get() < coordinatorConfig.getCheckpointInterval() || pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints() @@ -411,27 +409,29 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { shutdown)); return; } - if (!notFinalCheckpoint(checkpointType)) { + if (checkpointType.isFinalCheckpoint() || checkpointType.isSchemaChangeCheckpoint()) { if (pendingCounter.get() > 0) { scheduleTriggerPendingCheckpoint(checkpointType, 500L); return; } } + + if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) { + LOG.info("skip trigger generic-checkpoint because schema change in progress"); + return; + } + CompletableFuture pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType); startTriggerPendingCheckpoint(pendingCheckpoint); pendingCounter.incrementAndGet(); // if checkpoint type are final type, we don't need to trigger next checkpoint - if (notFinalCheckpoint(checkpointType)) { + if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) { scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); } } } - private boolean notFinalCheckpoint(CheckpointType checkpointType) { - return checkpointType.equals(CHECKPOINT_TYPE); - } - public boolean isShutdown() { return shutdown; } @@ -519,13 +519,22 @@ private void startTriggerPendingCheckpoint( LOG.debug( "Start a scheduled task to prevent checkpoint timeouts for barrier " + pendingCheckpoint.getInfo()); + + long checkpointTimeout = coordinatorConfig.getCheckpointTimeout(); + if (pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) { + checkpointTimeout = coordinatorConfig.getSchemaChangeCheckpointTimeout(); + } + // TODO Need change to polling check until max timeout fails scheduler.schedule( () -> { // If any task is not acked within the checkpoint timeout if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) { - if (tolerableFailureCheckpoints-- <= 0) { + if (tolerableFailureCheckpoints-- <= 0 + || pendingCheckpoint + .getCheckpointType() + .isSchemaChangeCheckpoint()) { LOG.info( "timeout checkpoint: " + pendingCheckpoint.getInfo()); @@ -534,7 +543,7 @@ private void startTriggerPendingCheckpoint( } } }, - coordinatorConfig.getCheckpointTimeout(), + checkpointTimeout, TimeUnit.MILLISECONDS); }); } @@ -543,7 +552,7 @@ CompletableFuture createPendingCheckpoint( long triggerTimestamp, CheckpointType checkpointType) { synchronized (lock) { CompletableFuture idFuture; - if (!checkpointType.equals(COMPLETED_POINT_TYPE)) { + if (checkpointType.notCompletedCheckpoint()) { idFuture = CompletableFuture.supplyAsync( () -> { @@ -650,6 +659,7 @@ protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) { pipelineTaskStatus.clear(); readyToCloseStartingTask.clear(); pendingCounter.set(0); + schemaChanging.set(false); scheduler.shutdownNow(); scheduler = Executors.newScheduledThreadPool( @@ -679,7 +689,7 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) { pendingCheckpoint.acknowledgeTask( location, ackOperation.getStates(), - SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() + pendingCheckpoint.getCheckpointType().isSavepoint() ? SubtaskStatus.SAVEPOINT_PREPARE_CLOSE : SubtaskStatus.RUNNING); } @@ -738,13 +748,13 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed pendingCounter.decrementAndGet(); if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) { // latest checkpoint completed time > checkpoint interval - if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) { + if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) { scheduleTriggerPendingCheckpoint(0L); } } if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); - if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) { + if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) { updateStatus(CheckpointCoordinatorStatus.SUSPEND); checkpointCoordinatorFuture.complete( new CheckpointCoordinatorState(CheckpointCoordinatorStatus.SUSPEND, null)); @@ -756,28 +766,45 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed } } - public InvocationFuture[] notifyCheckpointCompleted(long checkpointId) { + public InvocationFuture[] notifyCheckpointCompleted(CompletedCheckpoint checkpoint) { + if (checkpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) { + completeSchemaChangeAfterCheckpoint(checkpoint); + } return plan.getPipelineSubtasks().stream() .map( taskLocation -> - new CheckpointFinishedOperation(taskLocation, checkpointId, true)) + new CheckpointFinishedOperation( + taskLocation, checkpoint.getCheckpointId(), true)) .map(checkpointManager::sendOperationToMemberNode) .toArray(InvocationFuture[]::new); } + public InvocationFuture[] notifyCheckpointEnd(CompletedCheckpoint checkpoint) { + if (checkpoint.getCheckpointType().isSchemaChangeCheckpoint()) { + return plan.getPipelineSubtasks().stream() + .map( + taskLocation -> + new CheckpointEndOperation( + taskLocation, checkpoint.getCheckpointId(), true)) + .map(checkpointManager::sendOperationToMemberNode) + .toArray(InvocationFuture[]::new); + } + return new InvocationFuture[0]; + } + public boolean isCompleted() { if (latestCompletedCheckpoint == null) { return false; } - return latestCompletedCheckpoint.getCheckpointType() == COMPLETED_POINT_TYPE - || latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE; + return latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint() + && !latestCompletedCheckpoint.isRestored(); } public boolean isEndOfSavePoint() { if (latestCompletedCheckpoint == null) { return false; } - return latestCompletedCheckpoint.getCheckpointType() == SAVEPOINT_TYPE; + return latestCompletedCheckpoint.getCheckpointType().isSavepoint(); } public PassiveCompletableFuture @@ -823,4 +850,53 @@ private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targ checkpointStateImapKey, targetStatus)); } } + + protected void scheduleSchemaChangeBeforeCheckpoint() { + if (schemaChanging.compareAndSet(false, true)) { + LOG.info( + "stop trigger general-checkpoint({}@{}) because schema change in progress.", + pipelineId, + jobId); + LOG.info("schedule schema-change-before checkpoint({}@{}).", pipelineId, jobId); + scheduleTriggerPendingCheckpoint(CheckpointType.SCHEMA_CHANGE_BEFORE_POINT_TYPE, 0); + } else { + LOG.warn( + "schema-change-before checkpoint({}@{}) is already scheduled.", + pipelineId, + jobId); + } + } + + protected void scheduleSchemaChangeAfterCheckpoint() { + if (schemaChanging.get()) { + LOG.info("schedule schema-change-after checkpoint({}@{}).", pipelineId, jobId); + scheduleTriggerPendingCheckpoint(CheckpointType.SCHEMA_CHANGE_AFTER_POINT_TYPE, 0); + } else { + LOG.warn( + "schema-change-after checkpoint({}@{}) is already scheduled.", + pipelineId, + jobId); + } + } + + protected void completeSchemaChangeAfterCheckpoint(CompletedCheckpoint checkpoint) { + if (schemaChanging.compareAndSet(true, false)) { + LOG.info( + "completed schema-change-after checkpoint({}/{}@{}).", + checkpoint.getCheckpointId(), + pipelineId, + jobId); + LOG.info( + "recover trigger general-checkpoint({}/{}@{}).", + checkpoint.getCheckpointId(), + pipelineId, + jobId); + scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); + } else { + throw new IllegalStateException( + String.format( + "schema-change-after checkpoint(%s/%s@%s) is already completed.", + checkpoint.getCheckpointId(), pipelineId, jobId)); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 0c5a91698e7b..cd58da1dd9eb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -32,6 +32,8 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus; import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation; import org.apache.seatunnel.engine.server.dag.execution.Pipeline; import org.apache.seatunnel.engine.server.dag.physical.SubPlan; import org.apache.seatunnel.engine.server.execution.Task; @@ -268,6 +270,38 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) { coordinator.acknowledgeTask(ackOperation); } + public void triggerSchemaChangeBeforeCheckpoint( + TriggerSchemaChangeBeforeCheckpointOperation operation) { + log.debug( + "checkpoint manager received schema-change-before checkpoint operation {}", + operation.getTaskLocation()); + CheckpointCoordinator coordinator = getCheckpointCoordinator(operation.getTaskLocation()); + if (coordinator.isCompleted()) { + log.info( + "The checkpoint coordinator({}) is completed", + operation.getTaskLocation().getPipelineId()); + return; + } + + coordinator.scheduleSchemaChangeBeforeCheckpoint(); + } + + public void triggerSchemaChangeAfterCheckpoint( + TriggerSchemaChangeAfterCheckpointOperation operation) { + log.debug( + "checkpoint manager received schema-change-after checkpoint operation {}", + operation.getTaskLocation()); + CheckpointCoordinator coordinator = getCheckpointCoordinator(operation.getTaskLocation()); + if (coordinator.isCompleted()) { + log.info( + "The checkpoint coordinator({}) is completed", + operation.getTaskLocation().getPipelineId()); + return; + } + + coordinator.scheduleSchemaChangeAfterCheckpoint(); + } + public boolean isSavePointEnd() { return coordinatorMap.values().stream() .map(CheckpointCoordinator::isEndOfSavePoint) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java index 8d6ea554d7ee..3f196f2c8fff 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java @@ -20,6 +20,9 @@ import org.apache.seatunnel.engine.core.checkpoint.Checkpoint; import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import lombok.Getter; +import lombok.Setter; + import java.io.Serializable; import java.util.Map; @@ -41,6 +44,8 @@ public class CompletedCheckpoint implements Checkpoint, Serializable { private final Map taskStatistics; + @Getter @Setter private boolean isRestored = false; + public CompletedCheckpoint( long jobId, int pipelineId, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java new file mode 100644 index 000000000000..62ec42f0a6a2 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointEndOperation.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint.operation; + +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; +import org.apache.seatunnel.engine.server.execution.Task; +import org.apache.seatunnel.engine.server.execution.TaskGroupContext; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TaskOperation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.io.IOException; + +@Getter +@NoArgsConstructor +public class CheckpointEndOperation extends TaskOperation { + + private long checkpointId; + + private boolean successful; + + public CheckpointEndOperation( + TaskLocation taskLocation, long checkpointId, boolean successful) { + super(taskLocation); + this.checkpointId = checkpointId; + this.successful = successful; + } + + @Override + public int getFactoryId() { + return CheckpointDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return CheckpointDataSerializerHook.CHECKPOINT_END_OPERATOR; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeLong(checkpointId); + out.writeBoolean(successful); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + checkpointId = in.readLong(); + successful = in.readBoolean(); + } + + @Override + public void run() throws Exception { + SeaTunnelServer server = getService(); + RetryUtils.retryWithException( + () -> { + try { + TaskGroupContext groupContext = + server.getTaskExecutionService() + .getExecutionContext(taskLocation.getTaskGroupLocation()); + Task task = groupContext.getTaskGroup().getTask(taskLocation.getTaskID()); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(groupContext.getClassLoader()); + + task.notifyCheckpointEnd(checkpointId); + + Thread.currentThread().setContextClassLoader(classLoader); + } catch (Exception e) { + throw new SeaTunnelEngineException(ExceptionUtils.getMessage(e)); + } + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> + exception instanceof TaskGroupContextNotFoundException + && !server.taskIsEnded(taskLocation.getTaskGroupLocation()), + Constant.OPERATION_RETRY_SLEEP)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeAfterCheckpointOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeAfterCheckpointOperation.java new file mode 100644 index 000000000000..66be8cc9f306 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeAfterCheckpointOperation.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint.operation; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.operationservice.Operation; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +@Getter +@AllArgsConstructor +@NoArgsConstructor +public class TriggerSchemaChangeAfterCheckpointOperation extends Operation + implements IdentifiedDataSerializable { + + private TaskLocation taskLocation; + + @Override + public int getFactoryId() { + return CheckpointDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return CheckpointDataSerializerHook.TRIGGER_SCHEMA_CHANGE_AFTER_CHECKPOINT_OPERATOR; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + out.writeObject(taskLocation); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + taskLocation = in.readObject(); + } + + @Override + public void run() { + log.debug("call TriggerSchemaChangeAfterCheckpointOperation start {}", taskLocation); + ((SeaTunnelServer) getService()) + .getCoordinatorService() + .getJobMaster(taskLocation.getJobId()) + .getCheckpointManager() + .triggerSchemaChangeAfterCheckpoint(this); + log.debug("call TriggerSchemaChangeAfterCheckpointOperation finished {}", taskLocation); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeBeforeCheckpointOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeBeforeCheckpointOperation.java new file mode 100644 index 000000000000..54daedd8c627 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TriggerSchemaChangeBeforeCheckpointOperation.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint.operation; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.operationservice.Operation; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +@Getter +@AllArgsConstructor +@NoArgsConstructor +public class TriggerSchemaChangeBeforeCheckpointOperation extends Operation + implements IdentifiedDataSerializable { + + private TaskLocation taskLocation; + + @Override + public int getFactoryId() { + return CheckpointDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return CheckpointDataSerializerHook.TRIGGER_SCHEMA_CHANGE_BEFORE_CHECKPOINT_OPERATOR; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + out.writeObject(taskLocation); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + taskLocation = in.readObject(); + } + + @Override + public void run() { + log.debug("call TriggerSchemaChangeBeforeCheckpointOperation {}", taskLocation); + ((SeaTunnelServer) getService()) + .getCoordinatorService() + .getJobMaster(taskLocation.getJobId()) + .getCheckpointManager() + .triggerSchemaChangeBeforeCheckpoint(this); + log.debug("call SchemaChangeBeforeCheckpoint finished {}", taskLocation); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java index 3349a107035b..fb6cb2b7d057 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/CheckpointDataSerializerHook.java @@ -19,12 +19,15 @@ import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant; import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointEndOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskStartOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation; import com.hazelcast.internal.serialization.DataSerializerHook; import com.hazelcast.internal.serialization.impl.FactoryIdHelper; @@ -41,8 +44,11 @@ public final class CheckpointDataSerializerHook implements DataSerializerHook { public static final int NOTIFY_TASK_RESTORE_OPERATOR = 5; public static final int NOTIFY_TASK_START_OPERATOR = 6; - public static final int CHECKPOINT_ERROR_REPORT_OPERATOR = 7; + public static final int TRIGGER_SCHEMA_CHANGE_BEFORE_CHECKPOINT_OPERATOR = 8; + public static final int TRIGGER_SCHEMA_CHANGE_AFTER_CHECKPOINT_OPERATOR = 9; + + public static final int CHECKPOINT_END_OPERATOR = 10; public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( @@ -78,6 +84,12 @@ public IdentifiedDataSerializable create(int typeId) { return new NotifyTaskStartOperation(); case CHECKPOINT_ERROR_REPORT_OPERATOR: return new CheckpointErrorReportOperation(); + case TRIGGER_SCHEMA_CHANGE_BEFORE_CHECKPOINT_OPERATOR: + return new TriggerSchemaChangeBeforeCheckpointOperation(); + case TRIGGER_SCHEMA_CHANGE_AFTER_CHECKPOINT_OPERATOR: + return new TriggerSchemaChangeAfterCheckpointOperation(); + case CHECKPOINT_END_OPERATOR: + return new CheckpointEndOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 2a77a49729fd..0514d83c86d8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -21,21 +21,30 @@ import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; +@Slf4j public class SeaTunnelSourceCollector implements Collector { private final Object checkpointLock; private final List>> outputs; + private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false); + + private final AtomicBoolean schemaChangeAfterCheckpointSignal = new AtomicBoolean(false); + private final Counter sourceReceivedCount; private final Meter sourceReceivedQPS; @@ -64,6 +73,53 @@ public void collect(T row) { } } + @Override + public void collect(SchemaChangeEvent event) { + try { + sendRecordToNext(new Record<>(event)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void markSchemaChangeBeforeCheckpoint() { + if (schemaChangeAfterCheckpointSignal.get()) { + throw new IllegalStateException("schema-change-after checkpoint already marked."); + } + if (!schemaChangeBeforeCheckpointSignal.compareAndSet(false, true)) { + throw new IllegalStateException("schema-change-before checkpoint already marked."); + } + log.info("mark schema-change-before checkpoint signal."); + } + + @Override + public void markSchemaChangeAfterCheckpoint() { + if (schemaChangeBeforeCheckpointSignal.get()) { + throw new IllegalStateException("schema-change-before checkpoint already marked."); + } + if (!schemaChangeAfterCheckpointSignal.compareAndSet(false, true)) { + throw new IllegalStateException("schema-change-after checkpoint already marked."); + } + log.info("mark schema-change-after checkpoint signal."); + } + + public boolean captureSchemaChangeBeforeCheckpointSignal() { + if (schemaChangeBeforeCheckpointSignal.get()) { + log.info("capture schema-change-before checkpoint signal."); + return schemaChangeBeforeCheckpointSignal.getAndSet(false); + } + return false; + } + + public boolean captureSchemaChangeAfterCheckpointSignal() { + if (schemaChangeAfterCheckpointSignal.get()) { + log.info("capture schema-change-after checkpoint signal."); + return schemaChangeAfterCheckpointSignal.getAndSet(false); + } + return false; + } + @Override public Object getCheckpointLock() { return checkpointLock; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index c752d45d6997..9c529d2def43 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -33,6 +33,8 @@ import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState; import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation; +import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation; import org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig; import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig; import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig; @@ -59,6 +61,7 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.internal.metrics.MetricDescriptor; import com.hazelcast.internal.metrics.MetricsCollectionContext; +import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -347,6 +350,24 @@ public void ack(Barrier barrier) { } } + public InvocationFuture triggerSchemaChangeBeforeCheckpoint() { + log.info( + "trigger schema-change-before checkpoint. jobID[{}], taskLocation[{}]", + jobID, + taskLocation); + return this.getExecutionContext() + .sendToMaster(new TriggerSchemaChangeBeforeCheckpointOperation(taskLocation)); + } + + public InvocationFuture triggerSchemaChangeAfterCheckpoint() { + log.info( + "trigger schema-change-after checkpoint. jobID[{}], taskLocation[{}]", + jobID, + taskLocation); + return this.getExecutionContext() + .sendToMaster(new TriggerSchemaChangeAfterCheckpointOperation(taskLocation)); + } + public void addState(Barrier barrier, ActionStateKey stateKey, List state) { List states = checkpointStates.computeIfAbsent(barrier.getId(), id -> new ArrayList<>()); @@ -365,6 +386,12 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { tryClose(checkpointId); } + @Override + public void notifyCheckpointEnd(long checkpointId) throws Exception { + notifyAllAction(listener -> listener.notifyCheckpointEnd(checkpointId)); + tryClose(checkpointId); + } + public void notifyAllAction(ConsumerWithException consumer) { allCycles.stream() .filter(cycle -> cycle instanceof InternalCheckpointListener) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index a83f4bfb1dec..1a8ecf29c809 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -193,6 +193,8 @@ public void close() throws IOException { @Override public void triggerBarrier(Barrier barrier) throws Exception { + long startTime = System.currentTimeMillis(); + log.debug("trigger barrier for sink agg commit [{}]", barrier); Integer count = checkpointBarrierCounter.compute( @@ -233,6 +235,12 @@ public void triggerBarrier(Barrier barrier) throws Exception { ActionStateKey.of(sink), -1, states)))) .join(); } + + log.debug( + "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + taskLocation); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 56ba10c48709..da5fa8aeb3a0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -134,6 +134,8 @@ public ProgressState call() throws Exception { @Override public void triggerBarrier(Barrier barrier) throws Exception { + long startTime = System.currentTimeMillis(); + log.debug("split enumer trigger barrier [{}]", barrier); if (barrier.prepareClose()) { this.prepareCloseTriggered = true; @@ -164,6 +166,12 @@ public void triggerBarrier(Barrier barrier) throws Exception { Collections.singletonList(serialize))))) .join(); } + + log.debug( + "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + taskLocation); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java index 7054c8ac72d0..32ec5cb8f4d0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.task.flow; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction; import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy; @@ -71,6 +72,8 @@ public ShuffleSinkFlowLifeCycle( @Override public void received(Record record) throws IOException { if (record.getData() instanceof Barrier) { + long startTime = System.currentTimeMillis(); + // flush shuffle buffer shuffleFlush(); @@ -93,6 +96,18 @@ public void received(Record record) throws IOException { throw new RuntimeException(e); } } + + log.debug( + "trigger barrier [{}] finished, cost: {}ms. taskLocation: [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + runningTask.getTaskLocation()); + } else if (record.getData() instanceof SchemaChangeEvent) { + if (prepareClose) { + return; + } + + shuffleItem(record); } else { if (prepareClose) { return; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java index b32ba1c52439..2f14c6770114 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java @@ -96,6 +96,8 @@ public void collect(Collector> collector) throws Exception { for (int recordIndex = 0; recordIndex < shuffleBatch.size(); recordIndex++) { Record record = shuffleBatch.get(recordIndex); if (record.getData() instanceof Barrier) { + long startTime = System.currentTimeMillis(); + Barrier barrier = (Barrier) record.getData(); // mark queue barrier @@ -117,6 +119,11 @@ public void collect(Collector> collector) throws Exception { runningTask.ack(barrier); collector.collect(record); + log.debug( + "trigger barrier [{}] finished, cost: {}ms. taskLocation: [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + runningTask.getTaskLocation()); alignedBarriersCounter = 0; alignedBarriers.clear(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 9e67a601eb9a..7e6d73c54986 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; @@ -39,6 +40,7 @@ import org.apache.seatunnel.engine.server.task.record.Barrier; import com.hazelcast.cluster.Address; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.io.Serializable; @@ -56,6 +58,7 @@ import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; import static org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates; +@Slf4j public class SinkFlowLifeCycle extends ActionFlowLifeCycle implements OneInputFlowLifeCycle>, InternalCheckpointListener { @@ -150,6 +153,8 @@ private void registerCommitter() { public void received(Record record) { try { if (record.getData() instanceof Barrier) { + long startTime = System.currentTimeMillis(); + Barrier barrier = (Barrier) record.getData(); if (barrier.prepareClose()) { prepareClose = true; @@ -197,6 +202,18 @@ public void received(Record record) { } } runningTask.ack(barrier); + + log.debug( + "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + taskLocation); + } else if (record.getData() instanceof SchemaChangeEvent) { + if (prepareClose) { + return; + } + SchemaChangeEvent event = (SchemaChangeEvent) record.getData(); + writer.applySchemaChange(event); } else { if (prepareClose) { return; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index 9ca01eba322d..b883bd8ffdd9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -24,10 +24,12 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.common.utils.SerializationUtils; +import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector; import org.apache.seatunnel.engine.server.task.SeaTunnelTask; @@ -41,14 +43,20 @@ import org.apache.seatunnel.engine.server.task.record.Barrier; import com.hazelcast.cluster.Address; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @@ -75,6 +83,8 @@ public class SourceFlowLifeCycle extends ActionFl private final MetricsContext metricsContext; + private final AtomicReference schemaChangePhase = new AtomicReference<>(); + public SourceFlowLifeCycle( SourceAction sourceAction, int indexID, @@ -132,12 +142,39 @@ public void close() throws IOException { public void collect() throws Exception { if (!prepareClose) { + if (schemaChanging()) { + log.debug("schema is changing, stop reader collect records"); + + Thread.sleep(200); + return; + } + reader.pollNext(collector); if (collector.isEmptyThisPollNext()) { Thread.sleep(100); } else { collector.resetEmptyThisPollNext(); } + + if (collector.captureSchemaChangeBeforeCheckpointSignal()) { + if (schemaChangePhase.get() != null) { + throw new IllegalStateException( + "previous schema changes in progress, schemaChangePhase: " + + schemaChangePhase.get()); + } + runningTask.triggerSchemaChangeBeforeCheckpoint().get(); + schemaChangePhase.set(SchemaChangePhase.createBeforePhase()); + log.info("triggered schema-change-before checkpoint, stopping collect data"); + } else if (collector.captureSchemaChangeAfterCheckpointSignal()) { + if (schemaChangePhase.get() != null) { + throw new IllegalStateException( + "previous schema changes in progress, schemaChangePhase: " + + schemaChangePhase.get()); + } + runningTask.triggerSchemaChangeAfterCheckpoint().get(); + schemaChangePhase.set(SchemaChangePhase.createAfterPhase()); + log.info("triggered schema-change-after checkpoint, stopping collect data"); + } } else { Thread.sleep(100); } @@ -214,6 +251,9 @@ public void receivedSplits(List splits) { public void triggerBarrier(Barrier barrier) throws Exception { log.debug("source trigger barrier [{}]", barrier); + + long startTime = System.currentTimeMillis(); + // Block the reader from adding barrier to the collector. synchronized (collector.getCheckpointLock()) { if (barrier.prepareClose()) { @@ -230,6 +270,38 @@ public void triggerBarrier(Barrier barrier) throws Exception { collector.sendRecordToNext(new Record<>(barrier)); log.debug("send record to next finished, taskId: [{}]", runningTask.getTaskID()); } + + log.debug( + "trigger barrier [{}] finished, cost: {}ms. taskLocation: [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + currentTaskLocation); + + CheckpointType checkpointType = ((CheckpointBarrier) barrier).getCheckpointType(); + if (schemaChanging() && checkpointType.isSchemaChangeCheckpoint()) { + if (checkpointType.isSchemaChangeBeforeCheckpoint() + && schemaChangePhase.get().isBeforePhase()) { + schemaChangePhase.get().setCheckpointId(barrier.getId()); + } else if (checkpointType.isSchemaChangeAfterCheckpoint() + && schemaChangePhase.get().isAfterPhase()) { + schemaChangePhase.get().setCheckpointId(barrier.getId()); + } else { + throw new IllegalStateException( + String.format( + "schema-change checkpoint[%s,%s] and phase[%s] is not matched", + barrier.getId(), + checkpointType, + schemaChangePhase.get().getPhase())); + } + log.info( + "lock checkpoint[{}] waiting for complete..., phase: [{}]", + barrier.getId(), + schemaChangePhase.get().getPhase()); + } + } + + private boolean schemaChanging() { + return schemaChangePhase.get() != null; } @Override @@ -240,6 +312,25 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { @Override public void notifyCheckpointAborted(long checkpointId) throws Exception { reader.notifyCheckpointAborted(checkpointId); + if (schemaChangePhase.get() != null + && schemaChangePhase.get().getCheckpointId() == checkpointId) { + throw new IllegalStateException( + String.format( + "schema-change checkpoint[%s] is aborted, phase: [%s]", + checkpointId, schemaChangePhase.get().getPhase())); + } + } + + @Override + public void notifyCheckpointEnd(long checkpointId) throws Exception { + if (schemaChangePhase.get() != null + && schemaChangePhase.get().getCheckpointId() == checkpointId) { + log.info( + "notify schema-change checkpoint[{}] end, phase: [{}]", + checkpointId, + schemaChangePhase.get().getPhase()); + schemaChangePhase.set(null); + } } @Override @@ -269,4 +360,38 @@ public void restoreState(List actionStateList) throws Except throw new RuntimeException(e); } } + + @Getter + @ToString + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + private static class SchemaChangePhase implements Serializable { + private static final String PHASE_CHANGE_BEFORE = "SCHEMA-CHANGE-BEFORE"; + private static final String PHASE_CHANGE_AFTER = "SCHEMA-CHANGE-AFTER"; + + private final String phase; + private volatile long checkpointId = -1; + + public static SchemaChangePhase createBeforePhase() { + return new SchemaChangePhase(PHASE_CHANGE_BEFORE); + } + + public static SchemaChangePhase createAfterPhase() { + return new SchemaChangePhase(PHASE_CHANGE_AFTER); + } + + public boolean isBeforePhase() { + return PHASE_CHANGE_BEFORE.equals(phase); + } + + public boolean isAfterPhase() { + return PHASE_CHANGE_AFTER.equals(phase); + } + + public void setCheckpointId(long checkpointId) { + if (this.checkpointId != -1) { + throw new IllegalStateException("checkpointId is already set"); + } + this.checkpointId = checkpointId; + } + } }