diff --git a/frontend/src/pages/FailedOcrPage.tsx b/frontend/src/pages/FailedOcrPage.tsx index f2ea023..87c47e9 100644 --- a/frontend/src/pages/FailedOcrPage.tsx +++ b/frontend/src/pages/FailedOcrPage.tsx @@ -49,7 +49,7 @@ import { OpenInNew as OpenInNewIcon, } from '@mui/icons-material'; import { format } from 'date-fns'; -import { api, documentService } from '../services/api'; +import { api, documentService, queueService } from '../services/api'; import DocumentViewer from '../components/DocumentViewer'; interface FailedDocument { @@ -138,6 +138,7 @@ const FailedOcrPage: React.FC = () => { const [loading, setLoading] = useState(true); const [duplicatesLoading, setDuplicatesLoading] = useState(false); const [retrying, setRetrying] = useState(null); + const [retryingAll, setRetryingAll] = useState(false); const [statistics, setStatistics] = useState(null); const [duplicateStatistics, setDuplicateStatistics] = useState(null); const [pagination, setPagination] = useState({ page: 1, limit: 25 }); @@ -258,6 +259,39 @@ const FailedOcrPage: React.FC = () => { } }; + const handleRetryAllFailed = async () => { + try { + setRetryingAll(true); + const response = await queueService.requeueFailed(); + + if (response.data.requeued_count > 0) { + setSnackbar({ + open: true, + message: `Successfully queued ${response.data.requeued_count} failed documents for OCR retry. Check the queue stats for progress.`, + severity: 'success' + }); + + // Refresh the list to update status + await fetchFailedDocuments(); + } else { + setSnackbar({ + open: true, + message: 'No failed documents found to retry', + severity: 'info' + }); + } + } catch (error) { + console.error('Failed to retry all failed OCR:', error); + setSnackbar({ + open: true, + message: 'Failed to retry all failed OCR documents', + severity: 'error' + }); + } finally { + setRetryingAll(false); + } + }; + const formatFileSize = (bytes: number): string => { if (bytes === 0) return '0 B'; const k = 1024; @@ -444,7 +478,7 @@ const FailedOcrPage: React.FC = () => { variant="outlined" startIcon={} onClick={refreshCurrentTab} - disabled={loading || duplicatesLoading} + disabled={loading || duplicatesLoading || retryingAll} > Refresh @@ -491,6 +525,19 @@ const FailedOcrPage: React.FC = () => { {statistics.total_failed} + + + diff --git a/migrations/20250628000002_populate_ocr_queue_from_pending.sql b/migrations/20250628000002_populate_ocr_queue_from_pending.sql new file mode 100644 index 0000000..2ed1cad --- /dev/null +++ b/migrations/20250628000002_populate_ocr_queue_from_pending.sql @@ -0,0 +1,36 @@ +-- Populate OCR queue with documents that have pending OCR status +-- This migration addresses the issue where documents marked as pending +-- by the confidence backfill migration are not in the processing queue + +-- Insert pending documents into OCR queue +INSERT INTO ocr_queue (document_id, priority, file_size, created_at) +SELECT + id, + -- Calculate priority based on file size (smaller files get higher priority) + CASE + WHEN file_size <= 1048576 THEN 10 -- <= 1MB: highest priority + WHEN file_size <= 5242880 THEN 8 -- 1-5MB: high priority + WHEN file_size <= 10485760 THEN 6 -- 5-10MB: medium priority + WHEN file_size <= 52428800 THEN 4 -- 10-50MB: low priority + ELSE 2 -- > 50MB: lowest priority + END as priority, + file_size, + NOW() as created_at +FROM documents +WHERE ocr_status = 'pending' + AND id NOT IN (SELECT document_id FROM ocr_queue) -- Avoid duplicates + AND file_path IS NOT NULL -- Only queue documents with files + AND (mime_type LIKE 'image/%' OR mime_type = 'application/pdf' OR mime_type = 'text/plain'); -- Only OCR-able types + +-- Log the result +DO $$ +DECLARE + queued_count INTEGER; +BEGIN + GET DIAGNOSTICS queued_count = ROW_COUNT; + RAISE NOTICE 'Added % pending documents to OCR queue for processing', queued_count; +END $$; + +-- Create helpful index for monitoring +CREATE INDEX IF NOT EXISTS idx_ocr_queue_document_status +ON ocr_queue(document_id, status, created_at); \ No newline at end of file diff --git a/src/bin/enqueue_pending_ocr.rs b/src/bin/enqueue_pending_ocr.rs new file mode 100644 index 0000000..750469e --- /dev/null +++ b/src/bin/enqueue_pending_ocr.rs @@ -0,0 +1,118 @@ +/*! + * CLI tool to enqueue documents with pending OCR status + * + * This addresses the issue where documents marked as pending by migrations + * are not automatically added to the OCR processing queue. + * + * Usage: cargo run --bin enqueue_pending_ocr + */ + +use anyhow::Result; +use sqlx::Row; +use tracing::{info, warn, error}; +use uuid::Uuid; + +use readur::{ + config::Config, + db::Database, + ocr::queue::OcrQueueService, +}; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt::init(); + + info!("🔍 Scanning for documents with pending OCR status..."); + + // Load configuration + let config = Config::from_env()?; + + // Connect to database + let db = Database::new(&config.database_url).await?; + let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1); + + // Find documents with pending OCR status that aren't in the queue + let pending_documents = sqlx::query( + r#" + SELECT d.id, d.filename, d.file_size, d.mime_type, d.created_at + FROM documents d + LEFT JOIN ocr_queue oq ON d.id = oq.document_id + WHERE d.ocr_status = 'pending' + AND oq.document_id IS NULL + AND d.file_path IS NOT NULL + AND (d.mime_type LIKE 'image/%' OR d.mime_type = 'application/pdf' OR d.mime_type = 'text/plain') + ORDER BY d.created_at ASC + "# + ) + .fetch_all(db.get_pool()) + .await?; + + if pending_documents.is_empty() { + info!("✅ No pending documents found that need to be queued"); + return Ok(()); + } + + info!("📋 Found {} documents with pending OCR status", pending_documents.len()); + + // Prepare batch insert data + let mut documents_to_queue = Vec::new(); + + for row in &pending_documents { + let document_id: Uuid = row.get("id"); + let filename: String = row.get("filename"); + let file_size: i64 = row.get("file_size"); + let mime_type: String = row.get("mime_type"); + + // Calculate priority based on file size + let priority = match file_size { + 0..=1048576 => 10, // <= 1MB: highest priority + ..=5242880 => 8, // 1-5MB: high priority + ..=10485760 => 6, // 5-10MB: medium priority + ..=52428800 => 4, // 10-50MB: low priority + _ => 2, // > 50MB: lowest priority + }; + + let size_mb = file_size as f64 / (1024.0 * 1024.0); + info!(" 📄 {} ({}) - {:.2} MB - Priority {}", + filename, mime_type, size_mb, priority); + + documents_to_queue.push((document_id, priority, file_size)); + } + + // Batch enqueue documents + info!("🚀 Enqueuing {} documents for OCR processing...", documents_to_queue.len()); + + match queue_service.enqueue_documents_batch(documents_to_queue).await { + Ok(queue_ids) => { + info!("✅ Successfully queued {} documents for OCR processing", queue_ids.len()); + info!("🔄 OCR worker should start processing these documents automatically"); + + // Show queue statistics + match queue_service.get_stats().await { + Ok(stats) => { + info!("📊 Queue Statistics:"); + info!(" • Pending: {}", stats.pending_count); + info!(" • Processing: {}", stats.processing_count); + info!(" • Failed: {}", stats.failed_count); + info!(" • Completed today: {}", stats.completed_today); + if let Some(wait_time) = stats.avg_wait_time_minutes { + info!(" • Average wait time: {:.1} minutes", wait_time); + } + if let Some(oldest) = stats.oldest_pending_minutes { + info!(" • Oldest pending: {:.1} minutes", oldest); + } + } + Err(e) => { + warn!("Failed to get queue statistics: {}", e); + } + } + } + Err(e) => { + error!("❌ Failed to enqueue documents: {}", e); + return Err(e.into()); + } + } + + Ok(()) +} \ No newline at end of file diff --git a/src/routes/queue.rs b/src/routes/queue.rs index 59d9e42..9dacba5 100644 --- a/src/routes/queue.rs +++ b/src/routes/queue.rs @@ -21,6 +21,7 @@ pub fn router() -> Router> { Router::new() .route("/stats", get(get_queue_stats)) .route("/requeue-failed", post(requeue_failed)) + .route("/enqueue-pending", post(enqueue_pending_documents)) .route("/pause", post(pause_ocr_processing)) .route("/resume", post(resume_ocr_processing)) .route("/status", get(get_ocr_status)) @@ -172,4 +173,81 @@ async fn get_ocr_status( "is_paused": is_paused, "status": if is_paused { "paused" } else { "running" } }))) +} + +#[utoipa::path( + post, + path = "/api/queue/enqueue-pending", + tag = "queue", + security( + ("bearer_auth" = []) + ), + responses( + (status = 200, description = "Pending documents queued successfully"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin access required"), + (status = 500, description = "Internal server error") + ) +)] +async fn enqueue_pending_documents( + State(state): State>, + auth_user: AuthUser, +) -> Result, StatusCode> { + require_admin(&auth_user)?; + + // Find all documents with pending OCR status that aren't already in the queue + let pending_documents = sqlx::query( + r#" + SELECT d.id, d.file_size + FROM documents d + LEFT JOIN ocr_queue oq ON d.id = oq.document_id + WHERE d.ocr_status = 'pending' + AND oq.document_id IS NULL + AND d.file_path IS NOT NULL + AND (d.mime_type LIKE 'image/%' OR d.mime_type = 'application/pdf' OR d.mime_type = 'text/plain') + ORDER BY d.created_at ASC + "# + ) + .fetch_all(state.db.get_pool()) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + if pending_documents.is_empty() { + return Ok(Json(serde_json::json!({ + "queued_count": 0, + "message": "No pending documents found to queue" + }))); + } + + // Prepare batch insert data + let documents_to_queue: Vec<(uuid::Uuid, i32, i64)> = pending_documents + .into_iter() + .map(|row| { + let document_id: uuid::Uuid = row.get("id"); + let file_size: i64 = row.get("file_size"); + + // Calculate priority based on file size + let priority = match file_size { + 0..=1048576 => 10, // <= 1MB: highest priority + ..=5242880 => 8, // 1-5MB: high priority + ..=10485760 => 6, // 5-10MB: medium priority + ..=52428800 => 4, // 10-50MB: low priority + _ => 2, // > 50MB: lowest priority + }; + + (document_id, priority, file_size) + }) + .collect(); + + // Batch enqueue documents + let queue_ids = state.queue_service + .enqueue_documents_batch(documents_to_queue) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(Json(serde_json::json!({ + "queued_count": queue_ids.len(), + "message": format!("Successfully queued {} pending documents for OCR processing", queue_ids.len()), + "queue_ids": queue_ids + }))) } \ No newline at end of file