Skip to content

Commit bed7f4c

Browse files
authored
Move ML recap analysis to heavy-processing Celery worker (#4571)
1 parent 4883f6c commit bed7f4c

6 files changed

Lines changed: 565 additions & 132 deletions

File tree

backend/reviews/admin.py

Lines changed: 53 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@
1717
from users.admin_mixins import ConferencePermissionMixin
1818

1919

20+
def get_accepted_submissions(conference):
    """Return accepted submissions for *conference*, ordered by id.

    A submission counts as accepted when either its pending status is
    "accepted", or it has no pending status (NULL or empty string) and its
    current status is "accepted".

    Related objects used by the recap views (speaker, type, audience level,
    languages) are eagerly loaded to avoid N+1 queries.
    """
    accepted = (
        Q(pending_status=Submission.STATUS.accepted)
        | Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
        | Q(pending_status="", status=Submission.STATUS.accepted)
    )
    return (
        Submission.objects.filter(conference=conference)
        .filter(accepted)
        .select_related("speaker", "type", "audience_level")
        .prefetch_related("languages")
        .order_by("id")
    )
32+
33+
2034
class AvailableScoreOptionInline(admin.TabularInline):
2135
model = AvailableScoreOption
2236

@@ -366,16 +380,7 @@ def review_shortlist_view(self, request, review_session_id):
366380
return TemplateResponse(request, adapter.shortlist_template, context)
367381

368382
def _get_accepted_submissions(self, conference):
369-
return (
370-
Submission.objects.filter(conference=conference)
371-
.filter(
372-
Q(pending_status=Submission.STATUS.accepted)
373-
| Q(pending_status__isnull=True, status=Submission.STATUS.accepted)
374-
| Q(pending_status="", status=Submission.STATUS.accepted)
375-
)
376-
.select_related("speaker", "type", "audience_level")
377-
.prefetch_related("languages")
378-
)
383+
return get_accepted_submissions(conference)
379384

380385
def review_recap_view(self, request, review_session_id):
381386
review_session = ReviewSession.objects.get(id=review_session_id)
@@ -448,49 +453,51 @@ def review_recap_compute_analysis_view(self, request, review_session_id):
448453
raise PermissionDenied()
449454

450455
conference = review_session.conference
451-
accepted_submissions = self._get_accepted_submissions(conference)
456+
accepted_submissions = list(self._get_accepted_submissions(conference))
452457
force_recompute = request.GET.get("recompute") == "1"
453458

454-
from reviews.similar_talks import compute_similar_talks, compute_topic_clusters
459+
from django.core.cache import cache
455460

456-
similar_talks = compute_similar_talks(
457-
accepted_submissions,
458-
top_n=5,
459-
conference_id=conference.id,
460-
force_recompute=force_recompute,
461-
)
461+
from pycon.tasks import check_pending_heavy_processing_work
462+
from reviews.cache_keys import get_cache_key
463+
from reviews.tasks import compute_recap_analysis
462464

463-
topic_clusters = compute_topic_clusters(
464-
accepted_submissions,
465-
min_topic_size=3,
466-
conference_id=conference.id,
467-
force_recompute=force_recompute,
465+
combined_cache_key = get_cache_key(
466+
"recap_analysis", conference.id, accepted_submissions
468467
)
469468

470-
# Build submissions list with similar talks, sorted by highest similarity
471-
submissions_list = sorted(
472-
[
473-
{
474-
"id": s.id,
475-
"title": str(s.title),
476-
"type": s.type.name,
477-
"speaker": s.speaker.display_name if s.speaker else "Unknown",
478-
"similar": similar_talks.get(s.id, []),
479-
}
480-
for s in accepted_submissions
481-
],
482-
key=lambda x: max(
483-
(item["similarity"] for item in x["similar"]), default=0
484-
),
485-
reverse=True,
486-
)
469+
if not force_recompute:
470+
cached_result = cache.get(combined_cache_key)
471+
if cached_result is not None:
472+
return JsonResponse(cached_result)
487473

488-
return JsonResponse(
489-
{
490-
"submissions_list": submissions_list,
491-
"topic_clusters": topic_clusters,
492-
}
493-
)
474+
# Use cache.add as a lock to prevent duplicate task dispatch.
475+
# Short TTL so lock auto-expires if the worker is killed before cleanup.
476+
computing_key = f"{combined_cache_key}:computing"
477+
478+
# Check for stale lock from a crashed/finished task
479+
existing_task_id = cache.get(computing_key)
480+
if existing_task_id:
481+
from celery.result import AsyncResult
482+
483+
if AsyncResult(existing_task_id).state in (
484+
"SUCCESS",
485+
"FAILURE",
486+
"REVOKED",
487+
):
488+
cache.delete(computing_key)
489+
490+
if cache.add(computing_key, "pending", timeout=300):
491+
result = compute_recap_analysis.apply_async(
492+
args=[conference.id, combined_cache_key],
493+
kwargs={"force_recompute": force_recompute},
494+
queue="heavy_processing",
495+
)
496+
# Store task ID so subsequent requests can detect stale locks
497+
cache.set(computing_key, result.id, timeout=300)
498+
check_pending_heavy_processing_work.delay()
499+
500+
return JsonResponse({"status": "processing"})
494501

495502
def review_view(self, request, review_session_id, review_item_id):
496503
review_session = ReviewSession.objects.get(id=review_session_id)

backend/reviews/cache_keys.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import hashlib


def get_embedding_text(submission) -> str:
    """Combine title, elevator_pitch, and abstract into a single string for embedding."""
    title = str(submission.title) if submission.title else ""
    elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
    # A submission may lack an ``abstract`` attribute entirely; treat that
    # the same as an empty abstract.
    abstract = (
        str(submission.abstract)
        if hasattr(submission, "abstract") and submission.abstract
        else ""
    )
    return f"{title}. {elevator_pitch}. {abstract}"


def get_cache_key(prefix: str, conference_id: int, submissions) -> str:
    """Generate a cache key based on conference and submission content.

    Submissions are hashed in id order so the key is independent of the
    iteration order of *submissions*; any change to a submission's id or
    embedding text yields a different key.
    """
    content_hash = hashlib.md5()
    for s in sorted(submissions, key=lambda x: x.id):
        content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
    return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"

backend/reviews/similar_talks.py

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import functools
2-
import hashlib
32
import logging
43

54
import nltk
@@ -12,6 +11,9 @@
1211
from sklearn.feature_extraction.text import CountVectorizer
1312
from sklearn.metrics.pairwise import cosine_similarity
1413

14+
from reviews.cache_keys import get_cache_key as _get_cache_key
15+
from reviews.cache_keys import get_embedding_text
16+
1517
logger = logging.getLogger(__name__)
1618

1719
CACHE_TIMEOUT = 60 * 60 * 24 # 24 hours
@@ -222,18 +224,6 @@ def get_embedding_model():
222224
return SentenceTransformer("all-MiniLM-L6-v2", token=False)
223225

224226

225-
def get_embedding_text(submission) -> str:
226-
"""Combine title, elevator_pitch, and abstract into a single string for embedding."""
227-
title = str(submission.title) if submission.title else ""
228-
elevator_pitch = str(submission.elevator_pitch) if submission.elevator_pitch else ""
229-
abstract = (
230-
str(submission.abstract)
231-
if hasattr(submission, "abstract") and submission.abstract
232-
else ""
233-
)
234-
return f"{title}. {elevator_pitch}. {abstract}"
235-
236-
237227
def _get_submission_languages(submissions) -> set[str]:
238228
"""Extract all unique language codes from submissions."""
239229
language_codes = set()
@@ -245,14 +235,6 @@ def _get_submission_languages(submissions) -> set[str]:
245235
return language_codes or {"en"}
246236

247237

248-
def _get_cache_key(prefix: str, conference_id: int, submissions) -> str:
249-
"""Generate a cache key based on conference and submission content."""
250-
content_hash = hashlib.md5()
251-
for s in sorted(submissions, key=lambda x: x.id):
252-
content_hash.update(f"{s.id}:{get_embedding_text(s)}".encode())
253-
return f"{prefix}:conf_{conference_id}:{content_hash.hexdigest()}"
254-
255-
256238
def compute_similar_talks(
257239
submissions, top_n=5, conference_id=None, force_recompute=False
258240
):

backend/reviews/tasks.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import logging

from pycon.celery import app

logger = logging.getLogger(__name__)

# How long a successfully computed analysis stays cached.
RESULT_CACHE_TTL = 60 * 60 * 24  # 24 hours
# Errors are cached briefly so the UI can surface them without re-triggering
# the expensive computation on every poll.
ERROR_CACHE_TTL = 60 * 2  # 2 minutes


@app.task
def compute_recap_analysis(conference_id, combined_cache_key, force_recompute=False):
    """Compute the combined similar-talks / topic-clusters recap analysis.

    Runs on the heavy-processing Celery worker. The combined result is
    stored in the Django cache under ``combined_cache_key``; on failure a
    short-lived error payload is cached instead so the polling UI can show
    it. The ``<key>:computing`` lock set by the dispatching admin view is
    always cleared when the task finishes (success, error, or missing
    conference).
    """
    from django.core.cache import cache

    from conferences.models import Conference
    from reviews.admin import get_accepted_submissions
    from reviews.similar_talks import (
        compute_similar_talks,
        compute_topic_clusters,
    )

    try:
        conference = Conference.objects.get(id=conference_id)
    except Conference.DoesNotExist:
        logger.error("Conference %s not found for recap analysis", conference_id)
        cache.delete(f"{combined_cache_key}:computing")
        return

    accepted_submissions = list(get_accepted_submissions(conference))

    try:
        # Pass conference_id=None to skip individual function caching;
        # the combined result is cached under combined_cache_key instead.
        similar_talks = compute_similar_talks(
            accepted_submissions,
            top_n=5,
            conference_id=None,
            force_recompute=force_recompute,
        )

        topic_clusters = compute_topic_clusters(
            accepted_submissions,
            min_topic_size=3,
            conference_id=None,
            force_recompute=force_recompute,
        )

        # Build submissions list with similar talks, sorted by each
        # submission's highest similarity score, descending.
        submissions_list = sorted(
            [
                {
                    "id": s.id,
                    "title": str(s.title),
                    "type": s.type.name,
                    "speaker": s.speaker.display_name if s.speaker else "Unknown",
                    "similar": similar_talks.get(s.id, []),
                }
                for s in accepted_submissions
            ],
            key=lambda x: max(
                (item["similarity"] for item in x["similar"]), default=0
            ),
            reverse=True,
        )

        result = {
            "submissions_list": submissions_list,
            "topic_clusters": topic_clusters,
        }

        cache.set(combined_cache_key, result, RESULT_CACHE_TTL)

        return result
    except Exception:
        logger.exception(
            "Failed to compute recap analysis for conference %s", conference_id
        )
        cache.set(
            combined_cache_key,
            {"status": "error", "message": "Analysis failed. Please try again."},
            ERROR_CACHE_TTL,
        )
        raise
    finally:
        # Release the dispatch lock so a new task can be scheduled.
        cache.delete(f"{combined_cache_key}:computing")

backend/reviews/templates/reviews-recap.html

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,82 @@ <h2 class="recap-section-title">🔗 Similar Talks</h2>
570570
section.style.display = '';
571571
}
572572

573+
// Polling state for the async recap-analysis computation.
var pollTimer = null;
var pollStartTime = null;
var pollAttempt = 0;
var POLL_TIMEOUT = 360000; // 6 minutes – must exceed backend lock TTL (300s)

function getNextPollInterval() {
  // Exponential backoff: 1s, 2s, 3s, 5s, 10s, 15s, 15s...
  var intervals = [1000, 2000, 3000, 5000, 10000, 15000];
  return intervals[Math.min(pollAttempt, intervals.length - 1)];
}

function stopPolling() {
  if (pollTimer) {
    clearTimeout(pollTimer);
    pollTimer = null;
  }
  pollStartTime = null;
  pollAttempt = 0;
}

function showError(message) {
  stopPolling();
  loading.style.display = 'none';
  btn.disabled = false;
  btn.textContent = 'Compute Topic Clusters & Similar Talks';
  recomputeBtn.disabled = false;
  recomputeBtn.textContent = 'Recompute (ignore cache)';
  errorDiv.textContent = message;
  errorDiv.style.display = '';
}

function handleResult(data) {
  if (data.status === 'error') {
    showError(data.message || 'Analysis failed. Please try again.');
    recomputeBtn.style.display = '';
    btn.style.display = 'none';
    return;
  }

  loading.style.display = 'none';
  btn.style.display = 'none';
  recomputeBtn.style.display = '';
  recomputeBtn.disabled = false;
  recomputeBtn.textContent = 'Recompute (ignore cache)';

  renderTopicClusters(data.topic_clusters);
  renderSimilarTalks(data.submissions_list);
}

function pollForResults() {
  // Give up after POLL_TIMEOUT so a lost task doesn't poll forever.
  if (pollStartTime && (Date.now() - pollStartTime) > POLL_TIMEOUT) {
    showError('Analysis is taking longer than expected. Please try again later.');
    return;
  }

  fetch(computeUrl, {
    headers: { 'X-Requested-With': 'XMLHttpRequest' }
  })
    .then(function(response) {
      if (!response.ok) throw new Error('Server error: ' + response.status);
      return response.json();
    })
    .then(function(data) {
      if (data.status === 'processing') {
        pollAttempt++;
        pollTimer = setTimeout(pollForResults, getNextPollInterval());
        return;
      }
      stopPolling();
      handleResult(data);
    })
    .catch(function(err) {
      showError('Failed to compute analysis: ' + err.message);
    });
}
648+
573649
function fetchAnalysis(recompute) {
574650
var url = recompute ? computeUrl + '?recompute=1' : computeUrl;
575651
var activeBtn = recompute ? recomputeBtn : btn;
@@ -587,14 +663,13 @@ <h2 class="recap-section-title">🔗 Similar Talks</h2>
587663
return response.json();
588664
})
589665
.then(function(data) {
590-
loading.style.display = 'none';
591-
btn.style.display = 'none';
592-
recomputeBtn.style.display = '';
593-
recomputeBtn.disabled = false;
594-
recomputeBtn.textContent = 'Recompute (ignore cache)';
595-
596-
renderTopicClusters(data.topic_clusters);
597-
renderSimilarTalks(data.submissions_list);
666+
if (data.status === 'processing') {
667+
pollStartTime = Date.now();
668+
pollAttempt = 0;
669+
pollTimer = setTimeout(pollForResults, getNextPollInterval());
670+
return;
671+
}
672+
handleResult(data);
598673
})
599674
.catch(function(err) {
600675
loading.style.display = 'none';

0 commit comments

Comments
 (0)