
Google Cloud Tasks

Cloud Tasks manages the execution, dispatch and delivery of a large number of distributed tasks asynchronously. These tasks can be executed via Cloud Run, App Engine or any HTTP endpoint.

Cloud Tasks is an alternative to Pub/Sub for running parallel workflows that consist of many subtasks. Tasks are published to queues and delivered in either a push or pull fashion; this module creates push tasks with HTTP targets.

This module publishes tasks either one at a time through the Cloud Tasks Python client (publish_task_to_queue) or in bulk through the gcloud CLI (publish_tasks_to_queue). The bulk path is faster because it launches gcloud subprocesses without waiting for most of them to finish. The Task dataclass bundles the information needed to publish a task.

from cloudy.google import tasks

See Google's Cloud Tasks Documentation.


Task dataclass

A Google Cloud Task to submit to the queue for processing.

Attributes:

| Name | Type | Description |
|---|---|---|
| task_body | str | The body of the task message to submit, typically a dictionary that has been JSON-encoded to a string. Its UTF-8 encoding must be under 100KB. |
| task_name | str | A unique identifier for this task, used for queueing and tracking. |
| queue | str | The name of the queue to which the task will be published. |
| http_endpoint | str | The URL the task is delivered to; often a Cloud Run URL, but any HTTP endpoint works. |
| google_config | configs.GoogleCloudConfig | A configs.GoogleCloudConfig object with project, region, and account information. |
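Task.__post_init__ checks the UTF-8 encoded length of task_body, not its character count, and the two differ as soon as the body contains non-ASCII text. The sketch below shows that difference. The 100 KB limit is an assumption taken from the assertion message in the source (the actual value of MAX_BYTES is not shown), and MAX_BODY_BYTES, encoded_size, and check_body are hypothetical names.

```python
import json

# Assumption: mirrors the "Tasks must be less than 100KB" message; the real MAX_BYTES is not shown.
MAX_BODY_BYTES = 100 * 1024


def encoded_size(task_body: str) -> int:
    """Return the size of the body as it is sent: UTF-8 bytes, not characters."""
    return len(task_body.encode("utf-8"))


def check_body(task_body: str) -> None:
    """Raise ValueError if the encoded body is not strictly below the assumed limit."""
    size = encoded_size(task_body)
    if size >= MAX_BODY_BYTES:
        raise ValueError(f"Task body is {size} bytes; it must be under {MAX_BODY_BYTES}")


# ensure_ascii=False keeps "ü" as a single character that encodes to two bytes.
body = json.dumps({"city": "Zürich"}, ensure_ascii=False)
print(len(body), encoded_size(body))  # 18 19
check_body(body)  # well under the limit, so no error
```

With the default ensure_ascii=True, json.dumps escapes non-ASCII characters, so the character count and byte count match at the cost of a longer body.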

Source code in cloudy/google/tasks.py
from dataclasses import dataclass

# configs, MAX_BYTES, TASKS_CLIENT and _initialize_client are defined or imported elsewhere in cloudy/google/tasks.py.


@dataclass
class Task:
    """A Google Cloud Task to submit to the queue for processing.

    Attributes:
      task_body (str): the body of the task message to submit. Typically a dictionary that's been encoded to a string.
      task_name (str): a unique identifier to attach to this task for queueing and tracking.
      queue (str): the name of the queue to which the task will be published.
      http_endpoint (str): often a reference to a Cloud Run URL, but could be any HTTP endpoint.
      google_config (configs.GoogleCloudConfig): a `configs.GoogleCloudConfig` object with project, region, and account information.
    """

    task_body: str
    task_name: str
    queue: str
    http_endpoint: str
    google_config: configs.GoogleCloudConfig

    def __post_init__(self) -> None:
        # Initialize the module-level TASKS_CLIENT used below.
        _initialize_client()
        # Check the UTF-8 encoded size of the body, not its character count.
        assert len(self.task_body.encode()) < MAX_BYTES, "Tasks must be less than 100KB"
        # Precompute the fully qualified queue and task resource names.
        self.queue_path = TASKS_CLIENT.queue_path(
            project=self.google_config.project_id, location=configs.DEFAULT_CLOUD_TASKS_REGION, queue=self.queue
        )
        self.task_path = TASKS_CLIENT.task_path(
            project=self.google_config.project_id,
            location=configs.DEFAULT_CLOUD_TASKS_REGION,
            queue=self.queue,
            task=self.task_name,
        )
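TASKS_CLIENT.queue_path and TASKS_CLIENT.task_path build fully qualified Cloud Tasks resource names. The hypothetical pure-Python stand-ins below assume the standard projects/{project}/locations/{location}/queues/{queue}/tasks/{task} format, which makes it easy to see what Task.queue_path and Task.task_path contain without creating a client.

```python
def queue_path(project: str, location: str, queue: str) -> str:
    """Hypothetical stand-in for the client's queue_path helper."""
    return f"projects/{project}/locations/{location}/queues/{queue}"


def task_path(project: str, location: str, queue: str, task: str) -> str:
    """Hypothetical stand-in for the client's task_path helper."""
    return f"{queue_path(project, location, queue)}/tasks/{task}"


print(task_path("my-project", "us-central1", "my-queue", "job-001"))
# projects/my-project/locations/us-central1/queues/my-queue/tasks/job-001
```

Note that in the source, location comes from configs.DEFAULT_CLOUD_TASKS_REGION rather than from google_config, so every task is addressed to that default region.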

publish_task_to_queue(task)

Publish a task to a Google Cloud Tasks queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | The Task() object to submit to the queue. | required |

Returns:

| Type | Description |
|---|---|
| None | None. The created task returned by the API is discarded, and a task whose name already exists is silently skipped. Track task outcomes through Cloud Tasks. |

Source code in cloudy/google/tasks.py
def publish_task_to_queue(task: Task) -> None:
    """Publish a task to a Google Cloud Tasks queue.

    Args:
      task: the Task() object to submit to the queue.

    Returns:
      None. Submits the request without receiving any response. Responses should be tracked by the Tasks client.
    """
    _logger.debug("Publishing task to Tasks queue %s", task.queue)
    task_request = {
        "name": task.task_path,
        "http_request": {
            "http_method": "POST",
            "url": task.http_endpoint,
            "oidc_token": {"service_account_email": task.google_config.service_account_email},
            "body": task.task_body.encode(),
        },
    }
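    try:
        # create_task returns the created Task; the response is not needed here.
        TASKS_CLIENT.create_task(parent=task.queue_path, task=task_request)
    except google_exceptions.AlreadyExists:
        # A task with this name was already created, so republishing is treated as a no-op.
        pass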
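Because publish_task_to_queue ignores AlreadyExists, publishing a task under a name that was already used does nothing. One way to exploit that for safe retries, sketched here as an assumption rather than as part of this module, is to derive task_name from a hash of the body. Cloud Tasks task IDs may contain only letters, digits, hyphens, and underscores (up to 500 characters), so a prefixed hex digest qualifies. make_task_name is a hypothetical helper.

```python
import hashlib
import json
import re

# Task IDs: letters, digits, hyphens and underscores; at most 500 characters.
_VALID_TASK_ID = re.compile(r"^[A-Za-z0-9_-]{1,500}$")


def make_task_name(task_body: str, prefix: str = "task") -> str:
    """Hypothetical helper: derive a stable task ID from the body so retries deduplicate."""
    digest = hashlib.sha256(task_body.encode("utf-8")).hexdigest()
    name = f"{prefix}-{digest}"
    if not _VALID_TASK_ID.match(name):
        raise ValueError(f"Invalid task ID: {name!r}")
    return name


# sort_keys=True makes the JSON (and therefore the name) independent of dict key order.
body = json.dumps({"user_id": 42, "action": "resize"}, sort_keys=True)
print(make_task_name(body))
```

The trade-off is that two intentionally identical tasks collapse into one, so this only suits work where submitting the same body twice should run it once.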

publish_tasks_to_queue(tasks, assert_success)

Publish tasks to Google Cloud Tasks quickly by submitting them through the gcloud CLI without waiting for most responses.

Important note: if your task body is a dictionary of values, serialize it with a single json.dumps call before creating the Task. This function UTF-8 encodes the command arguments itself.
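The single-dump rule matters because the body is sent verbatim. This minimal sketch uses only the standard json module to show what the endpoint receives when a dictionary is dumped once versus twice.

```python
import json

payload = {"job": "resize", "width": 640}

once = json.dumps(payload)  # a JSON object, which is what the endpoint expects
twice = json.dumps(once)    # a JSON string that wraps escaped JSON

print(once)   # {"job": "resize", "width": 640}
print(twice)  # "{\"job\": \"resize\", \"width\": 640}"

# The receiving service can parse a single dump straight back into a dict...
assert json.loads(once) == payload
# ...but a double dump parses back into a str, not a dict.
assert isinstance(json.loads(twice), str)
```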

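Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | List[cloudy.google.tasks.Task] | A list of Task() objects to submit to the queue. | required |
| assert_success | bool | If True, wait for every gcloud call and raise an AssertionError if a submission fails. Tasks that already exist are skipped, not treated as failures. If False, only the first 10 tasks are checked, plus tasks sent to a queue that has not yet had a confirmed submission. | required |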

Returns:

| Type | Description |
|---|---|
| None | None. Unchecked tasks are launched as background gcloud processes whose results are not collected. Track task outcomes through Cloud Tasks. |
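When assert_success is False, the function still verifies a sample of submissions. The hypothetical checked_indices function below replays the condition from the source for a list of queue paths, assuming every checked call succeeds (so each queue counts as tested after its first check).

```python
from typing import List


def checked_indices(queue_paths: List[str], assert_success: bool, first_n: int = 10) -> List[int]:
    """Return indices of tasks whose gcloud call would be awaited and verified.

    Mirrors the condition in publish_tasks_to_queue, assuming each checked call succeeds.
    """
    tested = set()
    checked = []
    for idx, queue in enumerate(queue_paths):
        if assert_success or idx < first_n or queue not in tested:
            checked.append(idx)
            # The source records a queue only after a successful, non-duplicate call.
            tested.add(queue)
    return checked


queues = ["queue-a"] * 12 + ["queue-b"] * 3
print(checked_indices(queues, assert_success=False))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12]
```

Tasks 10 and 11 go to a queue that was already confirmed, so they are launched without waiting; task 12 is the first one sent to queue-b, so it is checked.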

Source code in cloudy/google/tasks.py
def publish_tasks_to_queue(tasks: List[Task], assert_success: bool) -> None:
    """Publish tasks to Google Cloud Tasks more quickly by not waiting for API responses.

    Important note: if your task body is a dictionary of values, serialize it with a single json.dumps call before
      creating the Task; this function UTF-8 encodes the command arguments itself.

    Args:
      tasks: a list of Task() objects to submit to the queue.
      assert_success: if True, verify every submission and raise an AssertionError on failure. Tasks that already
        exist are skipped. If False, only a sample of submissions is verified.

    Returns:
      None. Submits the request without receiving any response. Responses should be tracked by the Tasks client.
    """
    _logger.info("Publishing %d tasks to tasks queue(s)", len(tasks))
    base_command = (
        "gcloud tasks create-http-task {name} --queue={queue} --url={endpoint} --method=post "
        + "--oidc-service-account-email={account}"
    )
    queues_tested = set()

    for idx_task, task in enumerate(tasks):
        command = shlex.split(
            base_command.format(
                name=task.task_path,
                queue=task.queue_path,
                endpoint=task.http_endpoint,
                account=task.google_config.service_account_email,
            )
        )
        command.append("--body-content={body}".format(body=task.task_body))
        command = [arg.encode("utf-8") for arg in command]

        if assert_success or idx_task < 10 or task.queue_path not in queues_tested:
            # Verify every call if the caller requests it; otherwise verify only the first 10 calls and calls to
            # queues that have not yet had a confirmed submission.
            completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout = completed.stdout.decode("utf-8")
            stderr = completed.stderr.decode("utf-8")
            # A task with this name was already created; skip it rather than fail.
            if "ALREADY_EXISTS" not in stderr:
                # gcloud writes its status message to stderr, so report both streams on failure.
                assert "Created task [{}].".format(task.task_path) in stderr, "Result not expected: {} {}".format(
                    stdout, stderr
                )
                queues_tested.add(task.queue_path)

        else:
            # Fire-and-forget: the result of this call is never checked.
            subprocess.Popen(command)
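The body is appended after shlex.split instead of being formatted into base_command, because a JSON body contains spaces and quotes that shlex.split would split apart and strip. The sketch below uses hypothetical project, queue, URL, and account values to build the argument list for one task; it only constructs the list and never invokes gcloud.

```python
import json
import shlex

base_command = (
    "gcloud tasks create-http-task {name} --queue={queue} --url={endpoint} --method=post "
    + "--oidc-service-account-email={account}"
)

# Hypothetical values for illustration only.
name = "projects/my-project/locations/us-central1/queues/my-queue/tasks/job-001"
queue = "projects/my-project/locations/us-central1/queues/my-queue"
endpoint = "https://example-service.a.run.app/work"
account = "tasks-invoker@my-project.iam.gserviceaccount.com"
body = json.dumps({"job": "resize", "width": 640})

command = shlex.split(base_command.format(name=name, queue=queue, endpoint=endpoint, account=account))
# Appending the body as one pre-built argument keeps it intact.
command.append("--body-content={body}".format(body=body))
print(command[-1])  # --body-content={"job": "resize", "width": 640}

# Passing the body through shlex.split instead would break it into several quote-stripped tokens.
mangled = shlex.split("--body-content={}".format(body))
print(mangled)
```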