December 7th, 2019

Google Cloud Tasks and Django Rest Framework (application/octet-stream)

Some time ago I described how to use Google Cloud Tasks on Google App Engine Standard with Python 3.7.

The question is: how do you use this solution with the Django Rest Framework?

According to the Cloud Tasks documentation, by default, the Content-Type header is set to application/octet-stream.

Content-Type: By default, the Content-Type header is set to "application/octet-stream". The default can be overridden by explicitly setting Content-Type to a particular media type when the task is created. For example, Content-Type can be set to "application/json".
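
As a rough illustration of the override mentioned in the quote, the sketch below builds a task whose body is sent as application/json instead of the default. The field names follow the Cloud Tasks v2 App Engine HTTP request; the helper name build_json_task and the example URL are made up for this sketch, and the resulting dict is only assumed to be passed on to the client's create_task call.

```python
import json


def build_json_task(relative_uri, payload):
    """Build an App Engine task dict whose body is sent as application/json.

    Hypothetical helper: it only builds the dict, it does not call Cloud Tasks.
    """
    return {
        "app_engine_http_request": {
            "http_method": "POST",
            "relative_uri": relative_uri,
            # An explicit header overrides the application/octet-stream default.
            "headers": {"Content-Type": "application/json"},
            # The request body has to be bytes, so the JSON text is encoded.
            "body": json.dumps(payload).encode("utf-8"),
        }
    }


# Example URL is a placeholder, not taken from the original post.
task = build_json_task("/images/download", {"image_url": "https://example.com/cat.png"})
```

With the header set this way, the default JSON parser of the Django Rest Framework should be enough and no custom parser is needed.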

If you want to keep the default application/octet-stream content type, you have to write a custom parser for it.

import json

from rest_framework.parsers import BaseParser


class OctetStreamParser(BaseParser):
    """
    Octet stream parser.
    """

    charset = None
    format = None
    media_type = "application/octet-stream"
    render_style = "binary"

    def parse(self, stream, media_type=None, parser_context=None):
        return json.loads(stream.read().decode())
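
To see what the parse step does with a task body, the same logic can be tried on an in-memory stream without Django. This is a minimal sketch using only the standard library; the function name parse_octet_stream and the sample body are assumptions made for illustration, and the error handling is an addition that the parser above does not have.

```python
import io
import json


def parse_octet_stream(stream):
    """Read raw bytes from a stream and decode them as UTF-8 JSON."""
    raw = stream.read()
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Report both decoding and JSON syntax problems as one error type.
        raise ValueError(f"Body is not valid JSON: {exc}") from exc


# Sample body, standing in for what a task would deliver to the endpoint.
body = io.BytesIO(b'{"image_url": "https://example.com/cat.png"}')
data = parse_octet_stream(body)
```

Inside a real parser, raising rest_framework.exceptions.ParseError instead of ValueError would probably be the better fit, since the framework turns it into a 400 response.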

You can send a task to a queue as usual, for example:

self.send_task(
    "/images/download",
    payload={
        "image_url": image,
    },
    queue_name="download",
    schedule_time=60,
)

Cloud Tasks will send the data to your API endpoint as application/octet-stream, so you need to apply the parser to the view, for example:

from rest_framework.decorators import api_view, parser_classes
from rest_framework.response import Response

from core.parsers.raw import OctetStreamParser


@api_view(["POST"])
@parser_classes((OctetStreamParser,))
def download_images(request, format=None):
    payload = request.data  # you can access the payload data
    return Response({"status": "OK"})
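
If several endpoints receive tasks, decorating each view can get repetitive. One possible alternative is to register the parser globally through the DEFAULT_PARSER_CLASSES setting of the Django Rest Framework. The fragment below is a sketch of that idea; the dotted path is assumed from the import used in the view, and JSONParser is listed as well because this setting replaces the default parser list rather than extending it.

```python
# settings.py (fragment)
REST_FRAMEWORK = {
    "DEFAULT_PARSER_CLASSES": [
        # Keep regular JSON requests working.
        "rest_framework.parsers.JSONParser",
        # Path assumed from "from core.parsers.raw import OctetStreamParser".
        "core.parsers.raw.OctetStreamParser",
    ],
}
```

The trade-off is that every view then accepts application/octet-stream bodies, which may be broader than intended.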

© 2019 Przemysław Kołodziejczyk