Google Cloud Storage
Support for reading from and writing to Cloud Storage buckets.
from cloudy.google import storage
See Google's Cloud Storage Documentation.
The functions in `storage.py` range from trivial to complex.
The trivial functions are implemented because they remove mental overhead for those who don’t want to remember how to initiate or manage clients, whether to set environment variables, or which parameters are best practices, and who don’t want to wade through the subpar Google documentation. These trivial functions include getting blobs, checking whether blobs exist, listing blobs, deleting blobs, etc.
Some of the trivial operations that can be accomplished via the Google Python API are actually much more efficient when done via the Google CLI, so this module also hides the fact that it bulk-uploads files via gsutil using a subprocess call.
The one complex function in this module is for managing parallel processes. There are many options for handling communication between instances in parallel workflows, such as starting up instances dedicated to specific task subsets of the whole workflow, or setting up dedicated job queues to distribute tasks, but the issues with these approaches are probably apparent.
A somewhat hacky but simpler approach is to use Google Storage as a job queue, especially when the workflow involves uploading assets to Google Storage. In this case, it’s relatively trivial for the instances to know what tasks need to be completed and simply check Google Storage to see whether they’re completed yet. The one complication is ensuring that instances do not work on the same tasks due to race conditions, and the `lock_blob_for_parallel_processing()` function in `storage.py` handles this for you.
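For example, a worker loop built on this pattern might look like the sketch below. The bucket name, task list, and `process_raster()` helper are hypothetical placeholders; only the `storage` calls come from this module.

```python
from cloudy.google import storage

BUCKET = "my-bucket"  # hypothetical bucket
raster_keys = ["rasters/tile_001.tif", "rasters/tile_002.tif"]  # hypothetical task list


def process_raster(raster_key: str) -> None:
    """Hypothetical stand-in for the real per-task work."""


for raster_key in raster_keys:
    output_key = raster_key.replace("rasters/", "outputs/")
    lock_key = output_key + ".lock"

    # Skip tasks another instance has already finished
    if storage.check_if_asset_exists_on_cloud_storage(BUCKET, output_key):
        continue

    # Skip tasks another instance is working on (or that this instance lost the race for)
    if not storage.lock_blob_for_parallel_processing(BUCKET, lock_key):
        continue

    try:
        process_raster(raster_key)
    finally:
        storage.unlock_blob_for_parallel_processing(BUCKET, lock_key)
```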
check_if_asset_exists_on_cloud_storage(bucket_name, bucket_key)
Check if asset exists on Google Cloud Storage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | verifies whether an object exists. |
Source code in cloudy/google/storage.py
```python
def check_if_asset_exists_on_cloud_storage(bucket_name: str, bucket_key: str) -> bool:
    """Check if asset exists on Google Cloud Storage.

    Args:
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        verifies whether an object exists.
    """
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    exists = blob.exists()
    _logger.debug("Asset {}found at {}/{}".format("not " if not exists else "", bucket_name, bucket_key))
    return exists
```
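A minimal usage sketch (the bucket and key below are hypothetical):

```python
from cloudy.google import storage

# Hypothetical bucket and key; the call returns a plain bool
if storage.check_if_asset_exists_on_cloud_storage("my-bucket", "models/weights.pt"):
    print("Asset already uploaded, skipping")
```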
delete_cloud_storage_blob(bucket_name, bucket_key)
Remove a storage object from the remote bucket.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `None` | None. Deletes the remote blob. |
Source code in cloudy/google/storage.py
```python
def delete_cloud_storage_blob(bucket_name: str, bucket_key: str) -> None:
    """Remove a storage object from the remote bucket.

    Args:
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        None. Deletes the remote blob.
    """
    _logger.debug("Removing storage blob: {}/{}".format(bucket_name, bucket_key))
    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    blob.delete()
```
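For example, cleaning up an object you no longer need (hypothetical bucket and key):

```python
from cloudy.google import storage

# Hypothetical key, e.g. a stale lock file left behind by a crashed worker
storage.delete_cloud_storage_blob("my-bucket", "outputs/tile_001.tif.lock")
```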
download_asset_to_string(bucket_name, bucket_key)
Read the contents of a remote asset as a string object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `str` | a decoded string with the contents of the storage asset. |
Source code in cloudy/google/storage.py
```python
def download_asset_to_string(bucket_name: str, bucket_key: str) -> str:
    """Read the contents of a remote asset as a string object.

    Args:
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        a decoded string with the contents of the storage asset.
    """
    _logger.debug("Downloading to string: {}/{}".format(bucket_name, bucket_key))
    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    return blob.download_as_string().decode("utf-8")
```
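This pairs naturally with small text assets such as JSON; a sketch with a hypothetical config key:

```python
import json

from cloudy.google import storage

# Hypothetical key pointing at a JSON file stored in the bucket
config = json.loads(storage.download_asset_to_string("my-bucket", "configs/run.json"))
```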
get_cloud_storage_blob(bucket_name, bucket_key)
Get blob for Google Cloud Storage object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `Blob` | a GCS object blob. |
Source code in cloudy/google/storage.py
```python
def get_cloud_storage_blob(bucket_name: str, bucket_key: str) -> storage.Blob:
    """Get blob for Google Cloud Storage object.

    Args:
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        a GCS object blob.
    """
    _initialize_client()
    bucket = STORAGE_CLIENT.bucket(bucket_name)
    return bucket.blob(bucket_key, chunk_size=CHUNK_SIZE)  # Chunk size necessary to avoid timeouts
```
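The return value is a standard `google.cloud.storage` Blob, so its usual methods should be available. A sketch with a hypothetical bucket, key, and local path:

```python
from cloudy.google import storage

blob = storage.get_cloud_storage_blob("my-bucket", "rasters/tile_001.tif")
blob.download_to_filename("/tmp/tile_001.tif")  # standard Blob method
```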
list_cloud_storage_blobs_recursively(bucket_name, bucket_key_prefix)
Get blobs found at a specific bucket key prefix.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key_prefix` | `str` | prefix of a path to storage objects inside the bucket. | required |

Returns:

| Type | Description |
|---|---|
| `List[google.cloud.storage.blob.Blob]` | All storage objects that match the bucket_key_prefix passed. |
Source code in cloudy/google/storage.py
```python
def list_cloud_storage_blobs_recursively(bucket_name: str, bucket_key_prefix: str) -> List[storage.Blob]:
    """Get blobs found at a specific bucket key prefix.

    Args:
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key_prefix: prefix of a path to storage objects inside the bucket.

    Returns:
        All storage objects that match the bucket_key_prefix passed.
    """
    _initialize_client()
    bucket = STORAGE_CLIENT.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=bucket_key_prefix)
    return blobs
```
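For example, iterating over everything under a hypothetical prefix:

```python
from cloudy.google import storage

# Hypothetical bucket and prefix; each item is a google.cloud.storage Blob
for blob in storage.list_cloud_storage_blobs_recursively("my-bucket", "outputs/"):
    print(blob.name)
```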
lock_blob_for_parallel_processing(lock_bucket_name, lock_bucket_key, duration=5)
Attempt to lock a blob for parallel processing. Returns True if this process acquired the lock and False if not.
Example: you want to apply a model to many rasters on Google Cloud. You have many instances which are all tasked with processing all rasters. You want to avoid processing the same raster on multiple instances, so you create lock files for each raster so that instances know when a file is being processed. Unfortunately, Google Cloud Storage does not support lock files in the same way as Linux filesystems, so this function handles the race conditions for you.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `lock_bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `lock_bucket_key` | `str` | path to the storage object to lock. | required |
| `duration` | `float` | the time in seconds to wait for other processes to register. | `5` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if this process locked the blob, False if it is locked by another process. |
Source code in cloudy/google/storage.py
```python
def lock_blob_for_parallel_processing(lock_bucket_name: str, lock_bucket_key: str, duration: float = 5) -> bool:
    """Attempt to lock a blob for parallel processing. Returns True if this process acquired the lock, False if not.

    Example: you want to apply a model to many rasters on Google Cloud. You have many instances which are all tasked
    with processing all rasters. You want to avoid processing the same raster on multiple instances, so you create
    lock files for each raster so that instances know when a file is being processed. Unfortunately, Google Cloud
    Storage does not support lock files in the same way as Linux filesystems, so this function handles the race
    conditions for you.

    Args:
        lock_bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        lock_bucket_key: path to the storage object to lock.
        duration: the time in seconds to wait for other processes to register.

    Returns:
        True if this process locked the blob, False if it is locked by another process.
    """
    # Return early if already in progress, i.e., locked
    blob = get_cloud_storage_blob(lock_bucket_name, lock_bucket_key)
    in_progress = blob.exists()
    if in_progress:
        _logger.debug("Blob in progress, locked by another process: {}/{}".format(lock_bucket_name, lock_bucket_key))
        return False

    # Attempt to lock blob with a unique string
    this_uuid = str(uuid.uuid4())
    upload_string_to_cloud_storage(this_uuid, lock_bucket_name, lock_bucket_key)

    # Wait for any other process locks to resolve
    time.sleep(duration)

    # Get the last written unique string
    locked_uuid = download_asset_to_string(lock_bucket_name, lock_bucket_key)
    if locked_uuid != this_uuid:
        _logger.debug(
            "Blob in progress, locked by another process after resolving race conditions: {}/{}".format(
                lock_bucket_name, lock_bucket_key
            )
        )
        return False  # Was last locked by another process

    _logger.debug("Blob locked for processing by this process: {}/{}".format(lock_bucket_name, lock_bucket_key))
    return True  # Was locked by this process
```
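A minimal sketch of interpreting the return value (bucket, key, and duration are hypothetical; a longer duration trades startup latency for a lower chance that two processes both believe they won the race):

```python
from cloudy.google import storage

if storage.lock_blob_for_parallel_processing("my-bucket", "outputs/tile_001.tif.lock", duration=10):
    ...  # this process owns the lock and should do the work
else:
    ...  # another process owns the lock; move on to the next task
```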
unlock_blob_for_parallel_processing(lock_bucket_name, lock_bucket_key)
Delete a storage object that was previously locked for processing.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `lock_bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `lock_bucket_key` | `str` | path to the locked storage object to delete. | required |

Returns:

| Type | Description |
|---|---|
| `None` | None. Deletes the remote object. |
Source code in cloudy/google/storage.py
```python
def unlock_blob_for_parallel_processing(lock_bucket_name: str, lock_bucket_key: str) -> None:
    """Delete a storage object that was previously locked for processing.

    Args:
        lock_bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        lock_bucket_key: path to the locked storage object to delete.

    Returns:
        None. Deletes the remote object.
    """
    delete_cloud_storage_blob(lock_bucket_name, lock_bucket_key)
```
upload_directory_to_cloud_storage(directory, bucket_name, bucket_key)
Upload a directory of files to Google Cloud Storage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `directory` | `str` | the local directory path to upload. | required |
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `None` | None. Submits a remote upload operation. |
Source code in cloudy/google/storage.py
```python
def upload_directory_to_cloud_storage(directory: str, bucket_name: str, bucket_key: str) -> None:
    """Upload a directory of files to Google Cloud Storage.

    Args:
        directory: the local directory path to upload.
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        None. Submits a remote upload operation.
    """
    _logger.debug("Uploading to {}/{}".format(bucket_name, bucket_key))

    # Uploads with gsutil much quicker than blob.upload_*
    command = "gsutil -m cp -r {dir} gs://{name}/{key}".format(dir=directory, name=bucket_name, key=bucket_key)
    cloudy.run_command_line(command)
```
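A usage sketch with hypothetical paths; note that the copy is shelled out to gsutil, so gsutil must be installed and authenticated on the machine:

```python
from cloudy.google import storage

# Copies the directory recursively to gs://my-bucket/outputs/run-01 via gsutil
storage.upload_directory_to_cloud_storage("/tmp/outputs", "my-bucket", "outputs/run-01")
```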
upload_string_to_cloud_storage(string, bucket_name, bucket_key)
Upload in-memory string to Google Cloud Storage. Handles both string and bytes. Useful for writing things like `json` files to cloud storage locations.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `string` | `str` | the string (or bytes) content to upload. | required |
| `bucket_name` | `str` | path to the storage bucket. set to salo-packages for gs://salo-packages bucket. | required |
| `bucket_key` | `str` | path to the storage object inside the bucket. do not include the bucket name in this path. | required |

Returns:

| Type | Description |
|---|---|
| `None` | None. Submits a remote upload operation. |
Source code in cloudy/google/storage.py
```python
def upload_string_to_cloud_storage(string: str, bucket_name: str, bucket_key: str) -> None:
    """Upload in-memory string to Google Cloud Storage. Handles both string and bytes.

    Useful for writing things like `json` files to cloud storage locations.

    Args:
        string: the string (or bytes) content to upload.
        bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
        bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
        None. Submits a remote upload operation.
    """
    _logger.debug("Uploading to {}/{}".format(bucket_name, bucket_key))

    # Normalize str input to UTF-8 bytes; str objects have .encode(), bytes objects do not
    if hasattr(string, "encode"):
        string = string.encode("utf-8")

    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    blob.upload_from_string(string)
```
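For example, writing a small JSON status file (payload, bucket, and key are hypothetical):

```python
import json

from cloudy.google import storage

payload = json.dumps({"status": "complete", "tiles": 42})  # str; encoded to UTF-8 before upload
storage.upload_string_to_cloud_storage(payload, "my-bucket", "outputs/run-01/status.json")
```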