Skip to content

Commit f90e39d

Browse files
Handle redis errors on queues view (#7)
1 parent 2175aaa commit f90e39d

5 files changed

Lines changed: 120 additions & 74 deletions

File tree

arq_admin/queue.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
from dataclasses import dataclass
3-
from typing import List, NamedTuple, Optional, Set
3+
from typing import List, Optional, Set
44

55
from arq import ArqRedis
66
from arq.connections import RedisSettings
@@ -13,15 +13,18 @@
1313
from arq_admin.redis import get_redis
1414

1515

16-
class QueueStats(NamedTuple):
16+
@dataclass
17+
class QueueStats:
1718
name: str
1819
host: str
1920
port: int
2021
database: int
2122

22-
queued_jobs: int
23-
running_jobs: int
24-
deferred_jobs: int
23+
queued_jobs: Optional[int] = None
24+
running_jobs: Optional[int] = None
25+
deferred_jobs: Optional[int] = None
26+
27+
error: Optional[str] = None
2528

2629

2730
@dataclass
@@ -50,20 +53,24 @@ async def get_jobs(self, status: Optional[JobStatus] = None) -> List[JobInfo]:
5053
return jobs
5154

5255
async def get_stats(self) -> QueueStats:
53-
async with get_redis(self.redis_settings) as redis:
54-
job_ids = await self._get_job_ids(redis)
55-
56-
statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids])
57-
58-
return QueueStats(
56+
result = QueueStats(
5957
name=self.name,
6058
host=str(self.redis_settings.host),
6159
port=self.redis_settings.port,
6260
database=self.redis_settings.database,
63-
queued_jobs=len([status for status in statuses if status == JobStatus.queued]),
64-
running_jobs=len([status for status in statuses if status == JobStatus.in_progress]),
65-
deferred_jobs=len([status for status in statuses if status == JobStatus.deferred]),
6661
)
62+
try:
63+
async with get_redis(self.redis_settings) as redis:
64+
job_ids = await self._get_job_ids(redis)
65+
statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids])
66+
except Exception as ex: # noqa: B902
67+
result.error = str(ex)
68+
else:
69+
result.queued_jobs = len([status for status in statuses if status == JobStatus.queued])
70+
result.running_jobs = len([status for status in statuses if status == JobStatus.in_progress])
71+
result.deferred_jobs = len([status for status in statuses if status == JobStatus.deferred])
72+
73+
return result
6774

6875
async def get_job_by_id(self, job_id: str, redis: Optional[ArqRedis] = None) -> JobInfo:
6976
if redis is None:

arq_admin/templates/arq_admin/queues.html

Lines changed: 71 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,88 @@
33
{% block title %}Queues {{ block.super }}{% endblock %}
44

55
{% block extrastyle %}
6-
{{ block.super }}
7-
<style>table {width: 100%;}</style>
6+
{{ block.super }}
7+
<style>
8+
table {
9+
width: 100%;
10+
}
11+
</style>
812
{% endblock %}
913

1014
{% block content_title %}<h1>ARQ Queues</h1>{% endblock %}
1115

1216
{% block breadcrumbs %}
13-
<div class="breadcrumbs">
14-
<a href="{% url 'admin:index' %}">Home</a> &rsaquo;
15-
<a href="{% url 'arq_admin:home' %}">Django ARQ</a>
16-
</div>
17+
<div class="breadcrumbs">
18+
<a href="{% url 'admin:index' %}">Home</a> &rsaquo;
19+
<a href="{% url 'arq_admin:home' %}">Django ARQ</a>
20+
</div>
1721
{% endblock %}
1822

1923
{% block content %}
2024

21-
<div id="content-main">
25+
<div id="content-main">
2226

