
Google App Engine - Preventing task duplication in the queues

This is the second part of the story of how we solved issues with a huge number of tasks in the queues on Google App Engine. I recommend starting with the first part: Google App Engine - Get the least loaded queue.

Let's start again with this nice screenshot ;-)

Google App Engine queues

The first value is the creation time of the oldest task, and the second is the number of tasks in a single queue. The reasons? First, a lot of tasks: we really do run big operations. Some of them can be repeated because of errors. And of course... a lot of legacy code.

Because the first fix (described in the first part) didn't give us satisfying results, we needed another solution. We had a lot of tasks, and my investigation showed that the same tasks could be repeated many times, so I wanted to prevent that. Example: a user can add/edit/delete/copy an item, and the _post/_put methods for this item run N tasks. If the user edits the item several times, the number of tasks keeps growing. Combine this with a large number of users and a large number of tasks triggered by their actions, and we ended up with a huge number of tasks.

What is the solution? I used two features of the task queues:

First, the method.

import collections
import datetime
import hashlib
import logging
import math
import re

from google.appengine.api import taskqueue
from google.appengine.ext import deferred


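def unique_task_defer(obj, *args, **kwargs):
    """Defer obj(*args, **kwargs) under a name derived from its parameters,
    so identical tasks requested in the same quarter of an hour share one name."""
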
    def normalize_name(obj_name, name_from_args, minutes=None):
        date = datetime.datetime.now()
        if minutes:
            date += datetime.timedelta(minutes=minutes)

        # '<function>-<sha1 of parameters>-<YYYYMMDDHH>-<quarter of hour 1-4>'
        name = '{}-{}-{}-{}'.format(
            obj_name,
            hashlib.sha1(name_from_args.encode('utf-8')).hexdigest(),
            date.strftime('%Y%m%d%H'),
            int(math.ceil(float(int(date.strftime('%M')) + 1) / 15))
        )

        # Task names may only contain letters, digits, '-' and '_'.
        name = name.encode('utf-8', 'replace')
        name = name.replace('_', '-')
        name = re.sub(r'[^\w-]', '-', name)

        return name

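    # Values of keyword arguments sorted by key; options such as _queue,
    # _name or _countdown are not task parameters and are ignored.
    kwargs_values = [
        unicode(value) for key, value in collections.OrderedDict(
            sorted(kwargs.items())
        ).iteritems() if not str(key).startswith('_')
    ]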

    name_from_args = '-'.join(map(unicode, args + tuple(kwargs_values)))
    name = normalize_name(obj.__name__, name_from_args)

    kwargs['_name'] = name

    try:
        deferred.defer(obj, *args, **kwargs)
    except taskqueue.TaskAlreadyExistsError:
        logging.info(
            'Skipping task (already exists): {}, {}'.format(
                obj.__name__, name
            )
        )
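    except taskqueue.TombstonedTaskError:
        # The same task already ran recently: re-add it under the name of
        # the next quarter of an hour and delay it by 15 minutes.
        name = normalize_name(obj.__name__, name_from_args, minutes=15)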

        logging.info(
            'TombstonedTask (retry): {}, {}'.format(
                obj.__name__, name
            )
        )

        kwargs['_name'] = name
        kwargs['_countdown'] = 900

        try:
            deferred.defer(obj, *args, **kwargs)
        except taskqueue.TaskAlreadyExistsError:
            logging.info(
                'Skipping task (already exists, second try): {}, {}'.format(
                    obj.__name__, name
                )
            )

How does it work? First, look at how we call it:

unique_task_defer(
    task,
    parameter1,
    parameter2,
    _queue=get_least_loaded_queue(SOME_QUEUES_LIST),
)

This is how we use our method. The get_least_loaded_queue method is described in a separate post: Google App Engine - Get the least loaded queue. If you don't need it, just remove the _queue parameter or pass a fixed queue name.

First, we need a unique name for the given task that takes its parameters into account. This is done by the normalize_name function: for the same task with the same parameters (within the same quarter of an hour), it returns the same string every time. The task name is composed of four parts:

  • task (function) name
  • SHA-1 hash of a string combined from all parameters (keyword arguments starting with _ are skipped)
  • date in the format %Y%m%d%H
  • number of the quarter of an hour, which can have four values:
    • 1 for the first quarter, example range: 01:00 - 01:14
    • 2 for the second quarter, example range: 01:15 - 01:29
    • 3 for the third quarter, example range: 01:30 - 01:44
    • 4 for the fourth quarter, example range: 01:45 - 01:59
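The naming scheme can be tried outside App Engine. Below is a minimal Python 3 sketch, not the code we run in production: `build_task_name` and `quarter_of_hour` are hypothetical helpers, the current time is passed in explicitly instead of calling datetime.datetime.now(), and `minute // 15 + 1` is used as an equivalent, shorter form of the ceil(...) expression above.

```python
import datetime
import hashlib
import re


def quarter_of_hour(minute):
    """Map a minute (0-59) to its quarter of the hour (1-4)."""
    return minute // 15 + 1


def build_task_name(obj_name, args, kwargs, now):
    """Build a deterministic task name from a function name, its parameters and a time.

    Keyword arguments starting with '_' (task options such as _queue) are
    ignored, as in unique_task_defer.
    """
    kwargs_values = [str(value) for key, value in sorted(kwargs.items())
                     if not key.startswith('_')]
    name_from_args = '-'.join(str(arg) for arg in list(args) + kwargs_values)
    name = '{}-{}-{}-{}'.format(
        obj_name,
        hashlib.sha1(name_from_args.encode('utf-8')).hexdigest(),
        now.strftime('%Y%m%d%H'),
        quarter_of_hour(now.minute),
    )
    # Replace '_' and any other character outside [A-Za-z0-9-] with '-'.
    return re.sub(r'[^A-Za-z0-9-]', '-', name)


# Same function and parameters at 01:20 -> same name, whatever the queue.
now = datetime.datetime(2017, 3, 1, 1, 20)
print(build_task_name('update_item', (42,), {'force': True, '_queue': 'q1'}, now))
```

Because the queue option is skipped, the same task sent to different queues gets the same name, while a request in the next quarter of an hour gets a new one.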

If normalize_name gets the minutes argument, that number of minutes is added to the current time before the date and the quarter (the third and fourth parts of the name) are computed. I will explain why later.
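A quick check of why a 15-minute shift is enough: every quarter is exactly 15 minutes long, so moving the time forward by 15 minutes always lands in the next quarter, possibly of the next hour or day. The sketch below is Python 3 and uses a hypothetical `time_suffix` helper that reproduces only the third and fourth parts of the name.

```python
import datetime


def time_suffix(now, minutes=None):
    """Return the '<YYYYMMDDHH>-<quarter>' part of the task name, optionally shifted."""
    if minutes:
        now += datetime.timedelta(minutes=minutes)
    return '{}-{}'.format(now.strftime('%Y%m%d%H'), now.minute // 15 + 1)


now = datetime.datetime(2017, 3, 1, 1, 50)
print(time_suffix(now))              # 2017030101-4
print(time_suffix(now, minutes=15))  # 2017030102-1 (next hour, first quarter)
```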

Now let's run the task. unique_task_defer builds a unique task name as described above and pushes the task into the queue. This operation can raise two exceptions we are interested in:

  • TaskAlreadyExistsError - raised when a task with the given name already exists in the given queue
  • TombstonedTaskError - raised when a task with the given name was already executed (or deleted) in the given queue; it no longer exists, but its name cannot be reused for some time

First, our method sets the task name and pushes the task to the queue using deferred.defer. If a task with this name has never existed, it is simply added to the queue. No magic.

If deferred.defer raises TaskAlreadyExistsError, we don't want to run the task again: it is already in the queue and not finished yet, and a new copy would do exactly the same work, so it is unnecessary.

If deferred.defer raises TombstonedTaskError, it means someone ran this task with the same parameters some time ago. We still want to run it again, because something could have changed since then, but this time we delay its execution by 900 seconds (15 minutes). Why? To avoid flooding the queue with tasks that have just been executed, and to get better control over task scheduling. During those 15 minutes, further requests for the same task are skipped, because the delayed task already exists in the queue. We use this method only where such a delay is acceptable.

A simple example:

  1. We run the task for the first time. There are no problems and it is added to the queue.
  2. We run the task again. It still exists in the queue, so deferred.defer raises TaskAlreadyExistsError, and the task is skipped instead of being added to the queue a second time.
  3. The task finishes.
  4. We run the task again. deferred.defer raises TombstonedTaskError. We want to run the task again, but with a 15-minute delay. Our method changes the task name by shifting the time used for the date and quarter-of-an-hour parts of the name by 15 minutes. Because the name is different, the task can be pushed to the queue again. Our method also sets the countdown parameter, so the task waits in the queue for at least 15 minutes before it is executed. In the meantime other tasks can finish.
  5. When deferred.defer raises TombstonedTaskError, we also handle the case where the delayed task already exists in the queue: TaskAlreadyExistsError is caught on the second try as well, just as in step 2.
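The five steps can be replayed without App Engine. This sketch assumes a hypothetical in-memory queue: `InMemoryQueue`, `defer_unique` and the two exception classes are local stand-ins that only imitate `taskqueue.TaskAlreadyExistsError` and `taskqueue.TombstonedTaskError`, and the task names are fixed strings instead of names built by normalize_name.

```python
class TaskAlreadyExistsError(Exception):
    """Stand-in: a task with this name is still in the queue."""


class TombstonedTaskError(Exception):
    """Stand-in: a task with this name already ran (or was deleted)."""


class InMemoryQueue(object):
    """Hypothetical queue that remembers pending and finished task names."""

    def __init__(self):
        self.pending = {}        # task name -> countdown in seconds
        self.tombstones = set()  # names of finished tasks

    def add(self, name, countdown=0):
        if name in self.pending:
            raise TaskAlreadyExistsError(name)
        if name in self.tombstones:
            raise TombstonedTaskError(name)
        self.pending[name] = countdown

    def finish(self, name):
        del self.pending[name]
        self.tombstones.add(name)


def defer_unique(queue, name, delayed_name, delay_seconds=900):
    """Add a named task, skip duplicates, re-add with a delay if tombstoned.

    Returns the name of the task that was added, or None if it was skipped.
    """
    try:
        queue.add(name)
        return name
    except TaskAlreadyExistsError:
        return None  # identical task still pending: nothing to do
    except TombstonedTaskError:
        try:
            queue.add(delayed_name, countdown=delay_seconds)
            return delayed_name
        except TaskAlreadyExistsError:
            return None  # the delayed copy is already waiting


queue = InMemoryQueue()
first, delayed = 'sync-2017030101-1', 'sync-2017030101-2'
print(defer_unique(queue, first, delayed))  # step 1: sync-2017030101-1
print(defer_unique(queue, first, delayed))  # step 2: None (still pending)
queue.finish(first)                         # step 3: the task finishes
print(defer_unique(queue, first, delayed))  # step 4: sync-2017030101-2, countdown 900
print(defer_unique(queue, first, delayed))  # step 5: None (delayed copy pending)
```

The stand-in queue keeps tombstones forever; on App Engine a tombstoned name only blocks reuse for a limited time, which this sketch ignores.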

Maybe it doesn't look like a great solution, but it really helped us resolve all our queue issues quickly. Now the oldest task in a queue is at most 15-20 minutes old, and most of the time our queues look like this, which is the best proof that it works. ;-)

Google App Engine queues

This was achieved in two steps:

  1. Described in the post: Google App Engine - Get the least loaded queue
  2. Described in this post.