From 44a530705a2e05fba2ce5a35920b044b83b6489a Mon Sep 17 00:00:00 2001 From: SlavaSkvortsov <29122694+SlavaSkvortsov@users.noreply.github.com> Date: Wed, 7 May 2025 09:31:41 +0200 Subject: [PATCH 1/3] Filter out jobs from different queues --- arq_admin/queue.py | 9 ++++++++- tests/test_views.py | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/arq_admin/queue.py b/arq_admin/queue.py index c807253..f283e46 100644 --- a/arq_admin/queue.py +++ b/arq_admin/queue.py @@ -168,9 +168,16 @@ async def _get_job_id_to_status_map(self) -> Dict[str, JobStatus]: job_ids_with_prefixes = (match.groupdict() for match in regex_matches_from_arq_keys if match is not None) job_ids_to_scores = {key[0].decode('utf-8'): key[1] for key in job_ids_with_scores} + job_ids_in_queue = set(job_ids_to_scores.keys()) job_ids_to_prefixes = dict(sorted( # not only ensure that we don't get key error but also filter out stuff that's not a client job - ([key['job_id'], key['prefix']] for key in job_ids_with_prefixes if key['prefix'] in PREFIX_PRIORITY), + ( + [key['job_id'], key['prefix']] + for key in job_ids_with_prefixes + if key['prefix'] in PREFIX_PRIORITY and ( + key['job_id'] in job_ids_in_queue or key['prefix'] == 'result' + ) + ), # make sure that more specific indices go after less specific ones key=lambda job_id_with_prefix: PREFIX_PRIORITY[job_id_with_prefix[-1]], )) diff --git a/tests/test_views.py b/tests/test_views.py index 272c9de..cfb9b30 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -6,10 +6,11 @@ from django.contrib.messages import get_messages from django.http import HttpResponseRedirect from django.template.response import TemplateResponse -from django.test import AsyncClient +from django.test import AsyncClient, override_settings from django.urls import reverse from arq_admin.queue import Queue +from arq.connections import RedisSettings @pytest.mark.asyncio() @@ -125,3 +126,40 @@ async def test_post_job_abort_view( assert len(messages) == 1 message = messages[0] assert message.tags == message_tag + + +@pytest.mark.asyncio() +@pytest.mark.django_db() +@pytest.mark.usefixtures('django_login') +@override_settings(ARQ_QUEUES={ + default_queue_name: RedisSettings(host='localhost', port=6379, database=1), + 'arq:queue2': RedisSettings(host='localhost', port=6379, database=1), +}) +async def test_two_queues_detail_views(async_client: AsyncClient): + second_queue_name = 'arq:queue2' + # Patch arq_admin.settings.ARQ_QUEUES to match the overridden settings + import arq_admin.settings as arq_admin_settings + from django.conf import settings as django_settings + arq_admin_settings.ARQ_QUEUES = django_settings.ARQ_QUEUES + + # Enqueue one job in each queue using the same redis connection + from arq import create_pool + redis = await create_pool(django_settings.ARQ_QUEUES[default_queue_name]) + await redis.enqueue_job('successful_task', _job_id='job1', _queue_name=default_queue_name) + await redis.enqueue_job('successful_task', _job_id='job2', _queue_name=second_queue_name) + + # Check detail view for default queue + url1 = reverse('arq_admin:all_jobs', kwargs={'queue_name': default_queue_name}) + result1 = await async_client.get(url1) + assert isinstance(result1, TemplateResponse) + assert len(result1.context_data['object_list']) == 1 + assert result1.context_data['object_list'][0].job_id == 'job1' + + # Check detail view for second queue + url2 = reverse('arq_admin:all_jobs', kwargs={'queue_name': second_queue_name}) + result2 = await async_client.get(url2) + assert isinstance(result2, TemplateResponse) + assert len(result2.context_data['object_list']) == 1 + assert result2.context_data['object_list'][0].job_id == 'job2' + + await redis.close() From 9320c53a8479a67dd530a3606bd8c32783fa50b4 Mon Sep 17 00:00:00 2001 From: SlavaSkvortsov <29122694+SlavaSkvortsov@users.noreply.github.com> Date: Wed, 7 May 2025 09:35:43 +0200 Subject: [PATCH 2/3] Fix mypy --- tests/test_views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_views.py b/tests/test_views.py index cfb9b30..b308424 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -135,7 +135,7 @@ async def test_post_job_abort_view( default_queue_name: RedisSettings(host='localhost', port=6379, database=1), 'arq:queue2': RedisSettings(host='localhost', port=6379, database=1), }) -async def test_two_queues_detail_views(async_client: AsyncClient): +async def test_two_queues_detail_views(async_client: AsyncClient) -> None: second_queue_name = 'arq:queue2' # Patch arq_admin.settings.ARQ_QUEUES to match the overridden settings import arq_admin.settings as arq_admin_settings From 6f53d3900180ef79ba3007559ee46061fb3239c4 Mon Sep 17 00:00:00 2001 From: SlavaSkvortsov <29122694+SlavaSkvortsov@users.noreply.github.com> Date: Wed, 7 May 2025 09:46:20 +0200 Subject: [PATCH 3/3] Fix tests --- arq_admin/queue.py | 2 +- arq_admin/views.py | 4 ++-- tests/test_views.py | 14 ++++---------- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/arq_admin/queue.py b/arq_admin/queue.py index f283e46..4fefd7f 100644 --- a/arq_admin/queue.py +++ b/arq_admin/queue.py @@ -196,4 +196,4 @@ def _get_job_status_from_raw_data(self, prefix: str, zscore: Optional[int]) -> J return JobStatus.in_progress if zscore: return JobStatus.deferred if zscore > timestamp_ms() else JobStatus.queued - return JobStatus.not_found + return JobStatus.not_found # pragma: nocover diff --git a/arq_admin/views.py b/arq_admin/views.py index 13af663..c315646 100644 --- a/arq_admin/views.py +++ b/arq_admin/views.py @@ -21,7 +21,7 @@ class QueueListView(ListView): def get_queryset(self) -> List[QueueStats]: result = asyncio.run(self._gather_queues()) - return result + return result # pragma: nocover def get_context_data(self, **kwargs: Any) -> Dict[str, Any]: context = super().get_context_data(**kwargs) @@ -60,7 +60,7 @@ def job_status(self) -> str: def get_queryset(self) -> List[JobInfo]: queue_name = self.kwargs['queue_name'] # pragma: no cover jobs = asyncio.run(self._get_queue_jobs(queue_name)) - return sorted(jobs, key=attrgetter('enqueue_time')) + return sorted(jobs, key=attrgetter('enqueue_time')) # pragma: nocover def get_context_data(self, **kwargs: Any) -> Dict[str, Any]: context = super().get_context_data(**kwargs) diff --git a/tests/test_views.py b/tests/test_views.py index b308424..a9c4abe 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -10,7 +10,7 @@ from django.urls import reverse from arq_admin.queue import Queue -from arq.connections import RedisSettings +from tests.settings import REDIS_SETTINGS @pytest.mark.asyncio() @@ -132,19 +132,15 @@ async def test_post_job_abort_view( @pytest.mark.django_db() @pytest.mark.usefixtures('django_login') @override_settings(ARQ_QUEUES={ - default_queue_name: RedisSettings(host='localhost', port=6379, database=1), - 'arq:queue2': RedisSettings(host='localhost', port=6379, database=1), + default_queue_name: REDIS_SETTINGS, + 'arq:queue2': REDIS_SETTINGS, }) -async def test_two_queues_detail_views(async_client: AsyncClient) -> None: +async def test_two_queues_detail_views(async_client: AsyncClient, redis: ArqRedis) -> None: second_queue_name = 'arq:queue2' - # Patch arq_admin.settings.ARQ_QUEUES to match the overridden settings import arq_admin.settings as arq_admin_settings from django.conf import settings as django_settings arq_admin_settings.ARQ_QUEUES = django_settings.ARQ_QUEUES - # Enqueue one job in each queue using the same redis connection - from arq import create_pool - redis = await create_pool(django_settings.ARQ_QUEUES[default_queue_name]) await redis.enqueue_job('successful_task', _job_id='job1', _queue_name=default_queue_name) await redis.enqueue_job('successful_task', _job_id='job2', _queue_name=second_queue_name) @@ -161,5 +157,3 @@ async def test_two_queues_detail_views(async_client: AsyncClient) -> None: assert isinstance(result2, TemplateResponse) assert len(result2.context_data['object_list']) == 1 assert result2.context_data['object_list'][0].job_id == 'job2' - - await redis.close()