diff --git a/src/ingestion/batch_ingest.rs b/src/ingestion/batch_ingest.rs
index d1ac444..93bd61e 100644
--- a/src/ingestion/batch_ingest.rs
+++ b/src/ingestion/batch_ingest.rs
@@ -6,13 +6,15 @@
 use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
 use uuid::Uuid;
 use walkdir::WalkDir;
+use chrono::{DateTime, Utc};
 
 use crate::{
     config::Config,
     db::Database,
     services::file_service::FileService,
-    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult, DeduplicationPolicy},
     ocr::queue::OcrQueueService,
+    models::FileInfo,
 };
 
 pub struct BatchIngester {
@@ -164,58 +166,105 @@ impl BatchIngester {
     }
 }
 
-async fn process_single_file(
-    path: PathBuf,
-    file_service: FileService,
-    user_id: Uuid,
-    db: Database,
-) -> Result<Option<(Uuid, i64)>> {
+/// Extract FileInfo from filesystem path and metadata
+async fn extract_file_info_from_path(path: &Path) -> Result<FileInfo> {
+    let metadata = fs::metadata(path).await?;
     let filename = path
         .file_name()
         .and_then(|n| n.to_str())
         .unwrap_or("")
         .to_string();
 
-    // Read file metadata
-    let metadata = fs::metadata(&path).await?;
     let file_size = metadata.len() as i64;
+    let mime_type = mime_guess::from_path(&filename)
+        .first_or_octet_stream()
+        .to_string();
+
+    // Extract timestamps
+    let last_modified = metadata.modified()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    let created_at = metadata.created()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    // Extract Unix permissions (if on Unix-like system)
+    #[cfg(unix)]
+    let (permissions, owner, group) = {
+        use std::os::unix::fs::MetadataExt;
+        let permissions = Some(metadata.mode());
+
+        // For now, just use uid/gid as strings (could be enhanced to resolve names later)
+        let owner = Some(metadata.uid().to_string());
+        let group = Some(metadata.gid().to_string());
+
+        (permissions, owner, group)
+    };
+
+    // On non-Unix systems, permissions/owner/group are not available
+    #[cfg(not(unix))]
+    let (permissions, owner, group) = (None, None, None);
+
+    Ok(FileInfo {
+        path: path.to_string_lossy().to_string(),
+        name: filename,
+        size: file_size,
+        mime_type,
+        last_modified,
+        etag: format!("{}-{}", file_size, last_modified.map_or(0, |t| t.timestamp())),
+        is_directory: metadata.is_dir(),
+        created_at,
+        permissions,
+        owner,
+        group,
+        metadata: None, // Could extract EXIF/other metadata in the future
+    })
+}
+
+async fn process_single_file(
+    path: PathBuf,
+    file_service: FileService,
+    user_id: Uuid,
+    db: Database,
+) -> Result<Option<(Uuid, i64)>> {
+    // Extract file info with metadata
+    let file_info = extract_file_info_from_path(&path).await?;
 
     // Skip very large files (> 100MB)
-    if file_size > 100 * 1024 * 1024 {
-        warn!("Skipping large file: {} ({} MB)", filename, file_size / 1024 / 1024);
+    if file_info.size > 100 * 1024 * 1024 {
+        warn!("Skipping large file: {} ({} MB)", file_info.name, file_info.size / 1024 / 1024);
         return Ok(None);
     }
 
     // Read file data
     let file_data = fs::read(&path).await?;
-    let mime_type = mime_guess::from_path(&filename)
-        .first_or_octet_stream()
-        .to_string();
-
-    // Use the unified ingestion service for consistent deduplication
+    // Use the unified ingestion service with full metadata support
    let ingestion_service = DocumentIngestionService::new(db, file_service);
 
     let result = ingestion_service
-        .ingest_batch_file(&filename, file_data, &mime_type, user_id)
+        .ingest_from_file_info(&file_info, file_data, user_id, DeduplicationPolicy::Skip, "batch_ingest", None)
         .await
         .map_err(|e| anyhow::anyhow!(e))?;
 
     match result {
         IngestionResult::Created(doc) => {
-            info!("Created new document for batch file {}: {}", filename, doc.id);
-            Ok(Some((doc.id, file_size)))
+            info!("Created new document for batch file {}: {}", file_info.name, doc.id);
+            Ok(Some((doc.id, file_info.size)))
         }
         IngestionResult::Skipped { existing_document_id, reason } => {
-            info!("Skipped duplicate batch file {}: {} (existing: {})", filename, reason, existing_document_id);
+            info!("Skipped duplicate batch file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
             Ok(None) // File was skipped due to deduplication
         }
         IngestionResult::ExistingDocument(doc) => {
-            info!("Found existing document for batch file {}: {}", filename, doc.id);
+            info!("Found existing document for batch file {}: {}", file_info.name, doc.id);
             Ok(None) // Don't re-queue for OCR
         }
         IngestionResult::TrackedAsDuplicate { existing_document_id } => {
-            info!("Tracked batch file {} as duplicate of existing document: {}", filename, existing_document_id);
+            info!("Tracked batch file {} as duplicate of existing document: {}", file_info.name, existing_document_id);
             Ok(None) // File was tracked as duplicate
         }
     }
diff --git a/src/scheduling/watcher.rs b/src/scheduling/watcher.rs
index f4c3ce1..9cb3434 100644
--- a/src/scheduling/watcher.rs
+++ b/src/scheduling/watcher.rs
@@ -7,13 +7,15 @@
 use tokio::sync::mpsc;
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, warn};
 use walkdir::WalkDir;
+use chrono::{DateTime, Utc};
 
 use crate::{
     config::Config,
     db::Database,
     services::file_service::FileService,
-    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr::queue::OcrQueueService
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult, DeduplicationPolicy},
+    ocr::queue::OcrQueueService,
+    models::FileInfo,
 };
 
 pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {
@@ -335,38 +337,96 @@ async fn process_file(
         }
     }
 
+    // Extract file info with metadata
+    let file_info = extract_file_info_from_path(path).await?;
+
     // Use the unified ingestion service for consistent deduplication
     let ingestion_service = DocumentIngestionService::new(db.clone(), file_service.clone());
 
     let result = ingestion_service
-        .ingest_batch_file(&filename, file_data, &mime_type, admin_user_id)
+        .ingest_from_file_info(&file_info, file_data, admin_user_id, DeduplicationPolicy::Skip, "watch_folder", None)
         .await
         .map_err(|e| anyhow::anyhow!(e))?;
 
     match result {
         IngestionResult::Created(doc) => {
-            info!("Created new document for watch folder file {}: {}", filename, doc.id);
+            info!("Created new document for watch folder file {}: {}", file_info.name, doc.id);
 
             // Enqueue for OCR processing with priority based on file size and type
-            let priority = calculate_priority(file_size, &mime_type);
-            queue_service.enqueue_document(doc.id, priority, file_size).await?;
+            let priority = calculate_priority(file_info.size, &file_info.mime_type);
+            queue_service.enqueue_document(doc.id, priority, file_info.size).await?;
 
-            info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
+            info!("Successfully queued file for OCR: {} (size: {} bytes)", file_info.name, file_info.size);
         }
         IngestionResult::Skipped { existing_document_id, reason } => {
-            info!("Skipped duplicate watch folder file {}: {} (existing: {})", filename, reason, existing_document_id);
+            info!("Skipped duplicate watch folder file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
         }
         IngestionResult::ExistingDocument(doc) => {
-            info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", filename, doc.id);
+            info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", file_info.name, doc.id);
         }
         IngestionResult::TrackedAsDuplicate { existing_document_id } => {
-            info!("Tracked watch folder file {} as duplicate of existing document: {}", filename, existing_document_id);
+            info!("Tracked watch folder file {} as duplicate of existing document: {}", file_info.name, existing_document_id);
         }
     }
 
     Ok(())
 }
 
+/// Extract FileInfo from filesystem path and metadata (for watcher)
+async fn extract_file_info_from_path(path: &Path) -> Result<FileInfo> {
+    let metadata = tokio::fs::metadata(path).await?;
+    let filename = path
+        .file_name()
+        .and_then(|n| n.to_str())
+        .unwrap_or("")
+        .to_string();
+
+    let file_size = metadata.len() as i64;
+    let mime_type = mime_guess::from_path(&filename)
+        .first_or_octet_stream()
+        .to_string();
+
+    // Extract timestamps
+    let last_modified = metadata.modified()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    let created_at = metadata.created()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    // Extract Unix permissions (if on Unix-like system)
+    #[cfg(unix)]
+    let (permissions, owner, group) = {
+        use std::os::unix::fs::MetadataExt;
+        let permissions = Some(metadata.mode() as u32);
+        let owner = Some(metadata.uid().to_string());
+        let group = Some(metadata.gid().to_string());
+        (permissions, owner, group)
+    };
+
+    // On non-Unix systems, permissions/owner/group are not available
+    #[cfg(not(unix))]
+    let (permissions, owner, group) = (None, None, None);
+
+    Ok(FileInfo {
+        path: path.to_string_lossy().to_string(),
+        name: filename,
+        size: file_size,
+        mime_type,
+        last_modified,
+        etag: format!("{}-{}", file_size, last_modified.map_or(0, |t| t.timestamp())),
+        is_directory: metadata.is_dir(),
+        created_at,
+        permissions,
+        owner,
+        group,
+        metadata: None, // Could extract EXIF/other metadata in the future
+    })
+}
+
 fn is_ocr_able_file(mime_type: &str) -> bool {
     matches!(mime_type, "application/pdf" |