FastAPI中使用调度程序执行异步任务,与Redis交互
FastAPI中使用调度程序执行异步任务,与Redis交互
要在FastAPI中使用调度程序执行异步任务,并与Redis进行交互,可以使用asyncio库和aioredis库。
首先,确保已安装所需的依赖项。在终端中运行以下命令:
pip install fastapi[all]
pip install aioredis
接下来,创建一个名为scheduler.py的文件,其中包含调度程序的逻辑。在这个文件中,你可以定义一个异步函数,它将作为定期执行的任务。
import asyncio
async def task():
# 这里是你的任务逻辑
# 例如,与Redis进行交互
redis = await aioredis.create_redis_pool('redis://localhost')
# 执行Redis操作
await redis.set('key', 'value')
value = await redis.get('key')
print(value)
redis.close()
await redis.wait_closed()
# 创建一个调度程序,并设置任务执行的时间间隔
async def scheduler():
while True:
await task()
await asyncio.sleep(60) # 每60秒执行一次任务
在你的FastAPI应用程序中,你需要创建一个路由,用于启动调度程序。创建一个名为main.py的文件,其中包含以下内容:
from fastapi import FastAPI
import asyncio
app = FastAPI()
# 启动调度程序
@app.on_event("startup")
async def startup_event():
loop = asyncio.get_event_loop()
loop.create_task(scheduler())
# 示例路由
@app.get("/")
async def root():
return {"message": "Hello World"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
现在你可以运行FastAPI应用程序,并启动调度程序。在终端中运行以下命令:
python main.py
调度程序将在后台每60秒执行一次任务,并与Redis进行交互。
请确保将redis://localhost更改为你实际的Redis连接信息,例如主机和端口。根据你的需求和环境,你可能还需要在调度程序中设置其他参数,例如身份验证凭据。
Last updated