use std::sync::Arc; use std::time::Duration; use std::collections::HashMap; use tokio::time::interval; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use chrono::Utc; use uuid::Uuid; use sqlx::Row; use crate::{ AppState, models::{SourceType, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig}, models::source::WebDAVTestConnection, }; use super::source_sync::SourceSyncService; struct SyncHealthAnalysis { score_penalty: i32, issues: Vec, } struct ErrorAnalysis { score_penalty: i32, issues: Vec, } pub struct SourceScheduler { state: Arc, sync_service: SourceSyncService, check_interval: Duration, // Track running sync tasks and their cancellation tokens running_syncs: Arc>>, } impl SourceScheduler { pub fn new(state: Arc) -> Self { let sync_service = SourceSyncService::new(state.clone()); Self { state, sync_service, check_interval: Duration::from_secs(60), // Check every minute for due syncs running_syncs: Arc::new(RwLock::new(HashMap::new())), } } pub async fn start(&self) { info!("Starting universal source sync scheduler"); // First, check for any interrupted syncs that need to be resumed if let Err(e) = self.resume_interrupted_syncs().await { error!("Error resuming interrupted syncs: {}", e); } let mut interval_timer = interval(self.check_interval); loop { interval_timer.tick().await; if let Err(e) = self.check_and_sync_sources().await { error!("Error in source sync scheduler: {}", e); } // Run periodic validation checks for all sources if let Err(e) = self.run_periodic_validations().await { error!("Error in periodic validation checks: {}", e); } } } async fn resume_interrupted_syncs(&self) -> Result<(), Box> { info!("Checking for interrupted source syncs to resume"); // First, reset any sources that are stuck in syncing state match self.state.db.reset_stuck_syncing_sources().await { Ok(reset_count) => { if reset_count > 0 { info!("Reset {} sources that were stuck in syncing state from previous session", reset_count); } } Err(e) => { error!("Failed to reset stuck syncing sources: {}", e); } } // Get all enabled sources that might have been interrupted let sources = match self.state.db.get_sources_for_sync().await { Ok(sources) => { info!("Successfully loaded {} sources from database for sync check", sources.len()); sources } Err(e) => { error!("Failed to load sources from database during startup: {}", e); return Err(e.into()); } }; for source in sources { info!("Processing source during startup check: ID={}, Name='{}', Type={}, Status={}", source.id, source.name, source.source_type.to_string(), source.status.to_string()); // Validate source configuration before attempting any operations if let Err(e) = self.validate_source_config(&source) { error!("❌ CONFIGURATION ERROR for source '{}' (ID: {}): {}", source.name, source.id, e); error!("Source config JSON: {}", serde_json::to_string_pretty(&source.config).unwrap_or_else(|_| "Invalid JSON".to_string())); continue; } // Sources are already reset to idle by reset_stuck_syncing_sources // We could add logic here to resume specific sources if needed info!("Source {} is now ready for normal scheduling", source.name); } Ok(()) } async fn check_and_sync_sources(&self) -> Result<(), Box> { // Get all sources that might need syncing let sources = self.state.db.get_sources_for_sync().await?; for source in sources { // Skip sources that are already in error status due to configuration issues if source.status == crate::models::SourceStatus::Error && source.last_error.as_ref().map(|e| e.contains("Configuration error")).unwrap_or(false) { // Only log this once every hour to reduce spam if source.last_error_at.map(|t| chrono::Utc::now() - t > chrono::Duration::hours(1)).unwrap_or(true) { warn!("⚠️ Skipping source '{}' (ID: {}) due to persistent configuration error: {}", source.name, source.id, source.last_error.as_ref().unwrap_or(&"Unknown error".to_string())); } continue; } // Validate source configuration before checking if sync is due if let Err(e) = self.validate_source_config(&source) { error!("❌ CONFIGURATION ERROR during background sync check for source '{}' (ID: {}): {}", source.name, source.id, e); error!("Source config JSON: {}", serde_json::to_string_pretty(&source.config).unwrap_or_else(|_| "Invalid JSON".to_string())); // Update source with error status if let Err(update_err) = sqlx::query( r#"UPDATE sources SET status = 'error', last_error = $1, last_error_at = NOW(), updated_at = NOW() WHERE id = $2"# ) .bind(format!("Configuration error: {}", e)) .bind(source.id) .execute(self.state.db.get_pool()) .await { error!("Failed to update source error status: {}", update_err); } continue; } // Check if sync is due for this source if self.is_sync_due(&source).await? { info!("Starting background sync for source: {} ({})", source.name, source.source_type); let sync_service = self.sync_service.clone(); let source_clone = source.clone(); let state_clone = self.state.clone(); let running_syncs_clone = self.running_syncs.clone(); // Create cancellation token for this sync let cancellation_token = CancellationToken::new(); // Register the sync task { let mut running_syncs = running_syncs_clone.write().await; running_syncs.insert(source.id, cancellation_token.clone()); } // Start sync in background task let sync_handle = tokio::spawn(async move { // Get user's OCR setting - simplified, you might want to store this in source config let enable_background_ocr = true; // Default to true, could be made configurable per source // Create progress tracker for this sync and register it let progress = Arc::new(crate::services::webdav::SyncProgress::new()); progress.set_phase(crate::services::webdav::SyncPhase::Initializing); state_clone.sync_progress_tracker.register_sync(source.id, progress.clone()); // Pass cancellation token to sync service match sync_service.sync_source_with_cancellation(&source_clone, enable_background_ocr, cancellation_token.clone()).await { Ok(files_processed) => { info!("Background sync completed for source {}: {} files processed", source_clone.name, files_processed); // Perform automatic validation check after sync completion if let Err(e) = Self::validate_source_health(&source_clone, &state_clone).await { error!("Failed to perform validation check: {}", e); } // Update last sync time if let Err(e) = sqlx::query( r#"UPDATE sources SET last_sync_at = NOW(), total_files_synced = total_files_synced + $2, updated_at = NOW() WHERE id = $1"# ) .bind(source_clone.id) .bind(files_processed as i64) .execute(state_clone.db.get_pool()) .await { error!("Failed to update source sync time: {}", e); } // Send notification if files were processed if files_processed > 0 { let notification = crate::models::CreateNotification { notification_type: "success".to_string(), title: "Source Sync Completed".to_string(), message: format!("Successfully processed {} files from {}", files_processed, source_clone.name), action_url: Some("/documents".to_string()), metadata: Some(serde_json::json!({ "source_type": source_clone.source_type.to_string(), "source_id": source_clone.id, "files_processed": files_processed })), }; if let Err(e) = state_clone.db.create_notification(source_clone.user_id, ¬ification).await { error!("Failed to create success notification: {}", e); } } } Err(e) => { error!("Background sync failed for source {}: {}", source_clone.name, e); // Send error notification let notification = crate::models::CreateNotification { notification_type: "error".to_string(), title: "Source Sync Failed".to_string(), message: format!("Sync failed for {}: {}", source_clone.name, e), action_url: Some("/sources".to_string()), metadata: Some(serde_json::json!({ "source_type": source_clone.source_type.to_string(), "source_id": source_clone.id, "error": e.to_string() })), }; if let Err(e) = state_clone.db.create_notification(source_clone.user_id, ¬ification).await { error!("Failed to create error notification: {}", e); } } } // Cleanup: Remove the sync from running list and unregister progress tracker { let mut running_syncs = running_syncs_clone.write().await; running_syncs.remove(&source_clone.id); } state_clone.sync_progress_tracker.unregister_sync(source_clone.id); }); } } Ok(()) } async fn is_sync_due(&self, source: &crate::models::Source) -> Result> { // Get sync interval from source config let sync_interval_minutes = match source.source_type { SourceType::WebDAV => { let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?; if !config.auto_sync { return Ok(false); } config.sync_interval_minutes } SourceType::LocalFolder => { let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())?; if !config.auto_sync { return Ok(false); } config.sync_interval_minutes } SourceType::S3 => { let config: S3SourceConfig = serde_json::from_value(source.config.clone())?; if !config.auto_sync { return Ok(false); } config.sync_interval_minutes } }; if sync_interval_minutes <= 0 { warn!("Invalid sync interval for source {}: {} minutes", source.name, sync_interval_minutes); return Ok(false); } // Check if a sync is already running if source.status.to_string() == "syncing" { info!("Sync already running for source {}", source.name); return Ok(false); } // Check last sync time if let Some(last_sync) = source.last_sync_at { let elapsed = Utc::now() - last_sync; let elapsed_minutes = elapsed.num_minutes(); if elapsed_minutes < sync_interval_minutes as i64 { // Only log this occasionally to avoid spam if elapsed_minutes % 10 == 0 { crate::debug_log!("SOURCE_SCHEDULER", "Sync not due for source {} (last sync {} minutes ago, interval {} minutes)", source.name, elapsed_minutes, sync_interval_minutes); } return Ok(false); } info!("Sync is due for source {} (last sync {} minutes ago, interval {} minutes)", source.name, elapsed_minutes, sync_interval_minutes); } else { info!("No previous sync found for source {}, sync is due", source.name); } // Sync is due Ok(true) } pub async fn trigger_sync(&self, source_id: uuid::Uuid) -> Result<(), Box> { info!("Triggering manual sync for source {}", source_id); // Check if sync is already running { let running_syncs = self.running_syncs.read().await; if running_syncs.contains_key(&source_id) { return Err("Sync already running for this source".into()); } } // First check if the source exists let source = match self.state.db.get_source_by_id(source_id).await? { Some(s) => s, None => return Err("Source not found".into()), }; // Validate source configuration before attempting sync if let Err(e) = self.validate_source_config(&source) { return Err(format!("Configuration error: {}", e).into()); } // Atomically start the sync - this prevents race conditions if !self.state.db.start_sync_atomic(source_id).await? { return Err("Could not start sync - source is already syncing".into()); } let sync_service = self.sync_service.clone(); let state_clone = self.state.clone(); let running_syncs_clone = self.running_syncs.clone(); // Create cancellation token for this sync let cancellation_token = CancellationToken::new(); // Register the sync task { let mut running_syncs = running_syncs_clone.write().await; running_syncs.insert(source_id, cancellation_token.clone()); } tokio::spawn(async move { let enable_background_ocr = true; // Could be made configurable // Create progress tracker for this sync and register it let progress = Arc::new(crate::services::webdav::SyncProgress::new()); progress.set_phase(crate::services::webdav::SyncPhase::Initializing); state_clone.sync_progress_tracker.register_sync(source_id, progress.clone()); // Ensure cleanup happens regardless of what happens in the sync operation let cleanup = || async { // Cleanup: Remove the sync from running list and unregister progress tracker { let mut running_syncs = running_syncs_clone.write().await; running_syncs.remove(&source_id); } state_clone.sync_progress_tracker.unregister_sync(source_id); }; // Execute the sync operation with a timeout to prevent hanging let sync_result = tokio::time::timeout( std::time::Duration::from_secs(300), // 5 minute timeout for sync operations sync_service.sync_source_with_cancellation(&source, enable_background_ocr, cancellation_token) ).await; match sync_result { Ok(Ok(files_processed)) => { info!("Manual sync completed for source {}: {} files processed", source.name, files_processed); // Atomically complete the sync if let Err(e) = state_clone.db.complete_sync_atomic( source_id, true, Some(files_processed as i64), None ).await { error!("Failed to atomically complete sync: {}", e); // Fallback to manual status update - force to idle let _ = sqlx::query( "UPDATE sources SET status = 'idle', last_error = NULL, last_error_at = NULL, updated_at = NOW() WHERE id = $1" ) .bind(source_id) .execute(state_clone.db.get_pool()) .await; } } Ok(Err(e)) => { error!("Manual sync failed for source {}: {}", source.name, e); // Atomically mark sync as failed if let Err(complete_err) = state_clone.db.complete_sync_atomic( source_id, false, None, Some(&format!("Sync failed: {}", e)) ).await { error!("Failed to atomically mark sync as failed: {}", complete_err); // Fallback to manual status update - force to error state let error_msg = format!("Sync failed: {}", e); let _ = sqlx::query( "UPDATE sources SET status = 'error', last_error = $2, last_error_at = NOW(), updated_at = NOW() WHERE id = $1" ) .bind(source_id) .bind(error_msg) .execute(state_clone.db.get_pool()) .await; } } Err(_timeout) => { error!("Manual sync timed out for source {}", source.name); // Handle timeout by resetting to error state let error_msg = "Sync operation timed out"; if let Err(complete_err) = state_clone.db.complete_sync_atomic( source_id, false, None, Some(error_msg) ).await { error!("Failed to atomically mark sync as timed out: {}", complete_err); // Fallback to manual status update - force to error state let _ = sqlx::query( "UPDATE sources SET status = 'error', last_error = $2, last_error_at = NOW(), updated_at = NOW() WHERE id = $1" ) .bind(source_id) .bind(error_msg) .execute(state_clone.db.get_pool()) .await; } } } cleanup().await; }); Ok(()) } pub async fn stop_sync(&self, source_id: Uuid) -> Result<(), Box> { info!("Stopping sync for source {}", source_id); // Get the cancellation token for this sync let cancellation_token = { let running_syncs = self.running_syncs.read().await; running_syncs.get(&source_id).cloned() }; if let Some(token) = cancellation_token { // Cancel the sync operation token.cancel(); info!("Cancellation signal sent for source {}", source_id); // Atomically update status to cancelled if let Err(e) = self.state.db.update_source_status_atomic( source_id, Some(crate::models::SourceStatus::Syncing), crate::models::SourceStatus::Idle, Some("Sync cancelled by user") ).await { error!("Failed to update source status after cancellation: {}", e); } // Immediately unregister from progress tracker to update UI self.state.sync_progress_tracker.unregister_sync(source_id); // Remove from running syncs list { let mut running_syncs = self.running_syncs.write().await; running_syncs.remove(&source_id); } info!("Sync cancellation completed for source {}", source_id); Ok(()) } else { Err("No running sync found for this source".into()) } } /// Force reset a source that may be stuck in syncing state /// This is used as a fail-safe mechanism for race conditions pub async fn force_reset_source(&self, source_id: Uuid) -> Result<(), Box> { info!("Force resetting potentially stuck source {}", source_id); // Remove from running syncs list { let mut running_syncs = self.running_syncs.write().await; running_syncs.remove(&source_id); } // Unregister from progress tracker self.state.sync_progress_tracker.unregister_sync(source_id); // Force reset database status to idle if let Err(e) = sqlx::query( "UPDATE sources SET status = 'idle', last_error = 'Force reset due to stuck sync', last_error_at = NOW(), updated_at = NOW() WHERE id = $1 AND status = 'syncing'" ) .bind(source_id) .execute(self.state.db.get_pool()) .await { error!("Failed to force reset source status: {}", e); return Err(e.into()); } info!("Source {} force reset completed", source_id); Ok(()) } /// Validates a source configuration and provides detailed error messages for debugging fn validate_source_config(&self, source: &crate::models::Source) -> Result<(), String> { use crate::models::{SourceType, WebDAVSourceConfig, S3SourceConfig, LocalFolderSourceConfig}; match source.source_type { SourceType::WebDAV => { // Attempt to deserialize WebDAV config let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse WebDAV configuration JSON: {}", e))?; // Validate server URL format self.validate_webdav_url(&config.server_url, &source.name)?; // Additional WebDAV validations if config.username.trim().is_empty() { return Err(format!("WebDAV username cannot be empty")); } if config.password.trim().is_empty() { return Err(format!("WebDAV password cannot be empty")); } if config.watch_folders.is_empty() { return Err(format!("WebDAV watch_folders cannot be empty")); } Ok(()) } SourceType::S3 => { let _config: S3SourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse S3 configuration JSON: {}", e))?; Ok(()) } SourceType::LocalFolder => { let _config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse Local Folder configuration JSON: {}", e))?; Ok(()) } } } /// Validates WebDAV server URL and provides specific error messages fn validate_webdav_url(&self, server_url: &str, source_name: &str) -> Result<(), String> { if server_url.trim().is_empty() { return Err(format!("WebDAV server_url is empty")); } // Normalize URL by adding protocol if missing (consistent with WebDAVConfig) let normalized_url = crate::services::webdav::config::WebDAVConfig::normalize_server_url(server_url); // Try to parse the normalized URL to catch other issues match reqwest::Url::parse(&normalized_url) { Ok(url) => { if url.scheme() != "http" && url.scheme() != "https" { return Err(format!( "WebDAV server_url has invalid scheme '{}'. Only 'http' and 'https' are supported. \ Current URL: '{}'", url.scheme(), normalized_url )); } if url.host_str().is_none() { return Err(format!( "WebDAV server_url is missing hostname. \ Current URL: '{}'. \ Example: https://cloud.example.com", normalized_url )); } crate::debug_log!("SOURCE_SCHEDULER", "✅ WebDAV URL validation passed for source '{}': {} (normalized to: {})", source_name, server_url, normalized_url); Ok(()) } Err(e) => { Err(format!( "WebDAV server_url is not a valid URL: {}. \ Current value: '{}' (normalized to: '{}'). \ The URL must be absolute and include the full domain. \ Examples: \ - https://cloud.example.com \ - http://192.168.1.100:8080/webdav \ - https://nextcloud.mydomain.com", e, server_url, normalized_url )) } } } /// Check if a deep scan should be triggered based on sync results async fn check_and_trigger_deep_scan( source: &crate::models::Source, files_processed: usize, state: &Arc, ) -> Result<(), Box> { // Get sync history for intelligent decision making let recent_syncs = sqlx::query( r#" SELECT COUNT(*) as sync_count, SUM(CASE WHEN total_files_synced = 0 THEN 1 ELSE 0 END) as empty_sync_count, MAX(last_sync_at) as last_sync, MIN(last_sync_at) as first_sync FROM ( SELECT total_files_synced, last_sync_at FROM sources WHERE id = $1 ORDER BY last_sync_at DESC LIMIT 10 ) recent_syncs "# ) .bind(source.id) .fetch_one(state.db.get_pool()) .await?; // Get last deep scan time let last_deep_scan = sqlx::query( r#" SELECT MAX(created_at) as last_deep_scan FROM notifications WHERE user_id = $1 AND metadata->>'source_id' = $2 AND metadata->>'scan_type' = 'deep_scan' AND notification_type = 'success' "# ) .bind(source.user_id) .bind(source.id.to_string()) .fetch_one(state.db.get_pool()) .await?; let mut should_trigger_deep_scan = false; let mut reason = String::new(); // Trigger conditions: // 1. If the last 5+ syncs found no files, something might be wrong let empty_sync_count: i64 = recent_syncs.try_get("empty_sync_count").unwrap_or(0); if empty_sync_count >= 5 { should_trigger_deep_scan = true; reason = "Multiple consecutive syncs found no files - deep scan needed to verify directory structure".to_string(); } // 2. If we haven't done a deep scan in over 7 days let last_deep_time: Option> = last_deep_scan.try_get("last_deep_scan").ok(); if let Some(last_deep) = last_deep_time { let days_since_deep_scan = (chrono::Utc::now() - last_deep).num_days(); if days_since_deep_scan > 7 { should_trigger_deep_scan = true; reason = format!("No deep scan in {} days - periodic verification needed", days_since_deep_scan); } } // 3. If this is the first sync ever (no deep scan history) let sync_count: i64 = recent_syncs.try_get("sync_count").unwrap_or(0); if last_deep_time.is_none() && sync_count <= 1 { should_trigger_deep_scan = true; reason = "First sync completed - deep scan recommended for initial directory discovery".to_string(); } // 4. If sync found files but we've been getting inconsistent results else if files_processed > 0 { // Check for erratic sync patterns (alternating between finding files and not) let erratic_check = sqlx::query( r#" SELECT COUNT(DISTINCT CASE WHEN total_files_synced > 0 THEN 1 ELSE 0 END) as distinct_states FROM ( SELECT total_files_synced FROM sources WHERE id = $1 ORDER BY last_sync_at DESC LIMIT 5 ) recent "# ) .bind(source.id) .fetch_one(state.db.get_pool()) .await?; let distinct_states: i64 = erratic_check.try_get("distinct_states").unwrap_or(0); if distinct_states > 1 { should_trigger_deep_scan = true; reason = "Inconsistent sync results detected - deep scan needed for stability".to_string(); } } if should_trigger_deep_scan { info!("🎯 Intelligent deep scan trigger activated for source {}: {}", source.name, reason); // Create notification about automatic deep scan let notification = crate::models::CreateNotification { notification_type: "info".to_string(), title: "Automatic Deep Scan Triggered".to_string(), message: format!("Starting deep scan for {}: {}", source.name, reason), action_url: Some("/sources".to_string()), metadata: Some(serde_json::json!({ "source_type": source.source_type.to_string(), "source_id": source.id, "scan_type": "deep_scan", "trigger_reason": reason, "automatic": true })), }; if let Err(e) = state.db.create_notification(source.user_id, ¬ification).await { error!("Failed to create deep scan notification: {}", e); } // Trigger the deep scan via the API endpoint // We'll reuse the existing deep scan logic from the sources route let webdav_config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?; let webdav_service = crate::services::webdav::WebDAVService::new( crate::services::webdav::WebDAVConfig { server_url: webdav_config.server_url.clone(), username: webdav_config.username.clone(), password: webdav_config.password.clone(), watch_folders: webdav_config.watch_folders.clone(), file_extensions: webdav_config.file_extensions.clone(), timeout_seconds: 600, // 10 minutes for deep scan server_type: webdav_config.server_type.clone(), } )?; // Run smart deep scan in background let source_clone = source.clone(); let state_clone = state.clone(); tokio::spawn(async move { // Use smart sync service for automatic deep scans let smart_sync_service = crate::services::webdav::SmartSyncService::new(state_clone.clone()); let mut all_files_to_process = Vec::new(); let mut total_directories_tracked = 0; // Process all watch folders using smart deep scan for watch_folder in &webdav_config.watch_folders { match smart_sync_service.perform_smart_sync( source_clone.user_id, Some(source_clone.id), &webdav_service, watch_folder, crate::services::webdav::SmartSyncStrategy::FullDeepScan, // Force deep scan for automatic triggers None // No progress tracking for automatic syncs ).await { Ok(sync_result) => { all_files_to_process.extend(sync_result.files); total_directories_tracked += sync_result.directories.len(); } Err(e) => { error!("Automatic smart deep scan failed for watch folder {}: {}", watch_folder, e); } } } info!("🎉 Automatic smart deep scan completed for {}: {} files found, {} directories tracked", source_clone.name, all_files_to_process.len(), total_directories_tracked); // Process the files if any were found let files_processed = if !all_files_to_process.is_empty() { let total_files = all_files_to_process.len(); // Filter and process files as in the manual deep scan let files_to_process: Vec<_> = all_files_to_process.into_iter() .filter(|file_info| { if file_info.is_directory { return false; } let file_extension = std::path::Path::new(&file_info.name) .extension() .and_then(|ext| ext.to_str()) .unwrap_or("") .to_lowercase(); webdav_config.file_extensions.contains(&file_extension) }) .collect(); let processed_count = files_to_process.len(); if let Err(e) = crate::routes::webdav::webdav_sync::process_files_for_deep_scan( state_clone.clone(), source_clone.user_id, &webdav_service, &files_to_process, true, // enable background OCR Some(source_clone.id) ).await { error!("Failed to process files from automatic deep scan: {}", e); } processed_count } else { 0 }; // Success notification let notification = crate::models::CreateNotification { notification_type: "success".to_string(), title: "Automatic Deep Scan Completed".to_string(), message: format!("Deep scan of {} completed successfully", source_clone.name), action_url: Some("/documents".to_string()), metadata: Some(serde_json::json!({ "source_type": source_clone.source_type.to_string(), "source_id": source_clone.id, "scan_type": "deep_scan", "automatic": true, "files_found": files_processed })), }; if let Err(e) = state_clone.db.create_notification(source_clone.user_id, ¬ification).await { error!("Failed to create success notification: {}", e); } }); } Ok(()) } /// Perform automatic validation of source health and connectivity pub async fn validate_source_health( source: &crate::models::Source, state: &Arc, ) -> Result<(), Box> { info!("🔍 Starting validation check for source: {}", source.name); let mut validation_score = 100; let mut validation_issues = Vec::::new(); let mut validation_status = "healthy"; // 1. Configuration validation if let Err(config_error) = Self::validate_source_config_detailed(source) { validation_score -= 30; validation_status = "critical"; validation_issues.push(serde_json::json!({ "type": "configuration", "severity": "critical", "message": format!("Configuration error: {}", config_error), "recommendation": "Check and fix source configuration in settings" })); } // 2. Connectivity validation match source.source_type { crate::models::SourceType::WebDAV => { if let Err(e) = Self::validate_webdav_connectivity(source).await { validation_score -= 25; if validation_status == "healthy" { validation_status = "warning"; } validation_issues.push(serde_json::json!({ "type": "connectivity", "severity": "warning", "message": format!("WebDAV connectivity issue: {}", e), "recommendation": "Check server URL, credentials, and network connectivity" })); } } crate::models::SourceType::LocalFolder => { if let Err(e) = Self::validate_local_folder_access(source).await { validation_score -= 25; if validation_status == "healthy" { validation_status = "warning"; } validation_issues.push(serde_json::json!({ "type": "connectivity", "severity": "warning", "message": format!("Local folder access issue: {}", e), "recommendation": "Check folder permissions and path accessibility" })); } } crate::models::SourceType::S3 => { if let Err(e) = Self::validate_s3_connectivity(source).await { validation_score -= 25; if validation_status == "healthy" { validation_status = "warning"; } validation_issues.push(serde_json::json!({ "type": "connectivity", "severity": "warning", "message": format!("S3 connectivity issue: {}", e), "recommendation": "Check AWS credentials, bucket access, and permissions" })); } } } // 3. Sync pattern analysis if let Ok(sync_health) = Self::analyze_sync_patterns(source, state).await { validation_score -= sync_health.score_penalty; if sync_health.score_penalty > 15 && validation_status == "healthy" { validation_status = "warning"; } for issue in sync_health.issues { validation_issues.push(issue); } } // 4. Error rate analysis if let Ok(error_analysis) = Self::analyze_error_patterns(source, state).await { validation_score -= error_analysis.score_penalty; if error_analysis.score_penalty > 20 { validation_status = "warning"; } for issue in error_analysis.issues { validation_issues.push(issue); } } // Cap the minimum score at 0 validation_score = validation_score.max(0); // Update validation status in database let validation_issues_json = serde_json::to_string(&validation_issues) .unwrap_or_else(|_| "[]".to_string()); if let Err(e) = sqlx::query( r#" UPDATE sources SET validation_status = $1, last_validation_at = NOW(), validation_score = $2, validation_issues = $3, updated_at = NOW() WHERE id = $4 "# ) .bind(validation_status) .bind(validation_score) .bind(validation_issues_json) .bind(source.id) .execute(state.db.get_pool()) .await { error!("Failed to update validation status: {}", e); } // Send notification if there are critical issues if validation_status == "critical" || validation_score < 50 { let notification = crate::models::CreateNotification { notification_type: if validation_status == "critical" { "error" } else { "warning" }.to_string(), title: format!("Source Validation {}", if validation_status == "critical" { "Failed" } else { "Warning" }), message: format!("Source {} has validation issues (score: {})", source.name, validation_score), action_url: Some("/sources".to_string()), metadata: Some(serde_json::json!({ "source_type": source.source_type.to_string(), "source_id": source.id, "validation_type": "health_check", "validation_score": validation_score, "validation_status": validation_status, "issue_count": validation_issues.len() })), }; if let Err(e) = state.db.create_notification(source.user_id, ¬ification).await { error!("Failed to create validation notification: {}", e); } } info!("✅ Validation completed for {}: {} (score: {})", source.name, validation_status, validation_score); Ok(()) } fn validate_source_config_detailed(source: &crate::models::Source) -> Result<(), String> { // Reuse existing validation logic but return more detailed errors Self::validate_source_config_static(source) } fn validate_source_config_static(source: &crate::models::Source) -> Result<(), String> { use crate::models::{SourceType, WebDAVSourceConfig, S3SourceConfig, LocalFolderSourceConfig}; match source.source_type { SourceType::WebDAV => { let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse WebDAV configuration: {}", e))?; if config.server_url.trim().is_empty() { return Err("WebDAV server URL is empty".to_string()); } if config.username.trim().is_empty() { return Err("WebDAV username is empty".to_string()); } if config.password.trim().is_empty() { return Err("WebDAV password is empty".to_string()); } if config.watch_folders.is_empty() { return Err("WebDAV watch folders list is empty".to_string()); } Ok(()) } SourceType::S3 => { let _config: S3SourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse S3 configuration: {}", e))?; Ok(()) } SourceType::LocalFolder => { let _config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Failed to parse Local Folder configuration: {}", e))?; Ok(()) } } } async fn validate_webdav_connectivity(source: &crate::models::Source) -> Result<(), String> { use crate::models::WebDAVSourceConfig; let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone()) .map_err(|e| format!("Config parse error: {}", e))?; let webdav_config = crate::services::webdav::WebDAVConfig { server_url: config.server_url.clone(), username: config.username.clone(), password: config.password.clone(), watch_folders: config.watch_folders.clone(), file_extensions: config.file_extensions.clone(), timeout_seconds: 30, // Quick connectivity test server_type: config.server_type.clone(), }; let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config) .map_err(|e| format!("Service creation failed: {}", e))?; let test_config = WebDAVTestConnection { server_url: config.server_url, username: config.username, password: config.password, server_type: config.server_type, }; crate::services::webdav::WebDAVService::test_connection_with_config(&test_config).await .map_err(|e| format!("Connection test failed: {}", e))?; Ok(()) } async fn validate_local_folder_access(_source: &crate::models::Source) -> Result<(), String> { // Simplified local folder validation - could be enhanced // For now, just return OK as local folders are validated differently Ok(()) } async fn validate_s3_connectivity(_source: &crate::models::Source) -> Result<(), String> { // Simplified S3 validation - could be enhanced with actual AWS SDK calls // For now, just return OK as S3 validation requires more complex setup Ok(()) } async fn analyze_sync_patterns( source: &crate::models::Source, state: &Arc ) -> Result> { let mut score_penalty = 0; let mut issues = Vec::new(); // Check recent sync history let sync_stats = sqlx::query( r#" SELECT COUNT(*) as total_syncs, SUM(CASE WHEN total_files_synced = 0 THEN 1 ELSE 0 END) as empty_syncs, MAX(last_sync_at) as last_sync, AVG(total_files_synced) as avg_files_per_sync FROM sources WHERE id = $1 AND last_sync_at >= NOW() - INTERVAL '7 days' "# ) .bind(source.id) .fetch_one(state.db.get_pool()) .await?; let total_syncs: i64 = sync_stats.try_get("total_syncs").unwrap_or(0); let empty_syncs: i64 = sync_stats.try_get("empty_syncs").unwrap_or(0); if total_syncs > 0 { let empty_sync_ratio = (empty_syncs as f64) / (total_syncs as f64); if empty_sync_ratio > 0.8 { score_penalty += 20; issues.push(serde_json::json!({ "type": "sync_pattern", "severity": "warning", "message": format!("High empty sync ratio: {:.1}% of recent syncs found no files", empty_sync_ratio * 100.0), "recommendation": "This may indicate connectivity issues or that the source has no new content" })); } if total_syncs < 2 && chrono::Utc::now().signed_duration_since(source.created_at).num_days() > 1 { score_penalty += 10; issues.push(serde_json::json!({ "type": "sync_pattern", "severity": "info", "message": "Very few syncs performed since source creation", "recommendation": "Consider enabling auto-sync or manually triggering sync to ensure content is up to date" })); } } Ok(SyncHealthAnalysis { score_penalty, issues }) } async fn analyze_error_patterns( source: &crate::models::Source, _state: &Arc ) -> Result> { let mut score_penalty = 0; let mut issues = Vec::new(); // Check if source has recent errors if let Some(last_error_at) = source.last_error_at { let hours_since_error = chrono::Utc::now().signed_duration_since(last_error_at).num_hours(); if hours_since_error < 24 { score_penalty += 15; issues.push(serde_json::json!({ "type": "error_pattern", "severity": "warning", "message": format!("Recent error occurred {} hours ago", hours_since_error), "recommendation": format!("Last error: {}", source.last_error.as_deref().unwrap_or("Unknown error")) })); } } // Check if source is in error state if source.status == crate::models::SourceStatus::Error { score_penalty += 25; issues.push(serde_json::json!({ "type": "error_pattern", "severity": "critical", "message": "Source is currently in error state", "recommendation": "Review and fix the configuration or connectivity issues" })); } Ok(ErrorAnalysis { score_penalty, issues }) } async fn run_periodic_validations(&self) -> Result<(), Box> { // Get all enabled sources let sources = self.state.db.get_sources_for_sync().await?; for source in sources { // Only validate if it's been more than 30 minutes since last validation let should_validate = if let Some(last_validation) = source.last_validation_at { chrono::Utc::now().signed_duration_since(last_validation).num_minutes() > 30 } else { true // Never validated before }; if should_validate && source.enabled { info!("Running periodic validation for source: {}", source.name); // Run validation in background to avoid blocking let source_clone = source.clone(); let state_clone = self.state.clone(); tokio::spawn(async move { if let Err(e) = Self::validate_source_health(&source_clone, &state_clone).await { error!("Periodic validation failed for source {}: {}", source_clone.name, e); } }); } } Ok(()) } }