Hier ist mein Code, wobei ich die Funktionen als Celery-Tasks laufen lasse.
Code: Select all
# Queue the task asynchronously. The address is a placeholder: the real one
# was redacted in the original post and must be passed as a quoted string.
verify_mail.delay("user@example.com")
Code: Select all
from celery import shared_task
from time import sleep
from django.shortcuts import render, redirect, HttpResponse
import boto3
@shared_task
def verify_mail(new_email):
    """Ask Amazon SES to start verification of an email identity.

    Creates an SES client and calls ``verify_email_identity`` for
    ``new_email``. The service response is not inspected.

    Args:
        new_email: Address passed to SES as ``EmailAddress``.

    Returns:
        None.
    """
    # The client is created inside the task, so it is built in whichever
    # process runs the task (the worker when invoked through ``.delay()``).
    ses = boto3.client('ses')
    ses.verify_email_identity(EmailAddress=new_email)
    return None
< /code>
Wenn ich es so ausführe: verify_mail("user@example.com"), gibt es kein Problem. Aber wenn ich es mit delay() ausführe, funktioniert es nicht.
Den Worker starte ich mit: celery -A myProject worker -l INFO --without-gossip --without-mingle --without-heartbeat -Ofair --pool=solo
Code: Select all
import os
from celery import Celery
# Provide a default Django settings module; setdefault leaves an already
# exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myProject.settings')
# Celery application instance for the project.
app = Celery('myProject')
# Read configuration from the Django settings object. With namespace='CELERY'
# the Celery options are expected to be upper-case and prefixed with CELERY_.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Let Celery search for task modules (presumably tasks.py in each installed
# Django app -- confirm against the Celery version in use).
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    """Print the repr of the bound task's request (diagnostic helper)."""
    current_request = self.request
    print(f'Request: {current_request!r}')
Code: Select all
# Imported here because this settings snippet has no visible import block.
from urllib.parse import quote

# The credentials are percent-encoded before being embedded in the URL:
# AWS secret keys may contain characters such as "/" or "+" that would
# otherwise be parsed as URL syntax and corrupt the broker address.
# str() keeps the original formatting behaviour for non-string values.
CELERY_BROKER_URL = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=quote(str(AWS_ACCESS_KEY_ID), safe=""),
    aws_secret_key=quote(str(AWS_SECRET_ACCESS_KEY), safe=""),
)
# Only JSON payloads are accepted and produced.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
# Options handed to the broker transport; 'region' selects the AWS region.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'eu-central-1',
}
# No result backend: task return values are not stored.
CELERY_RESULT_BACKEND = None
# Legacy (pre-4.0) setting names, kept for backward compatibility.
# NOTE(review): when settings are loaded with namespace='CELERY' these old
# names are presumably not mapped to any current option -- confirm.
CELERY_ENABLE_REMOTE_CONTROL = False
CELERY_SEND_EVENTS = False
# Namespaced equivalents of the two legacy names above
# (worker_enable_remote_control / worker_send_task_events).
CELERY_WORKER_ENABLE_REMOTE_CONTROL = False
CELERY_WORKER_SEND_TASK_EVENTS = False