feat(dev): break up the large documents.rs file, again

perf3ct 2025-07-03 23:33:53 +00:00
parent 6547130fb1
commit 220d6612a7
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
10 changed files with 2218 additions and 2217 deletions

File diff suppressed because it is too large


@@ -0,0 +1,401 @@
use axum::{
extract::{Query, State},
http::StatusCode,
response::Json,
};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::{
auth::AuthUser,
services::file_service::FileService,
AppState,
};
use super::types::{BulkDeleteRequest, DeleteLowConfidenceRequest, BulkDeleteResponse};
/// Bulk delete multiple documents
#[utoipa::path(
delete,
path = "/api/documents",
tag = "documents",
security(
("bearer_auth" = [])
),
request_body = BulkDeleteRequest,
responses(
(status = 200, description = "Bulk delete results", body = BulkDeleteResponse),
(status = 400, description = "Bad request"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn bulk_delete_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Json(request): Json<BulkDeleteRequest>,
) -> Result<Json<BulkDeleteResponse>, StatusCode> {
if request.document_ids.is_empty() {
return Err(StatusCode::BAD_REQUEST);
}
if request.document_ids.len() > 1000 {
return Err(StatusCode::BAD_REQUEST);
}
info!("Bulk deleting {} documents", request.document_ids.len());
// Get documents first to check access and collect file paths
let mut documents_to_delete = Vec::new();
let mut accessible_ids = Vec::new();
for document_id in &request.document_ids {
match state
.db
.get_document_by_id(*document_id, auth_user.user.id, auth_user.user.role)
.await
{
Ok(Some(document)) => {
documents_to_delete.push(document);
accessible_ids.push(*document_id);
}
Ok(None) => {
debug!("Document {} not found or access denied", document_id);
}
Err(e) => {
error!("Error checking document {}: {}", document_id, e);
}
}
}
// Perform bulk delete from database
let (deleted_ids, failed_ids) = state
.db
.bulk_delete_documents(&accessible_ids, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error during bulk delete: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Delete associated files
let file_service = FileService::new(state.config.clone());
let mut files_deleted = 0;
let mut files_failed = 0;
for document in documents_to_delete {
if deleted_ids.contains(&document.id) {
match file_service.delete_document_files(&document).await {
Ok(_) => files_deleted += 1,
Err(e) => {
warn!("Failed to delete files for document {}: {}", document.id, e);
files_failed += 1;
}
}
}
}
let response = BulkDeleteResponse {
deleted_count: deleted_ids.len() as i64,
failed_count: failed_ids.len() as i64,
deleted_documents: deleted_ids,
failed_documents: failed_ids,
total_files_deleted: files_deleted,
total_files_failed: files_failed,
};
info!("Bulk delete completed: {} deleted, {} failed",
response.deleted_count, response.failed_count);
Ok(Json(response))
}
/// Delete documents with low OCR confidence
#[utoipa::path(
post,
path = "/api/documents/delete-low-confidence",
tag = "documents",
security(
("bearer_auth" = [])
),
request_body = DeleteLowConfidenceRequest,
responses(
(status = 200, description = "Low confidence delete results", body = BulkDeleteResponse),
(status = 400, description = "Bad request"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn delete_low_confidence_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Json(request): Json<DeleteLowConfidenceRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
if request.max_confidence < 0.0 || request.max_confidence > 100.0 {
return Err(StatusCode::BAD_REQUEST);
}
let preview_only = request.preview_only.unwrap_or(false);
info!("Finding documents with OCR confidence <= {}", request.max_confidence);
// Find documents with low confidence
let low_confidence_docs = state
.db
.find_documents_by_confidence_threshold(
auth_user.user.id,
auth_user.user.role,
request.max_confidence,
1000, // Limit to prevent excessive operations
0,
)
.await
.map_err(|e| {
error!("Database error finding low confidence documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
if preview_only {
let preview_docs: Vec<_> = low_confidence_docs
.iter()
.take(10) // Show max 10 in preview
.map(|doc| serde_json::json!({
"id": doc.id,
"filename": doc.original_filename,
"ocr_confidence": doc.ocr_confidence,
"created_at": doc.created_at
}))
.collect();
return Ok(Json(serde_json::json!({
"preview": true,
"total_found": low_confidence_docs.len(),
"documents": preview_docs,
"message": format!("Found {} documents with OCR confidence <= {}",
low_confidence_docs.len(), request.max_confidence)
})));
}
// Perform actual deletion
let document_ids: Vec<uuid::Uuid> = low_confidence_docs.iter().map(|d| d.id).collect();
if document_ids.is_empty() {
return Ok(Json(serde_json::json!({
"deleted_count": 0,
"failed_count": 0,
"message": "No documents found with the specified confidence threshold"
})));
}
let (deleted_ids, failed_ids) = state
.db
.bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error during low confidence bulk delete: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Delete associated files
let file_service = FileService::new(state.config.clone());
let mut files_deleted = 0;
let mut files_failed = 0;
for document in low_confidence_docs {
if deleted_ids.contains(&document.id) {
match file_service.delete_document_files(&document).await {
Ok(_) => files_deleted += 1,
Err(e) => {
warn!("Failed to delete files for document {}: {}", document.id, e);
files_failed += 1;
}
}
}
}
info!("Low confidence delete completed: {} deleted, {} failed",
deleted_ids.len(), failed_ids.len());
Ok(Json(serde_json::json!({
"deleted_count": deleted_ids.len(),
"failed_count": failed_ids.len(),
"files_deleted": files_deleted,
"files_failed": files_failed,
"deleted_documents": deleted_ids,
"failed_documents": failed_ids,
"message": format!("Deleted {} documents with OCR confidence <= {}",
deleted_ids.len(), request.max_confidence)
})))
}
/// Delete documents with failed OCR
#[utoipa::path(
post,
path = "/api/documents/delete-failed-ocr",
tag = "documents",
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "Failed OCR delete results"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn delete_failed_ocr_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
) -> Result<Json<serde_json::Value>, StatusCode> {
info!("Finding documents with failed OCR");
// Find documents with failed OCR
let failed_ocr_docs = state
.db
.find_failed_ocr_documents(
auth_user.user.id,
auth_user.user.role,
1000, // Limit to prevent excessive operations
0,
)
.await
.map_err(|e| {
error!("Database error finding failed OCR documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
if failed_ocr_docs.is_empty() {
return Ok(Json(serde_json::json!({
"deleted_count": 0,
"message": "No documents found with failed OCR status"
})));
}
// Perform deletion
let document_ids: Vec<uuid::Uuid> = failed_ocr_docs.iter().map(|d| d.id).collect();
let (deleted_ids, failed_ids) = state
.db
.bulk_delete_documents(&document_ids, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error during failed OCR bulk delete: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Delete associated files
let file_service = FileService::new(state.config.clone());
let mut files_deleted = 0;
let mut files_failed = 0;
for document in failed_ocr_docs {
if deleted_ids.contains(&document.id) {
match file_service.delete_document_files(&document).await {
Ok(_) => files_deleted += 1,
Err(e) => {
warn!("Failed to delete files for document {}: {}", document.id, e);
files_failed += 1;
}
}
}
}
info!("Failed OCR delete completed: {} deleted, {} failed",
deleted_ids.len(), failed_ids.len());
Ok(Json(serde_json::json!({
"deleted_count": deleted_ids.len(),
"failed_count": failed_ids.len(),
"files_deleted": files_deleted,
"files_failed": files_failed,
"deleted_documents": deleted_ids,
"failed_documents": failed_ids,
"message": format!("Deleted {} documents with failed OCR", deleted_ids.len())
})))
}
/// Get documents marked for deletion (cleanup preview)
pub async fn get_cleanup_preview(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let max_confidence = params
.get("max_confidence")
.and_then(|s| s.parse::<f32>().ok())
.unwrap_or(30.0);
let include_failed = params
.get("include_failed")
.and_then(|s| s.parse::<bool>().ok())
.unwrap_or(true);
let mut cleanup_candidates = Vec::new();
let mut total_size = 0i64;
// Get low confidence documents
let low_confidence_docs = state
.db
.find_documents_by_confidence_threshold(
auth_user.user.id,
auth_user.user.role,
max_confidence,
100,
0,
)
.await
.map_err(|e| {
error!("Database error finding low confidence documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
for doc in low_confidence_docs {
total_size += doc.file_size;
cleanup_candidates.push(serde_json::json!({
"id": doc.id,
"filename": doc.original_filename,
"file_size": doc.file_size,
"ocr_confidence": doc.ocr_confidence,
"reason": "low_confidence",
"created_at": doc.created_at
}));
}
// Get failed OCR documents if requested
if include_failed {
let failed_docs = state
.db
.find_failed_ocr_documents(
auth_user.user.id,
auth_user.user.role,
100,
0,
)
.await
.map_err(|e| {
error!("Database error finding failed OCR documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
for doc in failed_docs {
total_size += doc.file_size;
cleanup_candidates.push(serde_json::json!({
"id": doc.id,
"filename": doc.original_filename,
"file_size": doc.file_size,
"ocr_status": doc.ocr_status,
"reason": "failed_ocr",
"created_at": doc.created_at
}));
}
}
Ok(Json(serde_json::json!({
"total_candidates": cleanup_candidates.len(),
"total_size_bytes": total_size,
"total_size_mb": (total_size as f64 / 1024.0 / 1024.0).round(),
"candidates": cleanup_candidates,
"criteria": {
"max_confidence": max_confidence,
"include_failed_ocr": include_failed
}
})))
}
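
The request body the bulk handlers above accept is just a list of document UUIDs. A minimal sketch (not part of this commit) of building that payload, reusing the BulkDeleteRequest type already imported in this file; the example ids assume the uuid crate's "v4" feature.

// Illustrative only: build the JSON body for DELETE /api/documents.
// The handler rejects empty lists and lists longer than 1000 ids with 400.
fn example_bulk_delete_body() -> serde_json::Value {
    let request = BulkDeleteRequest {
        // Uuid::new_v4 assumes the uuid crate's "v4" feature is enabled.
        document_ids: vec![uuid::Uuid::new_v4(), uuid::Uuid::new_v4()],
    };
    serde_json::to_value(&request).expect("BulkDeleteRequest derives Serialize")
}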


@@ -0,0 +1,477 @@
use axum::{
extract::{Multipart, Path, Query, State},
http::{StatusCode, header::CONTENT_TYPE},
response::{Json, Response},
body::Body,
};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::{
auth::AuthUser,
ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
services::file_service::FileService,
models::DocumentResponse,
AppState,
};
use super::types::{PaginationQuery, DocumentUploadResponse};
/// Upload a new document
#[utoipa::path(
post,
path = "/api/documents",
tag = "documents",
security(
("bearer_auth" = [])
),
request_body(content = String, description = "Document file", content_type = "multipart/form-data"),
responses(
(status = 200, description = "Document uploaded successfully", body = DocumentUploadResponse),
(status = 400, description = "Bad request"),
(status = 401, description = "Unauthorized"),
(status = 413, description = "File too large"),
(status = 500, description = "Internal server error")
)
)]
pub async fn upload_document(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
mut multipart: Multipart,
) -> Result<Json<DocumentUploadResponse>, StatusCode> {
let mut uploaded_file = None;
while let Some(field) = multipart.next_field().await.map_err(|e| {
error!("Failed to get multipart field: {}", e);
StatusCode::BAD_REQUEST
})? {
if field.name() == Some("file") {
let filename = field.file_name()
.ok_or_else(|| {
error!("No filename provided in upload");
StatusCode::BAD_REQUEST
})?
.to_string();
let content_type = field.content_type()
.unwrap_or("application/octet-stream")
.to_string();
let data = field.bytes().await.map_err(|e| {
error!("Failed to read file data: {}", e);
StatusCode::BAD_REQUEST
})?;
uploaded_file = Some((filename, content_type, data.to_vec()));
break;
}
}
let (filename, content_type, data) = uploaded_file.ok_or_else(|| {
error!("No file found in upload");
StatusCode::BAD_REQUEST
})?;
info!("Uploading document: {} ({} bytes)", filename, data.len());
// Create ingestion service
let file_service = FileService::new(state.config.clone());
let ingestion_service = DocumentIngestionService::new(
state.db.clone(),
file_service,
state.config.clone(),
);
match ingestion_service.ingest_document(
data,
&filename,
&content_type,
auth_user.user.id,
"web_upload".to_string(),
).await {
Ok(IngestionResult::Success(document)) => {
info!("Document uploaded successfully: {}", document.id);
Ok(Json(DocumentUploadResponse {
document_id: document.id,
filename: document.filename,
file_size: document.file_size,
mime_type: document.mime_type,
status: "success".to_string(),
message: "Document uploaded successfully".to_string(),
}))
}
Ok(IngestionResult::Duplicate(existing_doc)) => {
warn!("Duplicate document upload attempted: {}", existing_doc.id);
Ok(Json(DocumentUploadResponse {
document_id: existing_doc.id,
filename: existing_doc.filename,
file_size: existing_doc.file_size,
mime_type: existing_doc.mime_type,
status: "duplicate".to_string(),
message: "Document already exists".to_string(),
}))
}
Ok(IngestionResult::Failed(failed_doc)) => {
error!("Document ingestion failed: {}", failed_doc.error_message.as_deref().unwrap_or("Unknown error"));
Err(StatusCode::UNPROCESSABLE_ENTITY)
}
Err(e) => {
error!("Failed to ingest document: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Get a specific document by ID
#[utoipa::path(
get,
path = "/api/documents/{id}",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document details", body = DocumentResponse),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized")
)
)]
pub async fn get_document_by_id(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<DocumentResponse>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// Get labels for this document
let labels = state
.db
.get_document_labels(document_id)
.await
.map_err(|e| {
error!("Failed to get labels for document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let mut response = DocumentResponse::from(document);
response.labels = labels;
Ok(Json(response))
}
/// List documents with pagination and filtering
#[utoipa::path(
get,
path = "/api/documents",
tag = "documents",
security(
("bearer_auth" = [])
),
params(PaginationQuery),
responses(
(status = 200, description = "List of documents", body = Vec<DocumentResponse>),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn list_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Query(query): Query<PaginationQuery>,
) -> Result<Json<Vec<DocumentResponse>>, StatusCode> {
let limit = query.limit.unwrap_or(25);
let offset = query.offset.unwrap_or(0);
let documents = if let Some(ocr_status) = query.ocr_status.as_deref() {
state
.db
.get_documents_by_user_with_role_and_filter(
auth_user.user.id,
auth_user.user.role,
Some(ocr_status),
limit,
offset,
)
.await
} else {
state
.db
.get_documents_by_user_with_role(
auth_user.user.id,
auth_user.user.role,
limit,
offset,
)
.await
}
.map_err(|e| {
error!("Database error listing documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get document IDs for batch label fetching
let document_ids: Vec<uuid::Uuid> = documents.iter().map(|d| d.id).collect();
// Get labels for all documents in batch
let labels_map = if !document_ids.is_empty() {
let labels = state
.db
.get_labels_for_documents(&document_ids)
.await
.map_err(|e| {
error!("Failed to get labels for documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
labels.into_iter().collect::<std::collections::HashMap<_, _>>()
} else {
std::collections::HashMap::new()
};
// Convert to response format with labels
let responses: Vec<DocumentResponse> = documents
.into_iter()
.map(|doc| {
let mut response = DocumentResponse::from(doc.clone());
if let Some(labels) = labels_map.get(&doc.id) {
response.labels = labels.clone();
}
response
})
.collect();
Ok(Json(responses))
}
/// Delete a specific document
#[utoipa::path(
delete,
path = "/api/documents/{id}",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 204, description = "Document deleted successfully"),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn delete_document(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<StatusCode, StatusCode> {
// Get document first to check if it exists and user has access
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// Delete from database
let deleted = state
.db
.delete_document(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error deleting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
if !deleted {
return Err(StatusCode::NOT_FOUND);
}
// Delete associated files
let file_service = FileService::new(state.config.clone());
if let Err(e) = file_service.delete_document_files(&document).await {
warn!("Failed to delete files for document {}: {}", document_id, e);
// Continue anyway - database deletion succeeded
}
info!("Document deleted successfully: {}", document_id);
Ok(StatusCode::NO_CONTENT)
}
/// Download a document file
#[utoipa::path(
get,
path = "/api/documents/{id}/download",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document file", content_type = "application/octet-stream"),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn download_document(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Response<Body>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
let file_data = file_service
.read_document_file(&document)
.await
.map_err(|e| {
error!("Failed to read document file {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let response = Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, document.mime_type)
.header("Content-Disposition", format!("attachment; filename=\"{}\"", document.original_filename))
.header("Content-Length", file_data.len().to_string())
.body(Body::from(file_data))
.map_err(|e| {
error!("Failed to build response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
debug!("Document downloaded: {}", document_id);
Ok(response)
}
/// View a document in the browser
#[utoipa::path(
get,
path = "/api/documents/{id}/view",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document file for viewing", content_type = "application/octet-stream"),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn view_document(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Response<Body>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
let file_data = file_service
.read_document_file(&document)
.await
.map_err(|e| {
error!("Failed to read document file {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let response = Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, document.mime_type)
.header("Content-Length", file_data.len().to_string())
.body(Body::from(file_data))
.map_err(|e| {
error!("Failed to build response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
debug!("Document viewed: {}", document_id);
Ok(response)
}
/// Get user's duplicate documents
#[utoipa::path(
get,
path = "/api/documents/duplicates",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("limit" = Option<i64>, Query, description = "Number of duplicate groups to return per page"),
("offset" = Option<i64>, Query, description = "Number of duplicate groups to skip")
),
responses(
(status = 200, description = "User's duplicate documents grouped by hash", body = serde_json::Value),
(status = 401, description = "Unauthorized")
)
)]
pub async fn get_user_duplicates(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Query(query): Query<PaginationQuery>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let limit = query.limit.unwrap_or(25);
let offset = query.offset.unwrap_or(0);
let (duplicates, total_count) = state
.db
.get_user_duplicates(auth_user.user.id, auth_user.user.role, limit, offset)
.await
.map_err(|e| {
error!("Failed to get user duplicates: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let response = serde_json::json!({
"duplicates": duplicates,
"pagination": {
"total": total_count,
"limit": limit,
"offset": offset,
"has_more": offset + limit < total_count
},
"statistics": {
"total_duplicate_groups": total_count
}
});
Ok(Json(response))
}
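
upload_document above reads exactly one multipart field named "file". A hypothetical client-side sketch of that shape, assuming the reqwest crate with its multipart feature (not a dependency shown in this diff); names and file contents are illustrative.

// Hypothetical client, not part of this commit: posts a single multipart
// field named "file", which is the only field upload_document looks at.
async fn upload_example(base_url: &str, token: &str, bytes: Vec<u8>) -> Result<(), reqwest::Error> {
    let part = reqwest::multipart::Part::bytes(bytes)
        .file_name("example.pdf")
        .mime_str("application/pdf")
        .expect("static mime string parses");
    let form = reqwest::multipart::Form::new().part("file", part);
    let response = reqwest::Client::new()
        .post(format!("{}/api/documents", base_url))
        .bearer_auth(token)
        .multipart(form)
        .send()
        .await?;
    println!("upload status: {}", response.status());
    Ok(())
}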


@@ -0,0 +1,358 @@
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
};
use std::sync::Arc;
use tracing::{debug, error, info};
use crate::{
auth::AuthUser,
services::file_service::FileService,
AppState,
};
use super::types::DocumentDebugInfo;
/// Get comprehensive debug information for a document
#[utoipa::path(
get,
path = "/api/documents/{id}/debug",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document debug information", body = DocumentDebugInfo),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_document_debug_info(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<DocumentDebugInfo>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
// Check file existence and readability
let file_exists = file_service.document_file_exists(&document).await.unwrap_or(false);
let readable = if file_exists {
file_service.read_document_file(&document).await.is_ok()
} else {
false
};
// Get file permissions (simplified)
let permissions = if file_exists {
Some("readable".to_string()) // This could be expanded with actual file permissions
} else {
None
};
// Construct processing steps based on document state
let mut processing_steps = vec!["uploaded".to_string()];
if document.content.is_some() {
processing_steps.push("content_extracted".to_string());
}
match document.ocr_status.as_deref() {
Some("pending") => processing_steps.push("ocr_queued".to_string()),
Some("processing") => processing_steps.push("ocr_in_progress".to_string()),
Some("completed") => processing_steps.push("ocr_completed".to_string()),
Some("failed") => processing_steps.push("ocr_failed".to_string()),
_ => {}
}
if document.ocr_text.is_some() {
processing_steps.push("ocr_text_available".to_string());
}
let debug_info = DocumentDebugInfo {
document_id: document.id,
filename: document.original_filename,
file_path: document.file_path,
file_size: document.file_size,
mime_type: document.mime_type,
created_at: document.created_at,
ocr_status: document.ocr_status,
ocr_confidence: document.ocr_confidence,
ocr_word_count: document.ocr_word_count,
processing_steps,
file_exists,
readable,
permissions,
};
debug!("Debug info generated for document: {}", document_id);
Ok(Json(debug_info))
}
/// Get thumbnail for a document (if available)
#[utoipa::path(
get,
path = "/api/documents/{id}/thumbnail",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document thumbnail", content_type = "image/jpeg"),
(status = 404, description = "Document or thumbnail not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_document_thumbnail(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<axum::response::Response, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
match file_service.get_document_thumbnail(&document).await {
Ok(thumbnail_data) => {
let response = axum::response::Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "image/jpeg")
.header("Content-Length", thumbnail_data.len().to_string())
.header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
.body(axum::body::Body::from(thumbnail_data))
.map_err(|e| {
error!("Failed to build thumbnail response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
debug!("Thumbnail served for document: {}", document_id);
Ok(response)
}
Err(_) => {
// Return a default "no thumbnail" response or generate one on the fly
Err(StatusCode::NOT_FOUND)
}
}
}
/// Get processed image for a document (if available)
#[utoipa::path(
get,
path = "/api/documents/{id}/processed-image",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Processed image", content_type = "image/png"),
(status = 404, description = "Document or processed image not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_processed_image(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<axum::response::Response, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// Check if this is an image document
if !document.mime_type.starts_with("image/") {
return Err(StatusCode::BAD_REQUEST);
}
let file_service = FileService::new(state.config.clone());
match file_service.get_processed_image(&document).await {
Ok(image_data) => {
let response = axum::response::Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "image/png")
.header("Content-Length", image_data.len().to_string())
.header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
.body(axum::body::Body::from(image_data))
.map_err(|e| {
error!("Failed to build processed image response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
debug!("Processed image served for document: {}", document_id);
Ok(response)
}
Err(_) => {
Err(StatusCode::NOT_FOUND)
}
}
}
/// Get system-wide document statistics
pub async fn get_document_statistics(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Get OCR statistics
let (total, pending, completed, failed) = state
.db
.count_documents_by_ocr_status(auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting OCR stats: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get MIME type distribution
let mime_type_facets = state
.db
.get_mime_type_facets(auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting MIME type facets: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get recent upload activity (simplified)
let recent_documents = state
.db
.get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 10, 0)
.await
.map_err(|e| {
error!("Database error getting recent documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let total_file_size: i64 = recent_documents.iter().map(|d| d.file_size).sum();
Ok(Json(serde_json::json!({
"document_counts": {
"total": total,
"pending_ocr": pending,
"completed_ocr": completed,
"failed_ocr": failed
},
"mime_types": mime_type_facets,
"storage": {
"recent_documents_size": total_file_size,
"recent_documents_count": recent_documents.len()
},
"activity": {
"recent_uploads": recent_documents.len(),
"last_upload": recent_documents.first().map(|d| d.created_at)
}
})))
}
/// Validate document integrity
pub async fn validate_document_integrity(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
let mut issues = Vec::new();
let mut checks = Vec::new();
// Check file existence
checks.push("file_existence".to_string());
if !file_service.document_file_exists(&document).await.unwrap_or(false) {
issues.push("File does not exist on disk".to_string());
}
// Check file readability
checks.push("file_readability".to_string());
match file_service.read_document_file(&document).await {
Ok(data) => {
// Verify file size matches
if data.len() as i64 != document.file_size {
issues.push(format!(
"File size mismatch: database={}, actual={}",
document.file_size,
data.len()
));
}
}
Err(e) => {
issues.push(format!("Cannot read file: {}", e));
}
}
// Check OCR consistency
checks.push("ocr_consistency".to_string());
if document.ocr_text.is_some() && document.ocr_status.as_deref() != Some("completed") {
issues.push("OCR text exists but status is not 'completed'".to_string());
}
if document.ocr_text.is_none() && document.ocr_status.as_deref() == Some("completed") {
issues.push("OCR status is 'completed' but no OCR text available".to_string());
}
// Check confidence consistency
checks.push("confidence_consistency".to_string());
if let Some(confidence) = document.ocr_confidence {
if confidence < 0.0 || confidence > 100.0 {
issues.push(format!("Invalid OCR confidence value: {}", confidence));
}
}
let is_valid = issues.is_empty();
info!("Document {} integrity check: {} issues found", document_id, issues.len());
Ok(Json(serde_json::json!({
"document_id": document_id,
"is_valid": is_valid,
"checks_performed": checks,
"issues": issues,
"summary": if is_valid {
"Document integrity is good".to_string()
} else {
format!("Found {} integrity issues", issues.len())
}
})))
}
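
A small consumer-side sketch (not in this commit) of the integrity report returned above; the field names mirror the json! block, everything else is illustrative.

// Summarize the validate_document_integrity response for display.
fn summarize_integrity_report(report: &serde_json::Value) -> String {
    let is_valid = report["is_valid"].as_bool().unwrap_or(false);
    let issue_count = report["issues"].as_array().map(|a| a.len()).unwrap_or(0);
    if is_valid {
        "document integrity is good".to_string()
    } else {
        format!("{} integrity issue(s) found", issue_count)
    }
}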


@@ -0,0 +1,522 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::{Json, Response},
body::Body,
};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use std::collections::HashMap;
use sqlx::Row;
use crate::{
auth::AuthUser,
models::UserRole,
services::file_service::FileService,
AppState,
};
use super::types::FailedDocumentsQuery;
/// Get failed documents with filtering and pagination
#[utoipa::path(
get,
path = "/api/documents/failed",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("limit" = Option<i64>, Query, description = "Number of documents to return"),
("offset" = Option<i64>, Query, description = "Number of documents to skip"),
("stage" = Option<String>, Query, description = "Filter by failure stage (ocr, ingestion, validation, etc.)"),
("reason" = Option<String>, Query, description = "Filter by failure reason")
),
responses(
(status = 200, description = "Failed documents list", body = serde_json::Value),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_failed_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Query(params): Query<FailedDocumentsQuery>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let limit = params.limit.unwrap_or(25);
let offset = params.offset.unwrap_or(0);
// Query the unified failed_documents table
let mut query_builder = sqlx::QueryBuilder::new(
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type,
content, tags, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms,
failure_reason, failure_stage, error_message, existing_document_id,
ingestion_source, retry_count, last_retry_at, created_at, updated_at
FROM failed_documents
WHERE ($1::uuid IS NULL OR user_id = $1)
"#
);
let mut bind_count = 1;
// Add stage filter if specified
if let Some(stage) = &params.stage {
bind_count += 1;
query_builder.push(&format!(" AND failure_stage = ${}", bind_count));
}
// Add reason filter if specified
if let Some(reason) = &params.reason {
bind_count += 1;
query_builder.push(&format!(" AND failure_reason = ${}", bind_count));
}
query_builder.push(" ORDER BY created_at DESC");
query_builder.push(&format!(" LIMIT ${} OFFSET ${}", bind_count + 1, bind_count + 2));
let mut query = query_builder.build();
// Bind parameters in order
query = query.bind(if auth_user.user.role == UserRole::Admin {
None
} else {
Some(auth_user.user.id)
});
if let Some(stage) = &params.stage {
query = query.bind(stage);
}
if let Some(reason) = &params.reason {
query = query.bind(reason);
}
query = query.bind(limit).bind(offset);
let failed_docs = query
.fetch_all(state.db.get_pool())
.await
.map_err(|e| {
error!("Failed to fetch failed documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Count total for pagination
let mut count_query_builder = sqlx::QueryBuilder::new(
"SELECT COUNT(*) FROM failed_documents WHERE ($1::uuid IS NULL OR user_id = $1)"
);
let mut count_bind_count = 1;
if let Some(stage) = &params.stage {
count_bind_count += 1;
count_query_builder.push(&format!(" AND failure_stage = ${}", count_bind_count));
}
if let Some(reason) = &params.reason {
count_bind_count += 1;
count_query_builder.push(&format!(" AND failure_reason = ${}", count_bind_count));
}
let mut count_query = count_query_builder.build_query_scalar::<i64>();
count_query = count_query.bind(if auth_user.user.role == UserRole::Admin {
None
} else {
Some(auth_user.user.id)
});
if let Some(stage) = &params.stage {
count_query = count_query.bind(stage);
}
if let Some(reason) = &params.reason {
count_query = count_query.bind(reason);
}
let total_count = count_query
.fetch_one(state.db.get_pool())
.await
.unwrap_or(0);
// Convert to JSON response format
let documents: Vec<serde_json::Value> = failed_docs.iter().map(|row| {
serde_json::json!({
"id": row.get::<uuid::Uuid, _>("id"),
"filename": row.get::<String, _>("filename"),
"original_filename": row.get::<Option<String>, _>("original_filename"),
"file_path": row.get::<Option<String>, _>("file_path"),
"file_size": row.get::<Option<i64>, _>("file_size"),
"mime_type": row.get::<Option<String>, _>("mime_type"),
"content": row.get::<Option<String>, _>("content"),
"tags": row.get::<Vec<String>, _>("tags"),
"ocr_text": row.get::<Option<String>, _>("ocr_text"),
"ocr_confidence": row.get::<Option<f32>, _>("ocr_confidence"),
"ocr_word_count": row.get::<Option<i32>, _>("ocr_word_count"),
"ocr_processing_time_ms": row.get::<Option<i32>, _>("ocr_processing_time_ms"),
"failure_reason": row.get::<String, _>("failure_reason"),
"failure_stage": row.get::<String, _>("failure_stage"),
"error_message": row.get::<Option<String>, _>("error_message"),
"existing_document_id": row.get::<Option<uuid::Uuid>, _>("existing_document_id"),
"ingestion_source": row.get::<String, _>("ingestion_source"),
"retry_count": row.get::<Option<i32>, _>("retry_count"),
"last_retry_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("last_retry_at"),
"created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
"updated_at": row.get::<chrono::DateTime<chrono::Utc>, _>("updated_at"),
// Computed fields for backward compatibility
"failure_category": categorize_failure_reason(
Some(&row.get::<String, _>("failure_reason")),
row.get::<Option<String>, _>("error_message").as_deref()
),
"source": match row.get::<String, _>("failure_stage").as_str() {
"ocr" => "OCR Processing",
"ingestion" => "Document Ingestion",
"validation" => "Document Validation",
"storage" => "File Storage",
"processing" => "Document Processing",
"sync" => "Source Synchronization",
_ => "Unknown"
}
})
}).collect();
// Calculate statistics for the response
let mut stage_stats = HashMap::new();
let mut reason_stats = HashMap::new();
for doc in &documents {
let stage = doc["failure_stage"].as_str().unwrap_or("unknown");
let reason = doc["failure_reason"].as_str().unwrap_or("unknown");
*stage_stats.entry(stage).or_insert(0) += 1;
*reason_stats.entry(reason).or_insert(0) += 1;
}
let response = serde_json::json!({
"documents": documents,
"pagination": {
"limit": limit,
"offset": offset,
"total": total_count,
"total_pages": (total_count as f64 / limit as f64).ceil() as i64
},
"statistics": {
"total_failed": total_count,
"by_stage": stage_stats,
"by_reason": reason_stats
},
"filters": {
"stage": params.stage,
"reason": params.reason
}
});
Ok(Json(response))
}
/// Get failed OCR documents with detailed information
#[utoipa::path(
get,
path = "/api/documents/failed-ocr",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("limit" = Option<i64>, Query, description = "Number of documents to return"),
("offset" = Option<i64>, Query, description = "Number of documents to skip")
),
responses(
(status = 200, description = "Failed OCR documents list", body = serde_json::Value),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_failed_ocr_documents(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Query(pagination): Query<super::types::PaginationQuery>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let limit = pagination.limit.unwrap_or(50);
let offset = pagination.offset.unwrap_or(0);
// Get failed OCR documents with additional failure details
let failed_docs = sqlx::query(
r#"
SELECT d.id, d.filename, d.original_filename, d.file_path, d.file_size,
d.mime_type, d.created_at, d.updated_at, d.user_id,
d.ocr_status, d.ocr_error, d.ocr_failure_reason,
d.ocr_completed_at, d.tags,
-- Count retry attempts from OCR queue
COALESCE(q.retry_count, 0) as retry_count,
q.last_attempt_at
FROM documents d
LEFT JOIN (
SELECT document_id,
COUNT(*) as retry_count,
MAX(created_at) as last_attempt_at
FROM ocr_queue
WHERE status IN ('failed', 'completed')
GROUP BY document_id
) q ON d.id = q.document_id
WHERE d.ocr_status = 'failed'
AND ($1::uuid IS NULL OR d.user_id = $1) -- Admin can see all, users see only their own
ORDER BY d.updated_at DESC
LIMIT $2 OFFSET $3
"#
)
.bind(if auth_user.user.role == UserRole::Admin {
None
} else {
Some(auth_user.user.id)
})
.bind(limit)
.bind(offset)
.fetch_all(state.db.get_pool())
.await
.map_err(|e| {
error!("Failed to fetch failed OCR documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Count total failed documents
let total_count: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM documents
WHERE ocr_status = 'failed'
AND ($1::uuid IS NULL OR user_id = $1)
"#
)
.bind(if auth_user.user.role == UserRole::Admin {
None
} else {
Some(auth_user.user.id)
})
.fetch_one(state.db.get_pool())
.await
.map_err(|e| {
error!("Failed to count failed OCR documents: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let failed_documents: Vec<serde_json::Value> = failed_docs
.into_iter()
.map(|row| {
let tags: Vec<String> = row.get::<Option<Vec<String>>, _>("tags").unwrap_or_default();
serde_json::json!({
"id": row.get::<uuid::Uuid, _>("id"),
"filename": row.get::<String, _>("filename"),
"original_filename": row.get::<String, _>("original_filename"),
"file_size": row.get::<i64, _>("file_size"),
"mime_type": row.get::<String, _>("mime_type"),
"created_at": row.get::<chrono::DateTime<chrono::Utc>, _>("created_at"),
"updated_at": row.get::<chrono::DateTime<chrono::Utc>, _>("updated_at"),
"tags": tags,
"ocr_status": row.get::<Option<String>, _>("ocr_status"),
"ocr_error": row.get::<Option<String>, _>("ocr_error"),
"ocr_failure_reason": row.get::<Option<String>, _>("ocr_failure_reason"),
"ocr_completed_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("ocr_completed_at"),
"retry_count": row.get::<Option<i64>, _>("retry_count").unwrap_or(0),
"last_attempt_at": row.get::<Option<chrono::DateTime<chrono::Utc>>, _>("last_attempt_at"),
"can_retry": true,
"failure_category": categorize_failure_reason(
row.get::<Option<String>, _>("ocr_failure_reason").as_deref(),
row.get::<Option<String>, _>("ocr_error").as_deref()
)
})
})
.collect();
let response = serde_json::json!({
"documents": failed_documents,
"pagination": {
"total": total_count,
"limit": limit,
"offset": offset,
"has_more": offset + limit < total_count
},
"statistics": {
"total_failed": total_count,
"failure_categories": get_failure_statistics(&state, auth_user.user.id, auth_user.user.role.clone()).await?
}
});
Ok(Json(response))
}
/// View a failed document file
#[utoipa::path(
get,
path = "/api/documents/failed/{id}/view",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Failed Document ID")
),
responses(
(status = 200, description = "Failed document content for viewing in browser"),
(status = 404, description = "Failed document not found or file deleted"),
(status = 401, description = "Unauthorized")
)
)]
pub async fn view_failed_document(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(failed_document_id): Path<uuid::Uuid>,
) -> Result<Response, StatusCode> {
// Get failed document from database
let row = sqlx::query(
r#"
SELECT file_path, filename, mime_type, user_id
FROM failed_documents
WHERE id = $1 AND ($2::uuid IS NULL OR user_id = $2)
"#
)
.bind(failed_document_id)
.bind(if auth_user.user.role == UserRole::Admin {
None
} else {
Some(auth_user.user.id)
})
.fetch_optional(&state.db.pool)
.await
.map_err(|e| {
error!("Failed to fetch failed document: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let file_path: Option<String> = row.get("file_path");
let filename: String = row.get("filename");
let mime_type: Option<String> = row.get("mime_type");
// Check if file_path exists (some failed documents might not have been saved)
let file_path = file_path.ok_or(StatusCode::NOT_FOUND)?;
let file_service = FileService::new(state.config.clone());
let file_data = file_service
.read_file(&file_path)
.await
.map_err(|e| {
error!("Failed to read failed document file: {}", e);
StatusCode::NOT_FOUND
})?;
// Determine content type from mime_type or file extension
let content_type = mime_type
.unwrap_or_else(|| {
mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string()
});
let response = Response::builder()
.header("Content-Type", content_type)
.header("Content-Length", file_data.len())
.header("Content-Disposition", format!("inline; filename=\"{}\"", filename))
.body(Body::from(file_data))
.map_err(|e| {
error!("Failed to build response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
debug!("Failed document viewed: {}", failed_document_id);
Ok(response)
}
/// Helper function to categorize failure reasons
fn categorize_failure_reason(failure_reason: Option<&str>, error_message: Option<&str>) -> &'static str {
match failure_reason {
Some("pdf_font_encoding") => "PDF Font Issues",
Some("pdf_corruption") => "PDF Corruption",
Some("processing_timeout") => "Timeout",
Some("memory_limit") => "Memory Limit",
Some("pdf_parsing_panic") => "PDF Parsing Error",
Some("low_ocr_confidence") => "Low OCR Confidence",
Some("unknown") | None => {
// Try to categorize based on error message
if let Some(error) = error_message {
let error_lower = error.to_lowercase();
if error_lower.contains("timeout") {
"Timeout"
} else if error_lower.contains("memory") {
"Memory Limit"
} else if error_lower.contains("font") || error_lower.contains("encoding") {
"PDF Font Issues"
} else if error_lower.contains("corrupt") {
"PDF Corruption"
} else if error_lower.contains("quality below threshold") || error_lower.contains("confidence") {
"Low OCR Confidence"
} else {
"Unknown Error"
}
} else {
"Unknown Error"
}
}
_ => "Other"
}
}
/// Helper function to get failure statistics
async fn get_failure_statistics(
state: &Arc<AppState>,
user_id: uuid::Uuid,
user_role: UserRole
) -> Result<serde_json::Value, StatusCode> {
let stats = sqlx::query(
r#"
SELECT
ocr_failure_reason,
COUNT(*) as count
FROM documents
WHERE ocr_status = 'failed'
AND ($1::uuid IS NULL OR user_id = $1)
GROUP BY ocr_failure_reason
ORDER BY count DESC
"#
)
.bind(if user_role == UserRole::Admin {
None
} else {
Some(user_id)
})
.fetch_all(state.db.get_pool())
.await
.map_err(|e| {
error!("Failed to get failure statistics: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let categories: Vec<serde_json::Value> = stats
.into_iter()
.map(|row| {
let reason = row.get::<Option<String>, _>("ocr_failure_reason");
let count = row.get::<i64, _>("count");
serde_json::json!({
"reason": reason.clone().unwrap_or_else(|| "unknown".to_string()),
"display_name": categorize_failure_reason(reason.as_deref(), None),
"count": count
})
})
.collect();
Ok(serde_json::json!(categories))
}
/// Helper function to calculate estimated wait time for retries
pub async fn calculate_estimated_wait_time(priority: i32) -> i64 {
// Simple estimation based on priority - in a real implementation,
// this would check actual queue depth and processing times
match priority {
15.. => 1, // High priority retry: ~1 minute
10..=14 => 3, // Medium priority: ~3 minutes
5..=9 => 10, // Low priority: ~10 minutes
_ => 30, // Very low priority: ~30 minutes
}
}
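
A test sketch (not part of the commit) pinning down categorize_failure_reason; the expected strings are taken directly from the match arms above.

#[cfg(test)]
mod tests {
    use super::categorize_failure_reason;

    #[test]
    fn categorizes_known_reasons_and_falls_back_to_error_text() {
        assert_eq!(categorize_failure_reason(Some("processing_timeout"), None), "Timeout");
        assert_eq!(categorize_failure_reason(Some("low_ocr_confidence"), None), "Low OCR Confidence");
        // With no structured reason, the error message text is inspected.
        assert_eq!(
            categorize_failure_reason(None, Some("PDF font encoding table missing")),
            "PDF Font Issues"
        );
        assert_eq!(categorize_failure_reason(Some("something_else"), None), "Other");
    }
}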


@@ -0,0 +1,14 @@
pub mod types;
pub mod crud;
pub mod ocr;
pub mod bulk;
pub mod debug;
pub mod failed;
// Re-export commonly used types and functions for backward compatibility
pub use types::*;
pub use crud::*;
pub use ocr::*;
pub use bulk::*;
pub use debug::*;
pub use failed::*;
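
A hypothetical call site kept compiling by the glob re-exports above: the old flat import path still resolves even though upload_document now lives in the crud submodule; only the OpenAPI path registrations (see the utoipa changes at the end of this diff) were moved to the new submodule paths.

// Sketch only: old-style flat imports keep working through the re-exports.
use axum::routing::post;
use crate::routes::documents::upload_document;

fn legacy_documents_route() -> axum::Router<std::sync::Arc<crate::AppState>> {
    axum::Router::new().route("/api/documents", post(upload_document))
}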

src/routes/documents/ocr.rs (new file, 281 lines)

@@ -0,0 +1,281 @@
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::{
auth::AuthUser,
models::DocumentOcrResponse,
AppState,
};
/// Get OCR text for a document
#[utoipa::path(
get,
path = "/api/documents/{id}/ocr",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "Document OCR text", body = DocumentOcrResponse),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 500, description = "Internal server error")
)
)]
pub async fn get_document_ocr(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<DocumentOcrResponse>, StatusCode> {
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let response = DocumentOcrResponse {
document_id: document.id,
filename: document.original_filename,
has_ocr_text: document.ocr_text.is_some(),
ocr_text: document.ocr_text,
ocr_confidence: document.ocr_confidence,
ocr_status: document.ocr_status,
ocr_processing_time_ms: document.ocr_processing_time_ms,
detected_language: None, // This would need to be stored separately if needed
pages_processed: None, // This would need to be stored separately if needed
};
Ok(Json(response))
}
/// Retry OCR processing for a document
#[utoipa::path(
post,
path = "/api/documents/{id}/retry-ocr",
tag = "documents",
security(
("bearer_auth" = [])
),
params(
("id" = uuid::Uuid, Path, description = "Document ID")
),
responses(
(status = 200, description = "OCR retry initiated"),
(status = 404, description = "Document not found"),
(status = 401, description = "Unauthorized"),
(status = 409, description = "OCR already in progress"),
(status = 500, description = "Internal server error")
)
)]
pub async fn retry_ocr(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Get document first to check if it exists and user has access
let document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// Check if OCR is already in progress
if let Some(ref status) = document.ocr_status {
if status == "processing" {
return Ok(Json(serde_json::json!({
"success": false,
"message": "OCR is already in progress for this document"
})));
}
}
// Add to OCR queue
match state.ocr_queue.enqueue_document(document.id, auth_user.user.id, 1).await {
Ok(_) => {
info!("Document {} queued for OCR retry", document_id);
Ok(Json(serde_json::json!({
"success": true,
"message": "Document queued for OCR processing"
})))
}
Err(e) => {
error!("Failed to queue document {} for OCR: {}", document_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Get OCR processing status for multiple documents
pub async fn get_ocr_status_batch(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Json(document_ids): Json<Vec<uuid::Uuid>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
if document_ids.len() > 100 {
return Err(StatusCode::BAD_REQUEST);
}
let mut results = Vec::new();
for document_id in document_ids {
match state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
{
Ok(Some(document)) => {
results.push(serde_json::json!({
"document_id": document.id,
"ocr_status": document.ocr_status,
"ocr_confidence": document.ocr_confidence,
"has_ocr_text": document.ocr_text.is_some(),
"retry_count": document.ocr_retry_count.unwrap_or(0)
}));
}
Ok(None) => {
results.push(serde_json::json!({
"document_id": document_id,
"error": "Document not found"
}));
}
Err(e) => {
error!("Error getting document {}: {}", document_id, e);
results.push(serde_json::json!({
"document_id": document_id,
"error": "Database error"
}));
}
}
}
Ok(Json(serde_json::json!({
"results": results
})))
}
/// Cancel OCR processing for a document
pub async fn cancel_ocr(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Verify user has access to the document
let _document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// Try to remove from queue
match state.ocr_queue.remove_from_queue(document_id).await {
Ok(removed) => {
if removed {
info!("Document {} removed from OCR queue", document_id);
Ok(Json(serde_json::json!({
"success": true,
"message": "Document removed from OCR queue"
})))
} else {
Ok(Json(serde_json::json!({
"success": false,
"message": "Document was not in the OCR queue"
})))
}
}
Err(e) => {
error!("Failed to remove document {} from OCR queue: {}", document_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
/// Get OCR processing statistics
pub async fn get_ocr_stats(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
) -> Result<Json<serde_json::Value>, StatusCode> {
let (total, pending, completed, failed) = state
.db
.count_documents_by_ocr_status(auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting OCR stats: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get queue statistics
let queue_size = state
.ocr_queue
.get_queue_size()
.await
.map_err(|e| {
error!("Failed to get OCR queue size: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let active_jobs = state
.ocr_queue
.get_active_jobs_count()
.await
.map_err(|e| {
error!("Failed to get active OCR jobs count: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::json!({
"total_documents": total,
"pending_ocr": pending,
"completed_ocr": completed,
"failed_ocr": failed,
"queue_size": queue_size,
"active_jobs": active_jobs,
"completion_rate": if total > 0 { (completed as f64 / total as f64 * 100.0) } else { 0.0 }
})))
}
/// Update OCR settings for a document
pub async fn update_ocr_settings(
State(state): State<Arc<AppState>>,
auth_user: AuthUser,
Path(document_id): Path<uuid::Uuid>,
Json(settings): Json<serde_json::Value>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Verify user has access to the document
let _document = state
.db
.get_document_by_id(document_id, auth_user.user.id, auth_user.user.role)
.await
.map_err(|e| {
error!("Database error getting document {}: {}", document_id, e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
// For now, just return success - OCR settings would be stored in metadata
debug!("OCR settings updated for document {}: {:?}", document_id, settings);
Ok(Json(serde_json::json!({
"success": true,
"message": "OCR settings updated"
})))
}
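
get_ocr_status_batch above takes a bare JSON array of document UUIDs and caps the batch at 100 ids. A minimal sketch of building that body, relying on the same uuid serde support the handlers already use.

// Illustrative only: the request body is a plain JSON array of ids.
fn example_batch_status_body(ids: &[uuid::Uuid]) -> serde_json::Value {
    serde_json::to_value(ids).expect("Uuid serializes to a JSON string")
}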


@@ -0,0 +1,86 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
#[derive(Deserialize, ToSchema)]
pub struct PaginationQuery {
pub limit: Option<i64>,
pub offset: Option<i64>,
pub ocr_status: Option<String>,
}
#[derive(Deserialize, ToSchema)]
pub struct FailedDocumentsQuery {
pub limit: Option<i64>,
pub offset: Option<i64>,
pub stage: Option<String>, // 'ocr', 'ingestion', 'validation', etc.
pub reason: Option<String>, // 'duplicate_content', 'low_ocr_confidence', etc.
}
#[derive(Deserialize, Serialize, ToSchema)]
pub struct BulkDeleteRequest {
pub document_ids: Vec<uuid::Uuid>,
}
#[derive(Deserialize, Serialize, ToSchema)]
pub struct DeleteLowConfidenceRequest {
pub max_confidence: f32,
pub preview_only: Option<bool>,
}
#[derive(Serialize, ToSchema)]
pub struct DocumentUploadResponse {
pub document_id: uuid::Uuid,
pub filename: String,
pub file_size: i64,
pub mime_type: String,
pub status: String,
pub message: String,
}
#[derive(Serialize, ToSchema)]
pub struct BulkDeleteResponse {
pub deleted_count: i64,
pub failed_count: i64,
pub deleted_documents: Vec<uuid::Uuid>,
pub failed_documents: Vec<uuid::Uuid>,
pub total_files_deleted: i64,
pub total_files_failed: i64,
}
#[derive(Serialize, ToSchema)]
pub struct DocumentDebugInfo {
pub document_id: uuid::Uuid,
pub filename: String,
pub file_path: String,
pub file_size: i64,
pub mime_type: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub ocr_status: Option<String>,
pub ocr_confidence: Option<f32>,
pub ocr_word_count: Option<i32>,
pub processing_steps: Vec<String>,
pub file_exists: bool,
pub readable: bool,
pub permissions: Option<String>,
}
impl Default for PaginationQuery {
fn default() -> Self {
Self {
limit: Some(25),
offset: Some(0),
ocr_status: None,
}
}
}
impl Default for FailedDocumentsQuery {
fn default() -> Self {
Self {
limit: Some(25),
offset: Some(0),
stage: None,
reason: None,
}
}
}
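
A test sketch (not in the commit) covering the Default impls above and the delete-low-confidence body shape; the values mirror the struct definitions and nothing else.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn defaults_and_request_parsing() {
        let q = PaginationQuery::default();
        assert_eq!(q.limit, Some(25));
        assert_eq!(q.offset, Some(0));

        let body = r#"{"max_confidence": 40.0, "preview_only": true}"#;
        let req: DeleteLowConfidenceRequest = serde_json::from_str(body).unwrap();
        assert_eq!(req.preview_only, Some(true));
    }
}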


@@ -0,0 +1,62 @@
use axum::{
routing::{get, post, delete},
Router,
};
use std::sync::Arc;
use crate::AppState;
// Import all the modularized functions
mod types;
mod crud;
mod ocr;
mod bulk;
mod debug;
mod failed;
// Re-export types for external use
pub use types::*;
// Use the individual module functions
use crud::*;
use ocr::*;
use bulk::*;
use debug::*;
use failed::*;
/// Documents router with all document-related endpoints
pub fn router() -> Router<Arc<AppState>> {
Router::new()
// Basic CRUD operations
.route("/", post(upload_document))
.route("/", get(list_documents))
.route("/", delete(bulk_delete_documents))
.route("/{id}", get(get_document_by_id))
.route("/{id}", delete(delete_document))
.route("/{id}/download", get(download_document))
.route("/{id}/view", get(view_document))
// OCR operations
.route("/{id}/ocr", get(get_document_ocr))
.route("/{id}/retry-ocr", post(retry_ocr))
.route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr))
.route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats))
.route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations))
.route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history))
// Bulk operations
.route("/delete-low-confidence", post(delete_low_confidence_documents))
.route("/delete-failed-ocr", post(delete_failed_ocr_documents))
// Debug and diagnostic operations
.route("/{id}/debug", get(get_document_debug_info))
.route("/{id}/thumbnail", get(get_document_thumbnail))
.route("/{id}/processed-image", get(get_processed_image))
// Failed document management
.route("/failed", get(get_failed_documents))
.route("/failed/{id}/view", get(view_failed_document))
// Other operations
.route("/duplicates", get(get_user_duplicates))
}
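
A hypothetical wiring sketch (not shown in this diff): nesting the documents router under /api/documents, which is the prefix the #[utoipa::path] annotations above assume.

// Sketch only: mount the documents router at the path used in the OpenAPI
// annotations, then drop in the shared application state.
fn app(state: std::sync::Arc<crate::AppState>) -> axum::Router {
    axum::Router::new()
        .nest("/api/documents", router())
        .with_state(state)
}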


@@ -39,23 +39,23 @@ use crate::{
crate::routes::auth::oidc_login,
crate::routes::auth::oidc_callback,
// Document endpoints
crate::routes::documents::upload_document,
crate::routes::documents::list_documents,
crate::routes::documents::get_document_by_id,
crate::routes::documents::delete_document,
crate::routes::documents::bulk_delete_documents,
crate::routes::documents::download_document,
crate::routes::documents::view_document,
crate::routes::documents::get_document_thumbnail,
crate::routes::documents::get_document_ocr,
crate::routes::documents::get_processed_image,
crate::routes::documents::retry_ocr,
crate::routes::documents::get_document_debug_info,
crate::routes::documents::get_failed_ocr_documents,
crate::routes::documents::view_failed_document,
crate::routes::documents::delete_low_confidence_documents,
crate::routes::documents::delete_failed_ocr_documents,
crate::routes::documents::get_user_duplicates,
crate::routes::documents::crud::upload_document,
crate::routes::documents::crud::list_documents,
crate::routes::documents::crud::get_document_by_id,
crate::routes::documents::crud::delete_document,
crate::routes::documents::bulk::bulk_delete_documents,
crate::routes::documents::crud::download_document,
crate::routes::documents::crud::view_document,
crate::routes::documents::debug::get_document_thumbnail,
crate::routes::documents::ocr::get_document_ocr,
crate::routes::documents::debug::get_processed_image,
crate::routes::documents::ocr::retry_ocr,
crate::routes::documents::debug::get_document_debug_info,
crate::routes::documents::failed::get_failed_ocr_documents,
crate::routes::documents::failed::view_failed_document,
crate::routes::documents::bulk::delete_low_confidence_documents,
crate::routes::documents::bulk::delete_failed_ocr_documents,
crate::routes::documents::crud::get_user_duplicates,
// Labels endpoints
crate::routes::labels::get_labels,
crate::routes::labels::create_label,