diff --git a/src/batch_ingest.rs b/src/batch_ingest.rs
index d930ef7..5e3e11c 100644
--- a/src/batch_ingest.rs
+++ b/src/batch_ingest.rs
@@ -6,12 +6,12 @@
 use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
 use uuid::Uuid;
 use walkdir::WalkDir;
-use sha2::{Sha256, Digest};
 
 use crate::{
     config::Config,
     db::Database,
     file_service::FileService,
+    document_ingestion::{DocumentIngestionService, IngestionResult},
     ocr_queue::OcrQueueService,
 };
 
@@ -189,47 +189,36 @@ async fn process_single_file(
     // Read file data
     let file_data = fs::read(&path).await?;
 
-    // Calculate file hash for deduplication
-    let file_hash = calculate_file_hash(&file_data);
-
-    // Check for duplicate content using efficient hash lookup
-    match db.get_document_by_user_and_hash(user_id, &file_hash).await {
-        Ok(Some(existing_doc)) => {
-            info!("Skipping duplicate file: {} matches existing document {} (hash: {})",
-                  filename, existing_doc.original_filename, &file_hash[..8]);
-            return Ok(None); // Skip processing duplicate
-        }
-        Ok(None) => {
-            info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
-        }
-        Err(e) => {
-            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
-            // Continue processing even if duplicate check fails
-        }
-    }
-
     let mime_type = mime_guess::from_path(&filename)
         .first_or_octet_stream()
         .to_string();
 
-    // Save file
-    let file_path = file_service.save_file(&filename, &file_data).await?;
+    // Use the unified ingestion service for consistent deduplication
+    let ingestion_service = DocumentIngestionService::new(db, file_service);
 
-    // Create document with hash
-    let document = file_service.create_document(
-        &filename,
-        &filename,
-        &file_path,
-        file_size,
-        &mime_type,
-        user_id,
-        Some(file_hash),
-    );
-
-    // Save to database (without OCR)
-    let created_doc = db.create_document(document).await?;
-
-    Ok(Some((created_doc.id, file_size)))
+    let result = ingestion_service
+        .ingest_batch_file(&filename, file_data, &mime_type, user_id)
+        .await
+        .map_err(|e| anyhow::anyhow!(e))?;
+
+    match result {
+        IngestionResult::Created(doc) => {
+            info!("Created new document for batch file {}: {}", filename, doc.id);
+            Ok(Some((doc.id, file_size)))
+        }
+        IngestionResult::Skipped { existing_document_id, reason } => {
+            info!("Skipped duplicate batch file {}: {} (existing: {})", filename, reason, existing_document_id);
+            Ok(None) // File was skipped due to deduplication
+        }
+        IngestionResult::ExistingDocument(doc) => {
+            info!("Found existing document for batch file {}: {}", filename, doc.id);
+            Ok(None) // Don't re-queue for OCR
+        }
+        IngestionResult::TrackedAsDuplicate { existing_document_id } => {
+            info!("Tracked batch file {} as duplicate of existing document: {}", filename, existing_document_id);
+            Ok(None) // File was tracked as duplicate
+        }
+    }
 }
 
 fn calculate_priority(file_size: i64) -> i32 {
@@ -247,9 +236,3 @@ fn calculate_priority(file_size: i64) -> i32 {
     }
 }
 
-fn calculate_file_hash(data: &[u8]) -> String {
-    let mut hasher = Sha256::new();
-    hasher.update(data);
-    let result = hasher.finalize();
-    format!("{:x}", result)
-}
\ No newline at end of file
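Reviewer note (not part of the patch): the new `document_ingestion` module is not included in this diff, so every call site below depends on an interface we cannot see. A minimal sketch of the surface the call sites appear to assume — all names, variants, signatures, and the `String` error type here are inferred, not the real module:

```rust
// Hypothetical sketch of src/document_ingestion.rs, reconstructed from the
// call sites in this patch only. The real implementation may differ.
use uuid::Uuid;

use crate::{db::Database, file_service::FileService, models::Document};

pub enum IngestionResult {
    /// A new document row (and file on disk) was created.
    Created(Document),
    /// Identical content already exists; nothing new was written.
    Skipped { existing_document_id: Uuid, reason: String },
    /// The content maps to an existing document the caller should reuse as-is.
    ExistingDocument(Document),
    /// The file was recorded as a duplicate pointing at an existing document.
    TrackedAsDuplicate { existing_document_id: Uuid },
}

pub struct DocumentIngestionService {
    db: Database,
    file_service: FileService,
}

impl DocumentIngestionService {
    pub fn new(db: Database, file_service: FileService) -> Self {
        Self { db, file_service }
    }

    // Presumably: hash the bytes (the SHA-256 logic removed from the callers),
    // look up user_id + hash, then either save the file and create a document
    // or report one of the duplicate outcomes above.
    pub async fn ingest_batch_file(
        &self,
        filename: &str,
        data: Vec<u8>,
        mime_type: &str,
        user_id: Uuid,
    ) -> Result<IngestionResult, String> {
        unimplemented!("sketch only")
    }

    pub async fn ingest_from_webdav(
        &self,
        filename: &str,
        data: Vec<u8>,
        mime_type: &str,
        user_id: Uuid,
        source_id: Uuid,
    ) -> Result<IngestionResult, String> {
        unimplemented!("sketch only")
    }

    pub async fn ingest_from_source(
        &self,
        filename: &str,
        data: Vec<u8>,
        mime_type: &str,
        user_id: Uuid,
        source_id: Uuid,
        source_type: &str,
    ) -> Result<IngestionResult, String> {
        unimplemented!("sketch only")
    }
}
```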
diff --git a/src/routes/webdav.rs b/src/routes/webdav.rs
index b5c147e..197c2f3 100644
--- a/src/routes/webdav.rs
+++ b/src/routes/webdav.rs
@@ -346,7 +346,7 @@ async fn start_webdav_sync(
     let enable_background_ocr = user_settings.enable_background_ocr;
 
     tokio::spawn(async move {
-        match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
+        match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
             Ok(files_processed) => {
                 info!("WebDAV sync completed successfully for user {}: {} files processed",
                       user_id, files_processed);
diff --git a/src/routes/webdav/webdav_sync.rs b/src/routes/webdav/webdav_sync.rs
index a558837..c2fda18 100644
--- a/src/routes/webdav/webdav_sync.rs
+++ b/src/routes/webdav/webdav_sync.rs
@@ -4,12 +4,12 @@
 use tracing::{error, info, warn};
 use chrono::Utc;
 use tokio::sync::Semaphore;
 use futures::stream::{FuturesUnordered, StreamExt};
-use sha2::{Sha256, Digest};
 
 use crate::{
     AppState,
     models::{CreateWebDAVFile, UpdateWebDAVSyncState},
     file_service::FileService,
+    document_ingestion::{DocumentIngestionService, IngestionResult},
     webdav_service::{WebDAVConfig, WebDAVService},
 };
 
@@ -19,6 +19,7 @@ pub async fn perform_webdav_sync_with_tracking(
     webdav_service: WebDAVService,
     config: WebDAVConfig,
     enable_background_ocr: bool,
+    webdav_source_id: Option<Uuid>,
 ) -> Result> {
     info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len());
 
@@ -59,7 +60,7 @@ pub async fn perform_webdav_sync_with_tracking(
     };
 
     // Perform sync with proper cleanup
-    let sync_result = perform_sync_internal(state.clone(), user_id, webdav_service, config, enable_background_ocr).await;
+    let sync_result = perform_sync_internal(state.clone(), user_id, webdav_service, config, enable_background_ocr, webdav_source_id).await;
 
     match &sync_result {
         Ok(files_processed) => {
@@ -80,6 +81,7 @@ async fn perform_sync_internal(
     webdav_service: WebDAVService,
     config: WebDAVConfig,
     enable_background_ocr: bool,
+    webdav_source_id: Option<Uuid>,
 ) -> Result> {
     let mut total_files_processed = 0;
 
@@ -161,6 +163,7 @@ async fn perform_sync_internal(
                 &file_info_clone,
                 enable_background_ocr,
                 semaphore_clone,
+                webdav_source_id,
             ).await
         };
 
@@ -230,6 +233,7 @@ async fn process_single_file(
     file_info: &crate::models::FileInfo,
     enable_background_ocr: bool,
     semaphore: Arc<Semaphore>,
+    webdav_source_id: Option<Uuid>,
 ) -> Result {
     // Acquire semaphore permit to limit concurrent downloads
     let _permit = semaphore.acquire().await.map_err(|e| format!("Semaphore error: {}", e))?;
 
@@ -273,74 +277,68 @@ async fn process_single_file(
     info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
 
-    // Calculate file hash for deduplication
-    let file_hash = calculate_file_hash(&file_data);
+    // Use the unified ingestion service for consistent deduplication
+    let file_service = FileService::new(state.config.upload_path.clone());
+    let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
 
-    // Check if this exact file content already exists for this user using efficient hash lookup
-    info!("Checking for duplicate content for user {}: {} (hash: {}, size: {} bytes)",
-          user_id, file_info.name, &file_hash[..8], file_data.len());
-
-    // Use efficient database hash lookup instead of reading all documents
-    match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
-        Ok(Some(existing_doc)) => {
-            info!("Found duplicate content for user {}: {} matches existing document {} (hash: {})",
-                  user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
-
-            // Record this WebDAV file as a duplicate but link to existing document
-            let webdav_file = CreateWebDAVFile {
+    let result = if let Some(source_id) = webdav_source_id {
+        ingestion_service
+            .ingest_from_webdav(
+                &file_info.name,
+                file_data,
+                &file_info.mime_type,
                 user_id,
-            webdav_path: file_info.path.clone(),
-            etag: file_info.etag.clone(),
-            last_modified: file_info.last_modified,
-            file_size: file_info.size,
-            mime_type: file_info.mime_type.clone(),
-            document_id: Some(existing_doc.id), // Link to existing document
-            sync_status: "duplicate_content".to_string(),
-            sync_error: None,
-        };
+                source_id,
+            )
+            .await
+    } else {
+        // Fallback for backward compatibility - treat as generic WebDAV sync
+        ingestion_service
+            .ingest_from_source(
+                &file_info.name,
+                file_data,
+                &file_info.mime_type,
+                user_id,
+                uuid::Uuid::new_v4(), // Generate a temporary ID for tracking
+                "webdav_sync",
+            )
+            .await
+    };
+
+    let result = result.map_err(|e| format!("Document ingestion failed for {}: {}", file_info.name, e))?;
+
+    let (document, should_queue_ocr, webdav_sync_status) = match result {
+        IngestionResult::Created(doc) => {
+            info!("Created new document for {}: {}", file_info.name, doc.id);
+            (doc, true, "synced") // New document - queue for OCR
+        }
+        IngestionResult::ExistingDocument(doc) => {
+            info!("Found existing document for {}: {}", file_info.name, doc.id);
+            (doc, false, "duplicate_content") // Existing document - don't re-queue OCR
+        }
+        IngestionResult::TrackedAsDuplicate { existing_document_id } => {
+            info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
 
-            if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
-                error!("Failed to record duplicate WebDAV file: {}", e);
-            }
+            // For duplicates, we still need to get the document info for WebDAV tracking
+            let existing_doc = state.db.get_document_by_id(existing_document_id, user_id, crate::models::UserRole::User).await
+                .map_err(|e| format!("Failed to get existing document: {}", e))?
+                .ok_or_else(|| "Document not found".to_string())?;
 
-            info!("WebDAV file marked as duplicate_content, skipping processing");
-            return Ok(false); // Not processed (duplicate)
+            (existing_doc, false, "duplicate_content") // Track as duplicate
         }
-        Ok(None) => {
-            info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
+        IngestionResult::Skipped { existing_document_id, reason: _ } => {
+            info!("Skipped duplicate file {}: existing document {}", file_info.name, existing_document_id);
+
+            // For skipped files, we still need to get the document info for WebDAV tracking
+            let existing_doc = state.db.get_document_by_id(existing_document_id, user_id, crate::models::UserRole::User).await
+                .map_err(|e| format!("Failed to get existing document: {}", e))?
+                .ok_or_else(|| "Document not found".to_string())?;
+
+            (existing_doc, false, "duplicate_content") // Track as duplicate
         }
-        Err(e) => {
-            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
-            // Continue processing even if duplicate check fails
-        }
-    }
-
-    // Create file service and save file to disk
-    let file_service = FileService::new(state.config.upload_path.clone());
-
-    let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
-        .map_err(|e| format!("Failed to save {}: {}", file_info.name, e))?;
-
-    // Create document record with hash
-    let file_service = FileService::new(state.config.upload_path.clone());
-    let document = file_service.create_document(
-        &file_info.name,
-        &file_info.name, // original filename same as name
-        &saved_file_path,
-        file_data.len() as i64,
-        &file_info.mime_type,
-        user_id,
-        Some(file_hash.clone()), // Store the calculated hash
-    );
-
-    // Save document to database
-    let created_document = state.db.create_document(document)
-        .await
-        .map_err(|e| format!("Failed to create document {}: {}", file_info.name, e))?;
-
-    info!("Created document record for {}: {}", file_info.name, created_document.id);
-
-    // Record successful file in WebDAV files table
+    };
+
+    // Record WebDAV file in tracking table
     let webdav_file = CreateWebDAVFile {
         user_id,
         webdav_path: file_info.path.clone(),
@@ -348,8 +346,8 @@ async fn process_single_file(
         last_modified: file_info.last_modified,
         file_size: file_info.size,
         mime_type: file_info.mime_type.clone(),
-        document_id: Some(created_document.id),
-        sync_status: "synced".to_string(),
+        document_id: Some(document.id),
+        sync_status: webdav_sync_status.to_string(),
         sync_error: None,
     };
 
@@ -357,45 +355,26 @@ async fn process_single_file(
         error!("Failed to record WebDAV file: {}", e);
     }
 
-    // Queue for OCR processing if enabled
-    if enable_background_ocr {
-        info!("Background OCR is enabled, queueing document {} for processing", created_document.id);
+    // Queue for OCR processing if enabled and this is a new document
+    if enable_background_ocr && should_queue_ocr {
+        info!("Background OCR is enabled, queueing document {} for processing", document.id);
 
-        match state.db.pool.acquire().await {
-            Ok(_conn) => {
-                let queue_service = crate::ocr_queue::OcrQueueService::new(
-                    state.db.clone(),
-                    state.db.pool.clone(),
-                    4
-                );
-
-                // Determine priority based on file size
-                let priority = if file_info.size <= 1024 * 1024 { 10 }       // ≤ 1MB: High priority
-                    else if file_info.size <= 5 * 1024 * 1024 { 8 }          // ≤ 5MB: Medium priority
-                    else if file_info.size <= 10 * 1024 * 1024 { 6 }         // ≤ 10MB: Normal priority
-                    else if file_info.size <= 50 * 1024 * 1024 { 4 }         // ≤ 50MB: Low priority
-                    else { 2 };                                              // > 50MB: Lowest priority
-
-                if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
-                    error!("Failed to enqueue document for OCR: {}", e);
-                } else {
-                    info!("Enqueued document {} for OCR processing", created_document.id);
-                }
-            }
-            Err(e) => {
-                error!("Failed to connect to database for OCR queueing: {}", e);
-            }
+        // Determine priority based on file size
+        let priority = if file_info.size <= 1024 * 1024 { 10 }       // ≤ 1MB: High priority
+            else if file_info.size <= 5 * 1024 * 1024 { 8 }          // ≤ 5MB: Medium priority
+            else if file_info.size <= 10 * 1024 * 1024 { 6 }         // ≤ 10MB: Normal priority
+            else if file_info.size <= 50 * 1024 * 1024 { 4 }         // ≤ 50MB: Low priority
+            else { 2 };                                              // > 50MB: Lowest priority
+
+        if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
+            error!("Failed to enqueue document for OCR: {}", e);
+        } else {
+            info!("Enqueued document {} for OCR processing", document.id);
         }
     } else {
-        info!("Background OCR is disabled, skipping OCR queue for document {}", created_document.id);
+        info!("Background OCR is disabled or document already processed, skipping OCR queue for document {}", document.id);
     }
 
     Ok(true) // Successfully processed
 }
-
-fn calculate_file_hash(data: &[u8]) -> String {
-    let mut hasher = Sha256::new();
-    hasher.update(data);
-    let result = hasher.finalize();
-    format!("{:x}", result)
-}
\ No newline at end of file
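Reviewer note (not part of the patch): both the route handler and the scheduler currently pass `None` for the new `webdav_source_id`, so every WebDAV sync takes the `Uuid::new_v4()` fallback branch above. A sketch of what a source-aware caller could look like once the WebDAV source row is known at spawn time; `find_webdav_source_for_user` and the module paths in the imports are assumptions, not code from this patch:

```rust
// Hypothetical caller sketch; only the perform_webdav_sync_with_tracking
// argument order is taken from the diff, everything else is illustrative.
use std::sync::Arc;

use tracing::error;
use uuid::Uuid;

use crate::{AppState, webdav_service::{WebDAVConfig, WebDAVService}};
// assumed path for the function changed in this patch:
use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking;

async fn spawn_tracked_sync(
    state: Arc<AppState>,
    user_id: Uuid,
    webdav_service: WebDAVService,
    webdav_config: WebDAVConfig,
    enable_background_ocr: bool,
) {
    // Assumed helper that looks up the user's WebDAV source row, if any.
    let webdav_source_id: Option<Uuid> = find_webdav_source_for_user(&state, user_id).await;

    tokio::spawn(async move {
        if let Err(e) = perform_webdav_sync_with_tracking(
            state,
            user_id,
            webdav_service,
            webdav_config,
            enable_background_ocr,
            webdav_source_id, // Some(id) -> ingest_from_webdav, None -> legacy fallback
        )
        .await
        {
            error!("WebDAV sync failed for user {}: {}", user_id, e);
        }
    });
}
```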
diff --git a/src/source_sync.rs b/src/source_sync.rs
index cffe4c3..98f64ca 100644
--- a/src/source_sync.rs
+++ b/src/source_sync.rs
@@ -5,7 +5,6 @@
 use chrono::Utc;
 use tokio::sync::Semaphore;
 use tokio_util::sync::CancellationToken;
 use futures::stream::{FuturesUnordered, StreamExt};
-use sha2::{Sha256, Digest};
 use tracing::{error, info, warn};
 use uuid::Uuid;
 
@@ -13,6 +12,7 @@ use crate::{
     AppState,
     models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
     file_service::FileService,
+    document_ingestion::{DocumentIngestionService, IngestionResult},
     local_folder_service::LocalFolderService,
     s3_service::S3Service,
     webdav_service::{WebDAVService, WebDAVConfig},
 
@@ -507,7 +507,7 @@ impl SourceSyncService {
     async fn process_single_file(
         state: Arc<AppState>,
         user_id: Uuid,
-        _source_id: Uuid,
+        source_id: Uuid,
         file_info: &FileInfo,
         enable_background_ocr: bool,
         semaphore: Arc<Semaphore>,
 
@@ -521,9 +521,6 @@ impl SourceSyncService {
             .map_err(|e| anyhow!("Semaphore error: {}", e))?;
 
         info!("Processing file: {}", file_info.path);
-
-        // Check if we've already processed this file by looking for documents with same source
-        // This is a simplified version - you might want to implement source-specific tracking tables
 
         // Download the file
         let file_data = download_file(file_info.path.clone()).await
 
         info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
 
-        // Calculate file hash for deduplication
-        let file_hash = Self::calculate_file_hash(&file_data);
-
-        // Check for duplicate content using efficient hash lookup
-        match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
-            Ok(Some(existing_doc)) => {
-                info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
-                      user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
-                return Ok(false); // Skip processing duplicate
-            }
-            Ok(None) => {
-                info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
-            }
-            Err(e) => {
-                warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
-                // Continue processing even if duplicate check fails
-            }
-        }
-
-        // Save file to disk
+        // Use the unified ingestion service for consistent deduplication
         let file_service = FileService::new(state.config.upload_path.clone());
-        let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
-            .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
+        let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
+
+        let result = ingestion_service
+            .ingest_from_source(
+                &file_info.name,
+                file_data,
+                &file_info.mime_type,
+                user_id,
+                source_id,
+                "source_sync",
+            )
+            .await
+            .map_err(|e| anyhow!("Document ingestion failed for {}: {}", file_info.name, e))?;
 
-        // Create document record with hash
-        let document = file_service.create_document(
-            &file_info.name,
-            &file_info.name,
-            &saved_file_path,
-            file_data.len() as i64,
-            &file_info.mime_type,
-            user_id,
-            Some(file_hash.clone()), // Store the calculated hash
-        );
+        let (document, should_queue_ocr) = match result {
+            IngestionResult::Created(doc) => {
+                info!("Created new document for {}: {}", file_info.name, doc.id);
+                (doc, true) // New document - queue for OCR
+            }
+            IngestionResult::Skipped { existing_document_id, reason } => {
+                info!("Skipped duplicate file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
+                return Ok(false); // File was skipped due to deduplication
+            }
+            IngestionResult::ExistingDocument(doc) => {
+                info!("Found existing document for {}: {}", file_info.name, doc.id);
+                (doc, false) // Existing document - don't re-queue OCR
+            }
+            IngestionResult::TrackedAsDuplicate { existing_document_id } => {
+                info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
+                return Ok(false); // File was tracked as duplicate
+            }
+        };
 
-        let created_document = state.db.create_document(document).await
-            .map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
+        // Queue for OCR if enabled and this is a new document
+        if enable_background_ocr && should_queue_ocr {
+            info!("Background OCR enabled, queueing document {} for processing", document.id);
 
-        info!("Created document record for {}: {}", file_info.name, created_document.id);
+            let priority = if file_info.size <= 1024 * 1024 { 10 }
+                else if file_info.size <= 5 * 1024 * 1024 { 8 }
+                else if file_info.size <= 10 * 1024 * 1024 { 6 }
+                else if file_info.size <= 50 * 1024 * 1024 { 4 }
+                else { 2 };
 
-        // Queue for OCR if enabled
-        if enable_background_ocr {
-            info!("Background OCR enabled, queueing document {} for processing", created_document.id);
-
-            match state.db.pool.acquire().await {
-                Ok(_conn) => {
-                    let queue_service = crate::ocr_queue::OcrQueueService::new(
-                        state.db.clone(),
-                        state.db.pool.clone(),
-                        4
-                    );
-
-                    let priority = if file_info.size <= 1024 * 1024 { 10 }
-                        else if file_info.size <= 5 * 1024 * 1024 { 8 }
-                        else if file_info.size <= 10 * 1024 * 1024 { 6 }
-                        else if file_info.size <= 50 * 1024 * 1024 { 4 }
-                        else { 2 };
-
-                    if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
-                        error!("Failed to enqueue document for OCR: {}", e);
-                    } else {
-                        info!("Enqueued document {} for OCR processing", created_document.id);
-                    }
-                }
-                Err(e) => {
-                    error!("Failed to connect to database for OCR queueing: {}", e);
-                }
+            if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
+                error!("Failed to enqueue document for OCR: {}", e);
+            } else {
+                info!("Enqueued document {} for OCR processing", document.id);
             }
         }
 
@@ -607,7 +586,7 @@ impl SourceSyncService {
     async fn process_single_file_with_cancellation(
         state: Arc<AppState>,
         user_id: Uuid,
-        _source_id: Uuid,
+        source_id: Uuid,
         file_info: &FileInfo,
         enable_background_ocr: bool,
         semaphore: Arc<Semaphore>,
 
@@ -647,79 +626,61 @@ impl SourceSyncService {
         info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
 
-        // Calculate file hash for deduplication
-        let file_hash = Self::calculate_file_hash(&file_data);
-
-        // Check for duplicate content using efficient hash lookup
-        match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
-            Ok(Some(existing_doc)) => {
-                info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
-                      user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
-                return Ok(false); // Skip processing duplicate
-            }
-            Ok(None) => {
-                info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
-            }
-            Err(e) => {
-                warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
-                // Continue processing even if duplicate check fails
-            }
-        }
-
-        // Check for cancellation before saving
+        // Check for cancellation before processing
         if cancellation_token.is_cancelled() {
-            info!("File processing cancelled before saving: {}", file_info.path);
+            info!("File processing cancelled before ingestion: {}", file_info.path);
             return Err(anyhow!("Processing cancelled"));
         }
 
-        // Save file to disk
+        // Use the unified ingestion service for consistent deduplication
         let file_service = FileService::new(state.config.upload_path.clone());
-        let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
-            .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
+        let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
+
+        let result = ingestion_service
+            .ingest_from_source(
+                &file_info.name,
+                file_data,
+                &file_info.mime_type,
+                user_id,
+                source_id,
+                "source_sync",
+            )
+            .await
+            .map_err(|e| anyhow!("Document ingestion failed for {}: {}", file_info.name, e))?;
 
-        // Create document record with hash
-        let document = file_service.create_document(
-            &file_info.name,
-            &file_info.name,
-            &saved_file_path,
-            file_data.len() as i64,
-            &file_info.mime_type,
-            user_id,
-            Some(file_hash.clone()), // Store the calculated hash
-        );
+        let (document, should_queue_ocr) = match result {
+            IngestionResult::Created(doc) => {
+                info!("Created new document for {}: {}", file_info.name, doc.id);
+                (doc, true) // New document - queue for OCR
+            }
+            IngestionResult::Skipped { existing_document_id, reason } => {
+                info!("Skipped duplicate file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
+                return Ok(false); // File was skipped due to deduplication
+            }
+            IngestionResult::ExistingDocument(doc) => {
+                info!("Found existing document for {}: {}", file_info.name, doc.id);
+                (doc, false) // Existing document - don't re-queue OCR
+            }
+            IngestionResult::TrackedAsDuplicate { existing_document_id } => {
+                info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
+                return Ok(false); // File was tracked as duplicate
+            }
+        };
 
-        let created_document = state.db.create_document(document).await
-            .map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
+        // Queue for OCR if enabled and this is a new document (OCR continues even if sync is cancelled)
+        if enable_background_ocr && should_queue_ocr {
+            info!("Background OCR enabled, queueing document {} for processing", document.id);
 
-        info!("Created document record for {}: {}", file_info.name, created_document.id);
+            let priority = if file_info.size <= 1024 * 1024 { 10 }
+                else if file_info.size <= 5 * 1024 * 1024 { 8 }
+                else if file_info.size <= 10 * 1024 * 1024 { 6 }
+                else if file_info.size <= 50 * 1024 * 1024 { 4 }
+                else { 2 };
 
-        // Queue for OCR if enabled (OCR continues even if sync is cancelled)
-        if enable_background_ocr {
-            info!("Background OCR enabled, queueing document {} for processing", created_document.id);
-
-            match state.db.pool.acquire().await {
-                Ok(_conn) => {
-                    let queue_service = crate::ocr_queue::OcrQueueService::new(
-                        state.db.clone(),
-                        state.db.pool.clone(),
-                        4
-                    );
-
-                    let priority = if file_info.size <= 1024 * 1024 { 10 }
-                        else if file_info.size <= 5 * 1024 * 1024 { 8 }
-                        else if file_info.size <= 10 * 1024 * 1024 { 6 }
-                        else if file_info.size <= 50 * 1024 * 1024 { 4 }
-                        else { 2 };
-
-                    if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
-                        error!("Failed to enqueue document for OCR: {}", e);
-                    } else {
-                        info!("Enqueued document {} for OCR processing", created_document.id);
-                    }
-                }
-                Err(e) => {
-                    error!("Failed to connect to database for OCR queueing: {}", e);
-                }
+            if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
+                error!("Failed to enqueue document for OCR: {}", e);
+            } else {
+                info!("Enqueued document {} for OCR processing", document.id);
             }
         }
 
@@ -752,10 +713,4 @@ impl SourceSyncService {
         Ok(())
     }
 
-    fn calculate_file_hash(data: &[u8]) -> String {
-        let mut hasher = Sha256::new();
-        hasher.update(data);
-        let result = hasher.finalize();
-        format!("{:x}", result)
-    }
 }
\ No newline at end of file
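Reviewer note (not part of the patch): the size-based OCR priority ladder is now written out verbatim in webdav_sync.rs and twice in source_sync.rs, while batch_ingest.rs and watcher.rs keep their own `calculate_priority`. A possible follow-up would be one shared helper so the thresholds live in a single place; the thresholds below are copied from the diff, the function itself is a suggestion:

```rust
// Possible shared helper for the duplicated size -> OCR priority ladder.
// Not part of this patch; name and location are up for discussion.
pub fn ocr_priority_for_size(file_size: i64) -> i32 {
    const MB: i64 = 1024 * 1024;
    match file_size {
        s if s <= MB => 10,      // ≤ 1MB: highest priority
        s if s <= 5 * MB => 8,   // ≤ 5MB
        s if s <= 10 * MB => 6,  // ≤ 10MB
        s if s <= 50 * MB => 4,  // ≤ 50MB
        _ => 2,                  // > 50MB: lowest priority
    }
}

// Call sites would then shrink to:
// let priority = ocr_priority_for_size(file_info.size);
// state.queue_service.enqueue_document(document.id, priority, file_info.size).await?;
```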
diff --git a/src/watcher.rs b/src/watcher.rs
index 626ef16..06c2942 100644
--- a/src/watcher.rs
+++ b/src/watcher.rs
@@ -7,9 +7,14 @@ use tokio::sync::mpsc;
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, warn};
 use walkdir::WalkDir;
-use sha2::{Sha256, Digest};
 
-use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService};
+use crate::{
+    config::Config,
+    db::Database,
+    file_service::FileService,
+    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ocr_queue::OcrQueueService
+};
 
 pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {
     info!("Starting hybrid folder watcher on: {}", config.watch_folder);
 
@@ -315,36 +320,6 @@ async fn process_file(
         .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?;
     let admin_user_id = admin_user.id;
 
-    // Calculate file hash for deduplication
-    let file_hash = calculate_file_hash(&file_data);
-
-    // Check if this exact file content already exists for the admin user
-    debug!("Checking for duplicate content for admin user: {} (hash: {}, size: {} bytes)",
-           filename, &file_hash[..8], file_size);
-
-    // Query documents with the same file size for the admin user only
-    if let Ok(existing_docs) = db.get_documents_by_user_with_role(admin_user_id, crate::models::UserRole::Admin, 1000, 0).await {
-        let matching_docs: Vec<_> = existing_docs.into_iter()
-            .filter(|doc| doc.file_size == file_size)
-            .collect();
-
-        debug!("Found {} documents with same size for admin user", matching_docs.len());
-
-        for existing_doc in matching_docs {
-            // Read the existing file and compare hashes
-            if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
-                let existing_hash = calculate_file_hash(&existing_file_data);
-                if file_hash == existing_hash {
-                    info!("Skipping duplicate file content: {} (hash: {}, already exists as: {})",
-                          filename, &file_hash[..8], existing_doc.original_filename);
-                    return Ok(());
-                }
-            }
-        }
-    }
-
-    debug!("File content is unique: {} (hash: {})", filename, &file_hash[..8]);
-
     // Validate PDF files before processing
     if mime_type == "application/pdf" {
         if !is_valid_pdf(&file_data) {
@@ -360,28 +335,34 @@ async fn process_file(
         }
     }
 
-    let saved_file_path = file_service.save_file(&filename, &file_data).await?;
+    // Use the unified ingestion service for consistent deduplication
+    let ingestion_service = DocumentIngestionService::new(db.clone(), file_service.clone());
 
-    // Calculate file hash for deduplication
-    let file_hash = calculate_file_hash(&file_data);
-
-    let document = file_service.create_document(
-        &filename,
-        &filename,
-        &saved_file_path,
-        file_size,
-        &mime_type,
-        admin_user_id,
-        Some(file_hash),
-    );
-
-    let created_doc = db.create_document(document).await?;
-
-    // Enqueue for OCR processing with priority based on file size and type
-    let priority = calculate_priority(file_size, &mime_type);
-    queue_service.enqueue_document(created_doc.id, priority, file_size).await?;
-
-    info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
+    let result = ingestion_service
+        .ingest_batch_file(&filename, file_data, &mime_type, admin_user_id)
+        .await
+        .map_err(|e| anyhow::anyhow!(e))?;
+
+    match result {
+        IngestionResult::Created(doc) => {
+            info!("Created new document for watch folder file {}: {}", filename, doc.id);
+
+            // Enqueue for OCR processing with priority based on file size and type
+            let priority = calculate_priority(file_size, &mime_type);
+            queue_service.enqueue_document(doc.id, priority, file_size).await?;
+
+            info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
+        }
+        IngestionResult::Skipped { existing_document_id, reason } => {
+            info!("Skipped duplicate watch folder file {}: {} (existing: {})", filename, reason, existing_document_id);
+        }
+        IngestionResult::ExistingDocument(doc) => {
+            info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", filename, doc.id);
+        }
+        IngestionResult::TrackedAsDuplicate { existing_document_id } => {
+            info!("Tracked watch folder file {} as duplicate of existing document: {}", filename, existing_document_id);
+        }
+    }
 
     Ok(())
 }
@@ -463,9 +444,3 @@ fn clean_pdf_data(data: &[u8]) -> &[u8] {
     data
 }
 
-fn calculate_file_hash(data: &[u8]) -> String {
-    let mut hasher = Sha256::new();
-    hasher.update(data);
-    let result = hasher.finalize();
-    format!("{:x}", result)
-}
\ No newline at end of file
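Reviewer note (not part of the patch): all five ingestion call sites now rely on `DocumentIngestionService` to enforce content deduplication, but no test is included here. A sketch of the kind of regression test that would pin the contract down; `test_ingestion_service()` is an assumed test helper that wires the service to a test database, and the exact result variant returned for a duplicate is left open:

```rust
// Sketch only: asserts that ingesting identical bytes twice does not create
// two documents. The helper and setup are hypothetical.
use crate::document_ingestion::IngestionResult;

#[tokio::test]
async fn same_bytes_are_not_ingested_twice() {
    let (service, user_id) = test_ingestion_service().await; // assumed helper
    let bytes = b"same content".to_vec();

    let first = service
        .ingest_batch_file("a.txt", bytes.clone(), "text/plain", user_id)
        .await
        .expect("first ingest should succeed");
    assert!(matches!(first, IngestionResult::Created(_)));

    let second = service
        .ingest_batch_file("copy-of-a.txt", bytes, "text/plain", user_id)
        .await
        .expect("second ingest should succeed");
    assert!(
        !matches!(second, IngestionResult::Created(_)),
        "duplicate content must not create a new document"
    );
}
```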
diff --git a/src/webdav_scheduler.rs b/src/webdav_scheduler.rs
index bdb6f3a..262e9f4 100644
--- a/src/webdav_scheduler.rs
+++ b/src/webdav_scheduler.rs
@@ -96,7 +96,7 @@ impl WebDAVScheduler {
             info!("Resuming interrupted WebDAV sync for user {}", user_id);
 
             tokio::spawn(async move {
-                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
+                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
                     Ok(files_processed) => {
                         info!("Resumed WebDAV sync completed for user {}: {} files processed",
                               user_id, files_processed);
@@ -156,7 +156,7 @@ impl WebDAVScheduler {
             let enable_background_ocr = user_settings.enable_background_ocr;
 
             tokio::spawn(async move {
-                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
+                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
                     Ok(files_processed) => {
                         info!("Background WebDAV sync completed for user {}: {} files processed",
                               user_id, files_processed);
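Reviewer note (not part of the patch): the same SHA-256 helper is deleted from batch_ingest.rs, webdav_sync.rs, source_sync.rs, and watcher.rs, so the hashing presumably now happens once inside the `document_ingestion` module (not shown here), and the `sha2` crate must remain a dependency of whichever module keeps it. For reference, the removed helper was:

```rust
// The helper removed from four call sites by this patch, reproduced verbatim;
// assumed to live on inside DocumentIngestionService.
use sha2::{Digest, Sha256};

fn calculate_file_hash(data: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(data);
    format!("{:x}", hasher.finalize())
}
```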