首次探秘 Django 的新后台任务框架

Django 6.0 在 django.tasks 中引入了内置的后台任务框架。但请勿急于淘汰 Celery、Huey 或其他首选解决方案。

发布说明对此有明确说明:

Django 负责任务创建和队列管理,但 不提供执行任务的工件机制 。任务执行必须由外部基础设施(如独立进程或服务)管理。

新版 django.tasks 模块的核心目标是 为任务队列实现提供通用 APIJake Howard 是这项增强功能的核心推动者。欢迎查阅Django论坛上的项目介绍。

他的参考实现(同时兼容早期Django版本的回溯移植)已发布于GitHub上的django-tasks

但我们暂且忽略该实现,转而探索Django 6.0内置的精简版本。通过创建专属后端和工作进程,我们来实践:

元素周期表

我们的项目:通知系统

我们将创建一个应用程序,利用ntfy.sh向手机及其他设备发送通知。(我是它的粉丝!)

若想直接参与代码开发,请查看GitHub上的最终项目版本

使用nfty向手机发送通知只需:

  1. 注册账户
  2. 创建主题。
  3. 在手机上安装应用并登录。
  4. https://ntfy.sh/<yourtopic> 发送HTTP请求

免费版仅提供公开主题和消息。这意味着任何订阅该主题的人都能看到您发送的内容。为满足我们的需求,可直接创建随机命名的主题(如UUID)。

项目配置要求将步骤4的URL作为环境变量提供。例如:

NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c

以下是承担核心功能的函数:

import httpx
from django.conf import settings

def send_notification(message: str, title: str | None):
    # Pass the title if specified.
    headers = {"title": title} if title else {}
    httpx.post(
        settings.NTFY_URL,
        content=message,
        headers=headers,
    )

真的。至此已具备发送接收通知的基本功能。

图0:首次探秘 Django 的新后台任务框架

快速入门指南

建议查阅Django任务框架文档获取详细说明,但为节省时间我们提供简要指南。

定义任务

这是新框架的核心目标:通过 Django 标准 API 定义任务,而非使用任务队列专属装饰器或其他方法。

具体实现如下:

# ...
from django.tasks import task

@task
def send_notification(message: str, title: str | None):
    # ...as before

该函数现已成为任务。准确来说是 django.tasks.Task 对象。

您无法再直接调用send_notification。任务必须通过enqueue方法运行。这可能与您的预期行为不符,但这是最佳方案。该设计彻底杜绝了在进程中意外调用任务(而非后台执行)的可能性。

task装饰器允许您指定任务优先级、队列名称和后端名称。可通过using方法覆盖这些设置,该方法将返回新的django.tasks.Task实例。

若需更精细地控制任务行为,可在装饰器中将takes_context设为True,并将context作为首参数传递。当前该上下文可访问任务结果,从而获取尝试次数等有用信息。

无法定义重试机制、延迟策略等完整任务队列实现中的高级功能——但这 并非 该组件的设计初衷。如有需要,您可通过检查任务上下文轻松添加自定义重试逻辑。

任务入队

向队列添加任务非常简单:

task_result = send_notification.enqueue(
    message="Season's greeting!", 
    title="Santa has something to tell you"
)

执行任务

此处功能尚有不足。至少当前版本如此。Django 6.0 将提供 ImmediateBackendDummyBackend 两种后端:前者立即执行任务,后者则完全不执行任务。

因此我们的项目包含一个基于数据库和工作进程的(演示)后端!

获取结果

若无需即时等待结果,可通过任务ID后续获取。只需调用任务的get_result(result_id)方法即可。

本项目包含一个定期通过htmx轮询未处理结果的视图。

图1:首次探秘 Django 的新后台任务框架

表单下方的列表展示每次任务执行的结果。表单提交后,新结果将添加至列表顶部。只要结果状态未变为FAILEDSUCCESSFUL,Htmx就会持续轮询更新。

def task_result(request, result_id, status):
    result = send_notification.get_result(result_id)
    if result.status == status:
        # No need to swap the result.
        return HttpResponse(status=204)
    return TemplateResponse(request, "index.html#result", {"result": result})

想知道index.html#results在做什么?Django 6.0还引入了模板片段。在此示例中,我们的视图实际上发送的响应仅包含名为result的模板片段。

幕后机制

当你用 task 装饰器修饰可调用对象时,系统会调用配置后端的 task_class 类来封装该对象。默认使用 django.task.Task 类。

该类的 enqueue 方法会进一步调用配置后端的 enqueue 方法。

