Google App Engine Standard, Python 3.7 and Cloud Tasks

If you are here, you probably already know what GCP Cloud Tasks is.

Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. Using Cloud Tasks, you can perform work asynchronously outside of a user or service-to-service request.

Using Cloud Tasks with GAE Standard on Python 3.7 is simple. You can use tasks_v2beta3.CloudTasksClient() to add a task, with an optional payload, to a queue. When the task runs, it sends an HTTP request to your API endpoint.

To use Cloud Tasks you need:

This is a simple mixin, which can be used to extend your API handler to easily use Cloud Tasks.

import json
import time
from typing import Dict, Optional, Union

from google.cloud import tasks_v2beta3
from google.protobuf import timestamp_pb2

import settings


class TasksMixin:
    @property
    def _cloud_tasks_client(self):
        return tasks_v2beta3.CloudTasksClient()

    def send_task(
        self,
        url: str,
        queue_name: str = 'default',
        http_method: str = 'POST',
        payload: Optional[Union[str, Dict]] = None,
        schedule_time: Optional[int] = None,
        name: Optional[str] = None,
    ) -> None:
        # Build the client once per call instead of once per attribute access.
        client = self._cloud_tasks_client
        parent = client.queue_path(
            settings.GOOGLE_CLOUD_PROJECT,  # example: my-project
            settings.GOOGLE_CLOUD_PROJECT_LOCATION,  # example: europe-west1
            queue_name,
        )
        task = {
            'app_engine_http_request': {'http_method': http_method, 'relative_uri': url}
        }
        if name:
            # Full task path: projects/.../locations/.../queues/.../tasks/TASK_ID
            task['name'] = name
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        if payload is not None:
            # The request body must be bytes.
            converted_payload = payload.encode()
            task['app_engine_http_request']['body'] = converted_payload
        if schedule_time is not None:
            # schedule_time is a delay in seconds from now.
            now = time.time() + schedule_time
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)
            # Create Timestamp protobuf.
            timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
            # Add the timestamp to the task.
            task['schedule_time'] = timestamp
        # Keyword arguments are accepted by both older and newer client releases.
        client.create_task(parent=parent, task=task)
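
The schedule_time handling in the mixin splits a float Unix time into whole seconds and nanoseconds, which is the shape a protobuf Timestamp expects. The sketch below isolates that arithmetic using only the standard library; the function name delay_to_timestamp_parts and its now argument are hypothetical, added here so the result can be checked against a fixed clock.

```python
import time
from typing import Optional, Tuple


def delay_to_timestamp_parts(
    delay_seconds: float, now: Optional[float] = None
) -> Tuple[int, int]:
    """Return (seconds, nanos) for the moment `delay_seconds` after `now`."""
    if now is None:
        now = time.time()
    target = now + delay_seconds
    # Whole seconds since the epoch.
    seconds = int(target)
    # Fractional remainder expressed in nanoseconds.
    nanos = int((target - seconds) * 10 ** 9)
    return seconds, nanos


print(delay_to_timestamp_parts(60, now=1000.25))  # (1060, 250000000)
```

The two returned values map directly onto the seconds and nanos arguments of timestamp_pb2.Timestamp used above.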

Params:

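| Param | Description |
| --- | --- |
| url | Relative URL of your API endpoint, for example /v1/users |
| queue_name | Queue name |
| http_method | HTTP method: POST, GET, etc. |
| payload | Body of the HTTP request, as a string or a dict, for example {'test': 123} |
| schedule_time | Number of seconds to delay task execution |
| name | Task name, given as the full path projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID/tasks/TASK_ID |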

How to use it?

from mixins import TasksMixin


class MyHandler(TasksMixin):
    def on_get(self, req, resp):
        self.send_task(
            '/v1/foo/bar',
            queue_name='my-queue',
            schedule_time=60,
            payload={'product_id': 123, 'user_id': 321},
        )
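
On the receiving side, the endpoint that the task calls gets the payload as the raw request body, and App Engine adds headers such as X-AppEngine-QueueName, X-AppEngine-TaskName and X-AppEngine-TaskRetryCount to the request. The following is a minimal, framework-agnostic sketch of decoding such a request; parse_task_request is a hypothetical helper, and it assumes the payload was sent as JSON, as send_task does for dict payloads.

```python
import json
from typing import Any, Dict, Mapping


def parse_task_request(headers: Mapping[str, str], body: bytes) -> Dict[str, Any]:
    """Extract queue name, task name, retry count and JSON payload."""
    # HTTP header names are case-insensitive, so normalise the keys first.
    lowered = {key.lower(): value for key, value in headers.items()}
    return {
        'queue': lowered.get('x-appengine-queuename'),
        'task': lowered.get('x-appengine-taskname'),
        'retries': int(lowered.get('x-appengine-taskretrycount', 0)),
        # Reverse the json.dumps() applied to dict payloads when sending.
        'payload': json.loads(body.decode()) if body else None,
    }
```

In a handler you would pass in the framework's header mapping and the bytes read from the request body, then act on the returned payload.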

Remember that a task name must stay unique for a period of time. According to the documentation:

When you create a new task, App Engine assigns the task a unique name by default. However, you can assign your own name to a task by using the name parameter. An advantage of assigning your own task names is that named tasks are de-duplicated, which means you can use task names to guarantee that a task is only added once. De-duplication continues for 9 days after the task is completed or deleted.

If you pass your own name, you can catch the AlreadyExists exception to detect that a task with that name has already been added.

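import logging

from google.api_core.exceptions import AlreadyExists

from mixins import TasksMixin


class MyHandler(TasksMixin):
    def on_get(self, req, resp):
        try:
            # Only named tasks can collide; this name is an illustrative example.
            self.send_task(
                '/v1/foo/bar',
                name='projects/my-project/locations/europe-west1/queues/default/tasks/my-task',
            )
        except AlreadyExists:
            logging.debug('Not unique!')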
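
Because named tasks are de-duplicated, one way to make sure the same work is queued only once is to derive the task name from the payload itself. The sketch below is an assumption about how you might do that, not part of the mixin: make_task_name is a hypothetical helper that hashes a canonical JSON form of the payload and places the digest in the full task path.

```python
import hashlib
import json
from typing import Dict


def make_task_name(project: str, location: str, queue: str, payload: Dict) -> str:
    """Build a deterministic full task name from the payload contents."""
    # sort_keys makes the JSON text independent of dict key order.
    canonical = json.dumps(payload, sort_keys=True)
    # A hex digest only contains characters that are valid in a task ID.
    task_id = hashlib.sha256(canonical.encode()).hexdigest()
    return f'projects/{project}/locations/{location}/queues/{queue}/tasks/{task_id}'
```

Passing the result as the name argument of send_task means a second call with the same payload raises AlreadyExists instead of queueing a duplicate, for as long as the de-duplication window lasts.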