Skip to content

Commit

Permalink
[INLONG-7567][Sort] Extract metrics as common parameters (apache#7568)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3e96b03)
  • Loading branch information
e-mhui authored and menghuiyu committed Mar 31, 2023
1 parent 995fc07 commit 01d2cc8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/** A basic Source configuration which is used by {@link IncrementalSource}.
* Copy from com.ververica:flink-cdc-base:2.3.0.
* */
public abstract class BaseSourceConfig implements SourceConfig {
public abstract class BaseSourceConfig implements SourceConfig, MetricConfig {

private static final long serialVersionUID = 1L;

Expand All @@ -42,6 +42,12 @@ public abstract class BaseSourceConfig implements SourceConfig {
protected final Properties dbzProperties;
protected transient Configuration dbzConfiguration;

// --------------------------------------------------------------------------------------------
// Metric Configurations
// --------------------------------------------------------------------------------------------
protected final String inlongMetric;
protected final String inlongAudit;

public BaseSourceConfig(
StartupOptions startupOptions,
int splitSize,
Expand All @@ -50,7 +56,9 @@ public BaseSourceConfig(
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties,
Configuration dbzConfiguration) {
Configuration dbzConfiguration,
String inlongMetric,
String inlongAudit) {
this.startupOptions = startupOptions;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
Expand All @@ -59,6 +67,8 @@ public BaseSourceConfig(
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
}

@Override
Expand Down Expand Up @@ -96,4 +106,15 @@ public Properties getDbzProperties() {
public Configuration getDbzConfiguration() {
return Configuration.from(dbzProperties);
}

@Override
public String getInlongMetric() {
return inlongMetric;
}

@Override
public String getInlongAudit() {
return inlongAudit;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
protected final int connectionPoolSize;
protected final String chunkKeyColumn;

protected final String inlongMetric;
protected final String inlongAudit;

public JdbcSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
Expand Down Expand Up @@ -80,7 +77,9 @@ public JdbcSourceConfig(
distributionFactorLower,
includeSchemaChanges,
dbzProperties,
dbzConfiguration);
dbzConfiguration,
inlongMetric,
inlongAudit);
this.driverClassName = driverClassName;
this.hostname = hostname;
this.port = port;
Expand All @@ -95,8 +94,6 @@ public JdbcSourceConfig(
this.connectionPoolSize = connectionPoolSize;
this.chunkKeyColumn = chunkKeyColumn;

this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
}

public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
Expand Down Expand Up @@ -153,11 +150,4 @@ public String getChunkKeyColumn() {
return chunkKeyColumn;
}

public String getInlongMetric() {
return inlongMetric;
}

public String getInlongAudit() {
return inlongAudit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.cdc.base.config;

import java.io.Serializable;

/** The mertic configuration which offers basic metric configuration. **/
public interface MetricConfig extends Serializable {

/**
* getInlongMetric
*
* @return a label of inlong metric
*/
String getInlongMetric();

/**
* getInlongAudit
*
* @return an address of inlong audit
*/
String getInlongAudit();

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
import org.apache.inlong.sort.cdc.base.config.MetricConfig;
import org.apache.inlong.sort.cdc.base.config.SourceConfig;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
Expand Down Expand Up @@ -109,7 +109,7 @@ public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerCont
throws Exception {
// create source config for the given subtask (e.g. unique server id)
C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
MetricConfig metricConfig = (MetricConfig) sourceConfig;
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();

Expand All @@ -121,8 +121,8 @@ public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerCont

// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(jdbcSourceConfig.getInlongMetric())
.withInlongAudit(jdbcSourceConfig.getInlongAudit())
.withInlongLabels(metricConfig.getInlongMetric())
.withAuditAddress(metricConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
import org.apache.inlong.sort.cdc.base.config.MetricConfig;
import org.apache.inlong.sort.cdc.base.config.SourceConfig;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
Expand Down Expand Up @@ -111,7 +111,7 @@ public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerCont
throws Exception {
// create source config for the given subtask (e.g. unique server id)
C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
MetricConfig metricConfig = (MetricConfig) sourceConfig;
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();

Expand All @@ -123,8 +123,8 @@ public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerCont

// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(jdbcSourceConfig.getInlongMetric())
.withInlongAudit(jdbcSourceConfig.getInlongAudit())
.withInlongLabels(metricConfig.getInlongMetric())
.withAuditAddress(metricConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();

Expand Down

0 comments on commit 01d2cc8

Please sign in to comment.