feat(server): continue to try to wrangle the failed and ignored documents

This commit is contained in:
perf3ct 2025-06-29 23:27:51 +00:00
parent e8a7d7bf0f
commit fef28a33c6
7 changed files with 1142 additions and 268 deletions

View File

@ -0,0 +1,152 @@
import React, { useState, useEffect } from 'react';
import {
Box,
Typography,
CircularProgress,
Alert,
Paper,
} from '@mui/material';
import { api } from '../services/api';
/**
 * Props accepted by the FailedDocumentViewer component.
 */
interface FailedDocumentViewerProps {
/** Id of the failed-document record; interpolated into the `/documents/failed/{id}/view` request path. */
failedDocumentId: string;
/** File name; used as the image `alt` text, the iframe `title`, and in the "cannot preview" fallback. */
filename: string;
/** MIME type applied to the downloaded blob and used to pick the image, PDF, text, or fallback rendering. */
mimeType: string;
}
/**
 * Inline preview of a failed document.
 *
 * Downloads the file from `/documents/failed/{failedDocumentId}/view` as a
 * blob, wraps it in an object URL and renders it as an image, a PDF iframe or
 * a text iframe depending on `mimeType`; any other type gets a "cannot
 * preview" notice. A spinner is shown while the request is in flight and an
 * error alert is shown if it fails.
 */
const FailedDocumentViewer: React.FC<FailedDocumentViewerProps> = ({
  failedDocumentId,
  filename,
  mimeType,
}) => {
  const [loading, setLoading] = useState<boolean>(true);
  const [error, setError] = useState<string | null>(null);
  const [documentUrl, setDocumentUrl] = useState<string | null>(null);

  useEffect(() => {
    // Both values belong to this effect run. Keeping the object URL in a local
    // (instead of reading the `documentUrl` state) lets the cleanup revoke the
    // URL that was actually created; a cleanup that closes over state only
    // sees the value from the render that scheduled it, which is still `null`.
    let cancelled = false;
    let objectUrl: string | null = null;

    const loadFailedDocument = async (): Promise<void> => {
      setLoading(true);
      setError(null);
      // Drop the previous document's URL; the previous cleanup has revoked it.
      setDocumentUrl(null);

      try {
        // Use the new failed document view endpoint
        const response = await api.get(`/documents/failed/${failedDocumentId}/view`, {
          responseType: 'blob',
        });

        // A newer request (or an unmount) superseded this one: discard the
        // result so a slow response cannot overwrite the current document.
        if (cancelled) {
          return;
        }

        objectUrl = window.URL.createObjectURL(new Blob([response.data], { type: mimeType }));
        setDocumentUrl(objectUrl);
      } catch (err: any) {
        if (cancelled) {
          return;
        }
        console.error('Failed to load failed document:', err);
        if (err.response?.status === 404) {
          setError('Document file not found or has been deleted');
        } else {
          setError('Failed to load document for viewing');
        }
      } finally {
        // Skip the state update once this run is stale or the component is gone.
        if (!cancelled) {
          setLoading(false);
        }
      }
    };

    void loadFailedDocument();

    // Runs on unmount and before the effect re-runs for a new document.
    return () => {
      cancelled = true;
      if (objectUrl) {
        window.URL.revokeObjectURL(objectUrl);
      }
    };
  }, [failedDocumentId, mimeType]);

  if (loading) {
    return (
      <Box sx={{
        display: 'flex',
        justifyContent: 'center',
        alignItems: 'center',
        minHeight: '200px'
      }}>
        <CircularProgress />
      </Box>
    );
  }

  if (error) {
    return (
      <Alert severity="error" sx={{ m: 2 }}>
        <Typography variant="body2">{error}</Typography>
        <Typography variant="caption" sx={{ mt: 1, display: 'block' }}>
          The original file may have been deleted or moved from storage.
        </Typography>
      </Alert>
    );
  }

  return (
    <Paper elevation={2} sx={{
      p: 2,
      borderRadius: 2,
      backgroundColor: 'background.paper',
      minHeight: '300px'
    }}>
      {documentUrl && (
        <>
          {mimeType.startsWith('image/') ? (
            <Box sx={{ textAlign: 'center' }}>
              <img
                src={documentUrl}
                alt={filename}
                style={{
                  maxWidth: '100%',
                  maxHeight: '400px',
                  objectFit: 'contain',
                }}
              />
            </Box>
          ) : mimeType === 'application/pdf' ? (
            <iframe
              src={documentUrl}
              width="100%"
              height="400px"
              style={{ border: 'none', borderRadius: '4px' }}
              title={filename}
            />
          ) : mimeType.startsWith('text/') ? (
            <Box sx={{
              fontFamily: 'monospace',
              fontSize: '0.875rem',
              whiteSpace: 'pre-wrap',
              backgroundColor: 'grey.50',
              p: 2,
              borderRadius: 1,
              maxHeight: '400px',
              overflow: 'auto'
            }}>
              {/* `text/*` includes `text/html`. The blob URL is created by this
                  page, so without a sandbox any script in an uploaded file
                  would run with the application's origin. */}
              <iframe
                src={documentUrl}
                sandbox=""
                width="100%"
                height="400px"
                style={{ border: 'none' }}
                title={filename}
              />
            </Box>
          ) : (
            <Box sx={{ textAlign: 'center', py: 4 }}>
              <Typography variant="body1" color="text.secondary">
                Cannot preview this file type ({mimeType})
              </Typography>
              <Typography variant="body2" color="text.secondary" sx={{ mt: 1 }}>
                File: {filename}
              </Typography>
              <Typography variant="body2" color="text.secondary">
                You can try downloading the file to view it locally.
              </Typography>
            </Box>
          )}
        </>
      )}
    </Paper>
  );
};
export default FailedDocumentViewer;

