Skip to content

Commit

Permalink
[feature](ES catalog)Support auto detect available nodes in node_disc…
Browse files Browse the repository at this point in the history
…overy mode
  • Loading branch information
qidaye committed Jan 8, 2025
1 parent c3ed2d3 commit e082af9
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 2 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSourceManager;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.es.EsNodeDiscovery;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
Expand Down Expand Up @@ -4591,6 +4592,10 @@ public EsRepository getEsRepository() {
return getInternalCatalog().getEsRepository();
}

public EsNodeDiscovery getEsNodeDiscovery() {
return getInternalCatalog().getEsNodeDiscovery();
}

public PolicyMgr getPolicyMgr() {
return this.policyMgr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.datasource.es.EsMetaStateTracker;
import org.apache.doris.datasource.es.EsNodeInfo;
import org.apache.doris.datasource.es.EsRestClient;
import org.apache.doris.datasource.es.EsTablePartitions;
import org.apache.doris.datasource.es.EsUtil;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class EsTable extends Table implements GsonPostProcessable {
@SerializedName("pi")
private PartitionInfo partitionInfo;
private EsTablePartitions esTablePartitions;
private Set<EsNodeInfo> availableNodesInfo;

// Whether to enable docvalues scan optimization for fetching fields more fast, default to true
private boolean enableDocValueScan = Boolean.parseBoolean(EsResource.DOC_VALUE_SCAN_DEFAULT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
Expand All @@ -71,6 +72,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -133,6 +135,9 @@ private void addCatalog(CatalogIf catalog) {
resource.addReference(catalog.getName(), ReferenceType.CATALOG);
}
}
if (catalog instanceof EsExternalCatalog) {
Env.getCurrentEnv().getEsNodeDiscovery().registerCatalog((EsExternalCatalog) catalog);
}
}

private CatalogIf removeCatalog(long catalogId) {
Expand All @@ -152,6 +157,9 @@ private CatalogIf removeCatalog(long catalogId) {
}
}
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
if (catalog.getType().equals(InitCatalogLog.Type.ES.name().toLowerCase(Locale.ROOT))) {
Env.getCurrentEnv().getEsNodeDiscovery().deRegisterCatalog(catalog.getId());
}
}
return catalog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsNodeDiscovery;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.mtmv.MTMVUtil;
Expand Down Expand Up @@ -228,6 +229,8 @@ public class InternalCatalog implements CatalogIf<Database> {
// Add transient to fix gson issue.
@Getter
private transient EsRepository esRepository = new EsRepository();
@Getter
private transient EsNodeDiscovery esNodeDiscovery = new EsNodeDiscovery();

public InternalCatalog() {
// create internal databases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* External catalog for elasticsearch
Expand All @@ -47,13 +50,16 @@ public class EsExternalCatalog extends ExternalCatalog {
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
EsResource.HOSTS
);
// Available node
private Set<EsNodeInfo> availableNodesInfo;

/**
* Default constructor for EsExternalCatalog.
*/
public EsExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, String comment) {
super(catalogId, name, InitCatalogLog.Type.ES, comment);
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
this.availableNodesInfo = ConcurrentHashMap.newKeySet();
}

private Map<String, String> processCompatibleProperties(Map<String, String> props) {
Expand Down Expand Up @@ -158,4 +164,38 @@ public void checkProperties() throws DdlException {
}
}
}

public void detectAvailableNodesInfo() {
if (availableNodesInfo == null) {
availableNodesInfo = ConcurrentHashMap.newKeySet();
}
List<EsNodeInfo> nodeInfos = esRestClient.getHttpNodesList();
for (EsNodeInfo nodeInfo : nodeInfos) {
String[] nodes = {nodeInfo.getHost() + ":" + nodeInfo.getPublishAddress().getPort()};
EsRestClient esRestClient = new EsRestClient(nodes, getUsername(), getPassword(), enableSsl());
if (esRestClient.health()) {
availableNodesInfo.add(nodeInfo);
} else {
availableNodesInfo.remove(nodeInfo);
}
}
}

