Skip to content

Commit 8a09f51

Browse files
authored
feat:add job-class kwarg to rqworker (#160)
1 parent 0b6d3b5 commit 8a09f51

File tree

5 files changed

+67
-3
lines changed

5 files changed

+67
-3
lines changed

scheduler/management/commands/rqworker.py

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import click
66
from django.core.management.base import BaseCommand
77
from django.db import connections
8+
from django.template.defaultfilters import default
89
from redis.exceptions import ConnectionError
910
from rq.logutils import setup_loghandlers
1011

@@ -47,6 +48,8 @@ def add_arguments(self, parser):
4748
help='Maximum number of jobs to execute before terminating worker')
4849
parser.add_argument('--fork-job-execution', action='store', default=True, dest='fork_job_execution', type=bool,
4950
help='Fork job execution to another process')
51+
parser.add_argument('--job-class', action='store', dest='job_class',
52+
help='Jobs class to use')
5053
parser.add_argument(
5154
'queues', nargs='*', type=str,
5255
help='The queues to work on, separated by space, all queues should be using the same redis')
@@ -71,6 +74,7 @@ def handle(self, **options):
7174
w = create_worker(
7275
*queues,
7376
name=options['name'],
77+
job_class=options.get('job_class'),
7478
default_worker_ttl=options['worker_ttl'],
7579
fork_job_execution=options['fork_job_execution'], )
7680

scheduler/rq_classes.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,14 @@ def stop_execution(self, connection: Redis):
7070

7171
class DjangoWorker(Worker):
7272
def __init__(self, *args, **kwargs):
73-
self.fork_job_execution = kwargs.pop('fork_job_execution', True)
74-
kwargs['job_class'] = JobExecution
75-
kwargs['queue_class'] = DjangoQueue
73+
self.fork_job_execution = kwargs.pop("fork_job_execution", True)
74+
job_class = kwargs.get("job_class") or JobExecution
75+
if not isinstance(job_class, type) or not issubclass(job_class, JobExecution):
76+
raise ValueError("job_class must be a subclass of JobExecution")
77+
78+
# Update kwargs with the potentially modified job_class
79+
kwargs["job_class"] = job_class
80+
kwargs["queue_class"] = DjangoQueue
7681
super(DjangoWorker, self).__init__(*args, **kwargs)
7782

7883
def __eq__(self, other):

scheduler/tests/test_mgmt_cmds.py

+33
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,39 @@ def test_rqworker__no_queues_params(self):
3636
for job in jobs:
3737
self.assertTrue(job.is_failed)
3838

39+
def test_rqworker__job_class_param__green(self):
40+
queue = get_queue('default')
41+
42+
# enqueue some jobs that will fail
43+
jobs = []
44+
job_ids = []
45+
for _ in range(0, 3):
46+
job = queue.enqueue(failing_job)
47+
jobs.append(job)
48+
job_ids.append(job.id)
49+
50+
# Create a worker to execute these jobs
51+
call_command('rqworker', '--job-class', 'scheduler.rq_classes.JobExecution', fork_job_execution=False, burst=True)
52+
53+
# check if all jobs are really failed
54+
for job in jobs:
55+
self.assertTrue(job.is_failed)
56+
57+
def test_rqworker__bad_job_class__fail(self):
58+
queue = get_queue('default')
59+
60+
# enqueue some jobs that will fail
61+
jobs = []
62+
job_ids = []
63+
for _ in range(0, 3):
64+
job = queue.enqueue(failing_job)
65+
jobs.append(job)
66+
job_ids.append(job.id)
67+
68+
# Create a worker to execute these jobs
69+
with self.assertRaises(ImportError):
70+
call_command('rqworker', '--job-class', 'rq.badclass', fork_job_execution=False, burst=True)
71+
3972
def test_rqworker__run_jobs(self):
4073
queue = get_queue('default')
4174

scheduler/tests/test_worker.py

+12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import os
22
import uuid
33

4+
from rq.job import Job
5+
from scheduler.rq_classes import JobExecution
46
from scheduler.tests.testtools import SchedulerBaseCase
57
from scheduler.tools import create_worker
68
from . import test_settings # noqa
@@ -38,3 +40,13 @@ def test_create_worker__scheduler_interval(self):
3840
worker.work(burst=True)
3941
self.assertEqual(worker.scheduler.interval, 1)
4042
settings.SCHEDULER_CONFIG['SCHEDULER_INTERVAL'] = prev
43+
44+
def test_get_worker_with_custom_job_class(self):
45+
# Test with string representation of job_class
46+
worker = create_worker('default', job_class='scheduler.rq_classes.JobExecution')
47+
self.assertTrue(issubclass(worker.job_class, Job))
48+
self.assertTrue(issubclass(worker.job_class, JobExecution))
49+
50+
def test_get_worker_without_custom_job_class(self):
51+
worker = create_worker('default')
52+
self.assertTrue(issubclass(worker.job_class, JobExecution))

scheduler/tools.py

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import croniter
55
from django.apps import apps
66
from django.utils import timezone
7+
from django.utils.module_loading import import_string
78

89
from scheduler.queues import get_queues, logger, get_queue
910
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES
@@ -71,6 +72,15 @@ def create_worker(*queue_names, **kwargs):
7172
kwargs['name'] = _calc_worker_name(existing_worker_names)
7273

7374
kwargs['name'] = kwargs['name'].replace('/', '.')
75+
76+
# Handle job_class if provided
77+
if 'job_class' not in kwargs or kwargs["job_class"] is None:
78+
kwargs['job_class'] = 'scheduler.rq_classes.JobExecution'
79+
try:
80+
kwargs['job_class'] = import_string(kwargs['job_class'])
81+
except ImportError:
82+
raise ImportError(f"Could not import job class {kwargs['job_class']}")
83+
7484
worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
7585
return worker
7686

0 commit comments

Comments
 (0)