Wie setze ich die Parallelitätsgrenze pro Aufgabe (oder Warteschlange) in Sellerie fest?Python

Python-Programme
Anonymous
 Wie setze ich die Parallelitätsgrenze pro Aufgabe (oder Warteschlange) in Sellerie fest?

Post by Anonymous »

Mein Beispielcode ist: < /p>

[*]run.py
< /ul>

Code: Select all

from celery import Celery, Task

app = Celery(
"app",
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
backend='redis://127.0.0.1:6369/2',
broker='redis://127.0.0.1:6369/3',
include=['app.tasks']
)

app.conf.task_routes = {'app.tasks.long_run': {'queue': 'long_running_task'}}

class VerboseTask(Task):

def on_failure(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(
f"FAILURE({task_id}):"
f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"
)
print("-------------------------------------------")

def on_success(self, retval, task_id, args, kwargs):
print("-------------------------------------------")
print(f"SUCCESS({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")

def on_retry(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(f"RETRY({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")

< /code>

[*]app/tasks.py
< /ul>
from run import app, VerboseTask
from time import sleep

class TSK:

name = 'tsk'

@app.task(bind=True, base=VerboseTask)
def short_run(self, **kwargs):
for i in range(0, 3):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('short_run finished')

@app.task(bind=True, base=VerboseTask)
def long_run(self, **kwargs):
for i in range(0, 10):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('long_run finished')

< /code>
und Ich führen Sellerie mit < /p>
auscelery -A run worker -l info --concurrency=4 -Q long_running_task,celery
Das Problem ist, wenn ich Long_running_task viermal und dann übliche Sellerie Aufgabe long_running_task Konsumieren Sie alle Arbeiter und Sellerie Aufgabe auf vier Langzeitaufgaben. Wenn beispielsweise aktuelle Aufgaben zwei kurze Aufgaben und eine lang laufende Aufgabe sind, ist nur ein Arbeiter verfügbar und nur eine kurze Aufgabe in der Lage, ausgeführt zu werden. Langzeitaufgabe muss warten, während drei Arbeiter gleichzeitig frei sind, um eine davon zu konsumieren.

Code: Select all

celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task
celery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery
Aber

Ich möchte auch long_running_task Arbeiter für andere Aufgaben konsumieren, wenn das nicht im Leerlauf ist (das ist nicht so wichtig, und kann nicht ignoriert werden. Befehle.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post