调用其get_result方法的逻辑类似:调用配置后端的get_result方法并传递结果。

由于无需工作进程,这基本就是任务后端所需实现的全部功能。很酷吧?我们来添加一个后端吧?

任务数据库后端

我们的目标:

  • 基于数据库实现基础任务后端
  • 支持“自动重试”机制

我们的enqueueget_result方法将返回默认django.tasks.TaskResult实例。这决定了我们需要存储的最小数据量,我们将通过名为Task的模型实现存储。

模型设计

基于 django.tasksTaskResultTask 的属性(即“数据类”),我们先创建 Task 模型的初稿:

class Task(models.Model):
    priority = models.IntegerField(default=0)
    callable_path = models.CharField(max_length=255)
    backend = models.CharField(max_length=200)
    queue_name = models.CharField(max_length=100)
    run_after = models.DateTimeField(null=True, blank=True)
    takes_context = models.BooleanField(default=False)
    # Stores args and kwargs
    arguments = models.JSONField(null=True, blank=True)
    status = models.CharField(
        choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY
    )
    enqueued_at = models.DateTimeField()
    started_at = models.DateTimeField(blank=True, null=True)
    finished_at = models.DateTimeField(blank=True, null=True)
    last_attempted_at = models.DateTimeField(blank=True, null=True)
    return_value = models.JSONField(null=True, blank=True)

缺失了什么?首先,TaskResult还包含遇到的错误列表以及处理任务的工人的ID。这些信息我们 或许 可以忽略。

TaskResult.attempts属性基于工人ID的数量。若在任务内部使用任务上下文,必然需要依赖此类信息。

我们可通过为每个字段添加JSONField将这些细节纳入Task模型——这正是参考实现中的当前做法。

但让我们更明确地定义模型:记录每次任务执行尝试及其潜在错误,并通过外键与任务关联:

class Error(models.Model):
    exception_class_path = models.TextField()
    traceback = models.TextField()

class AttemptResultStatus(TextChoices):
    # Subset of TaskResultStatus.
    FAILED = TaskResultStatus.FAILED
    SUCCESSFUL = TaskResultStatus.SUCCESSFUL

class Attempt(models.Model):
    task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE)
    error = models.OneToOneField(
        Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True
    )
    worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID)
    started_at = models.DateTimeField()
    stopped_at = models.DateTimeField(blank=True, null=True)
    status = models.CharField(
        choices=AttemptResultStatus.choices, max_length=10, blank=True
    )

此架构确保我们拥有执行任务所需的所有信息,同时在请求TaskResult时能提供完整细节。

一切看似完美,但我们还需考虑工作进程的需求。它必须能够:

  1. 快速检查待处理任务
  2. 领取其中一项任务
  3. 处理该任务并标记为失败、成功或准备就绪 (以便稍后重试)

虽然现有架构能实现上述功能,但我希望进一步优化设计。

class Task(models.Model):
    # ...
    # This field is used to keep track of when to run a task (again).
    # run_after remains unchanged after enqueueing.
    available_after = models.DateTimeField()
    # Denormalized count of attempts.
    attempt_count = models.IntegerField(default=0)
    # Set when a worker starts processing this task.
    worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True)
    # ...

available_after字段将存储任务最早可执行时间。若任务已指定run_after(可通过任务的… using()方法实现),则available_after将设为此值。否则默认采用当前日期时间(均以UTC为准)。

当任务需重试时,available_after将设为任务可执行的下一个时间点。换言之:我们可实施 延迟策略

attempt_count字段能简化可用任务的查询流程。任何attempt_count超过最大允许值的任务均可忽略。虽然其状态本应设置为FAILED(默认应被排除),但我们仍可通过修改配置调整最大重试次数。

当工作者申领任务时,worker_id字段将被填充。这能防止其他工作者拾取该任务(前提是工作者ID具有唯一性)。

任务入队与结果获取

任务入队操作极其简单:从Task数据类实例创建Task模型实例,保存即可!当然,这需要在最终结果转换为TaskResult后完成。

我们使用模型数据库ID的字符串版本作为结果ID。

获取结果同样只需加载任务及其尝试记录,并转换为TaskResult即可。

以下是当前任务后端的简化版本:

