Skip to content

Commit

Permalink
[INLONG-10387][Audit] Audit SDK supports obtaining Audit-Proxy confid…
Browse files Browse the repository at this point in the history
…ence through the InLong Manager
  • Loading branch information
doleyzi committed Jun 13, 2024
1 parent 3186d0a commit e4a90f0
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 23 deletions.
4 changes: 4 additions & 0 deletions inlong-audit/audit-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,29 @@
* limitations under the License.
*/

package org.apache.inlong.audit.entities;
package org.apache.inlong.audit.entity;

public enum AuditComponent {

AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), COMMON_AUDIT("Common");
private final String component;

/**
* Constructor for the enum.
*
* @param component the name of the component
*/

AuditComponent(String component) {
this.component = component;
}

/**
* Returns the name of the component.
*
* @return the name of the component
*/

public String getComponent() {
return component;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.audit.entities;
package org.apache.inlong.audit.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -24,6 +24,23 @@
@AllArgsConstructor
public class AuditProxy {

/**
* The host of the audit proxy.
*/
private String host;

/**
* The port of the audit proxy.
*/
private int port;

/**
* Returns a string representation of the audit proxy.
*
* @return a string representation of the audit proxy
*/
@Override
public String toString() {
return String.format("%s:%d", host, port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.entity;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Class representing an authentication configuration.
*/
@AllArgsConstructor
@Data
public class AuthConfig {

/**
* The secret ID for authentication.
*/
private String secretId;

/**
* The secret key for authentication.
*/
private String secretKey;

/**
* The token for authentication.
*/
private String token;

/**
* The service name for authentication.
*/
private String serviceName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.entity;

import com.google.gson.Gson;
import lombok.Data;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

/**
* Class representing a common response.
*/
@Data
public class CommonResponse<T> {

/**
* Gson instance for JSON serialization and deserialization.
*/
private static final Gson gson = new Gson();

/**
* Error message of the response.
*/
private String errMsg;

/**
* Success status of the response.
*/
private boolean success;

/**
* Data of the response.
*/
private List<T> data;

/**
* Converts a JSON string to a CommonResponse object.
*
* @param json the JSON string
* @param clazz the class of the data
* @return a CommonResponse object
*/
public static CommonResponse fromJson(String json, Class clazz) {
Type objectType = type(CommonResponse.class, clazz);
return gson.fromJson(json, objectType);
}

/**
* Returns a parameterized type.
*
* @param raw the raw type
* @param args the actual type arguments
* @return a parameterized type
*/
private static ParameterizedType type(final Class raw, final Type... args) {
return new ParameterizedType() {

@Override
public Type getRawType() {
return raw;
}

@Override
public Type[] getActualTypeArguments() {
return args;
}

@Override
public Type getOwnerType() {
return null;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import org.apache.inlong.audit.entity.AuthConfig;
import org.apache.inlong.common.util.BasicAuth;

import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;

import java.util.HashMap;
import java.util.Map;

public class HttpUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class);
private static final String PARAM_COMPONENT = "component";
private static HttpClient httpClient = null;

static {
try {
SSLContext sslContext = SSLContextBuilder.create()
.loadTrustMaterial(new TrustSelfSignedStrategy())
.build();

httpClient = HttpClientBuilder.create()
.setSSLContext(sslContext)
.build();
} catch (Exception e) {
LOGGER.error("Error initializing SSL context or HTTP client", e);
}
}

public static Map<String, String> getAuthHeader(AuthConfig authConfig) {
Map<String, String> header = new HashMap<>();
try {
header.put(BasicAuth.BASIC_AUTH_HEADER,
BasicAuth.genBasicAuthCredential(authConfig.getSecretId(), authConfig.getSecretKey()));
} catch (Exception e) {
LOGGER.error("Get auth header error", e);
}
return header;
}
public static String sendGet(String component, String url, AuthConfig authConfig, int timeoutMs) {
if (httpClient == null) {
LOGGER.error("httpClient is null");
return null;
}
try {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(timeoutMs)
.setConnectionRequestTimeout(timeoutMs)
.setSocketTimeout(timeoutMs)
.build();
URIBuilder uriBuilder = new URIBuilder(url);
uriBuilder.addParameter(PARAM_COMPONENT, component);
String finalUrl = uriBuilder.build().toString();

HttpGet request = new HttpGet(finalUrl);
request.setConfig(requestConfig);

Map<String, String> authHeaders = getAuthHeader(authConfig);
for (Map.Entry<String, String> entry : authHeaders.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}

try (CloseableHttpResponse response = (CloseableHttpResponse) httpClient.execute(request)) {
String responseStr = EntityUtils.toString(response.getEntity());
LOGGER.info("Http response: {}", responseStr);
if (responseStr != null && !responseStr.isEmpty()
&& response.getStatusLine().getStatusCode() == 200) {
return responseStr;
}
}
} catch (Exception e) {
LOGGER.error("Send get request has exception", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.apache.inlong.audit.entities.AuditComponent.COMMON_AUDIT;
import static org.apache.inlong.audit.entity.AuditComponent.COMMON_AUDIT;

public class Heartbeat {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import static org.apache.inlong.audit.entity.AuditType.POSTGRES;
import static org.apache.inlong.audit.entity.AuditType.SDK;
import static org.apache.inlong.audit.entity.AuditType.STARROCKS;
import static org.apache.inlong.audit.entity.AuditType.TUBE;
import static org.apache.inlong.audit.entity.AuditType.TUBE_MQ;
import static org.apache.inlong.audit.entity.FlowType.INPUT;
import static org.apache.inlong.audit.entity.FlowType.OUTPUT;

Expand Down Expand Up @@ -91,8 +91,8 @@ public enum AuditIdEnum {
SORT_BINLOG_INPUT(29, INPUT, BINLOG, "Received Audit Metrics for Sort Binlog"),
SORT_BINLOG_OUTPUT(30, OUTPUT, BINLOG, "Sent Audit Metrics for Sort Binlog"),

SORT_TUBE_INPUT(33, INPUT, TUBE, "Received Audit Metrics for Sort Tube"),
SORT_TUBE_OUTPUT(34, OUTPUT, TUBE, "Sent Audit Metrics for Sort Tube"),
SORT_TUBE_INPUT(33, INPUT, TUBE_MQ, "Received Audit Metrics for Sort Tube"),
SORT_TUBE_OUTPUT(34, OUTPUT, TUBE_MQ, "Sent Audit Metrics for Sort Tube"),

SORT_MYSQL_INPUT(35, INPUT, MYSQL, "Received Audit Metrics for Sort MySQL"),
SORT_MYSQL_OUTPUT(36, OUTPUT, MYSQL, "Sent Audit Metrics for Sort MySQL"),
Expand Down
Loading

0 comments on commit e4a90f0

Please sign in to comment.