From 84577806ef302a38a7abc1e60ea32bacef1bf38a Mon Sep 17 00:00:00 2001 From: perf3ct Date: Sat, 28 Jun 2025 20:52:58 +0000 Subject: [PATCH] feat(server/client): add failed_documents table to handle failures, and move logic of failures --- frontend/src/components/Layout/AppLayout.tsx | 3 +- frontend/src/pages/FailedOcrPage.tsx | 319 +++++++++++++++++- frontend/src/pages/SourcesPage.tsx | 25 +- frontend/src/services/api.ts | 11 +- ...50628000003_add_failed_documents_table.sql | 103 ++++++ ...migrate_failed_ocr_to_failed_documents.sql | 123 +++++++ src/models.rs | 143 ++++++++ src/ocr/queue.rs | 3 +- src/routes/documents.rs | 226 ++++++++++++- 9 files changed, 936 insertions(+), 20 deletions(-) create mode 100644 migrations/20250628000003_add_failed_documents_table.sql create mode 100644 migrations/20250628000004_migrate_failed_ocr_to_failed_documents.sql diff --git a/frontend/src/components/Layout/AppLayout.tsx b/frontend/src/components/Layout/AppLayout.tsx index cd7b014..5283b8c 100644 --- a/frontend/src/components/Layout/AppLayout.tsx +++ b/frontend/src/components/Layout/AppLayout.tsx @@ -36,6 +36,7 @@ import { Label as LabelIcon, Block as BlockIcon, Api as ApiIcon, + ManageAccounts as ManageIcon, } from '@mui/icons-material'; import { useNavigate, useLocation } from 'react-router-dom'; import { useAuth } from '../../contexts/AuthContext'; @@ -69,7 +70,7 @@ const navigationItems: NavigationItem[] = [ { text: 'Labels', icon: LabelIcon, path: '/labels' }, { text: 'Sources', icon: StorageIcon, path: '/sources' }, { text: 'Watch Folder', icon: FolderIcon, path: '/watch' }, - { text: 'Failed OCR', icon: ErrorIcon, path: '/failed-ocr' }, + { text: 'Document Management', icon: ManageIcon, path: '/failed-ocr' }, { text: 'Ignored Files', icon: BlockIcon, path: '/ignored-files' }, ]; diff --git a/frontend/src/pages/FailedOcrPage.tsx b/frontend/src/pages/FailedOcrPage.tsx index 87c47e9..d493e50 100644 --- a/frontend/src/pages/FailedOcrPage.tsx +++ b/frontend/src/pages/FailedOcrPage.tsx @@ -47,6 +47,7 @@ import { Delete as DeleteIcon, FindInPage as FindInPageIcon, OpenInNew as OpenInNewIcon, + Warning as WarningIcon, } from '@mui/icons-material'; import { format } from 'date-fns'; import { api, documentService, queueService } from '../services/api'; @@ -135,16 +136,22 @@ const FailedOcrPage: React.FC = () => { const [currentTab, setCurrentTab] = useState(0); const [documents, setDocuments] = useState([]); const [duplicates, setDuplicates] = useState([]); + const [failedDocuments, setFailedDocuments] = useState([]); const [loading, setLoading] = useState(true); const [duplicatesLoading, setDuplicatesLoading] = useState(false); + const [failedDocumentsLoading, setFailedDocumentsLoading] = useState(false); + const [failedDocumentsFilters, setFailedDocumentsFilters] = useState<{ stage?: string; reason?: string }>({}); + const [selectedFailedDocument, setSelectedFailedDocument] = useState(null); const [retrying, setRetrying] = useState(null); const [retryingAll, setRetryingAll] = useState(false); const [statistics, setStatistics] = useState(null); const [duplicateStatistics, setDuplicateStatistics] = useState(null); const [pagination, setPagination] = useState({ page: 1, limit: 25 }); const [duplicatesPagination, setDuplicatesPagination] = useState({ page: 1, limit: 25 }); + const [failedDocumentsPagination, setFailedDocumentsPagination] = useState({ page: 1, limit: 25 }); const [totalPages, setTotalPages] = useState(0); const [duplicatesTotalPages, setDuplicatesTotalPages] = useState(0); + const 
[failedDocumentsTotalPages, setFailedDocumentsTotalPages] = useState(0); const [selectedDocument, setSelectedDocument] = useState(null); const [detailsOpen, setDetailsOpen] = useState(false); const [expandedRows, setExpandedRows] = useState>(new Set()); @@ -223,8 +230,59 @@ const FailedOcrPage: React.FC = () => { useEffect(() => { if (currentTab === 1) { fetchDuplicates(); + } else if (currentTab === 4) { + fetchFailedDocumentsList(); } - }, [currentTab, duplicatesPagination.page]); + }, [currentTab, duplicatesPagination.page, failedDocumentsPagination.page, failedDocumentsFilters]); + + const fetchFailedDocumentsList = async () => { + try { + setFailedDocumentsLoading(true); + const offset = (failedDocumentsPagination.page - 1) * failedDocumentsPagination.limit; + const response = await documentService.getFailedDocuments( + failedDocumentsPagination.limit, + offset, + failedDocumentsFilters.stage, + failedDocumentsFilters.reason + ); + + if (response?.data) { + setFailedDocuments(response.data.documents || []); + if (response.data.pagination) { + setFailedDocumentsTotalPages(Math.ceil(response.data.pagination.total / failedDocumentsPagination.limit)); + } + } + } catch (error) { + console.error('Failed to fetch failed documents:', error); + setSnackbar({ + open: true, + message: 'Failed to load failed documents', + severity: 'error' + }); + } finally { + setFailedDocumentsLoading(false); + } + }; + + const getFailureReasonColor = (reason: string): "error" | "warning" | "info" | "default" => { + switch (reason) { + case 'low_ocr_confidence': + case 'ocr_timeout': + case 'ocr_memory_limit': + case 'pdf_parsing_error': + return 'error'; + case 'duplicate_content': + case 'unsupported_format': + case 'file_too_large': + return 'warning'; + case 'file_corrupted': + case 'access_denied': + case 'permission_denied': + return 'error'; + default: + return 'default'; + } + }; const handleRetryOcr = async (document: FailedDocument) => { try { @@ -309,6 +367,8 @@ const FailedOcrPage: React.FC = () => { case 'Timeout': case 'Memory Limit': return 'error'; + case 'Low OCR Confidence': + return 'warning'; case 'Unknown Error': return 'info'; default: @@ -354,6 +414,8 @@ const FailedOcrPage: React.FC = () => { handlePreviewLowConfidence(); } else if (currentTab === 3) { handlePreviewFailedDocuments(); + } else if (currentTab === 4) { + fetchFailedDocumentsList(); } }; @@ -488,22 +550,27 @@ const FailedOcrPage: React.FC = () => { } - label={`Failed OCR${statistics ? ` (${statistics.total_failed})` : ''}`} + label={`Failed Documents${statistics ? ` (${statistics.total_failed})` : ''}`} iconPosition="start" /> } - label={`Duplicates${duplicateStatistics ? ` (${duplicateStatistics.total_duplicate_groups})` : ''}`} + label={`Duplicate Files${duplicateStatistics ? ` (${duplicateStatistics.total_duplicate_groups})` : ''}`} iconPosition="start" /> } - label={`Low Confidence${previewData ? ` (${previewData.matched_count})` : ''}`} + label={`Low Quality Manager${previewData ? ` (${previewData.matched_count})` : ''}`} iconPosition="start" /> } - label="Delete Failed" + label="Bulk Cleanup" + iconPosition="start" + /> + } + label="Failed Documents" iconPosition="start" /> @@ -1073,15 +1140,62 @@ const FailedOcrPage: React.FC = () => { 0 ? 
'warning.main' : 'success.main'}> {previewData.message} - {previewData.matched_count > 0 && ( + {previewData.matched_count > 0 && previewData.documents && ( - - Document IDs that would be deleted: - - - {previewData.document_ids.slice(0, 10).join(', ')} - {previewData.document_ids.length > 10 && ` ... and ${previewData.document_ids.length - 10} more`} + + Documents that would be deleted: + + + + + Filename + Size + OCR Confidence + Status + Date + + + + {previewData.documents.slice(0, 20).map((doc: any) => ( + + + + {doc.original_filename || doc.filename} + + + + + {formatFileSize(doc.file_size)} + + + + + {doc.ocr_confidence ? `${doc.ocr_confidence.toFixed(1)}%` : 'N/A'} + + + + + + + + {new Date(doc.created_at).toLocaleDateString()} + + + + ))} + +
+
+ {previewData.documents.length > 20 && ( + + ... and {previewData.documents.length - 20} more documents + + )}
)} @@ -1175,6 +1289,187 @@ const FailedOcrPage: React.FC = () => { )} + {/* Failed Documents Tab Content */} + {currentTab === 4 && ( + <> + + Failed Documents Overview + + This shows all documents that failed at any stage of processing: ingestion, validation, OCR, storage, etc. + Use the filters to narrow down by failure stage or specific reason. + + + + {/* Filter Controls */} + + + + + setFailedDocumentsFilters(prev => ({ ...prev, stage: e.target.value || undefined }))} + fullWidth + SelectProps={{ native: true }} + > + + + + + + + + + + + setFailedDocumentsFilters(prev => ({ ...prev, reason: e.target.value || undefined }))} + fullWidth + SelectProps={{ native: true }} + > + + + + + + + + + + + + + + + + + + + {/* Failed Documents List */} + {failedDocuments.length > 0 && ( + + + + Failed Documents ({failedDocuments.length}) + + + + + + Filename + Stage + Reason + Size + Date + Actions + + + + {failedDocuments.slice((failedDocumentsPagination.page - 1) * failedDocumentsPagination.limit, failedDocumentsPagination.page * failedDocumentsPagination.limit).map((doc: any) => ( + + + + {doc.original_filename || doc.filename} + + {doc.ingestion_source && ( + + )} + + + + + + + + + + {doc.file_size ? formatFileSize(doc.file_size) : 'N/A'} + + + + + {new Date(doc.created_at).toLocaleDateString()} + + + + setSelectedFailedDocument(doc)} + title="View Details" + > + + + {doc.existing_document_id && ( + navigate(`/documents/${doc.existing_document_id}`)} + title="View Existing Document" + > + + + )} + + + ))} + +
+
+ + {/* Pagination */} + {failedDocumentsTotalPages > 1 && ( + + setFailedDocumentsPagination(prev => ({ ...prev, page }))} + color="primary" + /> + + )} +
+
+ )} + + {/* Loading State */} + {failedDocumentsLoading && ( + + + Loading failed documents... + + )} + + {/* Empty State */} + {!failedDocumentsLoading && failedDocuments.length === 0 && ( + + No Failed Documents Found + + No documents have failed processing with the current filters. This is good! + + + )} + + )} + {/* Confirmation Dialog */} { const [testingConnection, setTestingConnection] = useState(false); const [syncingSource, setSyncingSource] = useState(null); const [stoppingSync, setStoppingSync] = useState(null); + const [autoRefreshing, setAutoRefreshing] = useState(false); useEffect(() => { loadSources(); @@ -159,6 +160,25 @@ const SourcesPage: React.FC = () => { } }, [user]); + // Auto-refresh sources when any source is syncing + useEffect(() => { + const activeSyncingSources = sources.filter(source => source.status === 'syncing'); + + if (activeSyncingSources.length > 0) { + setAutoRefreshing(true); + const interval = setInterval(() => { + loadSources(); + }, 5000); // Poll every 5 seconds during active sync + + return () => { + clearInterval(interval); + setAutoRefreshing(false); + }; + } else { + setAutoRefreshing(false); + } + }, [sources]); + // Update default folders when source type changes useEffect(() => { if (!editingSource) { // Only for new sources @@ -979,8 +999,9 @@ const SourcesPage: React.FC = () => { {/* OCR Controls for Admin Users */} diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 7b01d76..b29990b 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -200,8 +200,8 @@ export const documentService = { }, getFailedOcrDocuments: (limit = 50, offset = 0) => { - return api.get(`/documents/failed-ocr`, { - params: { limit, offset }, + return api.get(`/documents/failed`, { + params: { stage: 'ocr', limit, offset }, }) }, @@ -253,6 +253,13 @@ export const documentService = { preview_only: previewOnly }) }, + + getFailedDocuments: (limit = 25, offset = 0, stage?: string, reason?: string) => { + const params: any = { limit, offset }; + if (stage) params.stage = stage; + if (reason) params.reason = reason; + return api.get('/documents/failed', { params }) + }, } export interface OcrStatusResponse { diff --git a/migrations/20250628000003_add_failed_documents_table.sql b/migrations/20250628000003_add_failed_documents_table.sql new file mode 100644 index 0000000..bcdad9a --- /dev/null +++ b/migrations/20250628000003_add_failed_documents_table.sql @@ -0,0 +1,103 @@ +-- Add table to track documents that failed at any stage of processing +-- This provides visibility into documents that failed during: ingestion, validation, OCR, etc. 
+ +CREATE TABLE IF NOT EXISTS failed_documents ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID REFERENCES users(id) ON DELETE CASCADE, + filename TEXT NOT NULL, + original_filename TEXT, -- Original name when uploaded (if available) + original_path TEXT, -- Path where file was located + file_path TEXT, -- Stored file path (if file was saved before failure) + file_size BIGINT, + file_hash VARCHAR(64), + mime_type TEXT, + + -- Document content (if available before failure) + content TEXT, -- Raw content if extracted + tags TEXT[], -- Tags that were assigned/detected + + -- OCR-related fields (for OCR stage failures) + ocr_text TEXT, -- Partial OCR text if extracted before failure + ocr_confidence REAL, -- OCR confidence if calculated + ocr_word_count INTEGER, -- Word count if calculated + ocr_processing_time_ms INTEGER, -- Processing time before failure + + -- Failure information + failure_reason TEXT NOT NULL, + failure_stage TEXT NOT NULL, -- 'ingestion', 'validation', 'ocr', 'storage', etc. + existing_document_id UUID REFERENCES documents(id) ON DELETE SET NULL, + ingestion_source TEXT NOT NULL, -- 'batch', 'sync', 'webdav', 'upload', etc. + error_message TEXT, -- Detailed error information + + -- Retry information + retry_count INTEGER DEFAULT 0, + last_retry_at TIMESTAMPTZ, + + -- Timestamps + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + CONSTRAINT check_failure_reason CHECK (failure_reason IN ( + 'duplicate_content', + 'duplicate_filename', + 'unsupported_format', + 'file_too_large', + 'file_corrupted', + 'access_denied', + 'low_ocr_confidence', + 'ocr_timeout', + 'ocr_memory_limit', + 'pdf_parsing_error', + 'storage_quota_exceeded', + 'network_error', + 'permission_denied', + 'virus_detected', + 'invalid_structure', + 'policy_violation', + 'other' + )), + + CONSTRAINT check_failure_stage CHECK (failure_stage IN ( + 'ingestion', + 'validation', + 'ocr', + 'storage', + 'processing', + 'sync' + )) +); + +-- Indexes for efficient querying +CREATE INDEX IF NOT EXISTS idx_failed_documents_user_id ON failed_documents(user_id); +CREATE INDEX IF NOT EXISTS idx_failed_documents_created_at ON failed_documents(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_failed_documents_failure_reason ON failed_documents(failure_reason); +CREATE INDEX IF NOT EXISTS idx_failed_documents_failure_stage ON failed_documents(failure_stage); +CREATE INDEX IF NOT EXISTS idx_failed_documents_ingestion_source ON failed_documents(ingestion_source); +CREATE INDEX IF NOT EXISTS idx_failed_documents_file_hash ON failed_documents(file_hash) WHERE file_hash IS NOT NULL; + +-- Add comments for documentation +COMMENT ON TABLE failed_documents IS 'Tracks documents that failed at any stage of processing (ingestion, validation, OCR, etc.)'; +COMMENT ON COLUMN failed_documents.failure_reason IS 'Specific reason why the document failed'; +COMMENT ON COLUMN failed_documents.failure_stage IS 'Stage at which the document failed (ingestion, validation, ocr, etc.)'; +COMMENT ON COLUMN failed_documents.existing_document_id IS 'Reference to existing document if failed due to duplicate content'; +COMMENT ON COLUMN failed_documents.ingestion_source IS 'Source of the ingestion attempt (batch, sync, webdav, upload, etc.)'; +COMMENT ON COLUMN failed_documents.error_message IS 'Detailed error message for troubleshooting'; + +-- Create a view for failed documents summary by reason and stage +CREATE OR REPLACE VIEW failed_documents_summary AS +SELECT + failure_reason, + failure_stage, + 
ingestion_source, + COUNT(*) as document_count, + SUM(file_size) as total_size, + AVG(file_size) as avg_size, + MIN(created_at) as first_occurrence, + MAX(created_at) as last_occurrence +FROM failed_documents +GROUP BY failure_reason, failure_stage, ingestion_source +ORDER BY document_count DESC; + +-- Grant appropriate permissions +-- GRANT SELECT, INSERT ON failed_documents TO readur_user; +-- GRANT SELECT ON failed_documents_summary TO readur_user; \ No newline at end of file diff --git a/migrations/20250628000004_migrate_failed_ocr_to_failed_documents.sql b/migrations/20250628000004_migrate_failed_ocr_to_failed_documents.sql new file mode 100644 index 0000000..9f96878 --- /dev/null +++ b/migrations/20250628000004_migrate_failed_ocr_to_failed_documents.sql @@ -0,0 +1,123 @@ +-- Migration to move existing failed OCR documents from documents table to failed_documents table +-- This consolidates all failure tracking into a single table + +-- First, ensure the failed_documents table exists +-- (This migration depends on 20250628000003_add_failed_documents_table.sql) + +-- Move failed OCR documents to failed_documents table +INSERT INTO failed_documents ( + user_id, + filename, + original_filename, + file_path, + file_size, + file_hash, + mime_type, + content, + tags, + ocr_text, + ocr_confidence, + ocr_word_count, + ocr_processing_time_ms, + failure_reason, + failure_stage, + ingestion_source, + error_message, + retry_count, + created_at, + updated_at +) +SELECT + d.user_id, + d.filename, + d.original_filename, + d.file_path, + d.file_size, + d.file_hash, + d.mime_type, + d.content, + d.tags, + d.ocr_text, + d.ocr_confidence, + d.ocr_word_count, + d.ocr_processing_time_ms, + COALESCE(d.ocr_failure_reason, 'other') as failure_reason, + 'ocr' as failure_stage, + 'migration' as ingestion_source, -- Mark these as migrated from existing system + d.ocr_error as error_message, + COALESCE(q.retry_count, 0) as retry_count, + d.created_at, + d.updated_at +FROM documents d +LEFT JOIN ( + SELECT document_id, COUNT(*) as retry_count + FROM ocr_queue + WHERE status IN ('failed', 'completed') + GROUP BY document_id +) q ON d.id = q.document_id +WHERE d.ocr_status = 'failed'; + +-- Log the migration for audit purposes +INSERT INTO failed_documents ( + user_id, + filename, + original_filename, + failure_reason, + failure_stage, + ingestion_source, + error_message, + created_at, + updated_at +) VALUES ( + '00000000-0000-0000-0000-000000000000'::uuid, -- System user ID + 'migration_log', + 'Failed OCR Migration Log', + 'migration_completed', + 'migration', + 'system', + 'Migrated ' || (SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed') || ' failed OCR documents to failed_documents table', + NOW(), + NOW() +); + +-- Remove failed OCR documents from documents table +-- Note: This uses CASCADE to also clean up related records in ocr_queue table +DELETE FROM documents WHERE ocr_status = 'failed'; + +-- Update statistics and constraints +ANALYZE documents; +ANALYZE failed_documents; + +-- Add comment documenting the migration +COMMENT ON TABLE failed_documents IS 'Tracks all documents that failed at any stage of processing. 
Consolidated from documents table (OCR failures) and new ingestion failures as of migration 20250628000004.'; + +-- Create indexes for efficient querying of migrated data +CREATE INDEX IF NOT EXISTS idx_failed_documents_failure_stage_reason ON failed_documents(failure_stage, failure_reason); +CREATE INDEX IF NOT EXISTS idx_failed_documents_ocr_confidence ON failed_documents(ocr_confidence) WHERE ocr_confidence IS NOT NULL; + +-- Optional: Create a view for backward compatibility during transition +CREATE OR REPLACE VIEW legacy_failed_ocr_documents AS +SELECT + id, + user_id, + filename, + original_filename, + file_path, + file_size, + mime_type, + tags, + ocr_text, + ocr_confidence, + ocr_word_count, + ocr_processing_time_ms, + failure_reason as ocr_failure_reason, + error_message as ocr_error, + 'failed' as ocr_status, + retry_count, + created_at, + updated_at +FROM failed_documents +WHERE failure_stage = 'ocr'; + +-- Grant appropriate permissions +-- GRANT SELECT ON legacy_failed_ocr_documents TO readur_user; \ No newline at end of file diff --git a/src/models.rs b/src/models.rs index ab9b421..8806142 100644 --- a/src/models.rs +++ b/src/models.rs @@ -135,6 +135,149 @@ pub struct Document { pub file_hash: Option, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, ToSchema)] +pub enum FailureReason { + #[serde(rename = "duplicate_content")] + DuplicateContent, + #[serde(rename = "duplicate_filename")] + DuplicateFilename, + #[serde(rename = "unsupported_format")] + UnsupportedFormat, + #[serde(rename = "file_too_large")] + FileTooLarge, + #[serde(rename = "file_corrupted")] + FileCorrupted, + #[serde(rename = "access_denied")] + AccessDenied, + #[serde(rename = "low_ocr_confidence")] + LowOcrConfidence, + #[serde(rename = "ocr_timeout")] + OcrTimeout, + #[serde(rename = "ocr_memory_limit")] + OcrMemoryLimit, + #[serde(rename = "pdf_parsing_error")] + PdfParsingError, + #[serde(rename = "storage_quota_exceeded")] + StorageQuotaExceeded, + #[serde(rename = "network_error")] + NetworkError, + #[serde(rename = "permission_denied")] + PermissionDenied, + #[serde(rename = "virus_detected")] + VirusDetected, + #[serde(rename = "invalid_structure")] + InvalidStructure, + #[serde(rename = "policy_violation")] + PolicyViolation, + #[serde(rename = "other")] + Other, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, ToSchema)] +pub enum FailureStage { + #[serde(rename = "ingestion")] + Ingestion, + #[serde(rename = "validation")] + Validation, + #[serde(rename = "ocr")] + Ocr, + #[serde(rename = "storage")] + Storage, + #[serde(rename = "processing")] + Processing, + #[serde(rename = "sync")] + Sync, +} + +impl std::fmt::Display for FailureReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FailureReason::DuplicateContent => write!(f, "duplicate_content"), + FailureReason::DuplicateFilename => write!(f, "duplicate_filename"), + FailureReason::UnsupportedFormat => write!(f, "unsupported_format"), + FailureReason::FileTooLarge => write!(f, "file_too_large"), + FailureReason::FileCorrupted => write!(f, "file_corrupted"), + FailureReason::AccessDenied => write!(f, "access_denied"), + FailureReason::LowOcrConfidence => write!(f, "low_ocr_confidence"), + FailureReason::OcrTimeout => write!(f, "ocr_timeout"), + FailureReason::OcrMemoryLimit => write!(f, "ocr_memory_limit"), + FailureReason::PdfParsingError => write!(f, "pdf_parsing_error"), + FailureReason::StorageQuotaExceeded => write!(f, "storage_quota_exceeded"), + 
FailureReason::NetworkError => write!(f, "network_error"), + FailureReason::PermissionDenied => write!(f, "permission_denied"), + FailureReason::VirusDetected => write!(f, "virus_detected"), + FailureReason::InvalidStructure => write!(f, "invalid_structure"), + FailureReason::PolicyViolation => write!(f, "policy_violation"), + FailureReason::Other => write!(f, "other"), + } + } +} + +impl std::fmt::Display for FailureStage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FailureStage::Ingestion => write!(f, "ingestion"), + FailureStage::Validation => write!(f, "validation"), + FailureStage::Ocr => write!(f, "ocr"), + FailureStage::Storage => write!(f, "storage"), + FailureStage::Processing => write!(f, "processing"), + FailureStage::Sync => write!(f, "sync"), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow, ToSchema)] +pub struct FailedDocument { + /// Unique identifier for the failed document record + pub id: Uuid, + /// User who attempted to ingest the document + pub user_id: Uuid, + /// Filename of the failed document + pub filename: String, + /// Original filename when uploaded + pub original_filename: Option, + /// Original path where the file was located + pub original_path: Option, + /// Stored file path (if file was saved before failure) + pub file_path: Option, + /// Size of the file in bytes + pub file_size: Option, + /// SHA256 hash of the file content + pub file_hash: Option, + /// MIME type of the file + pub mime_type: Option, + /// Raw content if extracted before failure + pub content: Option, + /// Tags that were assigned/detected + pub tags: Vec, + /// Partial OCR text if extracted before failure + pub ocr_text: Option, + /// OCR confidence if calculated + pub ocr_confidence: Option, + /// Word count if calculated + pub ocr_word_count: Option, + /// Processing time before failure in milliseconds + pub ocr_processing_time_ms: Option, + /// Reason why the document failed + pub failure_reason: String, + /// Stage at which the document failed + pub failure_stage: String, + /// Reference to existing document if failed due to duplicate + pub existing_document_id: Option, + /// Source of the ingestion attempt + pub ingestion_source: String, + /// Detailed error message + pub error_message: Option, + /// Number of retry attempts + pub retry_count: Option, + /// Last retry timestamp + pub last_retry_at: Option>, + /// When the document failed + pub created_at: DateTime, + /// Last update timestamp + pub updated_at: DateTime, +} + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct DocumentResponse { /// Unique identifier for the document diff --git a/src/ocr/queue.rs b/src/ocr/queue.rs index 9221e88..31d5c91 100644 --- a/src/ocr/queue.rs +++ b/src/ocr/queue.rs @@ -322,11 +322,12 @@ impl OcrQueueService { warn!("⚠️ OCR quality issues for '{}' | Job: {} | Document: {} | {:.1}% confidence | {} words", filename, item.id, item.document_id, ocr_result.confidence, ocr_result.word_count); - // Mark as failed for quality issues + // Mark as failed for quality issues with proper failure reason sqlx::query( r#" UPDATE documents SET ocr_status = 'failed', + ocr_failure_reason = 'low_ocr_confidence', ocr_error = $2, updated_at = NOW() WHERE id = $1 diff --git a/src/routes/documents.rs b/src/routes/documents.rs index 048899b..5082b1d 100644 --- a/src/routes/documents.rs +++ b/src/routes/documents.rs @@ -26,6 +26,14 @@ struct PaginationQuery { ocr_status: Option, } +#[derive(Deserialize, ToSchema)] +struct 
FailedDocumentsQuery { + limit: Option, + offset: Option, + stage: Option, // 'ocr', 'ingestion', 'validation', etc. + reason: Option, // 'duplicate_content', 'low_ocr_confidence', etc. +} + #[derive(Deserialize, Serialize, ToSchema)] pub struct BulkDeleteRequest { pub document_ids: Vec, @@ -50,8 +58,8 @@ pub fn router() -> Router> { .route("/{id}/ocr", get(get_document_ocr)) .route("/{id}/processed-image", get(get_processed_image)) .route("/{id}/retry-ocr", post(retry_ocr)) - .route("/failed-ocr", get(get_failed_ocr_documents)) .route("/duplicates", get(get_user_duplicates)) + .route("/failed", get(get_failed_documents)) .route("/delete-low-confidence", post(delete_low_confidence_documents)) .route("/delete-failed-ocr", post(delete_failed_ocr_documents)) } @@ -757,6 +765,202 @@ async fn get_failed_ocr_documents( Ok(Json(response)) } +#[utoipa::path( + get, + path = "/api/documents/failed", + tag = "documents", + security( + ("bearer_auth" = []) + ), + params( + ("limit" = Option, Query, description = "Number of documents to return"), + ("offset" = Option, Query, description = "Number of documents to skip"), + ("stage" = Option, Query, description = "Filter by failure stage (ocr, ingestion, validation, etc.)"), + ("reason" = Option, Query, description = "Filter by failure reason") + ), + responses( + (status = 200, description = "List of failed documents", body = String), + (status = 401, description = "Unauthorized") + ) +)] +async fn get_failed_documents( + State(state): State>, + auth_user: AuthUser, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(25); + let offset = params.offset.unwrap_or(0); + + // Query the unified failed_documents table + let mut query_builder = sqlx::QueryBuilder::new( + r#" + SELECT id, filename, original_filename, file_path, file_size, mime_type, + content, tags, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, + failure_reason, failure_stage, error_message, existing_document_id, + ingestion_source, retry_count, last_retry_at, created_at, updated_at + FROM failed_documents + WHERE ($1::uuid IS NULL OR user_id = $1) + "# + ); + + let mut bind_count = 1; + + // Add stage filter if specified + if let Some(stage) = ¶ms.stage { + bind_count += 1; + query_builder.push(&format!(" AND failure_stage = ${}", bind_count)); + } + + // Add reason filter if specified + if let Some(reason) = ¶ms.reason { + bind_count += 1; + query_builder.push(&format!(" AND failure_reason = ${}", bind_count)); + } + + query_builder.push(" ORDER BY created_at DESC"); + query_builder.push(&format!(" LIMIT ${} OFFSET ${}", bind_count + 1, bind_count + 2)); + + let mut query = query_builder.build(); + + // Bind parameters in order + query = query.bind(if auth_user.user.role == crate::models::UserRole::Admin { + None + } else { + Some(auth_user.user.id) + }); + + if let Some(stage) = ¶ms.stage { + query = query.bind(stage); + } + + if let Some(reason) = ¶ms.reason { + query = query.bind(reason); + } + + query = query.bind(limit).bind(offset); + + let failed_docs = query + .fetch_all(state.db.get_pool()) + .await + .map_err(|e| { + tracing::error!("Failed to fetch failed documents: {}", e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // Count total for pagination + let mut count_query_builder = sqlx::QueryBuilder::new( + "SELECT COUNT(*) FROM failed_documents WHERE ($1::uuid IS NULL OR user_id = $1)" + ); + + let mut count_bind_count = 1; + + if let Some(stage) = ¶ms.stage { + count_bind_count += 1; + count_query_builder.push(&format!(" 
AND failure_stage = ${}", count_bind_count)); + } + + if let Some(reason) = ¶ms.reason { + count_bind_count += 1; + count_query_builder.push(&format!(" AND failure_reason = ${}", count_bind_count)); + } + + let mut count_query = count_query_builder.build_query_scalar::(); + + count_query = count_query.bind(if auth_user.user.role == crate::models::UserRole::Admin { + None + } else { + Some(auth_user.user.id) + }); + + if let Some(stage) = ¶ms.stage { + count_query = count_query.bind(stage); + } + + if let Some(reason) = ¶ms.reason { + count_query = count_query.bind(reason); + } + + let total_count = count_query + .fetch_one(state.db.get_pool()) + .await + .unwrap_or(0); + + // Convert to JSON response format + let documents: Vec = failed_docs.iter().map(|row| { + serde_json::json!({ + "id": row.get::("id"), + "filename": row.get::("filename"), + "original_filename": row.get::, _>("original_filename"), + "file_path": row.get::, _>("file_path"), + "file_size": row.get::, _>("file_size"), + "mime_type": row.get::, _>("mime_type"), + "content": row.get::, _>("content"), + "tags": row.get::, _>("tags"), + "ocr_text": row.get::, _>("ocr_text"), + "ocr_confidence": row.get::, _>("ocr_confidence"), + "ocr_word_count": row.get::, _>("ocr_word_count"), + "ocr_processing_time_ms": row.get::, _>("ocr_processing_time_ms"), + "failure_reason": row.get::("failure_reason"), + "failure_stage": row.get::("failure_stage"), + "error_message": row.get::, _>("error_message"), + "existing_document_id": row.get::, _>("existing_document_id"), + "ingestion_source": row.get::("ingestion_source"), + "retry_count": row.get::, _>("retry_count"), + "last_retry_at": row.get::>, _>("last_retry_at"), + "created_at": row.get::, _>("created_at"), + "updated_at": row.get::, _>("updated_at"), + + // Computed fields for backward compatibility + "failure_category": categorize_failure_reason( + Some(&row.get::("failure_reason")), + row.get::, _>("error_message").as_deref() + ), + "source": match row.get::("failure_stage").as_str() { + "ocr" => "OCR Processing", + "ingestion" => "Document Ingestion", + "validation" => "Document Validation", + "storage" => "File Storage", + "processing" => "Document Processing", + "sync" => "Source Synchronization", + _ => "Unknown" + } + }) + }).collect(); + + // Calculate statistics for the response + let mut stage_stats = std::collections::HashMap::new(); + let mut reason_stats = std::collections::HashMap::new(); + + for doc in &documents { + let stage = doc["failure_stage"].as_str().unwrap_or("unknown"); + let reason = doc["failure_reason"].as_str().unwrap_or("unknown"); + + *stage_stats.entry(stage).or_insert(0) += 1; + *reason_stats.entry(reason).or_insert(0) += 1; + } + + let response = serde_json::json!({ + "documents": documents, + "pagination": { + "limit": limit, + "offset": offset, + "total": total_count, + "total_pages": (total_count as f64 / limit as f64).ceil() as i64 + }, + "statistics": { + "total_failed": total_count, + "by_stage": stage_stats, + "by_reason": reason_stats + }, + "filters": { + "stage": params.stage, + "reason": params.reason + } + }); + + Ok(Json(response)) +} + async fn calculate_estimated_wait_time(priority: i32) -> i64 { // Simple estimation based on priority - in a real implementation, // this would check actual queue depth and processing times @@ -775,6 +979,7 @@ fn categorize_failure_reason(failure_reason: Option<&str>, error_message: Option Some("processing_timeout") => "Timeout", Some("memory_limit") => "Memory Limit", Some("pdf_parsing_panic") => "PDF 
Parsing Error", + Some("low_ocr_confidence") => "Low OCR Confidence", Some("unknown") | None => { // Try to categorize based on error message if let Some(error) = error_message { @@ -787,6 +992,8 @@ fn categorize_failure_reason(failure_reason: Option<&str>, error_message: Option "PDF Font Issues" } else if error_lower.contains("corrupt") { "PDF Corruption" + } else if error_lower.contains("quality below threshold") || error_lower.contains("confidence") { + "Low OCR Confidence" } else { "Unknown Error" } @@ -1066,12 +1273,27 @@ pub async fn delete_low_confidence_documents( let matched_count = matched_documents.len(); if is_preview { + // Convert documents to response format with key details + let document_details: Vec = matched_documents.iter().map(|d| { + serde_json::json!({ + "id": d.id, + "filename": d.filename, + "original_filename": d.original_filename, + "file_size": d.file_size, + "ocr_confidence": d.ocr_confidence, + "ocr_status": d.ocr_status, + "created_at": d.created_at, + "mime_type": d.mime_type + }) + }).collect(); + return Ok(Json(serde_json::json!({ "success": true, "message": format!("Found {} documents with OCR confidence below {}%", matched_count, request.max_confidence), "matched_count": matched_count, "preview": true, - "document_ids": matched_documents.iter().map(|d| d.id).collect::>() + "document_ids": matched_documents.iter().map(|d| d.id).collect::>(), + "documents": document_details }))); }
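
For reference, a minimal sketch of how a client might consume the new endpoint through the getFailedDocuments helper added in frontend/src/services/api.ts. The response fields used below (documents, pagination, statistics) mirror the JSON assembled by get_failed_documents in src/routes/documents.rs, and the 'ocr' / 'low_ocr_confidence' values match the CHECK constraints in the failed_documents migration; the wrapper function itself is hypothetical.

// Hypothetical caller: page through OCR failures caused by low confidence.
import { documentService } from '../services/api';

async function loadLowConfidenceFailures(page: number, limit = 25) {
  const offset = (page - 1) * limit;
  // stage/reason map to the failure_stage and failure_reason columns
  const response = await documentService.getFailedDocuments(limit, offset, 'ocr', 'low_ocr_confidence');
  const { documents, pagination, statistics } = response.data;
  console.log(`Loaded ${documents.length} of ${pagination.total} failed documents`);
  console.log('Failures by stage:', statistics.by_stage);
  return { documents, totalPages: pagination.total_pages };
}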