
Google App Engine - Get the least loaded queue

#python , #google app engine

Some time ago we had a problem with a huge number of tasks in the queues. Look at this ;-)

[Image: Google App Engine queues]

The first value is the creation time of the oldest task, and the second is the number of tasks in a single queue. Why so many? First, we simply have a lot of tasks: we run big operations, some of them get repeated because of errors, and of course there is a lot of legacy code.

We wanted to fix the problem as quickly as possible. The first attempt was to spread the tasks across several queues. I wrote a method that finds the least loaded queue, so that a task can be pushed there: it checks which queue has the smallest number of tasks and returns it. It is very simple. First, I added several queues to queue.yaml, for example:

- name: analytics-0
  rate: 200/s
  bucket_size: 100
- name: analytics-1
  rate: 200/s
  bucket_size: 100
- name: analytics-2
  rate: 200/s
  bucket_size: 100
- name: analytics-3
  rate: 200/s
  bucket_size: 100
- name: analytics-4
  rate: 200/s
  bucket_size: 100
- name: analytics-5
  rate: 200/s
  bucket_size: 100
- name: analytics-6
  rate: 200/s
  bucket_size: 100
- name: analytics-7
  rate: 200/s
  bucket_size: 100
- name: analytics-8
  rate: 200/s
  bucket_size: 100
- name: analytics-9
  rate: 200/s
  bucket_size: 100
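
Writing ten near-identical entries by hand is easy to get wrong, so a fragment like the one above could also be generated. The following is only a minimal sketch and not part of the original setup: the helper name queue_yaml_entries and its parameters are hypothetical, and the default values simply mirror the example.

```python
def queue_yaml_entries(prefix, count, rate='200/s', bucket_size=100):
    """Build queue.yaml list entries for queues named <prefix>-0 .. <prefix>-(count-1).

    Hypothetical helper for illustration; it only produces text and does not
    talk to App Engine.
    """
    lines = []
    for number in range(count):
        lines.append('- name: {}-{}'.format(prefix, number))
        lines.append('  rate: {}'.format(rate))
        lines.append('  bucket_size: {}'.format(bucket_size))
    return '\n'.join(lines)


fragment = queue_yaml_entries('analytics', 10)
print(fragment)
```

The same prefix and count can then be reused to build the list of queue names in code, which keeps the two in sync.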

Here is the method:

import logging
import operator
import random

from google.appengine.api.taskqueue import QueueStatistics
from google.appengine.runtime import DeadlineExceededError


def get_least_loaded_queues_list(queue_names, full_list=False):
    """
    Example of queue_names parameter:
    ANALYTICS_QUEUES = [
        'analytics-{}'.format(number) for number in range(0, 10)
    ]
    full_list=False - return only the name of the least loaded queue
    full_list=True - return all queues with their number of tasks
    """

    queues_list = {}

    try:
        queue_stats = QueueStatistics.fetch(queue_names)
    except DeadlineExceededError:
        logging.info('Cannot fetch queue statistics. Random queue.')
        return random.choice(queue_names)

    for queue in queue_stats:
        queues_list[queue.queue.name] = queue.tasks

    if not queues_list:
        queues_list = [('default', 1), ]  # It is safe to always have a queue
    else:
        queues_list = sorted(
            queues_list.items(), key=operator.itemgetter(1)
        )

    return queues_list if full_list and len(queues_list) else queues_list[0][0]

This method takes one required parameter, queue_names, which should be a list of queue names, and an optional full_list parameter that defines what is returned. If full_list is True, the method returns a list of all the passed queues with their task counts, sorted from the least to the most loaded. If full_list is False, it returns only the name of the least loaded queue, as a string. To do that, it fetches the statistics for all the passed queues and sorts them by their number of tasks. As you can see, we catch one exception, DeadlineExceededError. Sometimes the Google API can't return the queue statistics and raises it, and in that case I don't want to ask again. The method returns a random queue instead, which I think is faster. Note that this fallback returns a single queue name even when full_list is True.
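
To try the selection and fallback logic without App Engine, it can be pulled apart from the statistics call. The sketch below is an illustration under assumptions, not the code we run: rank_queues, pick_queue, fake_fetch, failing_fetch and the StatsUnavailable exception are all hypothetical names, and the task counts are made-up sample numbers standing in for real queue statistics.

```python
import operator
import random


class StatsUnavailable(Exception):
    """Hypothetical stand-in for the error raised when stats cannot be fetched."""


def rank_queues(task_counts, full_list=False):
    """Sort queues by task count, least loaded first.

    task_counts is a dict mapping queue name -> number of tasks.
    """
    if not task_counts:
        ranked = [('default', 1)]  # always have some queue to return
    else:
        ranked = sorted(task_counts.items(), key=operator.itemgetter(1))
    return ranked if full_list else ranked[0][0]


def pick_queue(queue_names, fetch_counts):
    """Return the least loaded queue name, or a random one if stats fail."""
    try:
        task_counts = fetch_counts(queue_names)
    except StatsUnavailable:
        return random.choice(queue_names)
    return rank_queues(task_counts)


def fake_fetch(queue_names):
    # Made-up sample numbers instead of real queue statistics.
    sample = {'analytics-0': 120, 'analytics-1': 15, 'analytics-2': 48}
    return {name: sample[name] for name in queue_names}


def failing_fetch(queue_names):
    # Simulates the case where statistics cannot be fetched.
    raise StatsUnavailable('simulated timeout')


names = ['analytics-0', 'analytics-1', 'analytics-2']
least_loaded = pick_queue(names, fake_fetch)
ranking = rank_queues(fake_fetch(names), full_list=True)
fallback = pick_queue(names, failing_fetch)
print(least_loaded)
print(ranking)
print(fallback)
```

With the sample numbers, the least loaded queue is analytics-1, and the failing fetcher falls back to a random name from the list. In real code the returned name would be the queue the task is pushed to.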

How did this help us?

We had fewer tasks in every queue and all the tasks were executed faster. But it wasn't enough for us, because the queues still had too many tasks and sometimes we had to wait several hours or even a day for some tasks (but not two months! ;-)). We decided to manage the queues by checking queue names. I described it in a separate post. You can read about it here