基于FastAPI和Tortoise-ORM的WebSocket类的封装
基于FastAPI和Tortoise-ORM的WebSocket类的封装,用于实现数据的实时传输
以下是基于 FastAPI 和 Tortoise-ORM 封装的 WebSocket 类示例,用于实现数据的实时传输:
import logging
import uuid
from typing import List

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from tortoise import fields
from tortoise.contrib.fastapi import register_tortoise
from tortoise.models import Model
# ORM model for one persisted chat message (one row per message).
# Plain "#" comments are used on purpose instead of docstrings so that no
# extra text is attached to the model class or its fields.
class ChatHistory(Model):
    # Primary key, stored as a UUID.
    id = fields.UUIDField(pk=True)

    # Label of the sender, at most 20 characters.
    user = fields.CharField(max_length=20)

    # Text of the message, at most 200 characters.
    message = fields.CharField(max_length=200)

    class Meta:
        # Explicit table name used for this model.
        table = "chat_history"
class WebSocketManager:
    """Track accepted WebSocket connections and fan text messages out to them."""

    def __init__(self):
        # Connections whose handshake has been accepted, in connect order.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake of ``websocket`` and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Calling this for a connection that is not (or no longer) tracked is a
        no-op, so it is safe to call from a ``finally`` block even after
        ``broadcast`` has already dropped the connection.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed; nothing to do.
            pass

    async def broadcast(self, message: str):
        """Send ``message`` as text to every tracked connection.

        A connection whose send fails is dropped from the registry instead of
        aborting the broadcast, so one dead peer cannot prevent the remaining
        peers from receiving the message.
        """
        stale = []
        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while this one is suspended in ``send_text``.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Any send failure is treated as "this peer is gone".
                stale.append(connection)
        for connection in stale:
            self.disconnect(connection)
# Application instance; the HTTP and WebSocket routes below are registered on it.
app = FastAPI()
# Single module-level connection registry used by the WebSocket endpoint.
manager = WebSocketManager()
# Chat page served at "/". The WebSocket URL is derived from the page's own
# location so the client also works when the app is served from a host or
# port other than localhost:8000, or over HTTPS (which requires wss://).
_INDEX_HTML = """
<html>
<head>
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket((location.protocol === "https:" ? "wss://" : "ws://") + location.host + "/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the single-page chat client.

    The page opens a WebSocket to ``/ws`` on the same host, appends every
    received message to a list, and sends the text box content on submit.

    Returns:
        An ``HTMLResponse`` containing the chat page.
    """
    return HTMLResponse(_INDEX_HTML)
_logger = logging.getLogger(__name__)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Run one chat session on the ``/ws`` WebSocket route.

    Registers the connection with ``manager``, then loops: receive a text
    frame, prefix it with the sender's host, broadcast it to all tracked
    connections, and persist it as a ``ChatHistory`` row. The connection is
    always unregistered when the loop ends.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await manager.connect(websocket)
    # ``client`` can be None when no peer address is available; fall back to
    # a placeholder instead of failing on attribute access.
    sender = websocket.client.host if websocket.client else "unknown"
    try:
        while True:
            data = await websocket.receive_text()
            message = f"{sender}: {data}"
            await manager.broadcast(message)
            # NOTE(review): ChatHistory.message is declared with
            # max_length=200 and the stored text includes the sender prefix;
            # confirm how over-long messages should be handled.
            await ChatHistory(user="user", message=message).save()
    except WebSocketDisconnect:
        # The client closed the connection: the normal end of a session.
        pass
    except Exception:
        # Unexpected failure: keep the best-effort behaviour (do not re-raise)
        # but record the full traceback instead of only the message text.
        _logger.exception("WebSocket session for %s ended with an error", sender)
    finally:
        manager.disconnect(websocket)
# Attach Tortoise-ORM to the application: SQLite database file, schema
# generation enabled, and the ORM exception handlers installed.
register_tortoise(
    app,
    db_url="sqlite://db.sqlite3",
    # Use this module's real import name rather than the literal "__main__".
    # The server below loads the app through the import string "main:app",
    # and under that import this module is not named "__main__", so the
    # literal would point model discovery at a different module than the one
    # whose ChatHistory class the endpoint uses.
    modules={"models": [__name__]},
    generate_schemas=True,
    add_exception_handlers=True,
)

if __name__ == "__main__":
    import uvicorn

    # The "main:app" import string assumes this file is saved as main.py.
    uvicorn.run("main:app", reload=True)
在这个例子中,我们实现了一个简单的聊天室,用户可以通过 WebSocket 与服务器端进行交互。当用户发送消息时,服务器会将该消息广播给所有连接到聊天室的用户,并将消息保存到数据库的 chat_history 表中(使用 Tortoise-ORM)。启动服务后,访问 http://localhost:8000/ 即可打开聊天页面;该页面只显示连接之后收到的实时消息,已保存的聊天记录需要直接从数据库中查询。
Last updated