首次探秘 Django 的新后台任务框架
Django 6.0 在 django.tasks 中引入了内置的后台任务框架。但请勿急于淘汰 Celery、Huey 或其他首选解决方案。
发布说明对此有明确说明:
Django 负责任务创建和队列管理,但 不提供执行任务的工件机制 。任务执行必须由外部基础设施(如独立进程或服务)管理。
新版 django.tasks 模块的核心目标是 为任务队列实现提供通用 API 。Jake Howard 是这项增强功能的核心推动者。欢迎查阅Django论坛上的项目介绍。
他的参考实现(同时兼容早期Django版本的回溯移植)已发布于GitHub上的django-tasks。
但我们暂且忽略该实现,转而探索Django 6.0内置的精简版本。通过创建专属后端和工作进程,我们来实践:
我们的项目:通知系统
我们将创建一个应用程序,利用ntfy.sh向手机及其他设备发送通知。(我是它的粉丝!)
若想直接参与代码开发,请查看GitHub上的最终项目版本。
使用nfty向手机发送通知只需:
- 注册账户
- 创建主题。
- 在手机上安装应用并登录。
- 向
https://ntfy.sh/<yourtopic>发送HTTP请求
免费版仅提供公开主题和消息。这意味着任何订阅该主题的人都能看到您发送的内容。为满足我们的需求,可直接创建随机命名的主题(如UUID)。
项目配置要求将步骤4的URL作为环境变量提供。例如:
NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c
以下是承担核心功能的函数:
import httpx
from django.conf import settings
def send_notification(message: str, title: str | None):
# Pass the title if specified.
headers = {"title": title} if title else {}
httpx.post(
settings.NTFY_URL,
content=message,
headers=headers,
)
真的。至此已具备发送接收通知的基本功能。

快速入门指南
建议查阅Django任务框架文档获取详细说明,但为节省时间我们提供简要指南。
定义任务
这是新框架的核心目标:通过 Django 标准 API 定义任务,而非使用任务队列专属装饰器或其他方法。
具体实现如下:
# ...
from django.tasks import task
@task
def send_notification(message: str, title: str | None):
# ...as before
该函数现已成为任务。准确来说是 django.tasks.Task 对象。
您无法再直接调用send_notification。任务必须通过enqueue方法运行。这可能与您的预期行为不符,但这是最佳方案。该设计彻底杜绝了在进程中意外调用任务(而非后台执行)的可能性。
task装饰器允许您指定任务优先级、队列名称和后端名称。可通过using方法覆盖这些设置,该方法将返回新的django.tasks.Task实例。
若需更精细地控制任务行为,可在装饰器中将takes_context设为True,并将context作为首参数传递。当前该上下文可访问任务结果,从而获取尝试次数等有用信息。
无法定义重试机制、延迟策略等完整任务队列实现中的高级功能——但这 并非 该组件的设计初衷。如有需要,您可通过检查任务上下文轻松添加自定义重试逻辑。
任务入队
向队列添加任务非常简单:
task_result = send_notification.enqueue(
message="Season's greeting!",
title="Santa has something to tell you"
)
执行任务
此处功能尚有不足。至少当前版本如此。Django 6.0 将提供 ImmediateBackend 和 DummyBackend 两种后端:前者立即执行任务,后者则完全不执行任务。
因此我们的项目包含一个基于数据库和工作进程的(演示)后端!
获取结果
若无需即时等待结果,可通过任务ID后续获取。只需调用任务的get_result(result_id)方法即可。
本项目包含一个定期通过htmx轮询未处理结果的视图。

