Django-cron
Django-cron 可以定期运行Django/Python代码,提供跟踪和执行任务的基本管道,大多数人最常用的两种方式是编写自定义Python脚本或每个cron的管理命令。除此之外,通常还需要一些跟踪成功,失败等的机制。
安装
-
使用pip安装,最好安装在虚拟环境中
-
添加
django_cron
到你的Django设置INSTALLED_APPS
INSTALLED_APPS = ['django_cron', ]
-
运行
python manage.py migrate django_cron
-
在代码中某个地方编写一个cron类,以扩展该类
CronJobBase
from django_cron import CronJobBase, Scheduleclass MyCronJob(CronJobBase):RUN_EVERY_MINS = 120 # every 2 hoursschedule = Schedule(run_every_mins=RUN_EVERY_MINS)code = 'my_app.my_cron_job' # a unique codedef do(self):pass # do your thing here
-
添加一个名为
CRON_CLASSES
(类似于MIDDLEWARE_CLASSES
等)的变量,他是一个字符串列表,每个字符串都是一个cron类。例如:CRON_CLASSES = ["my_app.cron.MyCronJob",# ... ]
-
现在,每次运行管理命令时,如果需要,所有cron都会运行,根据应用程序,可以根据需要从
Unix crontab
调用管理命令。每5分钟通常适用于大多数应用程序,例如:python manage.py runcrons
> crontab -e */5 * * * * source /home/ubuntu/.bashrc && source /home/ubuntu/work/your-project/bin/activate && python /home/ubuntu/work/your-project/src/manage.py runcrons > /home/ubuntu/cronjob.log
管理命令
-
使用运行特定的cron,例如:
python manage.py runcrons cron_class ...
# only run "my_app.cron.MyCronJob" $ python manage.py runcrons "my_app.cron.MyCronJob"# run "my_app.cron.MyCronJob" and "my_app.cron.AnotherCronJob" $ python manage.py runcrons "my_app.cron.MyCronJob" "my_app.cron.AnotherCronJob"
-
使用强制运行你的cron,例如:
python manage.py runcrons --force
# run all crons, immediately, regardless of run time $ python manage.py runcrons --force
-
运行时不会向控制台发送任何消息,例如:
python manage.py runcrons --silent
# run crons, if required, without message to console $ python manage.py runcrons --silent
配置
CRON_CLASSES - cron 类别列表
DJANGO_CRON_LOCK_BACKEND 锁类的路径,默认值:"django_cron.backends.lock.cache.CacheLock"`
DJANGO_CRON_LOCKFILE_PATH - 存储 FileLock 文件的路径,默认值:"/tmp"
DJANGO_CRON_LOCK_TIME - CacheLock 后端的超时值,默认值:24 * 60 * 60 # 24 hours
DJANGO_CRON_CACHE - CacheLock 后端使用的缓存名称,默认值:"default"
DJANGO_CRON_DELETE_LOGS_OLDER_THAN - 整数,清除日志条目的天数(可选 - 如果未设置,则不会删除任何条目)
Cron配置示例
失败后重试功能
通过传递RETRY_AFTER_FAILURE_MINS
参数来运行cron
这将不会再下次运行runcrons时重新运行,但至少会RETRY_AFTER_FAILURE_MINS
在上次失败后重新运行
class MyCronJob(CronJobBase):RUN_EVERY_MINS = 60 # every hoursRETRY_AFTER_FAILURE_MINS = 5schedule = Schedule(run_every_mins=RUN_EVERY_MINS, retry_after_failure_mins=RETRY_AFTER_FAILURE_MINS)
定时运行功能
RUN_EVERY_MINS
您可以通过传递或参数来运行 cron RUN_AT_TIMES
。
这将每小时运行一次:
class MyCronJob(CronJobBase):RUN_EVERY_MINS = 60 # every hoursschedule = Schedule(run_every_mins=RUN_EVERY_MINS)
这将在给定的时间运行作业:
class MyCronJob(CronJobBase):RUN_AT_TIMES = ['11:30', '14:00', '23:15']schedule = Schedule(run_at_times=RUN_AT_TIMES)
小时格式为HH:MM
(24 小时制)。django-cron
将按照设置所指定的您网站的本地时区来解释这些时间TIME_ZONE
。
您还可以混合使用这两种方法:
class MyCronJob(CronJobBase):RUN_EVERY_MINS = 120 # every 2 hoursRUN_AT_TIMES = ['6:30']schedule = Schedule(run_every_mins=RUN_EVERY_MINS, run_at_times=RUN_AT_TIMES)
这将每 2 小时运行一次作业,并在 6:30 运行一次。
允许并行运行
默认情况下不允许并行运行(出于安全原因)。但是,如果您想启用它们,只需添加:
ALLOW_PARALLEL_RUNS = True
在你的 CronJob 类中。笔记
请注意,这需要安装缓存框架,如https://docs.djangoproject.com/en/dev/topics/cache/所述
如果您希望覆盖所使用的缓存,请将其放入您的设置文件中:
DJANGO_CRON_CACHE = 'cron_cache'
运行失败通知CronJob
此示例 cron 检查上次 cron 作业的结果。如果连续 10 次失败,则会向用户发送电子邮件。
安装所需的依赖项:Django>=1.7.0
,django-common>=0.5.1
。
添加django_cron.cron.FailedRunsNotificationCronJob
到您的CRON_CLASSES
设置文件中。
在您的 cron 类中设置最小失败运行次数MIN_NUM_FAILURES
(默认值 = 10)。例如:
class MyCronJob(CronJobBase):RUN_EVERY_MINS = 10MIN_NUM_FAILURES = 3schedule = Schedule(run_every_mins=RUN_EVERY_MINS)code = 'app.MyCronJob'def do(self):... some action here ...
ADMINS
电子邮件从设置文件导入
要设置电子邮件前缀,您必须FAILED_RUNS_CRONJOB_EMAIL_PREFIX
在设置文件中添加(默认为空)。例如:
FAILED_RUNS_CRONJOB_EMAIL_PREFIX = "[Server check]: "
FailedRunsNotificationCronJob`检查每个 cron`CRON_CLASSES
锁定后端
您可以通过以下设置之一来使用两个内置锁定后端之一DJANGO_CRON_LOCK_BACKEND
:
django_cron.backends.lock.cache.CacheLock
(默认)django_cron.backends.lock.file.FileLock
缓存锁
该后端设置一个缓存变量来标记当前作业为“已运行”,并在释放锁时将其删除。
文件锁
该后端创建一个文件来标记当前作业为“已运行”,并在释放锁时将其删除。
定制锁
您还可以将自定义后端编写为的子类django_cron.backends.lock.base.DjangoCronJobLock
并定义lock()
和release()
方法。
django-filter
django-filter 提供了一种基于用户提供的参数筛选查询集的简单方法,假设我么有一个模型Product
模型
from django.db import modelsclass Product(models.Model):name = models.CharField(max_length=255)price = models.DecimalField(max_digits=5, decimal_places=2)description = models.TextField()release_date = models.DateField()manufacturer = models.ForeignKey(Manufacturer, on_delete=models.CASCADE)
过滤器
我们有许多字段,我们希望让用户根据名称,价格或release_date进行筛选。
import django_filtersclass ProductFilter(django_filters.FilterSet):name = django_filters.CharFilter(lookup_expr='iexact')class Meta:model = Productfields = ['price', 'release_date']
声明过滤器
声明式语法在创建过滤器时为您提供了最大的灵活性,但它相当冗长。我们将使用以下示例概述上的核心过滤器参数FilterSet
:
class ProductFilter(django_filters.FilterSet):price = django_filters.NumberFilter()price__gt = django_filters.NumberFilter(field_name='price', lookup_expr='gt')price__lt = django_filters.NumberFilter(field_name='price', lookup_expr='lt')release_year = django_filters.NumberFilter(field_name='release_date', lookup_expr='year')release_year__gt = django_filters.NumberFilter(field_name='release_date', lookup_expr='year__gt')release_year__lt = django_filters.NumberFilter(field_name='release_date', lookup_expr='year__lt')manufacturer__name = django_filters.CharFilter(lookup_expr='icontains')class Meta:model = Productfields = ['price', 'release_date', 'manufacturer']
对于过滤器有两个主要参数:
- field_name:要过来的模型字段的名称,可以使用Django的
__
语法遍历“关系路径”来过滤相关模型上的字段,例如,manufacturer__name
lookup_expr
:过滤时要使用的字段查找。Django 的__
语法可以再次用于支持查找转换。例如,year__gte
。
字段field_name
和一起代表一个完整的 Django 查找表达式。Django 的查找参考lookup_expr
中提供了查找表达式的详细说明。django-filter 支持包含转换和最终查找的表达式。
使用Meta.fields生成过滤器
FilterSet Meta 类提供了一个fields
属性,可用于轻松指定多个过滤器,而无需大量代码重复。基本语法支持多个字段名称的列表:
import django_filtersclass ProductFilter(django_filters.FilterSet):class Meta:model = Productfields = ['price', 'release_date']
以上代码针对“price”和“release_date”字段生成“精确”查找。
此外,可以使用字典为每个字段指定多个查找表达式:
import django_filtersclass ProductFilter(django_filters.FilterSet):class Meta:model = Productfields = {'price': ['lt', 'gt'],'release_date': ['exact', 'year__gt'],}
以上将生成“price__ lt”、“price__ gt”、“release_date”和“release_date__ year__gt”过滤器。
过滤器查找类型"exact"是隐式默认值,因此永远不会添加到过滤器名称中。在上面的示例中,发布日期的精确过滤器是“release_date”,而不是“release_date__exact”。这可以通过 FILTERS_DEFAULT_LOOKUP_EXPR 设置覆盖。
覆盖默认过滤器
与此类似django.contrib.admin.ModelAdmin
,可以使用 filter_overrides
类覆盖所有相同类型的模型字段的默认过滤器Meta
:
class ProductFilter(django_filters.FilterSet):class Meta:model = Productfields = {'name': ['exact'],'release_date': ['isnull'],}filter_overrides = {models.CharField: {'filter_class': django_filters.CharFilter,'extra': lambda f: {'lookup_expr': 'icontains',},},models.BooleanField: {'filter_class': django_filters.BooleanFilter,'extra': lambda f: {'widget': forms.CheckboxInput,},},}
基于请求的过滤
可以FilterSet
使用可选参数初始化request
。如果传递了请求对象,那么您可以在过滤期间访问该请求。这允许您根据请求的属性进行过滤,例如当前登录的用户或Accepts-Languages
标头。
无法保证请求一定会被提供给FilterSet 实例。任何依赖于请求的代码都应该处理None 的情况。
过滤主要项.qs
要按对象过滤主要查询集request
,只需覆盖该 FilterSet.qs
属性即可。例如,您可以过滤博客文章,只显示已发布的文章和登录用户拥有的文章(可能是作者的草稿文章)。
class ArticleFilter(django_filters.FilterSet):class Meta:model = Articlefields = [...]@propertydef qs(self):parent = super().qsauthor = getattr(self.request, 'user', None)return parent.filter(is_published=True) \| parent.filter(author=author)
过滤相关查询集ModelChoiceFilter
和 的参数queryset
支持 可调用行为。如果传递了可调用对象,它将以作为其唯一参数进行调用 。这允许您执行相同类型的基于请求的过滤,而无需诉诸覆盖。ModelChoiceFilter``ModelMultipleChoiceFilter``request``FilterSet.__init__
def departments(request):if request is None:return Department.objects.none()company = request.user.companyreturn company.department_set.all()class EmployeeFilter(filters.FilterSet):department = filters.ModelChoiceFilter(queryset=departments)...
自定义过滤Filter.method
您可以通过指定执行过滤来控制过滤器的行为。在方法参考method
中查看更多信息。请注意,您可以访问过滤器集的属性,例如。request
class F(django_filters.FilterSet):username = CharFilter(method='my_custom_filter')class Meta:model = Userfields = ['username']def my_custom_filter(self, queryset, name, value):return queryset.filter(**{name: value,})
视图
现在我们需要编写一个视图:
def product_list(request):f = ProductFilter(request.GET, queryset=Product.objects.all())return render(request, 'my_app/template.html', {'filter': f})
如果没有提供查询集参数,那么将使用模型默认管理器中的所有项目。
如果您想要访问视图中的已过滤对象(例如,如果您想要对它们进行分页),则可以这样做。它们位于 f.qs 中
URL 配置
我们需要一个 URL 模式来调用视图:
path('list/', views.product_list, name="product-list")
模板
最后我们需要一个模板:
{% extends "base.html" %}{% block content %}<form method="get">{{ filter.form.as_p }}<input type="submit" /></form>{% for obj in filter.qs %}{{ obj.name }} - ${{ obj.price }}<br />{% endfor %}
{% endblock %}
这就是全部内容!该form
属性包含一个普通的 Django 表单,当我们对其进行迭代时,FilterSet.qs
我们会在结果查询集中获取对象。
通用视图和配置
除了上述用法之外,django-filter 中还包含一个基于类的通用视图,位于django_filters.views.FilterView
。您必须提供model
或filterset_class
参数,类似于 ListView
Django 本身:
# urls.py
from django.urls import path
from django_filters.views import FilterView
from myapp.models import Producturlpatterns = [path("list/", FilterView.as_view(model=Product), name="product-list"),
]
如果您提供一个model
可选选项,您可以设置filterset_fields
指定一个列表或一个您想要包含的字段元组,用于自动构建过滤器集类。
您必须提供一个模板,在<app>/<model>_filter.html
其中获取上下文参数filter
。此外,上下文将包含 object_list
保存过滤查询集的内容。
django-filter 中仍包含一个传统的函数式通用视图,尽管它已被弃用。可以在 找到它 django_filters.views.object_filter
。您必须向它提供与基于类的视图相同的参数:
# urls.py
from django.urls import path
from django_filters.views import object_filter
from myapp.models import Producturlpatterns = [path("list/", object_filter, {'model': Product}, name="product-list"),
]
所需的模板及其上下文变量也与上面的基于类的视图相同。
与DRF集成
-
导入路径
from django_filters import rest_framework as filtersclass ProductFilter(filters.FilterSet):...
-
类视图中添加
DjangoFilterBackend
到filter_backends
from django_filters import rest_framework as filtersclass ProductList(generics.ListAPIView):queryset = Product.objects.all()serializer_class = ProductSerializerfilter_backends = (filters.DjangoFilterBackend,)filterset_fields = ('category', 'in_stock')
如果您想默认使用 django-filter 后端,请将其添加到DEFAULT_FILTER_BACKENDS
设置中。
# settings.py
INSTALLED_APPS = [# ...'rest_framework','django_filters',
]REST_FRAMEWORK = {'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',# ...),
}
添加 FilterSet filterset_class
要启用过滤FilterSet
,请将其添加到filterset_class
视图类的参数中。
from rest_framework import generics
from django_filters import rest_framework as filters
from myapp import Productclass ProductFilter(filters.FilterSet):min_price = filters.NumberFilter(field_name="price", lookup_expr='gte')max_price = filters.NumberFilter(field_name="price", lookup_expr='lte')class Meta:model = Productfields = ['category', 'in_stock']class ProductList(generics.ListAPIView):queryset = Product.objects.all()serializer_class = ProductSerializerfilter_backends = (filters.DjangoFilterBackend,)filterset_class = ProductFilter
使用filterset_fields
快捷方式
FilterSet
您可以通过将其添加到视图类来绕过创建filterset_fields
。这相当于FilterSet
仅使用Meta.fields创建。
from rest_framework import generics
from django_filters import rest_framework as filters
from myapp import Productclass ProductList(generics.ListAPIView):queryset = Product.objects.all()filter_backends = (filters.DjangoFilterBackend,)filterset_fields = ('category', 'in_stock')# Equivalent FilterSet:
class ProductFilter(filters.FilterSet):class Meta:model = Productfields = ('category', 'in_stock')
请注意,不支持一起使用
filterset_fields
和。filterset_class
覆盖FIlterSet
创建
FilterSet
可以通过重写后端类上的以下方法来定制创建:
.get_filterset(self, request, queryset, view)
.get_filterset_class(self, view, queryset=None)
.get_filterset_kwargs(self, request, queryset, view)
您可以针对每个视图逐个覆盖这些方法,创建唯一的后端,或者可以使用这些方法将您自己的挂钩写入视图类。
class MyFilterBackend(filters.DjangoFilterBackend):def get_filterset_kwargs(self, request, queryset, view):kwargs = super().get_filterset_kwargs(request, queryset, view)# merge filterset kwargs provided by view classif hasattr(view, 'get_filterset_kwargs'):kwargs.update(view.get_filterset_kwargs())return kwargsclass BookFilter(filters.FilterSet):def __init__(self, *args, author=None, **kwargs):super().__init__(*args, **kwargs)# do something w/ authorclass BookViewSet(viewsets.ModelViewSet):filter_backends = [MyFilterBackend]filterset_class = BookFilterdef get_filterset_kwargs(self):return {'author': self.get_author(),}
使用核心 API 和开放 API 生成模式
后端类通过实现get_schema_fields()
和与 DRF 的模式生成集成get_schema_operation_parameters()
。get_schema_fields()
安装核心 API 时会自动启用。get_schema_operation_parameters()
对于开放 API 始终启用(自 DRF 3.9 起新增)。模式生成通常可以无缝运行,但是实现确实需要调用视图的get_queryset()
方法。需要注意的是,视图是在模式生成期间人工构建的,因此args
和kwargs
属性将为空。如果您依赖从 URL 解析的参数,则需要处理它们在中的缺失get_queryset()
。
例如,您的获取查询集方法可能如下所示:
class IssueViewSet(views.ModelViewSet):queryset = models.Issue.objects.all()def get_project(self):return models.Project.objects.get(pk=self.kwargs['project_id'])def get_queryset(self):project = self.get_project()return self.queryset \.filter(project=project) \.filter(author=self.request.user)
可以这样重写
class IssueViewSet(views.ModelViewSet):queryset = models.Issue.objects.all()def get_project(self):try:return models.Project.objects.get(pk=self.kwargs['project_id'])except models.Project.DoesNotExist:return Nonedef get_queryset(self):project = self.get_project()if project is None:return self.queryset.none()return self.queryset \.filter(project=project) \.filter(author=self.request.user)
或者更简单地说:
class IssueViewSet(views.ModelViewSet):queryset = models.Issue.objects.all()def get_queryset(self):# project_id may be Nonereturn self.queryset \.filter(project_id=self.kwargs.get('project_id')) \.filter(author=self.request.user)
django-cors-headers
将跨源资源共享(CORS)表头添加到响应的Django应用程序,这允许从其他来源向django应用程序发出浏览器内请求
安装django-cors-headers
pip install django-cors-headers
添加到应用程序中
INSTALLED_APPS = [ ... , "corsheaders" , ... ,
]
在中间件中监听响应
CorsMiddleware 需要放在第一位,如果不放在第一位,可能会造成某些请求没有添加CORS
MIDDLEWARE = [ "corsheaders.middleware.CorsMiddleware" , "django.middleware.common.CommonMiddleware" , ... ,
]
设置访问白名单
CORS_ORIGIN_WHITELIST = ('http://xxx.com','https://xxx.com','127.0.0.1:8080','localhost:8080',
)
设置运行携带cookie
CORS_ALLOW_CREDENTIALS = True
默认请求头列表
CORS_ALLOW_HEADERS = [ "accept" , "accept-encoding" , "authorization" , "content-type" , "dnt" , "origin" , "user-agent" , "x-csrftoken" , "x-requested-with" ,
]
supervisor部署Django项目
简介
Supervisor是一个用于进程管理的开源工具,通常用于在Unix和类Unix系统上监控和控制进程的运行。它提供了一个简单而强大的方式来管理后台进程,例如Web服务器、任务队列、应用程序等。
Supervisor的主要功能包括:
进程监控:Supervisor可以监控指定的进程,并在进程意外终止时自动重新启动它们。这对于确保关键进程的持续运行非常有用,以及防止因进程崩溃而导致的服务中断。
进程控制:Supervisor允许您通过命令行或API控制进程的启动、停止、重启等操作。这使得管理和操作进程变得更加便捷,无需手动干预或编写复杂的脚本。
日志管理:Supervisor可以捕获和管理进程的输出日志,包括标准输出和标准错误。它提供了对日志文件的轻松访问和旋转,以便有效地跟踪和调试应用程序的运行情况。
配置灵活:Supervisor使用简单的配置文件来定义要监控和管理的进程。您可以为每个进程指定启动命令、工作目录、运行用户等信息,并通过配置文件灵活地定义进程之间的依赖关系。
扩展性:Supervisor支持通过插件扩展其功能。您可以使用插件来添加额外的监控指标、告警机制、Web界面等,以满足特定需求或增强系统的可视化和可管理性。
Supervisor的使用非常广泛,特别适用于服务器环境下的进程管理。它被广泛应用于Web服务器(如Nginx、Apache)、应用程序框架(如Django、Flask)、队列处理(如Celery)等场景,以确保关键进程的稳定运行和自动恢复。
总而言之,Supervisor是一个可靠而灵活的进程管理工具,它简化了在Unix系统上管理和监控后台进程的任务,提供了更好的稳定性和可管理性。
安装
pip install supervisor
生成Supervisor配置文件
也可以使用默认的配置,默认的配置文件为
/etc/supervisord.conf
安装Supervisor之后可以直接使用内置的命令来生成一个默认的配置文件,这个配置文件也是作为Supervisor的启动文件,具体命令为:
echo_supervisord_conf > supervisord.conf
这样就得到了一个基本的配置文件,不过文件里面大部分内容都是注释,而且很多都是非必需的配置,我们可以直接来一个基础配置,如下:
[supervisord]
logfile=/var/log/supervisor/supervisord.log
pidfile=/tmp/supervisord.pid[unix_http_server]
file=/tmp/supervisor.sock[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface[supervisorctl]
serverurl=unix:///tmp/supervisor.sock[include]
files = /opt/cloud/izone/supervisord.d/*.conf
这里有几个地方需要注意,第一个是 pidfile 参数,这个就是进程自动生成的 pid 文件地址,然后是 file 参数和 serverurl 参数的地址应该报错一致,这两个文件会自动生成,所以要保证权限有。
include 里面的 files 就很类似 nginx 配置里面的 conf.d 目录,就是表示配置可以加载其他地方的配置,比如一些进程配置可以放到这里,当然,你也可以直接把配置放到这个主配置文件中。
创建服务配置
上面的主配置中 include 的目录中创建服务配置,比如django.conf,放到 /opt/cloud/izone/supervisord.d/ 目录里面就行。
情况1
[program:gunicorn-django]
# 启动 Gunicorn 服务的命令,这里是运行 Django 应用的 izone.wsgi 模块并监听 0.0.0.0:8000
command=gunicorn izone.wsgi -b 0.0.0.0:8000
# 设置运行命令时的工作目录
directory=/opt/cloud/izone
# 正常输出和错误输出的日志文件路径。这里将日志统一存放在 /var/log/supervisor/ 目录
stdout_logfile=/var/log/supervisor/gunicorn.django.log
stderr_logfile=/var/log/supervisor/gunicorn.django.log
# Supervisor 启动时自动启动服务
autostart=true
autorestart=true
# 程序启动后持续运行超过这个时间(秒)Supervisor 才认为启动成功。
startsecs=2
# 在停止进程时,Supervisor 等待进程平滑退出的最大秒数。
stopwaitsecs=2
# 服务启动的优先级,数值越小优先级越高。
priority=100
这里的参数解释一下:
command
:启动的进程命令,比如这里是用gunicorn
来启动Django
directory
:启动命令前进入的目录,比如这里是进入Django
项目根目录stdout_logfile
和stderr_logfile
:日志路径autostart
:跟随supervisor
一起启动autorestart
:进程死掉自动重启startsecs
:启动几秒后没有异常退出,就表示进程正常启动了stopwaitsecs
:杀死进程前等待的时间priority
:进程启动优先级,值小的最先启动,关闭的时候最后关闭
情况2
[program:gunicorn]
# 指定运行gunicorn的命令,包括配置文件路径
command=/data01/winterfall/anaconda3/bin/gunicorn algo_platform.wsgi:application -c /data01/winterfall/supervisor/gunicorn.conf
directory=/data01/winterfall/algo_platform_manager
user=winterfall
autostart=true
autorestart=true
redirect_stderr=true
# 日志文件的最大字节数,达到该大小时会创建新日志文件
stdout_logfile_maxbytes = 20MB
# 保留日志文件的最大数量
stdout_logfile_backups = 10
# 标准输出和重定向错误的日志文件路径
stdout_logfile = /data01/winterfall/supervisor/logs/gunicorn.log
django项目中设置
# wsgi.py 文件import os
from django.core.wsgi import get_wsgi_applicationos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'algo_platform.settings')application = get_wsgi_application()
# settings.py 配置
# 允许的主机: 确保 ALLOWED_HOSTS 包含运行服务器的域名或 IP 地址,避免拒绝请求。
ALLOWED_HOSTS = ['*']
Gunicorn 配置文件 (gunicorn.conf
)
-
这个配置文件的位置由
-c
选项指定,你需要确保它存在且配置正确。一般包括以下内容:# 并行工作进程数 workers = 8 # 指定每个工作者的线程数 threads = 1 # 监听内网端口 bind = 'unix:/data01/winterfall/supervisor/tmp/gateway-manager.sock' # 是否后台运行 daemon = 'false' # 工作模式协程 gevent,它是一种基于协程的并发网络库,支持处理大量并发连接,适用于需要高并发和高 I/O 的场景。 worker_class = 'gevent' # 设置最大并发量 worker_connections = 2000 # 设置访问日志和错误信息日志路径 accesslog = '/data01/winterfall/logs/gunicorn_access.log' errorlog = '/data01/winterfall/logs/gunicorn_error.log' # 设置日志记录水平 loglevel = 'warning' # 进程文件 pidfile = '/data01/winterfall/supervisor/tmp/gateway-manager.pid'
-
Nginx配置
- Nginx 配置需要修改,指向这个 Unix 套接字而不是 IP 地址和端口。
server {listen 80;server_name your_domain.com;location / {proxy_pass http://unix:/data01/winterfall/supervisor/tmp/gateway-manager.sock;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;} }
- 在
proxy_pass
指令中,将其设置为http://unix:/path/to/socket.sock;
-
权限设置:
- 确保创建 Unix 套接字的目录存在,并且运行 Gunicorn 和 Web 服务器(如 Nginx)的用户对该目录具有读写权限。
- 可以通过设置目录的权限或者使用
chown
命令更改文件所有者。
-
性能: Unix 套接字通常比 TCP 提供更低的延迟,因为它们绕过了网络栈。
-
安全: 使用 Unix 套接字时,连接无法在网络上传输,因此在防止外部攻击方面更为安全。
启动Supervisor
启动命令
配置创建之后,可以直接启动Supervisor,命令如下:
supervisord -c supervisord.conf
其实就很简单,使用 supervisord
启动,然后 -c
指定一下启动的配置文件。这个时候如果没有问题可以看到后台有个 gunicorn
进程在运行,也就是 django
在运行。
更新配置
涉及配置变动,需要更新。保存并关闭配置文件后,我们需要重新加载Supervisor的配置,使其生效。可以运行以下命令:
supervisorctl reread
supervisorctl update
这将使Supervisor读取新的配置文件并更新应用程序。
服务的操作
现在,可以使用Supervisor来启动、停止和管理Django应用程序了。可以运行以下命令:
supervisorctl start gunicorn-django
supervisorctl stop gunicorn-django
supervisorctl restart gunicorn-django
这将启动、停止或重新启动Django应用程序。
通过以上步骤,我们成功地使用Supervisor部署了Django应用程序。Supervisor将负责监控应用程序的运行状态,并在需要时自动重启应用程序。这样,我们可以确保Django应用程序在服务器上持续稳定地运行。
停止Supervisor
停止命令:
supervisorctl shutdown
容器化部署改动
修改 Dockerfile
由于更改了部署方式,所以在容器里面也要修改一下,之前是直接使用 gunicorn
来运行,现在改成 supervisord
。
修改为:
FROM python:3.9
ARG pip_index_url=https://pypi.org/simple
ARG pip_trusted_host=pypi.org
ENV PYTHONUNBUFFERED=1
WORKDIR /opt/cloud/izoneRUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \mkdir -p /var/log/supervisor
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt --index-url $pip_index_url --trusted-host $pip_trusted_host
COPY . .CMD ["supervisord", "-n", "-c", "supervisord.conf"]
在容器里面必须加上 -n
参数,表示在前台运行,不然容器是无法运行的。
修改 docker-compose 文件
由于这次改动直接把 CMD
命令写到镜像构建里面了,所以容器启动的时候就会自动执行这个命令,那原来 docker-compose 文件里面的 command
参数就可以删除,具体要删除的是这里:
command: gunicorn izone.wsgi -b 0.0.0.0:8000
Celery
简介
celery是一个基于Python开发的异步消息任务队列,它专注于实时处理的异步任务队列,同时也支持任务调度,celery的设计是简单的,灵活且可靠,能够处理大量消息,适用于多种场景下的任务调度和执行。
celery使用场景
- 异步任务:将耗时操作交给celery异步任务执行,比如:发短信,邮件,消息推送,音视频处理等等
- 定时任务:利用celery的定时任务功能,可以定时执行某些任务,如每天的数据统计,定时 清理缓存等
- 分布式任务调度:在分布式系统中,celery可以作为任务调度的中心,协调各个节点的任务执行。在分布式系统中,celery可以作为任务调度的中心,协调各个节点上的任务执行
celery的架构
消息中间件(Broker):作为任务的中间人(Broker),负责接收和分发任务消息。Celery本身不提供消息中间件,但可以与第三方提供的消息中间件集成,如RabbitMQ、Redis等。
任务执行单元(Worker):是Celery处理任务的执行单元,Worker并发的运行在分布式的系统节点中,持续不断地监视任务队列,并从中获取新的任务进行处理。
任务执行结果存储(backend):用于存储Worker执行的任务结果,Celery支持以不同方式存储任务的结果,如Mysql、Redis等。
Django配置
-
在settings文件夹配置
# celery CELERY_BROKER_URL = 'redis://:{passwd}@{host}:{port}/0'.format(passwd=REDIS_PASSWD, host=REDIS_HOST, port=REDIS_PORT) CELERY_RESULT_BACKEND = 'redis://:{passwd}@{host}:{port}/1'.format(passwd=REDIS_PASSWD, host=REDIS_HOST,port=REDIS_PORT) CELERY_BEAT_SCHEDULE = {# 'check-application': {# 'task': 'application.tasks.get_api',# 'schedule': timedelta(days=7),# },'check-server-alarm': {'task': 'application.tasks.service_auto_start','schedule': timedelta(minutes=1)} }CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置 CELERY_TASK_RESULT_EXPIRES = 60*60*24 # 任务过期时间
-
新建celery.py文件,必须与settings.py同级目录,再在celery.py里面写入下面代码
#!/usr/bin/env python # encoding: utf-8from __future__ import absolute_import, unicode_literals from django.conf import settings import os from logging.config import dictConfig from celery import Celery from celery.signals import after_setup_logger@after_setup_logger.connect def setup_loggers(logger, *args, **kwargs):dictConfig(settings.LOGGING)os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'algo_platform.settings')app = Celery('algo-platform') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()
-
在同级的__ init __.py文件中写入下面代码
#!/usr/bin/env python # coding: utf-8from __future__ import absolute_import, unicode_literals from .celery import app as celery_app import pymysql pymysql.install_as_MySQLdb()__all__ = ('celery_app',)
-
然后在app目录下新建task.py文件,这个文件是要执行的异步任务,定时任务,只能用task.py命名,为了区别是clery执行的任务所以我们还需给这个函数加上装饰器@shared_task,当然这里面的函数是我们自己定义的,可以是发送邮件等其它逻辑,但是必须要加上装饰器。
from celery import shared_task@shared_task def add(x,y):return x+y
from algo_platform import celery_app@celery_app.task(bind=True) def app_alarm(self, alarm_list):pass
-
提交任务到消息中间件,需要使用django视图函数来执行这个函数,delay是执行异步任务的,add的参数也需要写到delay里面,执行这个视图函数并不是运行了,只是提交了运行这个函数的任务
class AddView(APIView):def get(self, request):sum_ = add.delay(1, 2) # add是函数的名字,delay是提交异步任务到队列中,但是还没有执行print(sum_)print(sum_.get())return Response(data='ok')
class AddView(APIView):def get(self, request):if enable_alarm:app_alarm.delay(alarm_list)
-
执行任务
要执行这个任务需要celery的worker,所以我们要在终端输入以下命令来启动worker
celery -A house worker -l info -P eventlet --concurrency=200 #要将house换成django项目的名字其中 concurrency=200将worker的数量变成200
定时任务和异步任务
sum_ = add.delay(1, 2) #delay提交异步任务
sum_ = add.apply_async(args=[1,2],eta='什么时候执行') #apply_async表示定时任务
celery整合Supervisor
supvisorctl新增celery beat定时任务调度
celery.conf 配置
# 异步任务
[program:celery]
# 指定要允许的命令 -A 指定了celery应用 --loglevel=info 定义了日志级别
command=/data01/dingpeichang/anaconda3/bin/celery -A developers_ai worker --loglevel=info
# celery命令执行时所在的工作目录,这里是项目代码所在的路径
directory=/data01/dingpeichang/developers_ai/developers_ai_manager # 项目代码
# 运行该程序的用户名称
user=dingpeichang
# 指定同时运行的进程数
numprocs=1
# 设置为true表示在supervisor启动时自动启动该程序
autostart=true
# 如果程序意外退出,会自动重新启动
autorestart=true
# 如果程序启动后能持续运行超过指定秒(10秒),supervisor才会认为程序启动成功
startsecs=10
# 标准输出流日志的存放位置
stdout_logfile = /data01/dingpeichang/developers_ai/logs/celery.log
# 标准错误流日志的存放位置
stderr_logfile = /data01/dingpeichang/developers_ai/logs/celery_err.log
# 在停止进程时,supervisor等待进程平滑退出的最大秒数
stopwaitsecs = 600
# 用于停止进程的信号
stopsignal=QUIT
# 设置为true,表示将发送停止信号给包含子进程在内的整个进程组
killasgroup=true
# 设置该程序的启动优先级,数值越低优先级越高
priority=1000# 定时任务
[program:celerybeat]
# 用来启动 Celery Beat,用于任务调度
command=/data01/dingpeichang/anaconda3/bin/celery -A developers_ai beat --loglevel=info
directory=/data01/dingpeichang/developers_ai/developers_ai_manager # 项目代码
user=dingpeichang # 程序部署的用户环境
numprocs=1
autostart=true
autorestart=true
startsecs=10
stdout_logfile = /data01/dingpeichang/developers_ai/logs/celerybeat.log
stderr_logfile = /data01/dingpeichang/developers_ai/logs/celerybeat_err.log
stopwaitsecs = 600
stopsignal=QUIT
killasgroup=true
priority=999
不需要停止整个Supervisor来添加新的程序。可以按照以下步骤来添加Celery Beat:
- 在Supervisor的配置文件中添加Celery Beat的配置,如上面所示。
- 保存并关闭配置文件。
- 运行
supervisorctl reread
命令,让Supervisor重新读取配置文件。这个命令会告诉Supervisor有新的或者已经更改的配置文件。 - 运行
supervisorctl update
命令,让Supervisor按照新的配置启动和管理程序。这个命令会启动新配置的程序,并且如果有必要的话,也会重启已经更改配置的程序。
以上步骤完成后,Celery Beat应该就已经被Supervisor管理并运行了。你可以通过supervisorctl status
命令来查看所有程序的状态,确认Celery Beat是否已经启动。