Google PubSub
PubSub is a Google Cloud service for publishing messages (via API requests) that other Google services can subscribe to. Subscribers are servers that receive a request and return a response.
It was the first service we used for massively parallel workflows, but it is no longer used in active projects because of runtime restrictions and functionality shortcomings.
from cloudy.google import pubsub
See Google's PubSub Documentation.
publish_message(pubsub_topic, google_config, data=None, attributes=None)
Publish a message to Google Cloud PubSub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pubsub_topic | str | Topic to which a request message is published. | required |
google_config | GoogleCloudConfig | A `configs.GoogleCloudConfig` object with project, region, and account information. | required |
data | Optional[str] | Data to publish. Can be much larger than individual attributes. Encoded before publication. | None |
attributes | Optional[Dict[str, str]] | Dictionary of attributes, each of which must be relatively small compared to the data. | None |
Returns:
Type | Description |
---|---|
None | None. Submits a PubSub request. |
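The parameter handling described above (a string payload that is encoded before publication, plus small string attributes) can be illustrated with a minimal sketch. It does not call Google Cloud: `FakePublisher` and `publish_sketch` are hypothetical names introduced here for illustration, standing in for the real publisher client and for `publish_message`, and the project and topic names are made up.

```python
from typing import Dict, List, Optional, Tuple


class FakePublisher:
    """Hypothetical in-memory stand-in for a PubSub publisher client."""

    def __init__(self) -> None:
        # Each entry records (topic_path, payload_bytes, attributes).
        self.sent: List[Tuple[str, bytes, Dict[str, str]]] = []

    @staticmethod
    def topic_path(project_id: str, topic: str) -> str:
        # Follows the "projects/<project>/topics/<topic>" naming scheme.
        return f"projects/{project_id}/topics/{topic}"

    def publish(self, topic_path: str, data: bytes, **attributes: str) -> None:
        # The payload must already be bytes; attributes are string pairs.
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes")
        self.sent.append((topic_path, data, dict(attributes)))


def publish_sketch(
    client: FakePublisher,
    project_id: str,
    pubsub_topic: str,
    data: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> None:
    """Illustrative only: encode the payload, then hand it to the client."""
    if not (data or attributes):
        raise ValueError("Data and/or attributes are required for publication")
    payload = (data or "").encode("utf-8")  # str -> bytes before publishing
    path = client.topic_path(project_id, pubsub_topic)
    client.publish(path, data=payload, **(attributes or {}))


# Assumed example values; none of these names come from the library.
client = FakePublisher()
publish_sketch(
    client, "my-project", "my-topic", data="hello", attributes={"job_id": "42"}
)
```

Passing attributes as keyword arguments keeps them separate from the payload, which is why each one is expected to be a short string rather than bulk data.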
Source code in cloudy/google/pubsub.py
def publish_message(
    pubsub_topic: str,
    google_config: configs.GoogleCloudConfig,
    data: Optional[str] = None,
    attributes: Optional[Dict[str, str]] = None,
) -> None:
    """Publish a message to Google Cloud PubSub.

    Args:
        pubsub_topic: topic to which a request message is published.
        google_config: a `configs.GoogleCloudConfig` object with project, region, and account information.
        data: data to publish. can be much larger than individual attributes. will be encoded before publication.
        attributes: dictionary of attributes, each of which must be relatively small compared to the data.

    Returns:
        None. Submits a PubSub request.
    """
    assert data or attributes, "Data and/or attributes are required for publication"
    _logger.info("Publishing message to PubSub topic {}".format(pubsub_topic))
    _initialize_client()
    topic_path = PUBLISHER_CLIENT.topic_path(google_config.project_id, pubsub_topic)
    attributes = attributes or dict()
    # Encode the caller's data (empty string when only attributes are given);
    # encoding a literal "" here would silently drop the payload.
    payload = (data or "").encode("utf-8")
    PUBLISHER_CLIENT.publish(topic_path, data=payload, **attributes)