class DatabaseBackend(BaseTaskBackend):
    supports_defer = True
    supports_async_task = False
    supports_get_result = True
    supports_priority = True

    def enqueue(self, task: Task, args, kwargs):
        self.validate_task(task)
        model = self.queue_store.enqueue(task, args, kwargs)
        task_result = TaskResult(
            task=task,
            id=str(model.pk),
            # ...
            # More properties being set
            # ...
        )
        return task_result

    def get_result(self, result_id):
        return self.model_to_result(
            self.queue_store.get(result_id)
        )

    def model_to_result(self, model: models.Task) -> TaskResult:
        ...

大量功能依赖于queue_store属性实现。在深入探讨前,我们将先说明该后端的配置选项。

配置

我们需要为以下项设置默认值:

  • 最大尝试次数(重试次数)
  • 退避因子;即使用 math.pow(factor, attempts) 实现退避机制

这些参数可针对每个队列单独定制。因此我们在 OPTIONS 中最终得到如下配置:

TASKS = {
    "default": {
        "BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend",
        "OPTIONS": {
            "queues": {
                "low_priority": {
                    "max_attempts": 5,
                }
            },
            "max_attempts": 10,
            "backoff_factor": 3,
            "purge": {"finished": "10 days", "unfinished": "20 days"},
        },
    }
}

添加到 low_priority 队列的任务最多尝试五次,退避因子为 3。其他任务最多尝试十次,采用相同的退避因子。

队列存储

QueueStore是后端的配套组件,专注于任务检索与入队、执行任务检查及任务申领。

然而将其纳入的主要目的是简化工人进程。正如我们将看到的,工人进程会获得队列存储的独立副本, 仅限于其需要处理的队列

工人进程

至少在本项目中,工人进程的职责是向运行器提供待处理任务的信息,并驱动后端对这些任务进行处理。其实现逻辑如下:

class Worker:
    def __init__(
        self,
        id_: str | None,
        backend_name: str,
        only: set[str] | None,
        excluding: set[str] | None,
    ):
        # Grab the backend and its queue_store.
        self.backend = task_backends[backend_name]
        queue_store: QueueStore = self.backend.queue_store
        # Limit the queue_store to the select queues.
        if only or excluding:
            queue_store = queue_store.subset(only=only, excluding=excluding)
        self.queue_store = queue_store
        # Use or create and id. "Must" be unique.
        self.id = (
            id_ if id_ else create_id(backend_name, queues=queue_store.queue_names)
        )

    def has_more(self) -> bool:
        return self.queue_store.has_more()

    def process(self):
        with transaction.atomic():
            tm = self.queue_store.claim_first_available(worker_id=self.id)
        if tm is not None:
            self.backend.process_task(tm)

要使工作者运行器正常运作,只需完成以下步骤:

  1. 创建工作者实例。
  2. 通过has_more查询是否有待执行任务。
  3. 若有:指令其process处理首个可用任务。若无:转至步骤4。
  4. 等待,然后返回步骤 2。

这正是我们的 dbtasks_worker 命令 的工作原理。

任务申领

队列存储提供peek方法,该方法返回队列中紧急程度最高的任务ID(综合考虑available_afterpriorityattempt_count)。

这使运行器知晓是否存在待处理任务。下一步是申领其中一项任务。因此我们再次调用peek,若返回任务ID,则尝试申领该特定任务。

以下是比项目中QueueStore实现更基础清晰的版本:

def claim_first_available(
    self, worker_id: str, attempts: int = 3
) -> models.Task | None:
    qs = models.Task.filter(
        worker_id="", 
        status=TaskResultStatus.READY,
    )
    for _ in range(attempts):
        task_id = self.peek()
        if not task_id:
            return None
        count = qs.filter(pk=task_id).update(
            worker_id=self.id_,
            status=TaskResultStatus.RUNNING,
        )
        if count:
            return models.Task.objects.get(pk=task_id)
    return None

count为零,则申领失败;否则从数据库获取任务并开始处理。

循环机制的引入是因为当前状态是尝试申领peek识别出的任务后跳转至此,显然该任务已被其他工作者抢先处理。既然如此,不妨充分利用当前状态,尝试从队列中获取另一个任务。

处理任务

终于要进入真正执行操作的环节了!

后端process_task方法的流程:

  1. 创建Attempt对象并构造当前TaskResult
  2. 执行任务,捕获所有继承自BaseException的异常,或在执行顺利时返回任务的return_value
  3. 根据执行结果更新Task模型、Attempt对象及TaskResult:成功时记录最终执行细节,失败时记录失败原因。
  4. 若执行失败:检查任务是否可重试。

再次说明:若需深入细节,请查阅代码库

至此结束

当然,这个演示项目省略了所有需要深入思考的部分,比如工作进程的信号机制或数据库事务逻辑。这并非意味着无法实现——恰恰相反。只是本文并非探讨这些内容。

