一个简单的Demo展示fastapi+tortoise-orm+celery如何搭配

1. 创建并激活虚拟环境

python3 -m venv venv
source venv/*/activate

2. 安装依赖包

pip install fastapi 'uvicorn[standard]' tortoise-orm 'celery[redis]' fastapi-cdn-host

3. 配置数据库连接参数

- config.py

from typing import TypedDictclass TortoiseInitParam(TypedDict):db_url: strmodules: dictDB_CONFIG: TortoiseInitParam = dict(db_url="sqlite://db.sqlite3",modules={"models": ["models"]},
)

4. 定义表结构(直接从FastAPI Examples - Tortoise ORM v0.20.0 Documentation 拷贝过来的)

- models.py

from tortoise import fields, models
from tortoise.contrib.pydantic import pydantic_model_creatorclass Users(models.Model):"""The User model"""id = fields.IntField(pk=True)#: This is a usernameusername = fields.CharField(max_length=20, unique=True)name = fields.CharField(max_length=50, null=True)family_name = fields.CharField(max_length=50, null=True)category = fields.CharField(max_length=30, default="misc")password_hash = fields.CharField(max_length=128, null=True)# created_at = fields.DatetimeField(auto_now_add=True)# modified_at = fields.DatetimeField(auto_now=True)def full_name(self) -> str:"""Returns the best name"""if self.name or self.family_name:return f"{self.name or ''} {self.family_name or ''}".strip()return self.usernameclass PydanticMeta:computed = ["full_name"]exclude = ["password_hash"]User_Pydantic = pydantic_model_creator(Users, name="User")
UserIn_Pydantic = pydantic_model_creator(Users, name="UserIn", exclude_readonly=True)

5. 配置celery

- tasks.py

#!/usr/bin/env python
import shlex
import subprocess
from pathlib import Pathimport anyio
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from config import DB_CONFIG
from models import Users
from tortoise import TortoiseREDIS_URL = "redis://localhost:6379"
app = Celery(__name__, broker=REDIS_URL, backend=REDIS_URL)async def init_db() -> None:await Tortoise.init(db_url=DB_CONFIG["db_url"], modules=DB_CONFIG["modules"])@worker_process_init.connect
def init_worker(**kwargs):anyio.run(init_db)@worker_process_shutdown.connect
def close_worker(**kwargs):anyio.run(Tortoise.close_connections)async def _create_user(data) -> int:user_obj = await Users.create(**data)return user_obj.id@app.task
def save_user_to_db(data) -> int:return anyio.run(_create_user, data)def main():subprocess.run(shlex.split(f"celery -A {Path(__file__).stem} worker"))if __name__ == "__main__":main()

6. FastAPI接口示例:

- main.py

#!/usr/bin/env python
from pathlib import Path
from typing import Listimport celery
import fastapi_cdn_host
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.exceptions import HTTPException
from tortoise.contrib.fastapi import register_tortoisefrom config import DB_CONFIG
from models import User_Pydantic, UserIn_Pydantic, Users
from tasks import save_user_to_dbapp = FastAPI(title="Tortoise ORM FastAPI example")
fastapi_cdn_host.monkey_patch(app)class Status(BaseModel):message: str@app.get("/users", response_model=List[User_Pydantic])
async def get_users():return await User_Pydantic.from_queryset(Users.all())@app.post("/users", response_model=User_Pydantic)
async def create_user(user: UserIn_Pydantic):celery_task = save_user_to_db.delay(user.model_dump(exclude_unset=True))timeout = 2try:user_id = celery_task.get(timeout=timeout)except celery.exceptions.TimeoutError:raise HTTPException(detail=f"Failed to create user in celery within {timeout} seconds",status_code=400,)return await User_Pydantic.from_queryset_single(Users.get(id=user_id))@app.get("/user/{user_id}", response_model=User_Pydantic)
async def get_user(user_id: int):return await User_Pydantic.from_queryset_single(Users.get(id=user_id))@app.put("/user/{user_id}", response_model=User_Pydantic)
async def update_user(user_id: int, user: UserIn_Pydantic):await Users.filter(id=user_id).update(**user.model_dump(exclude_unset=True))return await User_Pydantic.from_queryset_single(Users.get(id=user_id))@app.delete("/user/{user_id}", response_model=Status)
async def delete_user(user_id: int):deleted_count = await Users.filter(id=user_id).delete()if not deleted_count:raise HTTPException(status_code=404, detail=f"User {user_id} not found")return Status(message=f"Deleted user {user_id}")register_tortoise(app,db_url=DB_CONFIG["db_url"],modules=DB_CONFIG["modules"],generate_schemas=True,add_exception_handlers=True,
)if __name__ == "__main__":uvicorn.run(f"{Path(__file__).stem}:app", reload=True)

7. 启动后台服务

python main.py

8. 启动Celery Worker

python tasks.py

9. 打开浏览器验证效果

http://localhost:8000/docs#/default/create_user_users_post

结果如下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/589106.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS NEXT应用开发之ForEach:循环渲染

ForEach接口基于数组类型数据来进行循环渲染,需要与容器组件配合使用,且接口返回的组件应当是允许包含在ForEach父容器组件中的子组件。例如,ListItem组件要求ForEach的父容器组件必须为 List组件 。 说明: 从API version 9开始&a…

计算机网络|谢希仁版|数据链路层

数据链路层 数据链路层研究的是什么?数据链路层的几个共同问题数据链路与链路帧通信规程 三个基本问题封装成帧透明传输差错检测可靠传输 点对点协议PPPPPP协议应满足的需求PPP协议的组成PPP协议帧的格式各字段的意义字节填充零比特填充PPP协议的工作状态 使用广播信…

恒创科技:Windows 服务器关闭防火墙的两种方法

​  防火墙是在Windows服务器中的一项安全功能,可监视所有网络流量(传入和传出),并根据一组旨在保护系统免受黑客攻击的规则允许或阻止流量。然而,在某些情况下,您可能需要暂时关闭防火墙以进行特定的网络配置或测试。本文将介绍…

03 Python进阶:MySQL - mysql-connector

mysql-connector安装 要在 Python 中使用 MySQL 数据库,你需要安装 MySQL 官方提供的 MySQL Connector/Python。下面是安装 MySQL Connector/Python 的步骤: 首先,确保你已经安装了 Python,如果没有安装,可以在 Python…

mac、windows 电脑安装使用多个版本的node

我们为啥要安装多个不同版本的node? 开发旧项目时,使用低版本Nodejs。开发新项目时,需使用高版本Node.js。可使用n同时安装多个版本Node.js,并切换到指定版本Node.js。 mac电脑安装 一、全局安装 npm install -g n 二、mac电脑…

如何优化TCP?TCP的可靠传输机制是什么?

在网络世界中,传输层协议扮演着至关重要的角色,特别是TCP协议,以其可靠的数据传输特性而广受青睐。然而,随着网络的发展和数据量的激增,传统的TCP协议在效率方面遭遇了挑战。小编将深入分析TCP的可靠性传输机制&#x…

LeetCode_394(字符串解码)

双栈法 public String decodeString(String s) {String res "";Stack<Integer> countStack new Stack<>();Stack<String> resStack new Stack<>();int idx 0;while (idx < s.length()){char cur s.charAt(idx);//处理数字if(Charact…

2024第八届全国青少年无人机大赛暨中国航空航天科普展览会

2024第八届全国青少年无人机大赛暨中国航空航天科普展览会 邀请函 主办单位&#xff1a; 中国航空学会 重庆市南岸区人民政府 招商执行单位&#xff1a; 重庆港华展览有限公司 为更好的培养空航天产业人才&#xff0c;汇聚航空教育产业创新科技&#xff0c;丰富和完善航…

【WEEK6】 【DAY2】DQL Data Querying - Part Two 【English Version】

2024.4.2 Tuesday Following the previous article 【WEEK6】 【DAY1】DQL Data Query - Part One【English Version】 Contents 4.4. Join Queries4.4.1. JOIN Comparison4.4.2. Seven Types of JOINs4.4.3. Examples4.4.3.1. In this example, the results of INNER JOIN and…

如何使用VNC+Cpolar实现Windows电脑公网远程控制Ubuntu系统桌面

文章目录 前言1. VisualSVN安装与配置2. VisualSVN Server管理界面配置3. 安装cpolar内网穿透3.1 注册账号3.2 下载cpolar客户端3.3 登录cpolar web ui管理界面3.4 创建公网地址 4. 固定公网地址访问 前言 SVN 是 subversion 的缩写&#xff0c;是一个开放源代码的版本控制系统…

Java就近原则和this关键字

Java 中的就近原则和 this 关键字有着密切的关系&#xff0c;特别是在处理成员变量与方法参数同名的情况下。就近原则指的是在同一作用域下&#xff0c;优先使用最近声明的变量或参数。 在 Java 中&#xff0c;如果一个方法的参数与类的成员变量同名&#xff0c;为了明确指示要…

Hadoop和zookeeper集群相关执行脚本(未完,持续更新中~)

1、Hadoop集群查看状态 搭建Hadoop数据集群时&#xff0c;按以下路径操作即可生成脚本 [test_1analysis01 bin]$ pwd /home/test_1/hadoop/bin [test_01analysis01 bin]$ vim jpsall #!/bin/bash for host in analysis01 analysis02 analysis03 do echo $host s…