This error is caused by creating an asyncio synchronization primitive such as an asyncio.Event
before starting your asyncio loop, then using that primitive in the loop. To fix this, create the primitives you need to use from within the loop.
This error was discovered when creating our real-time chat app example. In order to make the app real-time, we used GraphQL subscriptions, and created an asyncio.Event
on which to wait for updates.
Let’s look at the code that caused the error.
Problem
Below is what the file that contained the GraphQL schema looked like prior to fixing it:
from uuid import uuid4
import typing
import asyncio
import strawberry
@strawberry.type
class User:
uuid: strawberry.Private[str]
name: str
@strawberry.type
class Message:
sender: User
text: str
@strawberry.type
class Room:
updated: strawberry.Private[asyncio.Event] = asyncio.Event()
name: str = "Example Chat"
users: typing.List[User]
messages: typing.List[Message]
def get_user_by_uuid(self, uuid):
return next(user for user in self.users if user.uuid == uuid)
def add_user(self, name):
uuid = str(uuid4())
room.users.append(User(name=name, uuid=uuid))
return uuid
def add_message(self, uuid, text):
self.messages.append(Message(
sender=self.get_user_by_uuid(uuid),
text=text
))
self.updated.set()
self.updated.clear()
return self
async def wait_for_update(self):
await self.updated.wait()
room = Room(users=[], messages=[])
@strawberry.type
class Query:
@strawberry.field
async def get_room() -> Room:
return room
@strawberry.type
class Subscription:
@strawberry.subscription
async def room() -> typing.AsyncGenerator[Room, None]:
while True:
yield await Query.get_room()
await room.wait_for_update()
@strawberry.type
class Mutation:
@strawberry.field
async def join(name: str) -> str:
return room.add_user(name)
@strawberry.field
async def send_message(uuid: str, text: str) -> Room:
return room.add_message(uuid, text)
PythonEverything looked correct, but when running this app, we got the following error:
INFO: ('172.21.0.1', 50034) - "WebSocket /graphql" [accepted]
INFO: connection open
Task <Task pending name='Task-23' coro=<<async_generator_asend without __name__>()> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.9/asyncio/tasks.py:509]> got Future <Future pending> attached to a different loop
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 164, in handle_async_results
async for result in result_source:
File "/usr/local/lib/python3.9/site-packages/graphql/execution/map_async_iterator.py", line 59, in __anext__
raise error
File "/opt/container/gql.py", line 58, in room
await room.wait_for_update()
File "/opt/container/gql.py", line 42, in wait_for_update
await self.updated.wait()
File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
await fut
RuntimeError: Task <Task pending name='Task-23' coro=<<async_generator_asend without __name__>()> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.9/asyncio/tasks.py:509]> got Future <Future pending> attached to a different loop
It took a while, but we discovered the issue was the following:
sender: User
text: str
@strawberry.type
class Room:
updated: strawberry.Private[asyncio.Event] = asyncio.Event()
name: str = "Example Chat"
users: typing.List[User]
messages: typing.List[Message]
def get_user_by_uuid(self, uuid):
return next(user for user in self.users if user.uuid == uuid)
def add_user(self, name):
uuid = str(uuid4())
room.users.append(User(name=name, uuid=uuid))
return uuid
def add_message(self, uuid, text):
self.messages.append(Message(
sender=self.get_user_by_uuid(uuid),
text=text
))
self.updated.set()
self.updated.clear()
return self
async def wait_for_update(self):
await self.updated.wait()
room = Room(users=[], messages=[])
PythonAs you can see, we create the room
global object right in the root indentation level of the file. That means, before the asyncio
loop is ever started, room
already exists along with the event that is created by default when the object is instantiated.
This is a problem, because that asyncio.Event
isn’t associated with the loop that the FastAPI app then runs in.
Solution
Below is how we fixed it:
@strawberry.type
class Room:
updated: strawberry.Private[typing.Optional[asyncio.Event]] = None
name: str = "Example Chat"
users: typing.List[User]
messages: typing.List[Message]
def get_user_by_uuid(self, uuid):
return next(user for user in self.users if user.uuid == uuid)
def add_user(self, name):
uuid = str(uuid4())
room.users.append(User(name=name, uuid=uuid))
return uuid
def add_message(self, uuid, text):
self.messages.append(Message(
sender=self.get_user_by_uuid(uuid),
text=text
))
self.updated.set()
self.updated.clear()
return self
async def wait_for_update(self):
if self.updated is None:
self.updated = asyncio.Event()
await self.updated.wait()
room = Room(users=[], messages=[])
PythonInstead of instantiating the asyncio.Event
object when the Room
object is instantiated (which would be when the file is first interpreted -long before the asyncio
loop has had a chance to start), we check when we want to wait on it if it doesn’t exist yet. If not, we create it there. This works because wait_for_update
is only ever called from inside the asyncio
loop.
Conclusion
In summary, asyncio
primitives must be created only from within the asyncio
loop you intend to use them. If you don’t, they’ll be associated with a different context, or none at all as was the case above.
Hope that helps.
John is a professional software engineer who has been solving problems with code for 15+ years. He has experience with full stack web development, container orchestration, mobile development, DevOps, Windows and Linux kernel development, cybersecurity, and reverse engineering. In his spare time, he’s researching the potential business applications of AI.