Skip to content

Commit 25aba4f

Browse files
chalmerloweparthea
authored andcommitted
chore: updates thread handling to avoid race condition in tests
1 parent 80a0fe4 commit 25aba4f

2 files changed

Lines changed: 239 additions & 21 deletions

File tree

packages/google-cloud-spanner/google/cloud/spanner_v1/snapshot.py

Lines changed: 238 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -194,19 +194,20 @@ def __init__(self, session, client_context=None):
194194
self._client_context = _validate_client_context(client_context)
195195
self._execute_sql_request_count: int = 0
196196
self._read_request_count: int = 0
197+
self._begin_request_sent: bool = False
198+
199+
# Identifier for the transaction.
197200
self._transaction_id: Optional[bytes] = None
198201
self._precommit_token: Optional[MultiplexedSessionPrecommitToken] = None
199202
self._lock: CrossSync._Sync_Impl.Lock = CrossSync._Sync_Impl.Lock()
200203

201-
@property
202-
def _resource_info(self):
203-
"""Resource information for metrics labels."""
204-
database = self._session._database
205-
return {
206-
"project": database._instance._client.project,
207-
"instance": database._instance.instance_id,
208-
"database": database.database_id,
209-
}
204+
# Operation within a transaction can be performed using multiple
205+
# threads, so we need to use a lock when updating the transaction.
206+
self._lock: threading.Lock = threading.Lock()
207+
208+
# Event to coordinate concurrent requests beginning the transaction.
209+
# This is used to prevent the "Transaction has not begun" race condition.
210+
self._transaction_begin_event = threading.Event()
210211

