from celery import Celery, Task
app = Celery(
"app",
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
backend='redis://127.0.0.1:6369/2',
broker='redis://127.0.0.1:6369/3',
include=['app.tasks']
)
app.conf.task_routes = {'app.tasks.long_run': {'queue': 'long_running_task'}}
class VerboseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(
f"FAILURE({task_id}):"
f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"
)
print("-------------------------------------------")
def on_success(self, retval, task_id, args, kwargs):
print("-------------------------------------------")
print(f"SUCCESS({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
def on_retry(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(f"RETRY({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
< /code>
[*]app/tasks.py
< /ul>
from run import app, VerboseTask
from time import sleep
class TSK:
name = 'tsk'
@app.task(bind=True, base=VerboseTask)
def short_run(self, **kwargs):
for i in range(0, 3):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('short_run finished')
@app.task(bind=True, base=VerboseTask)
def long_run(self, **kwargs):
for i in range(0, 10):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('long_run finished')
< /code>
und Ich führen Sellerie mit < /p>
auscelery -A run worker -l info --concurrency=4 -Q long_running_task,celery
Das Problem ist, wenn ich Long_running_task viermal und dann übliche Sellerie Aufgabe long_running_task Konsumieren Sie alle Arbeiter und Sellerie Aufgabe auf vier Langzeitaufgaben. Wenn beispielsweise aktuelle Aufgaben zwei kurze Aufgaben und eine lang laufende Aufgabe sind, ist nur ein Arbeiter verfügbar und nur eine kurze Aufgabe in der Lage, ausgeführt zu werden. Langzeitaufgabe muss warten, während drei Arbeiter gleichzeitig frei sind, um eine davon zu konsumieren.
celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task
celery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery
Aber
Ich möchte auch long_running_task Arbeiter für andere Aufgaben konsumieren, wenn das nicht im Leerlauf ist (das ist nicht so wichtig, und kann nicht ignoriert werden. Befehle.>
[*]app/tasks.py < /ul> from run import app, VerboseTask from time import sleep
class TSK:
name = 'tsk'
@app.task(bind=True, base=VerboseTask) def short_run(self, **kwargs): for i in range(0, 3): print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}') sleep(5) print('short_run finished')
@app.task(bind=True, base=VerboseTask) def long_run(self, **kwargs): for i in range(0, 10): print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}') sleep(5) print('long_run finished')
< /code> und Ich führen Sellerie mit < /p> auscelery -A run worker -l info --concurrency=4 -Q long_running_task,celery [/code] Das Problem ist, wenn ich Long_running_task viermal und dann übliche Sellerie Aufgabe long_running_task Konsumieren Sie alle Arbeiter und Sellerie Aufgabe auf vier Langzeitaufgaben. Wenn beispielsweise aktuelle Aufgaben zwei kurze Aufgaben und eine lang laufende Aufgabe sind, ist nur ein Arbeiter verfügbar und nur eine kurze Aufgabe in der Lage, ausgeführt zu werden. Langzeitaufgabe muss warten, während drei Arbeiter gleichzeitig frei sind, um eine davon zu konsumieren.[code]celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task celery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery [/code] Aber
[url=viewtopic.php?t=14917]Ich möchte[/url] auch long_running_task Arbeiter für andere Aufgaben konsumieren, wenn das nicht im Leerlauf ist (das ist nicht so wichtig, und kann nicht ignoriert werden. Befehle.>
Ich möchte zwei Sellerie-Warteschlangen erstellen (für verschiedene Arten von Aufgaben)
Meine Sellerie-Konfiguration. Ich erwarte, dass diese Konfiguration zwei Warteschlangen „celery“ und „celery:1“...
Ich arbeite an einer Flask -Anwendung, die Sellerie zur Verarbeitung von Hintergrundaufgaben verwendet. Ich stoße auf Probleme mit kreisförmigen Importen und Aufgaben, die keine Fehler gefunden...
Ich führe einen Supervisor-Worker in einem Container als unabhängige Aufgabe in aws aus. Ich habe das Flag --stop-when-empty angegeben, aber die Aufgabe läuft weiter, auch wenn sich keine Nachrichten...