26 changes: 15 additions & 11 deletions apps/worker/tasks/notify.py
@@ -2,7 +2,6 @@

import sentry_sdk
from asgiref.sync import async_to_sync
from celery.exceptions import MaxRetriesExceededError
from sqlalchemy import and_
from sqlalchemy.orm.session import Session

@@ -174,23 +173,28 @@ def _attempt_retry(
*args,
**kwargs,
):
try:
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=commit.repoid,
milestone=Milestones.NOTIFICATIONS_SENT,
error=Errors.INTERNAL_RETRYING,
)
self.retry(max_retries=max_retries, countdown=countdown)
except MaxRetriesExceededError:
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=commit.repoid,
milestone=Milestones.NOTIFICATIONS_SENT,
error=Errors.INTERNAL_RETRYING,
)
if not self.safe_retry(max_retries=max_retries, countdown=countdown):
# Handle both UserYaml objects and dicts
yaml_dict = None
if current_yaml:
if hasattr(current_yaml, "to_dict"):
yaml_dict = current_yaml.to_dict()
elif isinstance(current_yaml, dict):
yaml_dict = current_yaml
log.warning(
"Not attempting to retry notifications since we already retried too many times",
extra={
"repoid": commit.repoid,
"commit": commit.commitid,
"max_retries": max_retries,
"next_countdown_would_be": countdown,
"current_yaml": current_yaml.to_dict(),
"current_yaml": yaml_dict,
},
)
self.log_checkpoint(UploadFlow.NOTIF_TOO_MANY_RETRIES)
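The `safe_retry` helper itself is not shown in this diff. For context, a minimal sketch of the contract the callers above appear to assume (hypothetical implementation: it wraps Celery's `Task.retry()` and returns `False` once `MaxRetriesExceededError` is raised, so the `if not self.safe_retry(...)` branches only run when retries are exhausted):

```python
# Hypothetical sketch only; the real safe_retry lives outside this diff.
from celery import Task
from celery.exceptions import MaxRetriesExceededError


class SafeRetryTask(Task):
    def safe_retry(self, **retry_kwargs) -> bool:
        """Schedule a retry like Task.retry(), but return False instead of
        raising once max_retries has been exceeded."""
        try:
            # Task.retry() normally raises celery.exceptions.Retry, which
            # propagates and ends the current task run once a retry is queued.
            self.retry(**retry_kwargs)
        except MaxRetriesExceededError:
            # Out of retries: let the caller take its fallback branch.
            return False
        return True  # reached only if retry() is configured not to raise (throw=False)
```

Under that contract, the `mocker.patch.object(NotifyTask, "safe_retry", return_value=False)` stub in `test_notify_task_max_retries_exceeded` below exercises exactly the exhausted-retries branch.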
29 changes: 24 additions & 5 deletions apps/worker/tasks/tests/unit/test_notify_task.py
@@ -5,7 +5,7 @@
import httpx
import pytest
import respx
from celery.exceptions import MaxRetriesExceededError, Retry
from celery.exceptions import Retry
from freezegun import freeze_time

from database.enums import Decoration, Notification, NotificationState
@@ -778,6 +778,11 @@ def test_simple_call_should_delay(
NotifyTask, "should_wait_longer", return_value=True
)
mocked_retry = mocker.patch.object(NotifyTask, "retry", side_effect=Retry())
mocked_safe_retry = mocker.patch.object(
NotifyTask,
"safe_retry",
side_effect=lambda **kwargs: mocked_retry(**kwargs),
)
fetch_and_update_whether_ci_passed_result = {}
mocker.patch.object(
NotifyTask,
@@ -800,7 +805,7 @@ def test_simple_call_should_delay(
commitid=commit.commitid,
current_yaml={},
)
mocked_retry.assert_called_with(countdown=15, max_retries=10)
mocked_safe_retry.assert_called_with(countdown=15, max_retries=10)
mocked_should_wait_longer.assert_called_with(
UserYaml({}), commit, fetch_and_update_whether_ci_passed_result
)
@@ -829,6 +834,11 @@ def test_simple_call_should_delay_using_integration(
NotifyTask, "should_wait_longer", return_value=True
)
mocked_retry = mocker.patch.object(NotifyTask, "retry", side_effect=Retry())
mocked_safe_retry = mocker.patch.object(
NotifyTask,
"safe_retry",
side_effect=lambda **kwargs: mocked_retry(**kwargs),
)
fetch_and_update_whether_ci_passed_result = {}
mocker.patch.object(
NotifyTask,
@@ -852,7 +862,7 @@ def test_simple_call_should_delay_using_integration(
commitid=commit.commitid,
current_yaml={},
)
mocked_retry.assert_called_with(countdown=180, max_retries=5)
mocked_safe_retry.assert_called_with(countdown=180, max_retries=5)
mocked_should_wait_longer.assert_called_with(
UserYaml({}), commit, fetch_and_update_whether_ci_passed_result
)
@@ -1057,6 +1067,15 @@ def test_notify_task_no_ghapp_available_one_rate_limited(
"tasks.notify.get_repo_provider_service"
)
mock_retry = mocker.patch.object(NotifyTask, "retry", return_value=None)

# Mock safe_retry to return True (retry scheduled) and call retry
def safe_retry_side_effect(**kwargs):
mock_retry(**kwargs)
return True # Return True to indicate retry was scheduled

mock_safe_retry = mocker.patch.object(
NotifyTask, "safe_retry", side_effect=safe_retry_side_effect
)
get_repo_provider_service.side_effect = NoConfiguredAppsAvailable(
apps_count=2, rate_limited_count=1, suspended_count=1
)
@@ -1078,7 +1097,7 @@ def test_notify_task_no_ghapp_available_one_rate_limited(
current_yaml=current_yaml,
)
assert res is None
mock_retry.assert_called_with(max_retries=10, countdown=45 * 60)
mock_safe_retry.assert_called_with(max_retries=10, countdown=45 * 60)
mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
[
call(
@@ -1286,7 +1305,7 @@ def test_notify_task_max_retries_exceeded(
self, dbsession, mocker, mock_repo_provider, mock_self_app
):
mocker.patch.object(NotifyTask, "should_wait_longer", return_value=True)
mocker.patch.object(NotifyTask, "retry", side_effect=MaxRetriesExceededError())
mocker.patch.object(NotifyTask, "safe_retry", return_value=False)
mocked_fetch_and_update_whether_ci_passed = mocker.patch.object(
NotifyTask, "fetch_and_update_whether_ci_passed"
)
71 changes: 64 additions & 7 deletions apps/worker/tasks/upload.py
@@ -327,10 +327,24 @@ def run_impl(
milestone=milestone,
error=Errors.INTERNAL_RETRYING,
)
self.retry(
if not self.safe_retry(
max_retries=10,
countdown=retry_countdown,
kwargs=upload_context.kwargs_for_retry(kwargs),
)
):
self.maybe_log_upload_checkpoint(UploadFlow.TOO_MANY_RETRIES)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
error=Errors.INTERNAL_OUT_OF_RETRIES,
)
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
"reason": "too_many_retries",
}

lock_name = upload_context.lock_name("upload")
try:
@@ -422,11 +436,24 @@ def run_impl(
milestone=milestone,
error=Errors.INTERNAL_RETRYING,
)
self.retry(
if not self.safe_retry(
max_retries=3,
countdown=retry_countdown,
kwargs=upload_context.kwargs_for_retry(kwargs),
)
):
self.maybe_log_upload_checkpoint(UploadFlow.TOO_MANY_RETRIES)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
error=Errors.INTERNAL_OUT_OF_RETRIES,
)
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
"reason": "too_many_retries",
}
except SoftTimeLimitExceeded:
self._call_upload_breadcrumb_task(
commit_sha=commitid,
@@ -435,11 +462,24 @@
error=Errors.TASK_TIMED_OUT,
)
retry_countdown = 20 * 2**self.request.retries
self.retry(
if not self.safe_retry(
max_retries=3,
countdown=retry_countdown,
kwargs=upload_context.kwargs_for_retry(kwargs),
)
):
self.maybe_log_upload_checkpoint(UploadFlow.TOO_MANY_RETRIES)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
error=Errors.INTERNAL_OUT_OF_RETRIES,
)
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
"reason": "too_many_retries",
}

@sentry_sdk.trace
def run_impl_within_lock(
@@ -547,7 +587,24 @@ def run_impl_within_lock(
milestone=Milestones.COMPILING_UPLOADS,
error=Errors.INTERNAL_RETRYING,
)
self.retry(countdown=60, kwargs=upload_context.kwargs_for_retry(kwargs))
if not self.safe_retry(
max_retries=10,
countdown=60,
kwargs=upload_context.kwargs_for_retry(kwargs),
):
self.maybe_log_upload_checkpoint(UploadFlow.TOO_MANY_RETRIES)
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=repository.repoid,
milestone=Milestones.COMPILING_UPLOADS,
error=Errors.INTERNAL_OUT_OF_RETRIES,
)
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
"reason": "too_many_retries",
}
except Exception as e:
log.error(
"Unexpected error during initialize_and_save_report",
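As a side note on the `retry_countdown = 20 * 2**self.request.retries` line in the `SoftTimeLimitExceeded` branch above: the countdown doubles with each Celery retry, which a quick arithmetic check makes concrete (plain Python, no project code assumed):

```python
# Exponential backoff schedule from 20 * 2**retries (seconds before each reattempt):
for retries in range(4):
    print(retries, 20 * 2**retries)  # 0 -> 20, 1 -> 40, 2 -> 80, 3 -> 160
```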
13 changes: 12 additions & 1 deletion apps/worker/tasks/upload_finisher.py
@@ -470,7 +470,18 @@ def _process_reports_with_lock(
"number_retries": self.request.retries,
},
)
self.retry(max_retries=MAX_RETRIES, countdown=retry_in)
if not self.safe_retry(max_retries=MAX_RETRIES, countdown=retry_in):
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
upload_ids=upload_ids,
error=Errors.INTERNAL_OUT_OF_RETRIES,
)
return {
"error": "Max retries exceeded while acquiring report lock",
"upload_ids": upload_ids,
}

def _handle_finisher_lock(
self,
19 changes: 18 additions & 1 deletion apps/worker/tasks/upload_processor.py
@@ -122,7 +122,24 @@ def on_processing_error(error: ProcessingError):
upload_ids=[arguments["upload_id"]],
error=Errors.INTERNAL_RETRYING,
)
self.retry(max_retries=error.max_retries, countdown=countdown)
if not self.safe_retry(
max_retries=error.max_retries, countdown=countdown
):
# Max retries exceeded - fall through to exception handling
# safe_retry already incremented the metric, so we just need to capture the exception
sentry_sdk.capture_exception(
error.error_class(error.error_text),
contexts={
"upload_details": {
"commitid": commitid,
"error_code": error.code,
"repoid": repoid,
"retry_count": self.request.retries,
"storage_location": error.params.get("location"),
"upload_id": arguments.get("upload_id"),
}
},
)
elif error.is_retryable and self.request.retries >= error.max_retries:
sentry_sdk.capture_exception(
error.error_class(error.error_text),