feat(webdav): get rid of complex loop detection

perf3ct 2025-09-09 02:11:57 +00:00
parent 88c376f655
commit aa5bd77753
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
15 changed files with 101 additions and 2259 deletions

View File

@@ -15,9 +15,6 @@ path = "src/bin/test_runner.rs"
name = "analyze-webdav-performance"
path = "src/bin/analyze-webdav-performance.rs"
[[bin]]
name = "webdav_loop_detection_stress"
path = "tests/stress/webdav_loop_detection_stress.rs"

View File

@@ -95,7 +95,6 @@ async fn estimate_webdav_crawl_internal(
file_extensions: config.file_extensions.clone(),
timeout_seconds: 300,
server_type: config.server_type.clone(),
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
};
// Create WebDAV service and estimate crawl

View File

@@ -240,7 +240,6 @@ pub async fn trigger_deep_scan(
file_extensions: config.file_extensions.clone(),
timeout_seconds: 600, // 10 minutes for deep scan
server_type: config.server_type.clone(),
loop_detection: crate::services::webdav::LoopDetectionConfig::default(),
};
let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config.clone())

View File

@@ -72,7 +72,6 @@ async fn get_user_webdav_config(state: &Arc<AppState>, user_id: uuid::Uuid) -> R
file_extensions: settings.webdav_file_extensions,
timeout_seconds: 300, // 5 minutes timeout for crawl estimation
server_type: Some("nextcloud".to_string()), // Default to Nextcloud
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
})
}
@@ -108,7 +107,6 @@ async fn test_webdav_connection(
file_extensions: Vec::new(),
timeout_seconds: 300, // 5 minutes timeout for crawl estimation
server_type: test_config.server_type.clone(),
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
};
// Create WebDAV service and test connection

View File

@@ -745,7 +745,6 @@ impl SourceScheduler {
file_extensions: webdav_config.file_extensions.clone(),
timeout_seconds: 600, // 10 minutes for deep scan
server_type: webdav_config.server_type.clone(),
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
}
)?;
@@ -1033,7 +1032,6 @@ impl SourceScheduler {
file_extensions: config.file_extensions.clone(),
timeout_seconds: 30, // Quick connectivity test
server_type: config.server_type.clone(),
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
};
let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config)

View File

@@ -109,7 +109,6 @@ impl SourceSyncService {
file_extensions: config.file_extensions,
timeout_seconds: 180, // 3 minutes for discover_files_in_folder operations
server_type: config.server_type,
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
};
let webdav_service = WebDAVService::new(webdav_config.clone())

View File

@@ -275,7 +275,6 @@ impl WebDAVScheduler {
file_extensions: settings.webdav_file_extensions.clone(),
timeout_seconds: 30,
server_type: Some("nextcloud".to_string()),
loop_detection: crate::services::webdav::loop_detection::LoopDetectionConfig::default(),
})
}

View File

@@ -130,39 +130,92 @@ impl SourceErrorTracker {
source_id: Option<Uuid>,
resource_path: &str,
) -> Result<SkipDecision> {
-        // Check if there are any failures for this resource path
-        match self.db.is_source_known_failure(user_id, source_type, source_id, resource_path).await {
-            Ok(true) => {
-                // There is a known failure, but we need more details
-                // For now, implement a simple cooldown based on the fact that there's a failure
-                // In a real implementation, we'd need a method that returns failure details
-                let skip_reason = format!("Resource has previous failures recorded in system");
+        // Get detailed failure information from the database
+        let query = crate::models::ListFailuresQuery {
+            source_type: Some(source_type),
+            source_id: source_id,
+            error_type: None,
+            severity: None,
+            include_resolved: Some(false), // Only unresolved failures
+            include_excluded: Some(true), // Include user-excluded resources
+            ready_for_retry: None,
+            limit: Some(1),
+            offset: None,
+        };
-                info!(
-                    "⏭️ Skipping {} resource '{}' due to error tracking: {}",
-                    source_type, resource_path, skip_reason
-                );
+        match self.db.list_source_scan_failures(user_id, &query).await {
+            Ok(failures) => {
+                if let Some(failure) = failures.into_iter()
+                    .find(|f| f.resource_path == resource_path) {
-                Ok(SkipDecision {
-                    should_skip: true,
-                    reason: skip_reason,
-                    failure_count: 1, // We don't have exact count, use 1 as placeholder
-                    time_since_last_failure_minutes: 0, // We don't have exact time
-                    cooldown_remaining_minutes: Some(60), // Default 1 hour cooldown
-                })
-            }
-            Ok(false) => {
-                debug!(
-                    "✅ No previous failures for {} resource '{}', proceeding with scan",
-                    source_type, resource_path
-                );
-                Ok(SkipDecision {
-                    should_skip: false,
-                    reason: "No previous failures recorded".to_string(),
-                    failure_count: 0,
-                    time_since_last_failure_minutes: 0,
-                    cooldown_remaining_minutes: None,
-                })
-            }
+                    // Check if this failure should be skipped based on sophisticated database logic
+                    let now = chrono::Utc::now();
+                    let time_since_last = now.signed_duration_since(failure.last_failure_at);
+                    let time_since_last_minutes = time_since_last.num_minutes();
+                    // Calculate cooldown remaining if there's a next retry time
+                    let cooldown_remaining = if let Some(next_retry) = failure.next_retry_at {
+                        let remaining = next_retry.signed_duration_since(now);
+                        if remaining.num_seconds() > 0 {
+                            Some(remaining.num_minutes().max(0))
+                        } else {
+                            None
+                        }
+                    } else {
+                        None
+                    };
+                    // Use the same sophisticated logic as the database query
+                    let should_skip = failure.user_excluded ||
+                        (matches!(failure.error_severity, crate::models::SourceErrorSeverity::Critical | crate::models::SourceErrorSeverity::High)
+                            && failure.failure_count > 3) ||
+                        cooldown_remaining.is_some();
+                    let reason = if failure.user_excluded {
+                        "Resource has been manually excluded by user".to_string()
+                    } else if matches!(failure.error_severity, crate::models::SourceErrorSeverity::Critical | crate::models::SourceErrorSeverity::High)
+                        && failure.failure_count > 3 {
+                        format!("Resource has failed {} times with {} severity - giving up",
+                            failure.failure_count, failure.error_severity)
+                    } else if let Some(cooldown_mins) = cooldown_remaining {
+                        format!("Resource is in cooldown period ({} minutes remaining)", cooldown_mins)
+                    } else {
+                        "Resource has unresolved failures but is ready for retry".to_string()
+                    };
+                    if should_skip {
+                        info!(
+                            "⏭️ Skipping {} resource '{}' due to error tracking: {}",
+                            source_type, resource_path, reason
+                        );
+                    } else {
+                        debug!(
+                            "🔄 Allowing retry of {} resource '{}' after {} failures",
+                            source_type, resource_path, failure.failure_count
+                        );
+                    }
+                    Ok(SkipDecision {
+                        should_skip,
+                        reason,
+                        failure_count: failure.failure_count,
+                        time_since_last_failure_minutes: time_since_last_minutes,
+                        cooldown_remaining_minutes: cooldown_remaining,
+                    })
+                } else {
+                    // No failure record found for this specific path
+                    debug!(
+                        "✅ No previous failures for {} resource '{}', proceeding with scan",
+                        source_type, resource_path
+                    );
+                    Ok(SkipDecision {
+                        should_skip: false,
+                        reason: "No previous failures recorded".to_string(),
+                        failure_count: 0,
+                        time_since_last_failure_minutes: 0,
+                        cooldown_remaining_minutes: None,
+                    })
+                }
+            }
Err(e) => {
warn!(
@@ -407,11 +460,14 @@ impl SourceErrorTracker {
_ => crate::models::RetryStrategy::Exponential,
};
-        let retry_delay = match error_type {
-            SourceErrorType::RateLimited => 600, // 10 minutes for rate limits
-            SourceErrorType::NetworkError => 60, // 1 minute for network issues
-            SourceErrorType::Timeout => 900, // 15 minutes for timeouts
-            _ => 300, // 5 minutes default
+        let (retry_delay, max_retries) = match error_type {
+            SourceErrorType::RateLimited => (600, 3), // 10 minutes, 3 retries max
+            SourceErrorType::NetworkError => (60, 5), // 1 minute, 5 retries max
+            SourceErrorType::Timeout => (300, 3), // 5 minutes, 3 retries max
+            SourceErrorType::ServerError => (300, 3), // 5 minutes, 3 retries max
+            SourceErrorType::NotFound => (0, 0), // Don't retry 404s
+            SourceErrorType::PermissionDenied => (0, 0), // Don't retry permission errors
+            _ => (300, 3), // 5 minutes default, 3 retries
};
ErrorClassification {
@@ -419,7 +475,7 @@ impl SourceErrorTracker {
severity,
retry_strategy,
retry_delay_seconds: retry_delay,
-            max_retries: 5,
+            max_retries,
user_friendly_message: format!("Error accessing resource: {}", error),
recommended_action: "The system will retry this operation automatically.".to_string(),
diagnostic_data: serde_json::json!({
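
The hunk above replaces a single retry delay with per-error-type (delay, max retries) pairs. As a minimal sketch (not part of this commit) of how a scheduler could turn those values and the Exponential strategy into the next_retry_at timestamp consulted by the skip logic earlier in this file — the function name and the doubling policy are illustrative assumptions:

use chrono::{DateTime, Duration, Utc};

// Sketch only: assumes the Exponential strategy doubles the base delay
// once per prior failure.
fn compute_next_retry_at(
    last_failure_at: DateTime<Utc>,
    retry_delay_seconds: i64,
    failure_count: u32,
    max_retries: u32,
) -> Option<DateTime<Utc>> {
    // (0, 0) classifications (NotFound, PermissionDenied) never retry,
    // and an exhausted retry budget stops scheduling entirely.
    if retry_delay_seconds == 0 || failure_count >= max_retries {
        return None;
    }
    // Cap the shift so the multiplier cannot overflow.
    let backoff = retry_delay_seconds.saturating_mul(1i64 << failure_count.min(16));
    Some(last_failure_at + Duration::seconds(backoff))
}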

View File

@@ -1,5 +1,4 @@
use super::loop_detection::LoopDetectionConfig;
/// WebDAV server configuration
#[derive(Debug, Clone)]
@@ -11,7 +10,6 @@ pub struct WebDAVConfig {
pub file_extensions: Vec<String>,
pub timeout_seconds: u64,
pub server_type: Option<String>, // "nextcloud", "owncloud", "generic"
pub loop_detection: LoopDetectionConfig,
}
/// Retry configuration for WebDAV operations
@@ -103,7 +101,6 @@ impl WebDAVConfig {
file_extensions,
timeout_seconds: 30,
server_type: None,
loop_detection: LoopDetectionConfig::default(),
}
}
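
With the loop_detection field gone, a WebDAVConfig is built from connection details alone. A minimal sketch, assuming the five-argument WebDAVConfig::new signature exercised by the tests removed later in this commit:

let config = WebDAVConfig::new(
    "https://cloud.example.com".to_string(),    // server URL (placeholder)
    "user".to_string(),
    "secret".to_string(),
    vec!["/Documents".to_string()],             // watch folders
    vec!["pdf".to_string(), "txt".to_string()], // file extensions
);
// Defaults from `new`, per the hunk above: timeout_seconds = 30 and
// server_type = None — and no loop_detection field remains on the struct.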

File diff suppressed because it is too large

View File

@@ -1,323 +0,0 @@
#[cfg(test)]
mod tests {
use super::super::*;
use super::super::loop_detection::{LoopDetectionService, LoopDetectionConfig, LoopType};
use crate::{AppState, config::Config};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
/// Helper to create a test WebDAV service with loop detection enabled
async fn create_test_webdav_service_with_loop_detection() -> WebDAVService {
let mut config = WebDAVConfig::new(
"http://localhost:8080".to_string(),
"test_user".to_string(),
"test_pass".to_string(),
vec!["/test".to_string()],
vec!["pdf".to_string(), "txt".to_string()],
);
// Configure loop detection with tight thresholds for testing
config.loop_detection = LoopDetectionConfig {
enabled: true,
max_access_count: 2, // Very low for testing
time_window_secs: 10, // Short window
max_scan_duration_secs: 5, // Short timeout
min_scan_interval_secs: 1, // Short interval
max_pattern_depth: 5,
max_tracked_directories: 100,
enable_pattern_analysis: true,
log_level: "debug".to_string(),
};
WebDAVService::new(config).expect("Failed to create WebDAV service")
}
/// Helper to create a mock WebDAV server response for testing
fn create_mock_webdav_response(num_files: usize, num_dirs: usize) -> WebDAVDiscoveryResult {
let mut files = Vec::new();
let mut directories = Vec::new();
for i in 0..num_files {
files.push(crate::models::FileIngestionInfo {
uuid: Uuid::new_v4(),
filename: format!("file_{}.pdf", i),
relative_path: format!("/test/file_{}.pdf", i),
absolute_url: format!("http://localhost:8080/test/file_{}.pdf", i),
file_size_bytes: 1024 * (i + 1) as i64,
last_modified: chrono::Utc::now(),
etag: format!("etag_{}", i),
content_type: "application/pdf".to_string(),
is_directory: false,
});
}
for i in 0..num_dirs {
directories.push(crate::models::FileIngestionInfo {
uuid: Uuid::new_v4(),
filename: format!("dir_{}", i),
relative_path: format!("/test/dir_{}", i),
absolute_url: format!("http://localhost:8080/test/dir_{}/", i),
file_size_bytes: 0,
last_modified: chrono::Utc::now(),
etag: format!("dir_etag_{}", i),
content_type: "httpd/unix-directory".to_string(),
is_directory: true,
});
}
WebDAVDiscoveryResult { files, directories }
}
#[tokio::test]
async fn test_loop_detection_immediate_rescan() {
let service = create_test_webdav_service_with_loop_detection().await;
// First access should succeed
let access1 = service.loop_detector.start_access("/test/path", "test_scan").unwrap();
service.loop_detector.complete_access(access1, Some(5), Some(2), None).unwrap();
// Immediate second access should fail due to min_scan_interval
let result = service.loop_detector.start_access("/test/path", "test_scan");
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("re-accessed after only"));
// Metrics should show the loop detection
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["total_loops_detected"], 1);
}
#[tokio::test]
async fn test_loop_detection_concurrent_access() {
let service = create_test_webdav_service_with_loop_detection().await;
// Start first access
let _access1 = service.loop_detector.start_access("/test/path", "scan1").unwrap();
// Concurrent access should fail
let result = service.loop_detector.start_access("/test/path", "scan2");
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Concurrent access detected"));
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["total_loops_detected"], 1);
}
#[tokio::test]
async fn test_loop_detection_frequency_limit() {
let service = create_test_webdav_service_with_loop_detection().await;
// Clear state to start fresh
service.clear_loop_detection_state().unwrap();
// Do multiple accesses that complete quickly
for i in 0..3 {
if i > 0 {
// Wait minimum interval to avoid immediate re-scan detection
sleep(Duration::from_millis(1100)).await;
}
let access = service.loop_detector.start_access("/test/freq_path", &format!("scan_{}", i));
if i < 2 {
// First two should succeed
assert!(access.is_ok());
let access_id = access.unwrap();
service.loop_detector.complete_access(access_id, Some(i * 2), Some(i), None).unwrap();
} else {
// Third should fail due to frequency limit
assert!(access.is_err());
assert!(access.unwrap_err().to_string().contains("accessed 2 times"));
}
}
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["total_loops_detected"], 1);
}
#[tokio::test]
async fn test_loop_detection_disabled() {
let mut config = WebDAVConfig::new(
"http://localhost:8080".to_string(),
"test_user".to_string(),
"test_pass".to_string(),
vec!["/test".to_string()],
vec!["pdf".to_string()],
);
// Disable loop detection
config.loop_detection.enabled = false;
let service = WebDAVService::new(config).unwrap();
// Multiple rapid accesses should all succeed when disabled
for i in 0..5 {
let access = service.loop_detector.start_access("/test/path", &format!("scan_{}", i)).unwrap();
service.loop_detector.complete_access(access, Some(i), Some(1), None).unwrap();
}
let metrics = service.get_loop_detection_metrics().unwrap();
assert!(!metrics["enabled"].as_bool().unwrap());
}
#[tokio::test]
async fn test_loop_detection_error_tracking() {
let service = create_test_webdav_service_with_loop_detection().await;
// Test error tracking in loop detection
let access = service.loop_detector.start_access("/test/error_path", "error_scan").unwrap();
service.loop_detector.complete_access(
access,
None,
None,
Some("Test error message".to_string())
).unwrap();
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["total_accesses"], 1);
assert_eq!(metrics["total_loops_detected"], 0); // No loops, just an error
}
#[tokio::test]
async fn test_loop_detection_cleanup() {
let service = create_test_webdav_service_with_loop_detection().await;
// Add some access data
for i in 0..3 {
let access = service.loop_detector.start_access(&format!("/test/cleanup_{}", i), "cleanup_scan").unwrap();
service.loop_detector.complete_access(access, Some(i), Some(1), None).unwrap();
sleep(Duration::from_millis(100)).await; // Small delay between accesses
}
let metrics_before = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics_before["total_accesses"], 3);
// Clear state
service.clear_loop_detection_state().unwrap();
let metrics_after = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics_after["active_accesses"], 0);
assert_eq!(metrics_after["history_size"], 0);
}
#[tokio::test]
async fn test_loop_detection_config_update() {
let service = create_test_webdav_service_with_loop_detection().await;
// Update configuration
let mut new_config = LoopDetectionConfig::default();
new_config.max_access_count = 10; // Much higher limit
new_config.log_level = "info".to_string();
service.update_loop_detection_config(new_config).unwrap();
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["config"]["max_access_count"], 10);
assert_eq!(metrics["config"]["log_level"], "info");
}
#[tokio::test]
async fn test_pattern_analysis_circular_detection() {
let service = create_test_webdav_service_with_loop_detection().await;
service.clear_loop_detection_state().unwrap();
// Simulate A -> B -> A pattern with proper timing
let paths = ["/test/path_a", "/test/path_b", "/test/path_a"];
for (i, path) in paths.iter().enumerate() {
if i > 0 {
sleep(Duration::from_millis(1100)).await; // Wait minimum interval
}
let access = service.loop_detector.start_access(path, &format!("pattern_scan_{}", i));
if i < 2 {
// First two should succeed
assert!(access.is_ok());
let access_id = access.unwrap();
service.loop_detector.complete_access(access_id, Some(1), Some(0), None).unwrap();
} else {
// Third access to path_a might trigger pattern detection
// Note: The exact behavior depends on the pattern detection algorithm
if let Err(e) = access {
println!("Pattern detection triggered: {}", e);
} else {
let access_id = access.unwrap();
service.loop_detector.complete_access(access_id, Some(1), Some(0), None).unwrap();
}
}
}
let metrics = service.get_loop_detection_metrics().unwrap();
println!("Pattern analysis metrics: {}", serde_json::to_string_pretty(&metrics).unwrap());
}
#[tokio::test]
async fn test_webdav_service_integration_with_loop_detection() {
// This test would ideally connect to a real WebDAV server
// For now, we test the integration points
let service = create_test_webdav_service_with_loop_detection().await;
// Test that the service has loop detection enabled
let metrics = service.get_loop_detection_metrics().unwrap();
assert!(metrics["enabled"].as_bool().unwrap());
// Test configuration access
assert_eq!(metrics["config"]["max_access_count"], 2);
assert_eq!(metrics["config"]["time_window_secs"], 10);
// Test that we can update the config
let mut new_config = LoopDetectionConfig::default();
new_config.enabled = false;
service.update_loop_detection_config(new_config).unwrap();
let updated_metrics = service.get_loop_detection_metrics().unwrap();
assert!(!updated_metrics["enabled"].as_bool().unwrap());
}
/// Integration test with SmartSyncService
#[tokio::test]
async fn test_smart_sync_loop_detection_integration() {
// Create test app state
let test_config = Config::test_default();
let app_state = Arc::new(AppState::new_for_testing(test_config).await.unwrap());
let smart_sync = SmartSyncService::new(app_state);
let webdav_service = create_test_webdav_service_with_loop_detection().await;
// Test that SmartSyncService can access loop detection metrics
let metrics = smart_sync.get_loop_detection_metrics(&webdav_service).unwrap();
assert!(metrics["enabled"].as_bool().unwrap());
// Test that metrics are properly structured
assert!(metrics.get("total_accesses").is_some());
assert!(metrics.get("total_loops_detected").is_some());
assert!(metrics.get("config").is_some());
}
/// Performance test to ensure loop detection doesn't significantly impact performance
#[tokio::test]
async fn test_loop_detection_performance() {
let service = create_test_webdav_service_with_loop_detection().await;
let start_time = std::time::Instant::now();
// Perform many operations with different paths to avoid triggering detection
for i in 0..100 {
let path = format!("/test/perf_path_{}", i);
let access = service.loop_detector.start_access(&path, "perf_test").unwrap();
service.loop_detector.complete_access(access, Some(10), Some(2), None).unwrap();
}
let elapsed = start_time.elapsed();
println!("100 loop detection operations took: {:?}", elapsed);
// Should complete quickly (within 1 second for 100 operations)
assert!(elapsed < Duration::from_secs(1), "Loop detection performance too slow: {:?}", elapsed);
let metrics = service.get_loop_detection_metrics().unwrap();
assert_eq!(metrics["total_accesses"], 100);
assert_eq!(metrics["total_loops_detected"], 0);
}
}

View File

@@ -5,7 +5,6 @@ pub mod config;
pub mod service;
pub mod smart_sync;
pub mod progress_shim; // Backward compatibility shim for simplified progress tracking
pub mod loop_detection; // Loop detection and monitoring for sync operations
// Re-export main types for convenience
pub use common::build_user_agent;
@@ -16,7 +15,6 @@ pub use service::{
ValidationRecommendation, ValidationAction, ValidationSummary
};
pub use smart_sync::{SmartSyncService, SmartSyncDecision, SmartSyncStrategy, SmartSyncResult};
pub use loop_detection::{LoopDetectionService, LoopDetectionConfig, LoopDetectionResult, LoopType};
// Backward compatibility exports for progress tracking (simplified)
pub use progress_shim::{SyncProgress, SyncPhase, ProgressStats};
@@ -29,6 +27,4 @@ mod subdirectory_edge_cases_tests;
#[cfg(test)]
mod protocol_detection_tests;
#[cfg(test)]
mod loop_detection_integration_tests;
#[cfg(test)]
mod tests;

View File

@@ -23,7 +23,6 @@ use crate::mime_detection::{detect_mime_from_content, MimeDetectionResult};
use super::{config::{WebDAVConfig, RetryConfig, ConcurrencyConfig}, SyncProgress};
use super::common::build_user_agent;
use super::loop_detection::LoopDetectionService;
/// Results from WebDAV discovery including both files and directories
#[derive(Debug, Clone)]
@@ -154,8 +153,6 @@ pub struct WebDAVService {
download_semaphore: Arc<Semaphore>,
/// Stores the working protocol (updated after successful protocol detection)
working_protocol: Arc<std::sync::RwLock<Option<String>>>,
/// Loop detection service for monitoring sync patterns
loop_detector: LoopDetectionService,
}
impl WebDAVService {
@@ -187,9 +184,6 @@ impl WebDAVService {
let scan_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_scans));
let download_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_downloads));
// Create loop detector with config from WebDAV config
let loop_detector = LoopDetectionService::with_config(config.loop_detection.clone());
Ok(Self {
client,
config,
@@ -198,24 +192,9 @@ impl WebDAVService {
scan_semaphore,
download_semaphore,
working_protocol: Arc::new(std::sync::RwLock::new(None)),
loop_detector,
})
}
/// Get loop detection metrics and status
pub async fn get_loop_detection_metrics(&self) -> Result<serde_json::Value> {
self.loop_detector.get_metrics().await
}
/// Update loop detection configuration
pub async fn update_loop_detection_config(&self, config: super::loop_detection::LoopDetectionConfig) -> Result<()> {
self.loop_detector.update_config(config).await
}
/// Clear loop detection state (useful for testing)
pub async fn clear_loop_detection_state(&self) -> Result<()> {
self.loop_detector.clear_state().await
}
// ============================================================================
// Protocol Detection Methods
@@ -304,7 +283,6 @@ impl WebDAVService {
file_extensions: self.config.file_extensions.clone(),
timeout_seconds: self.config.timeout_seconds,
server_type: self.config.server_type.clone(),
loop_detection: self.config.loop_detection.clone(),
};
// Test basic OPTIONS request
@@ -433,7 +411,6 @@ impl WebDAVService {
file_extensions: vec![],
timeout_seconds: 30,
server_type: test_config.server_type.clone(),
loop_detection: super::loop_detection::LoopDetectionConfig::default(),
};
let service = Self::new(config)?;
@@ -452,7 +429,6 @@ impl WebDAVService {
file_extensions: self.config.file_extensions.clone(),
timeout_seconds: self.config.timeout_seconds,
server_type: self.config.server_type.clone(),
loop_detection: self.config.loop_detection.clone(),
};
let webdav_url = temp_config.webdav_url();
@@ -846,7 +822,6 @@ impl WebDAVService {
file_extensions: self.config.file_extensions.clone(),
timeout_seconds: self.config.timeout_seconds,
server_type: self.config.server_type.clone(),
loop_detection: self.config.loop_detection.clone(),
};
let base_url = temp_config.webdav_url();
let clean_path = path.trim_start_matches('/');
@@ -918,7 +893,6 @@ impl WebDAVService {
file_extensions: self.config.file_extensions.clone(),
timeout_seconds: self.config.timeout_seconds,
server_type: self.config.server_type.clone(),
loop_detection: self.config.loop_detection.clone(),
};
let base_url = temp_config.webdav_url();
@@ -1147,15 +1121,6 @@ impl WebDAVService {
/// Discovers both files and directories in a single directory
async fn discover_files_and_directories_single(&self, directory_path: &str) -> Result<WebDAVDiscoveryResult> {
// Start loop detection tracking with graceful degradation
let access_id = match self.loop_detector.start_access(directory_path, "single_discovery").await {
Ok(id) => id,
Err(e) => {
// Log the loop detection error but continue with sync
warn!("Loop detection failed for '{}': {} - continuing sync without detection", directory_path, e);
uuid::Uuid::new_v4() // Use dummy ID to continue
}
};
let result = async {
// Try the primary URL first, then fallback URLs if we get a 405 error
@@ -1173,30 +1138,6 @@
}
}.await;
// Complete loop detection tracking with graceful degradation
match &result {
Ok(discovery) => {
if let Err(e) = self.loop_detector.complete_access(
access_id,
Some(discovery.files.len()),
Some(discovery.directories.len()),
None
).await {
debug!("Loop detection completion failed for '{}': {} - sync completed successfully", directory_path, e);
}
}
Err(e) => {
if let Err(completion_err) = self.loop_detector.complete_access(
access_id,
None,
None,
Some(e.to_string())
).await {
debug!("Loop detection completion failed for '{}': {} - original error: {}", directory_path, completion_err, e);
}
}
}
result
}
@@ -1301,15 +1242,6 @@ impl WebDAVService {
/// Discovers files and directories recursively
async fn discover_files_and_directories_recursive(&self, directory_path: &str) -> Result<WebDAVDiscoveryResult> {
// Start loop detection tracking for the root directory with graceful degradation
let access_id = match self.loop_detector.start_access(directory_path, "recursive_discovery").await {
Ok(id) => id,
Err(e) => {
// Log the loop detection error but continue with sync
warn!("Loop detection failed for recursive discovery '{}': {} - continuing sync without detection", directory_path, e);
uuid::Uuid::new_v4() // Use dummy ID to continue
}
};
let mut all_files = Vec::new();
let mut all_directories = Vec::new();
@@ -1381,16 +1313,6 @@ impl WebDAVService {
info!("Recursive scan completed. Found {} files and {} directories", all_files.len(), all_directories.len());
// Complete loop detection tracking with graceful degradation
if let Err(e) = self.loop_detector.complete_access(
access_id,
Some(all_files.len()),
Some(all_directories.len()),
None
).await {
debug!("Loop detection completion failed for recursive discovery '{}': {} - sync completed successfully", directory_path, e);
}
Ok(WebDAVDiscoveryResult {
files: all_files,
directories: all_directories
@@ -2323,7 +2245,6 @@ impl WebDAVService {
file_extensions: self.config.file_extensions.clone(),
timeout_seconds: self.config.timeout_seconds,
server_type: self.config.server_type.clone(),
loop_detection: self.config.loop_detection.clone(),
};
let options_response = self.authenticated_request(
@@ -2662,7 +2583,6 @@ impl Clone for WebDAVService {
scan_semaphore: Arc::clone(&self.scan_semaphore),
download_semaphore: Arc::clone(&self.download_semaphore),
working_protocol: Arc::clone(&self.working_protocol),
loop_detector: self.loop_detector.clone(),
}
}
}
@@ -2692,7 +2612,6 @@ mod tests {
file_extensions: vec![],
timeout_seconds: 30,
server_type: Some("generic".to_string()),
loop_detection: super::loop_detection::LoopDetectionConfig::default(),
};
let service = WebDAVService::new(config).expect("Failed to create WebDAV service");
@@ -2718,7 +2637,6 @@ mod tests {
file_extensions: vec![],
timeout_seconds: 30,
server_type: Some("generic".to_string()),
loop_detection: super::loop_detection::LoopDetectionConfig::default(),
};
let retry_config = RetryConfig {

View File

@@ -59,9 +59,12 @@ impl SmartSyncService {
&self.state
}
-    /// Get loop detection metrics from the WebDAV service
-    pub async fn get_loop_detection_metrics(&self, webdav_service: &WebDAVService) -> Result<serde_json::Value> {
-        webdav_service.get_loop_detection_metrics().await
+    /// Get sync metrics (simplified version without loop detection)
+    pub async fn get_sync_metrics(&self, _webdav_service: &WebDAVService) -> Result<serde_json::Value> {
+        Ok(serde_json::json!({
+            "loop_detection_removed": true,
+            "message": "Loop detection has been replaced with database-based failure tracking"
+        }))
}
/// Evaluates whether sync is needed and determines the best strategy
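
Since the shim above now only reports that loop detection was removed, callers are expected to rely on the database-backed SkipDecision from SourceErrorTracker shown earlier in this commit. A hedged sketch of such a call site; error_tracker and the method name should_skip_resource are placeholders, because the diff only shows the method's trailing signature and return type:

// Hypothetical call site — `should_skip_resource` stands in for the
// SourceErrorTracker method ending in `resource_path: &str) -> Result<SkipDecision>`.
let decision = error_tracker
    .should_skip_resource(user_id, source_type, source_id, "/Documents/reports")
    .await?;
if decision.should_skip {
    info!("Skipping scan: {}", decision.reason);
    return Ok(());
}
// Otherwise proceed with discovery; fresh failures are recorded for the next pass.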

View File

@@ -1,562 +0,0 @@
/*!
* WebDAV Loop Detection Stress Test
*
* This stress test exercises the actual WebDAV sync functionality with loop detection enabled.
* It creates scenarios that could cause loops and verifies that they are properly detected
* and reported by the instrumented sync code.
*/
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::collections::HashMap;
use tokio::time::sleep;
use anyhow::{Result, Context};
use tracing::{info, warn, error, debug};
use uuid::Uuid;
use serde_json::{json, Value};
use readur::services::webdav::{
WebDAVService, WebDAVConfig, SmartSyncService,
LoopDetectionConfig, LoopType
};
use readur::{AppState, config::Config};
/// Configuration for the stress test
#[derive(Debug, Clone)]
pub struct StressTestConfig {
/// Duration to run the stress test
pub duration_secs: u64,
/// WebDAV server URL for testing
pub webdav_url: String,
/// WebDAV username
pub username: String,
/// WebDAV password
pub password: String,
/// Number of concurrent sync operations
pub concurrent_syncs: usize,
/// Directories to test
pub test_directories: Vec<String>,
/// Whether to intentionally trigger loops for testing
pub trigger_test_loops: bool,
/// Loop detection timeout
pub loop_detection_timeout_secs: u64,
}
impl Default for StressTestConfig {
fn default() -> Self {
Self {
duration_secs: std::env::var("STRESS_TEST_DURATION")
.unwrap_or_else(|_| "300".to_string())
.parse()
.unwrap_or(300),
webdav_url: std::env::var("WEBDAV_DUFS_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
username: std::env::var("WEBDAV_USERNAME")
.unwrap_or_else(|_| "webdav_user".to_string()),
password: std::env::var("WEBDAV_PASSWORD")
.unwrap_or_else(|_| "webdav_pass".to_string()),
concurrent_syncs: std::env::var("CONCURRENT_SYNCS")
.unwrap_or_else(|_| "4".to_string())
.parse()
.unwrap_or(4),
test_directories: vec![
"/stress_test".to_string(),
"/stress_test/nested".to_string(),
"/stress_test/deep/structure".to_string(),
"/stress_test/complex".to_string(),
],
trigger_test_loops: std::env::var("TRIGGER_TEST_LOOPS")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true),
loop_detection_timeout_secs: std::env::var("LOOP_DETECTION_TIMEOUT")
.unwrap_or_else(|_| "60".to_string())
.parse()
.unwrap_or(60),
}
}
}
/// Metrics collected during stress testing
#[derive(Debug, Clone)]
pub struct StressTestMetrics {
pub total_sync_operations: u64,
pub successful_syncs: u64,
pub failed_syncs: u64,
pub loops_detected: u64,
pub avg_sync_duration_ms: f64,
pub max_sync_duration_ms: u64,
pub min_sync_duration_ms: u64,
pub files_discovered: u64,
pub directories_discovered: u64,
pub errors_by_type: HashMap<String, u64>,
pub loop_types_detected: HashMap<String, u64>,
}
impl Default for StressTestMetrics {
fn default() -> Self {
Self {
total_sync_operations: 0,
successful_syncs: 0,
failed_syncs: 0,
loops_detected: 0,
avg_sync_duration_ms: 0.0,
max_sync_duration_ms: 0,
min_sync_duration_ms: u64::MAX,
files_discovered: 0,
directories_discovered: 0,
errors_by_type: HashMap::new(),
loop_types_detected: HashMap::new(),
}
}
}
/// Main stress test runner
pub struct WebDAVLoopDetectionStressTest {
config: StressTestConfig,
metrics: Arc<tokio::sync::Mutex<StressTestMetrics>>,
}
impl WebDAVLoopDetectionStressTest {
pub fn new(config: StressTestConfig) -> Self {
Self {
config,
metrics: Arc::new(tokio::sync::Mutex::new(StressTestMetrics::default())),
}
}
/// Create a WebDAV service with loop detection configured for stress testing
fn create_webdav_service(&self) -> Result<WebDAVService> {
let mut webdav_config = WebDAVConfig::new(
self.config.webdav_url.clone(),
self.config.username.clone(),
self.config.password.clone(),
self.config.test_directories.clone(),
vec!["pdf".to_string(), "txt".to_string(), "doc".to_string(), "docx".to_string()],
);
// Configure loop detection for stress testing
webdav_config.loop_detection = LoopDetectionConfig {
enabled: true,
max_access_count: 5, // Reasonable limit for stress testing
time_window_secs: 60, // 1-minute window
max_scan_duration_secs: self.config.loop_detection_timeout_secs,
min_scan_interval_secs: 2, // 2-second minimum interval
max_pattern_depth: 10,
max_tracked_directories: 1000,
enable_pattern_analysis: true,
log_level: "warn".to_string(), // Reduce log noise during stress test
};
WebDAVService::new(webdav_config)
.context("Failed to create WebDAV service for stress testing")
}
/// Run the main stress test
pub async fn run(&self) -> Result<StressTestMetrics> {
info!("🚀 Starting WebDAV Loop Detection Stress Test");
info!("Configuration: {:?}", self.config);
let start_time = Instant::now();
let end_time = start_time + Duration::from_secs(self.config.duration_secs);
// Create WebDAV services for concurrent testing
let mut webdav_services = Vec::new();
for i in 0..self.config.concurrent_syncs {
match self.create_webdav_service() {
Ok(service) => {
info!("✅ Created WebDAV service #{}", i);
webdav_services.push(service);
}
Err(e) => {
error!("❌ Failed to create WebDAV service #{}: {}", i, e);
return Err(e);
}
}
}
// Create app state for SmartSyncService
let test_config = Config::test_default();
let app_state = Arc::new(AppState::new_for_testing(test_config).await
.context("Failed to create app state for testing")?);
let smart_sync_service = SmartSyncService::new(app_state.clone());
info!("🏁 Starting stress test operations...");
// Launch concurrent sync operations
let mut handles = Vec::new();
for (service_id, webdav_service) in webdav_services.into_iter().enumerate() {
let service = Arc::new(webdav_service);
let smart_sync = smart_sync_service.clone();
let config = self.config.clone();
let metrics = Arc::clone(&self.metrics);
let handle = tokio::spawn(async move {
Self::run_sync_operations(
service_id,
service,
smart_sync,
config,
metrics,
end_time
).await
});
handles.push(handle);
}
// Wait for all operations to complete
for (i, handle) in handles.into_iter().enumerate() {
match handle.await {
Ok(result) => {
if let Err(e) = result {
warn!("Sync operation #{} completed with error: {}", i, e);
} else {
info!("✅ Sync operation #{} completed successfully", i);
}
}
Err(e) => {
error!("❌ Failed to join sync operation #{}: {}", i, e);
}
}
}
let total_duration = start_time.elapsed();
info!("🏁 Stress test completed in {:.2}s", total_duration.as_secs_f64());
// Generate final metrics
let final_metrics = self.generate_final_metrics().await;
self.print_stress_test_report(&final_metrics, total_duration);
Ok(final_metrics)
}
/// Run sync operations for a single WebDAV service
async fn run_sync_operations(
service_id: usize,
webdav_service: Arc<WebDAVService>,
smart_sync_service: SmartSyncService,
config: StressTestConfig,
metrics: Arc<tokio::sync::Mutex<StressTestMetrics>>,
end_time: Instant,
) -> Result<()> {
let user_id = Uuid::new_v4();
let mut operation_count = 0;
info!("🔄 Service #{} starting sync operations", service_id);
while Instant::now() < end_time {
operation_count += 1;
let op_start = Instant::now();
// Randomly select a directory to sync
let dir_index = operation_count % config.test_directories.len();
let target_directory = &config.test_directories[dir_index];
debug!("Service #{} operation #{}: syncing {}", service_id, operation_count, target_directory);
// Perform sync operation with loop detection
let sync_result = Self::perform_monitored_sync(
&*webdav_service,
&smart_sync_service,
user_id,
target_directory,
operation_count,
).await;
let op_duration = op_start.elapsed();
// Update metrics
Self::update_metrics(
&metrics,
&sync_result,
op_duration,
&*webdav_service,
).await;
// If we're testing loop triggers, occasionally create conditions that might cause loops
if config.trigger_test_loops && operation_count % 10 == 0 {
Self::trigger_test_loop_scenario(&*webdav_service, target_directory).await;
}
// Brief pause between operations to avoid overwhelming the server
sleep(Duration::from_millis(100 + (service_id * 50) as u64)).await;
}
info!("📊 Service #{} completed {} operations", service_id, operation_count);
Ok(())
}
/// Perform a single sync operation with comprehensive monitoring
async fn perform_monitored_sync(
webdav_service: &WebDAVService,
smart_sync_service: &SmartSyncService,
user_id: Uuid,
directory: &str,
operation_id: usize,
) -> Result<(usize, usize)> {
// First evaluate if sync is needed
match smart_sync_service.evaluate_sync_need(
user_id,
webdav_service,
directory,
None, // No progress tracking for stress test
).await {
Ok(decision) => {
match decision {
readur::services::webdav::SmartSyncDecision::SkipSync => {
debug!("Operation #{}: Sync skipped for {}", operation_id, directory);
Ok((0, 0))
}
readur::services::webdav::SmartSyncDecision::RequiresSync(strategy) => {
// Perform the actual sync
match smart_sync_service.perform_smart_sync(
user_id,
None, // No source ID for stress test
webdav_service,
directory,
strategy,
None, // No progress tracking
).await {
Ok(result) => Ok((result.files.len(), result.directories.len())),
Err(e) => {
if e.to_string().contains("Loop detected") {
debug!("Operation #{}: Loop detected for {} - {}", operation_id, directory, e);
Err(e)
} else {
warn!("Operation #{}: Sync failed for {} - {}", operation_id, directory, e);
Err(e)
}
}
}
}
}
}
Err(e) => {
warn!("Operation #{}: Sync evaluation failed for {} - {}", operation_id, directory, e);
Err(e)
}
}
}
/// Trigger test scenarios that might cause loops (for testing purposes)
async fn trigger_test_loop_scenario(webdav_service: &WebDAVService, directory: &str) {
debug!("🧪 Triggering test loop scenario for {}", directory);
// Rapid repeated access to the same directory
for i in 0..3 {
match webdav_service.discover_files_and_directories(directory, false).await {
Ok(_) => debug!("Test loop trigger #{} succeeded for {}", i, directory),
Err(e) => {
if e.to_string().contains("Loop detected") {
debug!("✅ Test loop scenario successfully triggered loop detection: {}", e);
return;
} else {
debug!("Test loop trigger #{} failed for {}: {}", i, directory, e);
}
}
}
// Very short delay to trigger immediate re-scan detection
sleep(Duration::from_millis(100)).await;
}
}
/// Update metrics based on sync operation result
async fn update_metrics(
metrics: &Arc<tokio::sync::Mutex<StressTestMetrics>>,
sync_result: &Result<(usize, usize)>,
duration: Duration,
webdav_service: &WebDAVService,
) {
let mut m = metrics.lock().await;
m.total_sync_operations += 1;
let duration_ms = duration.as_millis() as u64;
m.max_sync_duration_ms = m.max_sync_duration_ms.max(duration_ms);
m.min_sync_duration_ms = m.min_sync_duration_ms.min(duration_ms);
// Update average duration
let total_duration = m.avg_sync_duration_ms * (m.total_sync_operations - 1) as f64;
m.avg_sync_duration_ms = (total_duration + duration_ms as f64) / m.total_sync_operations as f64;
match sync_result {
Ok((files, dirs)) => {
m.successful_syncs += 1;
m.files_discovered += *files as u64;
m.directories_discovered += *dirs as u64;
}
Err(e) => {
m.failed_syncs += 1;
let error_msg = e.to_string();
if error_msg.contains("Loop detected") {
m.loops_detected += 1;
// Classify loop types
if error_msg.contains("re-accessed after only") {
*m.loop_types_detected.entry("ImmediateReScan".to_string()).or_insert(0) += 1;
} else if error_msg.contains("Concurrent access detected") {
*m.loop_types_detected.entry("ConcurrentAccess".to_string()).or_insert(0) += 1;
} else if error_msg.contains("accessed") && error_msg.contains("times") {
*m.loop_types_detected.entry("FrequentReAccess".to_string()).or_insert(0) += 1;
} else if error_msg.contains("stuck") {
*m.loop_types_detected.entry("StuckScan".to_string()).or_insert(0) += 1;
} else if error_msg.contains("Circular pattern") {
*m.loop_types_detected.entry("CircularPattern".to_string()).or_insert(0) += 1;
} else {
*m.loop_types_detected.entry("Other".to_string()).or_insert(0) += 1;
}
} else {
// Classify other error types
let error_type = if error_msg.contains("timeout") {
"Timeout"
} else if error_msg.contains("connection") {
"Connection"
} else if error_msg.contains("404") || error_msg.contains("Not Found") {
"NotFound"
} else if error_msg.contains("403") || error_msg.contains("Forbidden") {
"Forbidden"
} else if error_msg.contains("500") || error_msg.contains("Internal Server Error") {
"ServerError"
} else {
"Unknown"
};
*m.errors_by_type.entry(error_type.to_string()).or_insert(0) += 1;
}
}
}
// Collect loop detection metrics from the WebDAV service
if let Ok(ld_metrics) = webdav_service.get_loop_detection_metrics().await {
if let Some(total_loops) = ld_metrics.get("total_loops_detected") {
if let Some(loops) = total_loops.as_u64() {
// Update our metrics with the actual count from loop detector
m.loops_detected = m.loops_detected.max(loops);
}
}
}
}
/// Generate final comprehensive metrics
async fn generate_final_metrics(&self) -> StressTestMetrics {
self.metrics.lock().await.clone()
}
/// Print a comprehensive stress test report
fn print_stress_test_report(&self, metrics: &StressTestMetrics, total_duration: Duration) {
println!("\n" + "=".repeat(80).as_str());
println!("📊 WEBDAV LOOP DETECTION STRESS TEST REPORT");
println!("=".repeat(80));
println!("\n🕒 Test Duration: {:.2}s", total_duration.as_secs_f64());
println!("🔄 Total Sync Operations: {}", metrics.total_sync_operations);
println!("✅ Successful Syncs: {} ({:.1}%)",
metrics.successful_syncs,
metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0);
println!("❌ Failed Syncs: {} ({:.1}%)",
metrics.failed_syncs,
metrics.failed_syncs as f64 / metrics.total_sync_operations as f64 * 100.0);
println!("\n🔄 Loop Detection Results:");
println!(" 🚨 Loops Detected: {} ({:.1}%)",
metrics.loops_detected,
metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0);
if !metrics.loop_types_detected.is_empty() {
println!(" 📊 Loop Types Detected:");
for (loop_type, count) in &metrics.loop_types_detected {
println!(" - {}: {}", loop_type, count);
}
}
println!("\n⚡ Performance Metrics:");
println!(" 📈 Average Sync Duration: {:.2}ms", metrics.avg_sync_duration_ms);
println!(" 🏃 Fastest Sync: {}ms", metrics.min_sync_duration_ms);
println!(" 🐌 Slowest Sync: {}ms", metrics.max_sync_duration_ms);
println!(" 🏁 Operations per Second: {:.2}",
metrics.total_sync_operations as f64 / total_duration.as_secs_f64());
println!("\n📁 Discovery Results:");
println!(" 📄 Files Discovered: {}", metrics.files_discovered);
println!(" 📂 Directories Discovered: {}", metrics.directories_discovered);
if !metrics.errors_by_type.is_empty() {
println!("\n❌ Error Breakdown:");
for (error_type, count) in &metrics.errors_by_type {
println!(" - {}: {} ({:.1}%)",
error_type, count,
*count as f64 / metrics.failed_syncs as f64 * 100.0);
}
}
println!("\n" + "=".repeat(80).as_str());
// Generate JSON report for CI/CD
let report = json!({
"test_type": "webdav_loop_detection_stress",
"duration_secs": total_duration.as_secs_f64(),
"total_operations": metrics.total_sync_operations,
"successful_operations": metrics.successful_syncs,
"failed_operations": metrics.failed_syncs,
"success_rate": metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0,
"loops_detected": metrics.loops_detected,
"loop_detection_rate": metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0,
"avg_duration_ms": metrics.avg_sync_duration_ms,
"min_duration_ms": metrics.min_sync_duration_ms,
"max_duration_ms": metrics.max_sync_duration_ms,
"ops_per_second": metrics.total_sync_operations as f64 / total_duration.as_secs_f64(),
"files_discovered": metrics.files_discovered,
"directories_discovered": metrics.directories_discovered,
"loop_types": metrics.loop_types_detected,
"error_types": metrics.errors_by_type,
});
// Write JSON report for CI/CD consumption
if let Ok(report_dir) = std::env::var("STRESS_RESULTS_DIR") {
let report_path = format!("{}/webdav_loop_detection_report.json", report_dir);
if let Err(e) = std::fs::write(&report_path, serde_json::to_string_pretty(&report).unwrap()) {
warn!("Failed to write JSON report to {}: {}", report_path, e);
} else {
info!("📋 JSON report written to {}", report_path);
}
}
}
}
/// Main entry point for the stress test
#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt()
.with_env_filter(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info,webdav_loop_detection_stress=debug".to_string())
)
.init();
let config = StressTestConfig::default();
let stress_test = WebDAVLoopDetectionStressTest::new(config);
let metrics = stress_test.run().await
.context("Stress test failed")?;
// Exit with error code if too many loops were detected (indicating a problem)
let loop_rate = metrics.loops_detected as f64 / metrics.total_sync_operations as f64 * 100.0;
if loop_rate > 50.0 {
error!("🚨 CRITICAL: Loop detection rate ({:.1}%) exceeds threshold (50%)", loop_rate);
std::process::exit(1);
}
// Exit with error code if success rate is too low
let success_rate = metrics.successful_syncs as f64 / metrics.total_sync_operations as f64 * 100.0;
if success_rate < 70.0 {
error!("🚨 CRITICAL: Success rate ({:.1}%) below threshold (70%)", success_rate);
std::process::exit(1);
}
info!("🎉 Stress test completed successfully!");
Ok(())
}