Google App Engine Standard, Python 3.7 and Cloud Tasks

May 27, 2019 - IT

If you are here, you probably know what the GCP Cloud Tasks are.

Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. Using Cloud Tasks, you can perform work asynchronously outside of a user or service-to-service request.

Using Cloud Tasks with GAE Standard 3.7 is simple. You can use tasks_v2beta3.CloudTasksClient() to add a task to a queue with some payload. This task can make a request to your API endpoint.

To use Cloud Tasks you need:

This is a simple mixin, which can be used to extend your API handler to easily use Cloud Tasks.

import json
import time
from typing import Dict, Optional, Union

from google.cloud import tasks_v2beta3
from google.protobuf import timestamp_pb2

import settings


class TasksMixin:
    @property
    def _cloud_tasks_client(self):
        return tasks_v2beta3.CloudTasksClient()

    def send_task(
        self,
        url: str,
        queue_name: str = 'default',
        http_method: str = 'POST',
        payload: Optional[Union[str, Dict]] = None,
        schedule_time: Optional[int] = None,
        name: Optional[str] = None,
    ) -> None:
        parent = self._cloud_tasks_client.queue_path(
            settings.GOOGLE_CLOUD_PROJECT,  # example: my-project
            settings.GOOGLE_CLOUD_PROJECT_LOCATION,  # example: europe-west1
            queue_name,
        )

        task = {
            'app_engine_http_request': {'http_method': http_method, 'relative_uri': url}
        }

        if name:
            task['name'] = name

        if isinstance(payload, dict):
            payload = json.dumps(payload)

        if payload is not None:
            converted_payload = payload.encode()

            task['app_engine_http_request']['body'] = converted_payload

        if schedule_time is not None:
            now = time.time() + schedule_time
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)

            # Create Timestamp protobuf.
            timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)

            # Add the timestamp to the tasks.
            task['schedule_time'] = timestamp

        self._cloud_tasks_client.create_task(parent, task)

Params:

Param Description
url A URL to your API endpoint, example: /v1/users
queue_name Queue name
http_method HTTP method, POST, GET, etc.
payload payload body of a HTTP Request, example: {test: 123}
schedule_time Number of seconds to delay task execution
name Task name

How to use it?

from mixins import TasksMixin


class MyHandler(TasksMixin):
    def on_get(self, req, resp):
        self.send_task(
            '/v1/foo/bar',
            queue_name='my-queue',
            schedule_time=60,
            payload={'product_id': 123, 'user_id': 321}
        )

Remember that the task name must be unique for some period of time. According to documentation:

When you create a new task, App Engine assigns the task a unique name by default. However, you can assign your own name to a task by using the name parameter. An advantage of assigning your own task names is that named tasks are de-duplicated, which means you can use task names to guarantee that a task is only added once. De-duplication continues for 9 days after the task is completed or deleted.

You can catch AlreadyExists exception to check if the task has unique name.

from google.api_core.exceptions import AlreadyExists

from mixins import TasksMixin


class MyHandler(TasksMixin):
    def on_get(self, req, resp):
        try:
            self.send_task('/v1/foo/bar')
        except AlreadyExists:
            logging.debug('Not unique!')