表单下方的列表展示每次任务执行的结果。表单提交后,新结果将添加至列表顶部。只要结果状态未变为FAILED或SUCCESSFUL,Htmx就会持续轮询更新。
def task_result(request, result_id, status):
result = send_notification.get_result(result_id)
if result.status == status:
# No need to swap the result.
return HttpResponse(status=204)
return TemplateResponse(request, "index.html#result", {"result": result})
想知道index.html#results在做什么?Django 6.0还引入了模板片段。在此示例中,我们的视图实际上发送的响应仅包含名为result的模板片段。
幕后机制
当你用 task 装饰器修饰可调用对象时,系统会调用配置后端的 task_class 类来封装该对象。默认使用 django.task.Task 类。
该类的 enqueue 方法会进一步调用配置后端的 enqueue 方法。
调用其get_result方法的逻辑类似:调用配置后端的get_result方法并传递结果。
由于无需工作进程,这基本就是任务后端所需实现的全部功能。很酷吧?我们来添加一个后端吧?
任务数据库后端
我们的目标:
- 基于数据库实现基础任务后端
- 支持“自动重试”机制
我们的enqueue和get_result方法将返回默认django.tasks.TaskResult实例。这决定了我们需要存储的最小数据量,我们将通过名为Task的模型实现存储。
模型设计
基于 django.tasks 中 TaskResult 和 Task 的属性(即“数据类”),我们先创建 Task 模型的初稿:
class Task(models.Model):
priority = models.IntegerField(default=0)
callable_path = models.CharField(max_length=255)
backend = models.CharField(max_length=200)
queue_name = models.CharField(max_length=100)
run_after = models.DateTimeField(null=True, blank=True)
takes_context = models.BooleanField(default=False)
# Stores args and kwargs
arguments = models.JSONField(null=True, blank=True)
status = models.CharField(
choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY
)
enqueued_at = models.DateTimeField()
started_at = models.DateTimeField(blank=True, null=True)
finished_at = models.DateTimeField(blank=True, null=True)
last_attempted_at = models.DateTimeField(blank=True, null=True)
return_value = models.JSONField(null=True, blank=True)
缺失了什么?首先,TaskResult还包含遇到的错误列表以及处理任务的工人的ID。这些信息我们 或许 可以忽略。
但TaskResult.attempts属性基于工人ID的数量。若在任务内部使用任务上下文,必然需要依赖此类信息。
我们可通过为每个字段添加JSONField将这些细节纳入Task模型——这正是参考实现中的当前做法。
但让我们更明确地定义模型:记录每次任务执行尝试及其潜在错误,并通过外键与任务关联:
class Error(models.Model):
exception_class_path = models.TextField()
traceback = models.TextField()
class AttemptResultStatus(TextChoices):
# Subset of TaskResultStatus.
FAILED = TaskResultStatus.FAILED
SUCCESSFUL = TaskResultStatus.SUCCESSFUL
class Attempt(models.Model):
task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE)
error = models.OneToOneField(
Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True
)
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID)
started_at = models.DateTimeField()
stopped_at = models.DateTimeField(blank=True, null=True)
status = models.CharField(
choices=AttemptResultStatus.choices, max_length=10, blank=True
)
此架构确保我们拥有执行任务所需的所有信息,同时在请求TaskResult时能提供完整细节。
一切看似完美,但我们还需考虑工作进程的需求。它必须能够:
- 快速检查待处理任务
- 领取其中一项任务
- 处理该任务并标记为失败、成功或准备就绪 (以便稍后重试)
虽然现有架构能实现上述功能,但我希望进一步优化设计。
class Task(models.Model):
# ...
# This field is used to keep track of when to run a task (again).
# run_after remains unchanged after enqueueing.
available_after = models.DateTimeField()
# Denormalized count of attempts.
attempt_count = models.IntegerField(default=0)
# Set when a worker starts processing this task.
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True)
# ...
available_after字段将存储任务最早可执行时间。若任务已指定run_after(可通过任务的… using()方法实现),则available_after将设为此值。否则默认采用当前日期时间(均以UTC为准)。
当任务需重试时,available_after将设为任务可执行的下一个时间点。换言之:我们可实施 延迟策略 。
attempt_count字段能简化可用任务的查询流程。任何attempt_count超过最大允许值的任务均可忽略。虽然其状态本应设置为FAILED(默认应被排除),但我们仍可通过修改配置调整最大重试次数。
当工作者申领任务时,worker_id字段将被填充。这能防止其他工作者拾取该任务(前提是工作者ID具有唯一性)。
任务入队与结果获取
任务入队操作极其简单:从Task数据类实例创建Task模型实例,保存即可!当然,这需要在最终结果转换为TaskResult后完成。
我们使用模型数据库ID的字符串版本作为结果ID。
获取结果同样只需加载任务及其尝试记录,并转换为TaskResult即可。
以下是当前任务后端的简化版本:
class DatabaseBackend(BaseTaskBackend):
supports_defer = True
supports_async_task = False
supports_get_result = True
supports_priority = True
def enqueue(self, task: Task, args, kwargs):
self.validate_task(task)
model = self.queue_store.enqueue(task, args, kwargs)
task_result = TaskResult(
task=task,
id=str(model.pk),
# ...
# More properties being set
# ...
)
return task_result
def get_result(self, result_id):
return self.model_to_result(
self.queue_store.get(result_id)
)
def model_to_result(self, model: models.Task) -> TaskResult:
...
大量功能依赖于queue_store属性实现。在深入探讨前,我们将先说明该后端的配置选项。
配置
我们需要为以下项设置默认值:
- 最大尝试次数(重试次数)
- 退避因子;即使用
math.pow(factor, attempts)实现退避机制
这些参数可针对每个队列单独定制。因此我们在 OPTIONS 中最终得到如下配置:
TASKS = {
"default": {
"BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend",
"OPTIONS": {
"queues": {
"low_priority": {
"max_attempts": 5,
}
},
"max_attempts": 10,
"backoff_factor": 3,
"purge": {"finished": "10 days", "unfinished": "20 days"},
},
}
}
添加到 low_priority 队列的任务最多尝试五次,退避因子为 3。其他任务最多尝试十次,采用相同的退避因子。
队列存储
QueueStore类是后端的配套组件,专注于任务检索与入队、执行任务检查及任务申领。
然而将其纳入的主要目的是简化工人进程。正如我们将看到的,工人进程会获得队列存储的独立副本, 仅限于其需要处理的队列 。
工人进程
至少在本项目中,工人进程的职责是向运行器提供待处理任务的信息,并驱动后端对这些任务进行处理。其实现逻辑如下:
class Worker:
def __init__(
self,
id_: str | None,
backend_name: str,
only: set[str] | None,
excluding: set[str] | None,
):
# Grab the backend and its queue_store.
self.backend = task_backends[backend_name]
queue_store: QueueStore = self.backend.queue_store
# Limit the queue_store to the select queues.
if only or excluding:
queue_store = queue_store.subset(only=only, excluding=excluding)
self.queue_store = queue_store
# Use or create and id. "Must" be unique.
self.id = (
id_ if id_ else create_id(backend_name, queues=queue_store.queue_names)
)
def has_more(self) -> bool:
return self.queue_store.has_more()
def process(self):
with transaction.atomic():
tm = self.queue_store.claim_first_available(worker_id=self.id)
if tm is not None:
self.backend.process_task(tm)
要使工作者运行器正常运作,只需完成以下步骤:
- 创建工作者实例。
- 通过
has_more查询是否有待执行任务。 - 若有:指令其
process处理首个可用任务。若无:转至步骤4。 - 等待,然后返回步骤 2。
这正是我们的 dbtasks_worker 命令 的工作原理。
任务申领
队列存储提供peek方法,该方法返回队列中紧急程度最高的任务ID(综合考虑available_after、priority和attempt_count)。
这使运行器知晓是否存在待处理任务。下一步是申领其中一项任务。因此我们再次调用peek,若返回任务ID,则尝试申领该特定任务。
以下是比项目中QueueStore实现更基础清晰的版本:
def claim_first_available(
self, worker_id: str, attempts: int = 3
) -> models.Task | None:
qs = models.Task.filter(
worker_id="",
status=TaskResultStatus.READY,
)
for _ in range(attempts):
task_id = self.peek()
if not task_id:
return None
count = qs.filter(pk=task_id).update(
worker_id=self.id_,
status=TaskResultStatus.RUNNING,
)
if count:
return models.Task.objects.get(pk=task_id)
return None
若count为零,则申领失败;否则从数据库获取任务并开始处理。
循环机制的引入是因为当前状态是尝试申领peek识别出的任务后跳转至此,显然该任务已被其他工作者抢先处理。既然如此,不妨充分利用当前状态,尝试从队列中获取另一个任务。
处理任务
终于要进入真正执行操作的环节了!
后端process_task方法的流程:
- 创建
Attempt对象并构造当前TaskResult。 - 执行任务,捕获所有继承自
BaseException的异常,或在执行顺利时返回任务的return_value。 - 根据执行结果更新
Task模型、Attempt对象及TaskResult:成功时记录最终执行细节,失败时记录失败原因。 - 若执行失败:检查任务是否可重试。
再次说明:若需深入细节,请查阅代码库。
至此结束
当然,这个演示项目省略了所有需要深入思考的部分,比如工作进程的信号机制或数据库事务逻辑。这并非意味着无法实现——恰恰相反。只是本文并非探讨这些内容。
Django 引入这项功能后,必然会催生针对现有任务队列的新库或适配器。我们很快就会看到有人抱怨 django.tasks 功能不够完善。
因为如果你正在使用任务队列的高级功能,那么 django.tasks 可能无法满足你的需求。
复杂编排
某些任务队列库(如Celery)支持任务组合功能:可将一个任务的结果作为另一个任务的输入,为列表中的每个项排队处理任务等。
至此应已明确,支持此类编排并非django.tasks的目标。对此我完全不介意。要创建统一的API支持此功能根本不可行。我曾深陷那些声称支持此功能的库带来的各种问题。
重试机制
如前所述,当前无法自动重试失败任务,除非后端自行承担重任——就像我们的实现那样。
根据后端特性,这通常可自行处理。例如使用装饰器:
def retry(func):
@functools.wraps(func)
def wrapper(context: TaskContext, *args, **kwargs):
try:
return func(context, *args, **kwargs)
except BaseException as e:
result = context.task_result
backoff = math.pow(2, result.attempts)
run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff)
result.task.using(run_after=run_after).enqueue(*args, **kwargs)
raise e
return wrapper
@task(takes_context=True)
@retry
def send_email(context: TaskContext, to: str, subject: str, body: str):
# Do your thing
...
真正的任务执行机制
确实如此。但参考实现确实提供了真正的任务执行器。请耐心等待,或者更好:加入贡献行列!
没有完美解决方案
我认为django.tasks很快就能覆盖至少80%的常见用例。是的,它的API简单且有限,但对我而言这更像是优势而非缺陷。我认为这已是最接近标准化方案的实现。
本文文字及图片出自 A first look at Django's new background tasks
若您不介意将队列保存在Postgres中,我推荐Procrastinate,效果极佳:
https://procrastinate.readthedocs.io/en/stable/index.html
核心功能并非专为Django设计,但提供可选集成方案。支持同步/异步处理、重试/取消等机制,扩展性极强,且架构设计简洁明了,测试完善。
据我记忆,其代码量仅为Celery的十分之一。
若你喜欢 Procastinate,或许也会青睐我的 Chancy——同样基于 Postgres 构建,但致力于集成最常用的附加功能。
支持速率限制、全局唯一性、超时机制、内存限制,可在同一工作进程中混合使用 asyncio/进程/线程/子解释器,提供工作流管理、定时任务、仪表盘、指标监控、Django 集成、任务重排、触发器、任务修剪、 Windows支持、队列标签(例如:在所有搭载GPU的Windows机器上运行此队列,在py3.14工作进程上运行此队列,在py3.11工作进程上运行此队列)等等等等…
https://tkte.ch/chancy/ & https://github.com/tktech/chancy
即将发布的v0.26版本将包含:HTTP API稳定性优化、仪表盘功能增强、支持数千步骤工作流的性能提升,以及Django任务集成。
我还要强烈推荐 procrastinate!
近两年来,我们已将所有 Django 后端的 Celery 任务迁移至 procrastinate,效果极佳。
将任务延迟处理与业务逻辑置于同一事务中,极大提升了系统一致性与调试便利性。更令人欣喜的是,只需查询数据库或查看Django管理界面,就能清晰掌握任务运行状态。
需要说明的是,procrastinate本身并未内置django-celery-beat的替代方案,但您完全可以在一天内轻松构建自己的解决方案:无需额外依赖即可实现 🙂
Celery在任何规模下运行/维护都堪称垃圾。对此我非常期待。Rq/temporal似乎也很好地解决了这个问题。
有人成功将Celery迁移到其他方案吗?有何经验分享?
我有个客户有两个项目。一个部署在自有硬件上,采用Django+Celery架构;另一个运行在AWS EC2上,仅使用Django。
在第一个项目中,我们用Celery运行耗时几秒到几分钟的任务。在第二个项目中,我们创建新虚拟机运行任务,并在任务完成后自动销毁虚拟机。通信通过共享数据库和SQS队列实现。
Celery存在周期性问题:工作者与RabbitMQ断开连接、Celery自身卡死、gevent异常(可能由C库引发但无法确定——部分工作者采用prefork模式但非全部)。
EC2虚拟机从未出现过问题。顺带一提,我们使用VirtualBox本地模拟EC2环境:一个Python类封装了启动虚拟机的API,生产环境中通过boto3实现,开发环境则使用VBoxManage。
令我困惑的是:虽然都是Linux、amd64架构和RabbitMQ环境,但另一位使用Rails和Sidekiq的客户运行着更多任务却毫无问题。Celery内部并发堆栈存在某些过于脆弱的设计。
搭配Redis后端的Celery表现相当稳定。
RabbitMQ及其相关组件实在令人头疼。
记得2010年代初在初创公司工作时,整个后台任务基础设施都依赖Celery。当时每天有数百万任务在数十台服务器上运行。Celery经常卡死,队列堆积如山。我们编写了各种疯狂脚本:重启Celery、检测并终止卡死进程等等。真是段“欢乐时光”。
深有同感。多年前接触Celery时,其维护成本和潜在陷阱远超预期。代码库和文档也略显混乱,毕竟这是个庞大的开源项目,参与者众多,倒也情有可原。总之,若在K8S环境就用Argo,否则另寻他法。初创公司追求效率的话,直接用procrastinate这类工具更合适。
已将Celery迁移至Argo Workflows。过程很简单没什么特别经验可分享。但启动速度会大幅下降,所以它不是直接替代方案,仅适用于长期运行的工作流。Celery比Argo Workflows更容易上手。Celery入门真的非常简单。我最喜欢Airflow,但它更接近Argo Workflows,都偏向于长期运行的工作流。近期计划尝试Hatchet。听说Temporal管理难度更高。
我们已从Celery迁移至Temporal。Temporal是卓越的分布式系统解决方案。
使用Celery时遇到了哪些问题?
队列系统常让我困惑的一点在于:一旦出现任务需要排队处理其他任务的情况,这种抽象方式便显得不妥。尤其当功能不断扩展时,这种矛盾会加剧,在建模业务流程时更是如此。
这是因为负责排队任务的代码必须预知后续流程,这破坏了关注点分离原则。用户注册代码为何需要知道报告生成任务需要排队?
对我而言更合理的方案是触发事件。代码只需宣告“此事件已发生”,由其他代码自主决定是否监听。这便形成了事件流而非队列,并可建立消费者组等机制。
我曾围绕此理念创建过(现已弃用的)项目https://lightbus.org,它在我们的用例中表现出色。希望如今已有更优解诞生。
因此我的建议是:在采用任务队列前,请先审视你实际建模的对象。但需警惕事件流设计的深坑!
二者并非互斥。“事件驱动”并不必然意味着异步。我曾构建过事件驱动的模块化单体架构,所有事件均同步处理。任务排队由接收方自主决定,因此不会跨越上下文边界。
你描述的需求似乎并非事件驱动,更像是编排机制——编排器需感知任务间依赖关系并按正确顺序执行(构建有向无环图DAG)。Airflow等工具正是如此运作。
看到Django内置了完整的后台任务解决方案真是太棒了。
问问用过Celery/Procrastinate/Chancy的朋友:实际项目中重试/ACK机制体验如何?有没有什么不足之处?
可观测性方面——仪表盘、追踪、指标——开箱即用是否足够,还是需要额外集成?
重构时遇到类型提示或装饰器式任务的陷阱了吗?我见过这些特性引发的问题。
最后,测试环境切换后端时是否真正无缝衔接,还是仅限于“演示环境”?
(我可能有偏见,毕竟我是Chancy的作者)
Celery的主要痛点在于可观测性。像Procastinate和Chancy这类数据库驱动的方案虽无法达到Celery+RabbitMQ的峰值吞吐量,但即便在每月14美元的VPS上,仍能轻松处理每日数百万任务。其代价是能深度洞察运行状态——所有状态都存储在数据库中,随时可查询。Procastinate和Chancy都支持Django集成,甚至可通过ORM进行查询。
以Chancy为例,重试功能仅需启用默认的插件即可实现(操作极其简单)——https://github.com/TkTech/chancy/blob/main/chancy/plugins/re…。您可自由替换该插件,添加任何复杂的重试策略。
Chancy还附带“足够好用”的指标插件和仪表盘。虽不适用于处理数万种不同任务的超高负载场景,但对多数项目已足够满足需求。即将发布的0.26版本中可查看新版界面及示例截图——https://github.com/TkTech/chancy/pull/58(该仪表盘实例运行于生产环境,每日处理约60万任务,硬件配置堪比烤面包机)。该仪表板支持本地独立运行并适配任意数据库,可嵌入工作进程,也可集成至现有ASGI应用中。
Celery Flower虽存在些许瑕疵,但总体能满足Celery任务管理需求,清晰显示任务失败或积压情况。
我已在生产环境使用django-tasks库约一年。其数据库后端和简洁界面表现出色。它显然并非旨在完全替代Celery,但对于无需额外基础设施的简单任务队列而言,运行效果相当理想。
这个?https://github.com/RealOrangeOne/django-tasks
这个和rq后端听起来都很有前景。
没错,RealOrangeOne仓库在合并到Django前就是工作仓库。文章讨论的就是这个。
它已合并到GitHub仓库了吗?我此前并不知情,且在此处未见:https://github.com/django/django/tree/main/django/tasks/back…
这个功能挺有意思的。
API是否支持进度报告?(例如“完成30%”)
当然在实现工作进程时可以手动构建,但我更希望API能体现这个功能。Celery似乎也缺少这方面的API支持。
有人知道为什么缺少这个功能吗?我觉得这不会让API变得复杂,对于长期运行的后台任务来说似乎是理所当然的。
我之前问过这个问题。据我所知,开发团队确实有计划添加此功能,但在首次发布时,任务系统被刻意设计得非常轻量级。
是否有可追溯的来源?邮件讨论串、Bug追踪器记录等?
实现进度报告意味着需要提前知道任务耗时,对吧?这种精确预测真的可行吗?
不过我猜可以采用策略进行近似估算,比如记录某类任务的历史执行时间,以此推断当前同类任务的进度。
> 实现进度报告功能,意味着你能够提前知道任务的运行时间,对吧?
不。你只需要知道总步骤数和当前所处的步骤。
你说得对,但我同样找不到任务队列库如何提前知道这些信息来实现进度报告。
有人知道实现过这个功能的任务队列库吗?我很想研究看看!
类型支持如何?我们刚经历了停机,因为Celery任务的变更未能触发mypy在运行时对所有调用点的报错。太多Python装饰器在设计时对类型支持考虑不足。
关于参数和关键字参数?没有。你的可调用对象会被替换成不可调用的Task实例。你需要调用它的enqueue(*args, **kwargs)方法,而 是的… 这当然没有类型声明。
静态分析在Python中永远无法完全可靠。举个简单例子:你可以定义仅在运行时存在的函数,因此从原理上讲,即使在静态类型检查中也无法验证,甚至无法确定函数的调用路径——除非实际以跟踪/分析器模式运行代码。
你可能需要类似pydantic的@validate_call装饰器。
> 你可以定义一个仅在运行时存在的函数,因此从原理上讲无法进行静态类型检查
能否举例说明无法类型化的函数?是指运行时生成字节码、使用lambda表达式定义函数,还是其他情况?
我曾尝试过Celery/Procrastinate/Chancy等工具,但始终感觉不够“顺手”。最终还是回归GCP云任务调度方案——对中小型项目而言实在太简洁了。
不过这次真的想试试这个方案。
太棒了!之前推荐的通常是celery库,但我始终没能搞定。具体细节记不清了,但存在难以克服的高门槛或兼容性障碍。这个集成完美契合Django的“内置电池”理念。
目前我都是通过独立脚本处理,这些脚本会挂接到Django模型和ORM上。必须在模块开头按特定顺序显式调用某些命令才能实现。
Celery通常与redis后端配合效果最佳。
处理模型时关键在于:传递给Celery任务时应使用ID而非模型实例本身。
这是因为Celery工作者可能运行在完全不同的环境中。
> 挂钩Django模型和ORM的脚本
Django为此提供了管理命令[1]。
长期使用Django时,每当需要新功能总会惊喜发现 “哦,Django早就实现了”
[1] https://docs.djangoproject.com/en/5.2/howto/custom-managemen…
这是令人振奋的进展。虽然多数场景我仍会继续使用Celery,但能在测试、持续集成等场景中无缝切换后端功能确实极具吸引力。
虽然尚未深入研究,但我想知道新API或默认设置能否优化Celery的某些缺陷,比如过早确认和重试机制。
看到这类改进令人振奋。从Rails转战Django后,我对SaaS应用的开箱即用功能略感失望(管理后台自然是主要例外)。即便在生态系统中,我也觉得Django的包管理比Rails更混乱且生产就绪性较低。当然我接触Django时间尚短,且两者的设计理念本就不同
Django这功能迟到十年了。令人沮丧的是,我们不得不使用各种权宜之计来弥补内置功能的缺失。
种树的最佳时机是二十年前。其次最佳的时机是现在。
大多数人使用的是Celery,它几乎和Django一样历史悠久。
你真的想要实现自动转译功能。这会是个不错的圣诞项目。