Fastapi fallen (oder nicht empfangen?) Anfragen, wenn nur mehrere Arbeitnehmer verwendet werdenPython

Python-Programme
Anonymous
 Fastapi fallen (oder nicht empfangen?) Anfragen, wenn nur mehrere Arbeitnehmer verwendet werden

Post by Anonymous »

im Ausführen einer Fastapi -App mit mehreren Mitarbeitern. Die App verfügt über einen einzelnen Endpunkt, der eine kleine Menge Text benötigt und einen Sentcetransformer verwendet, um ihn einzubetten. Wenn ich diese App auf einem Arbeiter ausführe, funktioniert alles großartig. Es kann ungefähr 25 Anfragen gleichzeitig bearbeiten, ohne dass eine einzige Anfrage länger als eine Sekunde dauert. Cool. Ich möchte diese Zahl jedoch erhöhen, also starte ich den Server mit mehr Mitarbeitern. Wenn dies jedoch manchmal Anfragen erledigt, erhalte ich während des Tests eine Zeitüberschreitung (meine Zeitüberschreitung beträgt 10 Sekunden) (alle Anfragen ohne Lesezeitout wurden in weniger als einer Sekunde erfüllt). Ich dachte, dies lag daran, dass Fastapi aus irgendeinem Grund nur länger als gewöhnlich dauerte, um die Aufgaben zu erledigen Beginn des Endpunkts nie gedruckt). Auch nachdem eine Sekunde vergangen ist und die anderen über 25 Anfragen weniger als eine Sekunde abgeschlossen waren 4 Core i7 CPU. p> unten ist a reproduzierbar main.py . Sie können es mit Fastapi Run -Workers main.py .
Bitte beachten asynchron (indem Sie das Adder_Thread loswerden und die Warteschlange () in Def -Embo ()) < /p>
direkt aufrufen.

Code: Select all

import asyncio
import os
import time
from concurrent.futures import Future
from contextlib import asynccontextmanager
from queue import Queue
from threading import Thread

from fastapi import FastAPI, Query
from numpy import ndarray
from pydantic import BaseModel, ConfigDict
from sentence_transformers import SentenceTransformer

class TextEmbedQueueItem(BaseModel):
"""queue item for dense embedder"""

model_config = ConfigDict(arbitrary_types_allowed=True)

text: str
future: Future

def thread_queue_conumser(
model: SentenceTransformer,
queue: Queue[TextEmbedQueueItem | None],
):
while True:
item = queue.get()
if item is None:
break
if item.future.cancelled():
continue
result = model.encode([item.text])
if not item.future.cancelled():
try:
item.future.set_result(result)
except:
pass

class TextEmbedder:
"""
Dense Model Controler
"""

def __init__(self):
self.local_model = SentenceTransformer(
"nomic-ai/nomic-embed-text-v1",
trust_remote_code=True,
)

self.task_runner_name = "TextEmbedder"
self.queue = Queue()
self.thread = Thread(
target=thread_queue_conumser,
args=(self.local_model, self.queue),
daemon=True,
)
self._loop = asyncio.new_event_loop()
self.thread.start()

self.adder_thread = Thread(
target=self.run_add_to_queue,
daemon=True,
)
self.adder_thread.start()

