[improvement] Remove unnecessary changes.
AnemoneIndicum committed Jul 3, 2024
1 parent d89be6b commit 13be648
Showing 17 changed files with 333 additions and 107 deletions.
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.common.status;

/** This enum represents the status of a cluster heartbeat. */
public enum HeartbeatStatus {
/** The cluster is active. */
ACTIVE,
/** Unable to connect to the target machine, usually due to network anomalies. */
UNREACHABLE,
/**
* The cluster is in an unknown state, usually because the cluster did not return heartbeat
* information even though its network is reachable.
*/
UNKNOWN,
/**
* The cluster is dead, usually because no heartbeat information has been obtained for a long
* time.
*/
DEAD;
}
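
A minimal sketch (not part of this commit) of how a caller might react to each status; the handler class and method are hypothetical:

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;

/** Hypothetical monitoring hook illustrating how each heartbeat status might be handled. */
class HeartbeatStatusHandlerSketch {

    static void onHeartbeat(HeartbeatStatus status) {
        switch (status) {
            case ACTIVE:
                // Cluster responded normally; nothing to do.
                break;
            case UNREACHABLE:
                // Network-level failure; alert operators immediately.
                break;
            case UNKNOWN:
                // Reachable, but no usable heartbeat payload; schedule a retry.
                break;
            case DEAD:
                // No heartbeat for a long time; mark the cluster offline.
                break;
        }
    }
}
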
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;

/** Used to execute cluster heartbeat actions. */
public interface HeartbeatAction {

/**
* Execute cluster action to obtain cluster status.
*
* @return cluster heartbeat entity.
*/
HeartbeatEntity checkClusterHeartbeat();

/**
* Builds a heartbeat entity representing an abnormal cluster state. This method is used to
* generate a heartbeat object when the cluster status is abnormal, recording the error status
* of the cluster.
*
* @param status The current status of the cluster, used to set the status field of the
* heartbeat entity.
* @return a heartbeat entity carrying the given status.
*/
default HeartbeatEntity buildResultHeartbeatEntity(HeartbeatStatus status) {
return HeartbeatEntity.builder().status(status.name()).build();
}
}
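
A minimal sketch (not part of this commit) of a custom implementation; the class and its pingCluster() probe are hypothetical placeholders for a real cluster call:

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.client.HeartbeatAction;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;

/** Hypothetical implementation used only to illustrate the HeartbeatAction contract. */
public class PingHeartbeatAction implements HeartbeatAction {

    @Override
    public HeartbeatEntity checkClusterHeartbeat() {
        try {
            if (pingCluster()) {
                // A live response maps to ACTIVE with the current timestamp.
                return HeartbeatEntity.builder()
                        .lastHeartbeat(System.currentTimeMillis())
                        .status(HeartbeatStatus.ACTIVE.name())
                        .build();
            }
            // Reachable, but no usable heartbeat payload.
            return this.buildResultHeartbeatEntity(HeartbeatStatus.UNKNOWN);
        } catch (Exception e) {
            // Any transport failure maps to UNREACHABLE.
            return this.buildResultHeartbeatEntity(HeartbeatStatus.UNREACHABLE);
        }
    }

    // Placeholder probe; a real implementation would call the cluster's REST API.
    private boolean pingCluster() {
        return true;
    }
}
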
@@ -18,39 +18,56 @@

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient;

import java.util.Objects;

/**
* The flink session client provides some operations on the flink session cluster,
* such as obtaining the cluster status. etc.
* The flink client implementation of the {@link ClusterAction}.
The flink session client provides some operations on the flink session cluster, such as obtaining
the cluster status, etc. It is the flink client implementation of {@link HeartbeatAction}.
*/
public class SessionClusterClient implements ClusterAction{
@Slf4j
public class SessionClusterClient implements HeartbeatAction {

private final SqlGateWayRestClient restClient;

public SessionClusterClient(String sessionClusterHost, int sessionClusterPort) throws Exception {
public SessionClusterClient(String sessionClusterHost, int sessionClusterPort)
throws Exception {
this.restClient = new SqlGateWayRestClient(sessionClusterHost, sessionClusterPort);
}

@Override
public ImmutablePair<ClusterStatus, Long> checkClusterHeartbeat() throws Exception{
ClusterOverviewWithVersion heartbeat = restClient
.sendRequest(
ClusterOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get();
if (Objects.nonNull(heartbeat)) {
return this.buildClusterHeartbeatOfSuccess();
public HeartbeatEntity checkClusterHeartbeat() {
try {
ClusterOverviewWithVersion heartbeat =
restClient
.sendRequest(
ClusterOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get();
if (Objects.nonNull(heartbeat)) {
return HeartbeatEntity.builder()
.lastHeartbeat(System.currentTimeMillis())
.status(HeartbeatStatus.ACTIVE.name())
.clusterVersion(heartbeat.getVersion())
.build();
}
} catch (Exception ex) {
log.error(
"An exception occurred while obtaining the cluster status: {}",
ex.getMessage(),
ex);
return this.buildResultHeartbeatEntity(HeartbeatStatus.UNREACHABLE);
}
return this.buildClusterHeartbeatOfError();
return this.buildResultHeartbeatEntity(HeartbeatStatus.UNKNOWN);
}
}
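
A brief usage sketch (not part of this commit); the host and port below are placeholders for a real Flink session cluster:

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.client.SessionClusterClient;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;

public class SessionClusterHeartbeatSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder endpoint of a Flink session cluster REST gateway.
        SessionClusterClient client = new SessionClusterClient("localhost", 8081);

        HeartbeatEntity heartbeat = client.checkClusterHeartbeat();
        if (HeartbeatStatus.ACTIVE.name().equals(heartbeat.getStatus())) {
            System.out.println(
                    "Cluster version " + heartbeat.getClusterVersion()
                            + " active at " + heartbeat.getLastHeartbeat());
        } else {
            // UNREACHABLE or UNKNOWN; lastHeartbeat and clusterVersion may be null here.
            System.out.println("Cluster heartbeat status: " + heartbeat.getStatus());
        }
    }
}
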
@@ -18,12 +18,12 @@

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flink.table.gateway.rest.header.util.GetInfoHeaders;
import org.apache.flink.table.gateway.rest.message.util.GetInfoResponseBody;
import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -41,6 +41,7 @@
import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetInfoHeaders;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
@@ -50,22 +51,24 @@
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.util.GetInfoResponseBody;
import org.apache.flink.table.gateway.rest.util.RowFormat;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* The client of flink sql gateway provides some operations of flink sql gateway, such as creating
* sessions, executing statements, fetching results, etc.
*/
public class SqlGatewayClient implements ClusterAction{
@Slf4j
public class SqlGatewayClient implements HeartbeatAction {

private static final String DEFAULT_SESSION_NAME_PREFIX = "FLINK_SQL_GATEWAY_SESSION";
private static final int REQUEST_WAITE_TIME = 1000;
@@ -248,16 +251,29 @@ private OperationHandle buildOperationHandleByOperationId(String operationId) {
}

@Override
public ImmutablePair<ClusterStatus, Long> checkClusterHeartbeat() throws Exception {
GetInfoResponseBody heartbeat = restClient
.sendRequest(
GetInfoHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get();
if (Objects.nonNull(heartbeat)) {
return this.buildClusterHeartbeatOfSuccess();
public HeartbeatEntity checkClusterHeartbeat() {
try {
GetInfoResponseBody heartbeat =
restClient
.sendRequest(
GetInfoHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get();
if (Objects.nonNull(heartbeat)) {
return HeartbeatEntity.builder()
.lastHeartbeat(System.currentTimeMillis())
.status(HeartbeatStatus.ACTIVE.name())
.clusterVersion(heartbeat.getProductVersion())
.build();
}
} catch (Exception exec) {
log.error(
"An exception occurred while obtaining the cluster status: {}",
exec.getMessage(),
exec);
return this.buildResultHeartbeatEntity(HeartbeatStatus.UNREACHABLE);
}
return this.buildClusterHeartbeatOfError();
return this.buildResultHeartbeatEntity(HeartbeatStatus.UNKNOWN);
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.model;

import lombok.Builder;
import lombok.Getter;

/** This is a heartbeat entity of the cluster. */
@Builder
@Getter
public class HeartbeatEntity {
private String status;
private Long lastHeartbeat;
private String clusterVersion;
}
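
A short sketch (not part of this commit) of the Lombok-generated builder and getters; the version string is a placeholder:

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;

public class HeartbeatEntitySketch {

    public static void main(String[] args) {
        // @Builder generates the fluent builder; unset fields stay null.
        HeartbeatEntity heartbeat =
                HeartbeatEntity.builder()
                        .status(HeartbeatStatus.ACTIVE.name())
                        .lastHeartbeat(System.currentTimeMillis())
                        .clusterVersion("1.18.1") // placeholder version
                        .build();

        // @Getter generates the read accessors.
        System.out.println(heartbeat.getStatus() + " @ " + heartbeat.getLastHeartbeat());
    }
}
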
@@ -16,30 +16,32 @@
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.commons.lang3.tuple.ImmutablePair;
package org.apache.paimon.web.gateway.enums;

/**
* Using to execute cluster action
* The {@code DeploymentMode} enum defines the types of cluster deployment modes that can be
* supported.
*/
public interface ClusterAction {
public enum DeploymentMode {
YARN_SESSION("yarn-session"),
FLINK_SQL_GATEWAY("flink-sql-gateway");

/**
* Execute cluster action to obtain cluster status.
*
* @return cluster heartbeat entity
*/
ImmutablePair<ClusterStatus, Long> checkClusterHeartbeat() throws Exception;
private final String type;

default ImmutablePair<ClusterStatus, Long> buildClusterHeartbeatOfSuccess() {
return ImmutablePair.of(ClusterStatus.RUNNING,System.currentTimeMillis());
DeploymentMode(String type) {
this.type = type;
}
default ImmutablePair<ClusterStatus, Long> buildClusterHeartbeatOfError() {
return ImmutablePair.of(ClusterStatus.UNKNOWN,System.currentTimeMillis());

public String getType() {
return type;
}
enum ClusterStatus {
RUNNING,
UNKNOWN,

public static DeploymentMode fromName(String name) {
for (DeploymentMode type : values()) {
if (type.getType().equals(name)) {
return type;
}
}
throw new IllegalArgumentException("Unknown deployment mode type value: " + name);
}
}
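
A short sketch (not part of this commit) of resolving a deployment mode from its string value; note that fromName matches the lowercase type, not the enum constant name:

import org.apache.paimon.web.gateway.enums.DeploymentMode;

public class DeploymentModeSketch {

    public static void main(String[] args) {
        DeploymentMode mode = DeploymentMode.fromName("flink-sql-gateway");
        System.out.println(mode);           // FLINK_SQL_GATEWAY
        System.out.println(mode.getType()); // flink-sql-gateway

        // Unrecognized values raise IllegalArgumentException.
        try {
            DeploymentMode.fromName("kubernetes-session");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
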
@@ -21,8 +21,7 @@
/** The {@code EngineType} enum defines the types of engines that can be supported. */
public enum EngineType {
SPARK,
FLINK_SQL_GATEWAY,
FLINK_SESSION;
FLINK;

public static EngineType fromName(String name) {
for (EngineType type : values()) {
@@ -32,11 +31,4 @@ public static EngineType fromName(String name) {
}
throw new IllegalArgumentException("Unknown engine type value: " + name);
}
public static EngineType format(String name) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("The engine type value is null");
}
String trimmed = name.trim();
return fromName(trimmed.replaceAll("\\s+", "_"));
}
}
@@ -34,7 +34,7 @@ public ExecutorFactoryProvider(ExecutionConfig executionConfig) {

public ExecutorFactory getExecutorFactory(EngineType type) {
switch (type) {
case FLINK_SQL_GATEWAY:
case FLINK:
return new FlinkSqlGatewayExecutorFactory(executionConfig.getSessionEntity());
default:
throw new UnsupportedOperationException(
@@ -35,7 +35,7 @@ public class ExecutorFactoryProviderTest {
public void testGetExecutorFactoryWithFlink() {
ExecutionConfig config =
ExecutionConfig.builder().sessionEntity(SessionEntity.builder().build()).build();
EngineType engineType = EngineType.fromName("FLINK_SQL_GATEWAY");
EngineType engineType = EngineType.fromName("FLINK");
ExecutorFactoryProvider executorFactoryProvider = new ExecutorFactoryProvider(config);
assertSame(
FlinkSqlGatewayExecutorFactory.class,
@@ -26,6 +26,7 @@
import org.apache.paimon.web.server.util.PageSupport;

import cn.dev33.satoken.annotation.SaCheckPermission;
import cn.dev33.satoken.annotation.SaIgnore;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
@@ -95,9 +96,9 @@ public R<Void> delete(@PathVariable Integer[] clusterIds) {
return clusterService.deleteClusterByIds(clusterIds) > 0 ? R.succeed() : R.failed();
}

@SaCheckPermission("system:cluster:check")
@SaIgnore
@PostMapping("/check")
public R<Void> check(@Validated @RequestBody ClusterInfo clusterInfo) {
return clusterService.checkClusterStatus(clusterInfo) ? R.succeed() : R.failed();
return clusterService.checkClusterHeartbeatStatus(clusterInfo) ? R.succeed() : R.failed();
}
}