Skip to content

Commit

Permalink
[improvement](statistics)Return -1 when external table row count is u…
Browse files Browse the repository at this point in the history
…nknown. (#38990)

Return -1 when the row count of an external table is unknown.
Don't cache any row count value when loading the row count of an external
table throws an exception.
  • Loading branch information
Jibing-Li authored and dataroaring committed Aug 11, 2024
1 parent 0944c46 commit c02977a
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,56 +86,57 @@ protected Optional<Long> doLoad(RowCountKey rowCountKey) {
TableIf table = StatisticsUtil.findTable(rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId);
return Optional.of(table.fetchRowCount());
} catch (Exception e) {
LOG.warn("Failed to get table with catalogId {}, dbId {}, tableId {}", rowCountKey.catalogId,
rowCountKey.dbId, rowCountKey.tableId);
return Optional.empty();
LOG.warn("Failed to get table row count with catalogId {}, dbId {}, tableId {}. Reason {}",
rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage());
LOG.debug(e);
// Returning Optional.empty() would cache the empty value in memory,
// so the row count could not be loaded again until the cache entry expires.
// Throwing an exception here would write too many stack traces to fe.out,
// so we return null when an exception happens.
// Null may raise an NPE in the caller, but that is expected:
// the caller catches the NPE and returns the default value -1 without keeping it in the cache,
// so the load function is triggered to fetch the row count again next time.
return null;
}
}
}

