Skip to content

Commit

Permalink
[INLONG-7952][Sort] Mask sensitive message of Flink SQL in the logs (a…
Browse files Browse the repository at this point in the history
…pache#7953)

(cherry picked from commit 3f4ebb5)
  • Loading branch information
menghuiyu committed May 5, 2023
1 parent 7688703 commit 3b540a4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,13 @@ private static int maskStartPosition(int idxSeparator, String separator, StringB
return charPos;
}

public static String removeMaskMsg(String sql) {
StringBuilder buffer = new StringBuilder(sql);
/**
* mask sensitive message
* @param message raw message
* @return non-sensitive message
*/
public static String maskSensitiveMessage(String message) {
StringBuilder buffer = new StringBuilder(message);
mask(buffer);
return buffer.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.inlong.sort.parser.impl;

import static org.apache.inlong.common.util.MaskDataUtils.maskSensitiveMessage;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.inlong.common.util.MaskDataUtils;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
Expand Down Expand Up @@ -263,7 +264,7 @@ private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeM
if (node instanceof ExtractNode) {
log.info("start parse node, node id:{}", node.getId());
String sql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), MaskDataUtils.removeMaskMsg(sql));
log.info("node id:{}, create table sql:\n{}", node.getId(), maskSensitiveMessage(sql));
registerTableSql(node, sql);
hasParsedSet.add(node.getId());
} else {
Expand All @@ -278,11 +279,11 @@ private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeM
}
if (node instanceof LoadNode) {
String createSql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), MaskDataUtils.removeMaskMsg(createSql));
log.info("node id:{}, create table sql:\n{}", node.getId(), maskSensitiveMessage(createSql));
registerTableSql(node, createSql);
LoadNode loadNode = (LoadNode) node;
String insertSql = genLoadNodeInsertSql(loadNode, relation, nodeMap);
log.info("node id:{}, insert sql:\n{}", node.getId(), MaskDataUtils.removeMaskMsg(insertSql));
log.info("node id:{}, insert sql:\n{}", node.getId(), maskSensitiveMessage(insertSql));
insertSqls.add(insertSql);
hasParsedSet.add(node.getId());
} else if (node instanceof TransformNode) {
Expand All @@ -292,9 +293,9 @@ private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeM
Preconditions.checkState(!transformNode.getFieldRelations().isEmpty(),
"field relations is empty");
String createSql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), MaskDataUtils.removeMaskMsg(createSql));
log.info("node id:{}, create table sql:\n{}", node.getId(), maskSensitiveMessage(createSql));
String selectSql = genTransformSelectSql(transformNode, relation, nodeMap);
log.info("node id:{}, tansform sql:\n{}", node.getId(), MaskDataUtils.removeMaskMsg(selectSql));
log.info("node id:{}, transform sql:\n{}", node.getId(), maskSensitiveMessage(selectSql));
registerTableSql(node, createSql + " AS\n" + selectSql);
hasParsedSet.add(node.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.parser.result;

import static org.apache.inlong.common.util.MaskDataUtils.maskSensitiveMessage;

import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,7 +28,6 @@

import java.io.Serializable;
import java.util.List;
import org.apache.inlong.common.util.MaskDataUtils;

/**
* Flink sql parse result, It is a concrete implementation of ParseResult
Expand Down Expand Up @@ -85,7 +86,7 @@ private TableResult executeLoadSqls(List<String> sqls) {

private void executeCreateTableSqls(List<String> sqls) {
for (String sql : sqls) {
log.info("execute createSql:\n{}", MaskDataUtils.removeMaskMsg(sql));
log.info("execute createSql:\n{}", maskSensitiveMessage(sql));
tableEnv.executeSql(sql);
}
}
Expand Down

0 comments on commit 3b540a4

Please sign in to comment.