Skip to content

Commit a2a83c3

Browse files
Filter out jobs from different queues (#13)
* Filter out jobs from different queues * Fix mypy * Fix tests
1 parent 40d78cc commit a2a83c3

3 files changed

Lines changed: 44 additions & 5 deletions

File tree

arq_admin/queue.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,16 @@ async def _get_job_id_to_status_map(self) -> Dict[str, JobStatus]:
168168
job_ids_with_prefixes = (match.groupdict() for match in regex_matches_from_arq_keys if match is not None)
169169

170170
job_ids_to_scores = {key[0].decode('utf-8'): key[1] for key in job_ids_with_scores}
171+
job_ids_in_queue = set(job_ids_to_scores.keys())
171172
job_ids_to_prefixes = dict(sorted(
172173
# not only ensure that we don't get key error but also filter out stuff that's not a client job
173-
([key['job_id'], key['prefix']] for key in job_ids_with_prefixes if key['prefix'] in PREFIX_PRIORITY),
174+
(
175+
[key['job_id'], key['prefix']]
176+
for key in job_ids_with_prefixes
177+
if key['prefix'] in PREFIX_PRIORITY and (
178+
key['job_id'] in job_ids_in_queue or key['prefix'] == 'result'
179+
)
180+
),
174181
# make sure that more specific indices go after less specific ones
175182
key=lambda job_id_with_prefix: PREFIX_PRIORITY[job_id_with_prefix[-1]],
176183
))
@@ -189,4 +196,4 @@ def _get_job_status_from_raw_data(self, prefix: str, zscore: Optional[int]) -> J
189196
return JobStatus.in_progress
190197
if zscore:
191198
return JobStatus.deferred if zscore > timestamp_ms() else JobStatus.queued
192-
return JobStatus.not_found
199+
return JobStatus.not_found # pragma: nocover

arq_admin/views.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class QueueListView(ListView):
2121

2222
def get_queryset(self) -> List[QueueStats]:
2323
result = asyncio.run(self._gather_queues())
24-
return result
24+
return result # pragma: nocover
2525

2626
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
2727
context = super().get_context_data(**kwargs)
@@ -60,7 +60,7 @@ def job_status(self) -> str:
6060
def get_queryset(self) -> List[JobInfo]:
6161
queue_name = self.kwargs['queue_name'] # pragma: no cover
6262
jobs = asyncio.run(self._get_queue_jobs(queue_name))
63-
return sorted(jobs, key=attrgetter('enqueue_time'))
63+
return sorted(jobs, key=attrgetter('enqueue_time')) # pragma: nocover
6464

6565
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
6666
context = super().get_context_data(**kwargs)

tests/test_views.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
from django.contrib.messages import get_messages
77
from django.http import HttpResponseRedirect
88
from django.template.response import TemplateResponse
9-
from django.test import AsyncClient
9+
from django.test import AsyncClient, override_settings
1010
from django.urls import reverse
1111

1212
from arq_admin.queue import Queue
13+
from tests.settings import REDIS_SETTINGS
1314

1415

1516
@pytest.mark.asyncio()
@@ -125,3 +126,34 @@ async def test_post_job_abort_view(
125126
assert len(messages) == 1
126127
message = messages[0]
127128
assert message.tags == message_tag
129+
130+
131+
@pytest.mark.asyncio()
132+
@pytest.mark.django_db()
133+
@pytest.mark.usefixtures('django_login')
134+
@override_settings(ARQ_QUEUES={
135+
default_queue_name: REDIS_SETTINGS,
136+
'arq:queue2': REDIS_SETTINGS,
137+
})
138+
async def test_two_queues_detail_views(async_client: AsyncClient, redis: ArqRedis) -> None:
139+
second_queue_name = 'arq:queue2'
140+
import arq_admin.settings as arq_admin_settings
141+
from django.conf import settings as django_settings
142+
arq_admin_settings.ARQ_QUEUES = django_settings.ARQ_QUEUES
143+
144+
await redis.enqueue_job('successful_task', _job_id='job1', _queue_name=default_queue_name)
145+
await redis.enqueue_job('successful_task', _job_id='job2', _queue_name=second_queue_name)
146+
147+
# Check detail view for default queue
148+
url1 = reverse('arq_admin:all_jobs', kwargs={'queue_name': default_queue_name})
149+
result1 = await async_client.get(url1)
150+
assert isinstance(result1, TemplateResponse)
151+
assert len(result1.context_data['object_list']) == 1
152+
assert result1.context_data['object_list'][0].job_id == 'job1'
153+
154+
# Check detail view for second queue
155+
url2 = reverse('arq_admin:all_jobs', kwargs={'queue_name': second_queue_name})
156+
result2 = await async_client.get(url2)
157+
assert isinstance(result2, TemplateResponse)
158+
assert len(result2.context_data['object_list']) == 1
159+
assert result2.context_data['object_list'][0].job_id == 'job2'

0 commit comments

Comments
 (0)