Django 引入这项功能后,必然会催生针对现有任务队列的新库或适配器。我们很快就会看到有人抱怨 django.tasks 功能不够完善。

因为如果你正在使用任务队列的高级功能,那么 django.tasks 可能无法满足你的需求。

复杂编排

某些任务队列库(如Celery)支持任务组合功能:可将一个任务的结果作为另一个任务的输入,为列表中的每个项排队处理任务等。

至此应已明确,支持此类编排并非django.tasks的目标。对此我完全不介意。要创建统一的API支持此功能根本不可行。我曾深陷那些声称支持此功能的库带来的各种问题。

重试机制

如前所述,当前无法自动重试失败任务,除非后端自行承担重任——就像我们的实现那样。

根据后端特性,这通常可自行处理。例如使用装饰器:

def retry(func):
    @functools.wraps(func)
    def wrapper(context: TaskContext, *args, **kwargs):
        try:
            return func(context, *args, **kwargs)
        except BaseException as e:
            result = context.task_result
            backoff = math.pow(2, result.attempts)
            run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff)
            result.task.using(run_after=run_after).enqueue(*args, **kwargs)
            raise e
    return wrapper


@task(takes_context=True)
@retry
def send_email(context: TaskContext, to: str, subject: str, body: str):
    # Do your thing 
    ...

真正的任务执行机制

确实如此。但参考实现确实提供了真正的任务执行器。请耐心等待,或者更好:加入贡献行列!

没有完美解决方案

我认为django.tasks很快就能覆盖至少80%的常见用例。是的,它的API简单且有限,但对我而言这更像是优势而非缺陷。我认为这已是最接近标准化方案的实现。

本文文字及图片出自 A first look at Django's new background tasks

共有 41 条评论

  1. 若您不介意将队列保存在Postgres中,我推荐Procrastinate,效果极佳:

    https://procrastinate.readthedocs.io/en/stable/index.html

    核心功能并非专为Django设计,但提供可选集成方案。支持同步/异步处理、重试/取消等机制,扩展性极强,且架构设计简洁明了,测试完善。

    据我记忆,其代码量仅为Celery的十分之一。

    • 若你喜欢 Procastinate,或许也会青睐我的 Chancy——同样基于 Postgres 构建,但致力于集成最常用的附加功能。

      支持速率限制、全局唯一性、超时机制、内存限制,可在同一工作进程中混合使用 asyncio/进程/线程/子解释器,提供工作流管理、定时任务、仪表盘、指标监控、Django 集成、任务重排、触发器、任务修剪、 Windows支持、队列标签(例如:在所有搭载GPU的Windows机器上运行此队列,在py3.14工作进程上运行此队列,在py3.11工作进程上运行此队列)等等等等…

      https://tkte.ch/chancy/ & https://github.com/tktech/chancy

      即将发布的v0.26版本将包含:HTTP API稳定性优化、仪表盘功能增强、支持数千步骤工作流的性能提升,以及Django任务集成。

    • 我还要强烈推荐 procrastinate!

      近两年来,我们已将所有 Django 后端的 Celery 任务迁移至 procrastinate,效果极佳。

      将任务延迟处理与业务逻辑置于同一事务中,极大提升了系统一致性与调试便利性。更令人欣喜的是,只需查询数据库或查看Django管理界面,就能清晰掌握任务运行状态。

      需要说明的是,procrastinate本身并未内置django-celery-beat的替代方案,但您完全可以在一天内轻松构建自己的解决方案:无需额外依赖即可实现 🙂

  2. Celery在任何规模下运行/维护都堪称垃圾。对此我非常期待。Rq/temporal似乎也很好地解决了这个问题。

    有人成功将Celery迁移到其他方案吗?有何经验分享?

    • 我有个客户有两个项目。一个部署在自有硬件上,采用Django+Celery架构;另一个运行在AWS EC2上,仅使用Django。

      在第一个项目中,我们用Celery运行耗时几秒到几分钟的任务。在第二个项目中,我们创建新虚拟机运行任务,并在任务完成后自动销毁虚拟机。通信通过共享数据库和SQS队列实现。

      Celery存在周期性问题:工作者与RabbitMQ断开连接、Celery自身卡死、gevent异常(可能由C库引发但无法确定——部分工作者采用prefork模式但非全部)。

      EC2虚拟机从未出现过问题。顺带一提,我们使用VirtualBox本地模拟EC2环境:一个Python类封装了启动虚拟机的API,生产环境中通过boto3实现,开发环境则使用VBoxManage。

      令我困惑的是:虽然都是Linux、amd64架构和RabbitMQ环境,但另一位使用Rails和Sidekiq的客户运行着更多任务却毫无问题。Celery内部并发堆栈存在某些过于脆弱的设计。

      • 搭配Redis后端的Celery表现相当稳定。

        RabbitMQ及其相关组件实在令人头疼。

    • 记得2010年代初在初创公司工作时,整个后台任务基础设施都依赖Celery。当时每天有数百万任务在数十台服务器上运行。Celery经常卡死,队列堆积如山。我们编写了各种疯狂脚本:重启Celery、检测并终止卡死进程等等。真是段“欢乐时光”。

    • 深有同感。多年前接触Celery时,其维护成本和潜在陷阱远超预期。代码库和文档也略显混乱,毕竟这是个庞大的开源项目,参与者众多,倒也情有可原。总之,若在K8S环境就用Argo,否则另寻他法。初创公司追求效率的话,直接用procrastinate这类工具更合适。

    • 已将Celery迁移至Argo Workflows。过程很简单没什么特别经验可分享。但启动速度会大幅下降,所以它不是直接替代方案,仅适用于长期运行的工作流。Celery比Argo Workflows更容易上手。Celery入门真的非常简单。我最喜欢Airflow,但它更接近Argo Workflows,都偏向于长期运行的工作流。近期计划尝试Hatchet。听说Temporal管理难度更高。

    • 我们已从Celery迁移至Temporal。Temporal是卓越的分布式系统解决方案。

    • 使用Celery时遇到了哪些问题?

  3. 队列系统常让我困惑的一点在于:一旦出现任务需要排队处理其他任务的情况,这种抽象方式便显得不妥。尤其当功能不断扩展时,这种矛盾会加剧,在建模业务流程时更是如此。

    这是因为负责排队任务的代码必须预知后续流程,这破坏了关注点分离原则。用户注册代码为何需要知道报告生成任务需要排队?

    对我而言更合理的方案是触发事件。代码只需宣告“此事件已发生”,由其他代码自主决定是否监听。这便形成了事件流而非队列,并可建立消费者组等机制。

    我曾围绕此理念创建过(现已弃用的)项目https://lightbus.org,它在我们的用例中表现出色。希望如今已有更优解诞生。

    因此我的建议是:在采用任务队列前,请先审视你实际建模的对象。但需警惕事件流设计的深坑!

    • 二者并非互斥。“事件驱动”并不必然意味着异步。我曾构建过事件驱动的模块化单体架构,所有事件均同步处理。任务排队由接收方自主决定,因此不会跨越上下文边界。

    • 你描述的需求似乎并非事件驱动,更像是编排机制——编排器需感知任务间依赖关系并按正确顺序执行(构建有向无环图DAG)。Airflow等工具正是如此运作。

  4. 看到Django内置了完整的后台任务解决方案真是太棒了。

    问问用过Celery/Procrastinate/Chancy的朋友:实际项目中重试/ACK机制体验如何?有没有什么不足之处?

    可观测性方面——仪表盘、追踪、指标——开箱即用是否足够,还是需要额外集成?

    重构时遇到类型提示或装饰器式任务的陷阱了吗?我见过这些特性引发的问题。

    最后,测试环境切换后端时是否真正无缝衔接,还是仅限于“演示环境”?

    • (我可能有偏见,毕竟我是Chancy的作者)

      Celery的主要痛点在于可观测性。像Procastinate和Chancy这类数据库驱动的方案虽无法达到Celery+RabbitMQ的峰值吞吐量,但即便在每月14美元的VPS上,仍能轻松处理每日数百万任务。其代价是能深度洞察运行状态——所有状态都存储在数据库中,随时可查询。Procastinate和Chancy都支持Django集成,甚至可通过ORM进行查询。

      以Chancy为例,重试功能仅需启用默认的插件即可实现(操作极其简单)——https://github.com/TkTech/chancy/blob/main/chancy/plugins/re…。您可自由替换该插件,添加任何复杂的重试策略。

      Chancy还附带“足够好用”的指标插件和仪表盘。虽不适用于处理数万种不同任务的超高负载场景,但对多数项目已足够满足需求。即将发布的0.26版本中可查看新版界面及示例截图——https://github.com/TkTech/chancy/pull/58(该仪表盘实例运行于生产环境,每日处理约60万任务,硬件配置堪比烤面包机)。该仪表板支持本地独立运行并适配任意数据库,可嵌入工作进程,也可集成至现有ASGI应用中。

      • Celery Flower虽存在些许瑕疵,但总体能满足Celery任务管理需求,清晰显示任务失败或积压情况。

  5. 我已在生产环境使用django-tasks库约一年。其数据库后端和简洁界面表现出色。它显然并非旨在完全替代Celery,但对于无需额外基础设施的简单任务队列而言,运行效果相当理想。

  6. 这个功能挺有意思的。

    API是否支持进度报告?(例如“完成30%”)

    当然在实现工作进程时可以手动构建,但我更希望API能体现这个功能。Celery似乎也缺少这方面的API支持。

    有人知道为什么缺少这个功能吗?我觉得这不会让API变得复杂,对于长期运行的后台任务来说似乎是理所当然的。

    • 我之前问过这个问题。据我所知,开发团队确实有计划添加此功能,但在首次发布时,任务系统被刻意设计得非常轻量级。

      • 是否有可追溯的来源?邮件讨论串、Bug追踪器记录等?

    • 实现进度报告意味着需要提前知道任务耗时,对吧?这种精确预测真的可行吗?

      不过我猜可以采用策略进行近似估算,比如记录某类任务的历史执行时间,以此推断当前同类任务的进度。

      • > 实现进度报告功能,意味着你能够提前知道任务的运行时间,对吧?

        不。你只需要知道总步骤数和当前所处的步骤。

        • 你说得对,但我同样找不到任务队列库如何提前知道这些信息来实现进度报告。

          有人知道实现过这个功能的任务队列库吗?我很想研究看看!

  7. 类型支持如何?我们刚经历了停机,因为Celery任务的变更未能触发mypy在运行时对所有调用点的报错。太多Python装饰器在设计时对类型支持考虑不足。

    • 关于参数和关键字参数?没有。你的可调用对象会被替换成不可调用的Task实例。你需要调用它的enqueue(*args, **kwargs)方法,而 是的… 这当然没有类型声明。

    • 静态分析在Python中永远无法完全可靠。举个简单例子:你可以定义仅在运行时存在的函数,因此从原理上讲,即使在静态类型检查中也无法验证,甚至无法确定函数的调用路径——除非实际以跟踪/分析器模式运行代码。

      你可能需要类似pydantic的@validate_call装饰器。

      • > 你可以定义一个仅在运行时存在的函数,因此从原理上讲无法进行静态类型检查

        能否举例说明无法类型化的函数?是指运行时生成字节码、使用lambda表达式定义函数,还是其他情况?

  8. 我曾尝试过Celery/Procrastinate/Chancy等工具,但始终感觉不够“顺手”。最终还是回归GCP云任务调度方案——对中小型项目而言实在太简洁了。

    不过这次真的想试试这个方案。

  9. 太棒了!之前推荐的通常是celery库,但我始终没能搞定。具体细节记不清了,但存在难以克服的高门槛或兼容性障碍。这个集成完美契合Django的“内置电池”理念。

    目前我都是通过独立脚本处理,这些脚本会挂接到Django模型和ORM上。必须在模块开头按特定顺序显式调用某些命令才能实现。

    • Celery通常与redis后端配合效果最佳。

      处理模型时关键在于:传递给Celery任务时应使用ID而非模型实例本身。

      这是因为Celery工作者可能运行在完全不同的环境中。

    • > 挂钩Django模型和ORM的脚本

      Django为此提供了管理命令[1]。

      长期使用Django时,每当需要新功能总会惊喜发现 “哦,Django早就实现了”

      [1] https://docs.djangoproject.com/en/5.2/howto/custom-managemen

  10. 这是令人振奋的进展。虽然多数场景我仍会继续使用Celery,但能在测试、持续集成等场景中无缝切换后端功能确实极具吸引力。

    虽然尚未深入研究,但我想知道新API或默认设置能否优化Celery的某些缺陷,比如过早确认和重试机制。

  11. 看到这类改进令人振奋。从Rails转战Django后,我对SaaS应用的开箱即用功能略感失望(管理后台自然是主要例外)。即便在生态系统中,我也觉得Django的包管理比Rails更混乱且生产就绪性较低。当然我接触Django时间尚短,且两者的设计理念本就不同

  12. Django这功能迟到十年了。令人沮丧的是,我们不得不使用各种权宜之计来弥补内置功能的缺失。

  13. 你真的想要实现自动转译功能。这会是个不错的圣诞项目。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注


京ICP备12002735号