2327
<div class="module">
24-
<table>
25-
<thead>
26-
<tr>
27-
<th>Name</th>
28-
<th>Queued Jobs</th>
29-
<th>Deferred Jobs</th>
30-
<th>Running Jobs</th>
31-
<th>Host</th>
32-
<th>Port</th>
33-
<th>DB</th>
34-
</tr>
35-
</thead>
36-
<tbody>
37-
{% for queue in object_list %}
38-
<tr class = "{% cycle 'row1' 'row2' %}">
39-
<th>
40-
<a href="{% url 'arq_admin:all_jobs' queue.name %}">
41-
{{ queue.name }}
42-
</a>
43-
</th>
44-
<td>
45-
<a href="{% url 'arq_admin:queued_jobs' queue.name %}">
46-
{{ queue.queued_jobs }}
47-
</a>
48-
</td>
49-
<th>
50-
<a href="{% url 'arq_admin:deferred_jobs' queue.name %}">
51-
{{ queue.deferred_jobs }}
52-
</a>
53-
</th>
54-
<th>
55-
<a href="{% url 'arq_admin:running_jobs' queue.name %}">
56-
{{ queue.running_jobs }}
57-
</a>
58-
</th>
59-
<td>{{ queue.host }}</td>
60-
<td>{{ queue.port }}</td>
61-
<td>{{ queue.database }}</td>
62-
</tr>
63-
{% endfor %}
64-
</tbody>
65-
</table>
28+
{% for queue in object_list %}
29+
{% if queue.error %}
30+
<h2>{{ queue.name }} - <span style="color: red;">{{ queue.error }}</span></h2>
31+
{% endif %}
32+
{% endfor %}
33+
<table>
34+
<thead>
35+
<tr>
36+
<th>Name</th>
37+
<th>Queued Jobs</th>
38+
<th>Deferred Jobs</th>
39+
<th>Running Jobs</th>
40+
<th>Host</th>
41+
<th>Port</th>
42+
<th>DB</th>
43+
</tr>
44+
</thead>
45+
<tbody>
46+
{% for queue in object_list %}
47+
<tr class="{% cycle 'row1' 'row2' %}">
48+
<th>
49+
<a href="{% url 'arq_admin:all_jobs' queue.name %}">
50+
{{ queue.name }}
51+
</a>
52+
</th>
53+
<td>
54+
{% if queue.queued_jobs is None %}
55+
56+
{% else %}
57+
<a href="{% url 'arq_admin:queued_jobs' queue.name %}">
58+
{{ queue.queued_jobs }}
59+
</a>
60+
{% endif %}
61+
</td>
62+
<th>
63+
{% if queue.deferred_jobs is None %}
64+
65+
{% else %}
66+
<a href="{% url 'arq_admin:deferred_jobs' queue.name %}">
67+
{{ queue.deferred_jobs }}
68+
</a>
69+
{% endif %}
70+
</th>
71+
<th>
72+
{% if queue.running_jobs is None %}
73+
74+
{% else %}
75+
<a href="{% url 'arq_admin:running_jobs' queue.name %}">
76+
{{ queue.running_jobs }}
77+
</a>
78+
{% endif %}
79+
</th>
80+
<td>{{ queue.host }}</td>
81+
<td>{{ queue.port }}</td>
82+
<td>{{ queue.database }}</td>
83+
</tr>
84+
{% endfor %}
85+
</tbody>
86+
</table>
6687
</div>
67-
</div>
88+
</div>
6889

6990
{% endblock %}

arq_admin/views.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,22 @@ class QueueListView(ListView):
1818
template_name = 'arq_admin/queues.html'
1919

2020
def get_queryset(self) -> List[QueueStats]:
21-
async def _gather_queues() -> List[QueueStats]:
22-
tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()]
2321

24-
return await asyncio.gather(*tasks)
25-
26-
return asyncio.run(_gather_queues())
22+
result = asyncio.run(self._gather_queues())
23+
return result
2724

2825
def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
2926
context = super().get_context_data(**kwargs)
3027
context.update(admin.site.each_context(self.request))
3128

3229
return context
3330

31+
@staticmethod
32+
async def _gather_queues() -> List[QueueStats]:
33+
tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()] # pragma: nocover
34+
35+
return await asyncio.gather(*tasks)
36+
3437

3538
@method_decorator(staff_member_required, name='dispatch')
3639
class BaseJobListView(ListView):

setup.cfg

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ filterwarnings =
6868
ignore::UserWarning:pytest.*:
6969
ignore::ResourceWarning:redis.*:
7070
junit_family = xunit1
71-
;addopts =
72-
; --cov=arq_admin
73-
; --cov-fail-under 100
74-
; --cov-report term-missing
71+
addopts =
72+
--cov=arq_admin
73+
--cov-fail-under 100
74+
--cov-report term-missing
7575

7676
[coverage:run]
7777
source = arq_admin

tests/test_queue.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from unittest.mock import MagicMock, patch
1+
from unittest.mock import AsyncMock, MagicMock, patch
22

33
import pytest
44
from arq.constants import default_queue_name
@@ -42,6 +42,21 @@ async def test_stats() -> None:
4242
)
4343

4444

45+
@pytest.mark.asyncio()
46+
@patch.object(Queue, '_get_job_ids')
47+
async def test_stats_with_error(mocked_get_job_ids: AsyncMock) -> None:
48+
error_text = 'test error'
49+
mocked_get_job_ids.side_effect = Exception(error_text)
50+
queue = Queue.from_name(default_queue_name)
51+
assert await queue.get_stats() == QueueStats(
52+
name=default_queue_name,
53+
host=settings.REDIS_SETTINGS.host,
54+
port=settings.REDIS_SETTINGS.port,
55+
database=settings.REDIS_SETTINGS.database,
56+
error=error_text,
57+
)
58+
59+
4560
@pytest.mark.asyncio()
4661
@patch.object(Job, 'info')
4762
async def test_deserialize_error(mocked_job_info: MagicMock, jobs_creator: JobsCreator) -> None:

0 commit comments

Comments
 (0)