Code: Select all
import asyncio
import random
# Handle to the TaskGroup that main() is currently running; None whenever no
# group is active (main() assigns it on entry and clears it afterwards).
task_group: asyncio.TaskGroup | None = None
async def coro1():
    """Print the marker "coro1" once per second, forever.

    The loop has no exit condition of its own; it only stops when the
    coroutine's task is cancelled from outside.
    """
    pause_seconds = 1
    marker = "coro1"
    while True:
        await asyncio.sleep(pause_seconds)
        print(marker)
async def coro2():
while True:
await asyncio.sleep(1)
if random.random() < 0.1:
print("dead")
assert task_group is not None
task_group.cancel() # This function does not exist.
else:
print("Survived another second")
async def main():
global task_group
async with asyncio.TaskGroup() as tg:
task_group = tg
tg.create_task(coro1())
tg.create_task(coro2())
task_group = None
if __name__ == "__main__":
    # Start the event loop only when executed as a script, so importing this
    # module does not block inside the endless coroutines.
    asyncio.run(main())
Mobile version