im Ausführen einer Fastapi -App mit mehreren Mitarbeitern. Die App verfügt über einen einzelnen Endpunkt, der eine kleine Menge Text benötigt und einen Sentcetransformer verwendet, um ihn einzubetten. Wenn ich diese App auf einem Arbeiter ausführe, funktioniert alles großartig. Es kann ungefähr 25 Anfragen gleichzeitig bearbeiten, ohne dass eine einzige Anfrage länger als eine Sekunde dauert. Cool. Ich möchte diese Zahl jedoch erhöhen, also starte ich den Server mit mehr Mitarbeitern. Wenn dies jedoch manchmal Anfragen erledigt, erhalte ich während des Tests eine Zeitüberschreitung (meine Zeitüberschreitung beträgt 10 Sekunden) (alle Anfragen ohne Lesezeitout wurden in weniger als einer Sekunde erfüllt). Ich dachte, dies lag daran, dass Fastapi aus irgendeinem Grund nur länger als gewöhnlich dauerte, um die Aufgaben zu erledigen Beginn des Endpunkts nie gedruckt). Auch nachdem eine Sekunde vergangen ist und die anderen über 25 Anfragen weniger als eine Sekunde abgeschlossen waren 4 Core i7 CPU. p> unten ist a reproduzierbar main.py . Sie können es mit Fastapi Run -Workers main.py .
Bitte beachten asynchron (indem Sie das Adder_Thread loswerden und die Warteschlange () in Def -Embo ()) < /p>
direkt aufrufen.
im Ausführen einer Fastapi -App mit mehreren Mitarbeitern. Die App verfügt über einen einzelnen Endpunkt, der eine kleine Menge Text benötigt und einen Sentcetransformer verwendet, um ihn einzubetten. Wenn ich diese App auf einem Arbeiter ausführe, funktioniert alles großartig. Es kann ungefähr 25 Anfragen gleichzeitig bearbeiten, ohne dass eine einzige Anfrage länger als eine Sekunde dauert. Cool. Ich möchte diese Zahl jedoch erhöhen, also starte ich den Server mit mehr Mitarbeitern. Wenn dies jedoch manchmal Anfragen erledigt, erhalte ich während des Tests eine Zeitüberschreitung (meine Zeitüberschreitung beträgt 10 Sekunden) (alle Anfragen ohne Lesezeitout wurden in weniger als einer Sekunde erfüllt). Ich dachte, dies lag daran, dass Fastapi aus irgendeinem Grund nur länger als gewöhnlich dauerte, um die Aufgaben zu erledigen Beginn des Endpunkts nie gedruckt). Auch nachdem eine Sekunde vergangen ist und die anderen über 25 Anfragen weniger als eine Sekunde abgeschlossen waren 4 Core i7 CPU. p> unten ist a reproduzierbar main.py . Sie können es mit Fastapi Run -Workers main.py . Bitte beachten asynchron (indem Sie das Adder_Thread loswerden und die Warteschlange () in Def -Embo ()) < /p> direkt aufrufen.[code]import asyncio import os import time from concurrent.futures import Future from contextlib import asynccontextmanager from queue import Queue from threading import Thread
from fastapi import FastAPI, Query from numpy import ndarray from pydantic import BaseModel, ConfigDict from sentence_transformers import SentenceTransformer
class TextEmbedQueueItem(BaseModel): """queue item for dense embedder"""
def thread_queue_conumser( model: SentenceTransformer, queue: Queue[TextEmbedQueueItem | None], ): while True: item = queue.get() if item is None: break if item.future.cancelled(): continue result = model.encode([item.text]) if not item.future.cancelled(): try: item.future.set_result(result) except: pass
def embed(self, text: str): """embed text. if returns None, then model is not ready for x seconds!""" future: Future[ndarray] = Future() self._loop.call_soon_threadsafe(self._add_embed, text, future) return future
dense_embedder = TextEmbedder()
def teardown(): dense_embedder.stop()
@asynccontextmanager async def lifespan(fast_app: FastAPI): """startup and shutdown tasks for the app""" # start up # ======= yield # everything after here runs after shutdown # end all threads and loops teardown()
return { "time": round(time.time() - st, 2), } < /code> und unten befindet sich die Datei, mit der ich sie testet (es dauert möglicherweise einige Läufe, um einige zum Auszeit zu bringen): < /p> import asyncio from typing import List
async with AsyncClient( base_url="http://localhost:8000", limits=limits ) as client: tasks: List = [] number_of_total_requests = 30 number_of_requests_per_second = 30 for i in range(number_of_total_requests): tasks.append( asyncio.create_task( client.post( "/search?text=testing%20a%20search&elastic_index=test&page=1&num=" + str(i), timeout=10, ) ) ) if i != 0 and i % number_of_requests_per_second == 0: await asyncio.sleep(1) responses = await asyncio.gather(*tasks, return_exceptions=True) count = -1 success_times = 0 all_times = 0
num_greater_than_second = 0 num_less_than_second = 0 num_requets_timeouts = 0 for response in responses: count += 1 if isinstance(response, Exception): num_requets_timeouts += 1 continue assert ( response.status_code == 200 ) # Adjust based on expected status response_json = response.json() did_in_under_a_second = response_json["time"] < 1 if not did_in_under_a_second: num_greater_than_second += 1 else: num_less_than_second += 1 success_times += response_json["time"] all_times += response_json["time"] print(count, response_json["time"]) print(f"Number over a second: {num_greater_than_second}") print(f"Number of request timeouts: {num_requets_timeouts}")
if num_less_than_second != 0: print( f"Average time of successful requests (meaining requests that took less than a second): {success_times / (num_less_than_second)}" ) if num_requets_timeouts != number_of_total_requests: print( f"Average time of all requests: {all_times / (number_of_total_requests - num_requets_timeouts)}" )
if __name__ == "__main__": asyncio.run(main()) [/code] [b] Bearbeiten: Ich weiß, dass die Leute denken, dass dies ein Async gegen Sync -Problem ist. Das gleiche [url=viewtopic.php?t=11587]Problem[/url] tritt auf. [/b] [code]import asyncio import os import time from concurrent.futures import Future from contextlib import asynccontextmanager from queue import Queue from threading import Thread
from fastapi import FastAPI, Query from numpy import ndarray from pydantic import BaseModel, ConfigDict from sentence_transformers import SentenceTransformer
class TextEmbedQueueItem(BaseModel): """queue item for dense embedder"""
def thread_queue_conumser( model: SentenceTransformer, queue: Queue[TextEmbedQueueItem | None], ): while True: item = queue.get() if item is None: break if item.future.cancelled(): continue result = model.encode([item.text]) if not item.future.cancelled(): try: item.future.set_result(result) except: pass
def embed(self, text: str): """embed text. if returns None, then model is not ready for x seconds!""" future: Future[ndarray] = Future() self.queue.put(TextEmbedQueueItem(text=text, future=future)) return future
dense_embedder = TextEmbedder()
def teardown(): dense_embedder.stop()
@asynccontextmanager async def lifespan(fast_app: FastAPI): """startup and shutdown tasks for the app""" # start up # ======= yield # everything after here runs after shutdown # end all threads and loops teardown()
app = FastAPI(lifespan=lifespan)
@app.post("/search") def search( text: str = Query(...), elastic_index: str = Query(...), page: int = Query(1, ge=1), force_embed: bool = Query(False), num: int = Query(...), ): print("here", str(num)) st = time.time()
Wie senden und empfangen Sie Nachrichten mit Pythons Asyncio und der WebSockets -Bibliothek?
Ich verwende Django -Kanäle als Socket -Server. Im Grunde versuche ich also, diesen Socket -Server zu...
getestet mit PAHO Version 1.2.5
In diesem Beispiel sende ich eine Nachricht an das Ziel root/msg/1/data . Ich habe zwei Abonnements: root/msg/1/# und root/msg/+/# . Beide Abonnements stimmen mit der...
getestet mit PAHO -Version 1.2.5
In diesem Beispiel sende ich eine Nachricht an das Zielroot/msg/1/data
System.out.println( topic: + topic + message: + new String(message1.getPayload(),...
Ich habe einen Abschnitt einer CSHTML -Webseite mit einem Dropdown -Feld, aber ich muss stattdessen einen Combobox mit einem Eingang verwenden. $.getJSON( /orders/orderlist/picks , function (picks)...