File diff suppressed because it is too large Load Diff

View File

@ -147,7 +147,7 @@ describe('DocumentManagementPage - Low Confidence Deletion', () => {
// Wait for tab content to render
await waitFor(() => {
const thresholdInput = screen.getByLabelText(/Maximum Confidence Threshold/i);
const thresholdInput = screen.getByLabelText(/Confidence Threshold/i);
expect(thresholdInput).toBeInTheDocument();
});
});
@ -224,7 +224,7 @@ describe('DocumentManagementPage - Low Confidence Deletion', () => {
// const lowConfidenceTab = screen.getByText(/Low Confidence/i);
// fireEvent.click(lowConfidenceTab);
//
// const thresholdInput = screen.getByLabelText(/Maximum Confidence Threshold/i);
// const thresholdInput = screen.getByLabelText(/Confidence Threshold/i);
//
// // Test invalid values
// fireEvent.change(thresholdInput, { target: { value: '150' } });

View File

@ -1798,6 +1798,104 @@ impl Database {
.collect();
Ok(results)
}
/// Inserts one row into the `failed_documents` table.
///
/// A fresh v4 UUID is generated for the record and every argument is bound,
/// in order, to the placeholders of the `INSERT` statement below. The
/// statement itself fills `created_at` and `updated_at` with `NOW()`.
///
/// # Errors
///
/// Returns the error produced while executing the statement on `self.pool`.
///
/// # Returns
///
/// The id generated for the new record.
pub async fn create_failed_document(
    &self,
    user_id: Uuid,
    filename: String,
    original_filename: Option<String>,
    original_path: Option<String>,
    file_path: Option<String>,
    file_size: Option<i64>,
    file_hash: Option<String>,
    mime_type: Option<String>,
    content: Option<String>,
    tags: Vec<String>,
    ocr_text: Option<String>,
    ocr_confidence: Option<f32>,
    ocr_word_count: Option<i32>,
    ocr_processing_time_ms: Option<i32>,
    failure_reason: String,
    failure_stage: String,
    existing_document_id: Option<Uuid>,
    ingestion_source: String,
    error_message: Option<String>,
    retry_count: Option<i32>,
) -> Result<Uuid> {
    const INSERT_FAILED_DOCUMENT: &str = r#"
INSERT INTO failed_documents (
id, user_id, filename, original_filename, original_path, file_path,
file_size, file_hash, mime_type, content, tags, ocr_text,
ocr_confidence, ocr_word_count, ocr_processing_time_ms,
failure_reason, failure_stage, existing_document_id,
ingestion_source, error_message, retry_count, created_at, updated_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
$16, $17, $18, $19, $20, $21, NOW(), NOW()
)
"#;

    let record_id = Uuid::new_v4();

    // The bind order must mirror the column list above ($1 through $21).
    let statement = sqlx::query(INSERT_FAILED_DOCUMENT)
        // $1-$2: identity and owner
        .bind(record_id)
        .bind(user_id)
        // $3-$9: file naming, location and metadata
        .bind(filename)
        .bind(original_filename)
        .bind(original_path)
        .bind(file_path)
        .bind(file_size)
        .bind(file_hash)
        .bind(mime_type)
        // $10-$11: extracted content and tags
        .bind(content)
        .bind(tags)
        // $12-$15: OCR output and statistics
        .bind(ocr_text)
        .bind(ocr_confidence)
        .bind(ocr_word_count)
        .bind(ocr_processing_time_ms)
        // $16-$21: failure description and provenance
        .bind(failure_reason)
        .bind(failure_stage)
        .bind(existing_document_id)
        .bind(ingestion_source)
        .bind(error_message)
        .bind(retry_count);

    statement.execute(&self.pool).await?;

    Ok(record_id)
}
/// Create a failed document from an existing document that failed OCR
pub async fn create_failed_document_from_document(
&self,
document: &Document,
failure_reason: String,
error_message: Option<String>,
retry_count: Option<i32>,
) -> Result<Uuid> {
self.create_failed_document(
document.user_id, // user_id is required in Document struct
document.filename.clone(),
Some(document.original_filename.clone()),
None, // original_path - not available in Document model
Some(document.file_path.clone()),
Some(document.file_size),
document.file_hash.clone(),
Some(document.mime_type.clone()),
document.content.clone(),
document.tags.clone(),
document.ocr_text.clone(),
document.ocr_confidence,
document.ocr_word_count,
document.ocr_processing_time_ms,
failure_reason,
"ocr".to_string(), // OCR failure stage
None, // existing_document_id
"unknown".to_string(), // Default ingestion source - would need to be passed in for better tracking
error_message,
retry_count,
).await
}
}

View File

@ -110,13 +110,42 @@ impl DocumentIngestionService {
}
// Save file to storage
let file_path = self.file_service
let file_path = match self.file_service
.save_file(&request.filename, &request.file_data)
.await
.map_err(|e| {
warn!("Failed to save file {}: {}", request.filename, e);
e
})?;
.await {
Ok(path) => path,
Err(e) => {
warn!("Failed to save file {}: {}", request.filename, e);
// Create failed document record for storage failure
if let Err(failed_err) = self.db.create_failed_document(
request.user_id,
request.filename.clone(),
Some(request.original_filename.clone()),
None, // original_path
None, // file_path (couldn't save)
Some(file_size),
Some(file_hash.clone()),
Some(request.mime_type.clone()),
None, // content
Vec::new(), // tags
None, // ocr_text
None, // ocr_confidence
None, // ocr_word_count
None, // ocr_processing_time_ms
"storage_error".to_string(),
"storage".to_string(),
None, // existing_document_id
request.source_type.unwrap_or_else(|| "upload".to_string()),
Some(e.to_string()),
None, // retry_count
).await {
warn!("Failed to create failed document record for storage error: {}", failed_err);
}
return Err(e.into());
}
};
// Create document record
let document = self.file_service.create_document(
@ -158,6 +187,33 @@ impl DocumentIngestionService {
} else {
warn!("Failed to create document record for {} (hash: {}): {}",
request.filename, &file_hash[..8], e);
// Create failed document record for database creation failure
if let Err(failed_err) = self.db.create_failed_document(
request.user_id,
request.filename.clone(),
Some(request.original_filename.clone()),
None, // original_path
Some(file_path.clone()), // file was saved successfully
Some(file_size),
Some(file_hash.clone()),
Some(request.mime_type.clone()),
None, // content
Vec::new(), // tags
None, // ocr_text
None, // ocr_confidence
None, // ocr_word_count
None, // ocr_processing_time_ms
"database_error".to_string(),
"ingestion".to_string(),
None, // existing_document_id
request.source_type.unwrap_or_else(|| "upload".to_string()),
Some(e.to_string()),
None, // retry_count
).await {
warn!("Failed to create failed document record for database error: {}", failed_err);
}
return Err(e.into());
}
}

View File

@ -322,6 +322,14 @@ impl OcrQueueService {
warn!("⚠️ OCR quality issues for '{}' | Job: {} | Document: {} | {:.1}% confidence | {} words",
filename, item.id, item.document_id, ocr_result.confidence, ocr_result.word_count);
// Create failed document record using helper function
let _ = self.create_failed_document_from_ocr_error(
item.document_id,
"low_ocr_confidence",
&error_msg,
item.attempts,
).await;
// Mark as failed for quality issues with proper failure reason
sqlx::query(
r#"
@ -360,12 +368,30 @@ impl OcrQueueService {
Ok(false) => {
let error_msg = "OCR update failed validation (document may have been modified)";
warn!("{} for document {}", error_msg, item.document_id);
// Create failed document record using helper function
let _ = self.create_failed_document_from_ocr_error(
item.document_id,
"processing",
error_msg,
item.attempts,
).await;
self.mark_failed(item.id, error_msg).await?;
return Ok(());
}
Err(e) => {
let error_msg = format!("Transaction-safe OCR update failed: {}", e);
error!("{}", error_msg);
// Create failed document record using helper function
let _ = self.create_failed_document_from_ocr_error(
item.document_id,
"processing",
&error_msg,
item.attempts,
).await;
self.mark_failed(item.id, &error_msg).await?;
return Ok(());
}
@ -411,21 +437,7 @@ impl OcrQueueService {
let error_str = e.to_string();
// Classify error type and determine failure reason
let (failure_reason, should_suppress) = if error_str.contains("font encoding") ||
error_str.contains("missing unicode map") {
("pdf_font_encoding", true)
} else if error_str.contains("corrupted internal structure") ||
error_str.contains("corrupted") {
("pdf_corruption", true)
} else if error_str.contains("timeout") || error_str.contains("timed out") {
("processing_timeout", false)
} else if error_str.contains("memory") || error_str.contains("out of memory") {
("memory_limit", false)
} else if error_str.contains("panic") {
("pdf_parsing_panic", true)
} else {
("unknown", false)
};
let (failure_reason, should_suppress) = Self::classify_ocr_error(&error_str);
// Use intelligent logging based on error type
if should_suppress {
@ -439,6 +451,14 @@ impl OcrQueueService {
filename, item.id, item.document_id, failure_reason, e);
}
// Create failed document record using helper function
let _ = self.create_failed_document_from_ocr_error(
item.document_id,
failure_reason,
&error_msg,
item.attempts,
).await;
// Always use 'failed' status with specific failure reason
sqlx::query(
r#"
@ -731,4 +751,88 @@ impl OcrQueueService {
Ok(result.rows_affected() as i64)
}
/// Helper function to create failed document record from OCR failure
async fn create_failed_document_from_ocr_error(
&self,
document_id: Uuid,
failure_reason: &str,
error_message: &str,
retry_count: i32,
) -> Result<()> {
// Query document directly from database without user restrictions (OCR service context)
let document_row = sqlx::query(
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type,
content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms,
ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at,
user_id, file_hash
FROM documents
WHERE id = $1
"#
)
.bind(document_id)
.fetch_optional(&self.pool)
.await?;
if let Some(row) = document_row {
// Extract document data
let user_id: Uuid = row.get("user_id");
let filename: String = row.get("filename");
let original_filename: String = row.get("original_filename");
let file_path: String = row.get("file_path");
let file_size: i64 = row.get("file_size");
let mime_type: String = row.get("mime_type");
let file_hash: Option<String> = row.get("file_hash");
// Create failed document record directly
if let Err(e) = self.db.create_failed_document(
user_id,
filename,
Some(original_filename),
None, // original_path
Some(file_path),
Some(file_size),
file_hash,
Some(mime_type),
None, // content
Vec::new(), // tags
None, // ocr_text
None, // ocr_confidence
None, // ocr_word_count
None, // ocr_processing_time_ms
failure_reason.to_string(),
"ocr".to_string(),
None, // existing_document_id
"ocr_queue".to_string(),
Some(error_message.to_string()),
Some(retry_count),
).await {
error!("Failed to create failed document record: {}", e);
}
}
Ok(())
}
/// Helper function to map OCR error strings to standardized failure reasons.
///
/// Returns `(failure_reason, should_suppress)`: the standardized reason and
/// the flag the caller uses to pick its quieter logging path.
///
/// Matching is ASCII case-insensitive, so "Timed Out" and "timed out" are
/// classified alike. Rules are tried in order and the first match wins; an
/// error that matches no rule is reported as `("other", false)`.
fn classify_ocr_error(error_str: &str) -> (&'static str, bool) {
    // (substrings to look for, failure reason, suppress logging)
    const RULES: &[(&[&str], &str, bool)] = &[
        (&["font encoding", "missing unicode map"], "pdf_font_encoding", true),
        (&["corrupted"], "pdf_corruption", true),
        (&["timeout", "timed out"], "ocr_timeout", false),
        (&["memory"], "ocr_memory_limit", false),
        (&["panic"], "pdf_parsing_error", true),
        (&["unsupported"], "unsupported_format", false),
        (&["too large", "file size"], "file_too_large", false),
    ];

    // Every needle is lowercase ASCII, so ASCII folding of the input suffices.
    let normalized = error_str.to_ascii_lowercase();

    RULES
        .iter()
        .find(|rule| rule.0.iter().any(|needle| normalized.contains(*needle)))
        .map_or(("other", false), |rule| (rule.1, rule.2))
}
}

View File

@ -60,6 +60,7 @@ pub fn router() -> Router<Arc<AppState>> {
.route("/{id}/retry-ocr", post(retry_ocr))
.route("/duplicates", get(get_user_duplicates))
.route("/failed", get(get_failed_documents))
.route("/failed/{id}/view", get(view_failed_document))
.route("/delete-low-confidence", post(delete_low_confidence_documents))
.route("/delete-failed-ocr", post(delete_failed_ocr_documents))
}
@ -961,6 +962,77 @@ async fn get_failed_documents(
Ok(Json(response))
}
/// Stream the stored file of a failed document for inline viewing.
///
/// Administrators may view any failed document; other users only their own.
/// Responds with 404 when the record does not exist (or belongs to someone
/// else), when no file was ever stored for it, or when the file can no
/// longer be read from storage.
#[utoipa::path(
    get,
    path = "/api/documents/failed/{id}/view",
    tag = "documents",
    security(
        ("bearer_auth" = [])
    ),
    params(
        ("id" = uuid::Uuid, Path, description = "Failed Document ID")
    ),
    responses(
        (status = 200, description = "Failed document content for viewing in browser"),
        (status = 404, description = "Failed document not found or file deleted"),
        (status = 401, description = "Unauthorized")
    )
)]
async fn view_failed_document(
    State(state): State<Arc<AppState>>,
    auth_user: AuthUser,
    Path(failed_document_id): Path<uuid::Uuid>,
) -> Result<Response, StatusCode> {
    // A NULL owner filter disables the ownership check in the query below.
    let owner_filter = if auth_user.user.role == crate::models::UserRole::Admin {
        None
    } else {
        Some(auth_user.user.id)
    };

    // Get failed document from database
    let row = sqlx::query(
        r#"
        SELECT file_path, filename, mime_type
        FROM failed_documents
        WHERE id = $1 AND ($2::uuid IS NULL OR user_id = $2)
        "#
    )
    .bind(failed_document_id)
    .bind(owner_filter)
    .fetch_optional(&state.db.pool)
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
    .ok_or(StatusCode::NOT_FOUND)?;

    // `try_get` reports an undecodable column as a 500 instead of panicking
    // inside the request handler.
    let file_path: Option<String> = row
        .try_get("file_path")
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let filename: String = row
        .try_get("filename")
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let mime_type: Option<String> = row
        .try_get("mime_type")
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Check if file_path exists (some failed documents might not have been saved)
    let file_path = file_path.ok_or(StatusCode::NOT_FOUND)?;

    let file_service = FileService::new(state.config.upload_path.clone());
    let file_data = file_service
        .read_file(&file_path)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?; // File was deleted or moved

    // Determine content type from mime_type or file extension
    let content_type = mime_type.unwrap_or_else(|| {
        mime_guess::from_path(&filename)
            .first_or_octet_stream()
            .to_string()
    });

    // The name comes from an upload. A quote or backslash would break out of
    // the quoted-string below, and a control character is not a valid header
    // byte, which would turn the response into a 500.
    let safe_filename: String = filename
        .chars()
        .map(|c| if c.is_control() || c == '"' || c == '\\' { '_' } else { c })
        .collect();

    let response = Response::builder()
        .header(CONTENT_TYPE, content_type)
        .header("Content-Length", file_data.len())
        .header(
            "Content-Disposition",
            format!("inline; filename=\"{}\"", safe_filename),
        )
        // The body is user-supplied: stop browsers from sniffing it into a
        // more dangerous type than the one declared above.
        .header("X-Content-Type-Options", "nosniff")
        .body(file_data.into())
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(response)
}
async fn calculate_estimated_wait_time(priority: i32) -> i64 {
// Simple estimation based on priority - in a real implementation,
// this would check actual queue depth and processing times