use std::sync::Arc;
use std::path::Path;
use anyhow::{anyhow, Result};
use chrono::Utc;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use futures::stream::{FuturesUnordered, StreamExt};
use sha2::{Sha256, Digest};
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::{
    AppState,
    models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
    file_service::FileService,
    local_folder_service::LocalFolderService,
    s3_service::S3Service,
    webdav_service::{WebDAVService, WebDAVConfig},
};

#[derive(Clone)]
pub struct SourceSyncService {
    state: Arc<AppState>,
}

impl SourceSyncService {
    pub fn new(state: Arc<AppState>) -> Self {
        Self { state }
    }

    /// Perform sync for any source type
    pub async fn sync_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
        // Call the cancellable version with no cancellation token
        self.sync_source_with_cancellation(source, enable_background_ocr, CancellationToken::new())
            .await
    }

    /// Perform sync for any source type with cancellation support
    pub async fn sync_source_with_cancellation(
        &self,
        source: &Source,
        enable_background_ocr: bool,
        cancellation_token: CancellationToken,
    ) -> Result<usize> {
        info!("Starting sync for source {} ({})", source.name, source.source_type);

        // Check for cancellation before starting
        if cancellation_token.is_cancelled() {
            info!("Sync for source {} was cancelled before starting", source.name);
            return Err(anyhow!("Sync cancelled"));
        }

        // Update source status to syncing
        if let Err(e) = self.update_source_status(source.id, SourceStatus::Syncing, None).await {
            error!("Failed to update source status: {}", e);
        }

        let sync_result = match source.source_type {
            SourceType::WebDAV => {
                self.sync_webdav_source_with_cancellation(source, enable_background_ocr, cancellation_token.clone())
                    .await
            }
            SourceType::LocalFolder => {
                self.sync_local_folder_source_with_cancellation(source, enable_background_ocr, cancellation_token.clone())
                    .await
            }
            SourceType::S3 => {
                self.sync_s3_source_with_cancellation(source, enable_background_ocr, cancellation_token.clone())
                    .await
            }
        };

        match &sync_result {
            Ok(files_processed) => {
                if cancellation_token.is_cancelled() {
                    info!("Sync for source {} was cancelled during execution", source.name);
                    if let Err(e) = self
                        .update_source_status(source.id, SourceStatus::Idle, Some("Sync cancelled by user"))
                        .await
                    {
                        error!("Failed to update source status after cancellation: {}", e);
                    }
                } else {
                    info!("Sync completed for source {}: {} files processed", source.name, files_processed);
                    if let Err(e) = self.update_source_status(source.id, SourceStatus::Idle, None).await {
                        error!("Failed to update source status after successful sync: {}", e);
                    }
                }
            }
            Err(e) => {
                if cancellation_token.is_cancelled() {
                    info!("Sync for source {} was cancelled: {}", source.name, e);
                    if let Err(e) = self
                        .update_source_status(source.id, SourceStatus::Idle, Some("Sync cancelled by user"))
                        .await
                    {
                        error!("Failed to update source status after cancellation: {}", e);
                    }
                } else {
                    error!("Sync failed for source {}: {}", source.name, e);
                    let error_msg = format!("Sync failed: {}", e);
                    if let Err(e) = self
                        .update_source_status(source.id, SourceStatus::Error, Some(&error_msg))
                        .await
                    {
                        error!("Failed to update source status after error: {}", e);
                    }
                }
            }
        }

        sync_result
    }

    async fn sync_webdav_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
        self.sync_webdav_source_with_cancellation(source, enable_background_ocr, CancellationToken::new())
            .await
    }
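    /// Syncs a WebDAV source: deserializes the source's JSON config into
    /// `WebDAVSourceConfig`, maps it onto a `WebDAVConfig` with a fixed
    /// 180-second timeout, and drives the shared sync engine with WebDAV
    /// discovery/download closures.
    ///
    /// A rough sketch of the config shape this expects, based only on the
    /// fields read below (the values are placeholders; the exact field types
    /// are defined by `WebDAVSourceConfig`):
    ///
    /// ```ignore
    /// let config = serde_json::json!({
    ///     "server_url": "https://cloud.example.com",
    ///     "username": "alice",
    ///     "password": "secret",
    ///     "watch_folders": ["/Documents"],
    ///     "file_extensions": ["pdf", "png"],
    ///     "server_type": "nextcloud"
    /// });
    /// ```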
    async fn sync_webdav_source_with_cancellation(
        &self,
        source: &Source,
        enable_background_ocr: bool,
        cancellation_token: CancellationToken,
    ) -> Result<usize> {
        let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
            .map_err(|e| anyhow!("Invalid WebDAV config: {}", e))?;

        info!(
            "WebDAV source sync config: server_url={}, username={}, watch_folders={:?}, file_extensions={:?}, server_type={:?}",
            config.server_url, config.username, config.watch_folders, config.file_extensions, config.server_type
        );

        // Requests to list files in a Nextcloud folder might take > 2 minutes.
        // Set timeout to 3 minutes to accommodate large folder structures.
        let webdav_config = WebDAVConfig {
            server_url: config.server_url,
            username: config.username,
            password: config.password,
            watch_folders: config.watch_folders,
            file_extensions: config.file_extensions,
            timeout_seconds: 180, // 3 minutes for discover_files_in_folder operations
            server_type: config.server_type,
        };

        let webdav_service = WebDAVService::new(webdav_config.clone())
            .map_err(|e| anyhow!("Failed to create WebDAV service: {}", e))?;

        info!(
            "WebDAV service created successfully, starting sync with {} folders",
            webdav_config.watch_folders.len()
        );

        self.perform_sync_internal_with_cancellation(
            source.user_id,
            source.id,
            &webdav_config.watch_folders,
            &webdav_config.file_extensions,
            enable_background_ocr,
            cancellation_token,
            |folder_path| {
                let service = webdav_service.clone();
                async move {
                    info!("WebDAV discover_files_in_folder called for: {}", folder_path);
                    let result = service.discover_files_in_folder(&folder_path).await;
                    match &result {
                        Ok(files) => info!("WebDAV discovered {} files in folder: {}", files.len(), folder_path),
                        Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e),
                    }
                    result
                }
            },
            |file_path| {
                let service = webdav_service.clone();
                async move {
                    info!("WebDAV download_file called for: {}", file_path);
                    let result = service.download_file(&file_path).await;
                    match &result {
                        Ok(data) => info!("WebDAV downloaded {} bytes for file: {}", data.len(), file_path),
                        Err(e) => error!("WebDAV download failed for file {}: {}", file_path, e),
                    }
                    result
                }
            },
        )
        .await
    }

    async fn sync_local_folder_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
        self.sync_local_folder_source_with_cancellation(source, enable_background_ocr, CancellationToken::new())
            .await
    }

    async fn sync_local_folder_source_with_cancellation(
        &self,
        source: &Source,
        enable_background_ocr: bool,
        cancellation_token: CancellationToken,
    ) -> Result<usize> {
        let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())
            .map_err(|e| anyhow!("Invalid LocalFolder config: {}", e))?;

        let local_service = LocalFolderService::new(config.clone())
            .map_err(|e| anyhow!("Failed to create LocalFolder service: {}", e))?;

        self.perform_sync_internal_with_cancellation(
            source.user_id,
            source.id,
            &config.watch_folders,
            &config.file_extensions,
            enable_background_ocr,
            cancellation_token,
            |folder_path| {
                let service = local_service.clone();
                async move { service.discover_files_in_folder(&folder_path).await }
            },
            |file_path| {
                let service = local_service.clone();
                async move { service.read_file(&file_path).await }
            },
        )
        .await
    }

    async fn sync_s3_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
        self.sync_s3_source_with_cancellation(source, enable_background_ocr, CancellationToken::new())
            .await
    }
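    /// Syncs an S3 source: deserializes the source's JSON config into
    /// `S3SourceConfig`, builds an `S3Service`, and delegates folder discovery
    /// and object download to it through the shared sync engine.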
    async fn sync_s3_source_with_cancellation(
        &self,
        source: &Source,
        enable_background_ocr: bool,
        cancellation_token: CancellationToken,
    ) -> Result<usize> {
        let config: S3SourceConfig = serde_json::from_value(source.config.clone())
            .map_err(|e| anyhow!("Invalid S3 config: {}", e))?;

        let s3_service = S3Service::new(config.clone())
            .await
            .map_err(|e| anyhow!("Failed to create S3 service: {}", e))?;

        self.perform_sync_internal_with_cancellation(
            source.user_id,
            source.id,
            &config.watch_folders,
            &config.file_extensions,
            enable_background_ocr,
            cancellation_token,
            |folder_path| {
                let service = s3_service.clone();
                async move { service.discover_files_in_folder(&folder_path).await }
            },
            |file_path| {
                let service = s3_service.clone();
                async move { service.download_file(&file_path).await }
            },
        )
        .await
    }

    async fn perform_sync_internal<F, D, Fut1, Fut2>(
        &self,
        user_id: Uuid,
        source_id: Uuid,
        watch_folders: &[String],
        file_extensions: &[String],
        enable_background_ocr: bool,
        discover_files: F,
        download_file: D,
    ) -> Result<usize>
    where
        F: Fn(String) -> Fut1,
        D: Fn(String) -> Fut2 + Clone,
        Fut1: std::future::Future<Output = Result<Vec<FileInfo>>>,
        Fut2: std::future::Future<Output = Result<Vec<u8>>>,
    {
        let mut total_files_processed = 0;

        for folder_path in watch_folders {
            info!("Syncing folder: {}", folder_path);

            // Discover files in the folder
            match discover_files(folder_path.clone()).await {
                Ok(files) => {
                    info!("Found {} files in folder {}", files.len(), folder_path);

                    // Filter files for processing
                    let files_to_process: Vec<_> = files
                        .into_iter()
                        .filter(|file_info| {
                            if file_info.is_directory {
                                return false;
                            }
                            let file_extension = Path::new(&file_info.name)
                                .extension()
                                .and_then(|ext| ext.to_str())
                                .unwrap_or("")
                                .to_lowercase();
                            file_extensions.contains(&file_extension)
                        })
                        .collect();

                    info!("Processing {} files from folder {}", files_to_process.len(), folder_path);

                    // Process files concurrently with a limit
                    let concurrent_limit = 5;
                    let semaphore = Arc::new(Semaphore::new(concurrent_limit));
                    let mut folder_files_processed = 0;
                    let mut file_futures = FuturesUnordered::new();

                    for file_info in files_to_process.iter() {
                        let state_clone = self.state.clone();
                        let file_info_clone = file_info.clone();
                        let semaphore_clone = semaphore.clone();
                        let download_file_clone = download_file.clone();

                        let future = async move {
                            Self::process_single_file(
                                state_clone,
                                user_id,
                                source_id,
                                &file_info_clone,
                                enable_background_ocr,
                                semaphore_clone,
                                download_file_clone,
                            )
                            .await
                        };
                        file_futures.push(future);
                    }

                    // Process files concurrently
                    while let Some(result) = file_futures.next().await {
                        match result {
                            Ok(processed) => {
                                if processed {
                                    folder_files_processed += 1;
                                    info!(
                                        "Successfully processed file ({} completed in this folder)",
                                        folder_files_processed
                                    );
                                }
                            }
                            Err(error) => {
                                error!("File processing error: {}", error);
                            }
                        }
                    }

                    total_files_processed += folder_files_processed;
                }
                Err(e) => {
                    error!("Failed to discover files in folder {}: {}", folder_path, e);
                }
            }
        }

        info!("Source sync completed: {} files processed", total_files_processed);
        Ok(total_files_processed)
    }
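    /// Cancellation-aware variant of the sync engine. It runs in two passes:
    ///
    /// 1. Discover files in every watch folder, filter them by extension, and
    ///    record the discovered count and total size via
    ///    `update_source_sync_stats`.
    /// 2. Re-discover each folder and process its files with at most five
    ///    concurrent downloads, updating the stats every ten processed files
    ///    (or on every file when ten or fewer were discovered).
    ///
    /// The cancellation token is checked between folders, before each file is
    /// queued, and as each result completes; a cancelled sync returns an error
    /// without writing a final stats update.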
    async fn perform_sync_internal_with_cancellation<F, D, Fut1, Fut2>(
        &self,
        user_id: Uuid,
        source_id: Uuid,
        watch_folders: &[String],
        file_extensions: &[String],
        enable_background_ocr: bool,
        cancellation_token: CancellationToken,
        discover_files: F,
        download_file: D,
    ) -> Result<usize>
    where
        F: Fn(String) -> Fut1,
        D: Fn(String) -> Fut2 + Clone,
        Fut1: std::future::Future<Output = Result<Vec<FileInfo>>>,
        Fut2: std::future::Future<Output = Result<Vec<u8>>>,
    {
        let mut total_files_processed = 0;
        let mut total_files_discovered = 0;
        let mut total_size_bytes = 0i64;

        // First pass: discover all files and calculate totals
        for folder_path in watch_folders {
            if cancellation_token.is_cancelled() {
                info!("Sync cancelled during folder discovery");
                return Err(anyhow!("Sync cancelled"));
            }

            match discover_files(folder_path.clone()).await {
                Ok(files) => {
                    let files_to_process: Vec<_> = files
                        .into_iter()
                        .filter(|file_info| {
                            if file_info.is_directory {
                                return false;
                            }
                            let file_extension = Path::new(&file_info.name)
                                .extension()
                                .and_then(|ext| ext.to_str())
                                .unwrap_or("")
                                .to_lowercase();
                            file_extensions.contains(&file_extension)
                        })
                        .collect();

                    total_files_discovered += files_to_process.len();
                    total_size_bytes += files_to_process.iter().map(|f| f.size).sum::<i64>();
                }
                Err(e) => {
                    error!("Failed to discover files in folder {}: {}", folder_path, e);
                }
            }
        }

        // Update initial statistics with discovered files
        if let Err(e) = self
            .state
            .db
            .update_source_sync_stats(
                source_id,
                0, // files_synced starts at 0
                total_files_discovered as i64,
                total_size_bytes,
            )
            .await
        {
            error!("Failed to update initial sync stats: {}", e);
        }

        // Second pass: process files and update stats progressively
        for folder_path in watch_folders {
            // Check for cancellation before processing each folder
            if cancellation_token.is_cancelled() {
                info!("Sync cancelled during folder processing");
                return Err(anyhow!("Sync cancelled"));
            }

            info!("Syncing folder: {}", folder_path);

            // Discover files in the folder
            match discover_files(folder_path.clone()).await {
                Ok(files) => {
                    if cancellation_token.is_cancelled() {
                        info!("Sync cancelled after discovering files");
                        return Err(anyhow!("Sync cancelled"));
                    }

                    info!("Found {} files in folder {}", files.len(), folder_path);

                    // Filter files for processing
                    let files_to_process: Vec<_> = files
                        .into_iter()
                        .filter(|file_info| {
                            if file_info.is_directory {
                                return false;
                            }
                            let file_extension = Path::new(&file_info.name)
                                .extension()
                                .and_then(|ext| ext.to_str())
                                .unwrap_or("")
                                .to_lowercase();
                            file_extensions.contains(&file_extension)
                        })
                        .collect();

                    info!("Processing {} files from folder {}", files_to_process.len(), folder_path);

                    // Process files concurrently with a limit
                    let concurrent_limit = 5;
                    let semaphore = Arc::new(Semaphore::new(concurrent_limit));
                    let mut folder_files_processed = 0;
                    let mut file_futures = FuturesUnordered::new();

                    for file_info in files_to_process.iter() {
                        // Check for cancellation before processing each file
                        if cancellation_token.is_cancelled() {
                            info!("Sync cancelled during file processing");
                            return Err(anyhow!("Sync cancelled"));
                        }

                        let state_clone = self.state.clone();
                        let file_info_clone = file_info.clone();
                        let semaphore_clone = semaphore.clone();
                        let download_file_clone = download_file.clone();
                        let cancellation_token_clone = cancellation_token.clone();

                        let future = async move {
                            Self::process_single_file_with_cancellation(
                                state_clone,
                                user_id,
                                source_id,
                                &file_info_clone,
                                enable_background_ocr,
                                semaphore_clone,
                                download_file_clone,
                                cancellation_token_clone,
                            )
                            .await
                        };
                        file_futures.push(future);
                    }

                    // Process files concurrently and update stats periodically
                    while let Some(result) = file_futures.next().await {
                        // Check for cancellation during processing
                        if cancellation_token.is_cancelled() {
                            info!("Sync cancelled during concurrent file processing");
                            return Err(anyhow!("Sync cancelled"));
                        }

                        match result {
                            Ok(processed) => {
                                if processed {
                                    folder_files_processed += 1;
                                    total_files_processed += 1;

                                    // Update statistics every 10 files processed or every file if under 10 total
                                    if total_files_processed % 10 == 0 || total_files_discovered <= 10 {
                                        let files_pending =
                                            total_files_discovered as i64 - total_files_processed as i64;
                                        if let Err(e) = self
                                            .state
                                            .db
                                            .update_source_sync_stats(
                                                source_id,
                                                total_files_processed as i64,
                                                files_pending.max(0),
                                                total_size_bytes,
                                            )
                                            .await
                                        {
                                            error!("Failed to update sync stats: {}", e);
                                        }
                                    }

                                    info!(
                                        "Successfully processed file ({} completed in this folder, {} total)",
                                        folder_files_processed, total_files_processed
                                    );
                                }
                            }
                            Err(error) => {
                                error!("File processing error: {}", error);
                            }
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to discover files in folder {}: {}", folder_path, e);
                }
            }
        }

        // Final statistics update
        if let Err(e) = self
            .state
            .db
            .update_source_sync_stats(
                source_id,
                total_files_processed as i64,
                0, // All files are now processed
                total_size_bytes,
            )
            .await
        {
            error!("Failed to update final sync stats: {}", e);
        }

        info!("Source sync completed: {} files processed", total_files_processed);
        Ok(total_files_processed)
    }
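    /// Downloads a single file, deduplicates it against the user's existing
    /// documents by size and SHA-256 content hash, saves it to the upload
    /// path, creates a document record, and optionally enqueues it for OCR.
    ///
    /// Returns `Ok(true)` when a new document was created and `Ok(false)` when
    /// the file was skipped as a duplicate. OCR priority is derived from file
    /// size: <= 1 MB -> 10, <= 5 MB -> 8, <= 10 MB -> 6, <= 50 MB -> 4,
    /// larger -> 2.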
    async fn process_single_file<D, Fut>(
        state: Arc<AppState>,
        user_id: Uuid,
        _source_id: Uuid,
        file_info: &FileInfo,
        enable_background_ocr: bool,
        semaphore: Arc<Semaphore>,
        download_file: D,
    ) -> Result<bool>
    where
        D: Fn(String) -> Fut,
        Fut: std::future::Future<Output = Result<Vec<u8>>>,
    {
        let _permit = semaphore
            .acquire()
            .await
            .map_err(|e| anyhow!("Semaphore error: {}", e))?;

        info!("Processing file: {}", file_info.path);

        // Check if we've already processed this file by looking for documents with same source
        // This is a simplified version - you might want to implement source-specific tracking tables

        // Download the file
        let file_data = download_file(file_info.path.clone())
            .await
            .map_err(|e| anyhow!("Failed to download {}: {}", file_info.path, e))?;

        info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

        // Calculate file hash for deduplication
        let file_hash = Self::calculate_file_hash(&file_data);

        // Check for duplicate content
        if let Ok(existing_docs) = state
            .db
            .get_documents_by_user_with_role(user_id, crate::models::UserRole::User, 1000, 0)
            .await
        {
            let matching_docs: Vec<_> = existing_docs
                .into_iter()
                .filter(|doc| doc.file_size == file_data.len() as i64)
                .collect();

            for existing_doc in matching_docs {
                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
                    let existing_hash = Self::calculate_file_hash(&existing_file_data);
                    if file_hash == existing_hash {
                        info!("File content already exists, skipping: {}", file_info.path);
                        return Ok(false);
                    }
                }
            }
        }

        // Save file to disk
        let file_service = FileService::new(state.config.upload_path.clone());
        let saved_file_path = file_service
            .save_file(&file_info.name, &file_data)
            .await
            .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;

        // Create document record
        let document = file_service.create_document(
            &file_info.name,
            &file_info.name,
            &saved_file_path,
            file_data.len() as i64,
            &file_info.mime_type,
            user_id,
        );

        let created_document = state
            .db
            .create_document(document)
            .await
            .map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;

        info!("Created document record for {}: {}", file_info.name, created_document.id);

        // Queue for OCR if enabled
        if enable_background_ocr {
            info!("Background OCR enabled, queueing document {} for processing", created_document.id);

            match state.db.pool.acquire().await {
                Ok(_conn) => {
                    let queue_service = crate::ocr_queue::OcrQueueService::new(
                        state.db.clone(),
                        state.db.pool.clone(),
                        4,
                    );

                    // Smaller files get a higher OCR priority
                    let priority = if file_info.size <= 1024 * 1024 {
                        10
                    } else if file_info.size <= 5 * 1024 * 1024 {
                        8
                    } else if file_info.size <= 10 * 1024 * 1024 {
                        6
                    } else if file_info.size <= 50 * 1024 * 1024 {
                        4
                    } else {
                        2
                    };

                    if let Err(e) = queue_service
                        .enqueue_document(created_document.id, priority, file_info.size)
                        .await
                    {
                        error!("Failed to enqueue document for OCR: {}", e);
                    } else {
                        info!("Enqueued document {} for OCR processing", created_document.id);
                    }
                }
                Err(e) => {
                    error!("Failed to connect to database for OCR queueing: {}", e);
                }
            }
        }

        Ok(true)
    }
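    /// Same pipeline as `process_single_file`, with cancellation checkpoints
    /// before starting, after acquiring the semaphore permit, after the
    /// download, and before saving to disk. Once a document record has been
    /// created, OCR enqueueing proceeds even if the sync is later cancelled.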
    async fn process_single_file_with_cancellation<D, Fut>(
        state: Arc<AppState>,
        user_id: Uuid,
        _source_id: Uuid,
        file_info: &FileInfo,
        enable_background_ocr: bool,
        semaphore: Arc<Semaphore>,
        download_file: D,
        cancellation_token: CancellationToken,
    ) -> Result<bool>
    where
        D: Fn(String) -> Fut,
        Fut: std::future::Future<Output = Result<Vec<u8>>>,
    {
        // Check for cancellation before starting file processing
        if cancellation_token.is_cancelled() {
            info!("File processing cancelled before starting: {}", file_info.path);
            return Err(anyhow!("Processing cancelled"));
        }

        let _permit = semaphore
            .acquire()
            .await
            .map_err(|e| anyhow!("Semaphore error: {}", e))?;

        info!("Processing file: {}", file_info.path);

        // Check for cancellation again after acquiring semaphore
        if cancellation_token.is_cancelled() {
            info!("File processing cancelled after acquiring semaphore: {}", file_info.path);
            return Err(anyhow!("Processing cancelled"));
        }

        // Download the file
        let file_data = download_file(file_info.path.clone())
            .await
            .map_err(|e| anyhow!("Failed to download {}: {}", file_info.path, e))?;

        // Check for cancellation after download
        if cancellation_token.is_cancelled() {
            info!("File processing cancelled after download: {}", file_info.path);
            return Err(anyhow!("Processing cancelled"));
        }

        info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

        // Calculate file hash for deduplication
        let file_hash = Self::calculate_file_hash(&file_data);

        // Check for duplicate content
        if let Ok(existing_docs) = state
            .db
            .get_documents_by_user_with_role(user_id, crate::models::UserRole::User, 1000, 0)
            .await
        {
            let matching_docs: Vec<_> = existing_docs
                .into_iter()
                .filter(|doc| doc.file_size == file_data.len() as i64)
                .collect();

            for existing_doc in matching_docs {
                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
                    let existing_hash = Self::calculate_file_hash(&existing_file_data);
                    if file_hash == existing_hash {
                        info!("File content already exists, skipping: {}", file_info.path);
                        return Ok(false);
                    }
                }
            }
        }

        // Check for cancellation before saving
        if cancellation_token.is_cancelled() {
            info!("File processing cancelled before saving: {}", file_info.path);
            return Err(anyhow!("Processing cancelled"));
        }

        // Save file to disk
        let file_service = FileService::new(state.config.upload_path.clone());
        let saved_file_path = file_service
            .save_file(&file_info.name, &file_data)
            .await
            .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;

        // Create document record
        let document = file_service.create_document(
            &file_info.name,
            &file_info.name,
            &saved_file_path,
            file_data.len() as i64,
            &file_info.mime_type,
            user_id,
        );

        let created_document = state
            .db
            .create_document(document)
            .await
            .map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;

        info!("Created document record for {}: {}", file_info.name, created_document.id);

        // Queue for OCR if enabled (OCR continues even if sync is cancelled)
        if enable_background_ocr {
            info!("Background OCR enabled, queueing document {} for processing", created_document.id);

            match state.db.pool.acquire().await {
                Ok(_conn) => {
                    let queue_service = crate::ocr_queue::OcrQueueService::new(
                        state.db.clone(),
                        state.db.pool.clone(),
                        4,
                    );

                    // Smaller files get a higher OCR priority
                    let priority = if file_info.size <= 1024 * 1024 {
                        10
                    } else if file_info.size <= 5 * 1024 * 1024 {
                        8
                    } else if file_info.size <= 10 * 1024 * 1024 {
                        6
                    } else if file_info.size <= 50 * 1024 * 1024 {
                        4
                    } else {
                        2
                    };

                    if let Err(e) = queue_service
                        .enqueue_document(created_document.id, priority, file_info.size)
                        .await
                    {
                        error!("Failed to enqueue document for OCR: {}", e);
                    } else {
                        info!("Enqueued document {} for OCR processing", created_document.id);
                    }
                }
                Err(e) => {
                    error!("Failed to connect to database for OCR queueing: {}", e);
                }
            }
        }

        Ok(true)
    }
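    /// Persists the source's status. When an error message is supplied it is
    /// written to `last_error`/`last_error_at`; otherwise both columns are
    /// cleared.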
    async fn update_source_status(
        &self,
        source_id: Uuid,
        status: SourceStatus,
        error_message: Option<&str>,
    ) -> Result<()> {
        let query = if let Some(error) = error_message {
            sqlx::query(
                r#"UPDATE sources SET status = $2, last_error = $3, last_error_at = NOW(), updated_at = NOW() WHERE id = $1"#,
            )
            .bind(source_id)
            .bind(status.to_string())
            .bind(error)
        } else {
            sqlx::query(
                r#"UPDATE sources SET status = $2, last_error = NULL, last_error_at = NULL, updated_at = NOW() WHERE id = $1"#,
            )
            .bind(source_id)
            .bind(status.to_string())
        };

        query
            .execute(self.state.db.get_pool())
            .await
            .map_err(|e| anyhow!("Failed to update source status: {}", e))?;

        Ok(())
    }

    fn calculate_file_hash(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        let result = hasher.finalize();
        format!("{:x}", result)
    }
}
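// A minimal sanity-check sketch for the hashing helper (illustrative only):
// the SHA-256 hex output should be 64 characters, deterministic, and
// sensitive to the input.
#[cfg(test)]
mod source_sync_hash_tests {
    use super::*;

    #[test]
    fn calculate_file_hash_is_deterministic_and_input_sensitive() {
        let a = SourceSyncService::calculate_file_hash(b"hello");
        let b = SourceSyncService::calculate_file_hash(b"hello");
        let c = SourceSyncService::calculate_file_hash(b"hello!");

        // SHA-256 renders as 64 lowercase hex characters.
        assert_eq!(a.len(), 64);
        assert!(a.chars().all(|ch| ch.is_ascii_hexdigit()));

        // Same input, same hash; different input, different hash.
        assert_eq!(a, b);
        assert_ne!(a, c);
    }
}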