From abd55ef4199c3e4e9c9091dec0ea92ab44498681 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Fri, 1 Aug 2025 04:33:08 +0000 Subject: [PATCH] feat(storage): abstract the storage layer to support S3 alongside the local filesystem --- src/bin/batch_ingest.rs | 15 ++- src/bin/enqueue_pending_ocr.rs | 25 ++++- src/bin/migrate_to_s3.rs | 12 ++- src/lib.rs | 1 + src/main.rs | 27 +++--- src/ocr/enhanced.rs | 97 ++++++++++++++++++- src/ocr/queue.rs | 20 ++-- src/routes/documents/bulk.rs | 6 +- src/routes/documents/crud.rs | 10 +- src/routes/documents/debug.rs | 8 +- src/routes/documents/failed.rs | 2 +- src/routes/metrics.rs | 6 +- src/routes/prometheus_metrics.rs | 6 +- src/routes/queue.rs | 4 +- src/routes/webdav/webdav_sync.rs | 4 +- src/scheduling/source_sync.rs | 4 +- src/scheduling/watcher.rs | 11 +-- src/services/file_service.rs | 2 + src/storage/local.rs | 33 ++++++- src/test_utils.rs | 22 ++++- src/tests/ocr_tests.rs | 7 +- tests/integration_file_service_tests.rs | 50 +++++----- ...egration_hash_duplicate_detection_tests.rs | 10 +- ...tegration_ocr_pipeline_integration_test.rs | 13 ++- tests/s3_storage_tests.rs | 20 +++- 25 files changed, 311 insertions(+), 104 deletions(-)
diff --git a/src/bin/batch_ingest.rs index 4a0fe65..7f58b7f 100644 --- a/src/bin/batch_ingest.rs +++ b/src/bin/batch_ingest.rs @@ -64,10 +64,19 @@ async fn main() -> Result<()> { let config = Config::from_env()?; let db = Database::new(&config.database_url).await?; - let file_service = FileService::new(config.upload_path.clone()); - let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1); - let ingester = BatchIngester::new(db, queue_service, file_service, config); + // Use storage factory to create file service with proper backend + let storage_config = readur::storage::factory::storage_config_from_env(&config)?; + let file_service = std::sync::Arc::new( + FileService::from_config(storage_config, config.upload_path.clone()).await?
+ ); + + // Initialize storage backend + file_service.initialize_storage().await?; + + let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1, file_service.clone()); + + let ingester = BatchIngester::new(db, queue_service, (*file_service).clone(), config); println!("Starting batch ingestion from: {}", directory); // Only show the first and last character of the user ID diff --git a/src/bin/enqueue_pending_ocr.rs b/src/bin/enqueue_pending_ocr.rs index 750469e..f7931a5 100644 --- a/src/bin/enqueue_pending_ocr.rs +++ b/src/bin/enqueue_pending_ocr.rs @@ -16,6 +16,9 @@ use readur::{ config::Config, db::Database, ocr::queue::OcrQueueService, + services::file_service::FileService, + storage::factory::create_storage_backend, + storage::StorageConfig, }; #[tokio::main] @@ -30,7 +33,27 @@ async fn main() -> Result<()> { // Connect to database let db = Database::new(&config.database_url).await?; - let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1); + + // Create file service + let storage_config = if config.s3_enabled { + #[cfg(feature = "s3")] + { + StorageConfig::S3 { + s3_config: config.s3_config.as_ref().unwrap().clone(), + fallback_path: Some(config.upload_path.clone()), + } + } + #[cfg(not(feature = "s3"))] + { + StorageConfig::Local { upload_path: config.upload_path.clone() } + } + } else { + StorageConfig::Local { upload_path: config.upload_path.clone() } + }; + let storage_backend = create_storage_backend(storage_config).await?; + let file_service = std::sync::Arc::new(FileService::with_storage(config.upload_path.clone(), storage_backend)); + + let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1, file_service); // Find documents with pending OCR status that aren't in the queue let pending_documents = sqlx::query( diff --git a/src/bin/migrate_to_s3.rs b/src/bin/migrate_to_s3.rs index 83f9dbd..9d31b17 100644 --- a/src/bin/migrate_to_s3.rs +++ b/src/bin/migrate_to_s3.rs @@ -72,6 +72,11 @@ async fn main() -> Result<()> { info!("☁️ Initializing S3 service..."); let s3_service = S3Service::new(s3_config.clone()).await?; + // Initialize FileService with proper storage configuration + info!("📁 Initializing file service..."); + let storage_config = readur::storage::factory::storage_config_from_env(&config)?; + let file_service = FileService::from_config(storage_config, config.upload_path.clone()).await?; + // Test S3 connection match s3_service.test_connection().await { Ok(_) => info!("✅ S3 connection successful"), @@ -134,7 +139,7 @@ async fn main() -> Result<()> { for doc in local_documents { info!("📦 Migrating: {} ({})", doc.original_filename, doc.id); - match migrate_document(&db, &s3_service, &doc, args.delete_local).await { + match migrate_document(&db, &s3_service, &file_service, &doc, args.delete_local).await { Ok(_) => { migrated_count += 1; info!("✅ Successfully migrated: {}", doc.original_filename); @@ -158,6 +163,7 @@ async fn main() -> Result<()> { async fn migrate_document( db: &Database, s3_service: &S3Service, + file_service: &FileService, document: &readur::models::Document, delete_local: bool, ) -> Result<()> { @@ -183,7 +189,7 @@ async fn migrate_document( db.update_document_file_path(document.id, &s3_path).await?; // Migrate associated files (thumbnails, processed images) - migrate_associated_files(s3_service, document, delete_local).await?; + migrate_associated_files(s3_service, file_service, document, delete_local).await?; // Delete local file if requested if delete_local { @@ -199,10 +205,10 @@ async 
fn migrate_document( async fn migrate_associated_files( s3_service: &S3Service, + file_service: &FileService, document: &readur::models::Document, delete_local: bool, ) -> Result<()> { - let file_service = FileService::new("./uploads".to_string()); // Migrate thumbnail let thumbnail_path = file_service.get_thumbnails_path().join(format!("{}_thumb.jpg", document.id));
diff --git a/src/lib.rs index bc0ea55..e8c4134 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ use oidc::OidcClient; pub struct AppState { pub db: Database, pub config: Config, + pub file_service: std::sync::Arc<FileService>, pub webdav_scheduler: Option>, pub source_scheduler: Option>, pub queue_service: std::sync::Arc<OcrQueueService>,
diff --git a/src/main.rs index f5aa7a4..8623b14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -124,17 +124,12 @@ async fn main() -> anyhow::Result<()> { // Initialize file service using the new storage backend architecture info!("Initializing file service with storage backend..."); let storage_config = readur::storage::factory::storage_config_from_env(&config)?; - let file_service = match readur::services::file_service::FileService::from_config(storage_config, config.upload_path.clone()).await { - Ok(service) => { - info!("✅ File service initialized with {} storage backend", service.storage_type()); - service - } - Err(e) => { + let file_service = readur::services::file_service::FileService::from_config(storage_config, config.upload_path.clone()).await + .map_err(|e| { error!("Failed to initialize file service with configured storage backend: {}", e); - warn!("Falling back to local storage only"); - readur::services::file_service::FileService::new(config.upload_path.clone()) - } - }; + e + })?; + info!("✅ File service initialized with {} storage backend", file_service.storage_type()); // Initialize the storage backend (creates directories, validates access, etc.)
if let Err(e) = file_service.initialize_storage().await { @@ -143,6 +138,9 @@ async fn main() -> anyhow::Result<()> { } info!("✅ Storage backend initialized successfully"); + // Wrap file service in Arc for sharing across application state + let file_service = std::sync::Arc::new(file_service); + // Migrate existing files to new structure (one-time operation) info!("Migrating existing files to structured directories..."); if let Err(e) = file_service.migrate_existing_files().await { @@ -327,7 +325,8 @@ async fn main() -> anyhow::Result<()> { let shared_queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new( background_db.clone(), background_db.get_pool().clone(), - concurrent_jobs + concurrent_jobs, + file_service.clone() )); // Initialize OIDC client if enabled @@ -365,6 +364,7 @@ async fn main() -> anyhow::Result<()> { let web_state = AppState { db: web_db, config: config.clone(), + file_service: file_service.clone(), webdav_scheduler: None, // Will be set after creating scheduler source_scheduler: None, // Will be set after creating scheduler queue_service: shared_queue_service.clone(), @@ -378,6 +378,7 @@ async fn main() -> anyhow::Result<()> { let background_state = AppState { db: background_db, config: config.clone(), + file_service: file_service.clone(), webdav_scheduler: None, source_scheduler: None, queue_service: shared_queue_service.clone(), @@ -389,8 +390,9 @@ async fn main() -> anyhow::Result<()> { let watcher_config = config.clone(); let watcher_db = background_state.db.clone(); + let watcher_file_service = background_state.file_service.clone(); tokio::spawn(async move { - if let Err(e) = readur::scheduling::watcher::start_folder_watcher(watcher_config, watcher_db).await { + if let Err(e) = readur::scheduling::watcher::start_folder_watcher(watcher_config, watcher_db, watcher_file_service).await { error!("Folder watcher error: {}", e); } }); @@ -461,6 +463,7 @@ async fn main() -> anyhow::Result<()> { let updated_web_state = AppState { db: web_state.db.clone(), config: web_state.config.clone(), + file_service: file_service.clone(), webdav_scheduler: Some(webdav_scheduler.clone()), source_scheduler: Some(source_scheduler.clone()), queue_service: shared_queue_service.clone(), diff --git a/src/ocr/enhanced.rs b/src/ocr/enhanced.rs index 9af1667..3d355b4 100644 --- a/src/ocr/enhanced.rs +++ b/src/ocr/enhanced.rs @@ -41,10 +41,17 @@ pub struct EnhancedOcrService { } impl EnhancedOcrService { - pub fn new(temp_dir: String) -> Self { + pub fn new(temp_dir: String, file_service: FileService) -> Self { + Self { temp_dir, file_service } + } + + /// Backward-compatible constructor for tests and legacy code + /// Creates a FileService with local storage using UPLOAD_PATH env var + #[deprecated(note = "Use new() with FileService parameter instead")] + pub fn new_legacy(temp_dir: String) -> Self { let upload_path = std::env::var("UPLOAD_PATH").unwrap_or_else(|_| "./uploads".to_string()); let file_service = FileService::new(upload_path); - Self { temp_dir, file_service } + Self::new(temp_dir, file_service) } /// Extract text from image with high-quality OCR settings @@ -72,12 +79,11 @@ impl EnhancedOcrService { let ocr_result = tokio::task::spawn_blocking(move || -> Result<(String, f32)> { // Configure Tesseract with optimal settings - let ocr_service = EnhancedOcrService::new(temp_dir); - let mut tesseract = ocr_service.configure_tesseract(&processed_image_path_clone, &settings_clone)?; + let mut tesseract = Self::configure_tesseract_static(&processed_image_path_clone, 
&settings_clone)?; // Extract text with confidence let text = tesseract.get_text()?.trim().to_string(); - let confidence = ocr_service.calculate_overall_confidence(&mut tesseract)?; + let confidence = Self::calculate_overall_confidence_static(&mut tesseract)?; Ok((text, confidence)) }).await??; @@ -1632,4 +1638,85 @@ fn is_valid_pdf(data: &[u8]) -> bool { } false +} + +impl EnhancedOcrService { + /// Static version of configure_tesseract for use in spawn_blocking + #[cfg(feature = "ocr")] + fn configure_tesseract_static(image_path: &str, settings: &Settings) -> Result<Tesseract> { + let language_combination = Self::build_language_combination_static(settings); + let mut tesseract = Tesseract::new(None, Some(&language_combination))?; + + // Set the image + tesseract = tesseract.set_image(image_path)?; + + // Configure Page Segmentation Mode (PSM) + let psm = match settings.ocr_page_segmentation_mode { + 0 => PageSegMode::PsmOsdOnly, + 1 => PageSegMode::PsmAutoOsd, + 2 => PageSegMode::PsmAutoOnly, + 3 => PageSegMode::PsmAuto, + 4 => PageSegMode::PsmSingleColumn, + 5 => PageSegMode::PsmSingleBlockVertText, + 6 => PageSegMode::PsmSingleBlock, + 7 => PageSegMode::PsmSingleLine, + 8 => PageSegMode::PsmSingleWord, + 9 => PageSegMode::PsmCircleWord, + 10 => PageSegMode::PsmSingleChar, + 11 => PageSegMode::PsmSparseText, + 12 => PageSegMode::PsmSparseTextOsd, + 13 => PageSegMode::PsmRawLine, + _ => PageSegMode::PsmAuto, // Default fallback + }; + tesseract.set_page_seg_mode(psm); + + // Configure OCR Engine Mode (OEM); the underlying API fixes the engine mode + // at initialization time, so this mapping is currently unused (hence the underscore) + let _oem = match settings.ocr_engine_mode { + 0 => OcrEngineMode::TesseractOnly, + 1 => OcrEngineMode::LstmOnly, + 2 => OcrEngineMode::TesseractOnly, // Fallback since TesseractLstm doesn't exist + 3 => OcrEngineMode::Default, + _ => OcrEngineMode::Default, // Default fallback + }; + + Ok(tesseract) + } + + /// Static version of calculate_overall_confidence for use in spawn_blocking + #[cfg(feature = "ocr")] + fn calculate_overall_confidence_static(tesseract: &mut Tesseract) -> Result<f32> { + // Use Tesseract's built-in mean confidence calculation + let confidence = tesseract.mean_text_conf(); + + // Convert from i32 to f32 and ensure it's within valid range + let confidence_f32 = confidence as f32; + + // Clamp confidence to valid range (0.0 to 100.0) + let clamped_confidence = confidence_f32.max(0.0).min(100.0); + + debug!("Tesseract confidence: {} -> {:.1}%", confidence, clamped_confidence); + + Ok(clamped_confidence) + } + + /// Static version of build_language_combination for use in spawn_blocking + fn build_language_combination_static(settings: &Settings) -> String { + if settings.preferred_languages.len() > 1 { + // Use preferred_languages with primary_language first + let mut languages = settings.preferred_languages.clone(); + + // Ensure primary language is first + languages.retain(|lang| lang != &settings.primary_language); + languages.insert(0, settings.primary_language.clone()); + + // Join with + for Tesseract multi-language format + languages.join("+") + } else if !settings.preferred_languages.is_empty() { + // Single language from preferred_languages + settings.preferred_languages[0].clone() + } else { + // Fallback to ocr_language field for backward compatibility + settings.ocr_language.clone() + } + } +} \ No newline at end of file
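The associated functions above exist so the `spawn_blocking` closure no longer has to construct an `EnhancedOcrService` inside itself (the old code rebuilt a `FileService` from the `UPLOAD_PATH` environment variable there, which the storage abstraction no longer allows). A minimal sketch of the pattern, assuming tokio and anyhow as elsewhere in the project and a simplified stand-in for the real `Settings` type:

    use anyhow::Result;

    struct Settings {
        primary_language: String,
    }

    // Associated function: nothing borrowed from `self`, so the closure
    // handed to spawn_blocking stays 'static and Send.
    fn configure_static(settings: &Settings) -> Result<String> {
        Ok(format!("lang={}", settings.primary_language))
    }

    async fn run_ocr(settings: Settings) -> Result<String> {
        // The `?` unwraps the JoinError layer; the inner Result is returned as-is.
        tokio::task::spawn_blocking(move || configure_static(&settings)).await?
    }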
diff --git a/src/ocr/queue.rs index 8bccdf9..d08c935 100644 --- a/src/ocr/queue.rs +++ b/src/ocr/queue.rs @@ -47,10 +47,11 @@ pub struct OcrQueueService { transaction_manager: DocumentTransactionManager, processing_throttler: Arc, is_paused: Arc<AtomicBool>, + file_service: std::sync::Arc<FileService>, } impl OcrQueueService { - pub fn new(db: Database, pool: PgPool, max_concurrent_jobs: usize) -> Self { + pub fn new(db: Database, pool: PgPool, max_concurrent_jobs: usize, file_service: std::sync::Arc<FileService>) -> Self { let worker_id = format!("worker-{}-{}", hostname::get().unwrap_or_default().to_string_lossy(), Uuid::new_v4()); let transaction_manager = DocumentTransactionManager::new(pool.clone()); @@ -70,8 +71,18 @@ impl OcrQueueService { transaction_manager, processing_throttler, is_paused: Arc::new(AtomicBool::new(false)), + file_service, } } + + /// Backward-compatible constructor for tests and legacy code + /// Creates a FileService with local storage using UPLOAD_PATH env var + #[deprecated(note = "Use new() with FileService parameter instead")] + pub fn new_legacy(db: Database, pool: PgPool, max_concurrent_jobs: usize) -> Self { + let upload_path = std::env::var("UPLOAD_PATH").unwrap_or_else(|_| "./uploads".to_string()); + let file_service = std::sync::Arc::new(crate::services::file_service::FileService::new(upload_path)); + Self::new(db, pool, max_concurrent_jobs, file_service) + } /// Add a document to the OCR queue pub async fn enqueue_document(&self, document_id: Uuid, priority: i32, file_size: i64) -> Result { @@ -609,7 +620,7 @@ /// Start the worker loop pub async fn start_worker(self: Arc<Self>) -> Result<()> { let semaphore = Arc::new(Semaphore::new(self.max_concurrent_jobs)); - let ocr_service = Arc::new(EnhancedOcrService::new("/tmp".to_string())); + let ocr_service = Arc::new(EnhancedOcrService::new("/tmp".to_string(), (*self.file_service).clone())); info!( "Starting OCR worker {} with {} concurrent jobs", @@ -705,10 +716,7 @@ use std::path::Path; // Use the FileService to get the proper processed images directory - use crate::services::file_service::FileService; - let base_upload_dir = std::env::var("UPLOAD_PATH").unwrap_or_else(|_| "uploads".to_string()); - let file_service = FileService::new(base_upload_dir); - let processed_images_dir = file_service.get_processed_images_path(); + let processed_images_dir = self.file_service.get_processed_images_path(); // Ensure the directory exists with proper error handling if let Err(e) = tokio::fs::create_dir_all(&processed_images_dir).await {
diff --git a/src/routes/documents/bulk.rs index c58e05e..c3a2de8 100644 --- a/src/routes/documents/bulk.rs +++ b/src/routes/documents/bulk.rs @@ -78,7 +78,7 @@ pub async fn bulk_delete_documents( })?; // Delete associated files - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let mut files_deleted = 0; let mut files_failed = 0; @@ -196,7 +196,7 @@ pub async fn delete_low_confidence_documents( })?; // Delete associated files - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let mut files_deleted = 0; let mut files_failed = 0; @@ -282,7 +282,7 @@ pub async fn delete_failed_ocr_documents( })?; // Delete associated files - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let mut files_deleted = 0; let mut files_failed = 0;
diff --git a/src/routes/documents/crud.rs index 3d102ef..78d50f1 100644 --- a/src/routes/documents/crud.rs +++ b/src/routes/documents/crud.rs @@ -202,10 +202,10 @@ pub async fn upload_document( } // Create ingestion service - let file_service 
= FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let ingestion_service = DocumentIngestionService::new( state.db.clone(), - file_service, + (**file_service).clone(), ); debug!("[UPLOAD_DEBUG] Calling ingestion service for file: {}", filename); @@ -541,7 +541,7 @@ pub async fn delete_document( } // Delete associated files - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; if let Err(e) = file_service.delete_document_files(&document).await { warn!("Failed to delete files for document {}: {}", document_id, e); // Continue anyway - database deletion succeeded @@ -584,7 +584,7 @@ pub async fn download_document( })? .ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let file_data = file_service .read_file(&document.file_path) .await @@ -641,7 +641,7 @@ pub async fn view_document( })? .ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let file_data = file_service .read_file(&document.file_path) .await diff --git a/src/routes/documents/debug.rs b/src/routes/documents/debug.rs index 520ea88..1b19dbc 100644 --- a/src/routes/documents/debug.rs +++ b/src/routes/documents/debug.rs @@ -46,7 +46,7 @@ pub async fn get_document_debug_info( })? .ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; // Check file existence and readability let file_exists = tokio::fs::metadata(&document.file_path).await.is_ok(); @@ -147,7 +147,7 @@ pub async fn get_document_thumbnail( })? .ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; // Use the FileService to get or generate thumbnail #[cfg(feature = "ocr")] @@ -218,7 +218,7 @@ pub async fn get_processed_image( return Err(StatusCode::BAD_REQUEST); } - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; // Try to read processed image from the processed directory let processed_path = format!("{}/processed/{}.png", state.config.upload_path, document.id); @@ -316,7 +316,7 @@ pub async fn validate_document_integrity( })? 
.ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let mut issues = Vec::new(); let mut checks = Vec::new();
diff --git a/src/routes/documents/failed.rs index 7bdabc2..43c3837 100644 --- a/src/routes/documents/failed.rs +++ b/src/routes/documents/failed.rs @@ -398,7 +398,7 @@ pub async fn view_failed_document( // Check if file_path exists (some failed documents might not have been saved) let file_path = file_path.ok_or(StatusCode::NOT_FOUND)?; - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = &state.file_service; let file_data = file_service .read_file(&file_path) .await
diff --git a/src/routes/metrics.rs index 492ea2a..5305d30 100644 --- a/src/routes/metrics.rs +++ b/src/routes/metrics.rs @@ -136,11 +136,7 @@ async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<...> [hunk bodies for metrics.rs and prometheus_metrics.rs lost in extraction; both apply the same substitution of the per-call OcrQueueService for the shared state.queue_service shown below]
diff --git a/src/routes/queue.rs --- a/src/routes/queue.rs +++ b/src/routes/queue.rs @@ ... @@ ) -> Result<Json<...>, StatusCode> { require_admin(&auth_user)?; - let queue_service = OcrQueueService::new(state.db.clone(), state.db.get_pool().clone(), 1); + let queue_service = &*state.queue_service; let stats = queue_service .get_stats() .await @@ -83,7 +83,7 @@ async fn requeue_failed( auth_user: AuthUser, ) -> Result<Json<...>, StatusCode> { require_admin(&auth_user)?; - let queue_service = OcrQueueService::new(state.db.clone(), state.db.get_pool().clone(), 1); + let queue_service = &*state.queue_service; let count = match queue_service.requeue_failed_items().await { Ok(count) => count,
diff --git a/src/routes/webdav/webdav_sync.rs index 5726949..29f77d1 100644 --- a/src/routes/webdav/webdav_sync.rs +++ b/src/routes/webdav/webdav_sync.rs @@ -315,8 +315,8 @@ async fn process_single_file( debug!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len()); // Use the unified ingestion service for consistent deduplication - let file_service = FileService::new(state.config.upload_path.clone()); - let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service); + let file_service = &state.file_service; + let ingestion_service = DocumentIngestionService::new(state.db.clone(), (**file_service).clone()); let result = if let Some(source_id) = webdav_source_id { ingestion_service
diff --git a/src/scheduling/source_sync.rs index b959718..7c81cc7 100644 --- a/src/scheduling/source_sync.rs +++ b/src/scheduling/source_sync.rs @@ -605,7 +605,7 @@ debug!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len()); // Use the unified ingestion service for consistent deduplication - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = (*state.file_service).clone(); let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service); let result = ingestion_service @@ -709,7 +709,7 @@ } // Use the unified ingestion service for consistent deduplication - let file_service = FileService::new(state.config.upload_path.clone()); + let file_service = (*state.file_service).clone(); let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service); let result = ingestion_service
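Every route and sync change in this patch follows the same pattern: borrow the single `Arc<FileService>` stored in `AppState` instead of constructing a `FileService` per request, cloning the inner service only where the ingestion service needs an owned value. A condensed illustration; the handler shape and the `read_file` signature are assumptions, not the project's exact API:

    use std::sync::Arc;
    use readur::AppState;

    async fn download(state: Arc<AppState>, path: &str) -> anyhow::Result<Vec<u8>> {
        // Shared, backend-agnostic service: no per-request construction,
        // and the same code path serves local-disk and S3 documents.
        let file_service = &state.file_service;
        Ok(file_service.read_file(path).await?)
    }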
diff --git a/src/scheduling/watcher.rs index 603f71b..627f030 100644 --- a/src/scheduling/watcher.rs +++ b/src/scheduling/watcher.rs @@ -19,7 +19,7 @@ use crate::{ models::FileIngestionInfo, }; -pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { +pub async fn start_folder_watcher(config: Config, db: Database, file_service: std::sync::Arc<FileService>) -> Result<()> { info!("Starting hybrid folder watcher on: {}", config.watch_folder); info!("Upload path configured as: {}", config.upload_path); @@ -36,9 +36,8 @@ pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { info!("Watch folder canonical path: {:?}", watch_canonical); info!("Upload folder canonical path: {:?}", upload_canonical); - // Initialize services with shared database - let file_service = FileService::new(config.upload_path.clone()); - let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1); + // Use the provided file service + let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1, file_service.clone()); // Initialize user watch components if enabled let user_watch_manager = if config.enable_per_user_watch { @@ -127,7 +126,7 @@ async fn determine_watch_strategy(path: &Path) -> Result { async fn start_notify_watcher( config: Config, db: Database, - file_service: FileService, + file_service: std::sync::Arc<FileService>, queue_service: OcrQueueService, user_watch_manager: Option, ) -> Result<()> { @@ -176,7 +175,7 @@ async fn start_notify_watcher( async fn start_polling_watcher( config: Config, db: Database, - file_service: FileService, + file_service: std::sync::Arc<FileService>, queue_service: OcrQueueService, user_watch_manager: Option, ) -> Result<()> {
diff --git a/src/services/file_service.rs index 422b9fb..1c488d7 100644 --- a/src/services/file_service.rs +++ b/src/services/file_service.rs @@ -25,6 +25,7 @@ pub struct FileService { impl FileService { /// Create a new FileService with local storage (backward compatible) + #[deprecated(note = "Use from_config() with storage factory pattern instead")] pub fn new(upload_path: String) -> Self { use crate::storage::local::LocalStorageBackend; let local_backend = LocalStorageBackend::new(upload_path.clone()); Self { @@ -36,6 +37,7 @@ } /// Create a new FileService with S3 storage (backward compatible) + #[deprecated(note = "Use from_config() with storage factory pattern instead")] pub fn new_with_s3(upload_path: String, s3_service: Arc<S3Service>) -> Self { let storage_backend = s3_service.clone() as Arc<dyn StorageBackend>; Self {
diff --git a/src/storage/local.rs index 520df96..45bdb96 100644 --- a/src/storage/local.rs +++ b/src/storage/local.rs @@ -193,21 +193,46 @@ impl StorageBackend for LocalStorageBackend { } } - // Delete main document file (try to find it first) + // Try multiple strategies to find and delete the main document file let extension = Path::new(filename) .extension() .and_then(|ext| ext.to_str()) .unwrap_or(""); + // Strategy 1: Try document ID-based filename (new structured approach) let document_filename = if extension.is_empty() { document_id.to_string() } else { format!("{}.{}", document_id, extension) }; + let main_file_structured = self.get_documents_path().join(&document_filename); - let main_file = self.get_documents_path().join(&document_filename); - if let Some(deleted_path) = safe_delete(&main_file, &mut serious_errors).await { - deleted_files.push(deleted_path); + // Strategy 2: Try original filename in documents directory + let main_file_original = self.get_documents_path().join(filename); + + // Strategy 3: Try in the base upload directory (legacy flat structure) + let main_file_legacy = Path::new(&self.upload_path).join(filename); + + // Try to delete main document file using all strategies
+ let main_file_candidates = [ + &main_file_structured, + &main_file_original, + &main_file_legacy, + ]; + + let mut main_file_deleted = false; + for candidate_path in &main_file_candidates { + if candidate_path.exists() { + if let Some(deleted_path) = safe_delete(candidate_path, &mut serious_errors).await { + deleted_files.push(deleted_path); + main_file_deleted = true; + break; // Only delete the first match we find + } + } + } + + if !main_file_deleted { + info!("Main document file not found in any expected location for document {}", document_id); } // Delete thumbnail if it exists diff --git a/src/test_utils.rs b/src/test_utils.rs index 5179f91..e8f7a38 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -8,7 +8,7 @@ use std::path::Path; #[cfg(any(test, feature = "test-utils"))] use std::sync::Arc; #[cfg(any(test, feature = "test-utils"))] -use crate::{AppState, models::UserResponse}; +use crate::{AppState, models::UserResponse, storage::StorageConfig}; #[cfg(any(test, feature = "test-utils"))] use axum::Router; #[cfg(any(test, feature = "test-utils"))] @@ -272,7 +272,6 @@ impl TestContext { }; let config = config_builder.build(database_url); - let queue_service = Arc::new(crate::ocr::queue::OcrQueueService::new(db.clone(), db.pool.clone(), 2)); let user_watch_service = if config.enable_per_user_watch { Some(Arc::new(crate::services::user_watch_service::UserWatchService::new(&config.user_watch_base_dir))) @@ -280,9 +279,28 @@ impl TestContext { None }; + // Create FileService with local storage for testing + let storage_config = StorageConfig::Local { + upload_path: config.upload_path.clone() + }; + let storage_backend = crate::storage::factory::create_storage_backend(storage_config).await + .expect("Failed to create storage backend for tests"); + let file_service = Arc::new(crate::services::file_service::FileService::with_storage( + config.upload_path.clone(), + storage_backend + )); + + let queue_service = Arc::new(crate::ocr::queue::OcrQueueService::new( + db.clone(), + db.pool.clone(), + 2, + file_service.clone() + )); + let state = Arc::new(AppState { db, config, + file_service, webdav_scheduler: None, source_scheduler: None, queue_service, diff --git a/src/tests/ocr_tests.rs b/src/tests/ocr_tests.rs index 61bbb0c..1da8d93 100644 --- a/src/tests/ocr_tests.rs +++ b/src/tests/ocr_tests.rs @@ -643,9 +643,14 @@ This tests the error handling for files that aren't actually PDFs."; async fn test_enhanced_ocr_panic_handling() { use crate::ocr::enhanced::EnhancedOcrService; use crate::services::file_service::FileService; + use crate::storage::factory::create_storage_backend; + use crate::storage::StorageConfig; use crate::models::Settings; - let ocr_service = EnhancedOcrService::new("tests".to_string()); + let storage_config = StorageConfig::Local { upload_path: "tests".to_string() }; + let storage_backend = create_storage_backend(storage_config).await.unwrap(); + let file_service = FileService::with_storage("tests".to_string(), storage_backend); + let ocr_service = EnhancedOcrService::new("tests".to_string(), file_service); let settings = Settings::default(); // Test all malformed PDFs with enhanced OCR diff --git a/tests/integration_file_service_tests.rs b/tests/integration_file_service_tests.rs index 983371b..de89581 100644 --- a/tests/integration_file_service_tests.rs +++ b/tests/integration_file_service_tests.rs @@ -3,6 +3,10 @@ use readur::services::file_service::FileService; #[cfg(test)] use readur::models::Document; #[cfg(test)] +use 
readur::storage::factory::create_storage_backend; +#[cfg(test)] +use readur::storage::StorageConfig; +#[cfg(test)] use std::fs; #[cfg(test)] use tempfile::TempDir; @@ -10,10 +14,12 @@ use tempfile::TempDir; use uuid::Uuid; #[cfg(test)] -fn create_test_file_service() -> (FileService, TempDir) { +async fn create_test_file_service() -> (FileService, TempDir) { let temp_dir = TempDir::new().unwrap(); let upload_path = temp_dir.path().to_string_lossy().to_string(); - let service = FileService::new(upload_path); + let storage_config = StorageConfig::Local { upload_path: upload_path.clone() }; + let storage_backend = create_storage_backend(storage_config).await.unwrap(); + let service = FileService::with_storage(upload_path, storage_backend); (service, temp_dir) } @@ -23,7 +29,7 @@ mod tests { #[tokio::test] async fn test_save_file() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let filename = "test.txt"; let data = b"Hello, World!"; @@ -39,7 +45,7 @@ mod tests { #[tokio::test] async fn test_save_file_with_extension() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let filename = "document.pdf"; let data = b"PDF content"; @@ -52,7 +58,7 @@ mod tests { #[tokio::test] async fn test_save_file_without_extension() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let filename = "document"; let data = b"Some content"; @@ -69,9 +75,9 @@ mod tests { assert!(!filename_part.contains('.')); } - #[test] - fn test_create_document() { - let (service, _temp_dir) = create_test_file_service(); + #[tokio::test] + async fn test_create_document() { + let (service, _temp_dir) = create_test_file_service().await; let user_id = Uuid::new_v4(); let document = service.create_document( @@ -105,9 +111,9 @@ mod tests { assert!(document.tags.is_empty()); } - #[test] - fn test_is_allowed_file_type() { - let (service, _temp_dir) = create_test_file_service(); + #[tokio::test] + async fn test_is_allowed_file_type() { + let (service, _temp_dir) = create_test_file_service().await; let allowed_types = vec![ "pdf".to_string(), "txt".to_string(), @@ -127,7 +133,7 @@ mod tests { #[tokio::test] async fn test_read_file() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let filename = "test.txt"; let original_data = b"Hello, World!"; @@ -142,7 +148,7 @@ mod tests { #[tokio::test] async fn test_read_nonexistent_file() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let nonexistent_path = "/path/to/nonexistent/file.txt"; let result = service.read_file(nonexistent_path).await; @@ -221,7 +227,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_success() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let (document, main_path, thumb_path, processed_path) = @@ -246,7 +252,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_main_file_missing() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let (document, main_path, thumb_path, processed_path) = @@ -271,7 +277,7 @@ mod file_deletion_tests { #[tokio::test] async 
fn test_delete_document_files_thumbnail_missing() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let (document, main_path, thumb_path, processed_path) = @@ -296,7 +302,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_processed_missing() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let (document, main_path, thumb_path, processed_path) = @@ -321,7 +327,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_all_missing() { - let (service, _temp_dir) = create_test_file_service(); + let (service, _temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let document = Document { @@ -364,7 +370,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_with_different_extensions() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let document_id = uuid::Uuid::new_v4(); @@ -436,7 +442,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_partial_failure_continues() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let document_id = uuid::Uuid::new_v4(); @@ -499,7 +505,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_with_no_extension() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let base_path = temp_dir.path().join("documents"); @@ -555,7 +561,7 @@ mod file_deletion_tests { #[tokio::test] async fn test_delete_document_files_concurrent_calls() { - let (service, temp_dir) = create_test_file_service(); + let (service, temp_dir) = create_test_file_service().await; let user_id = uuid::Uuid::new_v4(); let (document, main_path, thumb_path, processed_path) = diff --git a/tests/integration_hash_duplicate_detection_tests.rs b/tests/integration_hash_duplicate_detection_tests.rs index 05f0596..679eaaa 100644 --- a/tests/integration_hash_duplicate_detection_tests.rs +++ b/tests/integration_hash_duplicate_detection_tests.rs @@ -7,6 +7,8 @@ use tempfile::TempDir; use readur::{ db::Database, services::file_service::FileService, + storage::factory::create_storage_backend, + storage::StorageConfig, models::{Document, CreateUser, UserRole}, test_utils::TestContext, }; @@ -308,7 +310,9 @@ fn test_calculate_file_hash_empty_content() { async fn test_file_service_create_document_with_hash() { let temp_dir = TempDir::new().unwrap(); let upload_path = temp_dir.path().to_string_lossy().to_string(); - let file_service = FileService::new(upload_path); + let storage_config = StorageConfig::Local { upload_path: upload_path.clone() }; + let storage_backend = create_storage_backend(storage_config).await.unwrap(); + let file_service = FileService::with_storage(upload_path, storage_backend); let user_id = Uuid::new_v4(); let test_hash = "test_hash_1234567890"; @@ -341,7 +345,9 @@ async fn test_file_service_create_document_with_hash() { async fn test_file_service_create_document_without_hash() { let temp_dir = TempDir::new().unwrap(); let upload_path = temp_dir.path().to_string_lossy().to_string(); - let file_service = 
FileService::new(upload_path); + let storage_config = StorageConfig::Local { upload_path: upload_path.clone() }; + let storage_backend = create_storage_backend(storage_config).await.unwrap(); + let file_service = FileService::with_storage(upload_path, storage_backend); let user_id = Uuid::new_v4(); let document = file_service.create_document(
diff --git a/tests/integration_ocr_pipeline_integration_test.rs index 9b61831..799598a 100644 --- a/tests/integration_ocr_pipeline_integration_test.rs +++ b/tests/integration_ocr_pipeline_integration_test.rs @@ -17,6 +17,8 @@ use readur::{ db::Database, models::Document, services::file_service::FileService, + storage::factory::create_storage_backend, + storage::StorageConfig, ocr::enhanced::EnhancedOcrService, ocr::queue::{OcrQueueService, OcrQueueItem}, db_guardrails_simple::DocumentTransactionManager, @@ -47,9 +49,14 @@ impl OCRPipelineTestHarness { let db = Database::new(&database_url).await?; // Initialize services - let file_service = FileService::new("./test_uploads".to_string()); - let ocr_service = EnhancedOcrService::new("/tmp".to_string()); - let queue_service = OcrQueueService::new(db.clone(), pool.clone(), 4); + let upload_path = "./test_uploads".to_string(); + let storage_config = StorageConfig::Local { + upload_path: upload_path.clone() + }; + let storage_backend = create_storage_backend(storage_config).await?; + let file_service = FileService::with_storage(upload_path, storage_backend); + let ocr_service = EnhancedOcrService::new("/tmp".to_string(), file_service.clone()); + let queue_service = OcrQueueService::new(db.clone(), pool.clone(), 4, std::sync::Arc::new(file_service)); let transaction_manager = DocumentTransactionManager::new(pool.clone()); // Ensure test upload directory exists
diff --git a/tests/s3_storage_tests.rs index c3c3e15..76f5886 100644 --- a/tests/s3_storage_tests.rs +++ b/tests/s3_storage_tests.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use readur::services::file_service::FileService; +use readur::storage::factory::create_storage_backend; +use readur::storage::{StorageBackend, StorageConfig}; #[cfg(feature = "s3")] use readur::services::s3_service::S3Service; @@ -35,8 +37,11 @@ async fn test_s3_service_new_validation() { async fn test_file_service_local_creation() { // Test local-only FileService creation and functionality let upload_path = "./test_uploads".to_string(); - let local_service = FileService::new(upload_path); - assert!(!local_service.is_s3_enabled()); + let storage_config = StorageConfig::Local { upload_path: upload_path.clone() }; + let storage_backend = create_storage_backend(storage_config).await.unwrap(); + let _local_service = FileService::with_storage(upload_path, storage_backend); + // Note: is_s3_enabled() method is no longer available in the new architecture + // as we use trait-based abstraction instead of conditional logic } #[cfg(feature = "s3")] @@ -65,9 +70,14 @@ async fn test_s3_service_configuration() { assert_eq!(service.get_config().region, "us-east-1"); assert_eq!(service.get_config().watch_folders.len(), 1); - // Test FileService integration - let s3_file_service = FileService::new_with_s3("./test".to_string(), Arc::new(service)); - assert!(s3_file_service.is_s3_enabled()); + // Test FileService integration with S3 storage backend + #[cfg(feature = "s3")] + { + let storage_backend = Arc::new(service) as Arc<dyn StorageBackend>; + let _s3_file_service = FileService::with_storage("./test".to_string(), storage_backend); + // Note: is_s3_enabled() method is no longer available in the new architecture + // 
as we use trait-based abstraction instead of conditional logic + } } Err(_) => { // Expected to fail since we don't have a real S3 endpoint
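For reference, the shape of the abstraction the whole patch is built on: `FileService` delegates to an `Arc<dyn StorageBackend>` chosen once by `storage::factory`. A minimal sketch of that dispatch; the real trait in src/storage/ has more methods, and the exact signatures below are assumptions:

    use std::sync::Arc;
    use async_trait::async_trait;

    #[async_trait]
    trait StorageBackend: Send + Sync {
        async fn read(&self, path: &str) -> anyhow::Result<Vec<u8>>;
    }

    struct LocalBackend {
        root: std::path::PathBuf,
    }

    #[async_trait]
    impl StorageBackend for LocalBackend {
        async fn read(&self, path: &str) -> anyhow::Result<Vec<u8>> {
            // The local backend resolves paths relative to the upload root.
            Ok(tokio::fs::read(self.root.join(path)).await?)
        }
    }

    // A factory picks the backend once; everything downstream sees the trait object.
    fn make_local(root: std::path::PathBuf) -> Arc<dyn StorageBackend> {
        Arc::new(LocalBackend { root })
    }

Callers then follow the construction path the binaries above use: build a StorageConfig (or derive one with storage_config_from_env), pass it to FileService::from_config, and call initialize_storage() once before serving requests.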