-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathtest_snippet.py
More file actions
76 lines (64 loc) · 2.59 KB
/
test_snippet.py
File metadata and controls
76 lines (64 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Copyright 2026 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)
from google.cloud.storage import Blob
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
async def storage_create_and_write_appendable_object(
    bucket_name, object_name, grpc_client=None
):
    """Create an appendable object in a zonal bucket and append data to it.

    Args:
        bucket_name: Name of the Cloud Storage bucket.
        object_name: Name of the appendable object to create.
        grpc_client: Optional pre-built AsyncGrpcClient to reuse (intended for
            testing only); a fresh client is constructed when omitted.
    """
    client = grpc_client if grpc_client is not None else AsyncGrpcClient()

    target_blob = Blob.from_uri("gs://{}/{}".format(bucket_name, object_name))
    target_blob.content_type = "text/plain"

    appendable_writer = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        blob=target_blob,
        # generation=0 requests a brand-new object; the service raises
        # `FailedPrecondition` if an object with this name already exists.
        generation=0,
    )

    # Opening creates a zero-byte appendable object and starts the gRPC
    # bidirectional stream used for subsequent appends.
    await appendable_writer.open()

    # `.append` may be called any number of times; each call extends the
    # object at its current end.
    await appendable_writer.append(b"Some data")

    # Close the stream once all appends are done; finalize_on_close=True also
    # finalizes the object, after which no further appends are possible.
    new_object = await appendable_writer.close(finalize_on_close=True)
    print(new_object)
    print(new_object.size)
    print(new_object.content_type)
    print(
        f"Appended object {object_name} created of size {appendable_writer.persisted_size} bytes."
    )
if __name__ == "__main__":
    # CLI entry point: parse the bucket/object names and run the async sample.
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # required=True: without it a missing flag silently flows through as None
    # and produces a bogus "gs://None/None" URI deep inside the gRPC call,
    # instead of an immediate, clear argparse usage error.
    parser.add_argument(
        "--bucket_name", required=True, help="Your Cloud Storage bucket name."
    )
    parser.add_argument(
        "--object_name", required=True, help="Your Cloud Storage object name."
    )
    args = parser.parse_args()
    asyncio.run(
        storage_create_and_write_appendable_object(
            bucket_name=args.bucket_name,
            object_name=args.object_name,
        )
    )