Skip to content

Commit

Permalink
support parameter(variable)
Browse files Browse the repository at this point in the history
  • Loading branch information
czpmango committed Dec 23, 2021
1 parent 3c2a13b commit 5f2a6d5
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 8 deletions.
31 changes: 29 additions & 2 deletions nebula2/gclient/net/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,17 @@ def execute(self, session_id, stmt):
:param stmt: the ngql
:return: ExecutionResponse
"""
return self.execute_parameter(session_id, stmt, None)

def execute_parameter(self, session_id, stmt, params):
"""execute interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:param params: parameter map
:return: ExecutionResponse
"""
try:
resp = self._connection.execute(session_id, stmt)
resp = self._connection.executeWithParameter(session_id, stmt, params)
return resp
except Exception as te:
if isinstance(te, TTransportException):
Expand All @@ -141,13 +150,21 @@ def execute(self, session_id, stmt):
raise

def execute_json(self, session_id, stmt):
"""execute_json interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:return: string json representing the execution result
"""
return self.execute_json_with_parameter(session_id, stmt, None)

def execute_json_with_parameter(self, session_id, stmt, params):
"""execute_json interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:return: string json representing the execution result
"""
try:
resp = self._connection.executeJson(session_id, stmt)
resp = self._connection.executeJsonWithParameter(session_id, stmt)
return resp
except Exception as te:
if isinstance(te, TTransportException):
Expand Down Expand Up @@ -198,6 +215,16 @@ def ping(self):
except Exception:
return False

def ping_parameter(self):
"""check the connection if ok
:return: True or False
"""
try:
resp = self._connection.execute(0, 'YIELD 1;', None)
return True
except Exception:
return False

def reset(self):
"""reset the idletime
Expand Down
90 changes: 84 additions & 6 deletions nebula2/gclient/net/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ def __init__(self, connection, auth_result: AuthResult, pool, retry_connect=True
self._pool = pool
self._retry_connect = retry_connect

def execute(self, stmt):
def execute_parameter(self, stmt, params):
"""execute statement
:param stmt: the ngql
:param params: parameter map
:return: ResultSet
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
start_time = time.time()
resp = self._connection.execute(self._session_id, stmt)
resp = self._connection.execute_parameter(self._session_id, stmt, params)
end_time = time.time()
return ResultSet(
resp,
Expand All @@ -52,7 +52,9 @@ def execute(self, stmt):
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
resp = self._connection.execute(self._session_id, stmt)
resp = self._connection.execute_parameter(
self._session_id, stmt, params
)
end_time = time.time()
return ResultSet(
resp,
Expand All @@ -63,6 +65,14 @@ def execute(self, stmt):
except Exception:
raise

def execute(self, stmt):
"""execute statement
:param stmt: the ngql
:return: ResultSet
"""
return self.execute_parameter(stmt, None)

def execute_json(self, stmt):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
Expand Down Expand Up @@ -124,10 +134,76 @@ def execute_json(self, stmt):
:param stmt: the ngql
:return: JSON string
"""
return self.execute_json_with_parameter(stmt, None)

def execute_json_with_parameter(self, stmt, params):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
JSON struct:
{
"results": [
{
"columns": [],
"data": [
{
"row": [
"row-data"
],
"meta": [
"metadata"
]
}
],
"latencyInUs": 0,
"spaceName": "",
"planDesc ": {
"planNodeDescs": [
{
"name": "",
"id": 0,
"outputVar": "",
"description": {
"key": ""
},
"profiles": [
{
"rows": 1,
"execDurationInUs": 0,
"totalDurationInUs": 0,
"otherStats": {}
}
],
"branchInfo": {
"isDoBranch": false,
"conditionNodeId": -1
},
"dependencies": []
}
],
"nodeIndexMap": {},
"format": "",
"optimize_time_in_us": 0
},
"comment ": ""
}
],
"errors": [
{
"code": 0,
"message": ""
}
]
}
:param stmt: the ngql
:param params: parameter map
:return: JSON string
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
resp_json = self._connection.execute_json(self._session_id, stmt)
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp_json
except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
Expand All @@ -138,7 +214,9 @@ def execute_json(self, stmt):
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
resp_json = self._connection.execute_json(self._session_id, stmt)
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp_json
raise
except Exception:
Expand Down
143 changes: 143 additions & 0 deletions tests/test_parameter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#!/usr/bin/env python
# --coding:utf-8--

# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

import time
import json

from nebula2.gclient.net import ConnectionPool
from nebula2.Config import Config
from nebula2.common import *
from unittest import TestCase


class TestParameter(TestCase):
@classmethod
def setUp(self) -> None:
super().setUpClass()
self.user_name = 'root'
self.password = 'nebula'
self.configs = Config()
self.configs.max_connection_pool_size = 6
self.pool = ConnectionPool()
self.pool.init([('127.0.0.1', 9671)], self.configs)

# get session from the pool
client = self.pool.get_session('root', 'nebula')
assert client is not None

# prepare space and insert data
resp = client.execute(
'CREATE SPACE IF NOT EXISTS parameter_test(vid_type=FIXED_STRING(30));USE parameter_test'
)
assert resp.is_succeeded(), resp.error_msg()
resp = client.execute(
'CREATE TAG IF NOT EXISTS person(name string, age int);'
'CREATE EDGE like (likeness double);'
)

time.sleep(6)
# insert data need to sleep after create schema
resp = client.execute('CREATE TAG INDEX person_age_index on person(age)')
time.sleep(6)
# insert vertex
resp = client.execute(
'INSERT VERTEX person(name, age) VALUES "Bob":("Bob", 10), "Lily":("Lily", 9)'
)
assert resp.is_succeeded(), resp.error_msg()
# insert edges
resp = client.execute('INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);')
assert resp.is_succeeded(), resp.error_msg()
resp = client.execute('REBUILD TAG INDEX person_age_index')
assert resp.is_succeeded(), resp.error_msg()

# prepare parameters
bval = ttypes.Value()
bval.set_bVal(True)
ival = ttypes.Value()
ival.set_iVal(3)
sval = ttypes.Value()
sval.set_sVal("Bob")
self.params = {"p1": ival, "p2": bval, "p3": sval}

assert self.pool.connects() == 1
assert self.pool.in_used_connects() == 1

def test_parameter(self):
try:
# get session from the pool
client = self.pool.get_session('root', 'nebula')
assert client is not None
resp = client.execute_parameter(
'USE parameter_test',
self.params,
)
assert resp.is_succeeded()
# test basic parameter
resp = client.execute_parameter(
'RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3',
self.params,
)
assert resp.is_succeeded(), resp.error_msg()
assert 1 == resp.row_size()
names = ['col1', 'col2', 'col3']
assert names == resp.keys()
assert 6 == resp.row_values(0)[0].as_int()
assert False == resp.row_values(0)[1].as_bool()
assert 'bob1' == resp.row_values(0)[2].as_string()
# test cypher parameter
resp = client.execute_parameter(
'MATCH (v:person)--() WHERE v.age>abs($p1)+3 RETURN v.name AS vname,v.age AS vage ORDER BY vage, $p3 LIMIT $p1+1',
self.params,
)
assert resp.is_succeeded(), resp.error_msg()
assert 2 == resp.row_size()
names = ['vname', 'vage']
assert names == resp.keys()
assert 'Lily' == resp.row_values(0)[0].as_string()
assert 9 == resp.row_values(0)[1].as_int()
assert 'Bob' == resp.row_values(1)[0].as_string()
assert 10 == resp.row_values(1)[1].as_int()
# test ngql parameter
resp = client.execute_parameter(
'$p1=go from "Bob" over like yield like._dst;',
self.params,
)
assert not resp.is_succeeded()
resp = client.execute_parameter(
'go from $p3 over like yield like._dst;',
self.params,
)
assert not resp.is_succeeded()
resp = client.execute_parameter(
'fetch prop on person $p3 yield vertex as v',
self.params,
)
assert not resp.is_succeeded()
resp = client.execute_parameter(
'find all path from $p3 to "Yao Ming" over like yield path as p',
self.params,
)
assert not resp.is_succeeded()
resp = client.execute_parameter(
'get subgraph from $p3 both like yield vertices as v',
self.params,
)
assert not resp.is_succeeded()
resp = client.execute_parameter(
'go 3 steps from \"Bob\" over like yield like._dst limit [1,$p1,3]',
self.params,
)
assert not resp.is_succeeded()

except Exception as e:
assert False, e

def tearDown(self) -> None:
client = self.pool.get_session('root', 'nebula')
assert client is not None
resp = client.execute('DROP SPACE parameter_test')
assert resp.is_succeeded(), resp.error_msg()

0 comments on commit 5f2a6d5

Please sign in to comment.