fastapi操作异步redis模块aioredis

  • main.py

import aioredis
from fastapi import FastAPI
from api.views import router as api_router
from ws.views import router as wss_router

from config import redis_config


async def redis_pool(db: int = 0):
    """
    Create an aioredis connection pool from the values in ``redis_config``.

    :param db: database index placed in the path component of the Redis URL
    :return: the object returned by ``aioredis.create_redis_pool``
    """
    password = redis_config.get('password')
    host = redis_config.get('host')
    # Empty user name, password from config; replies are decoded as UTF-8.
    url = f"redis://:{password}@{host}/{db}?encoding=utf-8"
    return await aioredis.create_redis_pool(url)


def create_app():
    """Build the FastAPI application and mount its routers.

    The router imported from ``api.views`` is mounted under ``/api`` and
    the router imported from ``ws.views`` under ``/ws``.

    :return: the configured ``FastAPI`` instance
    """
    fastapi_app = FastAPI()
    mounts = (("/api", api_router), ("/ws", wss_router))
    for mount_prefix, sub_router in mounts:
        fastapi_app.include_router(sub_router, prefix=mount_prefix)
    return fastapi_app


# Application instance built at import time; the startup/shutdown hooks
# defined in this module are registered on it.
app = create_app()


@app.on_event("startup")
async def create_redis():
    """On application startup, create the Redis pool and keep it on ``app.state.redis``."""
    pool = await redis_pool()
    app.state.redis = pool


@app.on_event("shutdown")
async def close_redis():
    """On application shutdown, close the Redis pool kept on ``app.state.redis``.

    With the aioredis 1.x API used by this file (``create_redis_pool``),
    ``close()`` is a regular method that only initiates the shutdown and
    returns ``None``; awaiting its result raises ``TypeError``. The coroutine
    to await is ``wait_closed()``, which completes once the underlying
    connections are actually closed.
    """
    redis = app.state.redis
    redis.close()
    await redis.wait_closed()
  • xxx_router.py

from fastapi import APIRouter, Request
from .response import Response

# Router on which the endpoints declared in this module are registered.
router = APIRouter()


@router.get("/")
async def api_index():
    """Index endpoint of this router; responds with the literal string ``/api``."""
    index_path = "/api"
    return index_path


@router.get("/online_devices", response_model=Response)
async def online_devices(request: Request):
    """Return the contents of the ``online_devices`` Redis hash.

    The Redis client is read from ``request.app.state.redis``; the result
    of its ``hgetall`` call is passed through unchanged as the ``data``
    of the ``Response``.
    """
    devices = await request.app.state.redis.hgetall("online_devices")
    return Response(message="successfully", data=devices)

Last updated