Skip to content

Commit

Permalink
[Feature] Add cluster status check interface and scheduled tasks (#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnemoneIndicum authored Jul 8, 2024
1 parent 1fd762d commit ece930e
Show file tree
Hide file tree
Showing 12 changed files with 533 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.common.status;

/** Heartbeat states that a cluster health check can report. */
public enum HeartbeatStatus {

    /** The cluster is active. */
    ACTIVE,

    /** The target machine cannot be connected to, usually because of a network anomaly. */
    UNREACHABLE,

    /**
     * The cluster state is unknown, usually because the cluster did not return heartbeat
     * information even though its network is normal.
     */
    UNKNOWN,

    /**
     * The cluster is dead, usually because no heartbeat information has been obtained from it for
     * a long time.
     */
    DEAD
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;

/** Action used to perform a cluster heartbeat check. */
public interface HeartbeatAction {

    /**
     * Checks the cluster and reports the outcome as a heartbeat entity.
     *
     * @return the heartbeat entity describing the cluster.
     */
    HeartbeatEntity checkClusterHeartbeat();

    /**
     * Builds a heartbeat entity that carries only the given cluster status.
     *
     * <p>Only the status field is populated, with the name of the given enum constant. The last
     * heartbeat timestamp and the cluster version are not set by this method.
     *
     * @param status the cluster status whose name is stored in the entity; must not be null.
     * @return a heartbeat entity holding the status name.
     */
    default HeartbeatEntity buildResulHeartbeatEntity(HeartbeatStatus status) {
        String statusName = status.name();
        return HeartbeatEntity.builder().status(statusName).build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;

import java.util.Objects;

/**
 * The flink session client provides operations on a flink session cluster, such as obtaining the
 * cluster status. It is the flink session cluster implementation of {@link HeartbeatAction}.
 */
@Slf4j
public class SessionClusterClient implements HeartbeatAction {

    private final SqlGateWayRestClient restClient;

    /**
     * Creates a client that talks to the session cluster at the given address.
     *
     * @param sessionClusterHost host of the session cluster.
     * @param sessionClusterPort port of the session cluster.
     * @throws Exception if the underlying rest client cannot be created.
     */
    public SessionClusterClient(String sessionClusterHost, int sessionClusterPort)
            throws Exception {
        this.restClient = new SqlGateWayRestClient(sessionClusterHost, sessionClusterPort);
    }

    /**
     * Requests the cluster overview and converts the outcome into a heartbeat entity.
     *
     * @return an {@code ACTIVE} entity carrying the current time and the cluster version when a
     *     response is received, an {@code UNKNOWN} entity when the response is null, or an
     *     {@code UNREACHABLE} entity when the request fails with any exception.
     */
    @Override
    public HeartbeatEntity checkClusterHeartbeat() {
        try {
            ClusterOverviewWithVersion heartbeat =
                    restClient
                            .sendRequest(
                                    ClusterOverviewHeaders.getInstance(),
                                    EmptyMessageParameters.getInstance(),
                                    EmptyRequestBody.getInstance())
                            .get();
            if (Objects.nonNull(heartbeat)) {
                return HeartbeatEntity.builder()
                        .lastHeartbeat(System.currentTimeMillis())
                        .status(HeartbeatStatus.ACTIVE.name())
                        .clusterVersion(heartbeat.getVersion())
                        .build();
            }
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Catching the interruption clears the thread's interrupt flag; restore it so
                // that the calling thread can still observe the interruption request.
                Thread.currentThread().interrupt();
            }
            log.error(
                    "An exception occurred while obtaining the cluster status :{}",
                    ex.getMessage(),
                    ex);
            return this.buildResulHeartbeatEntity(HeartbeatStatus.UNREACHABLE);
        }
        // A null response without an exception: the cluster answered but gave no information.
        return this.buildResulHeartbeatEntity(HeartbeatStatus.UNKNOWN);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.apache.paimon.web.engine.flink.sql.gateway.client;

import org.apache.paimon.web.engine.flink.common.status.HeartbeatStatus;
import org.apache.paimon.web.engine.flink.sql.gateway.model.HeartbeatEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
import org.apache.paimon.web.engine.flink.sql.gateway.utils.SqlGateWayRestClient;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
Expand All @@ -38,6 +41,7 @@
import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetInfoHeaders;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
Expand All @@ -47,21 +51,24 @@
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.util.GetInfoResponseBody;
import org.apache.flink.table.gateway.rest.util.RowFormat;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* The client of flink sql gateway provides some operations of flink sql gateway. such as creating
* session, execute statement, fetch result, etc.
*/
public class SqlGatewayClient {
@Slf4j
public class SqlGatewayClient implements HeartbeatAction {

private static final String DEFAULT_SESSION_NAME_PREFIX = "FLINK_SQL_GATEWAY_SESSION";
private static final int REQUEST_WAITE_TIME = 1000;
Expand Down Expand Up @@ -242,4 +249,31 @@ private SessionHandle buildSessionHandleBySessionId(String sessionId) {
/**
 * Wraps the textual operation id into an {@link OperationHandle}.
 *
 * @param operationId the operation id in UUID string form.
 * @return the operation handle for the parsed UUID.
 */
private OperationHandle buildOperationHandleByOperationId(String operationId) {
    UUID operationUuid = UUID.fromString(operationId);
    return new OperationHandle(operationUuid);
}

/**
 * Requests the gateway info and converts the outcome into a heartbeat entity.
 *
 * @return an {@code ACTIVE} entity carrying the current time and the product version when a
 *     response is received, an {@code UNKNOWN} entity when the response is null, or an
 *     {@code UNREACHABLE} entity when the request fails with any exception.
 */
@Override
public HeartbeatEntity checkClusterHeartbeat() {
    try {
        GetInfoResponseBody heartbeat =
                restClient
                        .sendRequest(
                                GetInfoHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                EmptyRequestBody.getInstance())
                        .get();
        if (Objects.nonNull(heartbeat)) {
            return HeartbeatEntity.builder()
                    .lastHeartbeat(System.currentTimeMillis())
                    .status(HeartbeatStatus.ACTIVE.name())
                    .clusterVersion(heartbeat.getProductVersion())
                    .build();
        }
    } catch (Exception exec) {
        if (exec instanceof InterruptedException) {
            // Catching the interruption clears the thread's interrupt flag; restore it so
            // that the calling thread can still observe the interruption request.
            Thread.currentThread().interrupt();
        }
        log.error(
                "An exception occurred while obtaining the cluster status :{}",
                exec.getMessage(),
                exec);
        return this.buildResulHeartbeatEntity(HeartbeatStatus.UNREACHABLE);
    }
    // A null response without an exception: the gateway answered but gave no information.
    return this.buildResulHeartbeatEntity(HeartbeatStatus.UNKNOWN);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.engine.flink.sql.gateway.model;

import lombok.Builder;
import lombok.Getter;

/** Heartbeat entity of a cluster; built through the Lombok builder and read through getters. */
@Builder
@Getter
public class HeartbeatEntity {
/** Heartbeat status of the cluster, stored as text. */
private String status;
/** Time of the last heartbeat. NOTE(review): presumably epoch milliseconds; confirm. */
private Long lastHeartbeat;
/** Version reported by the cluster. */
private String clusterVersion;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.web.gateway.enums;

/**
 * Enumerates the cluster deployment modes that can be supported, each paired with the type string
 * that identifies it.
 */
public enum DeploymentMode {
    YARN_SESSION("yarn-session"),
    FLINK_SQL_GATEWAY("flink-sql-gateway");

    private final String type;

    DeploymentMode(String type) {
        this.type = type;
    }

    /** Returns the type string of this deployment mode, e.g. {@code "yarn-session"}. */
    public String getType() {
        return type;
    }

    /**
     * Resolves a deployment mode from its type string, i.e. the value of {@link #getType()} and
     * not the enum constant name.
     *
     * @param name the type string to look up.
     * @return the deployment mode whose type string equals {@code name}.
     * @throws IllegalArgumentException if no deployment mode has the given type string, including
     *     when {@code name} is null.
     */
    public static DeploymentMode fromName(String name) {
        DeploymentMode[] modes = values();
        for (int i = 0; i < modes.length; i++) {
            DeploymentMode candidate = modes[i];
            if (!candidate.type.equals(name)) {
                continue;
            }
            return candidate;
        }
        throw new IllegalArgumentException("Unknown deployment mode type value: " + name);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.web.server.util.PageSupport;

import cn.dev33.satoken.annotation.SaCheckPermission;
import cn.dev33.satoken.annotation.SaIgnore;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -94,4 +95,10 @@ public R<Void> update(@Validated @RequestBody ClusterInfo clusterInfo) {
public R<Void> delete(@PathVariable Integer[] clusterIds) {
return clusterService.deleteClusterByIds(clusterIds) > 0 ? R.succeed() : R.failed();
}

/**
 * Checks the heartbeat status of the cluster described in the request body.
 *
 * <p>NOTE(review): this endpoint is annotated with {@code @SaIgnore} rather than a permission
 * check; presumably that exempts it from authentication. Confirm this is intended.
 *
 * @param clusterInfo the cluster to check.
 * @return a success response when the service reports {@code true}, otherwise a failure response.
 */
@SaIgnore
@PostMapping("/check")
public R<Void> check(@Validated @RequestBody ClusterInfo clusterInfo) {
    boolean checkPassed = clusterService.checkClusterHeartbeatStatus(clusterInfo);
    if (!checkPassed) {
        return R.failed();
    }
    return R.succeed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/** Cluster table model. */
@Data
@Builder
Expand All @@ -44,5 +46,11 @@ public class ClusterInfo extends BaseModel {

private String type;

private String deploymentMode;

private Boolean enabled;

private String heartbeatStatus;

private LocalDateTime lastHeartbeat;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface ClusterService extends IService<ClusterInfo> {
boolean checkClusterNameUnique(ClusterInfo cluster);

int deleteClusterByIds(Integer[] clusterIds);

/**
 * Checks the heartbeat status of the given cluster.
 *
 * <p>NOTE(review): the implementation is not visible here; presumably {@code true} means the
 * cluster responded as active. Confirm against the implementing class.
 *
 * @param clusterInfo the cluster to check.
 * @return the outcome of the check as a boolean.
 */
boolean checkClusterHeartbeatStatus(ClusterInfo clusterInfo);
}
Loading

0 comments on commit ece930e

Please sign in to comment.