diff --git a/Cargo.toml b/Cargo.toml index 1909acc..eaf69bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,6 @@ path = "src/bin/test_runner.rs" name = "analyze-webdav-performance" path = "src/bin/analyze-webdav-performance.rs" -[[bin]] -name = "webdav_loop_detection_stress" -path = "tests/stress/webdav_loop_detection_stress.rs" diff --git a/src/routes/sources/estimation.rs b/src/routes/sources/estimation.rs index 22d2c20..1c5279b 100644 --- a/src/routes/sources/estimation.rs +++ b/src/routes/sources/estimation.rs @@ -95,7 +95,6 @@ async fn estimate_webdav_crawl_internal( file_extensions: config.file_extensions.clone(), timeout_seconds: 300, server_type: config.server_type.clone(), - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }; // Create WebDAV service and estimate crawl diff --git a/src/routes/sources/sync.rs b/src/routes/sources/sync.rs index f2e4f11..7cd4ff5 100644 --- a/src/routes/sources/sync.rs +++ b/src/routes/sources/sync.rs @@ -240,7 +240,6 @@ pub async fn trigger_deep_scan( file_extensions: config.file_extensions.clone(), timeout_seconds: 600, // 10 minutes for deep scan server_type: config.server_type.clone(), - loop_detection: crate::services::webdav::LoopDetectionConfig::default(), }; let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config.clone()) diff --git a/src/routes/webdav.rs b/src/routes/webdav.rs index 4383cad..6ac7d59 100644 --- a/src/routes/webdav.rs +++ b/src/routes/webdav.rs @@ -72,7 +72,6 @@ async fn get_user_webdav_config(state: &Arc, user_id: uuid::Uuid) -> R file_extensions: settings.webdav_file_extensions, timeout_seconds: 300, // 5 minutes timeout for crawl estimation server_type: Some("nextcloud".to_string()), // Default to Nextcloud - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }) } @@ -108,7 +107,6 @@ async fn test_webdav_connection( file_extensions: Vec::new(), timeout_seconds: 300, // 5 minutes timeout for crawl estimation server_type: test_config.server_type.clone(), - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }; // Create WebDAV service and test connection diff --git a/src/scheduling/source_scheduler.rs b/src/scheduling/source_scheduler.rs index e754e22..5022e04 100644 --- a/src/scheduling/source_scheduler.rs +++ b/src/scheduling/source_scheduler.rs @@ -745,7 +745,6 @@ impl SourceScheduler { file_extensions: webdav_config.file_extensions.clone(), timeout_seconds: 600, // 10 minutes for deep scan server_type: webdav_config.server_type.clone(), - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), } )?; @@ -1033,7 +1032,6 @@ impl SourceScheduler { file_extensions: config.file_extensions.clone(), timeout_seconds: 30, // Quick connectivity test server_type: config.server_type.clone(), - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }; let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config) diff --git a/src/scheduling/source_sync.rs b/src/scheduling/source_sync.rs index 980e2bb..c369a2a 100644 --- a/src/scheduling/source_sync.rs +++ b/src/scheduling/source_sync.rs @@ -109,7 +109,6 @@ impl SourceSyncService { file_extensions: config.file_extensions, timeout_seconds: 180, // 3 minutes for discover_files_in_folder operations server_type: config.server_type, - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }; let webdav_service = WebDAVService::new(webdav_config.clone()) diff --git a/src/scheduling/webdav_scheduler.rs b/src/scheduling/webdav_scheduler.rs index dfab9bb..f38ec20 100644 --- a/src/scheduling/webdav_scheduler.rs +++ b/src/scheduling/webdav_scheduler.rs @@ -275,7 +275,6 @@ impl WebDAVScheduler { file_extensions: settings.webdav_file_extensions.clone(), timeout_seconds: 30, server_type: Some("nextcloud".to_string()), - loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(), }) } diff --git a/src/services/source_error_tracker.rs b/src/services/source_error_tracker.rs index d2a4a54..5b924c7 100644 --- a/src/services/source_error_tracker.rs +++ b/src/services/source_error_tracker.rs @@ -130,39 +130,92 @@ impl SourceErrorTracker { source_id: Option, resource_path: &str, ) -> Result { - // Check if there are any failures for this resource path - match self.db.is_source_known_failure(user_id, source_type, source_id, resource_path).await { - Ok(true) => { - // There is a known failure, but we need more details - // For now, implement a simple cooldown based on the fact that there's a failure - // In a real implementation, we'd need a method that returns failure details - let skip_reason = format!("Resource has previous failures recorded in system"); - - info!( - "⏭️ Skipping {} resource '{}' due to error tracking: {}", - source_type, resource_path, skip_reason - ); - - Ok(SkipDecision { - should_skip: true, - reason: skip_reason, - failure_count: 1, // We don't have exact count, use 1 as placeholder - time_since_last_failure_minutes: 0, // We don't have exact time - cooldown_remaining_minutes: Some(60), // Default 1 hour cooldown - }) - } - Ok(false) => { - debug!( - "✅ No previous failures for {} resource '{}', proceeding with scan", - source_type, resource_path - ); - Ok(SkipDecision { - should_skip: false, - reason: "No previous failures recorded".to_string(), - failure_count: 0, - time_since_last_failure_minutes: 0, - cooldown_remaining_minutes: None, - }) + // Get detailed failure information from the database + let query = crate::models::ListFailuresQuery { + source_type: Some(source_type), + source_id: source_id, + error_type: None, + severity: None, + include_resolved: Some(false), // Only unresolved failures + include_excluded: Some(true), // Include user-excluded resources + ready_for_retry: None, + limit: Some(1), + offset: None, + }; + + match self.db.list_source_scan_failures(user_id, &query).await { + Ok(failures) => { + if let Some(failure) = failures.into_iter() + .find(|f| f.resource_path == resource_path) { + + // Check if this failure should be skipped based on sophisticated database logic + let now = chrono::Utc::now(); + let time_since_last = now.signed_duration_since(failure.last_failure_at); + let time_since_last_minutes = time_since_last.num_minutes(); + + // Calculate cooldown remaining if there's a next retry time + let cooldown_remaining = if let Some(next_retry) = failure.next_retry_at { + let remaining = next_retry.signed_duration_since(now); + if remaining.num_seconds() > 0 { + Some(remaining.num_minutes().max(0)) + } else { + None + } + } else { + None + }; + + // Use the same sophisticated logic as the database query + let should_skip = failure.user_excluded || + (matches!(failure.error_severity, crate::models::SourceErrorSeverity::Critical | crate::models::SourceErrorSeverity::High) + && failure.failure_count > 3) || + cooldown_remaining.is_some(); + + let reason = if failure.user_excluded { + "Resource has been manually excluded by user".to_string() + } else if matches!(failure.error_severity, crate::models::SourceErrorSeverity::Critical | crate::models::SourceErrorSeverity::High) + && failure.failure_count > 3 { + format!("Resource has failed {} times with {} severity - giving up", + failure.failure_count, failure.error_severity) + } else if let Some(cooldown_mins) = cooldown_remaining { + format!("Resource is in cooldown period ({} minutes remaining)", cooldown_mins) + } else { + "Resource has unresolved failures but is ready for retry".to_string() + }; + + if should_skip { + info!( + "⏭️ Skipping {} resource '{}' due to error tracking: {}", + source_type, resource_path, reason + ); + } else { + debug!( + "🔄 Allowing retry of {} resource '{}' after {} failures", + source_type, resource_path, failure.failure_count + ); + } + + Ok(SkipDecision { + should_skip, + reason, + failure_count: failure.failure_count, + time_since_last_failure_minutes: time_since_last_minutes, + cooldown_remaining_minutes: cooldown_remaining, + }) + } else { + // No failure record found for this specific path + debug!( + "✅ No previous failures for {} resource '{}', proceeding with scan", + source_type, resource_path + ); + Ok(SkipDecision { + should_skip: false, + reason: "No previous failures recorded".to_string(), + failure_count: 0, + time_since_last_failure_minutes: 0, + cooldown_remaining_minutes: None, + }) + } } Err(e) => { warn!( @@ -407,11 +460,14 @@ impl SourceErrorTracker { _ => crate::models::RetryStrategy::Exponential, }; - let retry_delay = match error_type { - SourceErrorType::RateLimited => 600, // 10 minutes for rate limits - SourceErrorType::NetworkError => 60, // 1 minute for network issues - SourceErrorType::Timeout => 900, // 15 minutes for timeouts - _ => 300, // 5 minutes default + let (retry_delay, max_retries) = match error_type { + SourceErrorType::RateLimited => (600, 3), // 10 minutes, 3 retries max + SourceErrorType::NetworkError => (60, 5), // 1 minute, 5 retries max + SourceErrorType::Timeout => (300, 3), // 5 minutes, 3 retries max + SourceErrorType::ServerError => (300, 3), // 5 minutes, 3 retries max + SourceErrorType::NotFound => (0, 0), // Don't retry 404s + SourceErrorType::PermissionDenied => (0, 0), // Don't retry permission errors + _ => (300, 3), // 5 minutes default, 3 retries }; ErrorClassification { @@ -419,7 +475,7 @@ impl SourceErrorTracker { severity, retry_strategy, retry_delay_seconds: retry_delay, - max_retries: 5, + max_retries, user_friendly_message: format!("Error accessing resource: {}", error), recommended_action: "The system will retry this operation automatically.".to_string(), diagnostic_data: serde_json::json!({ diff --git a/src/services/webdav/config.rs b/src/services/webdav/config.rs index 0f15434..f498072 100644 --- a/src/services/webdav/config.rs +++ b/src/services/webdav/config.rs @@ -1,5 +1,4 @@ -use super::loop_detection::LoopDetectionConfig; /// WebDAV server configuration #[derive(Debug, Clone)] @@ -11,7 +10,6 @@ pub struct WebDAVConfig { pub file_extensions: Vec, pub timeout_seconds: u64, pub server_type: Option, // "nextcloud", "owncloud", "generic" - pub loop_detection: LoopDetectionConfig, } /// Retry configuration for WebDAV operations @@ -103,7 +101,6 @@ impl WebDAVConfig { file_extensions, timeout_seconds: 30, server_type: None, - loop_detection: LoopDetectionConfig::default(), } } diff --git a/src/services/webdav/loop_detection.rs b/src/services/webdav/loop_detection.rs deleted file mode 100644 index 11ae7fb..0000000 --- a/src/services/webdav/loop_detection.rs +++ /dev/null @@ -1,1232 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use std::time::Duration; -use anyhow::{Result, anyhow}; -use tracing::{debug, warn, error, info}; -use serde::{Serialize, Deserialize}; -use uuid::Uuid; -use tokio::sync::{Mutex, RwLock}; -use chrono::{DateTime, Utc}; -use tokio::time::timeout; - -/// Configuration for loop detection behavior -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LoopDetectionConfig { - /// Enable loop detection (default: true) - pub enabled: bool, - - /// Maximum number of times a directory can be accessed within the time window (default: 3) - pub max_access_count: usize, - - /// Time window for tracking directory accesses in seconds (default: 300 = 5 minutes) - pub time_window_secs: u64, - - /// Maximum time a directory scan can take before being flagged as stuck (default: 60 seconds) - pub max_scan_duration_secs: u64, - - /// Minimum time between directory scans to avoid immediate re-scan loops (default: 5 seconds) - pub min_scan_interval_secs: u64, - - /// Maximum depth for circular pattern detection (default: 10) - pub max_pattern_depth: usize, - - /// Maximum number of directories to track simultaneously (default: 1000, reduced from 10000) - pub max_tracked_directories: usize, - - /// Enable pattern analysis for A->B->A type cycles (default: true) - pub enable_pattern_analysis: bool, - - /// Log level for loop detection events (default: "warn") - pub log_level: String, - - /// Circuit breaker failure threshold before auto-disabling detection (default: 5) - pub circuit_breaker_failure_threshold: u32, - - /// Circuit breaker timeout before re-enabling detection in seconds (default: 300) - pub circuit_breaker_timeout_secs: u64, - - /// Enable graceful degradation when detection fails (default: true) - pub enable_graceful_degradation: bool, - - /// Maximum time to wait for mutex acquisition in milliseconds (default: 100) - pub mutex_timeout_ms: u64, -} - -impl Default for LoopDetectionConfig { - fn default() -> Self { - Self { - enabled: true, - max_access_count: 3, - time_window_secs: 300, - max_scan_duration_secs: 60, - min_scan_interval_secs: 5, - max_pattern_depth: 10, - max_tracked_directories: 1000, // Reduced from 10000 for better memory management - enable_pattern_analysis: true, - log_level: "warn".to_string(), - circuit_breaker_failure_threshold: 5, - circuit_breaker_timeout_secs: 300, - enable_graceful_degradation: true, - mutex_timeout_ms: 100, - } - } -} - -/// Represents a directory access event -#[derive(Debug, Clone)] -struct DirectoryAccess { - /// Path of the directory being accessed - path: String, - /// When the access started - started_at: DateTime, - /// When the access completed (None if still in progress) - completed_at: Option>, - /// Unique ID for this access - access_id: Uuid, - /// Operation type (scan, discovery, etc.) - operation: String, - /// Whether this access resulted in an error - error: Option, - /// Number of files found during this access - files_found: Option, - /// Number of subdirectories found during this access - subdirs_found: Option, -} - -/// Loop detection findings -#[derive(Debug, Clone, Serialize)] -pub struct LoopDetectionResult { - /// Whether a loop was detected - pub loop_detected: bool, - /// Type of loop detected - pub loop_type: Option, - /// Problematic directory path - pub problem_path: Option, - /// Detailed description of the issue - pub description: String, - /// Access pattern that led to detection - pub access_pattern: Vec, - /// Metrics about the detected issue - pub metrics: LoopMetrics, - /// Recommendations for resolving the issue - pub recommendations: Vec, - /// Timestamp when the loop was detected - pub detected_at: DateTime, - /// Suggested actions for the user - pub suggested_actions: Vec, - /// Whether this is a critical issue that requires immediate attention - pub is_critical: bool, -} - - -/// Types of loops that can be detected -#[derive(Debug, Clone, Serialize)] -pub enum LoopType { - /// Same directory accessed too frequently - FrequentReAccess, - /// Directory scan is taking too long - StuckScan, - /// Immediate re-scan of the same directory - ImmediateReScan, - /// Circular pattern detected (A->B->A or A->B->C->A) - CircularPattern, - /// Too many concurrent accesses to the same directory - ConcurrentAccess, -} - -/// Metrics about loop detection -#[derive(Debug, Clone, Serialize)] -pub struct LoopMetrics { - /// Number of accesses to the problematic path - pub access_count: usize, - /// Time span of the problematic accesses - pub time_span_secs: f64, - /// Average scan duration - pub avg_scan_duration_secs: f64, - /// Total files found across all accesses - pub total_files_found: usize, - /// Total subdirectories found across all accesses - pub total_subdirs_found: usize, - /// Number of failed accesses - pub failed_accesses: usize, -} - -/// Circuit breaker state for graceful degradation -#[derive(Debug, Clone)] -pub struct CircuitBreakerState { - pub failures: u32, - pub last_failure_time: Option>, - pub is_open: bool, -} - -/// Performance metrics for loop detection instrumentation -#[derive(Debug, Clone, Serialize)] -pub struct InstrumentationMetrics { - pub total_operations: u64, - pub avg_operation_duration_ms: f64, - pub max_operation_duration_ms: f64, - pub memory_usage_bytes: usize, - pub cache_hit_rate: f64, -} - -/// Internal state for tracking directory accesses -#[derive(Debug)] -struct LoopDetectionState { - /// Active directory accesses (path -> access info) - active_accesses: HashMap, - /// Historical accesses within the time window - access_history: VecDeque, - /// Pattern tracking for circular detection - now with bounded size - access_patterns: HashMap>, - /// Last access time for each directory - last_access_times: HashMap>, - /// Performance metrics - total_accesses: u64, - total_loops_detected: u64, - /// Configuration reference - config: LoopDetectionConfig, - /// Circuit breaker for graceful degradation - circuit_breaker: CircuitBreakerState, - /// Instrumentation metrics - instrumentation_metrics: InstrumentationMetrics, - /// Last cleanup timestamp - last_cleanup: DateTime, -} - -/// Main loop detection service -#[derive(Debug, Clone)] -pub struct LoopDetectionService { - state: Arc>, - /// Read-write lock for configuration updates - config: Arc>, -} - -impl LoopDetectionService { - /// Create a new loop detection service with default configuration - pub fn new() -> Self { - Self::with_config(LoopDetectionConfig::default()) - } - - /// Create a new loop detection service with custom configuration - pub fn with_config(config: LoopDetectionConfig) -> Self { - let now = Utc::now(); - let state = LoopDetectionState { - active_accesses: HashMap::new(), - access_history: VecDeque::new(), - access_patterns: HashMap::new(), - last_access_times: HashMap::new(), - total_accesses: 0, - total_loops_detected: 0, - config: config.clone(), - circuit_breaker: CircuitBreakerState { - failures: 0, - last_failure_time: None, - is_open: false, - }, - instrumentation_metrics: InstrumentationMetrics { - total_operations: 0, - avg_operation_duration_ms: 0.0, - max_operation_duration_ms: 0.0, - memory_usage_bytes: 0, - cache_hit_rate: 0.0, - }, - last_cleanup: now, - }; - - Self { - state: Arc::new(Mutex::new(state)), - config: Arc::new(RwLock::new(config)), - } - } - - /// Start tracking a directory access - pub async fn start_access(&self, path: &str, operation: &str) -> Result { - let config = self.config.read().await; - if !config.enabled { - return Ok(Uuid::new_v4()); // Return dummy ID when disabled - } - drop(config); - - let operation_start = Utc::now(); - - // Use timeout to prevent deadlocks - let mut state = match timeout( - Duration::from_millis(self.get_mutex_timeout().await), - self.state.lock() - ).await { - Ok(state) => state, - Err(_) => { - warn!("Loop detection mutex timeout for path '{}' - enabling graceful degradation", path); - if self.should_use_graceful_degradation().await { - return Ok(Uuid::new_v4()); // Return dummy ID and continue - } else { - return Err(anyhow!("Loop detection service unavailable: mutex timeout")); - } - } - }; - - // Check circuit breaker - if state.circuit_breaker.is_open { - if let Some(last_failure) = state.circuit_breaker.last_failure_time { - let config = self.config.read().await; - let timeout_duration = chrono::Duration::seconds(config.circuit_breaker_timeout_secs as i64); - if Utc::now().signed_duration_since(last_failure) > timeout_duration { - // Reset circuit breaker - state.circuit_breaker.is_open = false; - state.circuit_breaker.failures = 0; - info!("Loop detection circuit breaker reset for path '{}'" , path); - } else { - debug!("Loop detection circuit breaker is open for path '{}' - skipping detection", path); - return Ok(Uuid::new_v4()); // Return dummy ID when circuit breaker is open - } - } - } - - let access_id = Uuid::new_v4(); - let now = Utc::now(); - - // Update instrumentation metrics - state.instrumentation_metrics.total_operations += 1; - - // Periodic cleanup to prevent memory leaks - if now.signed_duration_since(state.last_cleanup).num_seconds() > 60 { - self.cleanup_state_internal(&mut state, now).await; - state.last_cleanup = now; - } - - // Check for immediate re-scan - if let Some(last_access) = state.last_access_times.get(path) { - let time_since_last = now.signed_duration_since(*last_access); - if time_since_last.num_seconds() < state.config.min_scan_interval_secs as i64 { - let result = LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::ImmediateReScan), - problem_path: Some(path.to_string()), - description: format!( - "Directory '{}' re-accessed after only {:.2}s (minimum interval: {}s)", - path, time_since_last.num_seconds(), state.config.min_scan_interval_secs - ), - access_pattern: vec![path.to_string()], - metrics: self.calculate_metrics(&state, path), - recommendations: vec![ - "Review your sync schedule - scanning this directory too frequently".to_string(), - "Check if multiple sync processes are running simultaneously".to_string(), - "Consider increasing the minimum scan interval in settings".to_string(), - ], - suggested_actions: vec![ - "Wait at least 5 seconds before rescanning the same directory".to_string(), - "Check your WebDAV sync configuration for conflicts".to_string(), - ], - detected_at: now, - is_critical: false, - }; - self.log_loop_detection(&result); - state.total_loops_detected += 1; - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - return Err(anyhow!("Loop detected: {}. {}", result.description, result.suggested_actions.join(". "))); - } - } - - // Check for concurrent access to the same directory - if state.active_accesses.contains_key(path) { - let result = LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::ConcurrentAccess), - problem_path: Some(path.to_string()), - description: format!("Multiple simultaneous scans detected for directory '{}'", path), - access_pattern: vec![path.to_string()], - metrics: self.calculate_metrics(&state, path), - recommendations: vec![ - "This indicates a synchronization issue in your application".to_string(), - "Multiple sync processes may be running simultaneously".to_string(), - "Check your scheduling configuration".to_string(), - ], - suggested_actions: vec![ - "Stop any other running sync operations".to_string(), - "Review your sync schedule to prevent overlaps".to_string(), - "Contact support if this continues to occur".to_string(), - ], - detected_at: now, - is_critical: true, - }; - self.log_loop_detection(&result); - state.total_loops_detected += 1; - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - return Err(anyhow!("Critical sync issue detected: {}. {}", result.description, result.suggested_actions.join(". "))); - } - - // Check access frequency - if let Some(loop_result) = self.check_access_frequency(&state, path, now) { - state.total_loops_detected += 1; - self.log_loop_detection(&loop_result); - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - return Err(anyhow!("Sync loop detected: {}. {}", loop_result.description, loop_result.suggested_actions.join(". "))); - } - - // Check circular patterns - if state.config.enable_pattern_analysis { - if let Some(loop_result) = self.check_circular_patterns(&state, path, now) { - state.total_loops_detected += 1; - self.log_loop_detection(&loop_result); - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - return Err(anyhow!("Circular sync pattern detected: {}. {}", loop_result.description, loop_result.suggested_actions.join(". "))); - } - } - - // Record the access - let access = DirectoryAccess { - path: path.to_string(), - started_at: now, - completed_at: None, - access_id, - operation: operation.to_string(), - error: None, - files_found: None, - subdirs_found: None, - }; - - state.active_accesses.insert(path.to_string(), access); - state.last_access_times.insert(path.to_string(), now); - state.total_accesses += 1; - - // Clean up old history to prevent memory growth - self.cleanup_old_history(&mut state, now); - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - debug!("[{}] Started tracking access to '{}' with operation '{}'", access_id, path, operation); - Ok(access_id) - } - - /// Complete tracking a directory access - pub async fn complete_access( - &self, - access_id: Uuid, - files_found: Option, - subdirs_found: Option, - error: Option - ) -> Result<()> { - let config = self.config.read().await; - if !config.enabled { - return Ok(()); - } - drop(config); - - let operation_start = Utc::now(); - - // Use timeout to prevent deadlocks - let mut state = match timeout( - Duration::from_millis(self.get_mutex_timeout().await), - self.state.lock() - ).await { - Ok(state) => state, - Err(_) => { - warn!("Loop detection mutex timeout for access_id '{}' - enabling graceful degradation", access_id); - if self.should_use_graceful_degradation().await { - return Ok(()); // Silently continue when graceful degradation is enabled - } else { - return Err(anyhow!("Loop detection service unavailable: mutex timeout")); - } - } - }; - - let now = Utc::now(); - - // Handle circuit breaker failures - let operation_result: Result<()> = (|| { - // Find the access and collect information - let mut access_info = None; - let _max_scan_duration_secs = state.config.max_scan_duration_secs; - let _enable_pattern_analysis = state.config.enable_pattern_analysis; - let _max_pattern_depth = state.config.max_pattern_depth; - - // First pass: find and update the access - for (path, access) in state.active_accesses.iter_mut() { - if access.access_id == access_id { - access.completed_at = Some(now); - access.files_found = files_found; - access.subdirs_found = subdirs_found; - access.error = error.clone(); - - let duration = now.signed_duration_since(access.started_at); - - // Collect info for later processing - access_info = Some(( - path.clone(), - duration, - access.clone(), - )); - - debug!("[{}] Completed access to '{}' in {:.2}s, found {} files, {} subdirs", - access_id, path, duration.num_milliseconds() as f64 / 1000.0, - files_found.unwrap_or(0), subdirs_found.unwrap_or(0)); - break; - } - } - Ok(()) - })(); - - // Handle circuit breaker on operation failure - if operation_result.is_err() { - state.circuit_breaker.failures += 1; - state.circuit_breaker.last_failure_time = Some(now); - - if state.circuit_breaker.failures >= state.config.circuit_breaker_failure_threshold { - state.circuit_breaker.is_open = true; - warn!("Loop detection circuit breaker opened due to {} failures", state.circuit_breaker.failures); - } - } else { - // Reset failure count on successful operation - if state.circuit_breaker.failures > 0 { - state.circuit_breaker.failures = 0; - } - } - - // Find the access and update it if found - let mut access_info = None; - let max_scan_duration_secs = state.config.max_scan_duration_secs; - let enable_pattern_analysis = state.config.enable_pattern_analysis; - let max_pattern_depth = state.config.max_pattern_depth; - - // Look for the access to complete - for (path, access) in state.active_accesses.iter_mut() { - if access.access_id == access_id { - access.completed_at = Some(now); - access.files_found = files_found; - access.subdirs_found = subdirs_found; - access.error = error.clone(); - - let duration = now.signed_duration_since(access.started_at); - access_info = Some((path.clone(), duration, access.clone())); - - debug!("[{}] Completed access to '{}' in {:.2}s, found {} files, {} subdirs", - access_id, path, duration.num_milliseconds() as f64 / 1000.0, - files_found.unwrap_or(0), subdirs_found.unwrap_or(0)); - break; - } - } - - // Process the completed access - if let Some((path, duration, access)) = access_info { - // Check if this access took too long - if duration.num_seconds() > max_scan_duration_secs as i64 { - let result = LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::StuckScan), - problem_path: Some(path.clone()), - description: format!( - "Directory scan is taking too long: '{}' has been scanning for {:.1}s (limit: {}s)", - path, duration.num_milliseconds() as f64 / 1000.0, max_scan_duration_secs - ), - access_pattern: vec![path.clone()], - metrics: self.calculate_metrics(&state, &path), - recommendations: vec![ - "This directory may contain too many files or have connectivity issues".to_string(), - "Check your network connection to the WebDAV server".to_string(), - "Consider excluding large directories from sync if they're not needed".to_string(), - ], - suggested_actions: vec![ - "Wait for the current scan to complete or cancel it".to_string(), - "Check if the directory contains an unusually large number of files".to_string(), - "Consider increasing timeout settings if this directory is expected to be large".to_string(), - ], - detected_at: now, - is_critical: false, - }; - self.log_loop_detection(&result); - state.total_loops_detected += 1; - } - - // Move from active to history - state.active_accesses.remove(&path); - state.access_history.push_back(access.clone()); - - // Update pattern tracking with better memory management - if enable_pattern_analysis && state.access_patterns.len() < state.config.max_tracked_directories { - let pattern = state.access_patterns.entry(path.clone()) - .or_insert_with(VecDeque::new); - pattern.push_back(path); - if pattern.len() > max_pattern_depth { - pattern.pop_front(); - } - } - } - - // Track operation duration - let operation_duration = Utc::now().signed_duration_since(operation_start).num_milliseconds() as f64; - self.update_instrumentation_metrics(&mut state, operation_duration); - - operation_result - } - - /// Get current loop detection metrics - pub async fn get_metrics(&self) -> Result { - let config = self.config.read().await; - if !config.enabled { - return Ok(serde_json::json!({ - "enabled": false, - "message": "Loop detection is disabled" - })); - } - drop(config); - - // Use timeout to prevent deadlocks - let state = match timeout( - Duration::from_millis(self.get_mutex_timeout().await), - self.state.lock() - ).await { - Ok(state) => state, - Err(_) => { - return Ok(serde_json::json!({ - "enabled": true, - "error": "Service temporarily unavailable", - "message": "Metrics cannot be retrieved due to high load" - })); - } - }; - - Ok(serde_json::json!({ - "enabled": true, - "total_accesses": state.total_accesses, - "total_loops_detected": state.total_loops_detected, - "active_accesses": state.active_accesses.len(), - "history_size": state.access_history.len(), - "tracked_patterns": state.access_patterns.len(), - "circuit_breaker": { - "is_open": state.circuit_breaker.is_open, - "failures": state.circuit_breaker.failures, - "last_failure": state.circuit_breaker.last_failure_time - }, - "instrumentation": state.instrumentation_metrics, - "memory_usage_estimated_bytes": self.estimate_memory_usage(&state) - })) - } - - /// Check if loop detection is enabled - pub async fn is_enabled(&self) -> bool { - let config = self.config.read().await; - config.enabled - } - - /// Update configuration - pub async fn update_config(&self, new_config: LoopDetectionConfig) -> Result<()> { - let mut config = self.config.write().await; - *config = new_config.clone(); - - // Also update the config in state for backward compatibility - let state_result = timeout( - Duration::from_millis(100), // Short timeout for config updates - self.state.lock() - ).await; - - if let Ok(mut state) = state_result { - state.config = new_config; - info!("Loop detection configuration updated successfully"); - } else { - warn!("Could not update state config due to lock contention, but main config is updated"); - } - - Ok(()) - } - - /// Clear all tracking data (useful for testing) - pub async fn clear_state(&self) -> Result<()> { - let mut state = match timeout( - Duration::from_millis(self.get_mutex_timeout().await), - self.state.lock() - ).await { - Ok(state) => state, - Err(_) => return Err(anyhow!("Could not clear state: service unavailable")), - }; - - state.active_accesses.clear(); - state.access_history.clear(); - state.access_patterns.clear(); - state.last_access_times.clear(); - - // Reset circuit breaker - state.circuit_breaker = CircuitBreakerState { - failures: 0, - last_failure_time: None, - is_open: false, - }; - - // Reset instrumentation metrics - state.instrumentation_metrics = InstrumentationMetrics { - total_operations: 0, - avg_operation_duration_ms: 0.0, - max_operation_duration_ms: 0.0, - memory_usage_bytes: 0, - cache_hit_rate: 0.0, - }; - - debug!("Loop detection state cleared"); - Ok(()) - } - - // New helper methods for enhanced functionality - - /// Get mutex timeout from configuration - async fn get_mutex_timeout(&self) -> u64 { - let config = self.config.read().await; - config.mutex_timeout_ms - } - - /// Check if graceful degradation should be used - async fn should_use_graceful_degradation(&self) -> bool { - let config = self.config.read().await; - config.enable_graceful_degradation - } - - /// Update instrumentation metrics - fn update_instrumentation_metrics(&self, state: &mut LoopDetectionState, operation_duration_ms: f64) { - // Update instrumentation metrics - let memory_usage = self.estimate_memory_usage(state); - - let metrics = &mut state.instrumentation_metrics; - - // Update average operation duration - let total_ops = metrics.total_operations as f64; - if total_ops > 0.0 { - metrics.avg_operation_duration_ms = - (metrics.avg_operation_duration_ms * total_ops + operation_duration_ms) / (total_ops + 1.0); - } else { - metrics.avg_operation_duration_ms = operation_duration_ms; - } - - // Update max operation duration - if operation_duration_ms > metrics.max_operation_duration_ms { - metrics.max_operation_duration_ms = operation_duration_ms; - } - - // Update memory usage estimate - metrics.memory_usage_bytes = memory_usage; - } - - /// Estimate memory usage of the current state - fn estimate_memory_usage(&self, state: &LoopDetectionState) -> usize { - let mut size = std::mem::size_of::(); - - // Estimate HashMap and VecDeque sizes - size += state.active_accesses.len() * (std::mem::size_of::() + std::mem::size_of::()); - size += state.access_history.len() * std::mem::size_of::(); - - for (key, pattern) in &state.access_patterns { - size += key.len(); - size += pattern.len() * std::mem::size_of::(); - for path in pattern { - size += path.len(); - } - } - - size += state.last_access_times.len() * (std::mem::size_of::() + std::mem::size_of::>()); - - size - } - - /// Enhanced cleanup that also manages memory bounds - async fn cleanup_state_internal(&self, state: &mut LoopDetectionState, now: DateTime) { - // Clean up old history - let time_window = chrono::Duration::seconds(state.config.time_window_secs as i64); - let cutoff_time = now - time_window; - - // Remove old access history - while let Some(access) = state.access_history.front() { - if access.started_at < cutoff_time { - state.access_history.pop_front(); - } else { - break; - } - } - - // Aggressively clean up pattern tracking when approaching memory limits - if state.access_patterns.len() > state.config.max_tracked_directories { - let excess = state.access_patterns.len() - state.config.max_tracked_directories; - - // Remove patterns that haven't been accessed recently - let mut patterns_to_remove = Vec::new(); - for (path, _pattern) in &state.access_patterns { - if let Some(last_access) = state.last_access_times.get(path) { - if now.signed_duration_since(*last_access) > time_window { - patterns_to_remove.push(path.clone()); - if patterns_to_remove.len() >= excess { - break; - } - } - } - } - - // If we still have too many patterns, remove the oldest ones - if patterns_to_remove.len() < excess { - let mut remaining_patterns: Vec<_> = state.access_patterns.keys().cloned().collect(); - remaining_patterns.sort_by(|a, b| { - let time_a = state.last_access_times.get(a).unwrap_or(&cutoff_time); - let time_b = state.last_access_times.get(b).unwrap_or(&cutoff_time); - time_a.cmp(time_b) - }); - - for path in remaining_patterns.into_iter().take(excess - patterns_to_remove.len()) { - patterns_to_remove.push(path); - } - } - - for path in patterns_to_remove { - state.access_patterns.remove(&path); - } - } - - // Clean up last access times - let paths_to_remove: Vec = state.last_access_times - .iter() - .filter(|(_, &time)| time < cutoff_time) - .map(|(path, _)| path.clone()) - .collect(); - - for path in paths_to_remove { - state.last_access_times.remove(&path); - } - - debug!("Cleanup completed: {} active, {} history, {} patterns, {} last_access", - state.active_accesses.len(), - state.access_history.len(), - state.access_patterns.len(), - state.last_access_times.len()); - } - - // Private helper methods - - fn check_access_frequency( - &self, - state: &LoopDetectionState, - path: &str, - now: DateTime - ) -> Option { - let time_window = chrono::Duration::seconds(state.config.time_window_secs as i64); - let cutoff_time = now - time_window; - - let recent_accesses: Vec<_> = state.access_history - .iter() - .filter(|access| { - access.path == path && - access.started_at >= cutoff_time && - access.completed_at.is_some() - }) - .collect(); - - if recent_accesses.len() >= state.config.max_access_count { - let first_access_time = recent_accesses.first().unwrap().started_at; - let time_span = now.signed_duration_since(first_access_time); - - return Some(LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::FrequentReAccess), - problem_path: Some(path.to_string()), - description: format!( - "Directory '{}' has been scanned {} times in the last {:.1} minutes (limit: {} scans per {} minutes)", - path, recent_accesses.len(), time_span.num_minutes() as f64, - state.config.max_access_count, state.config.time_window_secs / 60 - ), - access_pattern: recent_accesses.iter().map(|a| a.path.clone()).collect(), - metrics: self.calculate_metrics_from_accesses(&recent_accesses), - recommendations: vec![ - "This directory is being scanned too frequently".to_string(), - "Check if multiple sync processes are running".to_string(), - "Review your sync schedule settings".to_string(), - ], - suggested_actions: vec![ - "Reduce sync frequency for this directory".to_string(), - "Check for duplicate sync configurations".to_string(), - "Consider excluding this directory if it changes infrequently".to_string(), - ], - detected_at: now, - is_critical: false, - }); - } - - None - } - - fn check_circular_patterns( - &self, - state: &LoopDetectionState, - path: &str, - now: DateTime - ) -> Option { - if let Some(pattern) = state.access_patterns.get(path) { - // Look for simple A->A patterns - if pattern.len() >= 2 && pattern.back() == Some(&path.to_string()) { - if let Some(second_last) = pattern.get(pattern.len() - 2) { - if second_last == path { - return Some(LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::CircularPattern), - problem_path: Some(path.to_string()), - description: format!("Circular directory access pattern detected for '{}'", path), - access_pattern: pattern.iter().cloned().collect(), - metrics: self.calculate_metrics(state, path), - recommendations: vec![ - "This indicates a potential infinite loop in directory scanning".to_string(), - "Check if the directory structure has circular references".to_string(), - "Verify that symbolic links are handled correctly".to_string(), - ], - suggested_actions: vec![ - "Stop the current sync operation".to_string(), - "Check for symbolic links that might create loops".to_string(), - "Contact support if this directory should not have circular references".to_string(), - ], - detected_at: now, - is_critical: true, - }); - } - } - } - - // Look for longer patterns like A->B->A or A->B->C->A - if pattern.len() >= 3 { - let pattern_vec: Vec = pattern.iter().cloned().collect(); - for i in 0..pattern_vec.len().saturating_sub(2) { - if pattern_vec[i] == path { - for j in (i + 2)..pattern_vec.len() { - if pattern_vec[j] == path { - let cycle: Vec = pattern_vec[i..=j].to_vec(); - return Some(LoopDetectionResult { - loop_detected: true, - loop_type: Some(LoopType::CircularPattern), - problem_path: Some(path.to_string()), - description: format!( - "Complex circular pattern detected: {} (involves {} directories)", - cycle.join(" → "), cycle.len() - ), - access_pattern: cycle.clone(), - metrics: self.calculate_metrics(state, path), - recommendations: vec![ - "Multiple directories are creating a circular reference".to_string(), - "This may indicate an issue with directory structure or symbolic links".to_string(), - "Review the directory hierarchy for unexpected links".to_string(), - ], - suggested_actions: vec![ - "Stop the sync and examine the directory structure".to_string(), - format!("Check these directories for circular links: {}", cycle.join(", ")), - "Consider excluding problematic directories from sync".to_string(), - ], - detected_at: now, - is_critical: true, - }); - } - } - } - } - } - } - - None - } - - fn calculate_metrics(&self, state: &LoopDetectionState, path: &str) -> LoopMetrics { - let accesses: Vec<_> = state.access_history - .iter() - .filter(|access| access.path == path) - .collect(); - - self.calculate_metrics_from_accesses(&accesses) - } - - fn calculate_metrics_from_accesses(&self, accesses: &[&DirectoryAccess]) -> LoopMetrics { - if accesses.is_empty() { - return LoopMetrics { - access_count: 0, - time_span_secs: 0.0, - avg_scan_duration_secs: 0.0, - total_files_found: 0, - total_subdirs_found: 0, - failed_accesses: 0, - }; - } - - let first_time = accesses.first().unwrap().started_at; - let last_time = accesses.last().unwrap().started_at; - let time_span = last_time.signed_duration_since(first_time); - - let total_duration_ms: i64 = accesses - .iter() - .filter_map(|access| { - access.completed_at.map(|end| end.signed_duration_since(access.started_at).num_milliseconds()) - }) - .sum(); - - let completed_count = accesses.iter().filter(|a| a.completed_at.is_some()).count(); - let avg_duration = if completed_count > 0 { - total_duration_ms as f64 / 1000.0 / completed_count as f64 - } else { - 0.0 - }; - - LoopMetrics { - access_count: accesses.len(), - time_span_secs: time_span.num_milliseconds() as f64 / 1000.0, - avg_scan_duration_secs: avg_duration, - total_files_found: accesses.iter().filter_map(|a| a.files_found).sum(), - total_subdirs_found: accesses.iter().filter_map(|a| a.subdirs_found).sum(), - failed_accesses: accesses.iter().filter(|a| a.error.is_some()).count(), - } - } - - fn cleanup_old_history(&self, state: &mut LoopDetectionState, now: DateTime) { - let time_window = chrono::Duration::seconds(state.config.time_window_secs as i64); - let cutoff_time = now - time_window; - - // Remove old access history - while let Some(access) = state.access_history.front() { - if access.started_at < cutoff_time { - state.access_history.pop_front(); - } else { - break; - } - } - - // Clean up pattern tracking if we're tracking too many directories - if state.access_patterns.len() > state.config.max_tracked_directories { - let excess = state.access_patterns.len() - state.config.max_tracked_directories; - let paths_to_remove: Vec = state.access_patterns - .keys() - .take(excess) - .cloned() - .collect(); - - for path in paths_to_remove { - state.access_patterns.remove(&path); - } - } - - // Clean up last access times - let paths_to_remove: Vec = state.last_access_times - .iter() - .filter(|(_, &time)| time < cutoff_time) - .map(|(path, _)| path.clone()) - .collect(); - - for path in paths_to_remove { - state.last_access_times.remove(&path); - } - } - - fn log_loop_detection(&self, result: &LoopDetectionResult) { - let log_level = "warn"; // Default to warn level for production safety - - let severity_prefix = if result.is_critical { "🚨 CRITICAL" } else { "⚠️ WARNING" }; - - let message = format!( - "{} - Sync Loop Detected\n│ Type: {:?}\n│ Directory: '{}'\n│ Issue: {}\n│ Pattern: {}\n│ Action needed: {}", - severity_prefix, - result.loop_type.as_ref().unwrap_or(&LoopType::FrequentReAccess), - result.problem_path.as_ref().unwrap_or(&"unknown".to_string()), - result.description, - result.access_pattern.join(" → "), - result.suggested_actions.first().unwrap_or(&"Review sync configuration".to_string()) - ); - - if result.is_critical { - error!("{}", message); - } else { - match log_level { - "error" => error!("{}", message), - "warn" => warn!("{}", message), - "info" => info!("{}", message), - "debug" => debug!("{}", message), - _ => warn!("{}", message), - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::thread; - - #[tokio::test] - async fn test_immediate_rescan_detection() { - let service = LoopDetectionService::new(); - - // First access should succeed - let access1 = service.start_access("/test", "scan").await.unwrap(); - service.complete_access(access1, Some(5), Some(2), None).await.unwrap(); - - // Immediate second access should fail - let result = service.start_access("/test", "scan").await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("re-accessed after only")); - } - - #[tokio::test] - async fn test_concurrent_access_detection() { - let service = LoopDetectionService::new(); - - // Start first access - let _access1 = service.start_access("/test", "scan").await.unwrap(); - - // Second concurrent access should fail - let result = service.start_access("/test", "scan").await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("simultaneous scans detected")); - } - - #[tokio::test] - async fn test_frequency_detection() { - let mut config = LoopDetectionConfig::default(); - config.max_access_count = 2; - config.min_scan_interval_secs = 0; // Disable immediate re-scan check - let service = LoopDetectionService::with_config(config); - - // Do multiple accesses that complete quickly - for i in 0..3 { - let access = service.start_access("/test", "scan").await.unwrap(); - service.complete_access(access, Some(i), Some(1), None).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; // Small delay - } - - // Next access should trigger frequency detection - let result = service.start_access("/test", "scan").await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("scanned 2 times")); - } - - #[tokio::test] - async fn test_metrics_collection() { - let service = LoopDetectionService::new(); - - let access = service.start_access("/test", "scan").await.unwrap(); - service.complete_access(access, Some(10), Some(3), None).await.unwrap(); - - let metrics = service.get_metrics().await.unwrap(); - assert_eq!(metrics["total_accesses"], 1); - assert_eq!(metrics["active_accesses"], 0); - assert!(metrics["enabled"].as_bool().unwrap()); - } - - #[tokio::test] - async fn test_disabled_service() { - let mut config = LoopDetectionConfig::default(); - config.enabled = false; - let service = LoopDetectionService::with_config(config); - - // Should not detect any loops when disabled - let access1 = service.start_access("/test", "scan").await.unwrap(); - service.complete_access(access1, Some(5), Some(2), None).await.unwrap(); - - let access2 = service.start_access("/test", "scan").await.unwrap(); - service.complete_access(access2, Some(5), Some(2), None).await.unwrap(); - - let metrics = service.get_metrics().await.unwrap(); - assert!(!metrics["enabled"].as_bool().unwrap()); - } - - #[tokio::test] - async fn test_circuit_breaker() { - let mut config = LoopDetectionConfig::default(); - config.circuit_breaker_failure_threshold = 2; - let service = LoopDetectionService::with_config(config); - - // Simulate circuit breaker by triggering concurrent access errors - let _access1 = service.start_access("/test", "scan").await.unwrap(); - - // Should fail with concurrent access - let _result1 = service.start_access("/test", "scan").await; - let _result2 = service.start_access("/test", "scan").await; - - let metrics = service.get_metrics().await.unwrap(); - assert!(metrics["circuit_breaker"]["failures"].as_u64().unwrap() > 0); - } - - #[tokio::test] - async fn test_graceful_degradation() { - let mut config = LoopDetectionConfig::default(); - config.enable_graceful_degradation = true; - config.mutex_timeout_ms = 1; // Very short timeout to trigger degradation - let service = LoopDetectionService::with_config(config); - - // This should not panic even with very short timeout - let result = service.start_access("/test", "scan").await; - assert!(result.is_ok()); // Should succeed with graceful degradation - } -} - -/// Separate configuration module to decouple from WebDAV config -pub mod config { - use super::LoopDetectionConfig; - - /// Create a loop detection config optimized for production use - pub fn production_config() -> LoopDetectionConfig { - LoopDetectionConfig { - enabled: true, - max_access_count: 3, - time_window_secs: 300, // 5 minutes - max_scan_duration_secs: 120, // 2 minutes for large directories - min_scan_interval_secs: 10, // Longer interval for production - max_pattern_depth: 5, // Reduced depth for better performance - max_tracked_directories: 500, // Conservative limit - enable_pattern_analysis: true, - log_level: "warn".to_string(), - circuit_breaker_failure_threshold: 3, - circuit_breaker_timeout_secs: 300, // 5 minutes - enable_graceful_degradation: true, - mutex_timeout_ms: 200, // 200ms timeout - } - } - - /// Create a loop detection config optimized for development/testing - pub fn development_config() -> LoopDetectionConfig { - LoopDetectionConfig { - enabled: true, - max_access_count: 5, // More lenient for dev - time_window_secs: 180, // 3 minutes - max_scan_duration_secs: 60, - min_scan_interval_secs: 2, // Shorter for faster development - max_pattern_depth: 10, - max_tracked_directories: 100, - enable_pattern_analysis: true, - log_level: "debug".to_string(), - circuit_breaker_failure_threshold: 5, - circuit_breaker_timeout_secs: 60, - enable_graceful_degradation: true, - mutex_timeout_ms: 500, // Longer timeout for debugging - } - } - - /// Create a minimal config that disables most detection for performance - pub fn minimal_config() -> LoopDetectionConfig { - LoopDetectionConfig { - enabled: true, - max_access_count: 10, // Very lenient - time_window_secs: 600, // 10 minutes - max_scan_duration_secs: 300, // 5 minutes - min_scan_interval_secs: 1, - max_pattern_depth: 3, - max_tracked_directories: 50, - enable_pattern_analysis: false, // Disabled for performance - log_level: "error".to_string(), // Only log errors - circuit_breaker_failure_threshold: 10, - circuit_breaker_timeout_secs: 600, - enable_graceful_degradation: true, - mutex_timeout_ms: 50, // Very short timeout - } - } -} \ No newline at end of file diff --git a/src/services/webdav/loop_detection_integration_tests.rs b/src/services/webdav/loop_detection_integration_tests.rs deleted file mode 100644 index 249fecf..0000000 --- a/src/services/webdav/loop_detection_integration_tests.rs +++ /dev/null @@ -1,323 +0,0 @@ -#[cfg(test)] -mod tests { - use super::super::*; - use super::super::loop_detection::{LoopDetectionService, LoopDetectionConfig, LoopType}; - use crate::{AppState, config::Config}; - use std::sync::Arc; - use std::time::Duration; - use tokio::time::sleep; - use uuid::Uuid; - - /// Helper to create a test WebDAV service with loop detection enabled - async fn create_test_webdav_service_with_loop_detection() -> WebDAVService { - let mut config = WebDAVConfig::new( - "http://localhost:8080".to_string(), - "test_user".to_string(), - "test_pass".to_string(), - vec!["/test".to_string()], - vec!["pdf".to_string(), "txt".to_string()], - ); - - // Configure loop detection with tight thresholds for testing - config.loop_detection = LoopDetectionConfig { - enabled: true, - max_access_count: 2, // Very low for testing - time_window_secs: 10, // Short window - max_scan_duration_secs: 5, // Short timeout - min_scan_interval_secs: 1, // Short interval - max_pattern_depth: 5, - max_tracked_directories: 100, - enable_pattern_analysis: true, - log_level: "debug".to_string(), - }; - - WebDAVService::new(config).expect("Failed to create WebDAV service") - } - - /// Helper to create a mock WebDAV server response for testing - fn create_mock_webdav_response(num_files: usize, num_dirs: usize) -> WebDAVDiscoveryResult { - let mut files = Vec::new(); - let mut directories = Vec::new(); - - for i in 0..num_files { - files.push(crate::models::FileIngestionInfo { - uuid: Uuid::new_v4(), - filename: format!("file_{}.pdf", i), - relative_path: format!("/test/file_{}.pdf", i), - absolute_url: format!("http://localhost:8080/test/file_{}.pdf", i), - file_size_bytes: 1024 * (i + 1) as i64, - last_modified: chrono::Utc::now(), - etag: format!("etag_{}", i), - content_type: "application/pdf".to_string(), - is_directory: false, - }); - } - - for i in 0..num_dirs { - directories.push(crate::models::FileIngestionInfo { - uuid: Uuid::new_v4(), - filename: format!("dir_{}", i), - relative_path: format!("/test/dir_{}", i), - absolute_url: format!("http://localhost:8080/test/dir_{}/", i), - file_size_bytes: 0, - last_modified: chrono::Utc::now(), - etag: format!("dir_etag_{}", i), - content_type: "httpd/unix-directory".to_string(), - is_directory: true, - }); - } - - WebDAVDiscoveryResult { files, directories } - } - - #[tokio::test] - async fn test_loop_detection_immediate_rescan() { - let service = create_test_webdav_service_with_loop_detection().await; - - // First access should succeed - let access1 = service.loop_detector.start_access("/test/path", "test_scan").unwrap(); - service.loop_detector.complete_access(access1, Some(5), Some(2), None).unwrap(); - - // Immediate second access should fail due to min_scan_interval - let result = service.loop_detector.start_access("/test/path", "test_scan"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("re-accessed after only")); - - // Metrics should show the loop detection - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["total_loops_detected"], 1); - } - - #[tokio::test] - async fn test_loop_detection_concurrent_access() { - let service = create_test_webdav_service_with_loop_detection().await; - - // Start first access - let _access1 = service.loop_detector.start_access("/test/path", "scan1").unwrap(); - - // Concurrent access should fail - let result = service.loop_detector.start_access("/test/path", "scan2"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("Concurrent access detected")); - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["total_loops_detected"], 1); - } - - #[tokio::test] - async fn test_loop_detection_frequency_limit() { - let service = create_test_webdav_service_with_loop_detection().await; - - // Clear state to start fresh - service.clear_loop_detection_state().unwrap(); - - // Do multiple accesses that complete quickly - for i in 0..3 { - if i > 0 { - // Wait minimum interval to avoid immediate re-scan detection - sleep(Duration::from_millis(1100)).await; - } - - let access = service.loop_detector.start_access("/test/freq_path", &format!("scan_{}", i)); - - if i < 2 { - // First two should succeed - assert!(access.is_ok()); - let access_id = access.unwrap(); - service.loop_detector.complete_access(access_id, Some(i * 2), Some(i), None).unwrap(); - } else { - // Third should fail due to frequency limit - assert!(access.is_err()); - assert!(access.unwrap_err().to_string().contains("accessed 2 times")); - } - } - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["total_loops_detected"], 1); - } - - #[tokio::test] - async fn test_loop_detection_disabled() { - let mut config = WebDAVConfig::new( - "http://localhost:8080".to_string(), - "test_user".to_string(), - "test_pass".to_string(), - vec!["/test".to_string()], - vec!["pdf".to_string()], - ); - - // Disable loop detection - config.loop_detection.enabled = false; - - let service = WebDAVService::new(config).unwrap(); - - // Multiple rapid accesses should all succeed when disabled - for i in 0..5 { - let access = service.loop_detector.start_access("/test/path", &format!("scan_{}", i)).unwrap(); - service.loop_detector.complete_access(access, Some(i), Some(1), None).unwrap(); - } - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert!(!metrics["enabled"].as_bool().unwrap()); - } - - #[tokio::test] - async fn test_loop_detection_error_tracking() { - let service = create_test_webdav_service_with_loop_detection().await; - - // Test error tracking in loop detection - let access = service.loop_detector.start_access("/test/error_path", "error_scan").unwrap(); - service.loop_detector.complete_access( - access, - None, - None, - Some("Test error message".to_string()) - ).unwrap(); - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["total_accesses"], 1); - assert_eq!(metrics["total_loops_detected"], 0); // No loops, just an error - } - - #[tokio::test] - async fn test_loop_detection_cleanup() { - let service = create_test_webdav_service_with_loop_detection().await; - - // Add some access data - for i in 0..3 { - let access = service.loop_detector.start_access(&format!("/test/cleanup_{}", i), "cleanup_scan").unwrap(); - service.loop_detector.complete_access(access, Some(i), Some(1), None).unwrap(); - sleep(Duration::from_millis(100)).await; // Small delay between accesses - } - - let metrics_before = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics_before["total_accesses"], 3); - - // Clear state - service.clear_loop_detection_state().unwrap(); - - let metrics_after = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics_after["active_accesses"], 0); - assert_eq!(metrics_after["history_size"], 0); - } - - #[tokio::test] - async fn test_loop_detection_config_update() { - let service = create_test_webdav_service_with_loop_detection().await; - - // Update configuration - let mut new_config = LoopDetectionConfig::default(); - new_config.max_access_count = 10; // Much higher limit - new_config.log_level = "info".to_string(); - - service.update_loop_detection_config(new_config).unwrap(); - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["config"]["max_access_count"], 10); - assert_eq!(metrics["config"]["log_level"], "info"); - } - - #[tokio::test] - async fn test_pattern_analysis_circular_detection() { - let service = create_test_webdav_service_with_loop_detection().await; - service.clear_loop_detection_state().unwrap(); - - // Simulate A -> B -> A pattern with proper timing - let paths = ["/test/path_a", "/test/path_b", "/test/path_a"]; - - for (i, path) in paths.iter().enumerate() { - if i > 0 { - sleep(Duration::from_millis(1100)).await; // Wait minimum interval - } - - let access = service.loop_detector.start_access(path, &format!("pattern_scan_{}", i)); - - if i < 2 { - // First two should succeed - assert!(access.is_ok()); - let access_id = access.unwrap(); - service.loop_detector.complete_access(access_id, Some(1), Some(0), None).unwrap(); - } else { - // Third access to path_a might trigger pattern detection - // Note: The exact behavior depends on the pattern detection algorithm - if let Err(e) = access { - println!("Pattern detection triggered: {}", e); - } else { - let access_id = access.unwrap(); - service.loop_detector.complete_access(access_id, Some(1), Some(0), None).unwrap(); - } - } - } - - let metrics = service.get_loop_detection_metrics().unwrap(); - println!("Pattern analysis metrics: {}", serde_json::to_string_pretty(&metrics).unwrap()); - } - - #[tokio::test] - async fn test_webdav_service_integration_with_loop_detection() { - // This test would ideally connect to a real WebDAV server - // For now, we test the integration points - let service = create_test_webdav_service_with_loop_detection().await; - - // Test that the service has loop detection enabled - let metrics = service.get_loop_detection_metrics().unwrap(); - assert!(metrics["enabled"].as_bool().unwrap()); - - // Test configuration access - assert_eq!(metrics["config"]["max_access_count"], 2); - assert_eq!(metrics["config"]["time_window_secs"], 10); - - // Test that we can update the config - let mut new_config = LoopDetectionConfig::default(); - new_config.enabled = false; - service.update_loop_detection_config(new_config).unwrap(); - - let updated_metrics = service.get_loop_detection_metrics().unwrap(); - assert!(!updated_metrics["enabled"].as_bool().unwrap()); - } - - /// Integration test with SmartSyncService - #[tokio::test] - async fn test_smart_sync_loop_detection_integration() { - // Create test app state - let test_config = Config::test_default(); - let app_state = Arc::new(AppState::new_for_testing(test_config).await.unwrap()); - - let smart_sync = SmartSyncService::new(app_state); - let webdav_service = create_test_webdav_service_with_loop_detection().await; - - // Test that SmartSyncService can access loop detection metrics - let metrics = smart_sync.get_loop_detection_metrics(&webdav_service).unwrap(); - assert!(metrics["enabled"].as_bool().unwrap()); - - // Test that metrics are properly structured - assert!(metrics.get("total_accesses").is_some()); - assert!(metrics.get("total_loops_detected").is_some()); - assert!(metrics.get("config").is_some()); - } - - /// Performance test to ensure loop detection doesn't significantly impact performance - #[tokio::test] - async fn test_loop_detection_performance() { - let service = create_test_webdav_service_with_loop_detection().await; - - let start_time = std::time::Instant::now(); - - // Perform many operations with different paths to avoid triggering detection - for i in 0..100 { - let path = format!("/test/perf_path_{}", i); - let access = service.loop_detector.start_access(&path, "perf_test").unwrap(); - service.loop_detector.complete_access(access, Some(10), Some(2), None).unwrap(); - } - - let elapsed = start_time.elapsed(); - println!("100 loop detection operations took: {:?}", elapsed); - - // Should complete quickly (within 1 second for 100 operations) - assert!(elapsed < Duration::from_secs(1), "Loop detection performance too slow: {:?}", elapsed); - - let metrics = service.get_loop_detection_metrics().unwrap(); - assert_eq!(metrics["total_accesses"], 100); - assert_eq!(metrics["total_loops_detected"], 0); - } -} \ No newline at end of file diff --git a/src/services/webdav/mod.rs b/src/services/webdav/mod.rs index 261dde3..dfddd79 100644 --- a/src/services/webdav/mod.rs +++ b/src/services/webdav/mod.rs @@ -5,7 +5,6 @@ pub mod config; pub mod service; pub mod smart_sync; pub mod progress_shim; // Backward compatibility shim for simplified progress tracking -pub mod loop_detection; // Loop detection and monitoring for sync operations // Re-export main types for convenience pub use common::build_user_agent; @@ -16,7 +15,6 @@ pub use service::{ ValidationRecommendation, ValidationAction, ValidationSummary }; pub use smart_sync::{SmartSyncService, SmartSyncDecision, SmartSyncStrategy, SmartSyncResult}; -pub use loop_detection::{LoopDetectionService, LoopDetectionConfig, LoopDetectionResult, LoopType}; // Backward compatibility exports for progress tracking (simplified) pub use progress_shim::{SyncProgress, SyncPhase, ProgressStats}; @@ -29,6 +27,4 @@ mod subdirectory_edge_cases_tests; #[cfg(test)] mod protocol_detection_tests; #[cfg(test)] -mod loop_detection_integration_tests; -#[cfg(test)] mod tests; \ No newline at end of file diff --git a/src/services/webdav/service.rs b/src/services/webdav/service.rs index 7e339b9..c880334 100644 --- a/src/services/webdav/service.rs +++ b/src/services/webdav/service.rs @@ -23,7 +23,6 @@ use crate::mime_detection::{detect_mime_from_content, MimeDetectionResult}; use super::{config::{WebDAVConfig, RetryConfig, ConcurrencyConfig}, SyncProgress}; use super::common::build_user_agent; -use super::loop_detection::LoopDetectionService; /// Results from WebDAV discovery including both files and directories #[derive(Debug, Clone)] @@ -154,8 +153,6 @@ pub struct WebDAVService { download_semaphore: Arc, /// Stores the working protocol (updated after successful protocol detection) working_protocol: Arc>>, - /// Loop detection service for monitoring sync patterns - loop_detector: LoopDetectionService, } impl WebDAVService { @@ -187,9 +184,6 @@ impl WebDAVService { let scan_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_scans)); let download_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_downloads)); - // Create loop detector with config from WebDAV config - let loop_detector = LoopDetectionService::with_config(config.loop_detection.clone()); - Ok(Self { client, config, @@ -198,24 +192,9 @@ impl WebDAVService { scan_semaphore, download_semaphore, working_protocol: Arc::new(std::sync::RwLock::new(None)), - loop_detector, }) } - /// Get loop detection metrics and status - pub async fn get_loop_detection_metrics(&self) -> Result { - self.loop_detector.get_metrics().await - } - - /// Update loop detection configuration - pub async fn update_loop_detection_config(&self, config: super::loop_detection::LoopDetectionConfig) -> Result<()> { - self.loop_detector.update_config(config).await - } - - /// Clear loop detection state (useful for testing) - pub async fn clear_loop_detection_state(&self) -> Result<()> { - self.loop_detector.clear_state().await - } // ============================================================================ // Protocol Detection Methods @@ -304,7 +283,6 @@ impl WebDAVService { file_extensions: self.config.file_extensions.clone(), timeout_seconds: self.config.timeout_seconds, server_type: self.config.server_type.clone(), - loop_detection: self.config.loop_detection.clone(), }; // Test basic OPTIONS request @@ -433,7 +411,6 @@ impl WebDAVService { file_extensions: vec![], timeout_seconds: 30, server_type: test_config.server_type.clone(), - loop_detection: super::loop_detection::LoopDetectionConfig::default(), }; let service = Self::new(config)?; @@ -452,7 +429,6 @@ impl WebDAVService { file_extensions: self.config.file_extensions.clone(), timeout_seconds: self.config.timeout_seconds, server_type: self.config.server_type.clone(), - loop_detection: self.config.loop_detection.clone(), }; let webdav_url = temp_config.webdav_url(); @@ -846,7 +822,6 @@ impl WebDAVService { file_extensions: self.config.file_extensions.clone(), timeout_seconds: self.config.timeout_seconds, server_type: self.config.server_type.clone(), - loop_detection: self.config.loop_detection.clone(), }; let base_url = temp_config.webdav_url(); let clean_path = path.trim_start_matches('/'); @@ -918,7 +893,6 @@ impl WebDAVService { file_extensions: self.config.file_extensions.clone(), timeout_seconds: self.config.timeout_seconds, server_type: self.config.server_type.clone(), - loop_detection: self.config.loop_detection.clone(), }; let base_url = temp_config.webdav_url(); @@ -1147,15 +1121,6 @@ impl WebDAVService { /// Discovers both files and directories in a single directory async fn discover_files_and_directories_single(&self, directory_path: &str) -> Result { - // Start loop detection tracking with graceful degradation - let access_id = match self.loop_detector.start_access(directory_path, "single_discovery").await { - Ok(id) => id, - Err(e) => { - // Log the loop detection error but continue with sync - warn!("Loop detection failed for '{}': {} - continuing sync without detection", directory_path, e); - uuid::Uuid::new_v4() // Use dummy ID to continue - } - }; let result = async { // Try the primary URL first, then fallback URLs if we get a 405 error @@ -1173,30 +1138,6 @@ impl WebDAVService { } }.await; - // Complete loop detection tracking with graceful degradation - match &result { - Ok(discovery) => { - if let Err(e) = self.loop_detector.complete_access( - access_id, - Some(discovery.files.len()), - Some(discovery.directories.len()), - None - ).await { - debug!("Loop detection completion failed for '{}': {} - sync completed successfully", directory_path, e); - } - } - Err(e) => { - if let Err(completion_err) = self.loop_detector.complete_access( - access_id, - None, - None, - Some(e.to_string()) - ).await { - debug!("Loop detection completion failed for '{}': {} - original error: {}", directory_path, completion_err, e); - } - } - } - result } @@ -1301,15 +1242,6 @@ impl WebDAVService { /// Discovers files and directories recursively async fn discover_files_and_directories_recursive(&self, directory_path: &str) -> Result { - // Start loop detection tracking for the root directory with graceful degradation - let access_id = match self.loop_detector.start_access(directory_path, "recursive_discovery").await { - Ok(id) => id, - Err(e) => { - // Log the loop detection error but continue with sync - warn!("Loop detection failed for recursive discovery '{}': {} - continuing sync without detection", directory_path, e); - uuid::Uuid::new_v4() // Use dummy ID to continue - } - }; let mut all_files = Vec::new(); let mut all_directories = Vec::new(); @@ -1381,16 +1313,6 @@ impl WebDAVService { info!("Recursive scan completed. Found {} files and {} directories", all_files.len(), all_directories.len()); - // Complete loop detection tracking with graceful degradation - if let Err(e) = self.loop_detector.complete_access( - access_id, - Some(all_files.len()), - Some(all_directories.len()), - None - ).await { - debug!("Loop detection completion failed for recursive discovery '{}': {} - sync completed successfully", directory_path, e); - } - Ok(WebDAVDiscoveryResult { files: all_files, directories: all_directories @@ -2323,7 +2245,6 @@ impl WebDAVService { file_extensions: self.config.file_extensions.clone(), timeout_seconds: self.config.timeout_seconds, server_type: self.config.server_type.clone(), - loop_detection: self.config.loop_detection.clone(), }; let options_response = self.authenticated_request( @@ -2662,7 +2583,6 @@ impl Clone for WebDAVService { scan_semaphore: Arc::clone(&self.scan_semaphore), download_semaphore: Arc::clone(&self.download_semaphore), working_protocol: Arc::clone(&self.working_protocol), - loop_detector: self.loop_detector.clone(), } } } @@ -2692,7 +2612,6 @@ mod tests { file_extensions: vec![], timeout_seconds: 30, server_type: Some("generic".to_string()), - loop_detection: super::loop_detection::LoopDetectionConfig::default(), }; let service = WebDAVService::new(config).expect("Failed to create WebDAV service"); @@ -2718,7 +2637,6 @@ mod tests { file_extensions: vec![], timeout_seconds: 30, server_type: Some("generic".to_string()), - loop_detection: super::loop_detection::LoopDetectionConfig::default(), }; let retry_config = RetryConfig { diff --git a/src/services/webdav/smart_sync.rs b/src/services/webdav/smart_sync.rs index 6bdf90a..38cc188 100644 --- a/src/services/webdav/smart_sync.rs +++ b/src/services/webdav/smart_sync.rs @@ -59,9 +59,12 @@ impl SmartSyncService { &self.state } - /// Get loop detection metrics from the WebDAV service - pub async fn get_loop_detection_metrics(&self, webdav_service: &WebDAVService) -> Result { - webdav_service.get_loop_detection_metrics().await + /// Get sync metrics (simplified version without loop detection) + pub async fn get_sync_metrics(&self, _webdav_service: &WebDAVService) -> Result { + Ok(serde_json::json!({ + "loop_detection_removed": true, + "message": "Loop detection has been replaced with database-based failure tracking" + })) } /// Evaluates whether sync is needed and determines the best strategy diff --git a/tests/stress/webdav_loop_detection_stress.rs b/tests/stress/webdav_loop_detection_stress.rs deleted file mode 100644 index 5a080b4..0000000 --- a/tests/stress/webdav_loop_detection_stress.rs +++ /dev/null @@ -1,562 +0,0 @@ -/*! - * WebDAV Loop Detection Stress Test - * - * This stress test exercises the actual WebDAV sync functionality with loop detection enabled. - * It creates scenarios that could cause loops and verifies that they are properly detected - * and reported by the instrumented sync code. - */ - -use std::sync::Arc; -use std::time::{Duration, Instant}; -use std::collections::HashMap; -use tokio::time::sleep; -use anyhow::{Result, Context}; -use tracing::{info, warn, error, debug}; -use uuid::Uuid; -use serde_json::{json, Value}; - -use readur::services::webdav::{ - WebDAVService, WebDAVConfig, SmartSyncService, - LoopDetectionConfig, LoopType -}; -use readur::{AppState, config::Config}; - -/// Configuration for the stress test -#[derive(Debug, Clone)] -pub struct StressTestConfig { - /// Duration to run the stress test - pub duration_secs: u64, - /// WebDAV server URL for testing - pub webdav_url: String, - /// WebDAV username - pub username: String, - /// WebDAV password - pub password: String, - /// Number of concurrent sync operations - pub concurrent_syncs: usize, - /// Directories to test - pub test_directories: Vec, - /// Whether to intentionally trigger loops for testing - pub trigger_test_loops: bool, - /// Loop detection timeout - pub loop_detection_timeout_secs: u64, -} - -impl Default for StressTestConfig { - fn default() -> Self { - Self { - duration_secs: std::env::var("STRESS_TEST_DURATION") - .unwrap_or_else(|_| "300".to_string()) - .parse() - .unwrap_or(300), - webdav_url: std::env::var("WEBDAV_DUFS_URL") - .unwrap_or_else(|_| "http://localhost:8080".to_string()), - username: std::env::var("WEBDAV_USERNAME") - .unwrap_or_else(|_| "webdav_user".to_string()), - password: std::env::var("WEBDAV_PASSWORD") - .unwrap_or_else(|_| "webdav_pass".to_string()), - concurrent_syncs: std::env::var("CONCURRENT_SYNCS") - .unwrap_or_else(|_| "4".to_string()) - .parse() - .unwrap_or(4), - test_directories: vec![ - "/stress_test".to_string(), - "/stress_test/nested".to_string(), - "/stress_test/deep/structure".to_string(), - "/stress_test/complex".to_string(), - ], - trigger_test_loops: std::env::var("TRIGGER_TEST_LOOPS") - .unwrap_or_else(|_| "true".to_string()) - .parse() - .unwrap_or(true), - loop_detection_timeout_secs: std::env::var("LOOP_DETECTION_TIMEOUT") - .unwrap_or_else(|_| "60".to_string()) - .parse() - .unwrap_or(60), - } - } -} - -/// Metrics collected during stress testing -#[derive(Debug, Clone)] -pub struct StressTestMetrics { - pub total_sync_operations: u64, - pub successful_syncs: u64, - pub failed_syncs: u64, - pub loops_detected: u64, - pub avg_sync_duration_ms: f64, - pub max_sync_duration_ms: u64, - pub min_sync_duration_ms: u64, - pub files_discovered: u64, - pub directories_discovered: u64, - pub errors_by_type: HashMap, - pub loop_types_detected: HashMap, -} - -impl Default for StressTestMetrics { - fn default() -> Self { - Self { - total_sync_operations: 0, - successful_syncs: 0, - failed_syncs: 0, - loops_detected: 0, - avg_sync_duration_ms: 0.0, - max_sync_duration_ms: 0, - min_sync_duration_ms: u64::MAX, - files_discovered: 0, - directories_discovered: 0, - errors_by_type: HashMap::new(), - loop_types_detected: HashMap::new(), - } - } -} - -/// Main stress test runner -pub struct WebDAVLoopDetectionStressTest { - config: StressTestConfig, - metrics: Arc>, -} - -impl WebDAVLoopDetectionStressTest { - pub fn new(config: StressTestConfig) -> Self { - Self { - config, - metrics: Arc::new(tokio::sync::Mutex::new(StressTestMetrics::default())), - } - } - - /// Create a WebDAV service with loop detection configured for stress testing - fn create_webdav_service(&self) -> Result { - let mut webdav_config = WebDAVConfig::new( - self.config.webdav_url.clone(), - self.config.username.clone(), - self.config.password.clone(), - self.config.test_directories.clone(), - vec!["pdf".to_string(), "txt".to_string(), "doc".to_string(), "docx".to_string()], - ); - - // Configure loop detection for stress testing - webdav_config.loop_detection = LoopDetectionConfig { - enabled: true, - max_access_count: 5, // Reasonable limit for stress testing - time_window_secs: 60, // 1-minute window - max_scan_duration_secs: self.config.loop_detection_timeout_secs, - min_scan_interval_secs: 2, // 2-second minimum interval - max_pattern_depth: 10, - max_tracked_directories: 1000, - enable_pattern_analysis: true, - log_level: "warn".to_string(), // Reduce log noise during stress test - }; - - WebDAVService::new(webdav_config) - .context("Failed to create WebDAV service for stress testing") - } - - /// Run the main stress test - pub async fn run(&self) -> Result { - info!("🚀 Starting WebDAV Loop Detection Stress Test"); - info!("Configuration: {:?}", self.config); - - let start_time = Instant::now(); - let end_time = start_time + Duration::from_secs(self.config.duration_secs); - - // Create WebDAV services for concurrent testing - let mut webdav_services = Vec::new(); - for i in 0..self.config.concurrent_syncs { - match self.create_webdav_service() { - Ok(service) => { - info!("✅ Created WebDAV service #{}", i); - webdav_services.push(service); - } - Err(e) => { - error!("❌ Failed to create WebDAV service #{}: {}", i, e); - return Err(e); - } - } - } - - // Create app state for SmartSyncService - let test_config = Config::test_default(); - let app_state = Arc::new(AppState::new_for_testing(test_config).await - .context("Failed to create app state for testing")?); - - let smart_sync_service = SmartSyncService::new(app_state.clone()); - - info!("🏁 Starting stress test operations..."); - - // Launch concurrent sync operations - let mut handles = Vec::new(); - - for (service_id, webdav_service) in webdav_services.into_iter().enumerate() { - let service = Arc::new(webdav_service); - let smart_sync = smart_sync_service.clone(); - let config = self.config.clone(); - let metrics = Arc::clone(&self.metrics); - - let handle = tokio::spawn(async move { - Self::run_sync_operations( - service_id, - service, - smart_sync, - config, - metrics, - end_time - ).await - }); - - handles.push(handle); - } - - // Wait for all operations to complete - for (i, handle) in handles.into_iter().enumerate() { - match handle.await { - Ok(result) => { - if let Err(e) = result { - warn!("Sync operation #{} completed with error: {}", i, e); - } else { - info!("✅ Sync operation #{} completed successfully", i); - } - } - Err(e) => { - error!("❌ Failed to join sync operation #{}: {}", i, e); - } - } - } - - let total_duration = start_time.elapsed(); - info!("🏁 Stress test completed in {:.2}s", total_duration.as_secs_f64()); - - // Generate final metrics - let final_metrics = self.generate_final_metrics().await; - self.print_stress_test_report(&final_metrics, total_duration); - - Ok(final_metrics) - } - - /// Run sync operations for a single WebDAV service - async fn run_sync_operations( - service_id: usize, - webdav_service: Arc, - smart_sync_service: SmartSyncService, - config: StressTestConfig, - metrics: Arc>, - end_time: Instant, - ) -> Result<()> { - let user_id = Uuid::new_v4(); - let mut operation_count = 0; - - info!("🔄 Service #{} starting sync operations", service_id); - - while Instant::now() < end_time { - operation_count += 1; - let op_start = Instant::now(); - - // Randomly select a directory to sync - let dir_index = operation_count % config.test_directories.len(); - let target_directory = &config.test_directories[dir_index]; - - debug!("Service #{} operation #{}: syncing {}", service_id, operation_count, target_directory); - - // Perform sync operation with loop detection - let sync_result = Self::perform_monitored_sync( - &*webdav_service, - &smart_sync_service, - user_id, - target_directory, - operation_count, - ).await; - - let op_duration = op_start.elapsed(); - - // Update metrics - Self::update_metrics( - &metrics, - &sync_result, - op_duration, - &*webdav_service, - ).await; - - // If we're testing loop triggers, occasionally create conditions that might cause loops - if config.trigger_test_loops && operation_count % 10 == 0 { - Self::trigger_test_loop_scenario(&*webdav_service, target_directory).await; - } - - // Brief pause between operations to avoid overwhelming the server - sleep(Duration::from_millis(100 + (service_id * 50) as u64)).await; - } - - info!("📊 Service #{} completed {} operations", service_id, operation_count); - Ok(()) - } - - /// Perform a single sync operation with comprehensive monitoring - async fn perform_monitored_sync( - webdav_service: &WebDAVService, - smart_sync_service: &SmartSyncService, - user_id: Uuid, - directory: &str, - operation_id: usize, - ) -> Result<(usize, usize)> { - // First evaluate if sync is needed - match smart_sync_service.evaluate_sync_need( - user_id, - webdav_service, - directory, - None, // No progress tracking for stress test - ).await { - Ok(decision) => { - match decision { - readur::services::webdav::SmartSyncDecision::SkipSync => { - debug!("Operation #{}: Sync skipped for {}", operation_id, directory); - Ok((0, 0)) - } - readur::services::webdav::SmartSyncDecision::RequiresSync(strategy) => { - // Perform the actual sync - match smart_sync_service.perform_smart_sync( - user_id, - None, // No source ID for stress test - webdav_service, - directory, - strategy, - None, // No progress tracking - ).await { - Ok(result) => Ok((result.files.len(), result.directories.len())), - Err(e) => { - if e.to_string().contains("Loop detected") { - debug!("Operation #{}: Loop detected for {} - {}", operation_id, directory, e); - Err(e) - } else { - warn!("Operation #{}: Sync failed for {} - {}", operation_id, directory, e); - Err(e) - } - } - } - } - } - } - Err(e) => { - warn!("Operation #{}: Sync evaluation failed for {} - {}", operation_id, directory, e); - Err(e) - } - } - } - - /// Trigger test scenarios that might cause loops (for testing purposes) - async fn trigger_test_loop_scenario(webdav_service: &WebDAVService, directory: &str) { - debug!("🧪 Triggering test loop scenario for {}", directory); - - // Rapid repeated access to the same directory - for i in 0..3 { - match webdav_service.discover_files_and_directories(directory, false).await { - Ok(_) => debug!("Test loop trigger #{} succeeded for {}", i, directory), - Err(e) => { - if e.to_string().contains("Loop detected") { - debug!("✅ Test loop scenario successfully triggered loop detection: {}", e); - return; - } else { - debug!("Test loop trigger #{} failed for {}: {}", i, directory, e); - } - } - } - - // Very short delay to trigger immediate re-scan detection - sleep(Duration::from_millis(100)).await; - } - } - - /// Update metrics based on sync operation result - async fn update_metrics( - metrics: &Arc>, - sync_result: &Result<(usize, usize)>, - duration: Duration, - webdav_service: &WebDAVService, - ) { - let mut m = metrics.lock().await; - m.total_sync_operations += 1; - - let duration_ms = duration.as_millis() as u64; - m.max_sync_duration_ms = m.max_sync_duration_ms.max(duration_ms); - m.min_sync_duration_ms = m.min_sync_duration_ms.min(duration_ms); - - // Update average duration - let total_duration = m.avg_sync_duration_ms * (m.total_sync_operations - 1) as f64; - m.avg_sync_duration_ms = (total_duration + duration_ms as f64) / m.total_sync_operations as f64; - - match sync_result { - Ok((files, dirs)) => { - m.successful_syncs += 1; - m.files_discovered += *files as u64; - m.directories_discovered += *dirs as u64; - } - Err(e) => { - m.failed_syncs += 1; - - let error_msg = e.to_string(); - if error_msg.contains("Loop detected") { - m.loops_detected += 1; - - // Classify loop types - if error_msg.contains("re-accessed after only") { - *m.loop_types_detected.entry("ImmediateReScan".to_string()).or_insert(0) += 1; - } else if error_msg.contains("Concurrent access detected") { - *m.loop_types_detected.entry("ConcurrentAccess".to_string()).or_insert(0) += 1; - } else if error_msg.contains("accessed") && error_msg.contains("times") { - *m.loop_types_detected.entry("FrequentReAccess".to_string()).or_insert(0) += 1; - } else if error_msg.contains("stuck") { - *m.loop_types_detected.entry("StuckScan".to_string()).or_insert(0) += 1; - } else if error_msg.contains("Circular pattern") { - *m.loop_types_detected.entry("CircularPattern".to_string()).or_insert(0) += 1; - } else { - *m.loop_types_detected.entry("Other".to_string()).or_insert(0) += 1; - } - } else { - // Classify other error types - let error_type = if error_msg.contains("timeout") { - "Timeout" - } else if error_msg.contains("connection") { - "Connection" - } else if error_msg.contains("404") || error_msg.contains("Not Found") { - "NotFound" - } else if error_msg.contains("403") || error_msg.contains("Forbidden") { - "Forbidden" - } else if error_msg.contains("500") || error_msg.contains("Internal Server Error") { - "ServerError" - } else { - "Unknown" - }; - - *m.errors_by_type.entry(error_type.to_string()).or_insert(0) += 1; - } - } - } - - // Collect loop detection metrics from the WebDAV service - if let Ok(ld_metrics) = webdav_service.get_loop_detection_metrics() { - if let Some(total_loops) = ld_metrics.get("total_loops_detected") { - if let Some(loops) = total_loops.as_u64() { - // Update our metrics with the actual count from loop detector - m.loops_detected = m.loops_detected.max(loops); - } - } - } - } - - /// Generate final comprehensive metrics - async fn generate_final_metrics(&self) -> StressTestMetrics { - self.metrics.lock().await.clone() - } - - /// Print a comprehensive stress test report - fn print_stress_test_report(&self, metrics: &StressTestMetrics, total_duration: Duration) { - println!("\n" + "=".repeat(80).as_str()); - println!("📊 WEBDAV LOOP DETECTION STRESS TEST REPORT"); - println!("=".repeat(80)); - - println!("\n🕒 Test Duration: {:.2}s", total_duration.as_secs_f64()); - println!("🔄 Total Sync Operations: {}", metrics.total_sync_operations); - println!("✅ Successful Syncs: {} ({:.1}%)", - metrics.successful_syncs, - metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0); - println!("❌ Failed Syncs: {} ({:.1}%)", - metrics.failed_syncs, - metrics.failed_syncs as f64 / metrics.total_sync_operations as f64 * 100.0); - - println!("\n🔄 Loop Detection Results:"); - println!(" 🚨 Loops Detected: {} ({:.1}%)", - metrics.loops_detected, - metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0); - - if !metrics.loop_types_detected.is_empty() { - println!(" 📊 Loop Types Detected:"); - for (loop_type, count) in &metrics.loop_types_detected { - println!(" - {}: {}", loop_type, count); - } - } - - println!("\n⚡ Performance Metrics:"); - println!(" 📈 Average Sync Duration: {:.2}ms", metrics.avg_sync_duration_ms); - println!(" 🏃 Fastest Sync: {}ms", metrics.min_sync_duration_ms); - println!(" 🐌 Slowest Sync: {}ms", metrics.max_sync_duration_ms); - println!(" 🏁 Operations per Second: {:.2}", - metrics.total_sync_operations as f64 / total_duration.as_secs_f64()); - - println!("\n📁 Discovery Results:"); - println!(" 📄 Files Discovered: {}", metrics.files_discovered); - println!(" 📂 Directories Discovered: {}", metrics.directories_discovered); - - if !metrics.errors_by_type.is_empty() { - println!("\n❌ Error Breakdown:"); - for (error_type, count) in &metrics.errors_by_type { - println!(" - {}: {} ({:.1}%)", - error_type, count, - *count as f64 / metrics.failed_syncs as f64 * 100.0); - } - } - - println!("\n" + "=".repeat(80).as_str()); - - // Generate JSON report for CI/CD - let report = json!({ - "test_type": "webdav_loop_detection_stress", - "duration_secs": total_duration.as_secs_f64(), - "total_operations": metrics.total_sync_operations, - "successful_operations": metrics.successful_syncs, - "failed_operations": metrics.failed_syncs, - "success_rate": metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0, - "loops_detected": metrics.loops_detected, - "loop_detection_rate": metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0, - "avg_duration_ms": metrics.avg_sync_duration_ms, - "min_duration_ms": metrics.min_sync_duration_ms, - "max_duration_ms": metrics.max_sync_duration_ms, - "ops_per_second": metrics.total_sync_operations as f64 / total_duration.as_secs_f64(), - "files_discovered": metrics.files_discovered, - "directories_discovered": metrics.directories_discovered, - "loop_types": metrics.loop_types_detected, - "error_types": metrics.errors_by_type, - }); - - // Write JSON report for CI/CD consumption - if let Ok(report_dir) = std::env::var("STRESS_RESULTS_DIR") { - let report_path = format!("{}/webdav_loop_detection_report.json", report_dir); - if let Err(e) = std::fs::write(&report_path, serde_json::to_string_pretty(&report).unwrap()) { - warn!("Failed to write JSON report to {}: {}", report_path, e); - } else { - info!("📋 JSON report written to {}", report_path); - } - } - } -} - -/// Main entry point for the stress test -#[tokio::main] -async fn main() -> Result<()> { - // Initialize tracing - tracing_subscriber::fmt() - .with_env_filter( - std::env::var("RUST_LOG").unwrap_or_else(|_| "info,webdav_loop_detection_stress=debug".to_string()) - ) - .init(); - - let config = StressTestConfig::default(); - let stress_test = WebDAVLoopDetectionStressTest::new(config); - - let metrics = stress_test.run().await - .context("Stress test failed")?; - - // Exit with error code if too many loops were detected (indicating a problem) - let loop_rate = metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0; - if loop_rate > 50.0 { - error!("🚨 CRITICAL: Loop detection rate ({:.1}%) exceeds threshold (50%)", loop_rate); - std::process::exit(1); - } - - // Exit with error code if success rate is too low - let success_rate = metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0; - if success_rate < 70.0 { - error!("🚨 CRITICAL: Success rate ({:.1}%) below threshold (70%)", success_rate); - std::process::exit(1); - } - - info!("🎉 Stress test completed successfully!"); - Ok(()) -} \ No newline at end of file