Websocket Routes Only Work on FastAPI, not APIRouter #11779
-
First Check
Commit to Help
Example Code
app = FastAPI()
# Router that carries the WebSocket route below.
ws_router = APIRouter()


# NOTE(review): this router is included on the app with prefix="/ws" later in
# the snippet, so presumably the route resolves to /ws/ws rather than /ws —
# confirm against the URL the client connects to.
@ws_router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register the socket with ``manager`` and send a JSON greeting in a loop.

    Each iteration sends ``{"Hello": "World!"}`` and then sleeps for three
    seconds. On ``WebSocketDisconnect`` or ``ConnectionClosed`` the socket is
    handed to ``manager.disconnect`` and the loop ends.

    ``manager`` is defined outside this snippet; its ``connect`` and
    ``disconnect`` behavior is not shown here.
    """
    await manager.connect(websocket)
    # NOTE(review): this reads the ``CONNECTED`` attribute off ``client_state``
    # instead of comparing the state against it — presumably an enum member
    # and therefore always truthy; confirm. As written, the loop seems to end
    # only through the ``break`` below.
    while websocket.client_state.CONNECTED:
        try:
            await websocket.send_json({"Hello": "World!"})
            # NOTE(review): time.sleep is a synchronous call inside a
            # coroutine, so nothing else on the event loop can run for these
            # three seconds.
            time.sleep(3)
        except (WebSocketDisconnect, ConnectionClosed):
            await manager.disconnect(websocket)
            break
# Router for the file-related HTTP endpoints.
file_router = APIRouter()


# The decorator must use the router defined above; the original referenced an
# undefined ``filerouter`` name.
@file_router.post("/fetch-all", response_model=FetchItemResponse)
async def fetch_all_items(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    userid: Annotated[str, Form()],
    upload_repo: UploadRepository = Depends(get_mongodb_repo(UploadRepository)),
):
    """Return every stored item for ``userid`` as a ``FetchItemResponse``.

    Args:
        credentials: Result of the ``security`` dependency. It is not read in
            the body; declaring it makes the dependency run for the request.
        userid: Form field naming the user whose items are listed.
        upload_repo: Repository obtained through ``get_mongodb_repo``.

    Returns:
        A ``FetchItemResponse`` whose ``itemlist`` holds one ``FetchModel``
        per item returned by ``upload_repo.list_items``.

    Raises:
        HTTPException: With status 500 when listing or converting the items
            fails. The original handler printed the error and returned
            ``None``, which cannot satisfy the declared ``response_model``.
    """
    import logging

    from fastapi import HTTPException

    log = logging.getLogger(__name__)
    log.debug("Fetching items for userid=%s", userid)

    try:
        # list_items returns a pair; only the items are used here.
        item_list, _total = upload_repo.list_items(userid=userid)
        return FetchItemResponse(
            itemlist=[
                FetchModel(
                    id=item.id,
                    originname=item.originname,
                    filepath=item.filepath,
                    createdat=item.createdat,
                    status=item.status,
                )
                for item in item_list
            ]
        )
    except Exception as exc:
        # Keep the traceback in the server log and report an explicit error
        # to the client instead of an invalid ``None`` body.
        log.exception("Failed to fetch items for userid=%s", userid)
        raise HTTPException(
            status_code=500, detail="Failed to fetch items"
        ) from exc
app.include_router(file_router, prefix="/files", tags=["files"])
app.include_router(ws_router, prefix="/ws", tags=["websocket"])

Description: Similarity
Operating System: Windows
Operating System Details: No response
FastAPI Version: 0.111.0
Pydantic Version: 2.7.4
Python Version: 3.12.4
Additional Context: No response
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
You are using |
Beta Was this translation helpful? Give feedback.
Try changing `time.sleep(3)` to `await asyncio.sleep(3)`.
`time.sleep` is a blocking function; in async functions you should use `await asyncio.sleep` instead.
If this doesn't help, you probably have errors in the implementation of `manager`.