Skip to content
Snippets Groups Projects

Resolve "Pre-define periodic tasks"

Merged Nik | Klampfradler requested to merge 602-pre-define-periodic-tasks into master
4 files
+ 74
2
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -332,3 +332,60 @@ def process_custom_context_processors(context_processors: list) -> Dict[str, Any
for processor in processors:
context.update(processor(None))
return context
def create_default_celery_schedule():
"""Create default periodic tasks in database for tasks that have a schedule defined."""
from celery import current_app
from celery.schedules import BaseSchedule, crontab, schedule, solar
from django_celery_beat.clockedschedule import clocked
from django_celery_beat.models import (
ClockedSchedule,
CrontabSchedule,
IntervalSchedule,
PeriodicTask,
SolarSchedule,
)
defined_periodic_tasks = PeriodicTask.objects.values_list("task", flat=True).all()
for name, task in current_app.tasks.items():
if name in defined_periodic_tasks:
# Task is already known in database, skip
continue
run_every = getattr(task, "run_every", None)
if not run_every:
# Task has no default schedule, skip
continue
if isinstance(run_every, (float, int, timedelta)):
# Schedule is defined as a raw seconds value or timedelta, convert to schedule class
run_every = schedule(run_every)
elif not isinstance(run_every, BaseSchedule):
raise ValueError(f"Task {name} has an invalid schedule defined.")
# Find matching django-celery-beat schedule model
if isinstance(run_every, clocked):
Schedule = ClockedSchedule
attr = "clocked"
elif isinstance(run_every, crontab):
Schedule = CrontabSchedule
attr = "crontab"
elif isinstance(run_every, schedule):
Schedule = IntervalSchedule
attr = "interval"
elif isinstance(run_every, solar):
Schedule = SolarSchedule
attr = "solar"
else:
raise ValueError(f"Task {name} has an unknown schedule class defined.")
# Get or create schedule in database
db_schedule = Schedule.from_schedule(run_every)
db_schedule.save()
# Create periodic task
PeriodicTask.objects.create(
name=f"{name} (default schedule)", task=name, **{attr: db_schedule}
)
Loading