diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d70099a6..96ea4d6da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.63.2] - 2025-01-31 +- Make XdsDirectory lazy to subscribe the names + ## [29.63.1] - 2025-01-14 - Add XdsDirectory to get d2 service and cluster names from INDIS @@ -5764,7 +5767,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.1...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.2...master +[29.63.2]: https://github.com/linkedin/rest.li/compare/v29.63.1...v29.63.2 [29.63.1]: https://github.com/linkedin/rest.li/compare/v29.63.0...v29.63.1 [29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0 [29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1 diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java index 3d0d30f46..d3181fc65 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java @@ -27,8 +27,7 @@ public class XdsDirectory implements Directory final ConcurrentMap _serviceNames = new ConcurrentHashMap<>(); @VisibleForTesting final ConcurrentMap _clusterNames = new ConcurrentHashMap<>(); - @VisibleForTesting - final AtomicReference _watcher = new AtomicReference<>(); + private final AtomicReference _watcher = new AtomicReference<>(); /** * A flag that shows whether the service/cluster names data is being updated. Requests to the data should wait until * the update is done. @@ -51,8 +50,7 @@ public XdsDirectory(XdsClient xdsClient) public void start() { LOG.debug("Starting. Setting isUpdating to true"); - _isUpdating.set(true); // initially set to true to block reads before the first update completes - addNameWatcher(); + _isUpdating.set(true); // initially set to true to block reads before the first (lazy) update completes } @Override diff --git a/d2/src/test/java/com/linkedin/d2/xds/balancer/TestXdsDirectory.java b/d2/src/test/java/com/linkedin/d2/xds/balancer/TestXdsDirectory.java index 6748143e4..24a4eb7ed 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/balancer/TestXdsDirectory.java +++ b/d2/src/test/java/com/linkedin/d2/xds/balancer/TestXdsDirectory.java @@ -11,6 +11,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -35,12 +37,11 @@ public void testGetClusterAndServiceNames() throws InterruptedException { int halfCallers = numCallers / 2; XdsDirectoryFixture fixture = new XdsDirectoryFixture(); XdsDirectory directory = fixture._xdsDirectory; - Assert.assertNull(directory._watcher.get()); directory.start(); List expectedClusterNames = Collections.singletonList(CLUSTER_NAME); List expectedServiceNames = Collections.singletonList(SERVICE_NAME); fixture.runCallers(halfCallers, expectedClusterNames, expectedServiceNames); - XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher watcher = Objects.requireNonNull(directory._watcher.get()); + XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher watcher = fixture.waitWatcher(); // verified names are not updated, results are empty, which means all threads are waiting. Assert.assertTrue(directory._isUpdating.get()); @@ -58,7 +59,7 @@ public void testGetClusterAndServiceNames() throws InterruptedException { Assert.assertEquals(directory._clusterNames, Collections.singletonMap(CLUSTER_RESOURCE_NAME, CLUSTER_NAME)); Assert.assertEquals(directory._serviceNames, Collections.singletonMap(SERVICE_RESOURCE_NAME, SERVICE_NAME)); Assert.assertTrue(directory._isUpdating.get()); - Assert.assertEquals(fixture._latch.getCount(), numCallers); + Assert.assertEquals(fixture._callerLatch.getCount(), numCallers); // finish updating by another thread to verify the lock can be released by a different thread. All callers should // be unblocked and the isUpdating flag is false. @@ -77,7 +78,7 @@ public void testGetClusterAndServiceNames() throws InterruptedException { Assert.assertTrue(directory._isUpdating.get()); Assert.assertEquals(directory._serviceNames, ImmutableMap.of(SERVICE_RESOURCE_NAME, SERVICE_NAME, SERVICE_RESOURCE_NAME_2, SERVICE_NAME_2)); - Assert.assertEquals(fixture._latch.getCount(), 1); + Assert.assertEquals(fixture._callerLatch.getCount(), 1); // finish updating again, new data should be added to the results fixture.notifyComplete(); @@ -90,26 +91,44 @@ private static final class XdsDirectoryFixture XdsDirectory _xdsDirectory; @Mock XdsClient _xdsClient; - CountDownLatch _latch; + CountDownLatch _callerLatch; ExecutorService _executor; + CountDownLatch _watcherLatch = new CountDownLatch(1); + @Captor + ArgumentCaptor _watcherCaptor = + ArgumentCaptor.forClass(XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher.class); + + public XdsDirectoryFixture() { MockitoAnnotations.initMocks(this); - doNothing().when(_xdsClient).watchAllXdsResources(any()); + doAnswer((invocation) -> { + _watcherLatch.countDown(); + return null; + }).when(_xdsClient).watchAllXdsResources(_watcherCaptor.capture()); _xdsDirectory = new XdsDirectory(_xdsClient); } + XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher waitWatcher() throws InterruptedException + { + if (!_watcherLatch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS)) + { + Assert.fail("Timeout waiting for watcher to be added"); + } + return _watcherCaptor.getValue(); + } + void runCallers(int num, List expectedClusterResult, List expectedServiceResult) { if (_executor == null || _executor.isShutdown() || _executor.isTerminated()) { _executor = Executors.newFixedThreadPool(num); - _latch = new CountDownLatch(num); + _callerLatch = new CountDownLatch(num); } else { - _latch = new CountDownLatch((int) (_latch.getCount() + num)); + _callerLatch = new CountDownLatch((int) (_callerLatch.getCount() + num)); } for (int i = 0; i < num; i++) @@ -122,7 +141,7 @@ void runCallers(int num, List expectedClusterResult, List expect void waitCallers() throws InterruptedException { _executor.shutdown(); - if (!_latch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS)) + if (!_callerLatch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS)) { Assert.fail("Timeout waiting for all callers to finish"); } @@ -135,7 +154,7 @@ CallerThread createCaller(boolean isForServiceNames, List expectedResult void notifyComplete() { - Thread t = new Thread(() -> _xdsDirectory._watcher.get().onAllResourcesProcessed()); + Thread t = new Thread(() -> _watcherCaptor.getValue().onAllResourcesProcessed()); t.start(); @@ -177,7 +196,7 @@ public void onError(Throwable e) public void onSuccess(List result) { assertTrue(matchSortedLists(result, expectedResult)); - _latch.countDown(); + _callerLatch.countDown(); } }; _isForServiceNames = isForServiceNames; diff --git a/gradle.properties b/gradle.properties index 23824f1c7..fc9618dee 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.63.1 +version=29.63.2 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true