Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pyln: Add command notifications support #4311

Merged
merged 3 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions contrib/pyln-client/pyln/client/lightning.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from decimal import Decimal
from math import floor, log10
from typing import Optional, Union
import json
import logging
import os
import socket
import warnings
from contextlib import contextmanager
from decimal import Decimal
from json import JSONEncoder
from math import floor, log10
from typing import Optional, Union


def _patched_default(self, obj):
Expand Down Expand Up @@ -283,8 +284,9 @@ def __init__(self, socket_path, executor=None, logger=logging, encoder_cls=json.
self.decoder = decoder
self.executor = executor
self.logger = logger
self._notify = None

self.next_id = 0
self.next_id = 1

def _writeobj(self, sock, obj):
s = json.dumps(obj, ensure_ascii=False, cls=self.encoder_cls)
Expand Down Expand Up @@ -334,18 +336,44 @@ def call(self, method, payload=None):
# FIXME: we open a new socket for every readobj call...
sock = UnixSocket(self.socket_path)
this_id = self.next_id
self._writeobj(sock, {
self.next_id += 0
buf = b''

if self._notify is not None:
# Opt into the notifications support
self._writeobj(sock, {
"jsonrpc": "2.0",
"method": "notifications",
"id": 0,
"params": {
"enable": True
},
})
_, buf = self._readobj(sock, buf)

request = {
"jsonrpc": "2.0",
"method": method,
"params": payload,
"id": this_id,
})
self.next_id += 1
buf = b''
}

self._writeobj(sock, request)
while True:
resp, buf = self._readobj(sock, buf)
# FIXME: We should offer a callback for notifications.
if 'method' not in resp or 'id' in resp:
id = resp.get("id", None)
meth = resp.get("method", None)

if meth == 'message' and self._notify is not None:
n = resp['params']
self._notify(
message=n.get('message', None),
progress=n.get('progress', None),
request=request
)
continue

if meth is None or id is None:
break

self.logger.debug("Received response for %s call: %r", method, resp)
Expand All @@ -361,6 +389,31 @@ def call(self, method, payload=None):
raise ValueError("Malformed response, \"result\" missing.")
return resp["result"]

@contextmanager
def notify(self, fn):
"""Register a notification callback to use for a set of RPC calls.

This is a context manager and should be used like this:

```python
def fn(message, progress, request, **kwargs):
print(message)

with rpc.notify(fn):
rpc.somemethod()
```

The `fn` function will be called once for each notification
the is sent by `somemethod`. This is a context manager,
meaning that multiple commands can share the same context, and
the same notification function.

"""
old = self._notify
self._notify = fn
yield
self._notify = old


class LightningRpc(UnixDomainSocketRpc):
"""
Expand Down
55 changes: 44 additions & 11 deletions contrib/pyln-client/pyln/client/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,47 @@ def set_exception(self, exc: Union[Exception, RpcException]) -> None:
def _write_result(self, result: dict) -> None:
self.plugin._write_locked(result)

def _notify(self, method: str, params: JSONType) -> None:
"""Send a notification to the caller.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this? It's split into the two kinds of notifications we have currently defined though: notify_message and notify_progress.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't see that it's in the Plugin not the Request, that's a really weird place for it, if we require access to the request anyway. I'll change Plugin.notify_message to become a wrapper of Request.notify and move that code there instead.

Copy link
Contributor

@rustyrussell rustyrussell Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. Pretty sure this API is unused, so just move it.


Can contain a variety of things, but is usually used to report
progress or command status.

"""
self._write_result({
'jsonrpc': '2.0',
'params': params,
"method": method,
})

def notify(self, message: str, level: str = 'info') -> None:
"""Send a message notification to the caller.
"""
self._notify(
"message",
params={
'id': self.id,
'level': level,
'message': message,
}
)

def progress(self,
progress: int,
total: int,
stage: Optional[int] = None,
stage_total: Optional[int] = None
) -> None:
d: Dict[str, JSONType] = {
"id": self.id,
"num": progress,
"total": total,
}
if stage is not None and stage_total is not None:
d['stage'] = {"num": stage, "total": stage_total}

self._notify("progress", d)


# If a hook call fails we need to coerce it into something the main daemon can
# handle. Returning an error is not an option since we explicitly do not allow
Expand Down Expand Up @@ -639,22 +680,14 @@ def log(self, message: str, level: str = 'info') -> None:
def notify_message(self, request: Request, message: str,
level: str = 'info') -> None:
"""Send a notification message to sender of this request"""
self.notify("message", {"id": request.id,
"level": level,
"message": message})
request.notify(message=message)

def notify_progress(self, request: Request,
progress: int, progress_total: int,
stage: Optional[int] = None,
stage_total: Optional[int] = None) -> None:
"""Send a progerss message to sender of this request: if more than one stage, set stage and stage_total"""
d: Dict[str, Any] = {"id": request.id,
"num": progress,
"total": progress_total}
if stage_total is not None:
d['stage'] = {"num": stage, "total": stage_total}

self.notify("progress", d)
"""Send a progress message to sender of this request: if more than one stage, set stage and stage_total"""
request.progress(progress, progress_total, stage, stage_total)

def _parse_request(self, jsrequest: Dict[str, JSONType]) -> Request:
i = jsrequest.get('id', None)
Expand Down
19 changes: 19 additions & 0 deletions tests/plugins/countdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python3

from pyln.client import Plugin
import time

plugin = Plugin()


@plugin.method("countdown")
def countdown(count, plugin, request):
count = int(count)
for i in range(count):
time.sleep(0.1)
request.notify("{}/{}".format(i, count), "INFO")

return "Done"


plugin.run()
40 changes: 34 additions & 6 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2027,15 +2027,18 @@ def test_notify(node_factory):
assert out[2 + i].endswith("|\n")
else:
assert out[2 + i].endswith("|\r")
assert out[102] == '\r'

assert out[102] == '# Beginning stage 2\n'
assert out[103] == '\r'

for i in range(10):
assert out[103 + i].startswith("# Stage 2/2 {:>2}/10 |".format(1 + i))
assert out[104 + i].startswith("# Stage 2/2 {:>2}/10 |".format(1 + i))
if i == 9:
assert out[103 + i].endswith("|\n")
assert out[104 + i].endswith("|\n")
else:
assert out[103 + i].endswith("|\r")
assert out[113] == '"This worked"\n'
assert len(out) == 114
assert out[104 + i].endswith("|\r")
assert out[114] == '"This worked"\n'
assert len(out) == 115

# At debug level, we get the second prompt.
out = subprocess.check_output(['cli/lightning-cli',
Expand Down Expand Up @@ -2199,3 +2202,28 @@ def test_dynamic_args(node_factory):
l1.rpc.plugin_stop(plugin_path)

assert [p for p in l1.rpc.listconfigs()['plugins'] if p['path'] == plugin_path] == []


def test_pyln_request_notify(node_factory):
"""Test that pyln-client plugins can send notifications.
"""
plugin_path = os.path.join(
os.path.dirname(__file__), 'plugins/countdown.py'
)
l1 = node_factory.get_node(options={'plugin': plugin_path})
notifications = []

def n(*args, message, **kwargs):
print("Got a notification:", message)
notifications.append(message)

with l1.rpc.notify(n):
l1.rpc.countdown(10)

expected = ['{}/10'.format(i) for i in range(10)]
assert expected == notifications

# Calling without the context manager we should not get any notifications
notifications = []
l1.rpc.countdown(10)
assert notifications == []