Google Cloud Tasks¶
Cloud Tasks manages the asynchronous execution, dispatch, and delivery of large numbers of distributed tasks. These tasks can be executed via Cloud Run, App Engine, or any HTTP endpoint.
Cloud Tasks is an alternative to Pub/Sub for submitting parallel workflows with many subtasks. Tasks are published to queues and distributed in either a push or pull fashion.
This module allows you to publish tasks one by one via the Google Python API, or in bulk using the `gcloud` CLI; the two approaches differ in how efficiently jobs are submitted. A `Task` dataclass bundles the pieces of information necessary to publish a task.
from cloudy.google import tasks
See Google's Cloud Tasks Documentation.
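Task bodies are plain strings, so a dictionary payload is JSON-encoded before it is attached to a `Task`. A minimal sketch of that convention (the payload keys and values are illustrative; the size check mirrors the assertion in `Task.__post_init__`):

```python
import json

# A dictionary payload must be serialized to a string before it becomes a task_body
# (the payload keys and values here are illustrative).
payload = {"job_id": "job-001", "shard": 3}
task_body = json.dumps(payload)

# Task.__post_init__ asserts the encoded body is under 100KB, so it is worth
# checking the size before constructing the Task.
assert len(task_body.encode()) < 100 * 1024, "Tasks must be less than 100KB"
print(task_body)
```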
Task dataclass¶
A Google Cloud Task to submit to the queue for processing.
Attributes:

| Name | Type | Description |
|---|---|---|
| task_body | str | the body of the task message to submit. Typically a dictionary that's been encoded to a string. |
| task_name | str | a unique identifier to attach to this task for queueing and tracking. |
| queue | str | the name of the queue to which the task will be published. |
| http_endpoint | str | often a reference to a Cloud Run URL, but could be any HTTP endpoint. |
| google_config | configs.GoogleCloudConfig | a `configs.GoogleCloudConfig` object with project, region, and account information. |
Source code in cloudy/google/tasks.py
@dataclass
class Task(object):
    """A Google Cloud Task to submit to the queue for processing.

    Attributes:
        task_body (str): the body of the task message to submit. Typically a dictionary that's been encoded to a string.
        task_name (str): a unique identifier to attach to this task for queueing and tracking.
        queue (str): the name of the queue to which the task will be published.
        http_endpoint (str): often a reference to a Cloud Run URL, but could be any HTTP endpoint.
        google_config (configs.GoogleCloudConfig): a `configs.GoogleCloudConfig` object with project, region, and account information.
    """

    task_body: str
    task_name: str
    queue: str
    http_endpoint: str
    google_config: configs.GoogleCloudConfig

    def __post_init__(self) -> None:
        _initialize_client()
        assert len(self.task_body.encode()) < MAX_BYTES, "Tasks must be less than 100KB"
        self.queue_path = TASKS_CLIENT.queue_path(
            project=self.google_config.project_id, location=configs.DEFAULT_CLOUD_TASKS_REGION, queue=self.queue
        )
        self.task_path = TASKS_CLIENT.task_path(
            project=self.google_config.project_id,
            location=configs.DEFAULT_CLOUD_TASKS_REGION,
            queue=self.queue,
            task=self.task_name,
        )
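The `queue_path` and `task_path` helpers produce standard Cloud Tasks resource names from the config. A sketch of the resulting strings (the project, region, queue, and task values are made up for illustration):

```python
# queue_path()/task_path() produce standard Cloud Tasks resource names;
# the project, region, queue, and task values here are made up for illustration.
project = "example-project"
region = "us-central1"
queue = "work-queue"
task_name = "task-0001"

queue_path = f"projects/{project}/locations/{region}/queues/{queue}"
task_path = f"{queue_path}/tasks/{task_name}"

print(queue_path)
print(task_path)
```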
publish_task_to_queue(task)¶
Publish a task to a Google Cloud Tasks queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | the Task() object to submit to the queue. | required |

Returns:

| Type | Description |
|---|---|
| None | None. Submits the request without receiving any response. Responses should be tracked by the Tasks client. |
Source code in cloudy/google/tasks.py
def publish_task_to_queue(task: Task) -> None:
    """Publish a task to a Google Cloud Tasks queue.

    Args:
        task: the Task() object to submit to the queue.

    Returns:
        None. Submits the request without receiving any response. Responses should be tracked by the Tasks client.
    """
    _logger.debug("Publishing task to Tasks queue {}".format(task.queue))
    task_request = {
        "name": task.task_path,
        "http_request": {
            "http_method": "POST",
            "url": task.http_endpoint,
            "oidc_token": {"service_account_email": task.google_config.service_account_email},
            "body": task.task_body.encode(),
        },
    }
    try:
        TASKS_CLIENT.create_task(parent=task.queue_path, task=task_request)  # Returns Task response object
    except google_exceptions.AlreadyExists:
        pass
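The request dictionary built by `publish_task_to_queue` can be assembled and inspected on its own. This sketch mirrors its shape; the path, endpoint, and service-account values are placeholders:

```python
import json

# Placeholder values standing in for a Task's attributes (illustrative only).
task_path = "projects/example-project/locations/us-central1/queues/work-queue/tasks/task-0001"
http_endpoint = "https://example-service-abc123-uc.a.run.app/handler"
service_account_email = "runner@example-project.iam.gserviceaccount.com"
task_body = json.dumps({"job_id": "job-001"})

# The same shape as the task_request dict passed to TASKS_CLIENT.create_task().
task_request = {
    "name": task_path,
    "http_request": {
        "http_method": "POST",
        "url": http_endpoint,
        "oidc_token": {"service_account_email": service_account_email},
        "body": task_body.encode(),  # HTTP request bodies are sent as bytes
    },
}
print(sorted(task_request["http_request"].keys()))
```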
publish_tasks_to_queue(tasks, assert_success)¶
Publish tasks to Google Cloud Tasks more quickly by not waiting for API responses.
Important note: if your task body is a dictionary of values, it needs a single `json.dumps` call to convert it into a string; this function will automatically utf-8 encode the command content.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tasks | List[cloudy.google.tasks.Task] | a list of Task() objects to submit to the queue. | required |
| assert_success | bool | raises an error if a submission fails (e.g. if a task has already been submitted) | required |

Returns:

| Type | Description |
|---|---|
| None | None. Submits the request without receiving any response. Responses should be tracked by the Tasks client. |
Source code in cloudy/google/tasks.py
def publish_tasks_to_queue(tasks: List[Task], assert_success: bool) -> None:
    """Publish tasks to Google Cloud Tasks more quickly by not waiting for API responses.

    Important note: if your task body is a dictionary of values, it needs a single json dump to convert it into a
    string, and this function will automatically utf-8 encode the command content.

    Args:
        tasks: a list of Task() objects to submit to the queue.
        assert_success: raises an error if a submission fails (e.g. if a task has already been submitted)

    Returns:
        None. Submits the request without receiving any response. Responses should be tracked by the Tasks client.
    """
    _logger.info("Publishing {} tasks to tasks queue(s)".format(len(tasks)))
    base_command = (
        "gcloud tasks create-http-task {name} --queue={queue} --url={endpoint} --method=post "
        + "--oidc-service-account-email={account}"
    )
    queues_tested = set()
    for idx_task, task in enumerate(tasks):
        command = shlex.split(
            base_command.format(
                name=task.task_path,
                queue=task.queue_path,
                endpoint=task.http_endpoint,
                account=task.google_config.service_account_email,
            )
        )
        command.append("--body-content={body}".format(body=task.task_body))
        command = [arg.encode("utf-8") for arg in command]
        if assert_success or idx_task < 10 or task.queue_path not in queues_tested:
            # Assert success for all if the caller requests it; otherwise assert success
            # for the first ten calls and the first call for each unique queue.
            completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout = completed.stdout.decode("utf-8")
            stderr = completed.stderr.decode("utf-8")
            if not re.search("ALREADY_EXISTS", stderr):
                assert "Created task [{}].".format(task.task_path) in stderr, "Result not expected: {}".format(stdout)
            queues_tested.add(task.queue_path)
        else:
            subprocess.Popen(command)
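The `gcloud` command this function assembles can be previewed without submitting anything. This sketch reproduces the argument construction with placeholder values standing in for a `Task`'s attributes:

```python
import shlex

# Placeholder values standing in for a Task's attributes (illustrative only).
task_path = "projects/example-project/locations/us-central1/queues/work-queue/tasks/task-0001"
queue_path = "projects/example-project/locations/us-central1/queues/work-queue"
endpoint = "https://example-service-abc123-uc.a.run.app/handler"
account = "runner@example-project.iam.gserviceaccount.com"

# The same command template used by publish_tasks_to_queue.
base_command = (
    "gcloud tasks create-http-task {name} --queue={queue} --url={endpoint} --method=post "
    "--oidc-service-account-email={account}"
)
command = shlex.split(
    base_command.format(name=task_path, queue=queue_path, endpoint=endpoint, account=account)
)
command.append("--body-content={body}".format(body='{"job_id": "job-001"}'))
print(command[:3])
```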