
Google Cloud Storage

Support for reading from and writing to Cloud Storage buckets.

from cloudy.google import storage

See Google's Cloud Storage Documentation.


The functions in storage.py range from trivial to complex.

The trivial functions are implemented because they remove mental overhead for those who don't want to remember how to initialize and manage clients, which environment variables to set, or which parameters are best practice, and who would rather not wade through Google's subpar documentation. These trivial functions include getting blobs, checking whether blobs exist, listing blobs, deleting blobs, etc.

Some of the trivial operations that can be performed via the Google Python API are much more efficient via the gsutil command-line tool, so this module also hides the fact that it bulk-uploads files by calling gsutil in a subprocess.

The one complex function in this module is for managing parallel processes. There are many options for handling communication between instances in parallel workflows, such as dedicating instances to specific subsets of the workflow's tasks or setting up a dedicated job queue to distribute them, but those approaches add infrastructure to provision and coordination logic to maintain.

A somewhat hacky but simpler approach is to use Google Storage as a job queue, especially when the workflow involves uploading assets to Google Storage. In this case, it’s relatively trivial for the instances to know what tasks need to be completed and simply check Google Storage to see if they’re completed yet. The one complication is ensuring that instances do not work on the same tasks due to race conditions, and the lock_blob_for_parallel_processing() function in storage.py handles this for you.
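
For instance, a worker loop on each instance could follow the sketch below. The bucket layout, task keys, and process_raster() stub are hypothetical placeholders; the storage calls are the functions documented on this page.

from cloudy.google import storage


def process_raster(bucket_key):
    """Hypothetical stand-in for the real per-raster work."""


BUCKET = "salo-packages"  # bucket name used throughout this page
TASKS = ["rasters/tile-001.tif", "rasters/tile-002.tif"]  # hypothetical task keys

for task in TASKS:
    output_key = "outputs/{}".format(task)
    lock_key = "locks/{}.lock".format(task)

    # Skip tasks whose outputs already exist on Cloud Storage
    if storage.check_if_asset_exists_on_cloud_storage(BUCKET, output_key):
        continue

    # Skip tasks that another instance has already locked
    if not storage.lock_blob_for_parallel_processing(BUCKET, lock_key):
        continue

    try:
        process_raster(task)
        # A real workflow would upload the processed result here; a completion marker suffices for the sketch
        storage.upload_string_to_cloud_storage("done", BUCKET, output_key)
    finally:
        # Release the lock so a failed or interrupted task can be retried
        storage.unlock_blob_for_parallel_processing(BUCKET, lock_key)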


check_if_asset_exists_on_cloud_storage(bucket_name, bucket_key)

Check if asset exists on Google Cloud Storage.

Parameters:

    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    bool: True if the object exists, False otherwise.

Source code in cloudy/google/storage.py
def check_if_asset_exists_on_cloud_storage(bucket_name: str, bucket_key: str) -> bool:
    """Check if asset exists on Google Cloud Storage.

    Args:
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      True if the object exists, False otherwise.
    """
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    exists = blob.exists()
    _logger.debug("Asset {}found at {}/{}".format("not " if not exists else "", bucket_name, bucket_key))

    return exists
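
A minimal usage sketch, assuming a hypothetical bucket key:

from cloudy.google import storage

if not storage.check_if_asset_exists_on_cloud_storage("salo-packages", "models/model-v1.pt"):
    print("Asset missing, upload it before running the workflow.")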

delete_cloud_storage_blob(bucket_name, bucket_key)

Remove a storage object from the remote bucket.

Parameters:

    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    None: Deletes the remote blob.

Source code in cloudy/google/storage.py
def delete_cloud_storage_blob(bucket_name: str, bucket_key: str) -> None:
    """Remove a storage object from the remote bucket.

    Args:
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      None. Deletes the remote blob.
    """
    _logger.debug("Removing storage blob:  {}/{}".format(bucket_name, bucket_key))
    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    blob.delete()
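
A minimal sketch with a hypothetical key; note that google-cloud-storage raises NotFound when deleting a missing object, so checking existence first can be useful:

from cloudy.google import storage

key = "outputs/run-001/metadata.json"  # hypothetical key
if storage.check_if_asset_exists_on_cloud_storage("salo-packages", key):
    storage.delete_cloud_storage_blob("salo-packages", key)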

download_asset_to_string(bucket_name, bucket_key)

Read the contents of a remote asset as a string object.

Parameters:

    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    str: a decoded string with the contents of the storage asset.

Source code in cloudy/google/storage.py
def download_asset_to_string(bucket_name: str, bucket_key: str) -> str:
    """Read the contents of a remote asset as a string object.

    Args:
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      a decoded string with the contents of the storage asset.
    """
    _logger.debug("Downloading to string:  {}/{}".format(bucket_name, bucket_key))
    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)

    return blob.download_as_string().decode("utf-8")
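
This is handy for reading small text assets, such as JSON configs, directly into memory. A sketch assuming a hypothetical key:

import json

from cloudy.google import storage

config = json.loads(storage.download_asset_to_string("salo-packages", "configs/run-config.json"))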

get_cloud_storage_blob(bucket_name, bucket_key)

Get blob for Google Cloud Storage object.

Parameters:

    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    Blob: a GCS object blob.

Source code in cloudy/google/storage.py
def get_cloud_storage_blob(bucket_name: str, bucket_key: str) -> storage.Blob:
    """Get blob for Google Cloud Storage object.

    Args:
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      a GCS object blob.
    """
    _initialize_client()
    bucket = STORAGE_CLIENT.bucket(bucket_name)

    return bucket.blob(bucket_key, chunk_size=CHUNK_SIZE)  # Chunk size necessary to avoid timeouts
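
The returned object is a regular google.cloud.storage.Blob, so its standard methods remain available. A sketch with a hypothetical key and local path:

from cloudy.google import storage

blob = storage.get_cloud_storage_blob("salo-packages", "rasters/tile-001.tif")
blob.download_to_filename("/tmp/tile-001.tif")  # standard Blob method from google-cloud-storage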

list_cloud_storage_blobs_recursively(bucket_name, bucket_key_prefix)

Get blobs found at a specific bucket key prefix.

Parameters:

    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key_prefix (str, required): prefix of a path to storage objects inside the bucket.

Returns:

    List[google.cloud.storage.blob.Blob]: All storage objects that match the bucket_key_prefix passed.

Source code in cloudy/google/storage.py
def list_cloud_storage_blobs_recursively(bucket_name: str, bucket_key_prefix: str) -> List[storage.Blob]:
    """Get blobs found at a specific bucket key prefix.

    Args:
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key_prefix: prefix of a path to storage objects inside the bucket.

    Returns:
      All storage objects that match the bucket_key_prefix passed.
    """
    _initialize_client()
    bucket = STORAGE_CLIENT.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=bucket_key_prefix)

    return blobs
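
For example, to print the name of every object under a hypothetical prefix:

from cloudy.google import storage

for blob in storage.list_cloud_storage_blobs_recursively("salo-packages", "rasters/"):
    print(blob.name)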

lock_blob_for_parallel_processing(lock_bucket_name, lock_bucket_key, duration=5)

Attempt to lock a blob for parallel processing. Returns True if locked and False if not.

Example: you want to apply a model to many rasters on Google Cloud. You have many instances, all tasked with processing all rasters. You want to avoid processing the same raster with multiple instances, so you create a lock file for each raster so that instances know when a file is already being processed. Unfortunately, Google Cloud Storage does not support lock files the way Linux filesystems do, so this function handles the race conditions for you.

Parameters:

    lock_bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    lock_bucket_key (str, required): path to the storage object to lock.
    duration (float, default 5): the time in seconds to wait for other processes to register.

Returns:

    bool: True if this process acquired the lock, False if another process holds it.

Source code in cloudy/google/storage.py
def lock_blob_for_parallel_processing(lock_bucket_name: str, lock_bucket_key: str, duration: float = 5) -> bool:
    """Attempt to lock a blob for parallel processing. Returns True if locked and False if not.

    Example: you want to apply a model to many rasters on Google Cloud. You have many instances which are all tasked
    with processing all rasters. You want to avoid processing the same raster with multiple instances, so you attempt to
    create lock files for each raster so that instances know when a file is being processed. Unfortunately, Google Cloud
    Storage does not support lock files in the same way as Linux filesystems, so this function handles race conditions.

    Args:
      lock_bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      lock_bucket_key: path to the storage object to lock.
      duration: the time in seconds to wait for other processes to register.

    Returns:
      True if this process acquired the lock, False if another process holds it.
    """
    # Return early if already in progress, i.e., locked
    blob = get_cloud_storage_blob(lock_bucket_name, lock_bucket_key)
    in_progress = blob.exists()
    if in_progress:
        _logger.debug("Blob in progress, locked by another process:  {}/{}".format(lock_bucket_name, lock_bucket_key))
        return False

    # Attempt to lock blob with a unique string
    this_uuid = str(uuid.uuid4())
    upload_string_to_cloud_storage(this_uuid, lock_bucket_name, lock_bucket_key)

    # Wait for any other process locks to resolve
    time.sleep(duration)

    # Get the last written unique string
    locked_uuid = download_asset_to_string(lock_bucket_name, lock_bucket_key)
    if locked_uuid != this_uuid:
        _logger.debug(
            "Blob in progress, locked by another process after resolving race conditions:  {}/{}".format(
                lock_bucket_name, lock_bucket_key
            )
        )
        return False  # Was last locked by another process
    _logger.debug("Blob locked for processing by this process:  {}/{}".format(lock_bucket_name, lock_bucket_key))

    return True  # Was locked by this process

unlock_blob_for_parallel_processing(lock_bucket_name, lock_bucket_key)

Delete a storage object that was previously locked for processing.

Parameters:

    lock_bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    lock_bucket_key (str, required): path to the locked storage object to delete.

Returns:

    None: Deletes the remote object.

Source code in cloudy/google/storage.py
def unlock_blob_for_parallel_processing(lock_bucket_name: str, lock_bucket_key: str) -> None:
    """Delete a storage object that was previously locked for processing.

    Args:
      lock_bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      lock_bucket_key: path to the locked storage object to delete.

    Returns:
      None. Deletes the remote object.
    """
    delete_cloud_storage_blob(lock_bucket_name, lock_bucket_key)

upload_directory_to_cloud_storage(directory, bucket_name, bucket_key)

Upload a directory of files to Google Cloud Storage.

Parameters:

    directory (str, required): the local directory path to upload.
    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    None: Submits a remote upload operation.

Source code in cloudy/google/storage.py
def upload_directory_to_cloud_storage(directory: str, bucket_name: str, bucket_key: str) -> None:
    """Upload a directory of files to Google Cloud Storage.

    Args:
      directory: the local directory path to upload.
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      None. Submits a remote upload operation.
    """
    _logger.debug("Uploading to {}/{}".format(bucket_name, bucket_key))

    # Uploads with gsutil much quicker than blob.upload_*
    command = "gsutil -m cp -r {dir} gs://{name}/{key}".format(dir=directory, name=bucket_name, key=bucket_key)
    cloudy.run_command_line(command)
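
Because the upload shells out to gsutil, the Google Cloud SDK must be installed and authenticated on the calling machine. A sketch with hypothetical paths:

from cloudy.google import storage

storage.upload_directory_to_cloud_storage("local-outputs", "salo-packages", "outputs/run-001")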

upload_string_to_cloud_storage(string, bucket_name, bucket_key)

Upload in-memory string to Google Cloud Storage. Handles both string and bytes.

Useful for writing things like json files to cloud storage locations.

Parameters:

    string (str, required): the string or bytes content to upload.
    bucket_name (str, required): path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
    bucket_key (str, required): path to the storage object inside the bucket. do not include the bucket name in this path.

Returns:

    None: Submits a remote upload operation.

Source code in cloudy/google/storage.py
def upload_string_to_cloud_storage(string: str, bucket_name: str, bucket_key: str) -> None:
    """Upload in-memory string to Google Cloud Storage. Handles both string and bytes.

    Useful for writing things like `json` files to cloud storage locations.

    Args:
      string: the string or bytes content to upload.
      bucket_name: path to the storage bucket. set to salo-packages for gs://salo-packages bucket.
      bucket_key: path to the storage object inside the bucket. do not include the bucket name in this path.

    Returns:
      None. Submits a remote upload operation.
    """
    _logger.debug("Uploading to {}/{}".format(bucket_name, bucket_key))
    # Encode str inputs to bytes before upload; bytes objects (which lack an encode attribute) pass through unchanged.
    if hasattr(string, "encode"):
        string = string.encode("utf-8")
    _initialize_client()
    blob = get_cloud_storage_blob(bucket_name, bucket_key)
    blob.upload_from_string(string)
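
For example, to write an in-memory dictionary to a hypothetical JSON key:

import json

from cloudy.google import storage

metadata = {"model": "v1", "n_rasters": 42}
storage.upload_string_to_cloud_storage(json.dumps(metadata), "salo-packages", "outputs/run-001/metadata.json")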