1. 创建并激活虚拟环境
python3 -m venv venv
source venv/*/activate
2. 安装依赖包
pip install fastapi 'uvicorn[standard]' tortoise-orm 'celery[redis]' fastapi-cdn-host
3. 配置数据库连接参数
- config.py
"""Database connection settings imported by both tasks.py and main.py."""
from typing import TypedDict


class TortoiseInitParam(TypedDict):
    """Shape of the settings dict used to initialise Tortoise ORM."""

    # Database connection URL.
    db_url: str
    # Mapping of app label -> list of module names that define its models.
    modules: dict


# A local SQLite file; the "models" app takes its tables from models.py.
DB_CONFIG: TortoiseInitParam = {
    "db_url": "sqlite://db.sqlite3",
    "modules": {"models": ["models"]},
}
4. 定义表结构(直接从 Tortoise ORM v0.20.0 文档的《FastAPI Examples》页面拷贝过来的)
- models.py
"""Table definition and the pydantic schemas generated from it."""
from tortoise import fields, models
from tortoise.contrib.pydantic import pydantic_model_creator


class Users(models.Model):
    """The User model"""

    id = fields.IntField(pk=True)
    #: This is a username
    username = fields.CharField(max_length=20, unique=True)
    name = fields.CharField(max_length=50, null=True)
    family_name = fields.CharField(max_length=50, null=True)
    category = fields.CharField(max_length=30, default="misc")
    password_hash = fields.CharField(max_length=128, null=True)
    # created_at = fields.DatetimeField(auto_now_add=True)
    # modified_at = fields.DatetimeField(auto_now=True)

    def full_name(self) -> str:
        """Returns the best name

        Uses "name family_name" (missing parts dropped) when either is set,
        otherwise falls back to the username.
        """
        if self.name or self.family_name:
            return f"{self.name or ''} {self.family_name or ''}".strip()
        return self.username

    class PydanticMeta:
        # Expose full_name() as a field and keep the password hash out of
        # the generated pydantic models.
        computed = ["full_name"]
        exclude = ["password_hash"]


# Schema for reading users.
User_Pydantic = pydantic_model_creator(Users, name="User")
# Schema for creating/updating users: read-only fields are left out.
UserIn_Pydantic = pydantic_model_creator(Users, name="UserIn", exclude_readonly=True)
5. 配置celery
- tasks.py
#!/usr/bin/env python
"""Celery application whose worker processes write users through Tortoise ORM.

Run this file directly to start a worker for the tasks defined here.
"""
import shlex
import subprocess
from pathlib import Path

import anyio
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from config import DB_CONFIG
from models import Users
from tortoise import Tortoise

# The same Redis instance serves as message broker and result backend.
REDIS_URL = "redis://localhost:6379"
app = Celery(__name__, broker=REDIS_URL, backend=REDIS_URL)


async def init_db() -> None:
    """Initialise Tortoise ORM from the shared DB_CONFIG settings."""
    await Tortoise.init(db_url=DB_CONFIG["db_url"], modules=DB_CONFIG["modules"])


@worker_process_init.connect
def init_worker(**kwargs) -> None:
    """Signal handler: set up the ORM when a worker process starts.

    Celery signal handlers are synchronous, so the coroutine is driven to
    completion with ``anyio.run``.
    """
    anyio.run(init_db)


@worker_process_shutdown.connect
def close_worker(**kwargs) -> None:
    """Signal handler: close the ORM connections when a worker process exits."""
    anyio.run(Tortoise.close_connections)


async def _create_user(data: dict) -> int:
    """Insert a ``Users`` row built from ``data`` and return its primary key."""
    user_obj = await Users.create(**data)
    return user_obj.id


@app.task
def save_user_to_db(data: dict) -> int:
    """Celery task: create a user from ``data`` and return the new user's id.

    The task body is synchronous; the async ORM call is executed via
    ``anyio.run``.
    """
    return anyio.run(_create_user, data)


def main() -> None:
    """Start a Celery worker for this module as a child process."""
    # ``-A <module name>`` points celery at the ``app`` defined in this file.
    subprocess.run(shlex.split(f"celery -A {Path(__file__).stem} worker"))


if __name__ == "__main__":
    main()
6. FastAPI接口示例:
- main.py
#!/usr/bin/env python
"""FastAPI CRUD example in which user creation is delegated to a Celery task."""
from pathlib import Path
from typing import List

import celery
import fastapi_cdn_host
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool
from starlette.exceptions import HTTPException
from tortoise.contrib.fastapi import register_tortoise

from config import DB_CONFIG
from models import User_Pydantic, UserIn_Pydantic, Users
from tasks import save_user_to_db

app = FastAPI(title="Tortoise ORM FastAPI example")
# Let fastapi-cdn-host patch the interactive docs routes of this app.
fastapi_cdn_host.monkey_patch(app)


class Status(BaseModel):
    """Plain message payload returned by endpoints that have no user to return."""

    message: str


@app.get("/users", response_model=List[User_Pydantic])
async def get_users():
    """Return every user."""
    return await User_Pydantic.from_queryset(Users.all())


@app.post("/users", response_model=User_Pydantic)
async def create_user(user: UserIn_Pydantic):
    """Create a user through the Celery worker and return the stored record.

    Raises:
        HTTPException: status 400 when the worker does not deliver a result
            within ``timeout`` seconds.
    """
    celery_task = save_user_to_db.delay(user.model_dump(exclude_unset=True))
    timeout = 2
    try:
        # ``AsyncResult.get`` blocks until the result arrives; run it in a
        # worker thread so the event loop keeps serving other requests.
        user_id = await run_in_threadpool(celery_task.get, timeout=timeout)
    except celery.exceptions.TimeoutError as exc:
        raise HTTPException(
            detail=f"Failed to create user in celery within {timeout} seconds",
            status_code=400,
        ) from exc
    return await User_Pydantic.from_queryset_single(Users.get(id=user_id))


@app.get("/user/{user_id}", response_model=User_Pydantic)
async def get_user(user_id: int):
    """Return the user whose primary key is ``user_id``."""
    return await User_Pydantic.from_queryset_single(Users.get(id=user_id))


@app.put("/user/{user_id}", response_model=User_Pydantic)
async def update_user(user_id: int, user: UserIn_Pydantic):
    """Update the fields that were sent for ``user_id`` and return the result."""
    await Users.filter(id=user_id).update(**user.model_dump(exclude_unset=True))
    return await User_Pydantic.from_queryset_single(Users.get(id=user_id))


@app.delete("/user/{user_id}", response_model=Status)
async def delete_user(user_id: int):
    """Delete the user ``user_id``.

    Raises:
        HTTPException: status 404 when no row was deleted.
    """
    deleted_count = await Users.filter(id=user_id).delete()
    if not deleted_count:
        raise HTTPException(status_code=404, detail=f"User {user_id} not found")
    return Status(message=f"Deleted user {user_id}")


# Hook Tortoise ORM into the app using the shared settings; schemas are
# generated and the ORM exception handlers are installed.
register_tortoise(
    app,
    db_url=DB_CONFIG["db_url"],
    modules=DB_CONFIG["modules"],
    generate_schemas=True,
    add_exception_handlers=True,
)

if __name__ == "__main__":
    uvicorn.run(f"{Path(__file__).stem}:app", reload=True)
7. 启动后台服务
python main.py
8. 启动Celery Worker
python tasks.py
9. 打开浏览器验证效果
http://localhost:8000/docs#/default/create_user_users_post
结果如下