-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implementation of client side statements that return #1046
Changes from 9 commits
3f3b8cb
8b63b9c
2361dfb
72f6221
27594e5
9108192
f688d54
03f42a2
e603dc3
500b513
00df2ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,6 +23,7 @@ | |||||
from google.cloud.spanner_v1 import RequestOptions | ||||||
from google.cloud.spanner_v1.session import _get_retry_delay | ||||||
from google.cloud.spanner_v1.snapshot import Snapshot | ||||||
from deprecated import deprecated | ||||||
|
||||||
from google.cloud.spanner_dbapi.checksum import _compare_checksums | ||||||
from google.cloud.spanner_dbapi.checksum import ResultsChecksum | ||||||
|
@@ -35,7 +36,7 @@ | |||||
|
||||||
|
||||||
CLIENT_TRANSACTION_NOT_STARTED_WARNING = ( | ||||||
"This method is non-operational as transaction has not started" | ||||||
"This method is non-operational as transaction has not been started." | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||
) | ||||||
MAX_INTERNAL_RETRIES = 50 | ||||||
|
||||||
|
@@ -107,6 +108,9 @@ def __init__(self, instance, database=None, read_only=False): | |||||
self._staleness = None | ||||||
self.request_priority = None | ||||||
self._transaction_begin_marked = False | ||||||
# whether transaction started at Spanner. This means that we had | ||||||
# made atleast one call to Spanner. | ||||||
self._spanner_transaction_started = False | ||||||
|
||||||
@property | ||||||
def autocommit(self): | ||||||
|
@@ -140,26 +144,27 @@ def database(self): | |||||
return self._database | ||||||
|
||||||
@property | ||||||
def _spanner_transaction_started(self): | ||||||
"""Flag: whether transaction started at Spanner. This means that we had | ||||||
made atleast one call to Spanner. Property client_transaction_started | ||||||
would always be true if this is true as transaction has to start first | ||||||
at clientside than at Spanner | ||||||
def ddl_statements(self): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a new property? If so, any specific reason that we are adding this in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intellij was giving a suggestion so created it. Let me know if I should revert it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would in that case at least make it internal (so let it start with an underscore). We should be as conservative as possible when it comes to adding public methods and properties to the interface. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the property as there is no different in accessing a property or a field in python when they both start with underscore and Intellij will give the same warning |
||||||
return self._ddl_statements | ||||||
|
||||||
Returns: | ||||||
bool: True if Spanner transaction started, False otherwise. | ||||||
""" | ||||||
@property | ||||||
def statements(self): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same for this: Is there a specific reason that we are adding this in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intellij was giving a suggestion so created it. Let me know if I should revert it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above |
||||||
return self._statements | ||||||
|
||||||
@property | ||||||
def client_transaction_started(self): | ||||||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return self._client_transaction_started | ||||||
|
||||||
@property | ||||||
@deprecated( | ||||||
reason="This method is deprecated. Use _spanner_transaction_started field" | ||||||
) | ||||||
def inside_transaction(self): | ||||||
return ( | ||||||
self._transaction | ||||||
and not self._transaction.committed | ||||||
and not self._transaction.rolled_back | ||||||
) or (self._snapshot is not None) | ||||||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@property | ||||||
def inside_transaction(self): | ||||||
"""Deprecated property which won't be supported in future versions. | ||||||
Please use spanner_transaction_started property instead.""" | ||||||
return self._spanner_transaction_started | ||||||
) | ||||||
|
||||||
@property | ||||||
def _client_transaction_started(self): | ||||||
|
@@ -277,7 +282,8 @@ def _release_session(self): | |||||
""" | ||||||
if self.database is None: | ||||||
raise ValueError("Database needs to be passed for this operation") | ||||||
self.database._pool.put(self._session) | ||||||
if self._session is not None: | ||||||
self.database._pool.put(self._session) | ||||||
self._session = None | ||||||
|
||||||
def retry_transaction(self): | ||||||
|
@@ -293,7 +299,7 @@ def retry_transaction(self): | |||||
""" | ||||||
attempt = 0 | ||||||
while True: | ||||||
self._transaction = None | ||||||
self._spanner_transaction_started = False | ||||||
attempt += 1 | ||||||
if attempt > MAX_INTERNAL_RETRIES: | ||||||
raise | ||||||
|
@@ -319,7 +325,6 @@ def _rerun_previous_statements(self): | |||||
status, res = transaction.batch_update(statements) | ||||||
|
||||||
if status.code == ABORTED: | ||||||
self.connection._transaction = None | ||||||
raise Aborted(status.details) | ||||||
|
||||||
retried_checksum = ResultsChecksum() | ||||||
|
@@ -363,6 +368,8 @@ def transaction_checkout(self): | |||||
if not self.read_only and self._client_transaction_started: | ||||||
if not self._spanner_transaction_started: | ||||||
self._transaction = self._session_checkout().transaction() | ||||||
self._snapshot = None | ||||||
self._spanner_transaction_started = True | ||||||
self._transaction.begin() | ||||||
|
||||||
return self._transaction | ||||||
|
@@ -377,11 +384,13 @@ def snapshot_checkout(self): | |||||
:returns: A Cloud Spanner snapshot object, ready to use. | ||||||
""" | ||||||
if self.read_only and self._client_transaction_started: | ||||||
if not self._snapshot: | ||||||
if not self._spanner_transaction_started: | ||||||
self._snapshot = Snapshot( | ||||||
self._session_checkout(), multi_use=True, **self.staleness | ||||||
) | ||||||
self._transaction = None | ||||||
self._snapshot.begin() | ||||||
olavloite marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self._spanner_transaction_started = True | ||||||
|
||||||
return self._snapshot | ||||||
|
||||||
|
@@ -391,7 +400,7 @@ def close(self): | |||||
The connection will be unusable from this point forward. If the | ||||||
connection has an active transaction, it will be rolled back. | ||||||
""" | ||||||
if self._spanner_transaction_started and not self.read_only: | ||||||
if self._spanner_transaction_started and not self._read_only: | ||||||
self._transaction.rollback() | ||||||
|
||||||
if self._own_pool and self.database: | ||||||
|
@@ -405,13 +414,15 @@ def begin(self): | |||||
Marks the transaction as started. | ||||||
|
||||||
:raises: :class:`InterfaceError`: if this connection is closed. | ||||||
:raises: :class:`OperationalError`: if there is an existing transaction that has begin or is running | ||||||
:raises: :class:`OperationalError`: if there is an existing transaction | ||||||
that has been started | ||||||
""" | ||||||
if self._transaction_begin_marked: | ||||||
raise OperationalError("A transaction has already started") | ||||||
if self._spanner_transaction_started: | ||||||
raise OperationalError( | ||||||
"Beginning a new transaction is not allowed when a transaction is already running" | ||||||
"Beginning a new transaction is not allowed when a transaction " | ||||||
"is already running" | ||||||
) | ||||||
self._transaction_begin_marked = True | ||||||
|
||||||
|
@@ -430,41 +441,37 @@ def commit(self): | |||||
return | ||||||
|
||||||
self.run_prior_DDL_statements() | ||||||
if self._spanner_transaction_started: | ||||||
try: | ||||||
if self.read_only: | ||||||
self._snapshot = None | ||||||
else: | ||||||
self._transaction.commit() | ||||||
|
||||||
self._release_session() | ||||||
self._statements = [] | ||||||
self._transaction_begin_marked = False | ||||||
except Aborted: | ||||||
self.retry_transaction() | ||||||
self.commit() | ||||||
try: | ||||||
if self._spanner_transaction_started and not self._read_only: | ||||||
self._transaction.commit() | ||||||
except Aborted: | ||||||
self.retry_transaction() | ||||||
self.commit() | ||||||
finally: | ||||||
self._release_session() | ||||||
self._statements = [] | ||||||
self._transaction_begin_marked = False | ||||||
self._spanner_transaction_started = False | ||||||
|
||||||
def rollback(self): | ||||||
"""Rolls back any pending transaction. | ||||||
|
||||||
This is a no-op if there is no active client transaction. | ||||||
""" | ||||||
|
||||||
if not self._client_transaction_started: | ||||||
warnings.warn( | ||||||
CLIENT_TRANSACTION_NOT_STARTED_WARNING, UserWarning, stacklevel=2 | ||||||
) | ||||||
return | ||||||
|
||||||
if self._spanner_transaction_started: | ||||||
if self.read_only: | ||||||
self._snapshot = None | ||||||
else: | ||||||
try: | ||||||
if self._spanner_transaction_started and not self._read_only: | ||||||
self._transaction.rollback() | ||||||
|
||||||
finally: | ||||||
self._release_session() | ||||||
self._statements = [] | ||||||
self._transaction_begin_marked = False | ||||||
self._spanner_transaction_started = False | ||||||
|
||||||
@check_not_closed | ||||||
def cursor(self): | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done