Some time ago I described how to use Google Cloud Tasks on Google App Engine Standard with Python 3.7.
The question is how to use this solution with the Django Rest Framework?
According to the Cloud Tasks documentation, by default, the Content-Type
header is set to application/octet-stream
.
Content-Type: By default, the Content-Type header is set to “application/octet-stream”. The default can be overridden by explicitly setting Content-Type to a particular media type when the task is created. For example, Content-Type can be set to “application/json”.
If you want to keep the application/octet-stream
, you have to create a parser.
import json
from rest_framework.parsers import BaseParser
class OctetStreamParser(BaseParser): """ Octet stream parser. """
charset = None format = None media_type = "application/octet-stream" render_style = "binary"
def parse(self, stream, media_type=None, parser_context=None): return json.loads(stream.read().decode())
You can send a task to a queue as usual, for example:
self.send_task( f"/images/download", payload={ "image_url": image, }, queue_name="download", schedule_time=60,)
The task will send the data to your API endpoint as application/octet-stream
so you need to apply the parser, example:
from rest_framework import decoratorsfrom rest_framework.decorators import parser_classesfrom rest_framework.response import Response
from core.parsers.raw import OctetStreamParser
@decorators.api_view(["POST"])@parser_classes((OctetStreamParser,))def download_images(request, format=None): payload = request.data # you can access the payload data return Response({"status": "OK"})