Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10650][Agent] When the installer updates the configuration, it first checks the version #10654

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ private Runnable configFetchThread() {
while (isRunnable()) {
try {
ConfigResult config = getConfig();
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)) {
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)
&& manager.getModuleManager().getCurrentVersion() < config.getVersion()) {
manager.getModuleManager().submitConfig(config);
}
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class ModuleManager extends AbstractDaemon {
private final String confPath;
private final BlockingQueue<ConfigResult> configQueue;
private String currentMd5 = "";
private Integer currentVersion = -1;
private Map<Integer, ModuleConfig> currentModules = new ConcurrentHashMap<>();
private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Gson GSON = gsonBuilder.create();
Expand Down Expand Up @@ -135,6 +136,10 @@ private boolean isConfigValid(ConfigResult config) {
LOGGER.error("modules md5 should not be null!");
return false;
}
if (config.getVersion() == null) {
LOGGER.error("modules version should not be null!");
return false;
}
if (config.getModuleList().isEmpty()) {
LOGGER.error("module list should not be empty!");
return false;
Expand Down Expand Up @@ -218,6 +223,10 @@ public String getCurrentMd5() {
return currentMd5;
}

public Integer getCurrentVersion() {
return currentVersion;
}

public ModuleConfig getModule(Integer moduleId) {
return currentModules.get(moduleId);
}
Expand Down Expand Up @@ -254,8 +263,13 @@ public void restoreFromLocalFile(String confPath) {
new FileInputStream(localModuleConfigPath), StandardCharsets.UTF_8)) {
JsonElement tmpElement = JsonParser.parseReader(reader).getAsJsonObject();
ConfigResult curConfig = GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
if (curConfig.getMd5() != null && curConfig.getModuleList() != null) {
currentMd5 = curConfig.getMd5();
if (curConfig.getModuleList() != null) {
if (curConfig.getMd5() != null) {
currentMd5 = curConfig.getMd5();
}
if (curConfig.getVersion() != null) {
currentVersion = curConfig.getVersion();
}
curConfig.getModuleList().forEach((module) -> {
currentModules.put(module.getId(), module);
});
Expand All @@ -277,7 +291,7 @@ public void saveToLocalFile(String confPath) {
File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(jsonPath), StandardCharsets.UTF_8))) {
String curConfig = GSON.toJson(ConfigResult.builder().md5(currentMd5)
String curConfig = GSON.toJson(ConfigResult.builder().md5(currentMd5).version(currentVersion)
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
writer.write(curConfig);
writer.flush();
Expand All @@ -299,6 +313,7 @@ private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
}
if (updateModules(config.getModuleList())) {
currentMd5 = config.getMd5();
currentVersion = config.getVersion();
saveToLocalFile(confPath);
} else {
LOGGER.error("update modules failed!");
Expand All @@ -308,13 +323,14 @@ private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
private void checkModules() {
LOGGER.info("check modules start");
currentModules.values().forEach((module) -> {
LOGGER.info("check module current state {} {}", module.getName(), module.getState());
LOGGER.info("check module {}({}) current state {}", module.getId(), module.getName(), module.getState());
switch (module.getState()) {
case NEW:
if (downloadModule(module)) {
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
} else {
LOGGER.error("download module {} failed, keep state in new", module.getName());
LOGGER.error("download module {}({}) failed, keep state in new", module.getId(),
module.getName());
}
break;
case DOWNLOADED:
Expand All @@ -323,22 +339,24 @@ private void checkModules() {
saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
} else {
LOGGER.info(
"check module {} package failed, change stated to new, will download package again",
module.getName());
"check module {}({}) package failed, change stated to new, will download package again",
module.getId(), module.getName());
saveModuleState(module.getId(), ModuleStateEnum.NEW);
}
break;
case INSTALLED:
if (!isProcessAllStarted(module)) {
LOGGER.info("module {} process not all started try to start", module.getName());
LOGGER.info("module {}({}) process not all started try to start", module.getId(),
module.getName());
if (!startModule(module)) {
LOGGER.info("start module {} failed, change state to downloaded", module.getState());
LOGGER.info("start module {}({}) failed, change state to downloaded", module.getId(),
module.getName());
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
}
}
break;
default:
LOGGER.error("module {} invalid state {}", module.getName(), module.getState());
LOGGER.error("module {}({}) invalid state {}", module.getId(), module.getName(), module.getState());
}
});
LOGGER.info("check modules end");
Expand All @@ -358,15 +376,15 @@ private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig> modulesFro
modulesFromManager.values().forEach((managerModule) -> {
ModuleConfig localModule = currentModules.get(managerModule.getId());
if (localModule == null) {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} not found in local, add it",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} not found in local, add it",
managerModule.getId(), managerModule.getName(), managerModule.getVersion());
addModule(managerModule);
} else {
if (managerModule.getMd5().equals(localModule.getMd5())) {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 no change, do nothing",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} md5 no change, do nothing",
localModule.getId(), localModule.getName(), localModule.getVersion());
} else {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 changed, update it",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} md5 changed, update it",
localModule.getId(), localModule.getName(), localModule.getVersion());
updateModule(localModule, managerModule);
}
Expand All @@ -378,62 +396,65 @@ private void traverseLocalModulesToManager(Map<Integer, ModuleConfig> modulesFro
currentModules.values().forEach((localModule) -> {
ModuleConfig managerModule = modulesFromManager.get(localModule.getId());
if (managerModule == null) {
LOGGER.info("traverseLocalModulesToManager module {} {} {} not found in local, delete it",
LOGGER.info("traverseLocalModulesToManager module {}({}) {} not found in local, delete it",
localModule.getId(), localModule.getName(), localModule.getVersion());
deleteModule(localModule);
}
});
}

private void addModule(ModuleConfig module) {
LOGGER.info("add module {} start", module.getName());
LOGGER.info("add module {}({}) start", module.getId(), module.getName());
addAndSaveModuleConfig(module);
if (!downloadModule(module)) {
LOGGER.error("add module {} but download failed", module.getName());
LOGGER.error("add module {}({}) but download failed", module.getId(), module.getName());
return;
}
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
installModule(module);
saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
startModule(module);
LOGGER.info("add module {} end", module.getId());
LOGGER.info("add module {}({}) end", module.getId(), module.getName());
}

private void deleteModule(ModuleConfig module) {
LOGGER.info("delete module {} start", module.getId());
LOGGER.info("delete module {}({}) start", module.getId(), module.getName());
stopModule(module);
uninstallModule(module);
deleteAndSaveModuleConfig(module);
LOGGER.info("delete module {} end", module.getId());
LOGGER.info("delete module {}({}) end", module.getId(), module.getName());
}

private void updateModule(ModuleConfig localModule, ModuleConfig managerModule) {
LOGGER.info("update module {} start", localModule.getId());
LOGGER.info("update module {}({}) start", localModule.getId(), localModule.getName());
if (localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5())) {
LOGGER.info("module {} package md5 no change, will restart and save config", localModule.getId());
LOGGER.info("module {}({}) package md5 no change, will restart and save config", localModule.getId(),
localModule.getName());
restartModule(localModule, managerModule);
managerModule.setState(localModule.getState());
updateModuleConfig(managerModule);
} else {
LOGGER.info("module {} package md5 changed, will reinstall", localModule.getId());
LOGGER.info("module {}({}) package md5 changed, will reinstall", localModule.getId(),
localModule.getName());
deleteModule(localModule);
addModule(managerModule);
}
LOGGER.info("update module {} end", localModule.getId());
LOGGER.info("update module {}({}) end", localModule.getId(), localModule.getName());
}

private void addAndSaveModuleConfig(ModuleConfig module) {
module.setState(ModuleStateEnum.NEW);
if (currentModules.containsKey(module.getId())) {
LOGGER.error("should not happen! module {} found! will force to replace it!", module.getId());
LOGGER.error("should not happen! module {}({}) found! will force to replace it!", module.getId(),
module.getName());
}
currentModules.put(module.getId(), module);
saveToLocalFile(confPath);
}

private void deleteAndSaveModuleConfig(ModuleConfig module) {
if (!currentModules.containsKey(module.getId())) {
LOGGER.error("should not happen! module {} not found!", module.getId());
LOGGER.error("should not happen! module {}({}) not found!", module.getId(), module.getName());
return;
}
currentModules.remove(module.getId());
Expand All @@ -453,7 +474,7 @@ private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) {
}
module.setState(state);
saveToLocalFile(confPath);
LOGGER.info("save module state to {} {}", moduleId, state);
LOGGER.info("save module {}({}) state to {}", module.getId(), module.getName(), state);
return true;
}

Expand All @@ -463,42 +484,43 @@ private void restartModule(ModuleConfig localModule, ModuleConfig managerModule)
}

private void installModule(ModuleConfig module) {
LOGGER.info("install module {} with cmd {}", module.getId(), module.getInstallCommand());
LOGGER.info("install module {}({}) with cmd {}", module.getId(), module.getName(), module.getInstallCommand());
String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
LOGGER.info("install module {} return {} ", module.getId(), ret);
LOGGER.info("install module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private boolean startModule(ModuleConfig module) {
LOGGER.info("start module {} with cmd {}", module.getId(), module.getStartCommand());
LOGGER.info("start module {}({}) with cmd {}", module.getId(), module.getName(), module.getStartCommand());
for (int i = 0; i < module.getProcessesNum(); i++) {
String ret = ExcuteLinux.exeCmd(module.getStartCommand());
LOGGER.info("start [{}] module {} return {} ", i, module.getId(), ret);
LOGGER.info("start module {}({}) proc[{}] return {} ", module.getId(), module.getName(), i, ret);
}
if (isProcessAllStarted(module)) {
LOGGER.info("start module {} success", module.getId());
LOGGER.info("start module {}({}) success", module.getId(), module.getName());
return true;
} else {
LOGGER.info("start module {} failed", module.getId());
LOGGER.info("start module {}({}) failed", module.getId(), module.getName());
return false;
}
}

private void stopModule(ModuleConfig module) {
LOGGER.info("stop module {} with cmd {}", module.getId(), module.getStopCommand());
LOGGER.info("stop module {}({}) with cmd {}", module.getId(), module.getName(), module.getStopCommand());
String ret = ExcuteLinux.exeCmd(module.getStopCommand());
LOGGER.info("stop module {} return {} ", module.getId(), ret);
LOGGER.info("stop module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private void uninstallModule(ModuleConfig module) {
LOGGER.info("uninstall module {} with cmd {}", module.getId(), module.getUninstallCommand());
LOGGER.info("uninstall module {}({}) with cmd {}", module.getId(), module.getName(),
module.getUninstallCommand());
String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
LOGGER.info("uninstall module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private boolean isProcessAllStarted(ModuleConfig module) {
String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
if (ret == null) {
LOGGER.error("get module process num {} failed", module.getName());
LOGGER.error("get module {}({}) process num failed", module.getId(), module.getName());
return false;
}
String[] processArray = ret.split("\n");
Expand All @@ -508,12 +530,12 @@ private boolean isProcessAllStarted(ModuleConfig module) {
cnt++;
}
}
LOGGER.info("get module process num {} {}", module.getName(), cnt);
LOGGER.info("get module {}({}) process num {}", module.getId(), module.getName(), cnt);
return cnt >= module.getProcessesNum();
}

private boolean downloadModule(ModuleConfig module) {
LOGGER.info("download module {} begin with url {}", module.getId(),
LOGGER.info("download module {}({}) begin with url {}", module.getId(), module.getName(),
module.getPackageConfig().getDownloadUrl());
try {
URL url = new URL(module.getPackageConfig().getDownloadUrl());
Expand All @@ -526,7 +548,7 @@ private boolean downloadModule(ModuleConfig module) {
module.getPackageConfig().getStoragePath() + "/" + module.getPackageConfig().getFileName();
try (InputStream inputStream = conn.getInputStream();
FileOutputStream outputStream = new FileOutputStream(path)) {
LOGGER.info("save path {}", path);
LOGGER.info("module {}({}) save path {}", module.getId(), module.getName(), path);
int byteRead;
byte[] buffer = new byte[DOWNLOAD_PACKAGE_READ_BUFF_SIZE];
while ((byteRead = inputStream.read(buffer)) != -1) {
Expand All @@ -536,15 +558,15 @@ private boolean downloadModule(ModuleConfig module) {
if (isPackageDownloaded(module)) {
return true;
} else {
LOGGER.error("download package md5 not match!");
LOGGER.error("download module {}({}) package md5 not match!", module.getId(), module.getName());
return false;
}
} catch (FileNotFoundException e) {
LOGGER.error("download module err", e);
LOGGER.error("download module {}({}) err", module.getId(), module.getName(), e);
} catch (IOException e) {
LOGGER.error("download module err", e);
LOGGER.error("download module {}({}) err", module.getId(), module.getName(), e);
}
LOGGER.info("download module {} end", module.getId());
LOGGER.info("download module {}({}) end", module.getId(), module.getName());
return false;
}

Expand All @@ -554,7 +576,8 @@ private boolean isPackageDownloaded(ModuleConfig module) {
if (Objects.equals(fileMd5, module.getPackageConfig().getMd5())) {
return true;
} else {
LOGGER.error("md5 not match! fileMd5 {} moduleMd5 {}", fileMd5, module.getPackageConfig().getMd5());
LOGGER.error("module {}({}) md5 not match! fileMd5 {} moduleMd5 {}", module.getId(), module.getName(),
fileMd5, module.getPackageConfig().getMd5());
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private ConfigResult getConfig() {
"echo empty uninstall cmd", "agent-release-1.13.0-SNAPSHOT-bin.tar.gz",
"http://11.151.252.111:8083/inlong/manager/openapi/agent/download/agent-release-1.13.0-SNAPSHOT-bin.tar.gz",
NEW_MD5));
return ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").build();
return ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").version(1).build();
}

private ModuleConfig getModuleConfig(int id, String name, String md5, String version, Integer procNum,
Expand Down
Loading
Loading