55 changes: 48 additions & 7 deletions apps/worker/tasks/export_test_analytics_data.py
@@ -1,3 +1,4 @@
import json
import logging
import pathlib
import tempfile
@@ -18,6 +19,9 @@

log = logging.getLogger(__name__)

# Batch size for processing test runs from the database
BATCH_SIZE = 10000


def serialize_test_run(test_run: dict) -> list:
"""
@@ -96,7 +100,6 @@ def run_impl(
"error": f"No repositories found for owner {integration_name}",
}

# Initialize GCS client and bucket
gcs_client = storage.Client(project=gcp_project_id)
bucket = gcs_client.bucket(destination_bucket)

@@ -132,20 +135,58 @@ def run_impl(
.order_by("-timestamp")
.values(*fields)
)
test_runs_data = [
serialize_test_run(tr) for tr in list(test_runs_qs)
]

repo_data = {"fields": fields, "data": test_runs_data}
# Stream test runs to a JSON file
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".json",
delete=False,
dir=temp_dir,
) as json_file:
json_file.write('{"fields": ')
json.dump(fields, json_file)
json_file.write(', "data": [')

first_item = True
total_processed = 0

for test_run in test_runs_qs.iterator(
chunk_size=BATCH_SIZE
):
if not first_item:
json_file.write(", ")
else:
first_item = False

json.dump(serialize_test_run(test_run), json_file)
total_processed += 1

if total_processed % BATCH_SIZE == 0:
log.debug(
f"Processed {total_processed} test runs for {repo_name}"
)

json_file.write("]}")
json_file_path = json_file.name

# Upload the JSON file, then clean it up
blob_name = f"{integration_name}/{repo_name}.json"
archiver.upload_json(blob_name, repo_data)
with open(json_file_path, "rb") as f:
archiver._add_file(blob_name, f)

pathlib.Path(json_file_path).unlink()
Contributor

I think we should have a try here so that .unlink still runs even if we run into a problem uploading the file.

Contributor Author
@adrianviquez adrianviquez Dec 2, 2025

Wrapped the _add_file call in a try/finally so that file cleanup always happens. Also added missing_ok=True in case the temp file isn't created for some reason, so we prevent an exception from the cleanup itself.

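A minimal sketch of that cleanup pattern, assuming the archiver._add_file interface shown in the diff; the names mirror the surrounding code, but this is illustrative rather than the PR's final implementation:

import pathlib

# Upload the temporary JSON file, then remove it even if the upload fails.
try:
    with open(json_file_path, "rb") as f:
        archiver._add_file(blob_name, f)
finally:
    # missing_ok=True avoids FileNotFoundError if the temp file was never created.
    pathlib.Path(json_file_path).unlink(missing_ok=True)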

repositories_succeeded.append({"name": repo_name})

end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
log.info(
f"Successfully processed repository {repo_name}: {len(test_runs_data)} test runs in {duration:.2f}s"
"Successfully processed repository test runs",
extra={
"name": repo_name,
"total_processed": total_processed,
"duration": duration,
},
)
except Exception as e:
log.error(
47 changes: 20 additions & 27 deletions apps/worker/tasks/tests/unit/test_export_test_analytics_data.py
@@ -186,7 +186,10 @@ def test_successful_export_test_run_data(
mock_gcs_and_archiver,
sample_test_run_data,
):
mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = sample_test_run_data
# Create a mock queryset that supports iterator()
mock_queryset = MagicMock()
mock_queryset.iterator.return_value = iter(sample_test_run_data)
mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = mock_queryset

result = ExportTestAnalyticsDataTask().run_impl(
dbsession,
@@ -202,11 +205,10 @@
assert len(result["repositories_failed"]) == 0

archiver_instance = mock_gcs_and_archiver["archiver_instance"]
archiver_instance.upload_json.assert_called_once()
call_args = archiver_instance.upload_json.call_args
# The new implementation uses _add_file instead of upload_json
archiver_instance._add_file.assert_called_once()
call_args = archiver_instance._add_file.call_args
assert call_args[0][0] == "test_owner/test_repo.json"
assert "fields" in call_args[0][1]
assert "data" in call_args[0][1]

@patch("tasks.export_test_analytics_data.Testrun")
@patch("tasks.export_test_analytics_data.sentry_sdk")
@@ -260,9 +262,9 @@ def filter_side_effect(*args, **kwargs):
raise Exception("Processing error for failing_repo")
else:
mock_result = MagicMock()
mock_result.order_by.return_value.values.return_value = (
sample_test_run_data
)
mock_queryset = MagicMock()
mock_queryset.iterator.return_value = iter(sample_test_run_data)
mock_result.order_by.return_value.values.return_value = mock_queryset
return mock_result

mock_testrun.objects.filter.side_effect = filter_side_effect
@@ -288,7 +290,7 @@ def filter_side_effect(*args, **kwargs):
assert mock_sentry.capture_exception.call_count == 1

archiver_instance = mock_gcs_and_archiver["archiver_instance"]
assert archiver_instance.upload_json.call_count == 2
assert archiver_instance._add_file.call_count == 2

@patch("tasks.export_test_analytics_data.Testrun")
def test_export_with_multiple_test_runs(
@@ -299,7 +301,10 @@ def test_export_with_multiple_test_runs(
mock_gcs_and_archiver,
multiple_test_runs_data,
):
mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = multiple_test_runs_data
# Create a mock queryset that supports iterator()
mock_queryset = MagicMock()
mock_queryset.iterator.return_value = iter(multiple_test_runs_data)
mock_testrun.objects.filter.return_value.order_by.return_value.values.return_value = mock_queryset

result = ExportTestAnalyticsDataTask().run_impl(
dbsession,
@@ -314,20 +319,8 @@
assert len(result["repositories_failed"]) == 0

archiver_instance = mock_gcs_and_archiver["archiver_instance"]
archiver_instance.upload_json.assert_called_once()
call_args = archiver_instance.upload_json.call_args
uploaded_data = call_args[0][1]

assert len(uploaded_data["data"]) == 3
assert uploaded_data["fields"] == [
"filename",
"timestamp",
"testsuite",
"outcome",
"duration_seconds",
"failure_message",
"framework",
"commit_sha",
"branch",
"flags",
]
archiver_instance._add_file.assert_called_once()
# We can't easily inspect the JSON file content with the new streaming approach,
# but we can verify the correct blob name was used
call_args = archiver_instance._add_file.call_args
assert call_args[0][0] == "test_owner/test_repo.json"
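If a later test does need to assert on the streamed JSON payload, one option (a sketch, assuming the mock_gcs_and_archiver fixture used above) is to read the file inside a side_effect, while the handle passed to _add_file is still open:

import json

captured = {}

def capture_upload(blob_name, file_obj):
    # _add_file is called while the temp file is still open, so the streamed
    # JSON can be read here before the task closes and deletes the file.
    captured[blob_name] = json.load(file_obj)

archiver_instance._add_file.side_effect = capture_upload

# After running the task:
# assert "fields" in captured["test_owner/test_repo.json"]
# assert len(captured["test_owner/test_repo.json"]["data"]) == 3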