Skip to content

Commit

Permalink
feat(businessAttribute): parallelize-business-attribute-propagation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
deepgarg-visa authored and yoonhyejin committed Jul 16, 2024
1 parent 6a3462f commit 14c2e4d
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -942,29 +942,27 @@ public RelatedEntitiesScrollResult scrollRelatedEntities(
final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter);

final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
String srcNodeLabel = "";

String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s";
if (relationshipDirection == RelationshipDirection.INCOMING) {
matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s";
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s";
}

String srcNodeLabel = StringUtils.EMPTY;
// Create a URN from the String. Only proceed if srcCriteria is not null or empty
if (srcCriteria != null && !srcCriteria.isEmpty()) {
if (StringUtils.isNotEmpty(srcCriteria)) {
final String urnValue =
sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString();
try {
final Urn urn = Urn.createFromString(urnValue);
srcNodeLabel = urn.getEntityType();
matchTemplate = matchTemplate.replace("(src ", "(src:%s ");
} catch (URISyntaxException e) {
log.error("Failed to parse URN: {} ", urnValue, e);
}
}
String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s";
if (relationshipDirection == RelationshipDirection.INCOMING) {
matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s";
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s";
}

final String returnNodes =
String.format(
"RETURN dest, src, type(r)"); // Return both related entity and the relationship type.
final String returnCount = "RETURN count(*)"; // For getting the total results.

String relationshipTypeFilter = "";
if (!relationshipTypes.isEmpty()) {
Expand All @@ -974,18 +972,34 @@ public RelatedEntitiesScrollResult scrollRelatedEntities(
String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes);

// Build Statement strings
String baseStatementString =
String.format(
matchTemplate,
srcNodeLabel,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
String baseStatementString;

if (StringUtils.isNotEmpty(srcNodeLabel)) {
baseStatementString =
String.format(
matchTemplate,
srcNodeLabel,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
} else {
baseStatementString =
String.format(
matchTemplate,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
}
log.info(baseStatementString);

final String returnNodes =
"RETURN dest, src, type(r)"; // Return both related entity and the relationship type.
final String returnCount = "RETURN count(*)"; // For getting the total results.

final String resultStatementString =
String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes);
final String countStatementString = String.format("%s %s", baseStatementString, returnCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.platform.event.v1.EntityChangeEvent;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
Expand All @@ -41,18 +42,25 @@ public class BusinessAttributeUpdateHookService {
private final UpdateIndicesService updateIndicesService;
private final int relatedEntitiesCount;
private final int getRelatedEntitiesBatchSize;

private ExecutorService executor;
public static final String TAG = "TAG";
public static final String GLOSSARY_TERM = "GLOSSARY_TERM";
public static final String DOCUMENTATION = "DOCUMENTATION";
private final int threadCount;
private final int AWAIT_TERMINATION_TIME = 10;
private final int keepAlive;

public BusinessAttributeUpdateHookService(
@NonNull UpdateIndicesService updateIndicesService,
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesCount}") int relatedEntitiesCount,
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize) {
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize,
@NonNull @Value("${businessAttribute.threadCount}") int threadCount,
@NonNull @Value("${businessAttribute.keepAliveTime}") int keepAlive) {
this.updateIndicesService = updateIndicesService;
this.relatedEntitiesCount = relatedEntitiesCount;
this.getRelatedEntitiesBatchSize = relatedBatchSize;
this.threadCount = threadCount;
this.keepAlive = keepAlive;
}

public void handleChangeEvent(
Expand All @@ -61,38 +69,51 @@ public void handleChangeEvent(
GenericRecordUtils.deserializePayload(
event.getPayload().getValue(), EntityChangeEvent.class);

executor = businessAttributePropagationWorkerPool(threadCount, keepAlive);

if (!entityChangeEvent.getEntityType().equals(Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME)) {
log.info("Skipping MCL event for entity:" + entityChangeEvent.getEntityType());
return;
}

final Set<String> businessAttributeCategories =
ImmutableSet.of(TAG, GLOSSARY_TERM, DOCUMENTATION);
if (!businessAttributeCategories.contains(entityChangeEvent.getCategory())) {
log.info("Skipping MCL event for category: " + entityChangeEvent.getCategory());
return;
}

Urn urn = entityChangeEvent.getEntityUrn();
log.info("Business Attribute update hook invoked for urn :" + urn);
log.info("Business Attribute update hook invoked for urn : {}", urn);
fetchRelatedEntities(
opContext,
urn,
(batch, batchNumber) -> processBatch(opContext, batch, batchNumber),
(batch, batchNumber, entityKey) -> processBatch(opContext, batch, batchNumber, entityKey),
null,
0,
1);

executor.shutdown();
try {
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES)) {
executor.shutdownNow(); // Cancel currently executing tasks
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES))
log.error("Business Attribute Propagation Executor is not terminating");
}
} catch (InterruptedException ie) {
executor.shutdownNow();
}
}

private void fetchRelatedEntities(
@NonNull final OperationContext opContext,
@NonNull final Urn urn,
@NonNull final BiConsumer<RelatedEntitiesScrollResult, Integer> resultConsumer,
@NonNull
final TriFunction<RelatedEntitiesScrollResult, Integer, String, Callable<ExecutionResult>>
resultFunction,
@Nullable String scrollId,
int consumedEntityCount,
int batchNumber) {
GraphRetriever graph = opContext.getRetrieverContext().get().getGraphRetriever();

final ArrayList<Future<ExecutionResult>> futureList = new ArrayList<>();
RelatedEntitiesScrollResult result =
graph.scrollRelatedEntities(
null,
Expand All @@ -106,52 +127,143 @@ private void fetchRelatedEntities(
getRelatedEntitiesBatchSize,
null,
null);
resultConsumer.accept(result, batchNumber);

futureList.add(
executor.submit(resultFunction.apply(result, batchNumber, urn.getEntityKey().toString())));

consumedEntityCount = consumedEntityCount + result.getEntities().size();
if (result.getScrollId() != null && consumedEntityCount < relatedEntitiesCount) {
batchNumber = batchNumber + 1;
fetchRelatedEntities(
opContext, urn, resultConsumer, result.getScrollId(), consumedEntityCount, batchNumber);
opContext, urn, resultFunction, result.getScrollId(), consumedEntityCount, batchNumber);
}

for (Future<ExecutionResult> future : futureList) {
try {
ExecutionResult futureResult = future.get();
if (futureResult.getException() != null) {
log.error(
"Batch {} for BA:{} is failed with exception",
futureResult.getBatchNumber(),
futureResult.getEntityKey(),
futureResult.getException());
} else {
log.info(futureResult.getResult());
}
} catch (InterruptedException | ExecutionException e) {
log.error("Business Attribute Propagation Parallel Processing Exception", e);
}
}
futureList.clear();
}

private void processBatch(
private Callable<ExecutionResult> processBatch(
@NonNull OperationContext opContext,
@NonNull RelatedEntitiesScrollResult batch,
int batchNumber) {
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
log.info("BA Update Batch {} started", batchNumber);
Set<Urn> entityUrns =
batch.getEntities().stream()
.map(RelatedEntity::getUrn)
.map(UrnUtils::getUrn)
.collect(Collectors.toSet());

Map<Urn, Map<String, Aspect>> entityAspectMap =
aspectRetriever.getLatestAspectObjects(
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));

entityAspectMap.entrySet().stream()
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
.forEach(
entry -> {
final Urn entityUrn = entry.getKey();
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);

updateIndicesService.handleChangeEvent(
opContext,
PegasusUtils.constructMCL(
null,
Constants.SCHEMA_FIELD_ENTITY_NAME,
entityUrn,
ChangeType.UPSERT,
Constants.BUSINESS_ATTRIBUTE_ASPECT,
opContext.getAuditStamp(),
new BusinessAttributes(aspect.data()),
null,
null,
null));
});
log.info("BA Update Batch {} completed", batchNumber);
int batchNumber,
String entityKey) {
return () -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
log.info("Batch {} for BA:{} started", batchNumber, entityKey);
ExecutionResult executionResult = new ExecutionResult();
executionResult.setBatchNumber(batchNumber);
executionResult.setEntityKey(entityKey);
try {
Set<Urn> entityUrns =
batch.getEntities().stream()
.map(RelatedEntity::getUrn)
.map(UrnUtils::getUrn)
.collect(Collectors.toSet());

Map<Urn, Map<String, Aspect>> entityAspectMap =
aspectRetriever.getLatestAspectObjects(
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));

entityAspectMap.entrySet().stream()
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
.forEach(
entry -> {
final Urn entityUrn = entry.getKey();
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);
updateIndicesService.handleChangeEvent(
opContext,
PegasusUtils.constructMCL(
null,
Constants.SCHEMA_FIELD_ENTITY_NAME,
entityUrn,
ChangeType.UPSERT,
Constants.BUSINESS_ATTRIBUTE_ASPECT,
opContext.getAuditStamp(),
new BusinessAttributes(aspect.data()),
null,
null,
null));
});
stopWatch.stop();
String result =
String.format(
"Batch %s for BA:%s is completed in %s",
batchNumber, entityKey, TimeAgo.toDuration(stopWatch.getTime()))
.toString();
executionResult.setResult(result);
} catch (Exception e) {
executionResult.setException(e);
}
return executionResult;
};
}