def run_add_to_queue(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
self._loop.close()

def stop(self):
print("stopping!")
while not self.queue.empty():
try:
self.queue.get(timeout=1)
except:
break
print("adding none!")

self.queue.put(None)
self.thread.join()
print("stop done!")

def _add_embed(self, text: str, future: Future[ndarray]):
self.queue.put(TextEmbedQueueItem(text=text, future=future))

def embed(self, text: str):
"""embed text.  if returns None, then model is not ready for x seconds!"""
future: Future[ndarray] = Future()
self._loop.call_soon_threadsafe(self._add_embed, text, future)
return future

dense_embedder = TextEmbedder()

def teardown():
dense_embedder.stop()

@asynccontextmanager
async def lifespan(fast_app: FastAPI):
"""startup and shutdown tasks for the app"""
# start up
# =======
yield
# everything after here runs after shutdown
# end all threads and loops
teardown()

app = FastAPI(lifespan=lifespan)

@app.post("/search")
async def search(
text: str = Query(...),
elastic_index: str = Query(...),
page: int = Query(1, ge=1),
force_embed: bool = Query(False),
num: int = Query(...),
):
print("here", str(num))
st = time.time()
text_embedding_task: asyncio.Future[ndarray] | None = None

text_embedding_task = asyncio.wrap_future(dense_embedder.embed(text))

print("queued up task", str(num))

text_embeding = await text_embedding_task
print("returning", str(num))

return {
"time": round(time.time() - st, 2),
}
< /code>
und unten befindet sich die Datei, mit der ich sie testet (es dauert möglicherweise einige Läufe, um einige zum Auszeit zu bringen): < /p>
import asyncio
from typing import List

from httpx import AsyncClient, Limits, Response

async def main():
limits = Limits(max_keepalive_connections=100, max_connections=100)

async with AsyncClient(
base_url="http://localhost:8000", limits=limits
) as client:
tasks: List = []
number_of_total_requests = 30
number_of_requests_per_second = 30
for i in range(number_of_total_requests):
tasks.append(
asyncio.create_task(
client.post(
"/search?text=testing%20a%20search&elastic_index=test&page=1&num="
+ str(i),
timeout=10,
)
)
)
if i != 0 and i % number_of_requests_per_second == 0:
await asyncio.sleep(1)
responses = await asyncio.gather(*tasks, return_exceptions=True)
count = -1
success_times = 0
all_times = 0

num_greater_than_second = 0
num_less_than_second = 0
num_requets_timeouts = 0
for response in responses:
count += 1
if isinstance(response, Exception):
num_requets_timeouts += 1
continue
assert (
response.status_code == 200
)  # Adjust based on expected status
response_json = response.json()
did_in_under_a_second = response_json["time"] < 1
if not did_in_under_a_second:
num_greater_than_second += 1
else:
num_less_than_second += 1
success_times += response_json["time"]
all_times += response_json["time"]
print(count, response_json["time"])
print(f"Number over a second: {num_greater_than_second}")
print(f"Number of request timeouts: {num_requets_timeouts}")

if num_less_than_second != 0:
print(
f"Average time of successful requests (meaining requests that took less than a second): {success_times / (num_less_than_second)}"
)
if num_requets_timeouts != number_of_total_requests:
print(
f"Average time of all requests: {all_times / (number_of_total_requests - num_requets_timeouts)}"
)

if __name__ == "__main__":
asyncio.run(main())
Bearbeiten: Ich weiß, dass die Leute denken, dass dies ein Async gegen Sync -Problem ist. Das gleiche Problem tritt auf.

Code: Select all

import asyncio
import os
import time
from concurrent.futures import Future
from contextlib import asynccontextmanager
from queue import Queue
from threading import Thread

from fastapi import FastAPI, Query
from numpy import ndarray
from pydantic import BaseModel, ConfigDict
from sentence_transformers import SentenceTransformer

class TextEmbedQueueItem(BaseModel):
"""queue item for dense embedder"""

model_config = ConfigDict(arbitrary_types_allowed=True)

text: str
future: Future

def thread_queue_conumser(
model: SentenceTransformer,
queue: Queue[TextEmbedQueueItem | None],
):
while True:
item = queue.get()
if item is None:
break
if item.future.cancelled():
continue
result = model.encode([item.text])
if not item.future.cancelled():
try:
item.future.set_result(result)
except:
pass

class TextEmbedder:
"""
Dense Model Controler
"""

def __init__(self):
self.local_model = SentenceTransformer(
"nomic-ai/nomic-embed-text-v1",
trust_remote_code=True,
)

self.task_runner_name = "TextEmbedder"
self.queue = Queue()
self.thread = Thread(
target=thread_queue_conumser,
args=(self.local_model, self.queue),
daemon=True,
)
self._loop = asyncio.new_event_loop()
self.thread.start()

self.adder_thread = Thread(
target=self.run_add_to_queue,
daemon=True,
)
self.adder_thread.start()

def run_add_to_queue(self):
asyncio.set_event_loop(self._loop)
self._loop.run_forever()
self._loop.close()

def stop(self):
print("stopping!")
while not self.queue.empty():
try:
self.queue.get(timeout=1)
except:
break
print("adding none!")

self.queue.put(None)
self.thread.join()
print("stop done!")

def embed(self, text: str):
"""embed text. if returns None, then model is not ready for x seconds!"""
future: Future[ndarray] = Future()
self.queue.put(TextEmbedQueueItem(text=text, future=future))
return future

dense_embedder = TextEmbedder()

def teardown():
dense_embedder.stop()

@asynccontextmanager
async def lifespan(fast_app: FastAPI):
"""startup and shutdown tasks for the app"""
# start up
# =======
yield
# everything after here runs after shutdown
# end all threads and loops
teardown()

app = FastAPI(lifespan=lifespan)

@app.post("/search")
def search(
text: str = Query(...),
elastic_index: str = Query(...),
page: int = Query(1, ge=1),
force_embed: bool = Query(False),
num: int = Query(...),
):
print("here", str(num))
st = time.time()

print("queued up task", str(num))

text_embeding = dense_embedder.embed(text).result()
print("returning", str(num))

return {
"time": round(time.time() - st, 2),
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post