fix(ocr_status): populate the ocr queue with pending jobs and add easy 'retry' button
This commit is contained in:
parent
9f3371e4f3
commit
2d04f0094a
|
|
@ -49,7 +49,7 @@ import {
|
||||||
OpenInNew as OpenInNewIcon,
|
OpenInNew as OpenInNewIcon,
|
||||||
} from '@mui/icons-material';
|
} from '@mui/icons-material';
|
||||||
import { format } from 'date-fns';
|
import { format } from 'date-fns';
|
||||||
import { api, documentService } from '../services/api';
|
import { api, documentService, queueService } from '../services/api';
|
||||||
import DocumentViewer from '../components/DocumentViewer';
|
import DocumentViewer from '../components/DocumentViewer';
|
||||||
|
|
||||||
interface FailedDocument {
|
interface FailedDocument {
|
||||||
|
|
@ -138,6 +138,7 @@ const FailedOcrPage: React.FC = () => {
|
||||||
const [loading, setLoading] = useState(true);
|
const [loading, setLoading] = useState(true);
|
||||||
const [duplicatesLoading, setDuplicatesLoading] = useState(false);
|
const [duplicatesLoading, setDuplicatesLoading] = useState(false);
|
||||||
const [retrying, setRetrying] = useState<string | null>(null);
|
const [retrying, setRetrying] = useState<string | null>(null);
|
||||||
|
const [retryingAll, setRetryingAll] = useState(false);
|
||||||
const [statistics, setStatistics] = useState<FailedOcrResponse['statistics'] | null>(null);
|
const [statistics, setStatistics] = useState<FailedOcrResponse['statistics'] | null>(null);
|
||||||
const [duplicateStatistics, setDuplicateStatistics] = useState<DuplicatesResponse['statistics'] | null>(null);
|
const [duplicateStatistics, setDuplicateStatistics] = useState<DuplicatesResponse['statistics'] | null>(null);
|
||||||
const [pagination, setPagination] = useState({ page: 1, limit: 25 });
|
const [pagination, setPagination] = useState({ page: 1, limit: 25 });
|
||||||
|
|
@ -258,6 +259,39 @@ const FailedOcrPage: React.FC = () => {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * Asks the backend to requeue failed OCR documents and reports the outcome
 * through the page snackbar.
 *
 * `retryingAll` is raised for the duration of the request and always lowered
 * again in `finally`. The failed-documents list is re-fetched only when the
 * response reports that at least one document was requeued.
 */
const handleRetryAllFailed = async () => {
  try {
    setRetryingAll(true);
    const { data } = await queueService.requeueFailed();
    const requeuedCount = data.requeued_count;

    // Nothing was requeued: tell the user and leave the list untouched.
    if (!(requeuedCount > 0)) {
      setSnackbar({
        open: true,
        message: 'No failed documents found to retry',
        severity: 'info'
      });
      return;
    }

    setSnackbar({
      open: true,
      message: `Successfully queued ${requeuedCount} failed documents for OCR retry. Check the queue stats for progress.`,
      severity: 'success'
    });

    // Reload so the displayed list reflects the requeue.
    await fetchFailedDocuments();
  } catch (error) {
    console.error('Failed to retry all failed OCR:', error);
    setSnackbar({
      open: true,
      message: 'Failed to retry all failed OCR documents',
      severity: 'error'
    });
  } finally {
    setRetryingAll(false);
  }
};
|
||||||
|
|
||||||
const formatFileSize = (bytes: number): string => {
|
const formatFileSize = (bytes: number): string => {
|
||||||
if (bytes === 0) return '0 B';
|
if (bytes === 0) return '0 B';
|
||||||
const k = 1024;
|
const k = 1024;
|
||||||
|
|
@ -444,7 +478,7 @@ const FailedOcrPage: React.FC = () => {
|
||||||
variant="outlined"
|
variant="outlined"
|
||||||
startIcon={<RefreshIcon />}
|
startIcon={<RefreshIcon />}
|
||||||
onClick={refreshCurrentTab}
|
onClick={refreshCurrentTab}
|
||||||
disabled={loading || duplicatesLoading}
|
disabled={loading || duplicatesLoading || retryingAll}
|
||||||
>
|
>
|
||||||
Refresh
|
Refresh
|
||||||
</Button>
|
</Button>
|
||||||
|
|
@ -491,6 +525,19 @@ const FailedOcrPage: React.FC = () => {
|
||||||
<Typography variant="h3" color="error.main">
|
<Typography variant="h3" color="error.main">
|
||||||
{statistics.total_failed}
|
{statistics.total_failed}
|
||||||
</Typography>
|
</Typography>
|
||||||
|
<Box sx={{ mt: 2 }}>
|
||||||
|
<Button
|
||||||
|
variant="contained"
|
||||||
|
color="warning"
|
||||||
|
startIcon={retryingAll ? <CircularProgress size={20} /> : <RefreshIcon />}
|
||||||
|
onClick={handleRetryAllFailed}
|
||||||
|
disabled={retryingAll || statistics.total_failed === 0}
|
||||||
|
size="small"
|
||||||
|
fullWidth
|
||||||
|
>
|
||||||
|
{retryingAll ? 'Retrying All...' : 'Retry All Failed OCR'}
|
||||||
|
</Button>
|
||||||
|
</Box>
|
||||||
</CardContent>
|
</CardContent>
|
||||||
</Card>
|
</Card>
|
||||||
</Grid>
|
</Grid>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
-- Populate OCR queue with documents that have pending OCR status
-- This migration addresses the issue where documents marked as pending
-- by the confidence backfill migration are not in the processing queue

-- The INSERT runs inside the DO block on purpose: GET DIAGNOSTICS ... ROW_COUNT
-- only reports the most recent SQL command executed within the same PL/pgSQL
-- block, so a separate top-level INSERT would not be visible to it and the
-- logged count would be wrong.
DO $$
DECLARE
    queued_count INTEGER;
BEGIN
    -- Insert pending documents into OCR queue
    INSERT INTO ocr_queue (document_id, priority, file_size, created_at)
    SELECT
        d.id,
        -- Calculate priority based on file size (smaller files get higher priority)
        CASE
            WHEN d.file_size <= 1048576 THEN 10   -- <= 1MB: highest priority
            WHEN d.file_size <= 5242880 THEN 8    -- 1-5MB: high priority
            WHEN d.file_size <= 10485760 THEN 6   -- 5-10MB: medium priority
            WHEN d.file_size <= 52428800 THEN 4   -- 10-50MB: low priority
            ELSE 2                                -- > 50MB: lowest priority
        END AS priority,
        d.file_size,
        NOW() AS created_at
    FROM documents d
    WHERE d.ocr_status = 'pending'
      -- Avoid duplicates. NOT EXISTS is used instead of NOT IN because NOT IN
      -- matches no rows at all if the subquery ever yields a NULL document_id.
      AND NOT EXISTS (
          SELECT 1 FROM ocr_queue q WHERE q.document_id = d.id
      )
      AND d.file_path IS NOT NULL  -- Only queue documents with files
      AND (d.mime_type LIKE 'image/%'
           OR d.mime_type = 'application/pdf'
           OR d.mime_type = 'text/plain');  -- Only OCR-able types

    -- Log the result
    GET DIAGNOSTICS queued_count = ROW_COUNT;
    RAISE NOTICE 'Added % pending documents to OCR queue for processing', queued_count;
END $$;

-- Create helpful index for monitoring
CREATE INDEX IF NOT EXISTS idx_ocr_queue_document_status
ON ocr_queue(document_id, status, created_at);
|
||||||
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*!
|
||||||
|
* CLI tool to enqueue documents with pending OCR status
|
||||||
|
*
|
||||||
|
* This addresses the issue where documents marked as pending by migrations
|
||||||
|
* are not automatically added to the OCR processing queue.
|
||||||
|
*
|
||||||
|
* Usage: cargo run --bin enqueue_pending_ocr
|
||||||
|
*/
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use sqlx::Row;
|
||||||
|
use tracing::{info, warn, error};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use readur::{
|
||||||
|
config::Config,
|
||||||
|
db::Database,
|
||||||
|
ocr::queue::OcrQueueService,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
// Initialize logging
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
|
info!("🔍 Scanning for documents with pending OCR status...");
|
||||||
|
|
||||||
|
// Load configuration
|
||||||
|
let config = Config::from_env()?;
|
||||||
|
|
||||||
|
// Connect to database
|
||||||
|
let db = Database::new(&config.database_url).await?;
|
||||||
|
let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1);
|
||||||
|
|
||||||
|
// Find documents with pending OCR status that aren't in the queue
|
||||||
|
let pending_documents = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT d.id, d.filename, d.file_size, d.mime_type, d.created_at
|
||||||
|
FROM documents d
|
||||||
|
LEFT JOIN ocr_queue oq ON d.id = oq.document_id
|
||||||
|
WHERE d.ocr_status = 'pending'
|
||||||
|
AND oq.document_id IS NULL
|
||||||
|
AND d.file_path IS NOT NULL
|
||||||
|
AND (d.mime_type LIKE 'image/%' OR d.mime_type = 'application/pdf' OR d.mime_type = 'text/plain')
|
||||||
|
ORDER BY d.created_at ASC
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(db.get_pool())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if pending_documents.is_empty() {
|
||||||
|
info!("✅ No pending documents found that need to be queued");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("📋 Found {} documents with pending OCR status", pending_documents.len());
|
||||||
|
|
||||||
|
// Prepare batch insert data
|
||||||
|
let mut documents_to_queue = Vec::new();
|
||||||
|
|
||||||
|
for row in &pending_documents {
|
||||||
|
let document_id: Uuid = row.get("id");
|
||||||
|
let filename: String = row.get("filename");
|
||||||
|
let file_size: i64 = row.get("file_size");
|
||||||
|
let mime_type: String = row.get("mime_type");
|
||||||
|
|
||||||
|
// Calculate priority based on file size
|
||||||
|
let priority = match file_size {
|
||||||
|
0..=1048576 => 10, // <= 1MB: highest priority
|
||||||
|
..=5242880 => 8, // 1-5MB: high priority
|
||||||
|
..=10485760 => 6, // 5-10MB: medium priority
|
||||||
|
..=52428800 => 4, // 10-50MB: low priority
|
||||||
|
_ => 2, // > 50MB: lowest priority
|
||||||
|
};
|
||||||
|
|
||||||
|
let size_mb = file_size as f64 / (1024.0 * 1024.0);
|
||||||
|
info!(" 📄 {} ({}) - {:.2} MB - Priority {}",
|
||||||
|
filename, mime_type, size_mb, priority);
|
||||||
|
|
||||||
|
documents_to_queue.push((document_id, priority, file_size));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch enqueue documents
|
||||||
|
info!("🚀 Enqueuing {} documents for OCR processing...", documents_to_queue.len());
|
||||||
|
|
||||||
|
match queue_service.enqueue_documents_batch(documents_to_queue).await {
|
||||||
|
Ok(queue_ids) => {
|
||||||
|
info!("✅ Successfully queued {} documents for OCR processing", queue_ids.len());
|
||||||
|
info!("🔄 OCR worker should start processing these documents automatically");
|
||||||
|
|
||||||
|
// Show queue statistics
|
||||||
|
match queue_service.get_stats().await {
|
||||||
|
Ok(stats) => {
|
||||||
|
info!("📊 Queue Statistics:");
|
||||||
|
info!(" • Pending: {}", stats.pending_count);
|
||||||
|
info!(" • Processing: {}", stats.processing_count);
|
||||||
|
info!(" • Failed: {}", stats.failed_count);
|
||||||
|
info!(" • Completed today: {}", stats.completed_today);
|
||||||
|
if let Some(wait_time) = stats.avg_wait_time_minutes {
|
||||||
|
info!(" • Average wait time: {:.1} minutes", wait_time);
|
||||||
|
}
|
||||||
|
if let Some(oldest) = stats.oldest_pending_minutes {
|
||||||
|
info!(" • Oldest pending: {:.1} minutes", oldest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to get queue statistics: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("❌ Failed to enqueue documents: {}", e);
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -21,6 +21,7 @@ pub fn router() -> Router<Arc<AppState>> {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/stats", get(get_queue_stats))
|
.route("/stats", get(get_queue_stats))
|
||||||
.route("/requeue-failed", post(requeue_failed))
|
.route("/requeue-failed", post(requeue_failed))
|
||||||
|
.route("/enqueue-pending", post(enqueue_pending_documents))
|
||||||
.route("/pause", post(pause_ocr_processing))
|
.route("/pause", post(pause_ocr_processing))
|
||||||
.route("/resume", post(resume_ocr_processing))
|
.route("/resume", post(resume_ocr_processing))
|
||||||
.route("/status", get(get_ocr_status))
|
.route("/status", get(get_ocr_status))
|
||||||
|
|
@ -172,4 +173,81 @@ async fn get_ocr_status(
|
||||||
"is_paused": is_paused,
|
"is_paused": is_paused,
|
||||||
"status": if is_paused { "paused" } else { "running" }
|
"status": if is_paused { "paused" } else { "running" }
|
||||||
})))
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[utoipa::path(
|
||||||
|
post,
|
||||||
|
path = "/api/queue/enqueue-pending",
|
||||||
|
tag = "queue",
|
||||||
|
security(
|
||||||
|
("bearer_auth" = [])
|
||||||
|
),
|
||||||
|
responses(
|
||||||
|
(status = 200, description = "Pending documents queued successfully"),
|
||||||
|
(status = 401, description = "Unauthorized"),
|
||||||
|
(status = 403, description = "Forbidden - Admin access required"),
|
||||||
|
(status = 500, description = "Internal server error")
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
async fn enqueue_pending_documents(
|
||||||
|
State(state): State<Arc<AppState>>,
|
||||||
|
auth_user: AuthUser,
|
||||||
|
) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||||
|
require_admin(&auth_user)?;
|
||||||
|
|
||||||
|
// Find all documents with pending OCR status that aren't already in the queue
|
||||||
|
let pending_documents = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT d.id, d.file_size
|
||||||
|
FROM documents d
|
||||||
|
LEFT JOIN ocr_queue oq ON d.id = oq.document_id
|
||||||
|
WHERE d.ocr_status = 'pending'
|
||||||
|
AND oq.document_id IS NULL
|
||||||
|
AND d.file_path IS NOT NULL
|
||||||
|
AND (d.mime_type LIKE 'image/%' OR d.mime_type = 'application/pdf' OR d.mime_type = 'text/plain')
|
||||||
|
ORDER BY d.created_at ASC
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(state.db.get_pool())
|
||||||
|
.await
|
||||||
|
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||||
|
|
||||||
|
if pending_documents.is_empty() {
|
||||||
|
return Ok(Json(serde_json::json!({
|
||||||
|
"queued_count": 0,
|
||||||
|
"message": "No pending documents found to queue"
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare batch insert data
|
||||||
|
let documents_to_queue: Vec<(uuid::Uuid, i32, i64)> = pending_documents
|
||||||
|
.into_iter()
|
||||||
|
.map(|row| {
|
||||||
|
let document_id: uuid::Uuid = row.get("id");
|
||||||
|
let file_size: i64 = row.get("file_size");
|
||||||
|
|
||||||
|
// Calculate priority based on file size
|
||||||
|
let priority = match file_size {
|
||||||
|
0..=1048576 => 10, // <= 1MB: highest priority
|
||||||
|
..=5242880 => 8, // 1-5MB: high priority
|
||||||
|
..=10485760 => 6, // 5-10MB: medium priority
|
||||||
|
..=52428800 => 4, // 10-50MB: low priority
|
||||||
|
_ => 2, // > 50MB: lowest priority
|
||||||
|
};
|
||||||
|
|
||||||
|
(document_id, priority, file_size)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Batch enqueue documents
|
||||||
|
let queue_ids = state.queue_service
|
||||||
|
.enqueue_documents_batch(documents_to_queue)
|
||||||
|
.await
|
||||||
|
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||||
|
|
||||||
|
Ok(Json(serde_json::json!({
|
||||||
|
"queued_count": queue_ids.len(),
|
||||||
|
"message": format!("Successfully queued {} pending documents for OCR processing", queue_ids.len()),
|
||||||
|
"queue_ids": queue_ids
|
||||||
|
})))
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue