您的位置:首页 > 科技 > 能源 > 公司工商查询_手机网站的建设_拼多多seo 优化软件_推广产品吸引人的句子

公司工商查询_手机网站的建设_拼多多seo 优化软件_推广产品吸引人的句子

2024/12/27 19:42:40 来源:https://blog.csdn.net/m0_68472237/article/details/143465617  浏览:    关键词:公司工商查询_手机网站的建设_拼多多seo 优化软件_推广产品吸引人的句子
公司工商查询_手机网站的建设_拼多多seo 优化软件_推广产品吸引人的句子

创建一个django项目

django-admin startproject celeryDemo

进入项目目录

cd celeryDemo

在你的 Django 项目中,创建一个 celery_.py 文件,通常放在项目的根目录(与 settings.py 同级):

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置 Django 的默认配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryDemo.settings')app = Celery('celeryDemo')# 从 Django 的 settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现任务模块
# app.autodiscover_tasks()
# 列出所有需要的应用
app.autodiscover_tasks(['home']) 

settings.py 中配置 Celery 添加一些基本配置,这里使用 Redis 作为消息代理:

# celery_config
CELERY_BROKER_URL = 'redis://localhost:6379/7'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

创建任务 在你的 Django 应用中创建任务,app.tasks.py 文件中 app是你创建的app名 我这里是home

from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y

在app.views.py里添加视图函数

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from home.tasks import adddef trigger_tasks(request):for i in range(10):add.delay(i, i + 1)  # 异步调用任务return HttpResponse("Tasks triggered!")def trigger_task(request):# 调用并发任务result = add.delay(4, 6)  # 异步调用任务# 获取任务 IDtask_id = result.idreturn JsonResponse({'task_id': task_id})def get_result(request, task_id):from celery.result import AsyncResultresult = AsyncResult(task_id)if result.ready():  # 检查任务是否完成response = {'task_id': task_id,'status': result.status,'result': result.result,  # 获取任务结果}else:response = {'task_id': task_id,'status': result.status,'result': None,}return JsonResponse(response)

设置定时任务 使用 Celery Beat 来设定定时任务。在你的 settings.py 中添加

from celery.schedules import crontabCELERY_BEAT_SCHEDULE = {'run-my-task-every-midnight': {'task': 'home.tasks.my_task',# 'schedule': crontab(minute=0, hour=0),  # 每天的 0 点 0 分执行'schedule': crontab('*/1'),  # 每1分钟执行一次},
}

创建tasks.py

from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y

设置 URL 路由 在你的 urls.py 中添加相应的 URL 路由,以便可以访问触发任务和获取结果的视图:

# home/urls.py
from django.urls import path
from . import viewsurlpatterns = [path('trigger_tasks/', views.trigger_tasks, name='trigger_tasks'),path('trigger-task/', views.trigger_task, name='trigger_task'),path('get-result/<str:task_id>/', views.get_result, name='get_result'),
]
# urls.py
from django.contrib import admin
from django.urls import path, includeurlpatterns = [path('admin/', admin.site.urls),path('home/', include('home.urls')),
]

启动 Celery Worker 和 Beat 在命令行中,启动 Celery Worker 和 Beat

celery -A celeryDemo worker --loglevel=info --concurrency=4 -P threadcelery -A celeryDemo beat --loglevel=info

启动django项目

python manage.py runserver 8002

监控和调试

确保你能看到 Worker 的日志输出,以验证任务是否成功执行。你可以使用 Flower 来监控 Celery 任务的执行情况:

pip install flower
celery -A celeryDemo flower

然后访问 http://localhost:5555 以查看任务的状态。

调用异步任务返回id
![调用异步任务返回id](https://i-blog.csdnimg.cn/direct/d46c5f92121745eabd5c5bec3223928e.png)

获取异步任务结果
在这里插入图片描述

查看记录
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com