FastAPI Redis 操作
# coding: utf8
import aioredis
from fastapi import FastAPI, Request
app = FastAPI()
async def get_redis_pool() -> aioredis.Redis:
# 2.0之前 redis = await aioredis.create_redis_pool(f"redis://:root12345@127.0.0.1:6379/0?encoding=utf-8")
redis = await aioredis.from_url(f"redis://:@127.0.0.1:6379/0?encoding=utf-8")
return redis
@app.on_event('startup')
async def startup_event():
''' 获取链接 '''
app.state.redis = await get_redis_pool()
@app.on_event('shutdown')
async def shutdown_event():
''' 关闭 '''
app.state.redis.close()
await app.state.redis.wait_closed()
@app.post('/redis')
async def set_redis(request: Request, key: str, value: str):
''' fastapi中有对应的Request请求上下文中包含有我们的app对象,我们可以在请求的上下文对象,获取我们的当前的app的对象,然后再进行获取相关的实例进行操作 '''
# 等待 redis写入 await异步变同步
# 如果不关心结果可以不用await,但是这里下一步要取值,
# 必须得先等存完值 后再取值
await request.app.state.redis.set(key, value)
v = await request.app.state.redis.get(key)
return {'v': v}
pip intsall aioredis==2.0.1
pip install --upgrade pip
Last updated