Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter authored and morningman committed Aug 22, 2024
1 parent 70f6fb1 commit 335747a
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ public void checkProperties() throws DdlException {
}
}

if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
useMetaCache = Optional.of(true);
}
// if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE, "true").equals("false")) {
// LOG.warn("force to set use_meta_cache to true for catalog: {} when creating", name);
// getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
// useMetaCache = Optional.of(true);
// }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,10 @@ public boolean registerTable(TableIf tableIf) {
metaCache.updateCache(tableName, (T) tableIf, Util.genIdByName(getQualifiedName(tableName)));
}
} else {
tableNameToId.put(tableName, tableId);
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
if (!tableNameToId.containsKey(tableName)) {
tableNameToId.put(tableName, tableId);
idToTbl.put(tableId, buildTableForInit(tableName, tableId, extCatalog));
}
}
setLastUpdateTime(System.currentTimeMillis());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,19 @@ public void checkProperties() throws DdlException {
throw new DdlException(
"The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond);
}
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
enableHmsEventsIncrementalSync =
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
} else {
enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
}

enableHmsEventsIncrementalSync =
catalogProperty.getOrDefault(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC, "false").equals("true");

hmsEventsBatchSizePerRpc =
Integer.valueOf(catalogProperty.getOrDefault(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC, "-1"));
if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
} else {
hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
}

// check the dfs.ha properties
// 'dfs.nameservices'='your-nameservice',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public class IgnoredEvent extends MetastoreEvent {
private IgnoredEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
super(event);
}

protected static List<MetastoreEvent> getEvents(NotificationEvent event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ protected MetastoreEvent(long eventId, String catalogName, String dbName,
this.event = null;
}

// for IgnoredEvent
protected MetastoreEvent(NotificationEvent event) {
this.event = event;
this.metastoreNotificationEvent = event;
this.eventId = -1;
this.eventTime = -1L;
this.catalogName = null;
this.dbName = null;
this.tblName = null;
this.eventType = null;
}

protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.event = event;
// Some events that we don't care about, dbName may be empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private void realRun() {
} catch (MetastoreNotificationFetchException e) {
LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
} catch (Exception ex) {
hmsExternalCatalog.onRefresh(true);
hmsExternalCatalog.onRefreshCache(true);
updateLastSyncedEventId(hmsExternalCatalog, -1);
LOG.warn("Failed to process hive metastore [{}] events .",
hmsExternalCatalog.getName(), ex);
Expand Down Expand Up @@ -212,9 +212,7 @@ private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatal
return null;
}

int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc() == -1
? Config.hms_events_batch_size_per_rpc
: hmsExternalCatalog.getHmsEventsBatchSizePerRpc();
int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc();
try {
NotificationEventResponse notificationEventResponse =
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, batchSize, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_
logger.info("diable Hive test.")
return;
}

for (String useMetaCache : ["true","false"] ) {
for (String hivePrefix : [ "hive2","hive3"]) {
try {
setHivePrefix(hivePrefix)
Expand All @@ -38,15 +38,15 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_
sql """create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
"use_meta_cache" = "false",
"use_meta_cache" = "${useMetaCache}",
"enable_hms_events_incremental_sync" ="true",
"hms_events_batch_size_per_rpc" = "1000"
);"""

sql """create catalog if not exists ${catalog_name_2} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
"use_meta_cache" = "false",
"use_meta_cache" = "${useMetaCache}",
"enable_hms_events_incremental_sync" ="true"
);"""

Expand Down Expand Up @@ -383,6 +383,7 @@ suite("test_hms_event_notification", "p0,external,hive,external_docker,external_
sql """drop catalog if exists ${catalog_name}"""
} finally {
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do
return;
}

for (String useMetaCache : ["true","false"] ) {

for (String hivePrefix : [ "hive2","hive3"]) {
try {
setHivePrefix(hivePrefix)
Expand All @@ -38,7 +40,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do
sql """create catalog if not exists ${catalog_name} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
"use_meta_cache" = "false",
"use_meta_cache" = "${useMetaCache}",
"enable_hms_events_incremental_sync" ="true",
"hms_events_batch_size_per_rpc" = "10000"
);"""
Expand All @@ -47,7 +49,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do
sql """create catalog if not exists ${catalog_name_2} properties (
"type"="hms",
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
"use_meta_cache" = "false",
"use_meta_cache" = "${useMetaCache}",
"enable_hms_events_incremental_sync" ="true",
"hms_events_batch_size_per_rpc" = "100000"
);"""
Expand Down Expand Up @@ -666,6 +668,7 @@ suite("test_hms_event_notification_multi_catalog", "p0,external,hive,external_do
} finally {
}
}
}
}


Expand Down

0 comments on commit 335747a

Please sign in to comment.