implementieren
Code: Select all
import types
@types.coroutine
def wait(seconds: int) -> None:
start_time = time.time()
while time.time() - start_time < seconds:
yield
Code: Select all
import socket
from socket import BlockingIOError
@types.coroutine
def request(url: str) -> None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((url, 80))
s.send(f'GET / HTTP/1.1\nHost: {url}\n\n'.encode())
s.setblocking(False)
response = b''
while True:
try:
data = s.recv(1024)
response += data
except BlockingIOError:
if response:
break
yield
< /code>
Diese Methode überprüft jede Iteration, um festzustellen, ob Daten abgerufen werden können. Wenn ja, fahren Sie mit den anderen Aufgaben in der Ereignisschleife fort. Zum Beispiel im Fall von Linux ist dies epoll
Code: Select all
@types.coroutine
async def request(url: str) -> None:
loop = asyncio.get_event_loop()
await loop.sock_connect(url, 80)
...