diff --git a/nebula2/gclient/net/Connection.py b/nebula2/gclient/net/Connection.py index 700a3874..9cd7e68e 100644 --- a/nebula2/gclient/net/Connection.py +++ b/nebula2/gclient/net/Connection.py @@ -122,8 +122,17 @@ def execute(self, session_id, stmt): :param stmt: the ngql :return: ExecutionResponse """ + return self.execute_parameter(session_id, stmt, None) + + def execute_parameter(self, session_id, stmt, params): + """execute interface with session_id and ngql + :param session_id: the session id get from result of authenticate interface + :param stmt: the ngql + :param params: parameter map + :return: ExecutionResponse + """ try: - resp = self._connection.execute(session_id, stmt) + resp = self._connection.executeWithParameter(session_id, stmt, params) return resp except Exception as te: if isinstance(te, TTransportException): @@ -141,13 +150,21 @@ def execute(self, session_id, stmt): raise def execute_json(self, session_id, stmt): + """execute_json interface with session_id and ngql + :param session_id: the session id get from result of authenticate interface + :param stmt: the ngql + :return: string json representing the execution result + """ + return self.execute_json_with_parameter(session_id, stmt, None) + + def execute_json_with_parameter(self, session_id, stmt, params): """execute_json interface with session_id and ngql :param session_id: the session id get from result of authenticate interface :param stmt: the ngql :return: string json representing the execution result """ try: - resp = self._connection.executeJson(session_id, stmt) + resp = self._connection.executeJsonWithParameter(session_id, stmt) return resp except Exception as te: if isinstance(te, TTransportException): diff --git a/nebula2/gclient/net/Session.py b/nebula2/gclient/net/Session.py index 6e9d48a7..d1155cdb 100644 --- a/nebula2/gclient/net/Session.py +++ b/nebula2/gclient/net/Session.py @@ -26,17 +26,17 @@ def __init__(self, connection, auth_result: AuthResult, pool, retry_connect=True self._pool = pool self._retry_connect = retry_connect - def execute(self, stmt): + def execute_parameter(self, stmt, params): """execute statement - :param stmt: the ngql + :param params: parameter map :return: ResultSet """ if self._connection is None: raise RuntimeError('The session has released') try: start_time = time.time() - resp = self._connection.execute(self._session_id, stmt) + resp = self._connection.execute_parameter(self._session_id, stmt, params) end_time = time.time() return ResultSet( resp, @@ -52,7 +52,9 @@ def execute(self, stmt): raise IOErrorException( IOErrorException.E_ALL_BROKEN, ie.message ) - resp = self._connection.execute(self._session_id, stmt) + resp = self._connection.execute_parameter( + self._session_id, stmt, params + ) end_time = time.time() return ResultSet( resp, @@ -63,6 +65,14 @@ def execute(self, stmt): except Exception: raise + def execute(self, stmt): + """execute statement + + :param stmt: the ngql + :return: ResultSet + """ + return self.execute_parameter(stmt, None) + def execute_json(self, stmt): """execute statement and return the result as a JSON string Date and Datetime will be returned in UTC @@ -124,10 +134,76 @@ def execute_json(self, stmt): :param stmt: the ngql :return: JSON string """ + return self.execute_json_with_parameter(stmt, None) + + def execute_json_with_parameter(self, stmt, params): + """execute statement and return the result as a JSON string + Date and Datetime will be returned in UTC + JSON struct: + { + "results": [ + { + "columns": [], + "data": [ + { + "row": [ + "row-data" + ], + "meta": [ + "metadata" + ] + } + ], + "latencyInUs": 0, + "spaceName": "", + "planDesc ": { + "planNodeDescs": [ + { + "name": "", + "id": 0, + "outputVar": "", + "description": { + "key": "" + }, + "profiles": [ + { + "rows": 1, + "execDurationInUs": 0, + "totalDurationInUs": 0, + "otherStats": {} + } + ], + "branchInfo": { + "isDoBranch": false, + "conditionNodeId": -1 + }, + "dependencies": [] + } + ], + "nodeIndexMap": {}, + "format": "", + "optimize_time_in_us": 0 + }, + "comment ": "" + } + ], + "errors": [ + { + "code": 0, + "message": "" + } + ] + } + :param stmt: the ngql + :param params: parameter map + :return: JSON string + """ if self._connection is None: raise RuntimeError('The session has released') try: - resp_json = self._connection.execute_json(self._session_id, stmt) + resp_json = self._connection.execute_json_with_parameter( + self._session_id, stmt, params + ) return resp_json except IOErrorException as ie: if ie.type == IOErrorException.E_CONNECT_BROKEN: @@ -138,7 +214,9 @@ def execute_json(self, stmt): raise IOErrorException( IOErrorException.E_ALL_BROKEN, ie.message ) - resp_json = self._connection.execute_json(self._session_id, stmt) + resp_json = self._connection.execute_json_with_parameter( + self._session_id, stmt, params + ) return resp_json raise except Exception: diff --git a/tests/test_parameter.py b/tests/test_parameter.py new file mode 100644 index 00000000..32e4e22d --- /dev/null +++ b/tests/test_parameter.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# --coding:utf-8-- + +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. + +import time +import json + +from nebula2.gclient.net import ConnectionPool +from nebula2.Config import Config +from nebula2.common import * +from unittest import TestCase + + +class TestParameter(TestCase): + @classmethod + def setUp(self) -> None: + super().setUpClass() + self.user_name = 'root' + self.password = 'nebula' + self.configs = Config() + self.configs.max_connection_pool_size = 6 + self.pool = ConnectionPool() + self.pool.init([('127.0.0.1', 9671)], self.configs) + + # get session from the pool + client = self.pool.get_session('root', 'nebula') + assert client is not None + + # prepare space and insert data + resp = client.execute( + 'CREATE SPACE IF NOT EXISTS parameter_test(vid_type=FIXED_STRING(30));USE parameter_test' + ) + assert resp.is_succeeded(), resp.error_msg() + resp = client.execute( + 'CREATE TAG IF NOT EXISTS person(name string, age int);' + 'CREATE EDGE like (likeness double);' + ) + + time.sleep(6) + # insert data need to sleep after create schema + resp = client.execute('CREATE TAG INDEX person_age_index on person(age)') + time.sleep(6) + # insert vertex + resp = client.execute( + 'INSERT VERTEX person(name, age) VALUES "Bob":("Bob", 10), "Lily":("Lily", 9)' + ) + assert resp.is_succeeded(), resp.error_msg() + # insert edges + resp = client.execute('INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);') + assert resp.is_succeeded(), resp.error_msg() + resp = client.execute('REBUILD TAG INDEX person_age_index') + assert resp.is_succeeded(), resp.error_msg() + + # prepare parameters + bval = ttypes.Value() + bval.set_bVal(True) + ival = ttypes.Value() + ival.set_iVal(3) + sval = ttypes.Value() + sval.set_sVal("Bob") + self.params = {"p1": ival, "p2": bval, "p3": sval} + + assert self.pool.connects() == 1 + assert self.pool.in_used_connects() == 1 + + def test_parameter(self): + try: + # get session from the pool + client = self.pool.get_session('root', 'nebula') + assert client is not None + resp = client.execute_parameter( + 'USE parameter_test', + self.params, + ) + assert resp.is_succeeded() + # test basic parameter + resp = client.execute_parameter( + 'RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3', + self.params, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 1 == resp.row_size() + names = ['col1', 'col2', 'col3'] + assert names == resp.keys() + assert 6 == resp.row_values(0)[0].as_int() + assert False == resp.row_values(0)[1].as_bool() + assert 'bob1' == resp.row_values(0)[2].as_string() + # test cypher parameter + resp = client.execute_parameter( + 'MATCH (v:person)--() WHERE v.age>abs($p1)+3 RETURN v.name AS vname,v.age AS vage ORDER BY vage, $p3 LIMIT $p1+1', + self.params, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 2 == resp.row_size() + names = ['vname', 'vage'] + assert names == resp.keys() + assert 'Lily' == resp.row_values(0)[0].as_string() + assert 9 == resp.row_values(0)[1].as_int() + assert 'Bob' == resp.row_values(1)[0].as_string() + assert 10 == resp.row_values(1)[1].as_int() + # test ngql parameter + resp = client.execute_parameter( + '$p1=go from "Bob" over like yield like._dst;', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'go from $p3 over like yield like._dst;', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'fetch prop on person $p3 yield vertex as v', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'find all path from $p3 to "Yao Ming" over like yield path as p', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'get subgraph from $p3 both like yield vertices as v', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'go 3 steps from \"Bob\" over like yield like._dst limit [1,$p1,3]', + self.params, + ) + assert not resp.is_succeeded() + + except Exception as e: + assert False, e + + def tearDown(self) -> None: + client = self.pool.get_session('root', 'nebula') + assert client is not None + resp = client.execute('DROP SPACE parameter_test') + assert resp.is_succeeded(), resp.error_msg()