Skip to content
Snippets Groups Projects
Verified Commit c81581cc authored by Nik | Klampfradler's avatar Nik | Klampfradler
Browse files

Simplify and rename task recording decorator

parent 0bdf5664
No related branches found
No related tags found
2 merge requests!494Refactor ProgressRecorder for non-optional Celery usage and add doc strings,!491Resolve "Make Celery non-optional"
import time
from decimal import Decimal
from functools import wraps
from typing import Callable, Union
from celery_progress.backend import PROGRESS_STATE, AbstractProgressRecorder
......@@ -99,26 +99,15 @@ class ProgressRecorder(AbstractProgressRecorder):
self.set_progress(self.current)
@classmethod
def record(cls, orig: Callable):
"""Decorate a function to use Celery and a progress recorder.
It wraps the function in a Celery task and calls its delay method when invoked.
Additionally, it adds the matching `ProgressRecorder` instance as first argument.
def recorded_task(cls, orig: Callable) -> app.Task:
"""Create a Celery task that receives a ProgressRecorder.
Returns a Task object with a wrapper that passes the recorder instance
as the recorder keyword argument.
"""
@wraps(orig)
def _inject_recorder(task, *args, **kwargs):
recorder = ProgressRecorder(task)
return orig(*args, **kwargs, recorder=recorder)
def recorder_func(self, *args, **kwargs):
recorder = ProgressRecorder(self)
orig(recorder, *args, **kwargs)
var_name = f"{orig.__module__}.{orig.__name__}"
task = app.task(recorder_func, bind=True, name=var_name)
def wrapped(*args, **kwargs):
return task.delay(*args, **kwargs)
return wrapped
return app.task(_inject_recorder, bind=True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment