[go: up one dir, main page]

Skip to content

Commit

Permalink
feat: add retry and timeout for batch dml (#1107)
Browse files Browse the repository at this point in the history
* feat(spanner): add retry, timeout for batch update

* feat(spanner): add samples for retry, timeout

* feat(spanner): update unittest

* feat(spanner): update comments

* feat(spanner): update code for retry

* feat(spanner): update comment
  • Loading branch information
harshachinta committed Mar 6, 2024
1 parent 9c919fa commit 4f6340b
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 3 deletions.
17 changes: 16 additions & 1 deletion google/cloud/spanner_v1/transaction.py
Expand Up @@ -410,7 +410,14 @@ def execute_update(

return response.stats.row_count_exact

def batch_update(self, statements, request_options=None):
def batch_update(
self,
statements,
request_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a batch of DML statements via an ``ExecuteBatchDml`` request.
:type statements:
Expand All @@ -431,6 +438,12 @@ def batch_update(self, statements, request_options=None):
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype:
Tuple(status, Sequence[int])
:returns:
Expand Down Expand Up @@ -486,6 +499,8 @@ def batch_update(self, statements, request_options=None):
api.execute_batch_dml,
request=request,
metadata=metadata,
retry=retry,
timeout=timeout,
)

if self._transaction_id is None:
Expand Down
50 changes: 50 additions & 0 deletions samples/samples/snippets.py
Expand Up @@ -3017,6 +3017,51 @@ def directed_read_options(
# [END spanner_directed_read]


def set_custom_timeout_and_retry(instance_id, database_id):
    """Executes a snapshot read with custom timeout and retry."""
    # [START spanner_set_custom_timeout_and_retry]
    from google.api_core import exceptions as core_exceptions
    from google.api_core import retry

    # instance_id = "your-spanner-instance"
    # database_id = "your-spanner-db-id"
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Keep the Retry object under its own name so the imported ``retry``
    # module is not rebound and stays usable below.
    custom_retry = retry.Retry(
        # Customize retry with an initial wait time of 500 milliseconds.
        initial=0.5,
        # Customize retry with a maximum wait time of 16 seconds.
        maximum=16,
        # Customize retry with a wait time multiplier per iteration of 1.5.
        multiplier=1.5,
        # Customize retry with a timeout on
        # how long a certain RPC may be retried in
        # case the server returns an error.
        timeout=60,
        # Configure which errors should be retried.
        predicate=retry.if_exception_type(
            core_exceptions.ServiceUnavailable,
        ),
    )

    # Set a custom retry and timeout setting.
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums",
            # Set custom retry setting for this request
            retry=custom_retry,
            # Set custom timeout of 60 seconds for this request
            timeout=60,
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

    # [END spanner_set_custom_timeout_and_retry]


if __name__ == "__main__": # noqa: C901
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
Expand Down Expand Up @@ -3157,6 +3202,9 @@ def directed_read_options(
)
enable_fine_grained_access_parser.add_argument("--title", default="condition title")
subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__)
subparsers.add_parser(
"set_custom_timeout_and_retry", help=set_custom_timeout_and_retry.__doc__
)

args = parser.parse_args()

Expand Down Expand Up @@ -3290,3 +3338,5 @@ def directed_read_options(
)
elif args.command == "directed_read_options":
directed_read_options(args.instance_id, args.database_id)
elif args.command == "set_custom_timeout_and_retry":
set_custom_timeout_and_retry(args.instance_id, args.database_id)
7 changes: 7 additions & 0 deletions samples/samples/snippets_test.py
Expand Up @@ -859,3 +859,10 @@ def test_directed_read_options(capsys, instance_id, sample_database):
snippets.directed_read_options(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out


@pytest.mark.dependency(depends=["insert_data"])
def test_set_custom_timeout_and_retry(capsys, instance_id, sample_database):
    """Run the custom timeout/retry snippet and check the row it prints.

    Declared as dependent on the ``insert_data`` test; the expected album
    row is matched against the snippet's captured stdout.
    """
    database_id = sample_database.database_id
    snippets.set_custom_timeout_and_retry(instance_id, database_id)

    captured = capsys.readouterr()
    assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in captured.out
14 changes: 14 additions & 0 deletions tests/unit/test_spanner.py
Expand Up @@ -556,6 +556,8 @@ def test_transaction_should_include_begin_with_first_batch_update(self):
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)

def test_transaction_should_use_transaction_id_if_error_with_first_batch_update(
Expand All @@ -574,6 +576,8 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update(
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)
self._execute_update_helper(transaction=transaction, api=api)
api.execute_sql.assert_called_once_with(
Expand Down Expand Up @@ -715,6 +719,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self):
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)

def test_transaction_should_use_transaction_id_returned_by_first_batch_update(self):
Expand All @@ -729,6 +735,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)
self._read_helper(transaction=transaction, api=api)
api.streaming_read.assert_called_once_with(
Expand Down Expand Up @@ -797,6 +805,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)

self.assertEqual(api.execute_sql.call_count, 2)
Expand Down Expand Up @@ -846,6 +856,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)

