From e7fe2683ed822d6a43eb966ccdbee998aa8e196b Mon Sep 17 00:00:00 2001 From: igorcoding Date: Wed, 29 Jun 2022 13:40:41 +0300 Subject: [PATCH] cleanup and README update --- README.md | 43 +++++--- asynctnt/__init__.py | 2 +- temp/superbench.py => bench/benchmark.py | 2 +- {temp => bench}/init.lua | 0 temp/bench.py | 131 ----------------------- temp/bench_cmp.py | 35 ------ temp/bug_push_hangs.py | 37 ------- temp/demo.lua | 32 ------ temp/demo/demo.py | 55 ---------- temp/demo/init.lua | 27 ----- temp/instance.py | 53 --------- temp/memleak.py | 24 ----- temp/push.py | 22 ---- temp/resp.py | 20 ---- temp/schema_fail.py | 22 ---- temp/sql.py | 20 ---- temp/time_connect.py | 67 ------------ temp/with_crash.py | 33 ------ 18 files changed, 30 insertions(+), 595 deletions(-) rename temp/superbench.py => bench/benchmark.py (98%) rename {temp => bench}/init.lua (100%) delete mode 100644 temp/bench.py delete mode 100644 temp/bench_cmp.py delete mode 100644 temp/bug_push_hangs.py delete mode 100644 temp/demo.lua delete mode 100644 temp/demo/demo.py delete mode 100644 temp/demo/init.lua delete mode 100644 temp/instance.py delete mode 100644 temp/memleak.py delete mode 100644 temp/push.py delete mode 100644 temp/resp.py delete mode 100644 temp/schema_fail.py delete mode 100644 temp/sql.py delete mode 100644 temp/time_connect.py delete mode 100644 temp/with_crash.py diff --git a/README.md b/README.md index 15bd64c..60bb492 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,11 @@ Documentation is available [here](https://igorcoding.github.io/asynctnt). * Support for all the **basic requests** that Tarantool supports. This includes: `insert`, `select`, `update`, `upsert`, `call`, `eval`, `execute`. -* Full support for **SQL**, including prepared statements. -* Support for **interactive transaction** via Tarantool streams. -* Support of `Decimal` and `UUID` types natively (starting from Tarantool 2.4.1). +* Full support for [SQL](https://www.tarantool.io/en/doc/latest/tutorials/sql_tutorial/), + including [prepared statements](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_sql/prepare/). +* Support for [interactive transaction](https://www.tarantool.io/en/doc/latest/book/box/atomic/txn_mode_mvcc/) via Tarantool streams. +* Support of `Decimal`, `UUID` and `datetime` types natively. +* Support for parsing [custom errors](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_error/new/). * **Schema fetching** on connection establishment, so you can use spaces and indexes names rather than their ids, and **auto refetching** if schema in Tarantool is changed @@ -142,8 +144,8 @@ async def main(): conn = asynctnt.Connection(host='127.0.0.1', port=3301) await conn.connect() - await conn.execute("insert into users (id, name) values (1, 'James Bond')") - await conn.execute("insert into users (id, name) values (2, 'Ethan Hunt')") + await conn.execute("insert into users (id, name) values (?, ?)", [1, 'James Bond']) + await conn.execute("insert into users (id, name) values (?, ?)", [2, 'Ethan Hunt']) data = await conn.execute('select * from users') for row in data: @@ -160,7 +162,7 @@ Stdout: ``` -More about SQL features in asynctnt please refer to the [documentation](https://igorcoding.github.io/asynctnt/examples.html#using-sql) +More about SQL features in asynctnt please refer to the [documentation](https://igorcoding.github.io/asynctnt/sql.html) ## Performance @@ -168,17 +170,28 @@ Two performance tests were conducted: 1. `Seq` -- Sequentially calling 40k requests and measuring performance 2. `Parallel` -- Sending 200k in 300 parallel coroutines -On all the benchmarks below `wal_mode = none` +On all the benchmarks below `wal_mode = none`. Turning `uvloop` on has a massive effect on the performance, so it is recommended to use `asynctnt` with it -| | Seq (uvloop=off) | Seq (uvloop=on) | Parallel (uvloop=off) | Parallel (uvloop=on) | -|----------|-----------------:|----------------:|----------------------:|---------------------:| -| `ping` | 9037.07 | 20372.59 | 44090.53 | 134043.41 | -| `call` | 9113.32 | 17279.21 | 41129.16 | 99866.28 | -| `eval` | 8921.95 | 16642.67 | 44097.02 | 91056.69 | -| `select` | 9681.12 | 17730.24 | 35853.33 | 108940.41 | -| `insert` | 9332.21 | 17384.26 | 31329.85 | 102971.13 | -| `update` | 10532.75 | 15990.12 | 36281.59 | 98643.46 | +**Benchmark environment** +* MacBook Pro 2020 +* CPU: 2 GHz Quad-Core Intel Core i5 +* Memory: 16GB 3733 MHz LPDDR4X + +Tarantool: +```lua +box.cfg{wal_mode = 'none'} +``` + +| | Seq (uvloop=off) | Seq (uvloop=on) | Parallel (uvloop=off) | Parallel (uvloop=on) | +|-----------|------------------:|----------------:|----------------------:|---------------------:| +| `ping` | 12940.93 | 19980.82 | 88341.95 | 215756.24 | +| `call` | 11586.38 | 18783.56 | 74651.40 | 137557.25 | +| `eval` | 10631.19 | 17040.57 | 61077.84 | 121542.42 | +| `select` | 9613.88 | 16718.97 | 61584.07 | 152526.21 | +| `insert` | 10077.10 | 16989.06 | 65594.82 | 135491.25 | +| `update` | 10832.16 | 16562.80 | 63003.31 | 121892.28 | +| `execute` | 10431.75 | 16967.85 | 58377.81 | 96891.61 | ## License diff --git a/asynctnt/__init__.py b/asynctnt/__init__.py index c7e957a..380cff0 100644 --- a/asynctnt/__init__.py +++ b/asynctnt/__init__.py @@ -5,4 +5,4 @@ Db, IProtoError, IProtoErrorStackFrame ) -__version__ = '2.0.0b1' +__version__ = '2.0.0b2' diff --git a/temp/superbench.py b/bench/benchmark.py similarity index 98% rename from temp/superbench.py rename to bench/benchmark.py index 3dde57c..4a622a3 100644 --- a/temp/superbench.py +++ b/bench/benchmark.py @@ -32,7 +32,7 @@ def main(): ['execute', ['select 1 as a, 2 as b'], dict(parse_metadata=False)], ] - for use_uvloop in [True, ]: + for use_uvloop in [True]: if use_uvloop: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) diff --git a/temp/init.lua b/bench/init.lua similarity index 100% rename from temp/init.lua rename to bench/init.lua diff --git a/temp/bench.py b/temp/bench.py deleted file mode 100644 index 5425e38..0000000 --- a/temp/bench.py +++ /dev/null @@ -1,131 +0,0 @@ -import argparse -import asyncio -import datetime -import logging -import math -import sys - - -def main(): - logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) - parser = argparse.ArgumentParser() - parser.add_argument('--asynctnt', type=bool, default=False) - parser.add_argument('--aiotnt', type=bool, default=False) - parser.add_argument('--tarantool', type=bool, default=False) - parser.add_argument('--uvloop', type=bool, default=False) - parser.add_argument('-n', type=int, default=50000, - help='number of executed requests') - parser.add_argument('-b', type=int, default=100, help='number of bulks') - args = parser.parse_args() - - if args.uvloop: - import uvloop - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - - loop = asyncio.get_event_loop() - - print('Running \'{}\' for {} requests in {} batches. ' - 'Using uvloop: {}\n'.format( - 'aiotarantool' if args.aiotnt else 'asynctnt', - args.n, args.b, args.uvloop) - ) - - if args.aiotnt: - loop.run_until_complete( - bench_aiotarantool(args.n, args.b) - ) - elif args.tarantool: - bench_tarantool(args.n, 1) - else: - loop.run_until_complete( - bench_asynctnt(args.n, args.b) - ) - - -async def bench_asynctnt(n, b): - import asynctnt - from asynctnt.iproto.protocol import Iterator - - conn = asynctnt.Connection(host='127.0.0.1', - port=3305, - username='t1', - password='t1', - reconnect_timeout=1) - await conn.connect() - - n_requests_per_bulk = int(math.ceil(n / b)) - - async def bulk_f(): - for _ in range(n_requests_per_bulk): - # await conn.ping() - # await conn.call('test') - # await conn.eval('return "hello"') - await conn.select(512) - # await conn.sql('select 1 as a, 2 as b') - # await conn.replace('tester', [2, 'hhhh']) - # await conn.update('tester', [2], [(':', 1, 1, 3, 'yo!')]) - - coros = [bulk_f() for _ in range(b)] - - start = datetime.datetime.now() - await asyncio.wait(coros) - end = datetime.datetime.now() - - elapsed = end - start - print('Elapsed: {}, RPS: {}. TPR: {}'.format(elapsed, n / elapsed.total_seconds(), elapsed.total_seconds() / n)) - - -async def bench_aiotarantool(n, b): - import aiotarantool - - conn = aiotarantool.connect('127.0.0.1', 3305, - user='t1', password='t1') - - n_requests_per_bulk = int(math.ceil(n / b)) - - async def bulk_f(): - for _ in range(n_requests_per_bulk): - await conn.ping() - # await conn.call('test') - # await conn.eval('return "hello"') - # await conn.select(512) - # await conn.replace('tester', [2, 'hhhh']) - # await conn.update('tester', [2], [(':', 1, 1, 3, 'yo!')]) - - coros = [bulk_f() for _ in range(b)] - - start = datetime.datetime.now() - await asyncio.wait(coros) - end = datetime.datetime.now() - - elapsed = end - start - print('Elapsed: {}, RPS: {}'.format(elapsed, n / elapsed.total_seconds())) - - -def bench_tarantool(n, b): - import tarantool - - conn = tarantool.Connection(host='127.0.0.1', - port=3305, - user='t1', - password='t1') - conn.connect() - b = 1 - n_requests_per_bulk = int(math.ceil(n / b)) - - start = datetime.datetime.now() - for _ in range(n_requests_per_bulk): - # conn.ping() - # await conn.call('test') - # await conn.eval('return "hello"') - conn.select(512) - # await conn.replace('tester', [2, 'hhhh']) - # await conn.update('tester', [2], [(':', 1, 1, 3, 'yo!')]) - - end = datetime.datetime.now() - elapsed = end - start - print('Elapsed: {}, RPS: {}'.format(elapsed, n / elapsed.total_seconds())) - - -if __name__ == '__main__': - main() diff --git a/temp/bench_cmp.py b/temp/bench_cmp.py deleted file mode 100644 index bbdd2ab..0000000 --- a/temp/bench_cmp.py +++ /dev/null @@ -1,35 +0,0 @@ -import asyncio -import uvloop; asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -import datetime - - -def create_tuple(i, size): - s = 'x' * size - return [] - -async def run_asynctnt(): - import asynctnt - conn = asynctnt.Connection(host='127.0.0.1', port=3303) - await conn.connect() - - n = 100000 - size = 1.5 * 1024 - - start = datetime.datetime.now() - - for i in range(1, n + 1): - # await conn.insert('tester', [i, 'x' * int(size)]) - # await conn.select('tester', [i]) - await conn.delete('tester', [i]) - - # values = await conn.select('tester', []) - - end = datetime.datetime.now() - elapsed = end - start - print('Elapsed: {}, RPS: {}'.format(elapsed, n / elapsed.total_seconds())) - - await conn.disconnect() - - -loop = asyncio.get_event_loop() -loop.run_until_complete(run_asynctnt()) diff --git a/temp/bug_push_hangs.py b/temp/bug_push_hangs.py deleted file mode 100644 index 8f6ffae..0000000 --- a/temp/bug_push_hangs.py +++ /dev/null @@ -1,37 +0,0 @@ -import asyncio -import asynctnt - - -def get_push_iterator(connection): - fut = connection.call("infinite_push_loop", push_subscribe=True) - return fut, asynctnt.PushIterator(fut) - - -async def main(): - async with asynctnt.Connection(port=3301) as conn: - - fut, it = get_push_iterator(conn) - transport_id = id(conn._transport) - - while True: - current_transport_id = id(conn._transport) - if current_transport_id != transport_id: - transport_id = current_transport_id - fut, it = get_push_iterator(conn) - - try: - result = await it.__anext__() - # result = await asyncio.wait_for(it.__anext__(), timeout=10) - print(result) - except asyncio.TimeoutError: - print('timeout') - pass - except Exception as e: - # res = await fut - # print(res) - print(e) - return - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/temp/demo.lua b/temp/demo.lua deleted file mode 100644 index 4df8f16..0000000 --- a/temp/demo.lua +++ /dev/null @@ -1,32 +0,0 @@ -box.cfg { - listen = '127.0.0.1:3301', - wal_mode = 'none' -} - -box.once('v1', function() - box.schema.user.grant('guest', 'read,write,execute', 'universe') - - local s = box.schema.create_space('tester') - s:create_index('primary') - s:format({ - {name='id', type='unsigned'}, - {name='text', type='string'}, - }) -end) - - -function f(...) - return ... -end - - -local x = 0 - -function infinite_push_loop() - local fiber = require('fiber') - while true do - fiber.sleep(2) - x = x + 1 - box.session.push(x) - end -end diff --git a/temp/demo/demo.py b/temp/demo/demo.py deleted file mode 100644 index 4515d6f..0000000 --- a/temp/demo/demo.py +++ /dev/null @@ -1,55 +0,0 @@ -# import asyncio -# -# import asynctnt -# -# -# async def main(): -# conn = asynctnt.Connection(host='127.0.0.1', port=3301) -# await conn.connect() -# -# for i in range(1, 11): -# await conn.replace('tester', [i, 'hello{}'.format(i)]) -# -# data = await conn.select('tester', []) -# first_tuple = data[0] -# print('tuple:', first_tuple) -# print(f'tuple[0]: {first_tuple[0]}; tuple["id"]: {first_tuple["id"]}') -# print(f'tuple[1]: {first_tuple[1]}; tuple["name"]: {first_tuple["name"]}') -# -# await conn.disconnect() -# -# -# asyncio.run(main()) - - -import asyncio -from pprint import pprint - -import asynctnt -import logging - -# logging.basicConfig(level=logging.DEBUG) - -async def main(): - conn = asynctnt.Connection(host='127.0.0.1', port=3301) - await conn.connect() - - # await conn.sql(""" - # create table users ( - # id int primary key, - # name text - # ) - # """) - # await conn.sql("insert into users (id, name) values (1, 'James Bond')") - # await conn.sql("insert into users (id, name) values (2, 'Ethan Hunt')") - # data = await conn.sql('select * from users') - # - # for row in data: - # print(row) - # - # await conn.disconnect() - - print(await conn.sql("select * from users", parse_metadata=False)) - - -asyncio.run(main()) \ No newline at end of file diff --git a/temp/demo/init.lua b/temp/demo/init.lua deleted file mode 100644 index c1ac5a2..0000000 --- a/temp/demo/init.lua +++ /dev/null @@ -1,27 +0,0 @@ -box.cfg { - listen = '0.0.0.0:3301' -} - -box.once('v1', function() - pcall(box.schema.user.grant, 'guest', 'read,write,execute', 'universe') - - local s = box.schema.create_space('tester') - s:create_index('primary') - s:format({ - {name='id', type='unsigned'}, - {name='name', type='string'}, - }) -end) - -box.once('v2', function() - pcall(box.schema.user.grant, 'guest', 'read,write,execute', 'universe') - - box.execute([[ - create table users ( - id int primary key, - name text - ) - ]]) -end) - -require 'console'.start() diff --git a/temp/instance.py b/temp/instance.py deleted file mode 100644 index d65e74c..0000000 --- a/temp/instance.py +++ /dev/null @@ -1,53 +0,0 @@ -import asyncio -# import uvloop; asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - -import logging - -import sys - -from asynctnt.instance import TarantoolAsyncInstance, \ - TarantoolSyncInstance - -logging.basicConfig(level=logging.DEBUG) - -t = TarantoolSyncInstance( - host='unix/', - port='/tmp/_mytnt.sock', - console_host='127.0.0.1', - applua=open('../tests/files/app.lua').read()) - -t.start() -t.stop() - - -sys.exit(0) - - -async def main(t, loop): - await t.start() - data = await t.command("box.info.status") - print(data) - await t.stop() - - await t.start() - data = await t.command("box.info.status") - print(data) - await t.stop() - # await t.wait_stopped() - -event_loop = asyncio.get_event_loop() -asyncio.get_child_watcher().attach_loop(event_loop) - -t = TarantoolAsyncInstance( - host='unix/', - port='/tmp/_mytnt.sock', - console_host='127.0.0.1', - applua=open('../tests/files/app.lua').read()) -try: - event_loop.run_until_complete(main(t, event_loop)) - # event_loop.run_until_complete(t.start()) - # event_loop.run_until_complete(t.wait_stopped()) -except KeyboardInterrupt: - event_loop.run_until_complete(t.stop()) -finally: - event_loop.close() diff --git a/temp/memleak.py b/temp/memleak.py deleted file mode 100644 index 50b9474..0000000 --- a/temp/memleak.py +++ /dev/null @@ -1,24 +0,0 @@ -import asyncio -import logging -import sys - -import asynctnt - -logging.basicConfig(level=logging.DEBUG) - - -async def main(): - c = asynctnt.Connection( - host='localhost', - port=3305, - connect_timeout=5, - request_timeout=5, - reconnect_timeout=1/3, - ) - async with c: - while True: - res = await c.eval('local t ={}; for i=1,1000000 do t[i] = {i + 0.03} end; return t') - print(sys.getrefcount(res.body[0][-1])) - - -asyncio.run(main()) diff --git a/temp/push.py b/temp/push.py deleted file mode 100644 index 81c3870..0000000 --- a/temp/push.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -import time - -import asynctnt -import logging - -logging.basicConfig(level=logging.DEBUG) - - -async def run(): - conn = asynctnt.Connection(host='127.0.0.1', port=3305) - await conn.connect() - - fut = conn.call("asyncaction", push_subscribe=True) - async for row in asynctnt.PushIterator(fut): - print('wow', row) - - print(await fut) - # await asyncio.sleep(20) - -loop = asyncio.get_event_loop() -loop.run_until_complete(run()) diff --git a/temp/resp.py b/temp/resp.py deleted file mode 100644 index fa0bd28..0000000 --- a/temp/resp.py +++ /dev/null @@ -1,20 +0,0 @@ -import asynctnt -import asyncio - - -async def f(): - c = await asynctnt.connect() - c.select(281) - - -loop = asyncio.get_event_loop() -res = loop.run_until_complete(f()) - -t = res[0] -print(type(res), len(res)) -print(type(t), len(t)) - -print(t) -print(list(t)) -for k, v in t.items(): - print(k , v) diff --git a/temp/schema_fail.py b/temp/schema_fail.py deleted file mode 100644 index 8d2e3f7..0000000 --- a/temp/schema_fail.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -import asynctnt - -import logging - -logging.basicConfig(level=logging.DEBUG) - - -async def main(): - conn = asynctnt.Connection(host='127.0.0.1', port=3301, username="storage", password="passw0rd") - await conn.connect() - logging.info('connected') - - await conn.call('box.schema.space.create', ['geo', {"if_not_exists": True}]) - logging.info('space_create') - await conn.call('box.space.geo:format', [[{"name": "id", "type": "string"}]]) - logging.info('space_format') - - await conn.disconnect() - - -asyncio.run(main()) diff --git a/temp/sql.py b/temp/sql.py deleted file mode 100644 index 6f9fd18..0000000 --- a/temp/sql.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -import time - -import asynctnt -import logging - -logging.basicConfig(level=logging.DEBUG) - - -async def run(): - conn = asynctnt.Connection(host='127.0.0.1', port=3305) - await conn.connect() - - resp = await conn.sql("insert into s (id, name) values (1, 'one')") - # resp = await conn.sql("select * from s") - print(resp) - print(resp.rowcount) - -loop = asyncio.get_event_loop() -loop.run_until_complete(run()) diff --git a/temp/time_connect.py b/temp/time_connect.py deleted file mode 100644 index ba77c99..0000000 --- a/temp/time_connect.py +++ /dev/null @@ -1,67 +0,0 @@ -import asyncio -import time - -import asynctnt -import logging - -logging.basicConfig(level=logging.DEBUG) - - -async def run(): - conn = asynctnt.Connection(host='127.0.0.1', port=3305) - begin = time.time() - n = 1000 - for i in range(n): - await conn.connect() - await conn.disconnect() - - dt = time.time() - begin - print('Total: {}'.format(dt)) - print('1 connect+disconnect: {}'.format(dt / n)) - - -async def run2(): - conn = asynctnt.Connection(host='127.0.0.1', port=3305) - await conn.connect() - begin = time.time() - n = 10000 - for i in range(n): - await conn.eval('return box.info') - - dt = time.time() - begin - print('Total: {}'.format(dt)) - print('1 ping: {}'.format(dt / n)) - - -async def run3(): - conn = asynctnt.Connection(host='127.0.0.1', port=3301) - await conn.connect() - - begin = time.time() - n = 10000 - for i in range(n): - await conn.select('S') - dt = time.time() - begin - print('Total: {}'.format(dt)) - print('1 select: {}'.format(dt / n)) - - -async def run4(): - conn = asynctnt.Connection(host='127.0.0.1', port=3301) - await conn.connect() - - begin = time.time() - n = 10000 - for i in range(n): - resp = await conn.sql('select * from s') - dt = time.time() - begin - print('Total: {}'.format(dt)) - print('1 sql: {}'.format(dt / n)) - - print(resp.encoding) - print(resp) - print(list(resp)) - -loop = asyncio.get_event_loop() -loop.run_until_complete(run3()) -loop.run_until_complete(run4()) diff --git a/temp/with_crash.py b/temp/with_crash.py deleted file mode 100644 index d45fa5a..0000000 --- a/temp/with_crash.py +++ /dev/null @@ -1,33 +0,0 @@ -import asynctnt -import asyncio -import logging -from asynctnt.exceptions import TarantoolNotConnectedError - -logging.basicConfig(level=logging.DEBUG) - - -async def main(): - c = asynctnt.Connection( - host='localhost', - port=3305, - connect_timeout=5, - request_timeout=5, - reconnect_timeout=1/3, - ) - try: - while True: - if not c.is_connected: - print('started connecting') - await c.connect() # <------------- Hangs here after the Tarantool instance crashes - print('connected') - try: - input('press any key to segfalt...') - await c.eval('''require('ffi').cast('char *', 0)[0] = 48''') - except TarantoolNotConnectedError as e: - print('EXCEPTION:', e.__class__.__name__, e) - finally: - await c.disconnect() - print('disconnected') - - -asyncio.run(main()) \ No newline at end of file