FastAPI Redis 操作

# coding: utf8

import aioredis
from fastapi import FastAPI, Request

app = FastAPI()

async def get_redis_pool() -> aioredis.Redis:
    # 2.0之前 redis = await aioredis.create_redis_pool(f"redis://:root12345@127.0.0.1:6379/0?encoding=utf-8")
    redis = await aioredis.from_url(f"redis://:@127.0.0.1:6379/0?encoding=utf-8")
    return redis


@app.on_event('startup')
async def startup_event():
    ''' 获取链接 '''
    app.state.redis = await get_redis_pool()

@app.on_event('shutdown')
async def shutdown_event():
    ''' 关闭 '''
    app.state.redis.close()
    await app.state.redis.wait_closed()


@app.post('/redis')
async def set_redis(request: Request, key: str, value: str):
    ''' fastapi中有对应的Request请求上下文中包含有我们的app对象,我们可以在请求的上下文对象,获取我们的当前的app的对象,然后再进行获取相关的实例进行操作 '''
    #  等待 redis写入  await异步变同步 
    # 如果不关心结果可以不用await,但是这里下一步要取值,
    # 必须得先等存完值 后再取值
    await request.app.state.redis.set(key, value)
    v = await request.app.state.redis.get(key)
    return {'v': v}

pip intsall aioredis==2.0.1

pip install --upgrade pip

Last updated