/**
 * Creates the worker pool used to process business attribute propagation batches.
 *
 * <p>The pool has the same core and maximum size and is backed by an unbounded
 * {@link LinkedBlockingQueue}, so submitted tasks queue up rather than being rejected.
 *
 * @param numThreads requested pool size; any value {@code <= 0} selects the default of twice the
 *     number of available processors
 * @param keepAlive keep-alive time, in seconds, passed to the {@link ThreadPoolExecutor}
 * @return a new executor; the caller is responsible for shutting it down
 */
private ExecutorService businessAttributePropagationWorkerPool(int numThreads, int keepAlive) {
  // ThreadPoolExecutor throws IllegalArgumentException for a maximum pool size of 0, so 0 is
  // treated the same way as the negative "use the default" sentinel.
  final int poolSize =
      numThreads <= 0 ? Runtime.getRuntime().availableProcessors() * 2 : numThreads;
  return new ThreadPoolExecutor(
      poolSize, poolSize, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}

/**
 * A function of three arguments that produces a result; the three-argument counterpart of
 * {@link java.util.function.BiFunction}.
 *
 * @param <T> type of the first argument
 * @param <U> type of the second argument
 * @param <V> type of the third argument
 * @param <R> type of the result
 */
@FunctionalInterface
private interface TriFunction<T, U, V, R> {
  /**
   * Applies this function to the given arguments.
   *
   * @param first the first argument
   * @param second the second argument
   * @param third the third argument
   * @return the function result
   */
  R apply(T first, U second, V third);
}

@Data
private class ExecutionResult {
String result;
Throwable exception;
int batchNumber;
String entityKey;
}

private static final class TimeAgo {
private static final List<Long> times =
Arrays.asList(
TimeUnit.DAYS.toMillis(365),
TimeUnit.DAYS.toMillis(30),
TimeUnit.DAYS.toMillis(1),
TimeUnit.HOURS.toMillis(1),
TimeUnit.MINUTES.toMillis(1),
TimeUnit.SECONDS.toMillis(1),
TimeUnit.MILLISECONDS.toMillis(1));
private static final List<String> timesString =
Arrays.asList("year", "month", "day", "hour", "minute", "second", "milliseconds");

private static String toDuration(long duration) {

StringBuffer res = new StringBuffer();
for (int i = 0; i < times.size(); i++) {
Long current = times.get(i);
long temp = duration / current;
if (temp > 0) {
res.append(temp)
.append(" ")
.append(timesString.get(i))
.append(temp != 1 ? "s" : StringUtils.EMPTY)
.append(" ");
}
duration = duration % current;
}
if (StringUtils.EMPTY.equals(res.toString())) return "0 seconds ago";
else return res.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setupTest() throws URISyntaxException {
mockUpdateIndicesService = mock(UpdateIndicesService.class);
actorUrn = Urn.createFromString(TEST_ACTOR_URN);
businessAttributeServiceHook =
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1);
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1, 10, 60);
businessAttributeUpdateHook =
new BusinessAttributeUpdateHook(businessAttributeServiceHook, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ forms:
businessAttribute:
fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
fetchRelatedEntitiesBatchSize: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_BATCH_SIZE:1000}
threadCount: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_THREAD_COUNT:-1} # Thread pool size; a negative value (the default) means 2 * number of available processors
keepAliveTime: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_KEEP_ALIVE:60} # Number of seconds to keep inactive threads alive

metadataChangeProposal:
throttle:
Expand Down

0 comments on commit 14c2e4d

Please sign in to comment.