diff --git a/src/routes/documents.rs b/src/routes/documents.rs
deleted file mode 100644
index 726e987..0000000
--- a/src/routes/documents.rs
+++ /dev/null
@@ -1,2200 +0,0 @@
-use axum::{
-    extract::{Multipart, Path, Query, State},
-    http::{StatusCode, header::CONTENT_TYPE},
-    response::{Json, Response},
-    routing::{get, post, delete},
-    Router,
-};
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-use utoipa::ToSchema;
-use sqlx::Row;
-
-use crate::{
-    auth::AuthUser,
-    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    services::file_service::FileService,
-    models::DocumentResponse,
-    AppState,
-};
-use tracing;
-
-#[derive(Deserialize, ToSchema)]
-struct PaginationQuery {
-    limit: Option<i64>,
-    offset: Option<i64>,
-    ocr_status: Option<String>,
-}
-
-#[derive(Deserialize, ToSchema)]
-struct FailedDocumentsQuery {
-    limit: Option<i64>,
-    offset: Option<i64>,
-    stage: Option<String>,  // 'ocr', 'ingestion', 'validation', etc.
-    reason: Option<String>, // 'duplicate_content', 'low_ocr_confidence', etc.
-}
-
-#[derive(Deserialize, Serialize, ToSchema)]
-pub struct BulkDeleteRequest {
-    pub document_ids: Vec<uuid::Uuid>,
-}
-
-#[derive(Deserialize, Serialize, ToSchema)]
-pub struct DeleteLowConfidenceRequest {
-    pub max_confidence: f32,
-    pub preview_only: Option<bool>,
-}
-
-pub fn router() -> Router<Arc<AppState>> {
-    Router::new()
-        .route("/", post(upload_document))
-        .route("/", get(list_documents))
-        .route("/", delete(bulk_delete_documents))
-        .route("/{id}", get(get_document_by_id))
-        .route("/{id}", delete(delete_document))
-        .route("/{id}/download", get(download_document))
-        .route("/{id}/view", get(view_document))
-        .route("/{id}/thumbnail", get(get_document_thumbnail))
-        .route("/{id}/ocr", get(get_document_ocr))
-        .route("/{id}/processed-image", get(get_processed_image))
-        .route("/{id}/retry-ocr", post(retry_ocr))
-        .route("/{id}/debug", get(get_document_debug_info))
-        .route("/duplicates", get(get_user_duplicates))
-        .route("/failed", get(get_failed_documents))
-        .route("/failed/{id}/view", get(view_failed_document))
-        .route("/delete-low-confidence", post(delete_low_confidence_documents))
-        .route("/delete-failed-ocr", post(delete_failed_ocr_documents))
-        .route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr))
-        .route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats))
-        .route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations))
-        .route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history))
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Document details", body = DocumentResponse),
-        (status = 404, description = "Document not found"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn get_document_by_id(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Json<DocumentResponse>, StatusCode> {
-    // Get specific document with proper role-based access
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    // Get labels for this document
-    let labels = state
-        .db
-        .get_document_labels(document_id)
-        .await
-        .unwrap_or_else(|_| Vec::new());
-
-    // Convert to DocumentResponse
-    let response = DocumentResponse {
-        id: document.id,
-        filename: document.filename,
-        original_filename: document.original_filename,
-        file_size: document.file_size,
-        mime_type: document.mime_type,
-        created_at: document.created_at,
-        has_ocr_text: document.ocr_text.is_some(),
-        tags: document.tags,
-        labels,
-        ocr_confidence: document.ocr_confidence,
-        ocr_word_count: document.ocr_word_count,
-        ocr_processing_time_ms: document.ocr_processing_time_ms,
-        ocr_status: document.ocr_status,
-        original_created_at: document.original_created_at,
-        original_modified_at: document.original_modified_at,
-        source_metadata: document.source_metadata,
-    };
-
-    Ok(Json(response))
-}
-
-#[utoipa::path(
-    post,
-    path = "/api/documents",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    request_body(content = String, description = "Multipart form data with file. Supported formats: PDF, PNG, JPG, JPEG, TIFF, BMP, TXT. OCR will be automatically performed on image and PDF files.", content_type = "multipart/form-data"),
-    responses(
-        (status = 200, description = "Document uploaded successfully. OCR processing will begin automatically if enabled in user settings.", body = DocumentResponse),
-        (status = 400, description = "Bad request - invalid file type or malformed data"),
-        (status = 413, description = "Payload too large - file exceeds size limit"),
-        (status = 401, description = "Unauthorized - valid authentication required")
-    )
-)]
-async fn upload_document(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    mut multipart: Multipart,
-) -> Result<Json<DocumentResponse>, StatusCode> {
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service.clone());
-
-    // Get user settings for file upload restrictions
-    let settings = state
-        .db
-        .get_user_settings(auth_user.user.id)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .unwrap_or_else(|| crate::models::Settings::default());
-
-    let mut label_ids: Option<Vec<uuid::Uuid>> = None;
-
-    // First pass: collect all multipart fields
-    while let Some(field) = multipart.next_field().await.map_err(|_| StatusCode::BAD_REQUEST)? {
-        let name = field.name().unwrap_or("").to_string();
-
-        tracing::info!("Processing multipart field: {}", name);
-
-        if name == "label_ids" {
-            let label_ids_text = field.text().await.map_err(|_| StatusCode::BAD_REQUEST)?;
-            tracing::info!("Received label_ids field: {}", label_ids_text);
-
-            match serde_json::from_str::<Vec<uuid::Uuid>>(&label_ids_text) {
-                Ok(ids) => {
-                    tracing::info!("Successfully parsed {} label IDs: {:?}", ids.len(), ids);
-                    label_ids = Some(ids);
-                },
-                Err(e) => {
-                    tracing::warn!("Failed to parse label_ids from upload: {} - Error: {}", label_ids_text, e);
-                }
-            }
-        } else if name == "file" {
-            let filename = field
-                .file_name()
-                .ok_or(StatusCode::BAD_REQUEST)?
-                .to_string();
-
-            let data = field.bytes().await.map_err(|_| StatusCode::BAD_REQUEST)?;
-            let data_len = data.len();
-            let file_size = data.len() as i64;
-            tracing::info!("Received file: {}, size: {} bytes", filename, data_len);
-
-            // Check file size limit
-            let max_size_bytes = (settings.max_file_size_mb as i64) * 1024 * 1024;
-            if file_size > max_size_bytes {
-                return Err(StatusCode::PAYLOAD_TOO_LARGE);
-            }
-
-            let mime_type = mime_guess::from_path(&filename)
-                .first_or_octet_stream()
-                .to_string();
-
-            // Use the unified ingestion service with AllowDuplicateContent policy
-            // This will create separate documents for different filenames even with same content
-            let result = ingestion_service
-                .ingest_upload(&filename, data.to_vec(), &mime_type, auth_user.user.id)
-                .await
-                .map_err(|e| {
-                    tracing::error!("Document ingestion failed for user {} filename {}: {}",
-                        auth_user.user.id, filename, e);
-                    StatusCode::INTERNAL_SERVER_ERROR
-                })?;
-
-            let (saved_document, should_queue_ocr) = match result {
-                IngestionResult::Created(doc) => (doc, true),           // New document - queue for OCR
-                IngestionResult::ExistingDocument(doc) => (doc, false), // Existing document - don't re-queue OCR
-                _ => return Err(StatusCode::INTERNAL_SERVER_ERROR),
-            };
-
-            let document_id = saved_document.id;
-            let enable_background_ocr = settings.enable_background_ocr;
-
-            if enable_background_ocr && should_queue_ocr {
-                // Use the shared queue service from AppState instead of creating a new one
-                // Calculate priority based on file size
-                let priority = match saved_document.file_size {
-                    0..=1048576 => 10,  // <= 1MB: highest priority
-                    ..=5242880 => 8,    // 1-5MB: high priority
-                    ..=10485760 => 6,   // 5-10MB: medium priority
-                    ..=52428800 => 4,   // 10-50MB: low priority
-                    _ => 2,             // > 50MB: lowest priority
-                };
-
-                state.queue_service.enqueue_document(document_id, priority, saved_document.file_size).await
-                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-            }
-
-            return Ok(Json(saved_document.into()));
-        }
-    }
-
-    // This should not be reached as file processing is handled above
-    // If we get here, no file was provided
-
-    Err(StatusCode::BAD_REQUEST)
-}
-
-
-#[utoipa::path(
-    get,
-    path = "/api/documents",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("limit" = Option<i64>, Query, description = "Number of documents to return (default: 50)"),
-        ("offset" = Option<i64>, Query, description = "Number of documents to skip (default: 0)"),
-        ("ocr_status" = Option<String>, Query, description = "Filter by OCR status (pending, processing, completed, failed)")
-    ),
-    responses(
-        (status = 200, description = "Paginated list of user documents with metadata", body = String),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn list_documents(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Query(pagination): Query<PaginationQuery>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    let limit = pagination.limit.unwrap_or(50);
-    let offset = pagination.offset.unwrap_or(0);
-
-    let user_id = auth_user.user.id;
-    let user_role = auth_user.user.role;
-    let ocr_filter = pagination.ocr_status.as_deref();
-
-    let (documents, total_count) = tokio::try_join!(
-        state.db.get_documents_by_user_with_role_and_filter(
-            user_id,
-            user_role.clone(),
-            limit,
-            offset,
-            ocr_filter
-        ),
-        state.db.get_documents_count_with_role_and_filter(
-            user_id,
-            user_role,
-            ocr_filter
-        )
-    ).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    // Get labels for all documents efficiently
-    let document_ids: Vec<uuid::Uuid> = documents.iter().map(|doc| doc.id).collect();
-    let labels_map = state
-        .db
-        .get_labels_for_documents(&document_ids)
-        .await
-        .unwrap_or_else(|_| std::collections::HashMap::new());
-
-    let documents_response: Vec<DocumentResponse> = documents.into_iter().map(|doc| {
-        let mut response: DocumentResponse = doc.into();
-        response.labels = labels_map.get(&response.id).cloned().unwrap_or_else(Vec::new);
-        response
-    }).collect();
-
-    let response = serde_json::json!({
-        "documents": documents_response,
-        "pagination": {
-            "total": total_count,
-            "limit": limit,
-            "offset": offset,
-            "has_more": offset + limit < total_count
-        }
-    });
-
-    Ok(Json(response))
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/download",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Document file content", content_type = "application/octet-stream"),
-        (status = 404, description = "Document not found"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn download_document(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Vec<u8>, StatusCode> {
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let file_data = file_service
-        .read_file(&document.file_path)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    Ok(file_data)
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/view",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Document content for viewing in browser"),
-        (status = 404, description = "Document not found"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn view_document(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Response, StatusCode> {
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let file_data = file_service
-        .read_file(&document.file_path)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    // Determine content type from file extension
-    let content_type = mime_guess::from_path(&document.filename)
-        .first_or_octet_stream()
-        .to_string();
-
-    let response = Response::builder()
-        .header(CONTENT_TYPE, content_type)
-        .header("Content-Length", file_data.len())
-        .body(file_data.into())
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    Ok(response)
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/thumbnail",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Document thumbnail image", content_type = "image/jpeg"),
-        (status = 404, description = "Document not found or thumbnail not available"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn get_document_thumbnail(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Response, StatusCode> {
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-
-    // Try to generate or get cached thumbnail
-    match file_service.get_or_generate_thumbnail(&document.file_path, &document.filename).await {
-        Ok(thumbnail_data) => {
-            Ok(Response::builder()
-                .header(CONTENT_TYPE, "image/jpeg")
-                .header("Content-Length", thumbnail_data.len())
-                .header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
-                .body(thumbnail_data.into())
-                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?)
-        }
-        Err(e) => {
-            // Log the error for debugging
-            tracing::error!("Failed to generate thumbnail for document {}: {}", document_id, e);
-            Err(StatusCode::NOT_FOUND)
-        }
-    }
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/ocr",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "OCR extracted text and metadata", body = String),
-        (status = 404, description = "Document not found"),
-        (status = 401, description = "Unauthorized"),
-        (status = 500, description = "Internal server error")
-    )
-)]
-async fn get_document_ocr(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    // Return OCR text and metadata
-    Ok(Json(serde_json::json!({
-        "document_id": document.id,
-        "filename": document.filename,
-        "has_ocr_text": document.ocr_text.is_some(),
-        "ocr_text": document.ocr_text,
-        "ocr_confidence": document.ocr_confidence,
-        "ocr_word_count": document.ocr_word_count,
-        "ocr_processing_time_ms": document.ocr_processing_time_ms,
-        "ocr_status": document.ocr_status,
-        "ocr_error": document.ocr_error,
-        "ocr_completed_at": document.ocr_completed_at
-    })))
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/processed-image",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Processed image file", content_type = "image/png"),
-        (status = 404, description = "Document or processed image not found"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn get_processed_image(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Response, StatusCode> {
-    // Check if document exists and belongs to user
-    let _document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    // Get processed image record
-    let processed_image = state
-        .db
-        .get_processed_image_by_document_id(document_id, auth_user.user.id)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    // Read processed image file
-    let image_data = tokio::fs::read(&processed_image.processed_image_path)
-        .await
-        .map_err(|_| StatusCode::NOT_FOUND)?;
-
-    // Return image as PNG
-    let response = Response::builder()
-        .status(StatusCode::OK)
-        .header(CONTENT_TYPE, "image/png")
-        .header("Cache-Control", "public, max-age=86400") // Cache for 1 day
-        .body(image_data.into())
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    Ok(response)
-}
-
-#[utoipa::path(
-    post,
-    path = "/api/documents/{id}/retry-ocr",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "OCR retry queued successfully", body = String),
-        (status = 404, description = "Document not found"),
-        (status = 400, description = "Document is not eligible for OCR retry"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn retry_ocr(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "user_id" => auth_user.user.id,
-        "message" => "Starting OCR retry request"
-    );
-
-    // Check if document exists and belongs to user
-    let document = state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|e| {
-            crate::debug_error!("OCR_RETRY", format!("Failed to get document {}: {}", document_id, e));
-            StatusCode::INTERNAL_SERVER_ERROR
-        })?
-        .ok_or_else(|| {
-            crate::debug_log!("OCR_RETRY", &format!("Document {} not found or access denied for user {}", document_id, auth_user.user.id));
-            StatusCode::NOT_FOUND
-        })?;
-
-    // Check if document is eligible for OCR retry (all documents are now retryable)
-    let current_status = document.ocr_status.as_deref().unwrap_or("unknown");
-    let eligible = true; // All documents are retryable
-
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "filename" => &document.filename,
-        "current_status" => current_status,
-        "eligible" => eligible,
-        "file_size" => document.file_size,
-        "retry_count" => document.ocr_retry_count.unwrap_or(0),
-        "message" => "Checking document eligibility"
-    );
-
-    if !eligible {
-        crate::debug_log!("OCR_RETRY", &format!("Document {} is not eligible for retry - current status: {}", document_id, current_status));
-        return Ok(Json(serde_json::json!({
-            "success": false,
-            "message": format!("Document is not eligible for OCR retry. Current status: {}", current_status),
-            "current_status": document.ocr_status
-        })));
-    }
-
-    // Reset document OCR fields
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "message" => "Resetting document OCR fields"
-    );
-
-    let reset_result = sqlx::query(
-        r#"
-        UPDATE documents
-        SET ocr_status = 'pending',
-            ocr_text = NULL,
-            ocr_error = NULL,
-            ocr_failure_reason = NULL,
-            ocr_confidence = NULL,
-            ocr_word_count = NULL,
-            ocr_processing_time_ms = NULL,
-            ocr_completed_at = NULL,
-            updated_at = NOW()
-        WHERE id = $1
-        "#
-    )
-    .bind(document_id)
-    .execute(state.db.get_pool())
-    .await
-    .map_err(|e| {
-        crate::debug_error!("OCR_RETRY", format!("Failed to reset OCR fields for document {}: {}", document_id, e));
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?;
-
-    if reset_result.rows_affected() == 0 {
-        crate::debug_error!("OCR_RETRY", format!("No rows affected when resetting OCR fields for document {}", document_id));
-        return Err(StatusCode::NOT_FOUND);
-    }
-
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "rows_affected" => reset_result.rows_affected(),
-        "message" => "Successfully reset OCR fields"
-    );
-
-    // Calculate priority based on file size (higher priority for retries)
-    let priority = match document.file_size {
-        0..=1048576 => 15,  // <= 1MB: highest priority (boosted for retry)
-        ..=5242880 => 12,   // 1-5MB: high priority
-        ..=10485760 => 10,  // 5-10MB: medium priority
-        ..=52428800 => 8,   // 10-50MB: low priority
-        _ => 6,             // > 50MB: lowest priority
-    };
-
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "file_size" => document.file_size,
-        "priority" => priority,
-        "message" => "Calculated retry priority"
-    );
-
-    // Add to OCR queue with detailed logging
-    crate::debug_log!("OCR_RETRY",
-        "document_id" => document_id,
-        "priority" => priority,
-        "file_size" => document.file_size,
-        "message" => "Enqueueing document for OCR processing"
-    );
-
-    match state.queue_service.enqueue_document(document_id, priority, document.file_size).await {
-        Ok(queue_id) => {
-            crate::debug_log!("OCR_RETRY",
-                "document_id" => document_id,
-                "queue_id" => queue_id,
-                "priority" => priority,
-                "message" => "Successfully enqueued document"
-            );
-
-            // Record retry history
-            crate::debug_log!("OCR_RETRY",
-                "document_id" => document_id,
-                "user_id" => auth_user.user.id,
-                "queue_id" => queue_id,
-                "message" => "Recording retry history"
-            );
-
-            if let Err(e) = crate::db::ocr_retry::record_ocr_retry(
-                state.db.get_pool(),
-                document_id,
-                auth_user.user.id,
-                "manual_retry",
-                priority,
-                Some(queue_id),
-            ).await {
-                crate::debug_error!("OCR_RETRY", format!("Failed to record retry history for document {}: {}", document_id, e));
-                tracing::warn!("Failed to record retry history for document {}: {}", document_id, e);
-            } else {
-                crate::debug_log!("OCR_RETRY",
-                    "document_id" => document_id,
-                    "queue_id" => queue_id,
-                    "message" => "Successfully recorded retry history"
-                );
-            }
-
-            crate::debug_log!("OCR_RETRY",
-                "document_id" => document_id,
-                "filename" => &document.filename,
-                "queue_id" => queue_id,
-                "priority" => priority,
-                "file_size" => document.file_size,
-                "message" => "OCR retry process completed successfully"
-            );
-
-            tracing::info!(
-                "OCR retry queued for document {} ({}): queue_id={}, priority={}, size={}",
-                document_id, document.filename, queue_id, priority, document.file_size
-            );
-
-            Ok(Json(serde_json::json!({
-                "success": true,
-                "message": "OCR retry queued successfully",
-                "queue_id": queue_id,
-                "document_id": document_id,
-                "priority": priority,
-                "estimated_wait_minutes": calculate_estimated_wait_time(priority).await
-            })))
-        }
-        Err(e) => {
-            crate::debug_error!("OCR_RETRY", format!("Failed to enqueue document {}: {}", document_id, e));
-            tracing::error!("Failed to queue OCR retry for document {}: {}", document_id, e);
-            Err(StatusCode::INTERNAL_SERVER_ERROR)
-        }
-    }
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/{id}/debug",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Document ID")
-    ),
-    responses(
-        (status = 200, description = "Debug information for document processing pipeline", body = String),
-        (status = 404, description = "Document not found"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn get_document_debug_info(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(document_id): Path<uuid::Uuid>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    tracing::info!("Starting debug analysis for document {} by user {}", document_id, auth_user.user.id);
-
-    // Get the document
-    let document = match state
-        .db
-        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
-        .await
-    {
-        Ok(Some(doc)) => {
-            tracing::info!("Found document: {} ({})", doc.filename, doc.mime_type);
-            doc
-        }
-        Ok(None) => {
-            tracing::warn!("Document {} not found for user {}", document_id, auth_user.user.id);
-            return Err(StatusCode::NOT_FOUND);
-        }
-        Err(e) => {
-            tracing::error!("Database error fetching document {}: {}", document_id, e);
-            return Err(StatusCode::INTERNAL_SERVER_ERROR);
-        }
-    };
-
-    // Get user settings
-    tracing::info!("Fetching user settings for user {}", auth_user.user.id);
-    let settings = match state
-        .db
-        .get_user_settings(auth_user.user.id)
-        .await
-    {
-        Ok(Some(s)) => {
-            tracing::info!("Found user settings: OCR enabled={}, min_confidence={}", s.enable_background_ocr, s.ocr_min_confidence);
-            s
-        }
-        Ok(None) => {
-            tracing::info!("No user settings found, using defaults");
-            crate::models::Settings::default()
-        }
-        Err(e) => {
-            tracing::error!("Error fetching user settings: {}", e);
-            return Err(StatusCode::INTERNAL_SERVER_ERROR);
-        }
-    };
-
-    // Get OCR queue history for this document
-    tracing::info!("Fetching OCR queue history for document {}", document_id);
-    let queue_history = match sqlx::query(
-        r#"
-        SELECT id, status, priority, created_at, started_at, completed_at,
-               error_message, attempts, worker_id
-        FROM ocr_queue
-        WHERE document_id = $1
-        ORDER BY created_at DESC
-        LIMIT 10
-        "#
-    )
-    .bind(document_id)
-    .fetch_all(state.db.get_pool())
-    .await {
-        Ok(history) => {
-            tracing::info!("Queue history query successful, found {} entries", history.len());
-            history
-        },
-        Err(e) => {
-            tracing::error!("Queue history query error: {}", e);
-            return Err(StatusCode::INTERNAL_SERVER_ERROR);
-        }
-    };
-
-    // Get processed image info if it exists
-    tracing::info!("Fetching processed image for document {}", document_id);
-    let processed_image = match state
-        .db
-        .get_processed_image_by_document_id(document_id, auth_user.user.id)
-        .await {
-        Ok(Some(img)) => {
-            tracing::info!("Found processed image for document {}", document_id);
-            Some(img)
-        },
-        Ok(None) => {
-            tracing::info!("No processed image found for document {}", document_id);
-            None
-        },
-        Err(e) => {
-            tracing::warn!("Error fetching processed image for document {}: {}", document_id, e);
-            None
-        }
-    };
-
-    // Get failed document record if it exists
-    tracing::info!("Fetching failed document record for document {}", document_id);
-    let failed_document = match sqlx::query(
-        r#"
-        SELECT failure_reason, failure_stage, error_message, retry_count,
-               last_retry_at, created_at, content, ocr_text, ocr_confidence,
-               ocr_word_count, ocr_processing_time_ms
-        FROM failed_documents
-        WHERE id = $1 OR existing_document_id = $1
-        ORDER BY created_at DESC
-        LIMIT 1
-        "#
-    )
-    .bind(document_id)
-    .fetch_optional(state.db.get_pool())
-    .await {
-        Ok(result) => {
-            tracing::info!("Failed document query successful, found: {}", result.is_some());
-            result
-        },
-        Err(e) => {
-            tracing::error!("Failed document query error: {}", e);
-            return Err(StatusCode::INTERNAL_SERVER_ERROR);
-        }
-    };
-
-    // Get detailed OCR processing logs and attempts
-    tracing::info!("Fetching detailed OCR processing logs for document {}", document_id);
-    let ocr_processing_logs = match sqlx::query(
-        r#"
-        SELECT id, status, priority, created_at, started_at, completed_at,
-               error_message, attempts, worker_id, processing_time_ms, file_size
-        FROM ocr_queue
-        WHERE document_id = $1
-        ORDER BY created_at ASC
-        "#
-    )
-    .bind(document_id)
-    .fetch_all(state.db.get_pool())
-    .await {
-        Ok(logs) => {
-            tracing::info!("OCR processing logs query successful, found {} entries", logs.len());
-            logs
-        },
-        Err(e) => {
-            tracing::error!("OCR processing logs query error: {}", e);
-            return Err(StatusCode::INTERNAL_SERVER_ERROR);
-        }
-    };
-
-    // File service for file info
-    let file_service = FileService::new(state.config.upload_path.clone());
-
-    // Check if file exists
-    let file_exists = tokio::fs::metadata(&document.file_path).await.is_ok();
-    let file_metadata = if file_exists {
-        tokio::fs::metadata(&document.file_path).await.ok()
-    } else {
-        None
-    };
-
-    // Try to analyze file content for additional diagnostic info
-    tracing::info!("Analyzing file content for document {} (exists: {})", document_id, file_exists);
-    let file_analysis = if file_exists {
-        match analyze_file_content(&document.file_path, &document.mime_type).await {
-            Ok(analysis) => {
-                tracing::info!("File analysis successful for document {}", document_id);
-                analysis
-            },
-            Err(e) => {
-                tracing::warn!("Failed to analyze file content for {}: {}", document_id, e);
-                FileAnalysis {
-                    error_details: Some(format!("File analysis failed: {}", e)),
-                    ..Default::default()
-                }
-            }
-        }
-    } else {
-        tracing::warn!("File does not exist for document {}, skipping analysis", document_id);
-        FileAnalysis::default()
-    };
-
-    // Pipeline steps analysis
-    let mut pipeline_steps = Vec::new();
-
-    // Step 1: File Upload & Ingestion
-    pipeline_steps.push(serde_json::json!({
-        "step": 1,
-        "name": "File Upload & Ingestion",
-        "status": "completed", // Document exists if we got this far
-        "details": {
-            "filename": document.filename,
-            "original_filename": document.original_filename,
-            "file_size": document.file_size,
-            "mime_type": document.mime_type,
-            "file_exists": file_exists,
-            "file_path": document.file_path,
-            "created_at": document.created_at,
-            "file_metadata": file_metadata.as_ref().map(|m| serde_json::json!({
-                "size": m.len(),
-                "modified": m.modified().ok(),
-                "is_file": m.is_file(),
-                "is_dir": m.is_dir()
-            })),
-            "file_analysis": file_analysis
-        },
-        "success": true,
-        "error": None::<String>
-    }));
-
-    // Step 2: OCR Queue Enrollment
-    let queue_enrollment_status = if queue_history.is_empty() {
-        if settings.enable_background_ocr {
-            "not_queued"
-        } else {
-            "ocr_disabled"
-        }
-    } else {
-        "queued"
-    };
-
-    pipeline_steps.push(serde_json::json!({
-        "step": 2,
-        "name": "OCR Queue Enrollment",
-        "status": queue_enrollment_status,
-        "details": {
-            "user_ocr_enabled": settings.enable_background_ocr,
-            "queue_entries_count": queue_history.len(),
-            "queue_history": queue_history.iter().map(|row| serde_json::json!({
-                "id": row.get::<uuid::Uuid, _>("id"),
-                "status": row.get::<String, _>("status"),
-                "priority": row.get::<i32, _>("priority"),
-                "created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
-                "started_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("started_at"),
-                "completed_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("completed_at"),
-                "error_message": row.get::<Option<String>, _>("error_message"),
-                "attempts": row.get::<i32, _>("attempts"),
-                "worker_id": row.get::<Option<String>, _>("worker_id")
-            })).collect::<Vec<_>>()
-        },
-        "success": !queue_history.is_empty() || !settings.enable_background_ocr,
-        "error": if !settings.enable_background_ocr && queue_history.is_empty() {
-            Some("OCR processing is disabled in user settings")
-        } else { None }
-    }));
-
-    // Step 3: OCR Processing
-    let ocr_status = document.ocr_status.as_deref().unwrap_or("not_started");
-    let ocr_success = matches!(ocr_status, "completed");
-
-    pipeline_steps.push(serde_json::json!({
-        "step": 3,
-        "name": "OCR Text Extraction",
-        "status": ocr_status,
-        "details": {
-            "ocr_text_length": document.ocr_text.as_ref().map(|t| t.len()).unwrap_or(0),
-            "ocr_confidence": document.ocr_confidence,
-            "ocr_word_count": document.ocr_word_count,
-            "ocr_processing_time_ms": document.ocr_processing_time_ms,
-            "ocr_completed_at": document.ocr_completed_at,
-            "ocr_error": document.ocr_error,
-            "has_processed_image": processed_image.is_some(),
-            "processed_image_info": processed_image.as_ref().map(|pi| serde_json::json!({
-                "image_path": pi.processed_image_path,
-                "image_width": pi.image_width,
-                "image_height": pi.image_height,
-                "file_size": pi.file_size,
-                "processing_parameters": pi.processing_parameters,
-                "processing_steps": pi.processing_steps,
-                "created_at": pi.created_at
-            }))
-        },
-        "success": ocr_success,
-        "error": document.ocr_error.clone()
-    }));
-
-    // Step 4: Quality Validation
-    let quality_passed = if let Some(confidence) = document.ocr_confidence {
-        confidence >= settings.ocr_min_confidence && document.ocr_word_count.unwrap_or(0) > 0
-    } else {
-        false
-    };
-
-    pipeline_steps.push(serde_json::json!({
-        "step": 4,
-        "name": "OCR Quality Validation",
-        "status": if ocr_success {
-            if quality_passed { "passed" } else { "failed" }
-        } else {
-            "not_reached"
-        },
-        "details": {
-            "quality_thresholds": {
-                "min_confidence": settings.ocr_min_confidence,
-                "brightness_threshold": settings.ocr_quality_threshold_brightness,
-                "contrast_threshold": settings.ocr_quality_threshold_contrast,
-                "noise_threshold": settings.ocr_quality_threshold_noise,
-                "sharpness_threshold": settings.ocr_quality_threshold_sharpness
-            },
-            "actual_values": {
-                "confidence": document.ocr_confidence,
-                "word_count": document.ocr_word_count,
-                "processed_image_available": processed_image.is_some(),
-                "processing_parameters": processed_image.as_ref().map(|pi| &pi.processing_parameters)
-            },
-            "quality_checks": {
-                "confidence_check": document.ocr_confidence.map(|c| c >= settings.ocr_min_confidence),
-                "word_count_check": document.ocr_word_count.map(|w| w > 0),
-                "processed_image_available": processed_image.is_some()
-            }
-        },
-        "success": quality_passed,
-        "error": if !quality_passed && ocr_success {
-            Some(format!("Quality validation failed: confidence {:.1}% (required: {:.1}%), words: {}",
-                document.ocr_confidence.unwrap_or(0.0),
-                settings.ocr_min_confidence,
-                document.ocr_word_count.unwrap_or(0)
-            ))
-        } else { None }
-    }));
-
-    // Overall summary
-    let overall_status = if quality_passed {
-        "success"
-    } else if matches!(ocr_status, "failed") {
-        "failed"
-    } else if matches!(ocr_status, "processing") {
-        "processing"
-    } else if matches!(ocr_status, "pending") {
-        "pending"
-    } else {
-        "not_started"
-    };
-
-    Ok(Json(serde_json::json!({
-        "document_id": document_id,
-        "filename": document.filename,
-        "overall_status": overall_status,
-        "pipeline_steps": pipeline_steps,
-        "failed_document_info": failed_document.as_ref().map(|row| serde_json::json!({
-            "failure_reason": row.get::<String, _>("failure_reason"),
-            "failure_stage": row.get::<String, _>("failure_stage"),
-            "error_message": row.get::<Option<String>, _>("error_message"),
-            "retry_count": row.get::<Option<i32>, _>("retry_count"),
-            "last_retry_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("last_retry_at"),
-            "created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
-            "content_preview": row.get::<Option<String>, _>("content").map(|c|
-                c.chars().take(200).collect::<String>()
-            ),
-            "failed_ocr_text": row.get::<Option<String>, _>("ocr_text"),
-            "failed_ocr_confidence": row.get::<Option<f32>, _>("ocr_confidence"),
-            "failed_ocr_word_count": row.get::<Option<i32>, _>("ocr_word_count"),
-            "failed_ocr_processing_time_ms": row.get::<Option<i32>, _>("ocr_processing_time_ms")
-        })),
-        "user_settings": {
-            "enable_background_ocr": settings.enable_background_ocr,
-            "ocr_min_confidence": settings.ocr_min_confidence,
-            "max_file_size_mb": settings.max_file_size_mb,
-            "quality_thresholds": {
-                "brightness": settings.ocr_quality_threshold_brightness,
-                "contrast": settings.ocr_quality_threshold_contrast,
-                "noise": settings.ocr_quality_threshold_noise,
-                "sharpness": settings.ocr_quality_threshold_sharpness
-            }
-        },
-        "debug_timestamp": chrono::Utc::now(),
-        "file_analysis": file_analysis,
-        "detailed_processing_logs": ocr_processing_logs.iter().map(|row| serde_json::json!({
-            "id": row.get::<uuid::Uuid, _>("id"),
-            "status": row.get::<String, _>("status"),
-            "priority": row.get::<i32, _>("priority"),
-            "created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
-            "started_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("started_at"),
-            "completed_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("completed_at"),
-            "error_message": row.get::<Option<String>, _>("error_message"),
-            "attempts": row.get::<i32, _>("attempts"),
-            "worker_id": row.get::<Option<String>, _>("worker_id"),
-            "processing_time_ms": row.get::<Option<i32>, _>("processing_time_ms"),
-            "file_size": row.get::<Option<i64>, _>("file_size"),
-            // Calculate processing duration if both timestamps are available
-            "processing_duration_ms": if let (Some(started), Some(completed)) = (
-                row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("started_at"),
-                row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("completed_at")
-            ) {
-                Some((completed.timestamp_millis() - started.timestamp_millis()) as i32)
-            } else {
-                row.get::<Option<i32>, _>("processing_time_ms")
-            },
-            // Calculate queue wait time
-            "queue_wait_time_ms": if let Some(started) = row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("started_at") {
-                let created = row.get::<chrono::DateTime<chrono::Utc>, _>("created_at");
-                Some((started.timestamp_millis() - created.timestamp_millis()) as i32)
-            } else {
-                None::<i32>
-            }
-        })).collect::<Vec<_>>()
-    })))
-}
-
-#[derive(Debug, Default, serde::Serialize)]
-struct FileAnalysis {
-    file_type: String,
-    file_size_bytes: u64,
-    is_readable: bool,
-    pdf_info: Option<PdfAnalysis>,
-    text_preview: Option<String>,
-    error_details: Option<String>,
-}
-
-#[derive(Debug, serde::Serialize)]
-struct PdfAnalysis {
-    is_valid_pdf: bool,
-    page_count: Option<i32>,
-    has_text_content: bool,
-    has_images: bool,
-    is_encrypted: bool,
-    pdf_version: Option<String>,
-    font_count: usize,
-    text_extraction_error: Option<String>,
-    estimated_text_length: usize,
-}
-
-async fn analyze_file_content(file_path: &str, mime_type: &str) -> Result<FileAnalysis, Box<dyn std::error::Error>> {
-    let mut analysis = FileAnalysis {
-        file_type: mime_type.to_string(),
-        ..Default::default()
-    };
-
-    // Try to read file size
-    if let Ok(metadata) = tokio::fs::metadata(file_path).await {
-        analysis.file_size_bytes = metadata.len();
-    }
-
-    // Try to read the file
-    let file_content = match tokio::fs::read(file_path).await {
-        Ok(content) => {
-            analysis.is_readable = true;
-            content
-        }
-        Err(e) => {
-            analysis.error_details = Some(format!("Failed to read file: {}", e));
-            return Ok(analysis);
-        }
-    };
-
-    // Analyze based on file type
-    if mime_type.contains("pdf") {
-        analysis.pdf_info = Some(analyze_pdf_content(&file_content).await);
-    } else if mime_type.starts_with("text/") {
-        // For text files, show a preview
-        match String::from_utf8(file_content.clone()) {
-            Ok(text) => {
-                analysis.text_preview = Some(text.chars().take(500).collect());
-            }
-            Err(e) => {
-                analysis.error_details = Some(format!("Failed to decode text file: {}", e));
-            }
-        }
-    }
-
-    Ok(analysis)
-}
-
-async fn analyze_pdf_content(content: &[u8]) -> PdfAnalysis {
-    use std::panic;
-
-    let mut analysis = PdfAnalysis {
-        is_valid_pdf: false,
-        page_count: None,
-        has_text_content: false,
-        has_images: false,
-        is_encrypted: false,
-        pdf_version: None,
-        font_count: 0,
-        text_extraction_error: None,
-        estimated_text_length: 0,
-    };
-
-    // Check PDF header
-    if content.len() < 8 {
-        analysis.text_extraction_error = Some("File too small to be a valid PDF".to_string());
-        return analysis;
-    }
-
-    if !content.starts_with(b"%PDF-") {
-        analysis.text_extraction_error = Some("File does not start with PDF header".to_string());
-        return analysis;
-    }
-
-    analysis.is_valid_pdf = true;
-
-    // Extract PDF version from header
-    if content.len() >= 8 {
-        if let Ok(header) = std::str::from_utf8(&content[0..8]) {
-            if let Some(version) = header.strip_prefix("%PDF-") {
-                analysis.pdf_version = Some(version.to_string());
-            }
-        }
-    }
-
-    // Try to extract text using pdf_extract (same as the main OCR pipeline)
-    let text_result = panic::catch_unwind(|| {
-        pdf_extract::extract_text_from_mem(content)
-    });
-
-    match text_result {
-        Ok(Ok(text)) => {
-            analysis.has_text_content = !text.trim().is_empty();
-            analysis.estimated_text_length = text.len();
-
-            // Count words for comparison with OCR results
-            let word_count = text.split_whitespace().count();
-            if word_count == 0 && text.len() > 0 {
-                analysis.text_extraction_error = Some("PDF contains characters but no extractable words".to_string());
-            }
-        }
-        Ok(Err(e)) => {
-            analysis.text_extraction_error = Some(format!("PDF text extraction failed: {}", e));
-        }
-        Err(_) => {
corrupted PDF)".to_string()); - } - } - - // Basic PDF structure analysis - let content_str = String::from_utf8_lossy(content); - - // Check for encryption - analysis.is_encrypted = content_str.contains("/Encrypt"); - - // Check for images - analysis.has_images = content_str.contains("/Image") || content_str.contains("/XObject"); - - // Estimate page count (rough) - let page_matches = content_str.matches("/Type /Page").count(); - if page_matches > 0 { - analysis.page_count = Some(page_matches as i32); - } - - // Count fonts (rough) - analysis.font_count = content_str.matches("/Type /Font").count(); - - analysis -} - -#[utoipa::path( - get, - path = "/api/documents/failed-ocr", - tag = "documents", - security( - ("bearer_auth" = []) - ), - params( - ("limit" = Option, Query, description = "Number of documents to return (default: 50)"), - ("offset" = Option, Query, description = "Number of documents to skip (default: 0)") - ), - responses( - (status = 200, description = "List of documents with failed OCR", body = String), - (status = 401, description = "Unauthorized") - ) -)] -async fn get_failed_ocr_documents( - State(state): State>, - auth_user: AuthUser, - Query(pagination): Query, -) -> Result, StatusCode> { - let limit = pagination.limit.unwrap_or(50); - let offset = pagination.offset.unwrap_or(0); - - // Get failed OCR documents with additional failure details - let failed_docs = sqlx::query( - r#" - SELECT d.id, d.filename, d.original_filename, d.file_path, d.file_size, - d.mime_type, d.created_at, d.updated_at, d.user_id, - d.ocr_status, d.ocr_error, d.ocr_failure_reason, - d.ocr_completed_at, d.tags, - -- Count retry attempts from OCR queue - COALESCE(q.retry_count, 0) as retry_count, - q.last_attempt_at - FROM documents d - LEFT JOIN ( - SELECT document_id, - COUNT(*) as retry_count, - MAX(created_at) as last_attempt_at - FROM ocr_queue - WHERE status IN ('failed', 'completed') - GROUP BY document_id - ) q ON d.id = q.document_id - WHERE d.ocr_status = 'failed' - AND ($1::uuid IS NULL OR d.user_id = $1) -- Admin can see all, users see only their own - ORDER BY d.updated_at DESC - LIMIT $2 OFFSET $3 - "# - ) - .bind(if auth_user.user.role == crate::models::UserRole::Admin { - None - } else { - Some(auth_user.user.id) - }) - .bind(limit) - .bind(offset) - .fetch_all(state.db.get_pool()) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Count total failed documents - let total_count: i64 = sqlx::query_scalar( - r#" - SELECT COUNT(*) - FROM documents - WHERE ocr_status = 'failed' - AND ($1::uuid IS NULL OR user_id = $1) - "# - ) - .bind(if auth_user.user.role == crate::models::UserRole::Admin { - None - } else { - Some(auth_user.user.id) - }) - .fetch_one(state.db.get_pool()) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let failed_documents: Vec = failed_docs - .into_iter() - .map(|row| { - let tags: Vec = row.get::>, _>("tags").unwrap_or_default(); - - serde_json::json!({ - "id": row.get::("id"), - "filename": row.get::("filename"), - "original_filename": row.get::("original_filename"), - "file_size": row.get::("file_size"), - "mime_type": row.get::("mime_type"), - "created_at": row.get::, _>("created_at"), - "updated_at": row.get::, _>("updated_at"), - "tags": tags, - "ocr_status": row.get::, _>("ocr_status"), - "ocr_error": row.get::, _>("ocr_error"), - "ocr_failure_reason": row.get::, _>("ocr_failure_reason"), - "ocr_completed_at": row.get::>, _>("ocr_completed_at"), - "retry_count": row.get::, _>("retry_count").unwrap_or(0), - 
"last_attempt_at": row.get::>, _>("last_attempt_at"), - "can_retry": true, - "failure_category": categorize_failure_reason( - row.get::, _>("ocr_failure_reason").as_deref(), - row.get::, _>("ocr_error").as_deref() - ) - }) - }) - .collect(); - - let response = serde_json::json!({ - "documents": failed_documents, - "pagination": { - "total": total_count, - "limit": limit, - "offset": offset, - "has_more": offset + limit < total_count - }, - "statistics": { - "total_failed": total_count, - "failure_categories": get_failure_statistics(&state, auth_user.user.id, auth_user.user.role.clone()).await? - } - }); - - Ok(Json(response)) -} - -#[utoipa::path( - get, - path = "/api/documents/failed", - tag = "documents", - security( - ("bearer_auth" = []) - ), - params( - ("limit" = Option, Query, description = "Number of documents to return"), - ("offset" = Option, Query, description = "Number of documents to skip"), - ("stage" = Option, Query, description = "Filter by failure stage (ocr, ingestion, validation, etc.)"), - ("reason" = Option, Query, description = "Filter by failure reason") - ), - responses( - (status = 200, description = "List of failed documents", body = String), - (status = 401, description = "Unauthorized") - ) -)] -async fn get_failed_documents( - State(state): State>, - auth_user: AuthUser, - Query(params): Query, -) -> Result, StatusCode> { - let limit = params.limit.unwrap_or(25); - let offset = params.offset.unwrap_or(0); - - // Query the unified failed_documents table - let mut query_builder = sqlx::QueryBuilder::new( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, - content, tags, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, - failure_reason, failure_stage, error_message, existing_document_id, - ingestion_source, retry_count, last_retry_at, created_at, updated_at - FROM failed_documents - WHERE ($1::uuid IS NULL OR user_id = $1) - "# - ); - - let mut bind_count = 1; - - // Add stage filter if specified - if let Some(stage) = ¶ms.stage { - bind_count += 1; - query_builder.push(&format!(" AND failure_stage = ${}", bind_count)); - } - - // Add reason filter if specified - if let Some(reason) = ¶ms.reason { - bind_count += 1; - query_builder.push(&format!(" AND failure_reason = ${}", bind_count)); - } - - query_builder.push(" ORDER BY created_at DESC"); - query_builder.push(&format!(" LIMIT ${} OFFSET ${}", bind_count + 1, bind_count + 2)); - - let mut query = query_builder.build(); - - // Bind parameters in order - query = query.bind(if auth_user.user.role == crate::models::UserRole::Admin { - None - } else { - Some(auth_user.user.id) - }); - - if let Some(stage) = ¶ms.stage { - query = query.bind(stage); - } - - if let Some(reason) = ¶ms.reason { - query = query.bind(reason); - } - - query = query.bind(limit).bind(offset); - - let failed_docs = query - .fetch_all(state.db.get_pool()) - .await - .map_err(|e| { - tracing::error!("Failed to fetch failed documents: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - // Count total for pagination - let mut count_query_builder = sqlx::QueryBuilder::new( - "SELECT COUNT(*) FROM failed_documents WHERE ($1::uuid IS NULL OR user_id = $1)" - ); - - let mut count_bind_count = 1; - - if let Some(stage) = ¶ms.stage { - count_bind_count += 1; - count_query_builder.push(&format!(" AND failure_stage = ${}", count_bind_count)); - } - - if let Some(reason) = ¶ms.reason { - count_bind_count += 1; - count_query_builder.push(&format!(" AND failure_reason = ${}", count_bind_count)); - } - - 
-    let mut count_query = count_query_builder.build_query_scalar::<i64>();
-
-    count_query = count_query.bind(if auth_user.user.role == crate::models::UserRole::Admin {
-        None
-    } else {
-        Some(auth_user.user.id)
-    });
-
-    if let Some(stage) = &params.stage {
-        count_query = count_query.bind(stage);
-    }
-
-    if let Some(reason) = &params.reason {
-        count_query = count_query.bind(reason);
-    }
-
-    let total_count = count_query
-        .fetch_one(state.db.get_pool())
-        .await
-        .unwrap_or(0);
-
-    // Convert to JSON response format
-    let documents: Vec<serde_json::Value> = failed_docs.iter().map(|row| {
-        serde_json::json!({
-            "id": row.get::<uuid::Uuid, _>("id"),
-            "filename": row.get::<String, _>("filename"),
-            "original_filename": row.get::<Option<String>, _>("original_filename"),
-            "file_path": row.get::<Option<String>, _>("file_path"),
-            "file_size": row.get::<Option<i64>, _>("file_size"),
-            "mime_type": row.get::<Option<String>, _>("mime_type"),
-            "content": row.get::<Option<String>, _>("content"),
-            "tags": row.get::<Option<Vec<String>>, _>("tags"),
-            "ocr_text": row.get::<Option<String>, _>("ocr_text"),
-            "ocr_confidence": row.get::<Option<f32>, _>("ocr_confidence"),
-            "ocr_word_count": row.get::<Option<i32>, _>("ocr_word_count"),
-            "ocr_processing_time_ms": row.get::<Option<i32>, _>("ocr_processing_time_ms"),
-            "failure_reason": row.get::<String, _>("failure_reason"),
-            "failure_stage": row.get::<String, _>("failure_stage"),
-            "error_message": row.get::<Option<String>, _>("error_message"),
-            "existing_document_id": row.get::<Option<uuid::Uuid>, _>("existing_document_id"),
-            "ingestion_source": row.get::<String, _>("ingestion_source"),
-            "retry_count": row.get::<Option<i32>, _>("retry_count"),
-            "last_retry_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("last_retry_at"),
-            "created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
-            "updated_at": row.get::<chrono::DateTime<chrono::Utc>, _>("updated_at"),
-
-            // Computed fields for backward compatibility
-            "failure_category": categorize_failure_reason(
-                Some(row.get::<String, _>("failure_reason").as_str()),
-                row.get::<Option<String>, _>("error_message").as_deref()
-            ),
-            "source": match row.get::<String, _>("failure_stage").as_str() {
-                "ocr" => "OCR Processing",
-                "ingestion" => "Document Ingestion",
-                "validation" => "Document Validation",
-                "storage" => "File Storage",
-                "processing" => "Document Processing",
-                "sync" => "Source Synchronization",
-                _ => "Unknown"
-            }
-        })
-    }).collect();
-
-    // Calculate statistics for the response
-    let mut stage_stats = std::collections::HashMap::new();
-    let mut reason_stats = std::collections::HashMap::new();
-
-    for doc in &documents {
-        let stage = doc["failure_stage"].as_str().unwrap_or("unknown");
-        let reason = doc["failure_reason"].as_str().unwrap_or("unknown");
-
-        *stage_stats.entry(stage).or_insert(0) += 1;
-        *reason_stats.entry(reason).or_insert(0) += 1;
-    }
-
-    let response = serde_json::json!({
-        "documents": documents,
-        "pagination": {
-            "limit": limit,
-            "offset": offset,
-            "total": total_count,
-            "total_pages": (total_count as f64 / limit as f64).ceil() as i64
-        },
-        "statistics": {
-            "total_failed": total_count,
-            "by_stage": stage_stats,
-            "by_reason": reason_stats
-        },
-        "filters": {
-            "stage": params.stage,
-            "reason": params.reason
-        }
-    });
-
-    Ok(Json(response))
-}
-
-#[utoipa::path(
-    get,
-    path = "/api/documents/failed/{id}/view",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    params(
-        ("id" = uuid::Uuid, Path, description = "Failed Document ID")
-    ),
-    responses(
-        (status = 200, description = "Failed document content for viewing in browser"),
-        (status = 404, description = "Failed document not found or file deleted"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-async fn view_failed_document(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Path(failed_document_id): Path<uuid::Uuid>,
-) -> Result<Response, StatusCode> {
-    // Get failed document from database
-    let row = sqlx::query(
-        r#"
-        SELECT file_path, filename, mime_type, user_id
-        FROM failed_documents
-        WHERE id = $1 AND ($2::uuid IS NULL OR user_id = $2)
-        "#
-    )
-    .bind(failed_document_id)
-    .bind(if auth_user.user.role == crate::models::UserRole::Admin {
-        None
-    } else {
-        Some(auth_user.user.id)
-    })
-    .fetch_optional(state.db.get_pool())
-    .await
-    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-    .ok_or(StatusCode::NOT_FOUND)?;
-
-    let file_path: Option<String> = row.get("file_path");
-    let filename: String = row.get("filename");
-    let mime_type: Option<String> = row.get("mime_type");
-
-    // Check if file_path exists (some failed documents might not have been saved)
-    let file_path = file_path.ok_or(StatusCode::NOT_FOUND)?;
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let file_data = file_service
-        .read_file(&file_path)
-        .await
-        .map_err(|_| StatusCode::NOT_FOUND)?; // File was deleted or moved
-
-    // Determine content type from mime_type or file extension
-    let content_type = mime_type
-        .unwrap_or_else(|| {
-            mime_guess::from_path(&filename)
-                .first_or_octet_stream()
-                .to_string()
-        });
-
-    let response = Response::builder()
-        .header(CONTENT_TYPE, content_type)
-        .header("Content-Length", file_data.len())
-        .header("Content-Disposition", format!("inline; filename=\"{}\"", filename))
-        .body(file_data.into())
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    Ok(response)
-}
-
-async fn calculate_estimated_wait_time(priority: i32) -> i64 {
-    // Simple estimation based on priority - in a real implementation,
-    // this would check actual queue depth and processing times
-    match priority {
-        15.. => 1,      // High priority retry: ~1 minute
-        10..=14 => 3,   // Medium priority: ~3 minutes
-        5..=9 => 10,    // Low priority: ~10 minutes
-        _ => 30,        // Very low priority: ~30 minutes
-    }
-}
-
-fn categorize_failure_reason(failure_reason: Option<&str>, error_message: Option<&str>) -> &'static str {
-    match failure_reason {
-        Some("pdf_font_encoding") => "PDF Font Issues",
-        Some("pdf_corruption") => "PDF Corruption",
-        Some("processing_timeout") => "Timeout",
-        Some("memory_limit") => "Memory Limit",
-        Some("pdf_parsing_panic") => "PDF Parsing Error",
-        Some("low_ocr_confidence") => "Low OCR Confidence",
-        Some("unknown") | None => {
-            // Try to categorize based on error message
-            if let Some(error) = error_message {
-                let error_lower = error.to_lowercase();
-                if error_lower.contains("timeout") {
-                    "Timeout"
-                } else if error_lower.contains("memory") {
-                    "Memory Limit"
-                } else if error_lower.contains("font") || error_lower.contains("encoding") {
-                    "PDF Font Issues"
-                } else if error_lower.contains("corrupt") {
-                    "PDF Corruption"
-                } else if error_lower.contains("quality below threshold") || error_lower.contains("confidence") {
-                    "Low OCR Confidence"
-                } else {
-                    "Unknown Error"
-                }
-            } else {
-                "Unknown Error"
-            }
-        }
-        _ => "Other"
-    }
-}
-
-async fn get_failure_statistics(
-    state: &Arc<AppState>,
-    user_id: uuid::Uuid,
-    user_role: crate::models::UserRole
-) -> Result<serde_json::Value, StatusCode> {
-    let stats = sqlx::query(
-        r#"
-        SELECT
-            ocr_failure_reason,
-            COUNT(*) as count
-        FROM documents
-        WHERE ocr_status = 'failed'
-          AND ($1::uuid IS NULL OR user_id = $1)
-        GROUP BY ocr_failure_reason
-        ORDER BY count DESC
-        "#
-    )
-    .bind(if user_role == crate::models::UserRole::Admin {
-        None
-    } else {
-        Some(user_id)
-    })
-    .fetch_all(state.db.get_pool())
-    .await
-    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    let categories: Vec<serde_json::Value> = stats
-        .into_iter()
-        .map(|row| {
_>("ocr_failure_reason"); - let count = row.get::("count"); - - serde_json::json!({ - "reason": reason.clone().unwrap_or_else(|| "unknown".to_string()), - "display_name": categorize_failure_reason(reason.as_deref(), None), - "count": count - }) - }) - .collect(); - - Ok(serde_json::json!(categories)) -} - -#[utoipa::path( - get, - path = "/api/documents/duplicates", - tag = "documents", - security( - ("bearer_auth" = []) - ), - params( - ("limit" = Option, Query, description = "Number of duplicate groups to return per page"), - ("offset" = Option, Query, description = "Number of duplicate groups to skip") - ), - responses( - (status = 200, description = "User's duplicate documents grouped by hash", body = String), - (status = 401, description = "Unauthorized") - ) -)] -async fn get_user_duplicates( - State(state): State>, - auth_user: AuthUser, - Query(query): Query, -) -> Result, StatusCode> { - let limit = query.limit.unwrap_or(25); - let offset = query.offset.unwrap_or(0); - - let (duplicates, total_count) = state - .db - .get_user_duplicates(auth_user.user.id, auth_user.user.role, limit, offset) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let response = serde_json::json!({ - "duplicates": duplicates, - "pagination": { - "total": total_count, - "limit": limit, - "offset": offset, - "has_more": offset + limit < total_count - }, - "statistics": { - "total_duplicate_groups": total_count - } - }); - - Ok(Json(response)) -} - -#[utoipa::path( - delete, - path = "/api/documents/{id}", - tag = "documents", - security( - ("bearer_auth" = []) - ), - params( - ("id" = uuid::Uuid, Path, description = "Document ID") - ), - responses( - (status = 200, description = "Document deleted successfully", body = String), - (status = 404, description = "Document not found"), - (status = 401, description = "Unauthorized") - ) -)] -pub async fn delete_document( - State(state): State>, - auth_user: AuthUser, - Path(document_id): Path, -) -> Result, StatusCode> { - let deleted_document = state - .db - .delete_document(document_id, auth_user.user.id, auth_user.user.role) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 
-        .ok_or(StatusCode::NOT_FOUND)?;
-
-    // Create ignored file record for future source sync prevention
-    if let Err(e) = crate::db::ignored_files::create_ignored_file_from_document(
-        state.db.get_pool(),
-        document_id,
-        auth_user.user.id,
-        Some("deleted by user".to_string()),
-        None, // source_type will be determined by sync processes
-        None, // source_path will be determined by sync processes
-        None, // source_identifier will be determined by sync processes
-    ).await {
-        tracing::warn!("Failed to create ignored file record for document {}: {}", document_id, e);
-    }
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-
-    if let Err(e) = file_service.delete_document_files(&deleted_document).await {
-        tracing::warn!("Failed to delete some files for document {}: {}", document_id, e);
-    }
-
-    Ok(Json(serde_json::json!({
-        "success": true,
-        "message": "Document deleted successfully",
-        "document_id": document_id,
-        "filename": deleted_document.filename
-    })))
-}
-
-#[utoipa::path(
-    delete,
-    path = "/api/documents",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    request_body(content = BulkDeleteRequest, description = "List of document IDs to delete"),
-    responses(
-        (status = 200, description = "Documents deleted successfully", body = String),
-        (status = 400, description = "Bad request - no document IDs provided"),
-        (status = 401, description = "Unauthorized")
-    )
-)]
-pub async fn bulk_delete_documents(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Json(request): Json<BulkDeleteRequest>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    if request.document_ids.is_empty() {
-        return Ok(Json(serde_json::json!({
-            "success": false,
-            "message": "No document IDs provided",
-            "deleted_count": 0
-        })));
-    }
-
-    let deleted_documents = state
-        .db
-        .bulk_delete_documents(&request.document_ids, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    // Create ignored file records for all successfully deleted documents
-    let mut ignored_file_creation_failures = 0;
-    for document in &deleted_documents {
-        if let Err(e) = crate::db::ignored_files::create_ignored_file_from_document(
-            state.db.get_pool(),
-            document.id,
-            auth_user.user.id,
-            Some("bulk deleted by user".to_string()),
-            None, // source_type will be determined by sync processes
-            None, // source_path will be determined by sync processes
-            None, // source_identifier will be determined by sync processes
-        ).await {
-            ignored_file_creation_failures += 1;
-            tracing::warn!("Failed to create ignored file record for document {}: {}", document.id, e);
-        }
-    }
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let mut successful_file_deletions = 0;
-    let mut failed_file_deletions = 0;
-
-    for document in &deleted_documents {
-        match file_service.delete_document_files(document).await {
-            Ok(_) => successful_file_deletions += 1,
-            Err(e) => {
-                failed_file_deletions += 1;
-                tracing::warn!("Failed to delete files for document {}: {}", document.id, e);
-            }
-        }
-    }
-
-    let deleted_count = deleted_documents.len();
-    let requested_count = request.document_ids.len();
-
-    let message = if deleted_count == requested_count {
-        format!("Successfully deleted {} documents", deleted_count)
-    } else {
-        format!("Deleted {} of {} requested documents (some may not exist or belong to other users)", deleted_count, requested_count)
-    };
-
-    Ok(Json(serde_json::json!({
-        "success": true,
-        "message": message,
-        "deleted_count": deleted_count,
-        "requested_count": requested_count,
"successful_file_deletions": successful_file_deletions, - "failed_file_deletions": failed_file_deletions, - "ignored_file_creation_failures": ignored_file_creation_failures, - "deleted_document_ids": deleted_documents.iter().map(|d| d.id).collect::>() - }))) -} - -#[utoipa::path( - post, - path = "/api/documents/delete-low-confidence", - request_body = DeleteLowConfidenceRequest, - responses( - (status = 200, description = "Low confidence documents operation result"), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ), - security( - ("bearer_auth" = []) - ), - tag = "documents" -)] -pub async fn delete_low_confidence_documents( - State(state): State>, - auth_user: AuthUser, - Json(request): Json, -) -> Result, StatusCode> { - if request.max_confidence < 0.0 || request.max_confidence > 100.0 { - return Ok(Json(serde_json::json!({ - "success": false, - "message": "max_confidence must be between 0.0 and 100.0", - "matched_count": 0 - }))); - } - - let is_preview = request.preview_only.unwrap_or(false); - - // Find documents with confidence below threshold OR failed OCR - let matched_documents = state - .db - .find_low_confidence_and_failed_documents(request.max_confidence, auth_user.user.id, auth_user.user.role) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let matched_count = matched_documents.len(); - - if is_preview { - // Convert documents to response format with key details - let document_details: Vec = matched_documents.iter().map(|d| { - serde_json::json!({ - "id": d.id, - "filename": d.filename, - "original_filename": d.original_filename, - "file_size": d.file_size, - "ocr_confidence": d.ocr_confidence, - "ocr_status": d.ocr_status, - "created_at": d.created_at, - "mime_type": d.mime_type - }) - }).collect(); - - return Ok(Json(serde_json::json!({ - "success": true, - "message": format!("Found {} documents with OCR confidence below {}%", matched_count, request.max_confidence), - "matched_count": matched_count, - "preview": true, - "document_ids": matched_documents.iter().map(|d| d.id).collect::>(), - "documents": document_details - }))); - } - - if matched_documents.is_empty() { - return Ok(Json(serde_json::json!({ - "success": true, - "message": format!("No documents found with OCR confidence below {}%", request.max_confidence), - "deleted_count": 0 - }))); - } - - // Extract document IDs for bulk deletion - let document_ids: Vec = matched_documents.iter().map(|d| d.id).collect(); - - // Use existing bulk delete logic - let deleted_documents = state - .db - .bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Create ignored file records for all successfully deleted documents - let mut ignored_file_creation_failures = 0; - for document in &deleted_documents { - if let Err(e) = crate::db::ignored_files::create_ignored_file_from_document( - state.db.get_pool(), - document.id, - auth_user.user.id, - Some(format!("deleted due to low OCR confidence ({}%)", - document.ocr_confidence.unwrap_or(0.0))), - None, - None, - None, - ).await { - ignored_file_creation_failures += 1; - tracing::warn!("Failed to create ignored file record for document {}: {}", document.id, e); - } - } - - let file_service = FileService::new(state.config.upload_path.clone()); - let mut successful_file_deletions = 0; - let mut failed_file_deletions = 0; - - for document in &deleted_documents { - match file_service.delete_document_files(document).await { 
-            Ok(_) => successful_file_deletions += 1,
-            Err(e) => {
-                failed_file_deletions += 1;
-                tracing::warn!("Failed to delete files for document {}: {}", document.id, e);
-            }
-        }
-    }
-
-    let deleted_count = deleted_documents.len();
-
-    Ok(Json(serde_json::json!({
-        "success": true,
-        "message": format!("Successfully deleted {} documents with OCR confidence below {}%", deleted_count, request.max_confidence),
-        "deleted_count": deleted_count,
-        "matched_count": matched_count,
-        "successful_file_deletions": successful_file_deletions,
-        "failed_file_deletions": failed_file_deletions,
-        "ignored_file_creation_failures": ignored_file_creation_failures,
-        "deleted_document_ids": deleted_documents.iter().map(|d| d.id).collect::<Vec<uuid::Uuid>>()
-    })))
-}
-
-/// Delete all documents with failed OCR processing
-#[utoipa::path(
-    post,
-    path = "/api/documents/delete-failed-ocr",
-    tag = "documents",
-    security(
-        ("bearer_auth" = [])
-    ),
-    request_body = serde_json::Value,
-    responses(
-        (status = 200, description = "Failed OCR documents deleted successfully", body = serde_json::Value),
-        (status = 401, description = "Unauthorized"),
-        (status = 500, description = "Internal server error")
-    )
-)]
-pub async fn delete_failed_ocr_documents(
-    State(state): State<Arc<AppState>>,
-    auth_user: AuthUser,
-    Json(request): Json<serde_json::Value>,
-) -> Result<Json<serde_json::Value>, StatusCode> {
-    let is_preview = request.get("preview_only").and_then(|v| v.as_bool()).unwrap_or(false);
-
-    // Find documents with failed OCR
-    let matched_documents = state
-        .db
-        .find_failed_ocr_documents(auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    let matched_count = matched_documents.len();
-
-    if is_preview {
-        return Ok(Json(serde_json::json!({
-            "success": true,
-            "message": format!("Found {} documents with failed OCR processing", matched_count),
-            "matched_count": matched_count,
-            "preview": true,
-            "document_ids": matched_documents.iter().map(|d| d.id).collect::<Vec<uuid::Uuid>>()
-        })));
-    }
-
-    if matched_documents.is_empty() {
-        return Ok(Json(serde_json::json!({
-            "success": true,
-            "message": "No documents found with failed OCR processing",
-            "deleted_count": 0
-        })));
-    }
-
-    // Extract document IDs for bulk deletion
-    let document_ids: Vec<uuid::Uuid> = matched_documents.iter().map(|d| d.id).collect();
-
-    // Use existing bulk delete logic
-    let deleted_documents = state
-        .db
-        .bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role)
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-    // Create ignored file records for all successfully deleted documents
-    let mut ignored_file_creation_failures = 0;
-    for document in &deleted_documents {
-        let reason = if let Some(ref error) = document.ocr_error {
-            format!("deleted due to failed OCR processing: {}", error)
-        } else {
-            "deleted due to failed OCR processing".to_string()
-        };
-
-        if let Err(e) = crate::db::ignored_files::create_ignored_file_from_document(
-            state.db.get_pool(),
-            document.id,
-            auth_user.user.id,
-            Some(reason),
-            None,
-            None,
-            None,
-        ).await {
-            ignored_file_creation_failures += 1;
-            tracing::warn!("Failed to create ignored file record for document {}: {}", document.id, e);
-        }
-    }
-
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let mut successful_file_deletions = 0;
-    let mut failed_file_deletions = 0;
-
-    for document in &deleted_documents {
-        match file_service.delete_document_files(document).await {
-            Ok(_) => successful_file_deletions += 1,
-            Err(e) => {
-                failed_file_deletions += 1;
-                tracing::warn!("Failed to delete files for document {}: {}", document.id, e);
-            }
-        }
-    }
-
-    let deleted_count = deleted_documents.len();
-
-    Ok(Json(serde_json::json!({
-        "success": true,
-        "message": format!("Successfully deleted {} documents with failed OCR processing", deleted_count),
-        "deleted_count": deleted_count,
-        "matched_count": matched_count,
-        "successful_file_deletions": successful_file_deletions,
-        "failed_file_deletions": failed_file_deletions,
-        "ignored_file_creation_failures": ignored_file_creation_failures,
-        "deleted_document_ids": deleted_documents.iter().map(|d| d.id).collect::<Vec<uuid::Uuid>>()
-    })))
-}
\ No newline at end of file
diff --git a/src/routes/documents/bulk.rs b/src/routes/documents/bulk.rs
new file mode 100644
index 0000000..6aced9b
--- /dev/null
+++ b/src/routes/documents/bulk.rs
@@ -0,0 +1,401 @@
+use axum::{
+    extract::{Query, State},
+    http::StatusCode,
+    response::Json,
+};
+use std::sync::Arc;
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    auth::AuthUser,
+    services::file_service::FileService,
+    AppState,
+};
+use super::types::{BulkDeleteRequest, DeleteLowConfidenceRequest, BulkDeleteResponse};
+
+/// Bulk delete multiple documents
+#[utoipa::path(
+    delete,
+    path = "/api/documents",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    request_body = BulkDeleteRequest,
+    responses(
+        (status = 200, description = "Bulk delete results", body = BulkDeleteResponse),
+        (status = 400, description = "Bad request"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn bulk_delete_documents(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Json(request): Json<BulkDeleteRequest>,
+) -> Result<Json<BulkDeleteResponse>, StatusCode> {
+    if request.document_ids.is_empty() {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    if request.document_ids.len() > 1000 {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    info!("Bulk deleting {} documents", request.document_ids.len());
+
+    // Get documents first to check access and collect file paths
+    let mut documents_to_delete = Vec::new();
+    let mut accessible_ids = Vec::new();
+
+    for document_id in &request.document_ids {
+        match state
+            .db
+            .get_document_by_id(*document_id, auth_user.user.id, auth_user.user.role)
+            .await
+        {
+            Ok(Some(document)) => {
+                documents_to_delete.push(document);
+                accessible_ids.push(*document_id);
+            }
+            Ok(None) => {
+                debug!("Document {} not found or access denied", document_id);
+            }
+            Err(e) => {
+                error!("Error checking document {}: {}", document_id, e);
+            }
+        }
+    }
+
+    // Perform bulk delete from database
+    let (deleted_ids, failed_ids) = state
+        .db
+        .bulk_delete_documents(&accessible_ids, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error during bulk delete: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    // Delete associated files
+    let file_service = FileService::new(state.config.clone());
+    let mut files_deleted = 0;
+    let mut files_failed = 0;
+
+    for document in documents_to_delete {
+        if deleted_ids.contains(&document.id) {
+            match file_service.delete_document_files(&document).await {
+                Ok(_) => files_deleted += 1,
+                Err(e) => {
+                    warn!("Failed to delete files for document {}: {}", document.id, e);
+                    files_failed += 1;
+                }
+            }
+        }
+    }
+
+    let response = BulkDeleteResponse {
+        deleted_count: deleted_ids.len() as i64,
+        failed_count: failed_ids.len() as i64,
+        deleted_documents: deleted_ids,
+        failed_documents: failed_ids,
+        total_files_deleted: files_deleted,
+        total_files_failed: files_failed,
+    };
+
+    info!("Bulk delete completed: {} deleted, {} failed",
+        response.deleted_count, response.failed_count);
+
+    Ok(Json(response))
+}
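Worth noting while reviewing: the old handler answered an empty `document_ids` list with a 200 and a `success: false` body, while the new one rejects it outright. A minimal integration-test sketch of that contract, assuming a hypothetical `app()` helper that builds this router with test state (and an auth layer that accepts the placeholder token) plus tower's `ServiceExt::oneshot`:

use axum::{body::Body, http::{header, Request, StatusCode}};
use tower::ServiceExt; // for `oneshot`

#[tokio::test]
async fn bulk_delete_rejects_empty_id_list() {
    let app = app(); // hypothetical test helper, not part of this diff
    let response = app
        .oneshot(
            Request::builder()
                .method("DELETE")
                .uri("/api/documents")
                .header(header::CONTENT_TYPE, "application/json")
                .header(header::AUTHORIZATION, "Bearer test-token") // placeholder credentials
                .body(Body::from(r#"{"document_ids": []}"#))
                .unwrap(),
        )
        .await
        .unwrap();

    // Previously a 200 with "success": false; now a hard 400.
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}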
+
+/// Delete documents with low OCR confidence
+#[utoipa::path(
+    post,
+    path = "/api/documents/delete-low-confidence",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    request_body = DeleteLowConfidenceRequest,
+    responses(
+        (status = 200, description = "Low confidence delete results", body = BulkDeleteResponse),
+        (status = 400, description = "Bad request"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn delete_low_confidence_documents(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Json(request): Json<DeleteLowConfidenceRequest>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    if request.max_confidence < 0.0 || request.max_confidence > 100.0 {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let preview_only = request.preview_only.unwrap_or(false);
+
+    info!("Finding documents with OCR confidence <= {}", request.max_confidence);
+
+    // Find documents with low confidence
+    let low_confidence_docs = state
+        .db
+        .find_documents_by_confidence_threshold(
+            auth_user.user.id,
+            auth_user.user.role,
+            request.max_confidence,
+            1000, // Limit to prevent excessive operations
+            0,
+        )
+        .await
+        .map_err(|e| {
+            error!("Database error finding low confidence documents: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    if preview_only {
+        let preview_docs: Vec<_> = low_confidence_docs
+            .iter()
+            .take(10) // Show max 10 in preview
+            .map(|doc| serde_json::json!({
+                "id": doc.id,
+                "filename": doc.original_filename,
+                "ocr_confidence": doc.ocr_confidence,
+                "created_at": doc.created_at
+            }))
+            .collect();
+
+        return Ok(Json(serde_json::json!({
+            "preview": true,
+            "total_found": low_confidence_docs.len(),
+            "documents": preview_docs,
+            "message": format!("Found {} documents with OCR confidence <= {}",
+                low_confidence_docs.len(), request.max_confidence)
+        })));
+    }
+
+    // Perform actual deletion
+    let document_ids: Vec<uuid::Uuid> = low_confidence_docs.iter().map(|d| d.id).collect();
+
+    if document_ids.is_empty() {
+        return Ok(Json(serde_json::json!({
+            "deleted_count": 0,
+            "failed_count": 0,
+            "message": "No documents found with the specified confidence threshold"
+        })));
+    }
+
+    let (deleted_ids, failed_ids) = state
+        .db
+        .bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error during low confidence bulk delete: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    // Delete associated files
+    let file_service = FileService::new(state.config.clone());
+    let mut files_deleted = 0;
+    let mut files_failed = 0;
+
+    for document in low_confidence_docs {
+        if deleted_ids.contains(&document.id) {
+            match file_service.delete_document_files(&document).await {
+                Ok(_) => files_deleted += 1,
+                Err(e) => {
+                    warn!("Failed to delete files for document {}: {}", document.id, e);
+                    files_failed += 1;
+                }
+            }
+        }
+    }
+
+    info!("Low confidence delete completed: {} deleted, {} failed",
+        deleted_ids.len(), failed_ids.len());
+
+    Ok(Json(serde_json::json!({
+        "deleted_count": deleted_ids.len(),
+        "failed_count": failed_ids.len(),
+        "files_deleted": files_deleted,
+        "files_failed": files_failed,
+        "deleted_documents": deleted_ids,
+        "failed_documents": failed_ids,
+        "message": format!("Deleted {} documents with OCR confidence <= {}",
+            deleted_ids.len(), request.max_confidence)
+    })))
+}
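The accepted JSON shape can be read off the field accesses above; a small round-trip check, assuming `DeleteLowConfidenceRequest` keeps its `Deserialize` derive from the old module:

#[cfg(test)]
mod delete_low_confidence_request_tests {
    use super::*;

    #[test]
    fn preview_flag_is_optional() -> Result<(), serde_json::Error> {
        let req: DeleteLowConfidenceRequest =
            serde_json::from_str(r#"{"max_confidence": 30.0, "preview_only": true}"#)?;
        assert_eq!(req.max_confidence, 30.0);
        assert_eq!(req.preview_only, Some(true));

        // Omitting preview_only is allowed; the handler defaults it to false.
        let req: DeleteLowConfidenceRequest =
            serde_json::from_str(r#"{"max_confidence": 30.0}"#)?;
        assert_eq!(req.preview_only, None);
        Ok(())
    }
}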
"/api/documents/delete-failed-ocr", + tag = "documents", + security( + ("bearer_auth" = []) + ), + responses( + (status = 200, description = "Failed OCR delete results"), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn delete_failed_ocr_documents( + State(state): State>, + auth_user: AuthUser, +) -> Result, StatusCode> { + info!("Finding documents with failed OCR"); + + // Find documents with failed OCR + let failed_ocr_docs = state + .db + .find_failed_ocr_documents( + auth_user.user.id, + auth_user.user.role, + 1000, // Limit to prevent excessive operations + 0, + ) + .await + .map_err(|e| { + error!("Database error finding failed OCR documents: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if failed_ocr_docs.is_empty() { + return Ok(Json(serde_json::json!({ + "deleted_count": 0, + "message": "No documents found with failed OCR status" + }))); + } + + // Perform deletion + let document_ids: Vec = failed_ocr_docs.iter().map(|d| d.id).collect(); + + let (deleted_ids, failed_ids) = state + .db + .bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role) + .await + .map_err(|e| { + error!("Database error during failed OCR bulk delete: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // Delete associated files + let file_service = FileService::new(state.config.clone()); + let mut files_deleted = 0; + let mut files_failed = 0; + + for document in failed_ocr_docs { + if deleted_ids.contains(&document.id) { + match file_service.delete_document_files(&document).await { + Ok(_) => files_deleted += 1, + Err(e) => { + warn!("Failed to delete files for document {}: {}", document.id, e); + files_failed += 1; + } + } + } + } + + info!("Failed OCR delete completed: {} deleted, {} failed", + deleted_ids.len(), failed_ids.len()); + + Ok(Json(serde_json::json!({ + "deleted_count": deleted_ids.len(), + "failed_count": failed_ids.len(), + "files_deleted": files_deleted, + "files_failed": files_failed, + "deleted_documents": deleted_ids, + "failed_documents": failed_ids, + "message": format!("Deleted {} documents with failed OCR", deleted_ids.len()) + }))) +} + +/// Get documents marked for deletion (cleanup preview) +pub async fn get_cleanup_preview( + State(state): State>, + auth_user: AuthUser, + Query(params): Query>, +) -> Result, StatusCode> { + let max_confidence = params + .get("max_confidence") + .and_then(|s| s.parse::().ok()) + .unwrap_or(30.0); + + let include_failed = params + .get("include_failed") + .and_then(|s| s.parse::().ok()) + .unwrap_or(true); + + let mut cleanup_candidates = Vec::new(); + let mut total_size = 0i64; + + // Get low confidence documents + let low_confidence_docs = state + .db + .find_documents_by_confidence_threshold( + auth_user.user.id, + auth_user.user.role, + max_confidence, + 100, + 0, + ) + .await + .map_err(|e| { + error!("Database error finding low confidence documents: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + for doc in low_confidence_docs { + total_size += doc.file_size; + cleanup_candidates.push(serde_json::json!({ + "id": doc.id, + "filename": doc.original_filename, + "file_size": doc.file_size, + "ocr_confidence": doc.ocr_confidence, + "reason": "low_confidence", + "created_at": doc.created_at + })); + } + + // Get failed OCR documents if requested + if include_failed { + let failed_docs = state + .db + .find_failed_ocr_documents( + auth_user.user.id, + auth_user.user.role, + 100, + 0, + ) + .await + .map_err(|e| { + error!("Database 
+                error!("Database error finding failed OCR documents: {}", e);
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+        for doc in failed_docs {
+            total_size += doc.file_size;
+            cleanup_candidates.push(serde_json::json!({
+                "id": doc.id,
+                "filename": doc.original_filename,
+                "file_size": doc.file_size,
+                "ocr_status": doc.ocr_status,
+                "reason": "failed_ocr",
+                "created_at": doc.created_at
+            }));
+        }
+    }
+
+    Ok(Json(serde_json::json!({
+        "total_candidates": cleanup_candidates.len(),
+        "total_size_bytes": total_size,
+        "total_size_mb": (total_size as f64 / 1024.0 / 1024.0).round(),
+        "candidates": cleanup_candidates,
+        "criteria": {
+            "max_confidence": max_confidence,
+            "include_failed_ocr": include_failed
+        }
+    })))
+}
\ No newline at end of file
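One observation on get_cleanup_preview: unlike the other handlers in this file it carries no #[utoipa::path] attribute, so it will not appear in the generated OpenAPI spec, and no route for it is visible in this diff. If it is meant to be reachable, registration might look like the sketch below; the "/cleanup-preview" path is an assumption, not something this diff establishes.

use axum::{routing::get, Router};
use std::sync::Arc;

// Hypothetical wiring alongside the other document routes.
fn cleanup_router() -> Router<Arc<crate::AppState>> {
    Router::new().route(
        "/cleanup-preview",
        get(crate::routes::documents::bulk::get_cleanup_preview),
    )
}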
diff --git a/src/routes/documents/crud.rs b/src/routes/documents/crud.rs
new file mode 100644
index 0000000..051a9ac
--- /dev/null
+++ b/src/routes/documents/crud.rs
@@ -0,0 +1,477 @@
+use axum::{
+    extract::{Multipart, Path, Query, State},
+    http::{StatusCode, header::CONTENT_TYPE},
+    response::{Json, Response},
+    body::Body,
+};
+use std::sync::Arc;
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    auth::AuthUser,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    services::file_service::FileService,
+    models::DocumentResponse,
+    AppState,
+};
+use super::types::{PaginationQuery, DocumentUploadResponse};
+
+/// Upload a new document
+#[utoipa::path(
+    post,
+    path = "/api/documents",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    request_body(content = String, description = "Document file", content_type = "multipart/form-data"),
+    responses(
+        (status = 200, description = "Document uploaded successfully", body = DocumentUploadResponse),
+        (status = 400, description = "Bad request"),
+        (status = 401, description = "Unauthorized"),
+        (status = 413, description = "File too large"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn upload_document(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    mut multipart: Multipart,
+) -> Result<Json<DocumentUploadResponse>, StatusCode> {
+    let mut uploaded_file = None;
+
+    while let Some(field) = multipart.next_field().await.map_err(|e| {
+        error!("Failed to get multipart field: {}", e);
+        StatusCode::BAD_REQUEST
+    })? {
+
+        if field.name() == Some("file") {
+            let filename = field.file_name()
+                .ok_or_else(|| {
+                    error!("No filename provided in upload");
+                    StatusCode::BAD_REQUEST
+                })?
+                .to_string();
+
+            let content_type = field.content_type()
+                .unwrap_or("application/octet-stream")
+                .to_string();
+
+            let data = field.bytes().await.map_err(|e| {
+                error!("Failed to read file data: {}", e);
+                StatusCode::BAD_REQUEST
+            })?;
+
+            uploaded_file = Some((filename, content_type, data.to_vec()));
+            break;
+        }
+    }
+
+    let (filename, content_type, data) = uploaded_file.ok_or_else(|| {
+        error!("No file found in upload");
+        StatusCode::BAD_REQUEST
+    })?;
+
+    info!("Uploading document: {} ({} bytes)", filename, data.len());
+
+    // Create ingestion service
+    let file_service = FileService::new(state.config.clone());
+    let ingestion_service = DocumentIngestionService::new(
+        state.db.clone(),
+        file_service,
+        state.config.clone(),
+    );
+
+    match ingestion_service.ingest_document(
+        data,
+        &filename,
+        &content_type,
+        auth_user.user.id,
+        "web_upload".to_string(),
+    ).await {
+        Ok(IngestionResult::Success(document)) => {
+            info!("Document uploaded successfully: {}", document.id);
+            Ok(Json(DocumentUploadResponse {
+                document_id: document.id,
+                filename: document.filename,
+                file_size: document.file_size,
+                mime_type: document.mime_type,
+                status: "success".to_string(),
+                message: "Document uploaded successfully".to_string(),
+            }))
+        }
+        Ok(IngestionResult::Duplicate(existing_doc)) => {
+            warn!("Duplicate document upload attempted: {}", existing_doc.id);
+            Ok(Json(DocumentUploadResponse {
+                document_id: existing_doc.id,
+                filename: existing_doc.filename,
+                file_size: existing_doc.file_size,
+                mime_type: existing_doc.mime_type,
+                status: "duplicate".to_string(),
+                message: "Document already exists".to_string(),
+            }))
+        }
+        Ok(IngestionResult::Failed(failed_doc)) => {
+            error!("Document ingestion failed: {}", failed_doc.error_message.as_deref().unwrap_or("Unknown error"));
+            Err(StatusCode::UNPROCESSABLE_ENTITY)
+        }
+        Err(e) => {
+            error!("Failed to ingest document: {}", e);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }
+}
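A hedged sketch of driving this endpoint from a client, assuming the reqwest crate with its multipart feature enabled; the base URL, token, and file are placeholders:

use reqwest::multipart;

async fn upload_scan(base: &str, token: &str) -> reqwest::Result<reqwest::Response> {
    let bytes = std::fs::read("scan.pdf").expect("sample file"); // placeholder input
    let part = multipart::Part::bytes(bytes)
        .file_name("scan.pdf")
        .mime_str("application/pdf")
        .expect("valid mime");
    // The handler above only reads the field named "file"; other fields are ignored.
    let form = multipart::Form::new().part("file", part);
    reqwest::Client::new()
        .post(format!("{base}/api/documents"))
        .bearer_auth(token)
        .multipart(form)
        .send()
        .await
}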
+
+/// Get a specific document by ID
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document details", body = DocumentResponse),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+pub async fn get_document_by_id(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<DocumentResponse>, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Get labels for this document
+    let labels = state
+        .db
+        .get_document_labels(document_id)
+        .await
+        .map_err(|e| {
+            error!("Failed to get labels for document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let mut response = DocumentResponse::from(document);
+    response.labels = labels;
+
+    Ok(Json(response))
+}
+
+/// List documents with pagination and filtering
+#[utoipa::path(
+    get,
+    path = "/api/documents",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(PaginationQuery),
+    responses(
+        (status = 200, description = "List of documents", body = Vec<DocumentResponse>),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn list_documents(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Query(query): Query<PaginationQuery>,
+) -> Result<Json<Vec<DocumentResponse>>, StatusCode> {
+    let limit = query.limit.unwrap_or(25);
+    let offset = query.offset.unwrap_or(0);
+
+    let documents = if let Some(ocr_status) = query.ocr_status.as_deref() {
+        state
+            .db
+            .get_documents_by_user_with_role_and_filter(
+                auth_user.user.id,
+                auth_user.user.role,
+                Some(ocr_status),
+                limit,
+                offset,
+            )
+            .await
+    } else {
+        state
+            .db
+            .get_documents_by_user_with_role(
+                auth_user.user.id,
+                auth_user.user.role,
+                limit,
+                offset,
+            )
+            .await
+    }
+    .map_err(|e| {
+        error!("Database error listing documents: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    // Get document IDs for batch label fetching
+    let document_ids: Vec<uuid::Uuid> = documents.iter().map(|d| d.id).collect();
+
+    // Get labels for all documents in batch
+    let labels_map = if !document_ids.is_empty() {
+        let labels = state
+            .db
+            .get_labels_for_documents(&document_ids)
+            .await
+            .map_err(|e| {
+                error!("Failed to get labels for documents: {}", e);
+                StatusCode::INTERNAL_SERVER_ERROR
+            })?;
+
+        labels.into_iter().collect::<std::collections::HashMap<_, _>>()
+    } else {
+        std::collections::HashMap::new()
+    };
+
+    // Convert to response format with labels
+    let responses: Vec<DocumentResponse> = documents
+        .into_iter()
+        .map(|doc| {
+            let mut response = DocumentResponse::from(doc.clone());
+            if let Some(labels) = labels_map.get(&doc.id) {
+                response.labels = labels.clone();
+            }
+            response
+        })
+        .collect();
+
+    Ok(Json(responses))
+}
+
+/// Delete a specific document
+#[utoipa::path(
+    delete,
+    path = "/api/documents/{id}",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 204, description = "Document deleted successfully"),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn delete_document(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<StatusCode, StatusCode> {
+    // Get document first to check if it exists and user has access
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Delete from database
+    let deleted = state
+        .db
+        .delete_document(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error deleting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    if !deleted {
+        return Err(StatusCode::NOT_FOUND);
+    }
+
+    // Delete associated files
+    let file_service = FileService::new(state.config.clone());
+    if let Err(e) = file_service.delete_document_files(&document).await {
+        warn!("Failed to delete files for document {}: {}", document_id, e);
+        // Continue anyway - database deletion succeeded
+    }
+
+    info!("Document deleted successfully: {}", document_id);
+    Ok(StatusCode::NO_CONTENT)
+}
+
+/// Download a document file
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/download",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document file", content_type = "application/octet-stream"),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn download_document(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Response, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let file_service = FileService::new(state.config.clone());
+    let file_data = file_service
+        .read_document_file(&document)
+        .await
+        .map_err(|e| {
+            error!("Failed to read document file {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, document.mime_type)
+        .header("Content-Disposition", format!("attachment; filename=\"{}\"", document.original_filename))
+        .header("Content-Length", file_data.len().to_string())
+        .body(Body::from(file_data))
+        .map_err(|e| {
+            error!("Failed to build response: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    debug!("Document downloaded: {}", document_id);
+    Ok(response)
+}
+
+/// View a document in the browser
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/view",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document file for viewing", content_type = "application/octet-stream"),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn view_document(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Response, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let file_service = FileService::new(state.config.clone());
+    let file_data = file_service
+        .read_document_file(&document)
+        .await
+        .map_err(|e| {
+            error!("Failed to read document file {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let response = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, document.mime_type)
+        .header("Content-Length", file_data.len().to_string())
+        .body(Body::from(file_data))
+        .map_err(|e| {
+            error!("Failed to build response: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    debug!("Document viewed: {}", document_id);
+    Ok(response)
+}
+
+/// Get user's duplicate documents
+#[utoipa::path(
+    get,
+    path = "/api/documents/duplicates",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("limit" = Option<i64>, Query, description = "Number of duplicate groups to return per page"),
+        ("offset" = Option<i64>, Query, description = "Number of duplicate groups to skip")
+    ),
+    responses(
+        (status = 200, description = "User's duplicate documents grouped by hash", body = serde_json::Value),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+pub async fn get_user_duplicates(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Query(query): Query<PaginationQuery>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let limit = query.limit.unwrap_or(25);
+    let offset = query.offset.unwrap_or(0);
+
+    let (duplicates, total_count) = state
+        .db
+        .get_user_duplicates(auth_user.user.id, auth_user.user.role, limit, offset)
+        .await
+        .map_err(|e| {
+            error!("Failed to get user duplicates: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let response = serde_json::json!({
+        "duplicates": duplicates,
+        "pagination": {
+            "total": total_count,
+            "limit": limit,
+            "offset": offset,
+            "has_more": offset + limit < total_count
+        },
+        "statistics": {
+            "total_duplicate_groups": total_count
+        }
+    });
+
+    Ok(Json(response))
+}
\ No newline at end of file
diff --git a/src/routes/documents/debug.rs b/src/routes/documents/debug.rs
new file mode 100644
index 0000000..3a2b879
--- /dev/null
+++ b/src/routes/documents/debug.rs
@@ -0,0 +1,358 @@
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    response::Json,
+};
+use std::sync::Arc;
+use tracing::{debug, error, info};
+
+use crate::{
+    auth::AuthUser,
+    services::file_service::FileService,
+    AppState,
+};
+use super::types::DocumentDebugInfo;
+
+/// Get comprehensive debug information for a document
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/debug",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document debug information", body = DocumentDebugInfo),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn get_document_debug_info(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<DocumentDebugInfo>, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let file_service = FileService::new(state.config.clone());
+
+    // Check file existence and readability
+    let file_exists = file_service.document_file_exists(&document).await.unwrap_or(false);
+    let readable = if file_exists {
+        file_service.read_document_file(&document).await.is_ok()
+    } else {
+        false
+    };
+
+    // Get file permissions (simplified)
+    let permissions = if file_exists {
+        Some("readable".to_string()) // This could be expanded with actual file permissions
+    } else {
+        None
+    };
+
+    // Construct processing steps based on document state
+    let mut processing_steps = vec!["uploaded".to_string()];
+
+    if document.content.is_some() {
+        processing_steps.push("content_extracted".to_string());
+    }
+
+    match document.ocr_status.as_deref() {
+        Some("pending") => processing_steps.push("ocr_queued".to_string()),
+        Some("processing") => processing_steps.push("ocr_in_progress".to_string()),
+        Some("completed") => processing_steps.push("ocr_completed".to_string()),
+        Some("failed") => processing_steps.push("ocr_failed".to_string()),
+        _ => {}
+    }
+
+    if document.ocr_text.is_some() {
+        processing_steps.push("ocr_text_available".to_string());
+    }
+
+    let debug_info = DocumentDebugInfo {
+        document_id: document.id,
+        filename: document.original_filename,
+        file_path: document.file_path,
+        file_size: document.file_size,
+        mime_type: document.mime_type,
+        created_at: document.created_at,
+        ocr_status: document.ocr_status,
+        ocr_confidence: document.ocr_confidence,
+        ocr_word_count: document.ocr_word_count,
+        processing_steps,
+        file_exists,
+        readable,
+        permissions,
+    };
+
+    debug!("Debug info generated for document: {}", document_id);
+    Ok(Json(debug_info))
+}
+
+/// Get thumbnail for a document (if available)
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/thumbnail",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document thumbnail", content_type = "image/jpeg"),
+        (status = 404, description = "Document or thumbnail not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn get_document_thumbnail(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<axum::response::Response, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let file_service = FileService::new(state.config.clone());
+
+    match file_service.get_document_thumbnail(&document).await {
+        Ok(thumbnail_data) => {
+            let response = axum::response::Response::builder()
+                .status(StatusCode::OK)
+                .header("Content-Type", "image/jpeg")
+                .header("Content-Length", thumbnail_data.len().to_string())
+                .header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
+                .body(axum::body::Body::from(thumbnail_data))
+                .map_err(|e| {
+                    error!("Failed to build thumbnail response: {}", e);
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            debug!("Thumbnail served for document: {}", document_id);
+            Ok(response)
+        }
+        Err(_) => {
+            // Return a default "no thumbnail" response or generate one on the fly
+            Err(StatusCode::NOT_FOUND)
+        }
+    }
+}
+
+/// Get processed image for a document (if available)
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/processed-image",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Processed image", content_type = "image/png"),
+        (status = 404, description = "Document or processed image not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn get_processed_image(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<axum::response::Response, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Check if this is an image document
+    if !document.mime_type.starts_with("image/") {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let file_service = FileService::new(state.config.clone());
+
+    match file_service.get_processed_image(&document).await {
+        Ok(image_data) => {
+            let response = axum::response::Response::builder()
+                .status(StatusCode::OK)
+                .header("Content-Type", "image/png")
+                .header("Content-Length", image_data.len().to_string())
+                .header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
+                .body(axum::body::Body::from(image_data))
+                .map_err(|e| {
+                    error!("Failed to build processed image response: {}", e);
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            debug!("Processed image served for document: {}", document_id);
+            Ok(response)
+        }
+        Err(_) => {
+            Err(StatusCode::NOT_FOUND)
+        }
+    }
+}
+
+/// Get system-wide document statistics
+pub async fn get_document_statistics(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    // Get OCR statistics
+    let (total, pending, completed, failed) = state
+        .db
+        .count_documents_by_ocr_status(auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting OCR stats: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    // Get MIME type distribution
+    let mime_type_facets = state
+        .db
+        .get_mime_type_facets(auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting MIME type facets: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    // Get recent upload activity (simplified)
+    let recent_documents = state
+        .db
+        .get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 10, 0)
+        .await
+        .map_err(|e| {
+            error!("Database error getting recent documents: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let total_file_size: i64 = recent_documents.iter().map(|d| d.file_size).sum();
+
+    Ok(Json(serde_json::json!({
+        "document_counts": {
+            "total": total,
+            "pending_ocr": pending,
+            "completed_ocr": completed,
+            "failed_ocr": failed
+        },
+        "mime_types": mime_type_facets,
+        "storage": {
+            "recent_documents_size": total_file_size,
+            "recent_documents_count": recent_documents.len()
+        },
+        "activity": {
+            "recent_uploads": recent_documents.len(),
+            "last_upload": recent_documents.first().map(|d| d.created_at)
+        }
+    })))
+}
+
+/// Validate document integrity
+pub async fn validate_document_integrity(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let file_service = FileService::new(state.config.clone());
+    let mut issues = Vec::new();
+    let mut checks = Vec::new();
+
+    // Check file existence
+    checks.push("file_existence".to_string());
+    if !file_service.document_file_exists(&document).await.unwrap_or(false) {
+        issues.push("File does not exist on disk".to_string());
+    }
+
+    // Check file readability
+    checks.push("file_readability".to_string());
+    match file_service.read_document_file(&document).await {
+        Ok(data) => {
+            // Verify file size matches
+            if data.len() as i64 != document.file_size {
+                issues.push(format!(
+                    "File size mismatch: database={}, actual={}",
+                    document.file_size,
+                    data.len()
+                ));
+            }
+        }
+        Err(e) => {
+            issues.push(format!("Cannot read file: {}", e));
+        }
+    }
+
+    // Check OCR consistency
+    checks.push("ocr_consistency".to_string());
+    if document.ocr_text.is_some() && document.ocr_status.as_deref() != Some("completed") {
+        issues.push("OCR text exists but status is not 'completed'".to_string());
+    }
+
+    if document.ocr_text.is_none() && document.ocr_status.as_deref() == Some("completed") {
+        issues.push("OCR status is 'completed' but no OCR text available".to_string());
+    }
+
+    // Check confidence consistency
+    checks.push("confidence_consistency".to_string());
+    if let Some(confidence) = document.ocr_confidence {
+        if confidence < 0.0 || confidence > 100.0 {
+            issues.push(format!("Invalid OCR confidence value: {}", confidence));
+        }
+    }
+
+    let is_valid = issues.is_empty();
+
+    info!("Document {} integrity check: {} issues found", document_id, issues.len());
+
+    Ok(Json(serde_json::json!({
+        "document_id": document_id,
+        "is_valid": is_valid,
+        "checks_performed": checks,
+        "issues": issues,
+        "summary": if is_valid {
+            "Document integrity is good".to_string()
+        } else {
+            format!("Found {} integrity issues", issues.len())
+        }
+    })))
+}
\ No newline at end of file
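The OCR consistency rules in validate_document_integrity are pure functions of two fields, which makes them easy to pin down with a test; a sketch of that extraction (names here are illustrative, not part of this diff):

fn ocr_state_issue(ocr_text: Option<&str>, ocr_status: Option<&str>) -> Option<&'static str> {
    match (ocr_text.is_some(), ocr_status == Some("completed")) {
        (true, false) => Some("OCR text exists but status is not 'completed'"),
        (false, true) => Some("OCR status is 'completed' but no OCR text available"),
        _ => None,
    }
}

#[test]
fn completed_status_requires_text_and_vice_versa() {
    assert!(ocr_state_issue(None, Some("completed")).is_some());
    assert!(ocr_state_issue(Some("text"), Some("pending")).is_some());
    assert!(ocr_state_issue(Some("text"), Some("completed")).is_none());
    assert!(ocr_state_issue(None, None).is_none());
}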
"/api/documents/failed", + tag = "documents", + security( + ("bearer_auth" = []) + ), + params( + ("limit" = Option, Query, description = "Number of documents to return"), + ("offset" = Option, Query, description = "Number of documents to skip"), + ("stage" = Option, Query, description = "Filter by failure stage (ocr, ingestion, validation, etc.)"), + ("reason" = Option, Query, description = "Filter by failure reason") + ), + responses( + (status = 200, description = "Failed documents list", body = serde_json::Value), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn get_failed_documents( + State(state): State>, + auth_user: AuthUser, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(25); + let offset = params.offset.unwrap_or(0); + + // Query the unified failed_documents table + let mut query_builder = sqlx::QueryBuilder::new( + r#" + SELECT id, filename, original_filename, file_path, file_size, mime_type, + content, tags, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, + failure_reason, failure_stage, error_message, existing_document_id, + ingestion_source, retry_count, last_retry_at, created_at, updated_at + FROM failed_documents + WHERE ($1::uuid IS NULL OR user_id = $1) + "# + ); + + let mut bind_count = 1; + + // Add stage filter if specified + if let Some(stage) = ¶ms.stage { + bind_count += 1; + query_builder.push(&format!(" AND failure_stage = ${}", bind_count)); + } + + // Add reason filter if specified + if let Some(reason) = ¶ms.reason { + bind_count += 1; + query_builder.push(&format!(" AND failure_reason = ${}", bind_count)); + } + + query_builder.push(" ORDER BY created_at DESC"); + query_builder.push(&format!(" LIMIT ${} OFFSET ${}", bind_count + 1, bind_count + 2)); + + let mut query = query_builder.build(); + + // Bind parameters in order + query = query.bind(if auth_user.user.role == UserRole::Admin { + None + } else { + Some(auth_user.user.id) + }); + + if let Some(stage) = ¶ms.stage { + query = query.bind(stage); + } + + if let Some(reason) = ¶ms.reason { + query = query.bind(reason); + } + + query = query.bind(limit).bind(offset); + + let failed_docs = query + .fetch_all(state.db.get_pool()) + .await + .map_err(|e| { + error!("Failed to fetch failed documents: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // Count total for pagination + let mut count_query_builder = sqlx::QueryBuilder::new( + "SELECT COUNT(*) FROM failed_documents WHERE ($1::uuid IS NULL OR user_id = $1)" + ); + + let mut count_bind_count = 1; + + if let Some(stage) = ¶ms.stage { + count_bind_count += 1; + count_query_builder.push(&format!(" AND failure_stage = ${}", count_bind_count)); + } + + if let Some(reason) = ¶ms.reason { + count_bind_count += 1; + count_query_builder.push(&format!(" AND failure_reason = ${}", count_bind_count)); + } + + let mut count_query = count_query_builder.build_query_scalar::(); + + count_query = count_query.bind(if auth_user.user.role == UserRole::Admin { + None + } else { + Some(auth_user.user.id) + }); + + if let Some(stage) = ¶ms.stage { + count_query = count_query.bind(stage); + } + + if let Some(reason) = ¶ms.reason { + count_query = count_query.bind(reason); + } + + let total_count = count_query + .fetch_one(state.db.get_pool()) + .await + .unwrap_or(0); + + // Convert to JSON response format + let documents: Vec = failed_docs.iter().map(|row| { + serde_json::json!({ + "id": row.get::("id"), + "filename": 
row.get::("filename"), + "original_filename": row.get::, _>("original_filename"), + "file_path": row.get::, _>("file_path"), + "file_size": row.get::, _>("file_size"), + "mime_type": row.get::, _>("mime_type"), + "content": row.get::, _>("content"), + "tags": row.get::, _>("tags"), + "ocr_text": row.get::, _>("ocr_text"), + "ocr_confidence": row.get::, _>("ocr_confidence"), + "ocr_word_count": row.get::, _>("ocr_word_count"), + "ocr_processing_time_ms": row.get::, _>("ocr_processing_time_ms"), + "failure_reason": row.get::("failure_reason"), + "failure_stage": row.get::("failure_stage"), + "error_message": row.get::, _>("error_message"), + "existing_document_id": row.get::, _>("existing_document_id"), + "ingestion_source": row.get::("ingestion_source"), + "retry_count": row.get::, _>("retry_count"), + "last_retry_at": row.get::>, _>("last_retry_at"), + "created_at": row.get::, _>("created_at"), + "updated_at": row.get::, _>("updated_at"), + + // Computed fields for backward compatibility + "failure_category": categorize_failure_reason( + Some(&row.get::("failure_reason")), + row.get::, _>("error_message").as_deref() + ), + "source": match row.get::("failure_stage").as_str() { + "ocr" => "OCR Processing", + "ingestion" => "Document Ingestion", + "validation" => "Document Validation", + "storage" => "File Storage", + "processing" => "Document Processing", + "sync" => "Source Synchronization", + _ => "Unknown" + } + }) + }).collect(); + + // Calculate statistics for the response + let mut stage_stats = HashMap::new(); + let mut reason_stats = HashMap::new(); + + for doc in &documents { + let stage = doc["failure_stage"].as_str().unwrap_or("unknown"); + let reason = doc["failure_reason"].as_str().unwrap_or("unknown"); + + *stage_stats.entry(stage).or_insert(0) += 1; + *reason_stats.entry(reason).or_insert(0) += 1; + } + + let response = serde_json::json!({ + "documents": documents, + "pagination": { + "limit": limit, + "offset": offset, + "total": total_count, + "total_pages": (total_count as f64 / limit as f64).ceil() as i64 + }, + "statistics": { + "total_failed": total_count, + "by_stage": stage_stats, + "by_reason": reason_stats + }, + "filters": { + "stage": params.stage, + "reason": params.reason + } + }); + + Ok(Json(response)) +} + +/// Get failed OCR documents with detailed information +#[utoipa::path( + get, + path = "/api/documents/failed-ocr", + tag = "documents", + security( + ("bearer_auth" = []) + ), + params( + ("limit" = Option, Query, description = "Number of documents to return"), + ("offset" = Option, Query, description = "Number of documents to skip") + ), + responses( + (status = 200, description = "Failed OCR documents list", body = serde_json::Value), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error") + ) +)] +pub async fn get_failed_ocr_documents( + State(state): State>, + auth_user: AuthUser, + Query(pagination): Query, +) -> Result, StatusCode> { + let limit = pagination.limit.unwrap_or(50); + let offset = pagination.offset.unwrap_or(0); + + // Get failed OCR documents with additional failure details + let failed_docs = sqlx::query( + r#" + SELECT d.id, d.filename, d.original_filename, d.file_path, d.file_size, + d.mime_type, d.created_at, d.updated_at, d.user_id, + d.ocr_status, d.ocr_error, d.ocr_failure_reason, + d.ocr_completed_at, d.tags, + -- Count retry attempts from OCR queue + COALESCE(q.retry_count, 0) as retry_count, + q.last_attempt_at + FROM documents d + LEFT JOIN ( + SELECT document_id, + COUNT(*) as 
+
+/// Get failed OCR documents with detailed information
+#[utoipa::path(
+    get,
+    path = "/api/documents/failed-ocr",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("limit" = Option<i64>, Query, description = "Number of documents to return"),
+        ("offset" = Option<i64>, Query, description = "Number of documents to skip")
+    ),
+    responses(
+        (status = 200, description = "Failed OCR documents list", body = serde_json::Value),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn get_failed_ocr_documents(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Query(pagination): Query<FailedDocumentsQuery>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let limit = pagination.limit.unwrap_or(50);
+    let offset = pagination.offset.unwrap_or(0);
+
+    // Get failed OCR documents with additional failure details
+    let failed_docs = sqlx::query(
+        r#"
+        SELECT d.id, d.filename, d.original_filename, d.file_path, d.file_size,
+               d.mime_type, d.created_at, d.updated_at, d.user_id,
+               d.ocr_status, d.ocr_error, d.ocr_failure_reason,
+               d.ocr_completed_at, d.tags,
+               -- Count retry attempts from OCR queue
+               COALESCE(q.retry_count, 0) as retry_count,
+               q.last_attempt_at
+        FROM documents d
+        LEFT JOIN (
+            SELECT document_id,
+                   COUNT(*) as retry_count,
+                   MAX(created_at) as last_attempt_at
+            FROM ocr_queue
+            WHERE status IN ('failed', 'completed')
+            GROUP BY document_id
+        ) q ON d.id = q.document_id
+        WHERE d.ocr_status = 'failed'
+        AND ($1::uuid IS NULL OR d.user_id = $1) -- Admin can see all, users see only their own
+        ORDER BY d.updated_at DESC
+        LIMIT $2 OFFSET $3
+        "#
+    )
+    .bind(if auth_user.user.role == UserRole::Admin {
+        None
+    } else {
+        Some(auth_user.user.id)
+    })
+    .bind(limit)
+    .bind(offset)
+    .fetch_all(state.db.get_pool())
+    .await
+    .map_err(|e| {
+        error!("Failed to fetch failed OCR documents: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    // Count total failed documents
+    let total_count: i64 = sqlx::query_scalar(
+        r#"
+        SELECT COUNT(*)
+        FROM documents
+        WHERE ocr_status = 'failed'
+        AND ($1::uuid IS NULL OR user_id = $1)
+        "#
+    )
+    .bind(if auth_user.user.role == UserRole::Admin {
+        None
+    } else {
+        Some(auth_user.user.id)
+    })
+    .fetch_one(state.db.get_pool())
+    .await
+    .map_err(|e| {
+        error!("Failed to count failed OCR documents: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    let failed_documents: Vec<serde_json::Value> = failed_docs
+        .into_iter()
+        .map(|row| {
+            let tags: Vec<String> = row.get::<Option<Vec<String>>, _>("tags").unwrap_or_default();
+
+            serde_json::json!({
+                "id": row.get::<uuid::Uuid, _>("id"),
+                "filename": row.get::<String, _>("filename"),
+                "original_filename": row.get::<String, _>("original_filename"),
+                "file_size": row.get::<i64, _>("file_size"),
+                "mime_type": row.get::<String, _>("mime_type"),
+                "created_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("created_at"),
+                "updated_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("updated_at"),
+                "tags": tags,
+                "ocr_status": row.get::<Option<String>, _>("ocr_status"),
+                "ocr_error": row.get::<Option<String>, _>("ocr_error"),
+                "ocr_failure_reason": row.get::<Option<String>, _>("ocr_failure_reason"),
+                "ocr_completed_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("ocr_completed_at"),
+                "retry_count": row.get::<Option<i64>, _>("retry_count").unwrap_or(0),
+                "last_attempt_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("last_attempt_at"),
+                "can_retry": true,
+                "failure_category": categorize_failure_reason(
+                    row.get::<Option<String>, _>("ocr_failure_reason").as_deref(),
+                    row.get::<Option<String>, _>("ocr_error").as_deref()
+                )
+            })
+        })
+        .collect();
+
+    let response = serde_json::json!({
+        "documents": failed_documents,
+        "pagination": {
+            "total": total_count,
+            "limit": limit,
+            "offset": offset,
+            "has_more": offset + limit < total_count
+        },
+        "statistics": {
+            "total_failed": total_count,
+            "failure_categories": get_failure_statistics(&state, auth_user.user.id, auth_user.user.role.clone()).await?
+ } + }); + + Ok(Json(response)) +} + +/// View a failed document file +#[utoipa::path( + get, + path = "/api/documents/failed/{id}/view", + tag = "documents", + security( + ("bearer_auth" = []) + ), + params( + ("id" = uuid::Uuid, Path, description = "Failed Document ID") + ), + responses( + (status = 200, description = "Failed document content for viewing in browser"), + (status = 404, description = "Failed document not found or file deleted"), + (status = 401, description = "Unauthorized") + ) +)] +pub async fn view_failed_document( + State(state): State>, + auth_user: AuthUser, + Path(failed_document_id): Path, +) -> Result { + // Get failed document from database + let row = sqlx::query( + r#" + SELECT file_path, filename, mime_type, user_id + FROM failed_documents + WHERE id = $1 AND ($2::uuid IS NULL OR user_id = $2) + "# + ) + .bind(failed_document_id) + .bind(if auth_user.user.role == UserRole::Admin { + None + } else { + Some(auth_user.user.id) + }) + .fetch_optional(&state.db.pool) + .await + .map_err(|e| { + error!("Failed to fetch failed document: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + let file_path: Option = row.get("file_path"); + let filename: String = row.get("filename"); + let mime_type: Option = row.get("mime_type"); + + // Check if file_path exists (some failed documents might not have been saved) + let file_path = file_path.ok_or(StatusCode::NOT_FOUND)?; + + let file_service = FileService::new(state.config.clone()); + let file_data = file_service + .read_file(&file_path) + .await + .map_err(|e| { + error!("Failed to read failed document file: {}", e); + StatusCode::NOT_FOUND + })?; + + // Determine content type from mime_type or file extension + let content_type = mime_type + .unwrap_or_else(|| { + mime_guess::from_path(&filename) + .first_or_octet_stream() + .to_string() + }); + + let response = Response::builder() + .header("Content-Type", content_type) + .header("Content-Length", file_data.len()) + .header("Content-Disposition", format!("inline; filename=\"{}\"", filename)) + .body(Body::from(file_data)) + .map_err(|e| { + error!("Failed to build response: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + debug!("Failed document viewed: {}", failed_document_id); + Ok(response) +} + +/// Helper function to categorize failure reasons +fn categorize_failure_reason(failure_reason: Option<&str>, error_message: Option<&str>) -> &'static str { + match failure_reason { + Some("pdf_font_encoding") => "PDF Font Issues", + Some("pdf_corruption") => "PDF Corruption", + Some("processing_timeout") => "Timeout", + Some("memory_limit") => "Memory Limit", + Some("pdf_parsing_panic") => "PDF Parsing Error", + Some("low_ocr_confidence") => "Low OCR Confidence", + Some("unknown") | None => { + // Try to categorize based on error message + if let Some(error) = error_message { + let error_lower = error.to_lowercase(); + if error_lower.contains("timeout") { + "Timeout" + } else if error_lower.contains("memory") { + "Memory Limit" + } else if error_lower.contains("font") || error_lower.contains("encoding") { + "PDF Font Issues" + } else if error_lower.contains("corrupt") { + "PDF Corruption" + } else if error_lower.contains("quality below threshold") || error_lower.contains("confidence") { + "Low OCR Confidence" + } else { + "Unknown Error" + } + } else { + "Unknown Error" + } + } + _ => "Other" + } +} + +/// Helper function to get failure statistics +async fn get_failure_statistics( + state: &Arc, + user_id: uuid::Uuid, + user_role: 
+
+/// Helper function to get failure statistics
+async fn get_failure_statistics(
+    state: &Arc<AppState>,
+    user_id: uuid::Uuid,
+    user_role: UserRole
+) -> Result<serde_json::Value, StatusCode> {
+    let stats = sqlx::query(
+        r#"
+        SELECT
+            ocr_failure_reason,
+            COUNT(*) as count
+        FROM documents
+        WHERE ocr_status = 'failed'
+        AND ($1::uuid IS NULL OR user_id = $1)
+        GROUP BY ocr_failure_reason
+        ORDER BY count DESC
+        "#
+    )
+    .bind(if user_role == UserRole::Admin {
+        None
+    } else {
+        Some(user_id)
+    })
+    .fetch_all(state.db.get_pool())
+    .await
+    .map_err(|e| {
+        error!("Failed to get failure statistics: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    let categories: Vec<serde_json::Value> = stats
+        .into_iter()
+        .map(|row| {
+            let reason = row.get::<Option<String>, _>("ocr_failure_reason");
+            let count = row.get::<i64, _>("count");
+
+            serde_json::json!({
+                "reason": reason.clone().unwrap_or_else(|| "unknown".to_string()),
+                "display_name": categorize_failure_reason(reason.as_deref(), None),
+                "count": count
+            })
+        })
+        .collect();
+
+    Ok(serde_json::json!(categories))
+}
+
+/// Helper function to calculate estimated wait time for retries
+pub async fn calculate_estimated_wait_time(priority: i32) -> i64 {
+    // Simple estimation based on priority - in a real implementation,
+    // this would check actual queue depth and processing times
+    match priority {
+        15.. => 1,      // High priority retry: ~1 minute
+        10..=14 => 3,   // Medium priority: ~3 minutes
+        5..=9 => 10,    // Low priority: ~10 minutes
+        _ => 30,        // Very low priority: ~30 minutes
+    }
+}
\ No newline at end of file
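With the range patterns written inclusively (`10..=14`, `5..=9`; exclusive `a..b` range patterns are not stable Rust), the bucket boundaries behave as the comments describe. A quick check, assuming tokio's test macro is available since the function is async:

#[tokio::test]
async fn wait_time_buckets_match_comments() {
    assert_eq!(calculate_estimated_wait_time(20).await, 1);
    assert_eq!(calculate_estimated_wait_time(15).await, 1);
    assert_eq!(calculate_estimated_wait_time(14).await, 3);
    assert_eq!(calculate_estimated_wait_time(10).await, 3);
    assert_eq!(calculate_estimated_wait_time(9).await, 10);
    assert_eq!(calculate_estimated_wait_time(4).await, 30);
}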
\ No newline at end of file
diff --git a/src/routes/documents/mod.rs b/src/routes/documents/mod.rs
new file mode 100644
index 0000000..3ba2420
--- /dev/null
+++ b/src/routes/documents/mod.rs
@@ -0,0 +1,14 @@
+pub mod types;
+pub mod crud;
+pub mod ocr;
+pub mod bulk;
+pub mod debug;
+pub mod failed;
+
+// Re-export commonly used types and functions for backward compatibility
+pub use types::*;
+pub use crud::*;
+pub use ocr::*;
+pub use bulk::*;
+pub use debug::*;
+pub use failed::*;
\ No newline at end of file
diff --git a/src/routes/documents/ocr.rs b/src/routes/documents/ocr.rs
new file mode 100644
index 0000000..a250a56
--- /dev/null
+++ b/src/routes/documents/ocr.rs
@@ -0,0 +1,281 @@
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    response::Json,
+};
+use std::sync::Arc;
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    auth::AuthUser,
+    models::DocumentOcrResponse,
+    AppState,
+};
+
+/// Get OCR text for a document
+#[utoipa::path(
+    get,
+    path = "/api/documents/{id}/ocr",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "Document OCR text", body = DocumentOcrResponse),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn get_document_ocr(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<DocumentOcrResponse>, StatusCode> {
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    let response = DocumentOcrResponse {
+        document_id: document.id,
+        filename: document.original_filename,
+        has_ocr_text: document.ocr_text.is_some(),
+        ocr_text: document.ocr_text,
+        ocr_confidence: document.ocr_confidence,
+        ocr_status: document.ocr_status,
+        ocr_processing_time_ms: document.ocr_processing_time_ms,
+        detected_language: None, // This would need to be stored separately if needed
+        pages_processed: None,   // This would need to be stored separately if needed
+    };
+
+    Ok(Json(response))
+}
+
+/// Retry OCR processing for a document
+#[utoipa::path(
+    post,
+    path = "/api/documents/{id}/retry-ocr",
+    tag = "documents",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = uuid::Uuid, Path, description = "Document ID")
+    ),
+    responses(
+        (status = 200, description = "OCR retry initiated"),
+        (status = 404, description = "Document not found"),
+        (status = 401, description = "Unauthorized"),
+        (status = 409, description = "OCR already in progress"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+pub async fn retry_ocr(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    // Get document first to check if it exists and user has access
+    let document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Check if OCR is already in progress; note this returns 200 with
+    // success=false (not the documented 409) so the caller gets a JSON message
+    if let Some(ref status) = document.ocr_status {
+        if status == "processing" {
+            return Ok(Json(serde_json::json!({
+                "success": false,
+                "message": "OCR is already in progress for this document"
+            })));
+        }
+    }
+
+    // Add to OCR queue
+    match state.ocr_queue.enqueue_document(document.id, auth_user.user.id, 1).await {
+        Ok(_) => {
+            info!("Document {} queued for OCR retry", document_id);
+            Ok(Json(serde_json::json!({
+                "success": true,
+                "message": "Document queued for OCR processing"
+            })))
+        }
+        Err(e) => {
+            error!("Failed to queue document {} for OCR: {}", document_id, e);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }
+}
+
+/// Get OCR processing status for multiple documents
+pub async fn get_ocr_status_batch(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Json(document_ids): Json<Vec<uuid::Uuid>>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    // Cap the batch size to keep per-request database work bounded
+    if document_ids.len() > 100 {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let mut results = Vec::new();
+
+    for document_id in document_ids {
+        match state
+            .db
+            .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+            .await
+        {
+            Ok(Some(document)) => {
+                results.push(serde_json::json!({
+                    "document_id": document.id,
+                    "ocr_status": document.ocr_status,
+                    "ocr_confidence": document.ocr_confidence,
+                    "has_ocr_text": document.ocr_text.is_some(),
+                    "retry_count": document.ocr_retry_count.unwrap_or(0)
+                }));
+            }
+            Ok(None) => {
+                results.push(serde_json::json!({
+                    "document_id": document_id,
+                    "error": "Document not found"
+                }));
+            }
+            Err(e) => {
+                error!("Error getting document {}: {}", document_id, e);
+                results.push(serde_json::json!({
+                    "document_id": document_id,
+                    "error": "Database error"
+                }));
+            }
+        }
+    }
+
+    Ok(Json(serde_json::json!({
+        "results": results
+    })))
+}
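+
+// Sketch of a possible registration for the batch-status handler (the route
+// path here is assumed for illustration, not taken from this change):
+//
+//     use axum::routing::post;
+//     let ocr_routes = axum::Router::new()
+//         .route("/ocr/status-batch", post(get_ocr_status_batch));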
+
+/// Cancel OCR processing for a document
+pub async fn cancel_ocr(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    // Verify user has access to the document
+    let _document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Try to remove from queue
+    match state.ocr_queue.remove_from_queue(document_id).await {
+        Ok(removed) => {
+            if removed {
+                info!("Document {} removed from OCR queue", document_id);
+                Ok(Json(serde_json::json!({
+                    "success": true,
+                    "message": "Document removed from OCR queue"
+                })))
+            } else {
+                Ok(Json(serde_json::json!({
+                    "success": false,
+                    "message": "Document was not in the OCR queue"
+                })))
+            }
+        }
+        Err(e) => {
+            error!("Failed to remove document {} from OCR queue: {}", document_id, e);
+            Err(StatusCode::INTERNAL_SERVER_ERROR)
+        }
+    }
+}
+
+/// Get OCR processing statistics
+pub async fn get_ocr_stats(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let (total, pending, completed, failed) = state
+        .db
+        .count_documents_by_ocr_status(auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting OCR stats: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    // Get queue statistics
+    let queue_size = state
+        .ocr_queue
+        .get_queue_size()
+        .await
+        .map_err(|e| {
+            error!("Failed to get OCR queue size: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    let active_jobs = state
+        .ocr_queue
+        .get_active_jobs_count()
+        .await
+        .map_err(|e| {
+            error!("Failed to get active OCR jobs count: {}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(Json(serde_json::json!({
+        "total_documents": total,
+        "pending_ocr": pending,
+        "completed_ocr": completed,
+        "failed_ocr": failed,
+        "queue_size": queue_size,
+        "active_jobs": active_jobs,
+        "completion_rate": if total > 0 { (completed as f64 / total as f64 * 100.0) } else { 0.0 }
+    })))
+}
+
+/// Update OCR settings for a document
+pub async fn update_ocr_settings(
+    State(state): State<Arc<AppState>>,
+    auth_user: AuthUser,
+    Path(document_id): Path<uuid::Uuid>,
+    Json(settings): Json<serde_json::Value>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    // Verify user has access to the document
+    let _document = state
+        .db
+        .get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
+        .await
+        .map_err(|e| {
+            error!("Database error getting document {}: {}", document_id, e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // For now, just return success - OCR settings would be stored in metadata
+    debug!("OCR settings updated for document {}: {:?}", document_id, settings);
+
+    Ok(Json(serde_json::json!({
+        "success": true,
+        "message": "OCR settings updated"
+    })))
+}
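+
+// One possible persistence sketch for the settings (hypothetical column and
+// schema, not part of this change):
+//
+//     sqlx::query("UPDATE documents SET ocr_settings = $1 WHERE id = $2")
+//         .bind(&settings)
+//         .bind(document_id)
+//         .execute(state.db.get_pool())
+//         .await
+//         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;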
\ No newline at end of file
diff --git a/src/routes/documents/types.rs b/src/routes/documents/types.rs
new file mode 100644
index 0000000..73377c6
--- /dev/null
+++ b/src/routes/documents/types.rs
@@ -0,0 +1,86 @@
+use serde::{Deserialize, Serialize};
+use utoipa::ToSchema;
+
+#[derive(Deserialize, ToSchema)]
+pub struct PaginationQuery {
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
+    pub ocr_status: Option<String>,
+}
+
+#[derive(Deserialize, ToSchema)]
+pub struct FailedDocumentsQuery {
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
+    pub stage: Option<String>,  // 'ocr', 'ingestion', 'validation', etc.
+    pub reason: Option<String>, // 'duplicate_content', 'low_ocr_confidence', etc.
+}
+
+#[derive(Deserialize, Serialize, ToSchema)]
+pub struct BulkDeleteRequest {
+    pub document_ids: Vec<uuid::Uuid>,
+}
+
+#[derive(Deserialize, Serialize, ToSchema)]
+pub struct DeleteLowConfidenceRequest {
+    pub max_confidence: f32,
+    pub preview_only: Option<bool>,
+}
+
+#[derive(Serialize, ToSchema)]
+pub struct DocumentUploadResponse {
+    pub document_id: uuid::Uuid,
+    pub filename: String,
+    pub file_size: i64,
+    pub mime_type: String,
+    pub status: String,
+    pub message: String,
+}
+
+#[derive(Serialize, ToSchema)]
+pub struct BulkDeleteResponse {
+    pub deleted_count: i64,
+    pub failed_count: i64,
+    pub deleted_documents: Vec<uuid::Uuid>,
+    pub failed_documents: Vec<uuid::Uuid>,
+    pub total_files_deleted: i64,
+    pub total_files_failed: i64,
+}
+
+#[derive(Serialize, ToSchema)]
+pub struct DocumentDebugInfo {
+    pub document_id: uuid::Uuid,
+    pub filename: String,
+    pub file_path: String,
+    pub file_size: i64,
+    pub mime_type: String,
+    pub created_at: chrono::DateTime<chrono::Utc>,
+    pub ocr_status: Option<String>,
+    pub ocr_confidence: Option<f32>,
+    pub ocr_word_count: Option<i32>,
+    pub processing_steps: Vec<String>,
+    pub file_exists: bool,
+    pub readable: bool,
+    pub permissions: Option<String>,
+}
+
+impl Default for PaginationQuery {
+    fn default() -> Self {
+        Self {
+            limit: Some(25),
+            offset: Some(0),
+            ocr_status: None,
+        }
+    }
+}
+
+impl Default for FailedDocumentsQuery {
+    fn default() -> Self {
+        Self {
+            limit: Some(25),
+            offset: Some(0),
+            stage: None,
+            reason: None,
+        }
+    }
+}
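+
+// Small sanity test of the defaults above (illustrative only):
+#[cfg(test)]
+mod default_tests {
+    use super::*;
+
+    #[test]
+    fn queries_default_to_first_page_of_25() {
+        let q = PaginationQuery::default();
+        assert_eq!((q.limit, q.offset), (Some(25), Some(0)));
+        assert!(q.ocr_status.is_none());
+
+        let f = FailedDocumentsQuery::default();
+        assert_eq!((f.limit, f.offset), (Some(25), Some(0)));
+    }
+}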
\ No newline at end of file
diff --git a/src/routes/documents_old.rs.bak b/src/routes/documents_old.rs.bak
new file mode 100644
index 0000000..1342895
--- /dev/null
+++ b/src/routes/documents_old.rs.bak
@@ -0,0 +1,62 @@
+use axum::{
+    routing::{get, post, delete},
+    Router,
+};
+use std::sync::Arc;
+
+use crate::AppState;
+
+// Import all the modularized functions
+mod types;
+mod crud;
+mod ocr;
+mod bulk;
+mod debug;
+mod failed;
+
+// Re-export types for external use
+pub use types::*;
+
+// Use the individual module functions
+use crud::*;
+use ocr::*;
+use bulk::*;
+use debug::*;
+use failed::*;
+
+/// Documents router with all document-related endpoints
+pub fn router() -> Router<Arc<AppState>> {
+    Router::new()
+        // Basic CRUD operations
+        .route("/", post(upload_document))
+        .route("/", get(list_documents))
+        .route("/", delete(bulk_delete_documents))
+        .route("/{id}", get(get_document_by_id))
+        .route("/{id}", delete(delete_document))
+        .route("/{id}/download", get(download_document))
+        .route("/{id}/view", get(view_document))
+
+        // OCR operations
+        .route("/{id}/ocr", get(get_document_ocr))
+        .route("/{id}/retry-ocr", post(retry_ocr))
+        .route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr))
+        .route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats))
+        .route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations))
+        .route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history))
+
+        // Bulk operations
+        .route("/delete-low-confidence", post(delete_low_confidence_documents))
+        .route("/delete-failed-ocr", post(delete_failed_ocr_documents))
+
+        // Debug and diagnostic operations
+        .route("/{id}/debug", get(get_document_debug_info))
+        .route("/{id}/thumbnail", get(get_document_thumbnail))
+        .route("/{id}/processed-image", get(get_processed_image))
+
+        // Failed document management
+        .route("/failed", get(get_failed_documents))
+        .route("/failed/{id}/view", get(view_failed_document))
+
+        // Other operations
+        .route("/duplicates", get(get_user_duplicates))
+}
\ No newline at end of file
diff --git a/src/swagger.rs b/src/swagger.rs
index 85bb6fa..51dd76b 100644
--- a/src/swagger.rs
+++ b/src/swagger.rs
@@ -39,23 +39,23 @@ use crate::{
         crate::routes::auth::oidc_login,
         crate::routes::auth::oidc_callback,
         // Document endpoints
-        crate::routes::documents::upload_document,
-        crate::routes::documents::list_documents,
-        crate::routes::documents::get_document_by_id,
-        crate::routes::documents::delete_document,
-        crate::routes::documents::bulk_delete_documents,
-        crate::routes::documents::download_document,
-        crate::routes::documents::view_document,
-        crate::routes::documents::get_document_thumbnail,
-        crate::routes::documents::get_document_ocr,
-        crate::routes::documents::get_processed_image,
-        crate::routes::documents::retry_ocr,
-        crate::routes::documents::get_document_debug_info,
-        crate::routes::documents::get_failed_ocr_documents,
-        crate::routes::documents::view_failed_document,
-        crate::routes::documents::delete_low_confidence_documents,
-        crate::routes::documents::delete_failed_ocr_documents,
-        crate::routes::documents::get_user_duplicates,
+        crate::routes::documents::crud::upload_document,
+        crate::routes::documents::crud::list_documents,
+        crate::routes::documents::crud::get_document_by_id,
+        crate::routes::documents::crud::delete_document,
+        crate::routes::documents::bulk::bulk_delete_documents,
+        crate::routes::documents::crud::download_document,
+        crate::routes::documents::crud::view_document,
+        crate::routes::documents::debug::get_document_thumbnail,
+        crate::routes::documents::ocr::get_document_ocr,
+        crate::routes::documents::debug::get_processed_image,
+        crate::routes::documents::ocr::retry_ocr,
+        crate::routes::documents::debug::get_document_debug_info,
+        crate::routes::documents::failed::get_failed_ocr_documents,
+        crate::routes::documents::failed::view_failed_document,
+        crate::routes::documents::bulk::delete_low_confidence_documents,
+        crate::routes::documents::bulk::delete_failed_ocr_documents,
+        crate::routes::documents::crud::get_user_duplicates,
         // Labels endpoints
         crate::routes::labels::get_labels,
         crate::routes::labels::create_label,