How to cancel watch? #528

Closed
ralf0131 opened this issue Mar 14, 2019 · 3 comments

Comments

@ralf0131
Hi

I'm using the following code to test the jetcd watch API. My question is: how do I cancel the watch through the watch API?

        String path = "/hello";
        String endpoint = "http://127.0.0.1:2379";
        CountDownLatch latch = new CountDownLatch(1);
        ByteSequence key = ByteSequence.from(path, UTF_8);

        Watch.Listener listener = Watch.listener(response -> {
            for (WatchEvent event : response.getEvents()) {
                Assertions.assertEquals("PUT", event.getEventType().toString());
                Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
                Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
            }
            latch.countDown();
        });

        try (Client client = Client.builder().endpoints(endpoint).build();
             Watch watch = client.getWatchClient();
             Watch.Watcher watcher = watch.watch(key, listener)) {
            // try to modify the key
            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
            latch.await();
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }

According to the doc, a watch_id is required to cancel the watch.

Is there a way to obtain the watch id?

@fanminshi
Member

@ralf0131 I think you can just call watcher.close() to cancel it, since Watcher implements Closeable. You can see that in our tests, where we use IOUtils.closeQuietly(watcher) to close the watcher.

@ralf0131
Author

Hi,

I don't think that solves my problem.
I tried it locally: after I called watcher.close() and then put a value again, the listener still received the response, which is not what I want.

Actually, when you look into the source code of watcher.close(), it just resets the stream so that it can't make further requests. But further updates to the watched key are still delivered to the listener.

What I'd like to propose is a method called cancel in Watch.Watcher that actually sends the WatchCancelRequest.
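
To make the distinction in this proposal concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not jetcd code: `ToyWatchServer`, `ToyWatcher`, and all of their methods are hypothetical names invented for illustration, and the `close()` behavior simply mirrors what is reported above rather than anything verified against the jetcd source. It shows why a listener keeps firing after a local-only close and stops once the server-side registration is removed by watch id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Toy stand-in for the server-side watch registry (hypothetical, not jetcd or etcd code).
final class ToyWatchServer {
    private final AtomicLong nextWatchId = new AtomicLong(0);
    private final Map<Long, Consumer<String>> watches = new ConcurrentHashMap<>();

    // Registers a listener and returns the watch id assigned to it.
    long createWatch(Consumer<String> listener) {
        long id = nextWatchId.getAndIncrement();
        watches.put(id, listener);
        return id;
    }

    // Models a cancel request: true if the id was registered and has now been removed.
    boolean cancelWatch(long watchId) {
        return watches.remove(watchId) != null;
    }

    // Models a put on the watched key: every still-registered listener is notified.
    void put(String value) {
        watches.values().forEach(listener -> listener.accept(value));
    }
}

// Hypothetical client-side handle that separates "close" from "cancel".
final class ToyWatcher implements AutoCloseable {
    private final ToyWatchServer server;
    private final long watchId;
    private volatile boolean closed;

    ToyWatcher(ToyWatchServer server, Consumer<String> listener) {
        this.server = server;
        this.watchId = server.createWatch(listener);
    }

    long watchId() {
        return watchId;
    }

    boolean isClosed() {
        return closed;
    }

    // Assumption: mirrors the reported behavior, where only local state changes
    // and the server-side watch stays registered.
    @Override
    public void close() {
        closed = true;
    }

    // Models the proposed cancel(): ask the server to drop the watch by id.
    boolean cancel() {
        closed = true;
        return server.cancelWatch(watchId);
    }
}
```

In this model, a put after `close()` still reaches the listener, while a put after `cancel()` does not. A second `cancel()` returns false because the id is no longer registered.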

@ralf0131
Author

Here is a code snippet showing how I worked around it with the low-level gRPC interface:

    @Test
    public void testCancelWatchWithGrpc() {
        String path = "/hello";
        String endpoint = "http://127.0.0.1:2379";
        CountDownLatch updateLatch = new CountDownLatch(1);
        CountDownLatch cancelLatch = new CountDownLatch(1);
        final AtomicLong watchID = new AtomicLong(-1L);
        try (Client client = Client.builder().endpoints(endpoint).build()) {
            ManagedChannel channel = getChannel(client);
            StreamObserver<WatchRequest> observer = WatchGrpc.newStub(channel).watch(new StreamObserver<WatchResponse>() {
                @Override
                public void onNext(WatchResponse response) {
                    watchID.set(response.getWatchId());
                    for (Event event : response.getEventsList()) {
                        Assertions.assertEquals("PUT", event.getType().toString());
                        Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
                        Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
                        updateLatch.countDown();
                    }
                    if (response.getCanceled()) {
                        // received the cancel response
                        cancelLatch.countDown();
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    // stream errors are ignored in this test
                }

                @Override
                public void onCompleted() {
                    // nothing to do when the server completes the stream
                }
            });
            // create
            WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
                    .setKey(ByteString.copyFrom(path, UTF_8));

            // make the grpc call to watch the key
            observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());

            // try to put the value
            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));

            // response received, latch counts down to zero
            updateLatch.await();

            WatchCancelRequest watchCancelRequest =
                    WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
            WatchRequest cancelRequest = WatchRequest.newBuilder()
                    .setCancelRequest(watchCancelRequest).build();
            observer.onNext(cancelRequest);

            // try to put the value
            client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));

            cancelLatch.await();
        } catch (Exception e) {
            Assertions.fail(e.getMessage());
        }
    }

    private ManagedChannel getChannel(Client client) {
        try {
            // hack, use reflection to get the shared channel.
            Field connectionField = client.getClass().getDeclaredField("connectionManager");
            connectionField.setAccessible(true);
            Object connection = connectionField.get(client);
            Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
            channelMethod.setAccessible(true);
            ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection);
            return channel;
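        } catch (Exception e) {
            // fail loudly instead of returning null, which would only surface later as an NPE
            throw new IllegalStateException("Could not obtain the shared channel via reflection", e);
        }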
    }

getChannel() uses a reflection hack to reuse the channel that the client has already created.
Hopefully the jetcd library can wrap this kind of logic for us.
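
One shape such a wrapper could take is sketched below, under stated assumptions. `CancellingWatchHandle` is a hypothetical class, not part of jetcd. The `cancelSender` callback stands in for whatever actually transmits the cancel request (in the workaround above, that would be the `observer.onNext(cancelRequest)` call). The sketch only shows how `close()` could trigger exactly one cancel for a known watch id.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Hypothetical wrapper, not part of jetcd: close() sends one cancel for the watch id.
final class CancellingWatchHandle implements AutoCloseable {
    private final long watchId;
    // Assumption: the caller supplies the code that sends the cancel request for an id.
    private final LongConsumer cancelSender;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    CancellingWatchHandle(long watchId, LongConsumer cancelSender) {
        this.watchId = watchId;
        this.cancelSender = cancelSender;
    }

    boolean isCancelled() {
        return cancelled.get();
    }

    // Idempotent: only the first call sends the cancel, later calls do nothing.
    @Override
    public void close() {
        if (cancelled.compareAndSet(false, true)) {
            cancelSender.accept(watchId);
        }
    }
}
```

Because the handle is `AutoCloseable`, it fits the same try-with-resources pattern used in the first snippet of this issue, and the compare-and-set guard keeps a repeated `close()` from sending a second cancel.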

lburgazzoli added a commit to lburgazzoli/etcd-io-jetcd that referenced this issue Mar 15, 2019