/**
* Get cached row count for the given table. Return -1 if the cache is not loaded or the table does not exist.
* The cache will be loaded asynchronously.
* @param catalogId
* @param dbId
* @param tableId
* @return Cached row count or 0 if not exist
* @return Cached row count or -1 if not exist
*/
public long getCachedRowCount(long catalogId, long dbId, long tableId) {
RowCountKey key = new RowCountKey(catalogId, dbId, tableId);
try {
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
if (f.isDone()) {
return f.get().orElse(0L);
return f.get().orElse(-1L);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning row count", e);
}
return 0;
return -1;
}

/**
* Get cached row count for the given table if present. Return 0 if cached not loaded.
* Get cached row count for the given table if present. Return -1 if cached not loaded.
* This method will not trigger async loading if cache is missing.
*
* @param catalogId
* @param dbId
* @param tableId
* @return
* @return Cached row count or -1 if not exist
*/
public long getCachedRowCountIfPresent(long catalogId, long dbId, long tableId) {
RowCountKey key = new RowCountKey(catalogId, dbId, tableId);
try {
CompletableFuture<Optional<Long>> f = rowCountCache.getIfPresent(key);
if (f == null) {
return 0;
return -1;
} else if (f.isDone()) {
return f.get().orElse(0L);
return f.get().orElse(-1L);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning row count if present", e);
}
return 0;
return -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,38 +187,38 @@ public String getMysqlType() {

@Override
public long getRowCount() {
// Return 0 if makeSureInitialized throw exception.
// Return -1 if makeSureInitialized throw exception.
// For example, init hive table may throw NotSupportedException.
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
return 0;
return -1;
}
// All external table should get external row count from cache.
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
}

@Override
public long getCachedRowCount() {
// Return 0 if makeSureInitialized throw exception.
// Return -1 if makeSureInitialized throw exception.
// For example, init hive table may throw NotSupportedException.
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
return 0;
return -1;
}
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
}

@Override
/**
* Default return 0. Subclass need to implement this interface.
* Default return -1. Subclass need to implement this interface.
* This is called by ExternalRowCountCache to load row count cache.
*/
public long fetchRowCount() {
return 0;
return -1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public long getCreateTime() {
}

private long getRowCountFromExternalSource() {
long rowCount;
long rowCount = -1;
switch (dlaType) {
case HIVE:
rowCount = StatisticsUtil.getHiveRowCount(this);
Expand All @@ -332,7 +332,6 @@ private long getRowCountFromExternalSource() {
if (LOG.isDebugEnabled()) {
LOG.debug("getRowCount for dlaType {} is not supported.", dlaType);
}
rowCount = -1;
}
return rowCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,22 +592,20 @@ public static List<Column> getSchema(ExternalCatalog catalog, String dbName, Str
* @return estimated row count
*/
public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) {
try {
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
// empty table
return 0;
}
Map<String, String> summary = snapshot.summary();
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
// The table may be null when the iceberg metadata cache is not loaded. This is not a problem,
// because the NPE would be caught in the caller, which returns the default value -1.
// Meanwhile, it will trigger the iceberg metadata cache to load the table, so we can get it next time.
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
// empty table
return 0;
}
return -1;
Map<String, String> summary = snapshot.summary();
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,17 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
@Override
public long fetchRowCount() {
makeSureInitialized();
try {
long rowCount = 0;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable())
.orElse(null);
if (paimonTable == null) {
return -1;
}
List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
return rowCount;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
long rowCount = 0;
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable())
.orElse(null);
if (paimonTable == null) {
return -1;
}
List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
return -1;
return rowCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.common.ThreadPoolManager;

import mockit.Mock;
import mockit.MockUp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Tests how {@link ExternalRowCountCache} reports the row count when the loader fails.
 */
public class ExternalRowCountCacheTest {
    /**
     * Verifies that a failed load is reported as -1 and that a later load can still succeed.
     *
     * <p>The loader is first mocked to return {@code null}; the cache is expected to answer -1
     * and to have invoked the loader exactly once. The loader is then re-mocked to return 100;
     * the test polls until the cache stops answering -1 and expects the value 100 after a
     * second loader invocation.
     *
     * @throws Exception if the polling sleep is interrupted
     */
    @Test
    public void testLoadWithException() throws Exception {
        ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
                1, Integer.MAX_VALUE, "TEST", true);
        // Counts how many times the mocked loader has been invoked.
        AtomicInteger counter = new AtomicInteger(0);

        // Phase 1: the loader yields null, standing in for a load that hit an exception.
        new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
            @Mock
            protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
                counter.incrementAndGet();
                return null;
            }
        };
        ExternalRowCountCache cache = new ExternalRowCountCache(executor);
        long cachedRowCount = cache.getCachedRowCount(1, 1, 1);
        Assertions.assertEquals(-1, cachedRowCount);
        // The load runs on the executor, so wait (up to 60s) until it has been invoked.
        for (int i = 0; i < 60; i++) {
            if (counter.get() == 1) {
                break;
            }
            Thread.sleep(1000);
        }
        Assertions.assertEquals(1, counter.get());

        // Phase 2: the loader now succeeds with a row count of 100.
        new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
            @Mock
            protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
                counter.incrementAndGet();
                return Optional.of(100L);
            }
        };
        cache.getCachedRowCount(1, 1, 1);
        // Poll (up to 60s) until the cache stops answering with the default value -1.
        for (int i = 0; i < 60; i++) {
            cachedRowCount = cache.getCachedRowCount(1, 1, 1);
            if (cachedRowCount != -1) {
                break;
            }
            Thread.sleep(1000);
        }
        // Checked after the loop so that a polling timeout fails the test
        // instead of leaving the expected value unverified.
        Assertions.assertEquals(100, cachedRowCount);
        Assertions.assertEquals(2, counter.get());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ suite("test_iceberg_table_stats", "p0,external,doris,external_docker,external_do
while (retry < 10) {
def result = sql """ show table stats ${table_name} """
act = result[0][2]
if (act != "0") {
if (act != "-1") {
break;
}
Thread.sleep(2000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ suite("test_paimon_table_stats", "p0,external,doris,external_docker,external_doc
while (retry < 10) {
def result = sql """ show table stats ${table_name} """
act = result[0][2]
if (act != "0") {
if (act != "-1") {
break;
}
Thread.sleep(2000)
Expand Down

0 comments on commit c02977a

Please sign in to comment.