api.execute_batch_dml.assert_any_call(
Expand All @@ -854,6 +866,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=RETRY,
timeout=TIMEOUT,
)

self.assertEqual(api.execute_sql.call_count, 1)
Expand Down
25 changes: 23 additions & 2 deletions tests/unit/test_transaction.py
Expand Up @@ -662,7 +662,14 @@ def test_batch_update_other_error(self):
with self.assertRaises(RuntimeError):
transaction.batch_update(statements=[DML_QUERY])

def _batch_update_helper(self, error_after=None, count=0, request_options=None):
def _batch_update_helper(
self,
error_after=None,
count=0,
request_options=None,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
from google.rpc.status_pb2 import Status
from google.protobuf.struct_pb2 import Struct
from google.cloud.spanner_v1 import param_types
Expand Down Expand Up @@ -716,7 +723,10 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None):
request_options = RequestOptions(request_options)

status, row_counts = transaction.batch_update(
dml_statements, request_options=request_options
dml_statements,
request_options=request_options,
retry=retry,
timeout=timeout,
)

self.assertEqual(status, expected_status)
Expand Down Expand Up @@ -753,6 +763,8 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None):
("google-cloud-resource-prefix", database.name),
("x-goog-spanner-route-to-leader", "true"),
],
retry=retry,
timeout=timeout,
)

self.assertEqual(transaction._execute_sql_count, count + 1)
Expand Down Expand Up @@ -826,6 +838,15 @@ def test_batch_update_error(self):

self.assertEqual(transaction._execute_sql_count, 1)

def test_batch_update_w_timeout_param(self):
    """Run the batch-update helper with an explicit 2-second timeout."""
    custom_timeout = 2.0
    self._batch_update_helper(timeout=custom_timeout)

def test_batch_update_w_retry_param(self):
    """Run the batch-update helper with an explicit, non-default retry."""
    # Local import: only the retry tests need a concrete Retry object.
    from google.api_core.retry import Retry

    # gapic_v1.method.DEFAULT is already the helper's default value, so
    # passing it would be indistinguishable from omitting ``retry``; a real
    # Retry instance makes the helper's call assertion meaningful.
    self._batch_update_helper(retry=Retry())

def test_batch_update_w_timeout_and_retry_params(self):
    """Run the batch-update helper with both a custom retry and a timeout."""
    # Local import: only the retry tests need a concrete Retry object.
    from google.api_core.retry import Retry

    # Use a concrete Retry rather than gapic_v1.method.DEFAULT (the helper's
    # own default) so both arguments differ from their defaults.
    self._batch_update_helper(retry=Retry(), timeout=2.0)

def test_context_mgr_success(self):
import datetime
from google.cloud.spanner_v1 import CommitResponse
Expand Down

0 comments on commit 4f6340b

Please sign in to comment.