changeConfigs = persistService.findChangeConfig(startTime, endTime);
+ LogUtil.defaultLog.warn("changeConfig count:{}", changeConfigs.size());
+ for (ConfigInfoWrapper cf : changeConfigs) {
+ boolean result = ConfigCacheService
+ .dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
+ final String content = cf.getContent();
+ final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
+ LogUtil.defaultLog.info("[dump-change-ok] {}, {}, length={}, md5={}",
+ new Object[] {GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
+ content.length(), md5});
+ }
+ ConfigCacheService.reloadConfig();
+ long endChangeConfigTime = System.currentTimeMillis();
+ LogUtil.defaultLog.warn("changeConfig done,cost:{}", endChangeConfigTime - startChangeConfigTime);
+ return true;
+ }
+
+ // =====================
+
+ final DumpService dumpService;
+
+ final PersistService persistService;
+
+ final Timestamp startTime;
+
+ final Timestamp endTime;
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java
new file mode 100644
index 00000000000..963ef860005
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/processor/DumpProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.processor;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+import com.alibaba.nacos.config.server.manager.TaskProcessor;
+import com.alibaba.nacos.config.server.model.ConfigInfo;
+import com.alibaba.nacos.config.server.model.ConfigInfo4Beta;
+import com.alibaba.nacos.config.server.model.ConfigInfo4Tag;
+import com.alibaba.nacos.config.server.model.event.ConfigDumpEvent;
+import com.alibaba.nacos.config.server.service.dump.DumpConfigHandler;
+import com.alibaba.nacos.config.server.service.dump.DumpService;
+import com.alibaba.nacos.config.server.service.dump.task.DumpTask;
+import com.alibaba.nacos.config.server.service.repository.PersistService;
+import com.alibaba.nacos.config.server.utils.GroupKey2;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Objects;
+
+/**
+ * dump processor.
+ *
+ * @author Nacos
+ * @date 2020/7/5 12:19 PM
+ */
+public class DumpProcessor implements TaskProcessor {
+
+ public DumpProcessor(DumpService dumpService) {
+ this.dumpService = dumpService;
+ }
+
+ @Override
+ public boolean process(String taskType, AbstractTask task) {
+ final PersistService persistService = dumpService.getPersistService();
+ DumpTask dumpTask = (DumpTask) task;
+ String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
+ String dataId = pair[0];
+ String group = pair[1];
+ String tenant = pair[2];
+ long lastModified = dumpTask.getLastModified();
+ String handleIp = dumpTask.getHandleIp();
+ boolean isBeta = dumpTask.isBeta();
+ String tag = dumpTask.getTag();
+
+ ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
+ .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
+
+ if (isBeta) {
+ // beta发布,则dump数据,更新beta缓存
+ ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
+
+ build.remove(Objects.isNull(cf));
+ build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
+ build.content(Objects.isNull(cf) ? null : cf.getContent());
+
+ return DumpConfigHandler.configDump(build.build());
+ } else {
+ if (StringUtils.isBlank(tag)) {
+ ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
+
+ build.remove(Objects.isNull(cf));
+ build.content(Objects.isNull(cf) ? null : cf.getContent());
+ build.type(Objects.isNull(cf) ? null : cf.getType());
+
+ return DumpConfigHandler.configDump(build.build());
+ } else {
+
+ ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
+
+ build.remove(Objects.isNull(cf));
+ build.content(Objects.isNull(cf) ? null : cf.getContent());
+
+ return DumpConfigHandler.configDump(build.build());
+ }
+ }
+ }
+
+ final DumpService dumpService;
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllBetaTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllBetaTask.java
new file mode 100644
index 00000000000..21d3e99b60e
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllBetaTask.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.task;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+
+/**
+ * Dump all beta task.
+ *
+ * @author Nacos
+ * @date 2020/7/5 12:19 PM
+ */
+public class DumpAllBetaTask extends AbstractTask {
+
+ @Override
+ public void merge(AbstractTask task) {
+ }
+
+ public static final String TASK_ID = "dumpAllBetaConfigTask";
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTagTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTagTask.java
new file mode 100644
index 00000000000..ec1121751a7
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTagTask.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.task;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+
+/**
+ * Dump all tag task.
+ *
+ * @author Nacos
+ * @date 2020/7/5 12:19 PM
+ */
+public class DumpAllTagTask extends AbstractTask {
+
+ @Override
+ public void merge(AbstractTask task) {
+ }
+
+ public static final String TASK_ID = "dumpAllTagConfigTask";
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTask.java
new file mode 100644
index 00000000000..ee6e6e8fb7d
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpAllTask.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.task;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+
+/**
+ * Dump all task.
+ *
+ * @author Nacos
+ * @date 2020/7/5 12:17 PM
+ */
+public class DumpAllTask extends AbstractTask {
+
+ @Override
+ public void merge(AbstractTask task) {
+ }
+
+ public static final String TASK_ID = "dumpAllConfigTask";
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpChangeTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpChangeTask.java
new file mode 100644
index 00000000000..948dffc31af
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpChangeTask.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.task;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+
+/**
+ * Dump change task.
+ *
+ * @author Nacos
+ * @date 2020/7/5 12:19 PM
+ */
+public class DumpChangeTask extends AbstractTask {
+
+ @Override
+ public void merge(AbstractTask task) {
+ }
+
+ public static final String TASK_ID = "dumpChangeConfigTask";
+}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpTask.java
new file mode 100755
index 00000000000..ce2aa240ba7
--- /dev/null
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/dump/task/DumpTask.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 1999-2018 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.nacos.config.server.service.dump.task;
+
+import com.alibaba.nacos.config.server.manager.AbstractTask;
+
+/**
+ * Dump data task.
+ *
+ * @author Nacos
+ */
+public class DumpTask extends AbstractTask {
+
+ public DumpTask(String groupKey, long lastModified, String handleIp) {
+ this.groupKey = groupKey;
+ this.lastModified = lastModified;
+ this.handleIp = handleIp;
+ this.isBeta = false;
+ this.tag = null;
+ // retry interval: 1s
+ setTaskInterval(1000L);
+ }
+
+ public DumpTask(String groupKey, long lastModified, String handleIp, boolean isBeta) {
+ this.groupKey = groupKey;
+ this.lastModified = lastModified;
+ this.handleIp = handleIp;
+ this.isBeta = isBeta;
+ this.tag = null;
+ // retry interval: 1s
+ setTaskInterval(1000L);
+ }
+
+ public DumpTask(String groupKey, String tag, long lastModified, String handleIp, boolean isBeta) {
+ this.groupKey = groupKey;
+ this.lastModified = lastModified;
+ this.handleIp = handleIp;
+ this.isBeta = isBeta;
+ this.tag = tag;
+ // retry interval: 1s
+ setTaskInterval(1000L);
+ }
+
+ @Override
+ public void merge(AbstractTask task) {
+ }
+
+ final String groupKey;
+
+ final long lastModified;
+
+ final String handleIp;
+
+ final boolean isBeta;
+
+ final String tag;
+
+
+ public String getGroupKey() {
+ return groupKey;
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public String getHandleIp() {
+ return handleIp;
+ }
+
+ public boolean isBeta() {
+ return isBeta;
+ }
+
+ public String getTag() {
+ return tag;
+ }
+
+}
+
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDataTask.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDataTask.java
index dfa5f35ed11..9242822f1f7 100755
--- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDataTask.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDataTask.java
@@ -19,7 +19,7 @@
import com.alibaba.nacos.config.server.manager.AbstractTask;
/**
- * 表示对数据进行聚合的任务。
+ * Represents the task of aggregating data.
*
* @author jiuRen
*/
@@ -36,7 +36,7 @@ class MergeDataTask extends AbstractTask {
this.tag = tag;
this.clientIp = clientIp;
- // 聚合延迟
+ // aggregation delay
setTaskInterval(DELAY);
setLastProcessTime(System.currentTimeMillis());
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java
index c72bfdf4071..61daddd3e6a 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeDatumService.java
@@ -26,9 +26,7 @@
import com.alibaba.nacos.config.server.utils.ContentUtils;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.TimeUtils;
-import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.core.distributed.ProtocolManager;
-import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.InetUtils;
import org.slf4j.Logger;
@@ -42,9 +40,9 @@
import java.util.concurrent.atomic.AtomicInteger;
/**
- * 数据聚合服务。
- *
- * 启动时做全量聚合 + 修改数据触发的单条聚合
+ * Data aggregation service.
+ *
+ *
Full aggregation at startup and single aggregation triggered by data modification.
*
* @author jiuRen
*/
@@ -79,7 +77,7 @@ static List> splitList(List list, int
}
/**
- * 数据变更后调用,添加聚合任务
+ * Called after data changes to add aggregation tasks.
*/
public void addMergeTask(String dataId, String groupId, String tenant, String tag, String clientIp) {
if (!canExecute()) {
@@ -90,7 +88,7 @@ public void addMergeTask(String dataId, String groupId, String tenant, String ta
}
/**
- * 数据变更后调用,添加聚合任务
+ * Called after data changes to add aggregation tasks.
*/
public void addMergeTask(String dataId, String groupId, String tenant, String clientIp) {
if (!canExecute()) {
@@ -100,6 +98,9 @@ public void addMergeTask(String dataId, String groupId, String tenant, String cl
mergeTasks.addTask(task.getId(), task);
}
+ /**
+ * Merge all.
+ */
public void mergeAll() {
if (!canExecute()) {
return;
@@ -146,42 +147,40 @@ public void run() {
.findConfigInfoAggrByPage(dataId, group, tenant, pageNo, PAGE_SIZE);
if (page != null) {
datumList.addAll(page.getPageItems());
- log.info("[merge-query] {}, {}, size/total={}/{}", dataId, group, datumList.size(),
+ LOGGER.info("[merge-query] {}, {}, size/total={}/{}", dataId, group, datumList.size(),
rowCount);
}
}
final Timestamp time = TimeUtils.getCurrentTime();
- // 聚合
+
if (datumList.size() > 0) {
+ // merge
ConfigInfo cf = MergeTaskProcessor.merge(dataId, group, tenant, datumList);
persistService.insertOrUpdate(null, null, cf, time, null, false);
- log.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group,
+ LOGGER.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group,
datumList.size(), cf.getContent().length(), cf.getMd5(),
ContentUtils.truncateContent(cf.getContent()));
- }
- // 删除
- else {
+ } else {
+ // remove
persistService.removeConfigInfo(dataId, group, tenant, InetUtils.getSelfIp(), null);
- log.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId="
+ LOGGER.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId="
+ group);
}
} catch (Exception e) {
- log.info("[merge-error] " + dataId + ", " + group + ", " + e.toString(), e);
+ LOGGER.info("[merge-error] " + dataId + ", " + group + ", " + e.toString(), e);
}
FINISHED.incrementAndGet();
if (FINISHED.get() % 100 == 0) {
- log.info("[all-merge-dump] {} / {}", FINISHED.get(), total);
+ LOGGER.info("[all-merge-dump] {} / {}", FINISHED.get(), total);
}
}
- log.info("[all-merge-dump] {} / {}", FINISHED.get(), total);
+ LOGGER.info("[all-merge-dump] {} / {}", FINISHED.get(), total);
}
}
- // =====================
-
- private static final Logger log = LoggerFactory.getLogger(MergeDatumService.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MergeDatumService.class);
final TaskManager mergeTasks;
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java
index 797c1c7540f..1e9f4fe6d01 100755
--- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java
@@ -38,13 +38,13 @@
import java.util.List;
/**
- * Merge task processor
+ * Merge task processor.
*
* @author Nacos
*/
public class MergeTaskProcessor implements TaskProcessor {
- final int PAGE_SIZE = 10000;
+ private static final int PAGE_SIZE = 10000;
MergeTaskProcessor(PersistService persistService, MergeDatumService mergeService) {
this.persistService = persistService;
@@ -68,33 +68,32 @@ public boolean process(String taskType, AbstractTask task) {
.findConfigInfoAggrByPage(dataId, group, tenant, pageNo, PAGE_SIZE);
if (page != null) {
datumList.addAll(page.getPageItems());
- log.info("[merge-query] {}, {}, size/total={}/{}", dataId, group, datumList.size(), rowCount);
+ LOGGER.info("[merge-query] {}, {}, size/total={}/{}", dataId, group, datumList.size(), rowCount);
}
}
final Timestamp time = TimeUtils.getCurrentTime();
- // 聚合
if (datumList.size() > 0) {
+ // merge
ConfigInfo cf = merge(dataId, group, tenant, datumList);
persistService.insertOrUpdate(null, null, cf, time, null);
- log.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group, datumList.size(),
+ LOGGER.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group, datumList.size(),
cf.getContent().length(), cf.getMd5(), ContentUtils.truncateContent(cf.getContent()));
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(),
ConfigTraceService.PERSISTENCE_EVENT_MERGE, cf.getContent());
- }
- // 删除
- else {
+ } else {
+ // remove
if (StringUtils.isBlank(tag)) {
persistService.removeConfigInfo(dataId, group, tenant, clientIp, null);
} else {
persistService.removeConfigInfoTag(dataId, group, tenant, tag, clientIp, null);
}
- log.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group);
+ LOGGER.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group);
ConfigTraceService
.logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(),
@@ -105,12 +104,21 @@ public boolean process(String taskType, AbstractTask task) {
} catch (Exception e) {
mergeService.addMergeTask(dataId, group, tenant, mergeTask.getClientIp());
- log.info("[merge-error] " + dataId + ", " + group + ", " + e.toString(), e);
+ LOGGER.info("[merge-error] " + dataId + ", " + group + ", " + e.toString(), e);
}
return true;
}
+ /**
+ * merge datumList {@link ConfigInfoAggr}.
+ *
+ * @param dataId data id
+ * @param group group
+ * @param tenant tenant
+ * @param datumList datumList
+ * @return {@link ConfigInfo}
+ */
public static ConfigInfo merge(String dataId, String group, String tenant, List datumList) {
StringBuilder sb = new StringBuilder();
String appName = null;
@@ -125,9 +133,7 @@ public static ConfigInfo merge(String dataId, String group, String tenant, List<
return new ConfigInfo(dataId, group, tenant, appName, content);
}
- // =====================
-
- private static final Logger log = LoggerFactory.getLogger(MergeTaskProcessor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MergeTaskProcessor.class);
private PersistService persistService;