FastAPI中使用调度程序执行异步任务,与Redis交互

FastAPI中使用调度程序执行异步任务,与Redis交互

要在FastAPI中使用调度程序执行异步任务,并与Redis进行交互,可以使用asyncio库和aioredis库。

首先,确保已安装所需的依赖项。在终端中运行以下命令:

pip install fastapi[all]
pip install aioredis

接下来,创建一个名为scheduler.py的文件,其中包含调度程序的逻辑。在这个文件中,你可以定义一个异步函数,它将作为定期执行的任务。

import asyncio

async def task():
    # 这里是你的任务逻辑
    # 例如,与Redis进行交互
    redis = await aioredis.create_redis_pool('redis://localhost')
    # 执行Redis操作
    await redis.set('key', 'value')
    value = await redis.get('key')
    print(value)
    redis.close()
    await redis.wait_closed()


# 创建一个调度程序,并设置任务执行的时间间隔
async def scheduler():
    while True:
        await task()
        await asyncio.sleep(60)  # 每60秒执行一次任务

在你的FastAPI应用程序中,你需要创建一个路由,用于启动调度程序。创建一个名为main.py的文件,其中包含以下内容:

from fastapi import FastAPI
import asyncio

app = FastAPI()

# 启动调度程序
@app.on_event("startup")
async def startup_event():
    loop = asyncio.get_event_loop()
    loop.create_task(scheduler())

# 示例路由
@app.get("/")
async def root():
    return {"message": "Hello World"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

现在你可以运行FastAPI应用程序,并启动调度程序。在终端中运行以下命令:

python main.py

调度程序将在后台每60秒执行一次任务,并与Redis进行交互。

请确保将redis://localhost更改为你实际的Redis连接信息,例如主机和端口。根据你的需求和环境,你可能还需要在调度程序中设置其他参数,例如身份验证凭据。

Last updated