211212
def begin(self) -> bytes:
212213
"""Begins a transaction on the database.
@@ -234,12 +235,109 @@ def read(
234235
column_info=None,
235236
lazy_decode=False,
236237
):
237-
"""Perform a ``StreamingRead`` API request for rows in a table."""
238-
if self._read_request_count > 0:
239-
if not self._multi_use:
240-
raise ValueError("Cannot re-use single-use snapshot.")
241-
if self._transaction_id is None:
242-
raise ValueError("Transaction has not begun.")
238+
"""Perform a ``StreamingRead`` API request for rows in a table.
239+
240+
:type table: str
241+
:param table: name of the table from which to fetch data
242+
243+
:type columns: list of str
244+
:param columns: names of columns to be retrieved
245+
246+
:type keyset: :class:`~google.cloud.spanner_v1.keyset.KeySet`
247+
:param keyset: keys / ranges identifying rows to be retrieved
248+
249+
:type index: str
250+
:param index: (Optional) name of index to use, rather than the
251+
table's primary key
252+
253+
:type limit: int
254+
:param limit: (Optional) maximum number of rows to return.
255+
Incompatible with ``partition``.
256+
257+
:type partition: bytes
258+
:param partition: (Optional) one of the partition tokens returned
259+
from :meth:`partition_read`. Incompatible with
260+
``limit``.
261+
262+
:type request_options:
263+
:class:`google.cloud.spanner_v1.types.RequestOptions`
264+
:param request_options:
265+
(Optional) Common options for this request.
266+
If a dict is provided, it must be of the same form as the protobuf
267+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
268+
Please note, the `transactionTag` setting will be ignored for
269+
snapshot as it's not supported for read-only transactions.
270+
271+
:type retry: :class:`~google.api_core.retry.Retry`
272+
:param retry: (Optional) The retry settings for this request.
273+
274+
:type timeout: float
275+
:param timeout: (Optional) The timeout for this request.
276+
277+
:type data_boost_enabled:
278+
:param data_boost_enabled:
279+
(Optional) If this is for a partitioned read and this field is
280+
set ``true``, the request will be executed via offline access.
281+
If the field is set to ``true`` but the request does not set
282+
``partition_token``, the API will return an
283+
``INVALID_ARGUMENT`` error.
284+
285+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
286+
or :class:`dict`
287+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
288+
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
289+
or regions should be used for non-transactional reads or queries.
290+
291+
:type column_info: dict
292+
:param column_info: (Optional) dict of mapping between column names and additional column information.
293+
An object where column names as keys and custom objects as corresponding
294+
values for deserialization. It's specifically useful for data types like
295+
protobuf where deserialization logic is on user-specific code. When provided,
296+
the custom object enables deserialization of backend-received column data.
297+
If not provided, data remains serialized as bytes for Proto Messages and
298+
integer for Proto Enums.
299+
300+
:type lazy_decode: bool
301+
:param lazy_decode:
302+
(Optional) If this argument is set to ``true``, the iterator
303+
returns the underlying protobuf values instead of decoded Python
304+
objects. This reduces the time that is needed to iterate through
305+
large result sets. The application is responsible for decoding
306+
the data that is needed. The returned row iterator contains two
307+
functions that can be used for this. ``iterator.decode_row(row)``
308+
decodes all the columns in the given row to an array of Python
309+
objects. ``iterator.decode_column(row, column_index)`` decodes one
310+
specific column in the given row.
311+
312+
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
313+
:returns: a result set instance which can be used to consume rows.
314+
315+
:raises ValueError: if the Transaction already used to execute a
316+
read request, but is not a multi-use transaction or has not begun.
317+
"""
318+
319+
with self._lock:
320+
# Check if this request is beginning the transaction.
321+
# If a request is already in progress, other requests must wait
322+
# until the transaction ID is available.
323+
if self._begin_request_sent or self._read_request_count > 0:
324+
if not self._multi_use:
325+
raise ValueError("Cannot re-use single-use snapshot.")
326+
if self._transaction_id is None:
327+
wait_needed = True
328+
else:
329+
wait_needed = False
330+
else:
331+
wait_needed = False
332+
self._begin_request_sent = True
333+
334+
if wait_needed:
335+
# Wait for the transaction to begin (set by another concurrent request).
336+
# This prevents the race condition where concurrent requests think
337+
# the transaction hasn't begun.
338+
if not self._transaction_begin_event.wait(timeout=30.0):
339+
raise ValueError("Timed out waiting for transaction to begin.")
340+
243341
session = self._session
244342
database = session._database
245343
api = database.spanner_api
@@ -314,12 +412,128 @@ def execute_sql(
314412
column_info=None,
315413
lazy_decode=False,
316414
):
317-
"""Perform an ``ExecuteStreamingSql`` API request."""
318-
if self._read_request_count > 0:
319-
if not self._multi_use:
320-
raise ValueError("Cannot re-use single-use snapshot.")
321-
if self._transaction_id is None:
322-
raise ValueError("Transaction has not begun.")
415+
"""Perform an ``ExecuteStreamingSql`` API request.
416+
417+
:type sql: str
418+
:param sql: SQL query statement
419+
420+
:type params: dict, {str -> column value}
421+
:param params: values for parameter replacement. Keys must match
422+
the names used in ``sql``.
423+
424+
:type param_types: dict[str -> Union[dict, .types.Type]]
425+
:param param_types:
426+
(Optional) maps explicit types for one or more param values;
427+
required if parameters are passed.
428+
429+
:type query_mode:
430+
:class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryMode`
431+
:param query_mode: Mode governing return of results / query plan.
432+
See:
433+
`QueryMode <https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode>`_.
434+
435+
:type query_options:
436+
:class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryOptions`
437+
or :class:`dict`
438+
:param query_options:
439+
(Optional) Query optimizer configuration to use for the given query.
440+
If a dict is provided, it must be of the same form as the protobuf
441+
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
442+
443+
:type request_options:
444+
:class:`google.cloud.spanner_v1.types.RequestOptions`
445+
:param request_options:
446+
(Optional) Common options for this request.
447+
If a dict is provided, it must be of the same form as the protobuf
448+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
449+
450+
:type last_statement: bool
451+
:param last_statement:
452+
If set to true, this option marks the end of the transaction. The
453+
transaction should be committed or aborted after this statement
454+
executes, and attempts to execute any other requests against this
455+
transaction (including reads and queries) will be rejected. Mixing
456+
mutations with statements that are marked as the last statement is
457+
not allowed.
458+
For DML statements, setting this option may cause some error
459+
reporting to be deferred until commit time (e.g. validation of
460+
unique constraints). Given this, successful execution of a DML
461+
statement should not be assumed until the transaction commits.
462+
463+
:type partition: bytes
464+
:param partition: (Optional) one of the partition tokens returned
465+
from :meth:`partition_query`.
466+
467+
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
468+
:returns: a result set instance which can be used to consume rows.
469+
470+
:type retry: :class:`~google.api_core.retry.Retry`
471+
:param retry: (Optional) The retry settings for this request.
472+
473+
:type timeout: float
474+
:param timeout: (Optional) The timeout for this request.
475+
476+
:type data_boost_enabled:
477+
:param data_boost_enabled:
478+
(Optional) If this is for a partitioned query and this field is
479+
set ``true``, the request will be executed via offline access.
480+
If the field is set to ``true`` but the request does not set
481+
``partition_token``, the API will return an
482+
``INVALID_ARGUMENT`` error.
483+
484+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
485+
or :class:`dict`
486+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
487+
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
488+
or regions should be used for non-transactional reads or queries.
489+
490+
:type column_info: dict
491+
:param column_info: (Optional) dict of mapping between column names and additional column information.
492+
An object where column names as keys and custom objects as corresponding
493+
values for deserialization. It's specifically useful for data types like
494+
protobuf where deserialization logic is on user-specific code. When provided,
495+
the custom object enables deserialization of backend-received column data.
496+
If not provided, data remains serialized as bytes for Proto Messages and
497+
integer for Proto Enums.
498+
499+
:type lazy_decode: bool
500+
:param lazy_decode:
501+
(Optional) If this argument is set to ``true``, the iterator
502+
returns the underlying protobuf values instead of decoded Python
503+
objects. This reduces the time that is needed to iterate through
504+
large result sets. The application is responsible for decoding
505+
the data that is needed. The returned row iterator contains two
506+
functions that can be used for this. ``iterator.decode_row(row)``
507+
decodes all the columns in the given row to an array of Python
508+
objects. ``iterator.decode_column(row, column_index)`` decodes one
509+
specific column in the given row.
510+
511+
:raises ValueError: if the Transaction already used to execute a
512+
read request, but is not a multi-use transaction or has not begun.
513+
"""
514+
515+
with self._lock:
516+
# Check if this request is beginning the transaction.
517+
# If a request is already in progress, other requests must wait
518+
# until the transaction ID is available.
519+
if self._begin_request_sent or self._read_request_count > 0:
520+
if not self._multi_use:
521+
raise ValueError("Cannot re-use single-use snapshot.")
522+
if self._transaction_id is None:
523+
wait_needed = True
524+
else:
525+
wait_needed = False
526+
else:
527+
wait_needed = False
528+
self._begin_request_sent = True
529+
530+
if wait_needed:
531+
# Wait for the transaction to begin (set by another concurrent request).
532+
# This prevents the race condition where concurrent requests think
533+
# the transaction hasn't begun.
534+
if not self._transaction_begin_event.wait(timeout=30.0):
535+
raise ValueError("Timed out waiting for transaction to begin.")
536+
323537
if params is not None:
324538
params_pb = Struct(
325539
fields={key: _make_value_pb(value) for key, value in params.items()}
@@ -670,6 +884,9 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
670884
"""Updates the snapshot for the given transaction."""
671885
if self._transaction_id is None and transaction_pb.id:
672886
self._transaction_id = transaction_pb.id
887+
# Notify waiting threads that the transaction has begun.
888+
self._transaction_begin_event.set()
889+
673890
if transaction_pb._pb.HasField("precommit_token"):
674891
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)
675892

packages/google-cloud-spanner/tests/unit/test_spanner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_
11041104
def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_read(
11051105
self,
11061106
):
1107+
self.maxDiff = None
11071108
database = _Database()
11081109
api = database.spanner_api = self._make_spanner_api()
11091110
session = _Session(database)

0 commit comments

Comments
 (0)