FastAPI 操作异步 Redis 模块 aioredis
main.py
import aioredis
from fastapi import FastAPI
from api.views import router as api_router
from ws.views import router as wss_router
from config import redis_config
async def redis_pool(db: int = 0):
    """Create an aioredis connection pool for the configured Redis server.

    The host and password are read from ``redis_config``. Credentials and
    options are passed as keyword arguments instead of being interpolated
    into the URL: a missing password would otherwise be rendered as the
    literal string ``"None"``, and a password containing ``@``, ``/``, ``?``
    or ``#`` would corrupt URL parsing.

    :param db: index of the Redis database to select (default ``0``).
    :return: the pool object returned by ``aioredis.create_redis_pool``.
    """
    # Treat an empty or missing password as "no AUTH" rather than sending
    # an empty/placeholder credential to the server.
    password = redis_config.get("password") or None
    return await aioredis.create_redis_pool(
        f"redis://{redis_config.get('host')}",
        db=db,
        password=password,
        encoding="utf-8",
    )
def create_app():
    """Build the FastAPI application and register the project routers.

    ``api_router`` is mounted under ``/api`` and ``wss_router`` under ``/ws``.

    :return: the configured ``FastAPI`` instance.
    """
    instance = FastAPI()
    mounts = ((api_router, "/api"), (wss_router, "/ws"))
    for mounted_router, url_prefix in mounts:
        instance.include_router(mounted_router, prefix=url_prefix)
    return instance
# Module-level application instance; the startup/shutdown hooks below attach to it.
app = create_app()
@app.on_event("startup")
async def create_redis():
    """Open the Redis pool on application startup and keep it on ``app.state.redis``."""
    pool = await redis_pool()
    app.state.redis = pool
@app.on_event("shutdown")
async def close_redis():
    """Close the shared Redis pool when the application shuts down.

    With the aioredis 1.x API (the one that provides ``create_redis_pool``),
    ``close()`` is a plain method that returns ``None``; awaiting its result
    raises ``TypeError``. The connections are only fully released once
    ``wait_closed()`` has been awaited.
    """
    redis = app.state.redis
    redis.close()  # synchronous: only marks the pool as closing
    await redis.wait_closed()  # wait until the underlying connections are closed
xxx_router.py
from fastapi import APIRouter, Request
from .response import Response
# Router that collects the endpoints declared below in this module.
router = APIRouter()
@router.get("/")
async def api_index():
    """Return the literal string ``"/api"`` as the index response."""
    index_path = "/api"
    return index_path
@router.get("/online_devices", response_model=Response)
async def online_devices(request: Request):
    """Return the full contents of the ``online_devices`` Redis hash.

    :param request: incoming request; the Redis client is read from
        ``request.app.state.redis``.
    :return: a ``Response`` whose ``message`` is ``"successfully"`` and whose
        ``data`` is the result of ``HGETALL online_devices``.
    """
    devices = await request.app.state.redis.hgetall("online_devices")
    return Response(message="successfully", data=devices)
Last updated