If you are here, you probably already know what GCP Cloud Tasks is.
Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. Using Cloud Tasks, you can perform work asynchronously outside of a user or service-to-service request.
Using Cloud Tasks with GAE Standard (Python 3.7) is simple: you can use `tasks_v2beta3.CloudTasksClient()` to add a task with a payload to a queue. When the task runs, it makes a request to your API endpoint.
To use Cloud Tasks you need:
- Cloud Tasks API client library
- Google Cloud Project name
- Google Cloud Project location
- and of course a queue
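
The project name, location, and queue name together identify a queue by its resource path. Here is a minimal sketch of how the pieces fit together, assuming the values come from environment variables. The `GOOGLE_CLOUD_PROJECT_LOCATION` variable name and the `load_settings` and `build_queue_path` helpers are hypothetical and only for illustration; the client library offers its own `queue_path` method for the same purpose.

```python
import os
from typing import Dict, Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read project settings from the environment (hypothetical helper)."""
    return {
        # The fallbacks are the example values used in this post.
        'project': env.get('GOOGLE_CLOUD_PROJECT', 'my-project'),
        'location': env.get('GOOGLE_CLOUD_PROJECT_LOCATION', 'europe-west1'),
    }


def build_queue_path(project: str, location: str, queue: str) -> str:
    """Build the fully qualified resource name of a queue."""
    return f'projects/{project}/locations/{location}/queues/{queue}'


if __name__ == '__main__':
    cfg = load_settings()
    print(build_queue_path(cfg['project'], cfg['location'], 'default'))
```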
Here is a simple mixin that you can add to your API handler to send tasks easily:

    import json
    import time
    from typing import Dict, Optional, Union

    from google.cloud import tasks_v2beta3
    from google.protobuf import timestamp_pb2

    import settings


    class TasksMixin:
        @property
        def _cloud_tasks_client(self):
            return tasks_v2beta3.CloudTasksClient()

        def send_task(
            self,
            url: str,
            queue_name: str = 'default',
            http_method: str = 'POST',
            payload: Optional[Union[str, Dict]] = None,
            schedule_time: Optional[int] = None,
            name: Optional[str] = None,
        ) -> None:
            # The property builds a new client on every access,
            # so keep a single reference for this call.
            client = self._cloud_tasks_client

            parent = client.queue_path(
                settings.GOOGLE_CLOUD_PROJECT,  # example: my-project
                settings.GOOGLE_CLOUD_PROJECT_LOCATION,  # example: europe-west1
                queue_name,
            )

            task = {
                'app_engine_http_request': {
                    'http_method': http_method,
                    'relative_uri': url,
                }
            }

            if name:
                task['name'] = name

            # Dict payloads are serialized to JSON.
            if isinstance(payload, dict):
                payload = json.dumps(payload)

            if payload is not None:
                # The request body must be bytes.
                task['app_engine_http_request']['body'] = payload.encode()

            if schedule_time is not None:
                # schedule_time is a delay in seconds, counted from now.
                now = time.time() + schedule_time
                seconds = int(now)
                nanos = int((now - seconds) * 10 ** 9)

                # Create Timestamp protobuf.
                timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)

                # Add the timestamp to the task.
                task['schedule_time'] = timestamp

            # Keyword arguments keep the call explicit about what is passed.
            client.create_task(parent=parent, task=task)

Params:
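| Param | Description |
|---|---|
| url | Relative URL of your API endpoint, example: /v1/users |
| queue_name | Queue name, defaults to 'default' |
| http_method | HTTP method: POST, GET, etc. Defaults to POST |
| payload | Body of the HTTP request, a string or a dict (dicts are serialized to JSON), example: {'test': 123} |
| schedule_time | Number of seconds to delay task execution |
| name | Task name (optional). Cloud Tasks expects the full resource path: projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID/tasks/TASK_ID |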
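
The payload and `schedule_time` conversions can be checked in isolation, without calling the Cloud Tasks API. The sketch below repeats the same arithmetic as two standalone functions; `encode_payload` and `split_timestamp` are hypothetical names introduced here and are not part of the mixin or the client library.

```python
import json
from typing import Dict, Optional, Tuple, Union


def encode_payload(payload: Optional[Union[str, Dict]]) -> Optional[bytes]:
    """Serialize dicts to JSON, then encode the text as bytes."""
    if isinstance(payload, dict):
        payload = json.dumps(payload)
    return None if payload is None else payload.encode()


def split_timestamp(now: float, delay_seconds: int) -> Tuple[int, int]:
    """Return (seconds, nanos) for the moment `delay_seconds` after `now`."""
    target = now + delay_seconds
    seconds = int(target)
    # The fractional part of the second, expressed in nanoseconds.
    nanos = int((target - seconds) * 10 ** 9)
    return seconds, nanos
```

For example, `split_timestamp(time.time(), 60)` gives the two values that the mixin passes to `timestamp_pb2.Timestamp` for a one-minute delay.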
How to use it?

    from mixins import TasksMixin


    class MyHandler(TasksMixin):
        def on_get(self, req, resp):
            self.send_task(
                '/v1/foo/bar',
                queue_name='my-queue',
                schedule_time=60,
                payload={'product_id': 123, 'user_id': 321},
            )

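
On the receiving side, the task arrives at the target URL as an ordinary HTTP request with the payload in its body. Below is a minimal, framework-agnostic sketch of decoding it. `parse_task_request` is a hypothetical helper, and relying on the `X-AppEngine-QueueName` and `X-AppEngine-TaskName` headers assumes the task is delivered to an App Engine handler and that the payload was sent as JSON.

```python
import json
from typing import Any, Dict, Mapping


def parse_task_request(headers: Mapping[str, str], body: bytes) -> Dict[str, Any]:
    """Extract the queue name, task name, and JSON payload of a task request."""
    # Header names are case-insensitive, so normalize them first.
    lowered = {key.lower(): value for key, value in headers.items()}

    queue = lowered.get('x-appengine-queuename')
    if queue is None:
        # Assumption: a request without this header was not sent by a queue.
        raise PermissionError('request did not come from a task queue')

    payload = json.loads(body.decode()) if body else None
    return {
        'queue': queue,
        'task': lowered.get('x-appengine-taskname'),
        'payload': payload,
    }
```

In a handler such as `MyHandler`, you would pass the request headers and raw body of your framework's request object to this function.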
Remember that a task name must stay unique for some period of time. According to the documentation:
> When you create a new task, App Engine assigns the task a unique name by default. However, you can assign your own name to a task by using the `name` parameter. An advantage of assigning your own task names is that named tasks are de-duplicated, which means you can use task names to guarantee that a task is only added once. De-duplication continues for 9 days after the task is completed or deleted.
You can catch the `AlreadyExists` exception to detect that a task with the same name has already been added.
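
    import logging

    from google.api_core.exceptions import AlreadyExists

    from mixins import TasksMixin

    # Example value only: a full task path built from the sample
    # project, location, and queue used in this post.
    TASK_NAME = (
        'projects/my-project/locations/europe-west1'
        '/queues/default/tasks/my-unique-task'
    )


    class MyHandler(TasksMixin):
        def on_get(self, req, resp):
            try:
                # AlreadyExists can only be raised for explicitly named tasks.
                self.send_task('/v1/foo/bar', name=TASK_NAME)
            except AlreadyExists:
                logging.debug('Not unique!')
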
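
De-duplication only helps if the same logical task always gets the same name. One possible approach, sketched below under that assumption, is to derive the task ID from a hash of the payload. `build_task_name` is a hypothetical helper, not part of the mixin or the client library, and hashing the payload is just one way to choose an ID.

```python
import hashlib
import json
from typing import Dict


def build_task_name(project: str, location: str, queue: str, payload: Dict) -> str:
    """Build a full task path whose ID is derived from the payload."""
    # sort_keys makes the JSON text independent of key order in the dict.
    canonical = json.dumps(payload, sort_keys=True)
    task_id = hashlib.sha256(canonical.encode()).hexdigest()
    return (
        f'projects/{project}/locations/{location}'
        f'/queues/{queue}/tasks/{task_id}'
    )
```

Passing the result as `name` to `send_task` means a second attempt to enqueue the same payload raises `AlreadyExists` instead of creating a duplicate task.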