[fix](hive) Modify the Hive notification event processing method when using meta cache and add parameters to the Hive catalog. #39239

Merged · 9 commits · Aug 23, 2024
Changes from all commits
3 changes: 3 additions & 0 deletions docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl
@@ -28,6 +28,9 @@ HIVE_SITE_CONF_hive_server2_webui_port=0
HIVE_SITE_CONF_hive_compactor_initiator_on=true
HIVE_SITE_CONF_hive_compactor_worker_threads=2
HIVE_SITE_CONF_metastore_storage_schema_reader_impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
HIVE_SITE_CONF_hive_metastore_event_db_notification_api_auth=false
HIVE_SITE_CONF_hive_metastore_dml_events=true
HIVE_SITE_CONF_hive_metastore_transactional_event_listeners=org.apache.hive.hcatalog.listener.DbNotificationListener

CORE_CONF_fs_defaultFS=hdfs://${IP_HOST}:${FS_PORT}
CORE_CONF_hadoop_http_staticuser_user=root
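These three additions switch the dockerized Hive metastore over to notification-based event logging: DbNotificationListener records every DDL/DML change in the metastore's notification log, hive.metastore.dml.events makes INSERTs emit events too, and disabling the notification API auth lets the FE poll that log without extra credentials. For orientation, a minimal sketch of the key mapping these .env templates rely on (the underscore-to-dot rewrite is an assumption about the docker image's entrypoint, not part of this PR):

// Sketch only — assumed convention: HIVE_SITE_CONF_<key_with_underscores>
// is rewritten into a hive-site.xml property at container startup.
public class EnvKeySketch {
    static String toHiveSiteKey(String envKey) {
        final String prefix = "HIVE_SITE_CONF_";
        return envKey.substring(prefix.length()).replace('_', '.');
    }
    // toHiveSiteKey("HIVE_SITE_CONF_hive_metastore_dml_events") -> "hive.metastore.dml.events"
}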
5 changes: 2 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1836,9 +1836,8 @@ protected void startNonMasterDaemonThreads() {
domainResolver.start();
// fe disk updater
feDiskUpdater.start();
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}

metastoreEventsProcessor.start();

dnsCache.start();

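With the global Config gate removed here, the events processor thread now always starts; whether a given catalog actually syncs is decided per catalog inside the processor loop (see realRun further down). A minimal sketch of that gating model, in an illustrative wrapper that is not the PR's code (import paths assumed from this repo's layout):

import java.util.List;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalCatalog;

class EventGateSketch {
    void processOnePass(List<CatalogIf> catalogs) {
        for (CatalogIf catalog : catalogs) {
            if (!(catalog instanceof HMSExternalCatalog)) {
                continue;
            }
            HMSExternalCatalog hms = (HMSExternalCatalog) catalog;
            if (!hms.isEnableHmsEventsIncrementalSync()) {
                continue; // the per-catalog switch replaces the old global-only Config check
            }
            // fetch and replay this catalog's HMS notification events ...
        }
    }
}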
@@ -655,13 +655,6 @@ public void registerExternalTableFromEvent(String dbName, String tableName,
return;
}

TableIf table = db.getTableNullable(tableName);
if (table != null) {
if (!ignoreIfExists) {
throw new DdlException("Table " + tableName + " has exist in db " + dbName);
}
return;
}
long tblId;
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
if (hmsCatalog.getUseMetaCache().get()) {
@@ -712,13 +705,6 @@ public void registerExternalDatabaseFromEvent(String dbName, String catalogName,
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support create ExternalCatalog databases");
}
DatabaseIf db = catalog.getDbNullable(dbName);
if (db != null) {
if (!ignoreIfExists) {
throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName());
}
return;
}

HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
long dbId;
@@ -291,11 +291,11 @@ public void checkProperties() throws DdlException {
}
}

if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
useMetaCache = Optional.of(true);
}
// if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
// LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
// getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
// useMetaCache = Optional.of(true);
// }
}

/**
@@ -451,13 +451,14 @@ public void gsonPostProcess() throws IOException {

@Override
public void unregisterTable(String tableName) {
makeSureInitialized();
if (LOG.isDebugEnabled()) {
LOG.debug("create table [{}]", tableName);
}

if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
metaCache.invalidate(tableName);
metaCache.invalidate(tableName, Util.genIdByName(getQualifiedName(tableName)));
}
} else {
Long tableId = tableNameToId.remove(tableName);
@@ -480,18 +481,21 @@ public CatalogIf getCatalog() {
// Only used for sync hive metastore event
@Override
public boolean registerTable(TableIf tableIf) {
makeSureInitialized();
long tableId = tableIf.getId();
String tableName = tableIf.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("create table [{}]", tableName);
}
if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
metaCache.updateCache(tableName, (T) tableIf);
metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName)));
}
} else {
tableNameToId.put(tableName, tableId);
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
if (!tableNameToId.containsKey(tableName)) {
tableNameToId.put(tableName, tableId);
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
}
}
setLastUpdateTime(System.currentTimeMillis());
return true;
@@ -25,6 +25,7 @@
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
@@ -73,6 +74,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
@Getter
private HadoopAuthenticator authenticator;

private int hmsEventsBatchSizePerRpc = -1;
private boolean enableHmsEventsIncrementalSync = false;


@VisibleForTesting
public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
@@ -100,6 +105,19 @@ public void checkProperties() throws DdlException {
throw new DdlException(
"The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond);
}
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
enableHmsEventsIncrementalSync =
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
} else {
enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
}

if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
} else {
hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
}

// check the dfs.ha properties
// 'dfs.nameservices'='your-nameservice',
@@ -212,7 +230,7 @@ public void unregisterDatabase(String dbName) {
}
if (useMetaCache.get()) {
if (isInitialized()) {
metaCache.invalidate(dbName);
metaCache.invalidate(dbName, Util.genIdByName(getQualifiedName(dbName)));
}
} else {
Long dbId = dbNameToId.remove(dbName);
@@ -233,7 +251,7 @@ public void registerDatabase(long dbId, String dbName) {
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, dbId, logType);
if (useMetaCache.get()) {
if (isInitialized()) {
metaCache.updateCache(dbName, db);
metaCache.updateCache(dbName, db, Util.genIdByName(getQualifiedName(dbName)));
}
} else {
dbNameToId.put(dbName, dbId);
@@ -266,4 +284,12 @@ public String getHiveMetastoreUris() {
public String getHiveVersion() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
}

public int getHmsEventsBatchSizePerRpc() {
return hmsEventsBatchSizePerRpc;
}

public boolean isEnableHmsEventsIncrementalSync() {
return enableHmsEventsIncrementalSync;
}
}
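Taken together with the parsing in checkProperties above, these fields and getters implement a catalog-overrides-global resolution: a property set on the catalog wins, and the FE-wide Config value is only a fallback. A condensed sketch of that pattern (helper names are illustrative, not from the PR):

import java.util.Map;

class PropertyResolveSketch {
    // Catalog-level property if present, otherwise the FE-wide Config default.
    static boolean resolveBool(Map<String, String> props, String key, boolean configDefault) {
        String v = props.get(key);
        return v == null ? configDefault : Boolean.parseBoolean(v);
    }

    static int resolveInt(Map<String, String> props, String key, int configDefault) {
        String v = props.get(key);
        return v == null ? configDefault : Integer.parseInt(v);
    }
}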
@@ -145,9 +145,7 @@ protected boolean canBeBatched(MetastoreEvent that) {
// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}
@@ -28,7 +28,7 @@
*/
public class IgnoredEvent extends MetastoreEvent {
private IgnoredEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
super(event);
}

protected static List<MetastoreEvent> getEvents(NotificationEvent event,
@@ -24,37 +24,25 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;

import java.util.List;

/**
* MetastoreEvent for INSERT event type
*/
public class InsertEvent extends MetastoreTableEvent {
private final Table hmsTbl;

// for test
public InsertEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT);
this.hmsTbl = null;
}

private InsertEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message is null"));
try {
InsertMessage insertMessage =
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
.getInsertMessage(event.getMessage());
hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj());
} catch (Exception ex) {
throw new MetastoreNotificationException(ex);
}
}

protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) {
@@ -78,6 +78,18 @@ protected MetastoreEvent(long eventId, String catalogName, String dbName,
this.event = null;
}

// for IgnoredEvent
protected MetastoreEvent(NotificationEvent event) {
this.event = event;
this.metastoreNotificationEvent = event;
this.eventId = -1;
this.eventTime = -1L;
this.catalogName = null;
this.dbName = null;
this.tblName = null;
this.eventType = null;
}

protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.event = event;
// Some events that we don't care about, dbName may be empty
@@ -46,6 +46,9 @@ public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event,
String catalogName) {
Preconditions.checkNotNull(event.getEventType());
MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
if (LOG.isDebugEnabled()) {
LOG.debug("catalogName = {}, Event = {}", catalogName, event.toString());
}
switch (metastoreEventType) {
case CREATE_TABLE:
return CreateTableEvent.getEvents(event, catalogName);
@@ -115,6 +115,9 @@ private void realRun() {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
if (!hmsExternalCatalog.isEnableHmsEventsIncrementalSync()) {
continue;
}
try {
List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
if (!events.isEmpty()) {
@@ -125,6 +128,8 @@ private void realRun() {
} catch (MetastoreNotificationFetchException e) {
LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
} catch (Exception ex) {
hmsExternalCatalog.onRefreshCache(true);
updateLastSyncedEventId(hmsExternalCatalog, -1);
LOG.warn("Failed to process hive metastore [{}] events .",
hmsExternalCatalog.getName(), ex);
}
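The broadened catch is the PR's safety valve: any failure while replaying a batch triggers a full cache refresh for that catalog plus a reset of the synced event id. A sketch of the assumed re-seed behavior on the next poll (hedged; the handling of the -1 sentinel lives outside this diff):

// Assumption: a negative lastSyncedEventId means "unseeded", so the next poll adopts the
// metastore's current event id instead of replaying a gap that can no longer be trusted.
long lastSynced = getLastSyncedEventId(hmsExternalCatalog);
if (lastSynced < 0) {
    // getCurrentEventId is a hypothetical helper standing in for the real lookup
    updateLastSyncedEventId(hmsExternalCatalog, getCurrentEventId(hmsExternalCatalog));
    return null; // nothing to replay this round; incremental sync resumes from here
}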
@@ -147,7 +152,7 @@ private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
response = getNextEventResponseForSlave(hmsExternalCatalog);
}

if (response == null) {
if (response == null || response.getEventsSize() == 0) {
return Collections.emptyList();
}
return response.getEvents();
@@ -207,9 +212,15 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog) {
return null;
}

int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc();
try {
return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
Config.hms_events_batch_size_per_rpc, null);
NotificationEventResponse notificationEventResponse =
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null);
LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId = {},"
+ "batchSize = {}, getEventsSize = {}", hmsExternalCatalog.getName(), lastSyncedEventId,
currentEventId, batchSize, notificationEventResponse.getEvents().size());

return notificationEventResponse;
} catch (MetastoreNotificationFetchException e) {
// Need a fallback to handle this because this error state can not be recovered until restarting FE
if (StringUtils.isNotEmpty(e.getMessage())
@@ -90,7 +90,7 @@ public Optional<T> getMetaObjById(long id) {
return name == null ? Optional.empty() : getMetaObj(name, id);
}

public void updateCache(String objName, T obj) {
public void updateCache(String objName, T obj, long id) {
metaObjCache.put(objName, Optional.of(obj));
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
@@ -100,9 +100,10 @@ public void updateCache(String objName, T obj) {
return v;
}
});
idToName.put(id, objName);
}

public void invalidate(String objName) {
public void invalidate(String objName, long id) {
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
return Lists.newArrayList();
@@ -112,11 +113,13 @@ public void invalidate(String objName) {
}
});
metaObjCache.invalidate(objName);
idToName.remove(id);
}

public void invalidateAll() {
namesCache.invalidateAll();
metaObjCache.invalidateAll();
idToName.clear();
}

}
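Threading the id through keeps MetaCache's internal idToName index consistent with the name-keyed caches, so getMetaObjById remains correct as events register and invalidate objects. A hypothetical call-site sketch, deriving the id the same way this PR's callers do:

// Both call sites in this PR derive a stable id from the qualified name via Util.genIdByName.
long id = Util.genIdByName(getQualifiedName(dbName)); // e.g. "catalog.db" -> stable long
metaCache.updateCache(dbName, db, id);  // installs name -> obj and id -> name together
// ... later, on a drop event:
metaCache.invalidate(dbName, id);       // removes both mappings together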
@@ -28,5 +28,6 @@ public class HMSProperties {
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);

public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC = "hive.enable_hms_events_incremental_sync";
public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC = "hive.hms_events_batch_size_per_rpc";
}
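With these keys, incremental sync can be enabled per catalog instead of FE-wide. A hypothetical CREATE CATALOG statement exercising both (the URI and batch size are placeholders; only the property names come from the constants above), held as a Java string to match the surrounding code:

// Hypothetical usage; values are placeholders, keys match HMSProperties above.
String createCatalogSql = "CREATE CATALOG hive_evt PROPERTIES (\n"
        + "  'type' = 'hms',\n"
        + "  'hive.metastore.uris' = 'thrift://127.0.0.1:9083',\n"
        + "  'hive.enable_hms_events_incremental_sync' = 'true',\n"
        + "  'hive.hms_events_batch_size_per_rpc' = '1000'\n"
        + ")";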
2 changes: 2 additions & 0 deletions regression-test/pipeline/external/conf/fe.conf
@@ -96,5 +96,7 @@ auth_token = 5ff161c3-2c08-4079-b108-26c8850b6598
infodb_support_ext_catalog=true

trino_connector_plugin_dir=/tmp/trino_connector/connectors
hms_events_polling_interval_ms=2000


KRB5_CONFIG=/keytabs/krb5.conf
@@ -22,7 +22,7 @@ suite("test_hive_ctas", "p0,external,hive,external_docker,external_docker_hive")
return;
}

for (String hivePrefix : ["hive2", "hive3"]) {
for (String hivePrefix : ["hive3"]) {
def file_formats = ["parquet", "orc"]
setHivePrefix(hivePrefix)
def generateSrcDDLForCTAS = { String file_format, String catalog_name ->