Skip to content

Commit

Permalink
support parameter(variable)
Browse files Browse the repository at this point in the history
update thrift

small change

rename package name

small change

small change
  • Loading branch information
czpmango committed Dec 21, 2021
1 parent 876c2a6 commit fa70573
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 13 deletions.
56 changes: 44 additions & 12 deletions example/GraphClientSimpleExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#
# This source code is licensed under Apache 2.0 License.


import time
import json

from nebula2.gclient.net import ConnectionPool

from nebula2.Config import Config
from nebula2.common import *
from FormatResp import print_resp
Expand All @@ -28,38 +26,72 @@
client = connection_pool.get_session('root', 'nebula')
assert client is not None

# get the result in json format
resp_json = client.execute_json("yield 1")
json_obj = json.loads(resp_json)
print(json.dumps(json_obj, indent=2, sort_keys=True))

client.execute(
'CREATE SPACE IF NOT EXISTS test(vid_type=FIXED_STRING(30)); USE test;'
# prepare space and insert data
print("\n Prepare space data...\n")
resp = client.execute(
'CREATE SPACE IF NOT EXISTS test(vid_type=FIXED_STRING(30));'
)
assert resp.is_succeeded(), resp.error_msg()
time.sleep(6)
resp = client.execute('USE test;')
assert resp.is_succeeded(), resp.error_msg()
resp = client.execute(
'CREATE TAG IF NOT EXISTS person(name string, age int);'
'CREATE EDGE like (likeness double);'
)

time.sleep(6)
# insert data need to sleep after create schema
resp = client.execute('CREATE TAG INDEX person_age_index on person(age)')
time.sleep(6)

# insert vertex
resp = client.execute(
'INSERT VERTEX person(name, age) VALUES "Bob":("Bob", 10), "Lily":("Lily", 9)'
)
assert resp.is_succeeded(), resp.error_msg()

# insert edges
resp = client.execute('INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);')
assert resp.is_succeeded(), resp.error_msg()
resp = client.execute('REBUILD TAG INDEX person_age_index')
assert resp.is_succeeded(), resp.error_msg()
print("\n Data preparation is completed\n")

# test fetch prop on statement
print("\n Test fetch prop on statement...\n")
resp = client.execute('FETCH PROP ON person "Bob" YIELD vertex as node')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

resp = client.execute('FETCH PROP ON like "Bob"->"Lily" YIELD edge as e')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

# get the result in json format
print("\n Test execute_json interface...\n")
resp_json = client.execute_json("yield 1")
json_obj = json.loads(resp_json)
print(json.dumps(json_obj, indent=2, sort_keys=True))

# test parameter interface
print("\n Test cypher parameter...\n")
bval = ttypes.Value()
bval.set_bVal(True)
ival = ttypes.Value()
ival.set_iVal(3)
sval = ttypes.Value()
sval.set_sVal("Cypher Parameter")
params = {"p1": ival, "p2": bval, "p3": sval}
resp = client.execute_parameter(
'RETURN abs($p1)+3, toBoolean($p2) and false, toLower($p3)+1', params
)
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)
resp = client.execute_parameter(
'MATCH (v:person)--() WHERE v.age>abs($p1)+3 RETURN v,v.age AS vage ORDER BY vage, $p3 LIMIT $p1+1',
params,
)
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

# drop space
resp = client.execute('DROP SPACE test')
assert resp.is_succeeded(), resp.error_msg()
Expand Down
59 changes: 59 additions & 0 deletions nebula2/gclient/net/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,31 @@ def execute(self, session_id, stmt):
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
raise

def execute_parameter(self, session_id, stmt, params):
"""execute interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:param params: parameter map
:return: ExecutionResponse
"""
try:
resp = self._connection.executeWithParameter(session_id, stmt, params)
return resp
except Exception as te:
if isinstance(te, TTransportException):
if te.message.find("timed out") > 0:
self._reopen()
raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
elif te.type == TTransportException.END_OF_FILE:
raise IOErrorException(
IOErrorException.E_CONNECT_BROKEN, te.message
)
elif te.type == TTransportException.NOT_OPEN:
raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
else:
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
raise

def execute_json(self, session_id, stmt):
"""execute_json interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
Expand All @@ -164,6 +189,30 @@ def execute_json(self, session_id, stmt):
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
raise

def execute_json_with_parameter(self, session_id, stmt, params):
"""execute_json interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:return: string json representing the execution result
"""
try:
resp = self._connection.executeJsonWithParameter(session_id, stmt)
return resp
except Exception as te:
if isinstance(te, TTransportException):
if te.message.find("timed out") > 0:
self._reopen()
raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
elif te.type == TTransportException.END_OF_FILE:
raise IOErrorException(
IOErrorException.E_CONNECT_BROKEN, te.message
)
elif te.type == TTransportException.NOT_OPEN:
raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
else:
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message)
raise

def signout(self, session_id):
"""tells the graphd can release the session info
Expand Down Expand Up @@ -198,6 +247,16 @@ def ping(self):
except Exception:
return False

def ping_parameter(self):
"""check the connection if ok
:return: True or False
"""
try:
resp = self._connection.execute(0, 'YIELD 1;', None)
return True
except Exception:
return False

def reset(self):
"""reset the idletime
Expand Down
125 changes: 125 additions & 0 deletions nebula2/gclient/net/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,45 @@ def execute(self, stmt):
except Exception:
raise

def execute_parameter(self, stmt, params):
"""execute statement
:param stmt: the ngql
:param params: parameter map
:return: ResultSet
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
start_time = time.time()
resp = self._connection.execute_parameter(self._session_id, stmt, params)
end_time = time.time()
return ResultSet(
resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset,
)
except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logging.warning('Retry connect failed')
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
resp = self._connection.executeWithParameter(
self._session_id, stmt, params
)
end_time = time.time()
return ResultSet(
resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset,
)
raise
except Exception:
raise

def execute_json(self, stmt):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
Expand Down Expand Up @@ -144,6 +183,92 @@ def execute_json(self, stmt):
except Exception:
raise

def execute_json_with_parameter(self, stmt, params):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
JSON struct:
{
"results": [
{
"columns": [],
"data": [
{
"row": [
"row-data"
],
"meta": [
"metadata"
]
}
],
"latencyInUs": 0,
"spaceName": "",
"planDesc ": {
"planNodeDescs": [
{
"name": "",
"id": 0,
"outputVar": "",
"description": {
"key": ""
},
"profiles": [
{
"rows": 1,
"execDurationInUs": 0,
"totalDurationInUs": 0,
"otherStats": {}
}
],
"branchInfo": {
"isDoBranch": false,
"conditionNodeId": -1
},
"dependencies": []
}
],
"nodeIndexMap": {},
"format": "",
"optimize_time_in_us": 0
},
"comment ": ""
}
],
"errors": [
{
"code": 0,
"message": ""
}
]
}
:param stmt: the ngql
:param params: parameter map
:return: JSON string
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp_json
except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logging.warning('Retry connect failed')
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message
)
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
)
return resp_json
raise
except Exception:
raise

def release(self):
"""release the connection to pool, and the session couldn't been use again
Expand Down
2 changes: 1 addition & 1 deletion tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_2_reconnect(self):
if i == 3:
os.system('docker stop tests_graphd0_1')
os.system('docker stop tests_graphd1_1')
time.sleep(3)
time.sleep(6)
# the session update later, the expect test
# resp = session.execute('SHOW TAGS')
resp = session.execute('SHOW HOSTS')
Expand Down

0 comments on commit fa70573

Please sign in to comment.