FastAPI WebSocket
Last updated
Last updated
from fastapi import WebSocket
@app.websocket('/ws')
async def websocket_dep(websocket: WebSocket):
    """Accept a WebSocket connection on ``/ws`` and answer every text frame.

    Args:
        websocket: The connection object injected for this route.
    """
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        # send_text is a coroutine that requires the payload: it must be
        # awaited and given the response content. The received text is echoed
        # back here as a placeholder response; substitute the real reply.
        await websocket.send_text(data)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
from typing import List

from fastapi import WebSocket, WebSocketDisconnect
# Manages the WebSocket connections.
class ConnectionManager:
    """Keep track of accepted WebSocket connections and send data to them."""

    def __init__(self):
        # Connections that were accepted via connect() and not yet removed
        # via disconnect(), in the order they connected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and add it to the active connections."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove ``websocket`` from the active connections.

        Raises:
            ValueError: If ``websocket`` is not in the active connections.
        """
        self.active_connections.remove(websocket)

    async def send_message(self, message: str, websocket: WebSocket):
        """Send the text ``message`` to the single connection ``websocket``."""
        await websocket.send_text(message)

    async def send_json(self, data, websocket: WebSocket):
        """Send ``data`` as JSON to the single connection ``websocket``."""
        await websocket.send_json(data)

    # Send text to every connection.
    async def broadcast(self, message: str):
        """Send the text ``message`` to every active connection.

        Each ``await`` below hands control back to the event loop, so
        ``disconnect()`` may run while the broadcast is in progress. Iterating
        the live list would then skip a connection, so a snapshot is iterated
        instead, and connections removed in the meantime are not sent to.
        """
        for connection in list(self.active_connections):
            if connection not in self.active_connections:
                # Removed while an earlier send was awaited.
                continue
            await connection.send_text(message)
# Module-level manager instance used by the endpoint below.
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Chat endpoint: reply to the sender and relay each message to everyone.

    Args:
        websocket: The connection object injected for this route.
        client_id: Integer taken from the URL path, used to label messages.
    """
    await manager.connect(websocket)
    try:
        while True:
            incoming = await websocket.receive_text()
            reply = f"You wrote: {incoming}"
            announcement = f"Client #{client_id} says: {incoming}"
            # The sender gets a personal reply first, then the message goes
            # out through the manager's broadcast.
            await manager.send_message(reply, websocket)
            await manager.broadcast(announcement)
    except WebSocketDisconnect:
        # Unregister the connection before announcing the departure.
        manager.disconnect(websocket)
        farewell = f"Client #{client_id} left the chat"
        await manager.broadcast(farewell)