-
Notifications
You must be signed in to change notification settings - Fork 10
CCMRG-1864 Add Dead Letter Queue (DLQ) for Failed Tasks #561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 9 commits
e3225d9
d502524
4fc2b93
375f18d
7315ea5
40d8774
6c41200
c247bfa
9a3b255
e07cb3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| """ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, this should follow the paradigm with Django, please see other |
||
| Django Admin interface for Dead Letter Queue (DLQ) management. | ||
| This provides a web-based interface for inspecting and recovering tasks | ||
| that were saved to the DLQ after exhausting all retries. | ||
| """ | ||
|
|
||
| from django.contrib import admin, messages | ||
| from django.http import HttpResponseRedirect | ||
| from django.shortcuts import render | ||
| from django.urls import path, re_path, reverse | ||
|
|
||
| from services.task.task import celery_app | ||
| from shared.celery_config import dlq_recovery_task_name | ||
|
|
||
|
|
||
| def list_dlq(request): | ||
| """List all DLQ keys.""" | ||
| task_name_filter = request.GET.get("task_name", None) | ||
| # Call DLQ recovery task synchronously via Celery | ||
| # The task is registered in the worker app, but send_task will route it correctly | ||
| result = celery_app.send_task( | ||
| dlq_recovery_task_name, | ||
| kwargs={"action": "list", "task_name_filter": task_name_filter}, | ||
| ).get(timeout=30) # 30 second timeout for admin operations | ||
|
|
||
| if result.get("success"): | ||
| keys = result.get("keys", []) | ||
| context = { | ||
| **admin.site.each_context(request), | ||
| "title": "Dead Letter Queue", | ||
| "keys": keys, | ||
| "total_keys": result.get("total_keys", 0), | ||
| "task_name_filter": task_name_filter, | ||
| "opts": {"app_label": "core", "model_name": "dlq"}, | ||
| } | ||
| return render(request, "admin/core/dlq_list.html", context) | ||
| else: | ||
| messages.error(request, f"Failed to list DLQ keys: {result.get('error')}") | ||
| return HttpResponseRedirect(reverse("admin:index")) | ||
|
|
||
|
|
||
| def recover_dlq(request, dlq_key): | ||
| """Recover tasks from a DLQ key.""" | ||
| # Django automatically URL-decodes the path parameter | ||
| # Call DLQ recovery task synchronously via Celery | ||
| result = celery_app.send_task( | ||
| dlq_recovery_task_name, | ||
| kwargs={"action": "recover", "dlq_key": dlq_key}, | ||
| ).get(timeout=60) # 60 second timeout for recovery operations | ||
|
|
||
| if result.get("success"): | ||
| recovered = result.get("recovered_count", 0) | ||
| failed = result.get("failed_count", 0) | ||
| if recovered > 0: | ||
| messages.success( | ||
| request, | ||
| f"Successfully recovered {recovered} task(s) from DLQ key: {dlq_key}", | ||
| ) | ||
| if failed > 0: | ||
| messages.warning( | ||
| request, | ||
| f"Failed to recover {failed} task(s) from DLQ key: {dlq_key}", | ||
| ) | ||
| else: | ||
| messages.error( | ||
| request, f"Failed to recover tasks: {result.get('error', 'Unknown error')}" | ||
| ) | ||
|
|
||
| return HttpResponseRedirect(reverse("admin:core_dlq_list")) | ||
|
|
||
|
|
||
| def delete_dlq(request, dlq_key): | ||
| """Delete tasks from a DLQ key.""" | ||
| # Django automatically URL-decodes the path parameter | ||
| # Call DLQ recovery task synchronously via Celery | ||
| result = celery_app.send_task( | ||
| dlq_recovery_task_name, | ||
| kwargs={"action": "delete", "dlq_key": dlq_key}, | ||
| ).get(timeout=30) # 30 second timeout for delete operations | ||
|
|
||
| if result.get("success"): | ||
| deleted = result.get("deleted_count", 0) | ||
| messages.success( | ||
| request, | ||
| f"Successfully deleted {deleted} task(s) from DLQ key: {dlq_key}", | ||
| ) | ||
| else: | ||
| messages.error( | ||
| request, f"Failed to delete tasks: {result.get('error', 'Unknown error')}" | ||
| ) | ||
|
|
||
| return HttpResponseRedirect(reverse("admin:core_dlq_list")) | ||
|
|
||
|
|
||
| # Register DLQ admin URLs by extending admin site's get_urls | ||
| _original_get_urls = admin.site.get_urls | ||
|
|
||
|
|
||
| def get_urls_with_dlq(): | ||
| """Add DLQ URLs to admin site.""" | ||
| dlq_urls = [ | ||
| path("dlq/", admin.site.admin_view(list_dlq), name="core_dlq_list"), | ||
| # Use re_path to handle slashes in dlq_key | ||
| re_path( | ||
| r"^dlq/recover/(?P<dlq_key>.+)/$", | ||
| admin.site.admin_view(recover_dlq), | ||
| name="core_dlq_recover", | ||
| ), | ||
| re_path( | ||
| r"^dlq/delete/(?P<dlq_key>.+)/$", | ||
| admin.site.admin_view(delete_dlq), | ||
| name="core_dlq_delete", | ||
| ), | ||
| ] | ||
| return dlq_urls + _original_get_urls() | ||
|
|
||
|
|
||
| admin.site.get_urls = get_urls_with_dlq | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| {% extends "admin/base_site.html" %} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this really need its own template? why can't we use django admin's built-in?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no db model for the dead letter queue, so it needs a little bit of custom template. Will attempt to reformat it more in line with other pages . |
||
| {% load i18n admin_urls static admin_list %} | ||
|
|
||
| {% block title %}Dead Letter Queue{% endblock %} | ||
|
|
||
| {% block content %} | ||
| <h1>Dead Letter Queue</h1> | ||
|
|
||
| {% if task_name_filter %} | ||
| <p>Filtered by task name: <strong>{{ task_name_filter }}</strong></p> | ||
| {% endif %} | ||
|
|
||
| <p>Total DLQ keys: <strong>{{ total_keys }}</strong></p> | ||
|
|
||
| {% if keys %} | ||
| <table> | ||
| <thead> | ||
| <tr> | ||
| <th>DLQ Key</th> | ||
| <th>Task Count</th> | ||
| <th>TTL (seconds)</th> | ||
| <th>Actions</th> | ||
| </tr> | ||
| </thead> | ||
| <tbody> | ||
| {% for key_info in keys %} | ||
| <tr> | ||
| <td><code>{{ key_info.key }}</code></td> | ||
| <td>{{ key_info.count }}</td> | ||
| <td>{{ key_info.ttl_seconds }}</td> | ||
| <td> | ||
| <a href="{% url 'admin:core_dlq_recover' dlq_key=key_info.key %}" class="button">Recover</a> | ||
| <a href="{% url 'admin:core_dlq_delete' dlq_key=key_info.key %}" class="button" onclick="return confirm('Are you sure you want to delete this DLQ key?');">Delete</a> | ||
| </td> | ||
| </tr> | ||
| {% endfor %} | ||
| </tbody> | ||
| </table> | ||
| {% else %} | ||
| <p>No DLQ keys found.</p> | ||
| {% endif %} | ||
|
|
||
| <form method="get" action="{% url 'admin:core_dlq_list' %}"> | ||
| <label for="task_name">Filter by task name:</label> | ||
| <input type="text" name="task_name" id="task_name" value="{{ task_name_filter|default:'' }}" placeholder="e.g., app.tasks.upload.Upload"> | ||
| <input type="submit" value="Filter"> | ||
| {% if task_name_filter %} | ||
| <a href="{% url 'admin:core_dlq_list' %}">Clear filter</a> | ||
| {% endif %} | ||
| </form> | ||
| {% endblock %} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we importing this? is should be its own
admin.pyfile