36 changes: 6 additions & 30 deletions apps/worker/helpers/notifier.py
@@ -10,7 +10,6 @@
from services.repository import (
EnrichedPull,
fetch_and_update_pull_request_information_from_commit,
fetch_pull_request_information,
get_repo_provider_service,
)
from services.yaml import UserYaml
@@ -19,7 +18,6 @@
from shared.torngit.base import TorngitBaseAdapter
from shared.torngit.exceptions import TorngitClientError
from shared.torngit.response_types import ProviderPull
from shared.upload.types import TAUploadContext

log = logging.getLogger(__name__)

@@ -38,17 +36,11 @@ class BaseNotifier:

This class is responsible for building and sending notifications related to
a specific commit.

Note that `commit` supports both a Commit object and a TAUploadContext
object so that it can be used in the new TA pipeline that is compliant with
Sentry's retention policies. Depending on which is passed in, a slightly
different code path is taken. Repository is also required as a parameter
since it is not given with TAUploadContext like it is with Commit.
"""

repo: Repository | SQLAlchemyRepository
# TODO: Deprecate database-reliant code path after old TA pipeline is removed
commit: Commit | TAUploadContext
commit: Commit
commit_yaml: UserYaml | None
_pull: EnrichedPull | ProviderPull | None | Literal[False] = False
_repo_service: TorngitBaseAdapter | None = None
@@ -57,26 +49,15 @@ def get_pull(self, do_log=True) -> EnrichedPull | ProviderPull | None:
repo_service = self.get_repo_service()

if self._pull is False:
if isinstance(self.commit, Commit):
self._pull = async_to_sync(
fetch_and_update_pull_request_information_from_commit
)(repo_service, self.commit, self.commit_yaml)
else:
self._pull = async_to_sync(fetch_pull_request_information)(
repo_service,
self.repo.repoid,
self.commit["commit_sha"],
self.commit["branch"],
self.commit["pull_id"],
)
self._pull = async_to_sync(
fetch_and_update_pull_request_information_from_commit
)(repo_service, self.commit, self.commit_yaml)

if self._pull is None and do_log:
log.info(
"Not notifying since there is no pull request associated with this commit",
extra={
"commitid": self.commit.commitid
if isinstance(self.commit, Commit)
else self.commit["commit_sha"],
"commitid": self.commit.commitid,
"repoid": self.repo.repoid,
},
)
@@ -133,12 +114,7 @@ def send_to_provider(self, pull: EnrichedPull | ProviderPull, message: str) -> b
except TorngitClientError:
log.error(
"Error creating/updating PR comment",
extra={
"commitid": self.commit.commitid
if isinstance(self.commit, Commit)
else self.commit["commit_sha"],
"pullid": pullid,
},
extra={"commitid": self.commit.commitid, "pullid": pullid},
)
return False

4 changes: 1 addition & 3 deletions apps/worker/services/test_results.py
@@ -378,9 +378,7 @@ def build_message(self) -> str:
generate_view_test_analytics_line(
# TODO: Deprecate database-reliant code path after old TA pipeline is removed
self.repo,
self.commit.branch
if isinstance(self.commit, Commit)
else self.commit["branch"],
self.commit.branch,
)
)
return "\n".join(message)
26 changes: 0 additions & 26 deletions apps/worker/services/tests/test_test_results.py
@@ -22,7 +22,6 @@
from shared.plan.constants import DEFAULT_FREE_PLAN
from shared.torngit.exceptions import TorngitClientError
from shared.torngit.response_types import ProviderPull
from shared.upload.types import TAUploadContext
from tests.helpers import mock_all_plans_and_tiers


@@ -48,31 +47,6 @@ def test_get_pull_none(mocker):
assert res is None


def test_get_pull_provider_pull(mocker):
provider_pull = ProviderPull(
id="1",
number="1",
title="Test PR",
state="open",
author={"username": "test_user"},
base={"branch": "main", "commitid": "base_commit"},
head={"branch": "feature", "commitid": "head_commit"},
merge_commit_sha=None,
)
mocker.patch(
"helpers.notifier.fetch_pull_request_information",
return_value=provider_pull,
)
repo = RepositoryFactory(repoid=12)
commit = TAUploadContext(commit_sha="abc123", branch="main", pull_id=1)
tn = TestResultsNotifier(repo, commit, None)
tn._repo_service = mock_repo_service()

res = tn.get_pull()

assert res == provider_pull


def test_send_to_provider():
commit = CommitFactory()
tn = TestResultsNotifier(commit.repository, commit, None)
8 changes: 5 additions & 3 deletions apps/worker/tasks/__init__.py
@@ -20,14 +20,17 @@
from tasks.compute_comparison import compute_comparison_task
from tasks.compute_component_comparison import compute_component_comparison_task
from tasks.delete_owner import delete_owner_task
from tasks.detect_flakes import detect_flakes_task
from tasks.experimental.detect_flakes import detect_flakes_task
from tasks.experimental.ingest_testruns import ingest_testruns_task
from tasks.experimental.test_analytics_notifier import (
test_analytics_notifier_task,
)
from tasks.flush_repo import flush_repo
from tasks.github_app_webhooks_check import gh_webhook_check_task
from tasks.github_marketplace import ghm_sync_plans_task
from tasks.health_check import health_check_task
from tasks.hourly_check import hourly_check_task
from tasks.http_request import http_request_task
from tasks.ingest_testruns import ingest_testruns_task
from tasks.manual_trigger import manual_trigger_task
from tasks.mark_owner_for_deletion import mark_owner_for_deletion_task
from tasks.new_user_activated import new_user_activated_task
@@ -49,7 +52,6 @@
from tasks.sync_repo_languages_gql import sync_repo_languages_gql_task
from tasks.sync_repos import sync_repos_task
from tasks.sync_teams import sync_teams_task
from tasks.test_analytics_notifier import test_analytics_notifier_task
from tasks.test_results_finisher import test_results_finisher_task
from tasks.test_results_processor import test_results_processor_task
from tasks.timeseries_backfill import (
39 changes: 39 additions & 0 deletions apps/worker/tasks/experimental/README.md
@@ -0,0 +1,39 @@
# Experimental Test Analytics Tasks

This module contains an experimental pipeline that mirrors an aspirational Test Analytics (TA) flow. The code is **not** production-ready—each task exists solely for exploration and will evolve substantially before promotion.
Contributor comment: can you add that this was generated by Claude?


## Task Topology

1. `IngestTestruns`
- Parses raw test-result uploads and writes normalized rows via the timeseries helpers.
- Creates `TAUpload` placeholders when the repository is eligible, queueing work for downstream tasks.
2. `DetectFlakes`
- Claims queued `TAUpload` rows and updates both `Testrun` and `Flake` state to reflect newly-detected flaky behavior.
- Reschedules itself while the queue has pending uploads, acting as the flake-processing loop.
3. `TestAnalyticsNotifierTask`
- Hydrates a `TestResultsNotifier` with the processed summaries and posts draft PR commentary where configured.
- Uses fencing tokens and Redis locks to deduplicate concurrent notifications; see the sketch after this list.
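
The fencing-token plus Redis-lock deduplication mentioned in item 3 can be pictured with a minimal sketch. This is an illustration only, assuming `redis-py`; the key names and helper functions below are hypothetical, not the ones the notifier task actually uses:

```python
import redis

r = redis.Redis()

def acquire_fencing_token(key: str) -> int:
    # Each notification attempt gets a strictly increasing token for this key.
    return r.incr(f"fence:{key}")

def post_once(key: str, token: int, post) -> bool:
    # Only the highest token observed so far is allowed to post; a delayed
    # worker holding an older token sees a larger stored value and backs off.
    with r.lock(f"lock:{key}", timeout=30, blocking_timeout=5):
        last_used = int(r.get(f"fence-used:{key}") or 0)
        if token <= last_used:
            return False
        post()
        r.set(f"fence-used:{key}", token)
        return True
```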

All tasks are registered with Celery but intentionally sequestered beneath the `experimental` namespace to prevent accidental inclusion in default worker imports.

## Coordinating the Pipeline

Within `apps/codecov-api/services/task/task.py`, the `TaskService` remains the entry point for scheduling. A prospective orchestration flow might resemble:

```python
service.detect_flakes(repo_id) # ensures the processing loop is active
service.schedule_task(
celery_config.ingest_testruns_task_name,
kwargs={"repoid": repo_id, "upload_context": context},
apply_async_kwargs={"countdown": 0},
)
```

Downstream notifications would be triggered either by ingest completion hooks or explicit API routes once the ingest/detect stages succeed. As we iterate, we will introduce dedicated service methods that chain the experimental tasks; for now, invoke the Celery signatures directly via the `TaskService` helpers when running local experiments.
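
To make the intended chaining concrete, here is a rough local-experiment sketch. The keyword arguments for the detect-flakes and notifier tasks are assumptions; only the ingest kwargs above come from the current code:

```python
from celery import chain

from tasks.experimental.detect_flakes import detect_flakes_task
from tasks.experimental.ingest_testruns import ingest_testruns_task
from tasks.experimental.test_analytics_notifier import test_analytics_notifier_task

def run_ta_pipeline(repoid: int, upload_context: dict) -> None:
    # Run ingest, then flake detection, then notification. Each signature is
    # immutable (.si) so the previous task's return value is not passed along.
    chain(
        ingest_testruns_task.si(repoid=repoid, upload_context=upload_context),
        detect_flakes_task.si(repoid=repoid),
        test_analytics_notifier_task.si(repoid=repoid, upload_context=upload_context),
    ).apply_async()
```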

## Running Locally

- Ensure the worker environment has access to Redis, the Django ORM, and any storage backends referenced by the upload context.
- Because these tasks are prototypes, expect migrations or data contracts to change. Clear related test data frequently during iteration.

Please share findings in the TA pipeline RFC and surface blockers to the Data Platform crew.
Empty file.
apps/worker/tasks/experimental/detect_flakes.py
@@ -146,6 +146,12 @@ class DetectFlakes(BaseCodecovTask, name=detect_flakes_task_name):
and releasing the lock is not atomic)

to guard against this, the task that just released the lock must check again

---
Experimental note: this task remains untested and is not production-ready. It
currently serves as part of the exploratory TA pipeline alongside the
experimental ingest-testruns and notifier tasks to validate the future
orchestration flow.
"""

def run_impl(
apps/worker/tasks/experimental/ingest_testruns.py
@@ -137,6 +137,17 @@ def not_private_and_free_or_team(repo: Repository):


class IngestTestruns(BaseCodecovTask, name=ingest_testruns_task_name):
"""
Test analytics is designed as a pipeline of tasks, and this is one of the
first tasks in that pipeline. It needs to be fast and reliable, and any
failure should be safely retryable.

---
Experimental note: this prototype is untested and not production-ready. It
emits `TAUpload` rows for the experimental detect-flakes task and drives the
draft notifier flow while we iterate on the future TA pipeline.
"""

def run_impl(
self,
_db_session,
apps/worker/tasks/experimental/test_analytics_notifier.py
@@ -55,6 +55,12 @@ class TestAnalyticsNotifierTask(
"""
Send test analytics notifications while ensuring compliance with Sentry's
data retention policies.

---
Experimental note: this notifier remains an untested concept that depends on
the experimental ingest and detect-flakes tasks. Its purpose is to provide a
sandboxed flow for future TA pipeline development, and it should not be used
in production environments yet.
"""

def run_impl(
@@ -282,7 +288,7 @@ def notification_preparation(

notifier = TestResultsNotifier(
repo,
upload_context,
upload_context,  # TODO: this won't work because the test results notifier doesn't currently support TAUploadContext

Contributor comment: can you elaborate on "this won't work"?

Should we just comment out the whole param to avoid the footgun?

commit_yaml,
_pull=pull,
_repo_service=repo_service,