public Set<EsNodeInfo> getAvailableNodesInfo() {
if (availableNodesInfo == null) {
availableNodesInfo = ConcurrentHashMap.newKeySet();
}
if (availableNodesInfo.isEmpty()) {
try {
detectAvailableNodesInfo();
} catch (Exception e) {
return availableNodesInfo;
}
}
return Collections.unmodifiableSet(availableNodesInfo);
}

public void clearAvailableNodesInfo() {
availableNodesInfo.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private EsTable toEsTable() {
esTable.setHttpSslEnabled(esCatalog.enableSsl());
esTable.setLikePushDown(esCatalog.enableLikePushDown());
esTable.setSeeds(esCatalog.getNodes());
esTable.setAvailableNodesInfo(esCatalog.getAvailableNodesInfo());
esTable.setHosts(String.join(",", esCatalog.getNodes()));
esTable.syncTableMetaData();
esTable.setIncludeHiddenIndex(esCatalog.enableIncludeHiddenIndex());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.es;


import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;


/**
* It is responsible for automatic loading all ES external catalogs' nodes info periodically
*/
public class EsNodeDiscovery extends MasterDaemon {

private static final Logger LOG = LogManager.getLogger(EsNodeDiscovery.class);

private Map<Long, EsExternalCatalog> esCatalogs;

public EsNodeDiscovery() {
super("es node discovery", Config.es_state_sync_interval_second * 1000);
esCatalogs = Maps.newConcurrentMap();
}

public void registerCatalog(EsExternalCatalog esCatalog) {
if (Env.isCheckpointThread()) {
return;
}
esCatalogs.put(esCatalog.getId(), esCatalog);
LOG.info("register a new catalog [{}] to sync list", esCatalog);
}

public void deRegisterCatalog(long catalogId) {
esCatalogs.remove(catalogId);
}

@Override
protected void runAfterCatalogReady() {
for (EsExternalCatalog esCatalog : esCatalogs.values()) {
try {
esCatalog.detectAvailableNodesInfo();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch catalog [{}] nodes info data from remote es cluster",
esCatalog.getName(), e);
esCatalog.clearAvailableNodesInfo();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException {
return nodesMap;
}

public List<EsNodeInfo> getHttpNodesList() throws DorisEsException {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
if (nodesData == null) {
return Collections.emptyList();
}
List<EsNodeInfo> nodesList = new ArrayList<>();
for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), httpSslEnable);
if (node.hasHttp()) {
nodesList.add(node);
}
}
return nodesList;
}

/**
* Get mapping for indexName.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ public PartitionPhase(EsRestClient client) {
@Override
public void execute(SearchContext context) throws DorisEsException {
shardPartitions = client.searchShards(context.sourceIndex());
if (context.nodesDiscovery()) {
nodesInfo = client.getHttpNodes();
if (context.nodesDiscovery() && !context.getAvailableNodesInfo().isEmpty()) {
for (EsNodeInfo nodeInfo : context.getAvailableNodesInfo()) {
nodesInfo.put(nodeInfo.getId(), nodeInfo);
}
} else {
nodesInfo = new HashMap<>();
String[] seeds = context.esTable().getSeeds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* This class encapsulates the state needed to execute a query on ES table such as fields、doc_values、resolved index、
Expand Down Expand Up @@ -89,13 +91,17 @@ public class SearchContext {
// whether the nodes needs to be discovered
private boolean nodesDiscovery;

@Getter
private Set<EsNodeInfo> availableNodesInfo;


public SearchContext(EsTable table) {
this.table = table;
fullSchema = table.getFullSchema();
sourceIndex = table.getIndexName();
type = table.getMappingType();
nodesDiscovery = table.isNodesDiscovery();
availableNodesInfo = table.getAvailableNodesInfo();
}


Expand Down

0 comments on commit e082af9

Please sign in to comment.