use std::{sync::Arc, time::Duration, collections::HashMap}; use tokio::time::sleep; use uuid::Uuid; use futures::future::join_all; use anyhow::Result; use readur::{ AppState, models::{CreateWebDAVDirectory, FileIngestionInfo}, test_utils::{TestContext, TestAuthHelper}, services::webdav::{ SmartSyncService, SmartSyncStrategy, SyncProgress, SyncPhase, WebDAVDiscoveryResult, }, }; /// Helper function to create test setup async fn create_smart_sync_test_state() -> (TestContext, Arc, Uuid) { let test_context = TestContext::new().await; let auth_helper = TestAuthHelper::new(test_context.app().clone()); let test_user = auth_helper.create_test_user().await; let state = test_context.state().clone(); let user_id = test_user.user_response.id; (test_context, state, user_id) } /// Mock WebDAV service for testing smart sync behavior #[derive(Clone)] struct MockWebDAVServiceForSmartSync { discovery_results: Arc>>, call_count: Arc>, delay_ms: u64, should_fail: bool, } impl MockWebDAVServiceForSmartSync { fn new(delay_ms: u64, should_fail: bool) -> Self { Self { discovery_results: Arc::new(std::sync::Mutex::new(HashMap::new())), call_count: Arc::new(std::sync::Mutex::new(0)), delay_ms, should_fail, } } fn set_discovery_result(&self, path: &str, result: WebDAVDiscoveryResult) { let mut results = self.discovery_results.lock().unwrap(); results.insert(path.to_string(), result); } async fn mock_discover_files_and_directories( &self, directory_path: &str, _recursive: bool, ) -> Result { // Increment call count { let mut count = self.call_count.lock().unwrap(); *count += 1; } // Simulate network delay sleep(Duration::from_millis(self.delay_ms)).await; if self.should_fail { return Err(anyhow::anyhow!("Mock WebDAV discovery failed for {}", directory_path)); } // Return preset result or generate default let results = self.discovery_results.lock().unwrap(); if let Some(result) = results.get(directory_path) { Ok(result.clone()) } else { // Generate default result Ok(WebDAVDiscoveryResult { files: vec![ FileIngestionInfo { name: "default.pdf".to_string(), relative_path: format!("{}/default.pdf", directory_path), full_path: format!("{}/default.pdf", directory_path), path: format!("{}/default.pdf", directory_path), size: 1024, mime_type: "application/pdf".to_string(), last_modified: Some(chrono::Utc::now()), etag: format!("default-etag-{}", directory_path.replace('/', "-")), is_directory: false, created_at: None, permissions: None, owner: None, group: None, metadata: None, } ], directories: vec![ FileIngestionInfo { name: "subdir".to_string(), relative_path: format!("{}/subdir", directory_path), full_path: format!("{}/subdir", directory_path), path: format!("{}/subdir", directory_path), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: format!("dir-etag-{}", directory_path.replace('/', "-")), is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, } ], }) } } fn get_call_count(&self) -> u32 { *self.call_count.lock().unwrap() } } /// Test concurrent smart sync evaluations with different ETag scenarios #[tokio::test] async fn test_concurrent_smart_sync_etag_evaluation() { let (_test_context, state, user_id) = create_smart_sync_test_state().await; let smart_sync_service = SmartSyncService::new(state.clone()); // Set up initial directory state let initial_directories = vec![ CreateWebDAVDirectory { user_id, directory_path: "/test".to_string(), directory_etag: "old-etag-1".to_string(), file_count: 5, total_size_bytes: 1024, }, CreateWebDAVDirectory { user_id, directory_path: "/test/subdir1".to_string(), directory_etag: "old-etag-2".to_string(), file_count: 3, total_size_bytes: 512, }, CreateWebDAVDirectory { user_id, directory_path: "/test/subdir2".to_string(), directory_etag: "old-etag-3".to_string(), file_count: 2, total_size_bytes: 256, }, ]; for dir in initial_directories { state.db.create_or_update_webdav_directory(&dir).await .expect("Failed to create initial directory"); } // Create mock WebDAV services simulating different discovery scenarios let mock_services = vec![ (MockWebDAVServiceForSmartSync::new(50, false), "unchanged"), // ETags unchanged (MockWebDAVServiceForSmartSync::new(75, false), "changed"), // ETags changed (MockWebDAVServiceForSmartSync::new(100, false), "new_dirs"), // New directories (MockWebDAVServiceForSmartSync::new(125, false), "mixed"), // Mixed changes (MockWebDAVServiceForSmartSync::new(25, true), "failed"), // Network failure ]; // Configure mock responses for (mock_service, scenario) in &mock_services { match *scenario { "unchanged" => { // Return same ETags as database mock_service.set_discovery_result("/test", WebDAVDiscoveryResult { files: vec![], directories: vec![ FileIngestionInfo { name: "subdir1".to_string(), relative_path: "/test/subdir1".to_string(), full_path: "/test/subdir1".to_string(), path: "/test/subdir1".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "old-etag-2".to_string(), // Same as database is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, FileIngestionInfo { name: "subdir2".to_string(), relative_path: "/test/subdir2".to_string(), full_path: "/test/subdir2".to_string(), path: "/test/subdir2".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "old-etag-3".to_string(), // Same as database is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, ], }); } "changed" => { // Return different ETags mock_service.set_discovery_result("/test", WebDAVDiscoveryResult { files: vec![], directories: vec![ FileIngestionInfo { name: "subdir1".to_string(), relative_path: "/test/subdir1".to_string(), full_path: "/test/subdir1".to_string(), path: "/test/subdir1".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "new-etag-2".to_string(), // Changed is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, ], }); } "new_dirs" => { // Return new directories mock_service.set_discovery_result("/test", WebDAVDiscoveryResult { files: vec![], directories: vec![ FileIngestionInfo { name: "new_subdir".to_string(), relative_path: "/test/new_subdir".to_string(), full_path: "/test/new_subdir".to_string(), path: "/test/new_subdir".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "new-dir-etag".to_string(), is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, ], }); } "mixed" => { // Mix of changed and new mock_service.set_discovery_result("/test", WebDAVDiscoveryResult { files: vec![], directories: vec![ FileIngestionInfo { name: "subdir1".to_string(), relative_path: "/test/subdir1".to_string(), full_path: "/test/subdir1".to_string(), path: "/test/subdir1".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "updated-etag-2".to_string(), // Changed is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, FileIngestionInfo { name: "another_new_dir".to_string(), relative_path: "/test/another_new_dir".to_string(), full_path: "/test/another_new_dir".to_string(), path: "/test/another_new_dir".to_string(), size: 0, mime_type: "inode/directory".to_string(), last_modified: Some(chrono::Utc::now()), etag: "another-new-etag".to_string(), // New is_directory: true, created_at: None, permissions: None, owner: None, group: None, metadata: None, }, ], }); } _ => {} // Failed case doesn't need setup } } // Run concurrent smart sync evaluations let concurrent_evaluations = mock_services.into_iter().enumerate().map(|(i, (mock_service, scenario))| { let smart_sync_clone = smart_sync_service.clone(); let user_id = user_id; let scenario = scenario.to_string(); tokio::spawn(async move { println!("Starting smart sync evaluation {} ({})", i, scenario); // Since we can't directly inject the mock into SmartSyncService, // we'll simulate the evaluation logic by calling the database methods // that SmartSyncService would call // 1. Get known directories (what SmartSyncService.evaluate_sync_need does) let known_dirs_result = smart_sync_clone.state().db.list_webdav_directories(user_id).await; // 2. Simulate discovery with delay (mock WebDAV call) let discovery_result = mock_service.mock_discover_files_and_directories("/test", false).await; // 3. If discovery succeeded, update directory ETags (what perform_smart_sync would do) let update_results = if let Ok(discovery) = &discovery_result { let mut results = Vec::new(); for dir_info in &discovery.directories { let update_dir = CreateWebDAVDirectory { user_id, directory_path: dir_info.relative_path.clone(), directory_etag: dir_info.etag.clone(), file_count: 0, total_size_bytes: 0, }; let result = smart_sync_clone.state().db.create_or_update_webdav_directory(&update_dir).await; results.push(result.is_ok()); } results } else { vec![] }; println!("Completed smart sync evaluation {} ({})", i, scenario); (i, scenario, known_dirs_result.is_ok(), discovery_result.is_ok(), update_results) }) }); // Wait for all evaluations let evaluation_results: Vec<_> = join_all(concurrent_evaluations).await; // Analyze results let mut scenario_stats = HashMap::new(); for result in evaluation_results { assert!(result.is_ok(), "Evaluation task should complete without panicking"); let (task_id, scenario, db_read_ok, discovery_ok, updates) = result.unwrap(); *scenario_stats.entry(scenario.clone()).or_insert(0) += 1; println!("Evaluation {}: {} - DB read: {}, Discovery: {}, Updates: {:?}", task_id, scenario, db_read_ok, discovery_ok, updates); // Database reads should always succeed assert!(db_read_ok, "Database read should succeed for evaluation {}", task_id); // Discovery should succeed unless it's the "failed" scenario if scenario != "failed" { assert!(discovery_ok, "Discovery should succeed for evaluation {} ({})", task_id, scenario); } } println!("Scenario statistics: {:?}", scenario_stats); // Verify final state consistency let final_directories = state.db.list_webdav_directories(user_id).await .expect("Failed to list final directories"); // Should have at least the original directories, possibly more from concurrent updates assert!(final_directories.len() >= 3, "Should have at least 3 directories, got {}", final_directories.len()); // Check that some directories were updated by successful evaluations let updated_dirs = final_directories.iter() .filter(|d| !d.directory_etag.starts_with("old-etag")) .count(); println!("Updated directories: {}/{}", updated_dirs, final_directories.len()); // At least some directories should have been updated (unless all operations failed) // This depends on the specific timing of concurrent operations } /// Test smart sync full deep scan vs targeted scan concurrency #[tokio::test] async fn test_concurrent_smart_sync_strategies() { let (_test_context, state, user_id) = create_smart_sync_test_state().await; let smart_sync_service = SmartSyncService::new(state.clone()); // Create extensive directory structure let directories = vec![ ("/project", "proj-etag-1"), ("/project/docs", "docs-etag-1"), ("/project/src", "src-etag-1"), ("/project/tests", "tests-etag-1"), ("/project/docs/api", "api-etag-1"), ("/project/docs/user", "user-etag-1"), ("/archive", "arch-etag-1"), ("/archive/2023", "2023-etag-1"), ("/archive/2024", "2024-etag-1"), ]; for (path, etag) in directories { let directory = CreateWebDAVDirectory { user_id, directory_path: path.to_string(), directory_etag: etag.to_string(), file_count: 5, total_size_bytes: 1024, }; state.db.create_or_update_webdav_directory(&directory).await .expect("Failed to create directory"); } // Test concurrent operations with different strategies let strategy_operations = vec![ (SmartSyncStrategy::FullDeepScan, "/project", "full_scan_1"), (SmartSyncStrategy::TargetedScan(vec!["/project/docs".to_string()]), "/project", "targeted_1"), (SmartSyncStrategy::FullDeepScan, "/archive", "full_scan_2"), (SmartSyncStrategy::TargetedScan(vec!["/archive/2023".to_string(), "/archive/2024".to_string()]), "/archive", "targeted_2"), (SmartSyncStrategy::FullDeepScan, "/project", "full_scan_3"), // Overlapping with targeted_1 ]; let concurrent_strategy_tests = strategy_operations.into_iter().enumerate().map(|(i, (strategy, base_path, test_name))| { let smart_sync_clone = smart_sync_service.clone(); let user_id = user_id; let base_path = base_path.to_string(); let test_name = test_name.to_string(); tokio::spawn(async move { println!("Starting strategy test {} ({}) for {}", i, test_name, base_path); // Simulate what perform_smart_sync would do for each strategy let result: Result = match strategy { SmartSyncStrategy::FullDeepScan => { // Simulate full deep scan - update all directories under base_path let all_dirs = smart_sync_clone.state().db.list_webdav_directories(user_id).await?; let relevant_dirs: Vec<_> = all_dirs.into_iter() .filter(|d| d.directory_path.starts_with(&base_path)) .collect(); let mut update_count = 0; for dir in relevant_dirs { let updated_dir = CreateWebDAVDirectory { user_id, directory_path: dir.directory_path, directory_etag: format!("{}-updated-by-{}", dir.directory_etag, test_name), file_count: dir.file_count + 1, total_size_bytes: dir.total_size_bytes + 100, }; if smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_dir).await.is_ok() { update_count += 1; } } Ok(update_count) } SmartSyncStrategy::TargetedScan(target_dirs) => { // Simulate targeted scan - only update specific directories let mut update_count = 0; for target_dir in target_dirs { let updated_dir = CreateWebDAVDirectory { user_id, directory_path: target_dir.clone(), directory_etag: format!("targeted-etag-{}", test_name), file_count: 10, total_size_bytes: 2048, }; if smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_dir).await.is_ok() { update_count += 1; } } Ok(update_count) } }; println!("Completed strategy test {} ({})", i, test_name); Result::<_, anyhow::Error>::Ok((i, test_name, result)) }) }); // Wait for all strategy tests let strategy_results: Vec<_> = join_all(concurrent_strategy_tests).await; // Analyze strategy execution results let mut total_updates = 0; for result in strategy_results { assert!(result.is_ok(), "Strategy test task should complete"); let strategy_result = result.unwrap(); assert!(strategy_result.is_ok(), "Strategy test should not panic"); let (task_id, test_name, update_result) = strategy_result.unwrap(); match update_result { Ok(update_count) => { println!("Strategy test {}: {} updated {} directories", task_id, test_name, update_count); total_updates += update_count; } Err(e) => { println!("Strategy test {}: {} failed: {}", task_id, test_name, e); } } } println!("Total directory updates across all strategies: {}", total_updates); // Verify final state let final_directories = state.db.list_webdav_directories(user_id).await .expect("Failed to list final directories"); // Should still have all directories assert_eq!(final_directories.len(), 9, "Should have all 9 directories"); // Check for evidence of different strategy executions let full_scan_updates = final_directories.iter() .filter(|d| d.directory_etag.contains("updated-by-full_scan")) .count(); let targeted_updates = final_directories.iter() .filter(|d| d.directory_etag.contains("targeted-etag")) .count(); println!("Full scan updates: {}, Targeted updates: {}", full_scan_updates, targeted_updates); // At least some updates should have occurred assert!(total_updates > 0, "At least some strategy operations should have updated directories"); } /// Test smart sync progress tracking under concurrent operations #[tokio::test] async fn test_concurrent_smart_sync_progress_tracking() { let (_test_context, state, user_id) = create_smart_sync_test_state().await; let smart_sync_service = SmartSyncService::new(state.clone()); // Create multiple progress trackers for concurrent operations let progress_operations = (0..5).map(|i| { let smart_sync_clone = smart_sync_service.clone(); let user_id = user_id; tokio::spawn(async move { let progress = SyncProgress::new(); progress.set_phase(SyncPhase::Initializing); println!("Progress operation {} starting", i); // Simulate smart sync operation with progress tracking progress.set_phase(SyncPhase::Evaluating); progress.set_current_directory(&format!("/operation-{}", i)); // Simulate database operations sleep(Duration::from_millis(50)).await; progress.set_phase(SyncPhase::DiscoveringDirectories); progress.set_current_directory(&format!("/operation-{}/subdir", i)); // Simulate discovery delay sleep(Duration::from_millis(100)).await; progress.set_phase(SyncPhase::SavingMetadata); // Update directory (simulates saving discovered metadata) let directory = CreateWebDAVDirectory { user_id, directory_path: format!("/operation-{}", i), directory_etag: format!("progress-etag-{}", i), file_count: i as i64, total_size_bytes: (i as i64) * 1024, }; let db_result = smart_sync_clone.state().db.create_or_update_webdav_directory(&directory).await; if db_result.is_ok() { progress.set_phase(SyncPhase::Completed); } else { progress.set_phase(SyncPhase::Failed("Database update failed".to_string())); } // Get final progress stats let stats = progress.get_stats(); println!("Progress operation {} completed", i); (i, db_result.is_ok(), stats) }) }); // Wait for all progress operations let progress_results: Vec<_> = join_all(progress_operations).await; // Verify progress tracking results let mut successful_operations = 0; for result in progress_results { assert!(result.is_ok(), "Progress tracking task should complete"); let (operation_id, db_success, stats) = result.unwrap(); if db_success { successful_operations += 1; } if let Some(stats) = stats { println!("Operation {}: Success: {}, Elapsed: {:?}, Errors: {:?}", operation_id, db_success, stats.elapsed_time, stats.errors); } } println!("Successful progress operations: {}/5", successful_operations); // Verify created directories let final_directories = state.db.list_webdav_directories(user_id).await .expect("Failed to list directories"); let progress_dirs = final_directories.iter() .filter(|d| d.directory_etag.starts_with("progress-etag-")) .count(); assert_eq!(progress_dirs, successful_operations, "Number of progress directories should match successful operations"); // All operations should have completed (successfully or not) assert!(successful_operations > 0, "At least some operations should succeed"); } /// Test concurrent smart sync operations with simulated ETag conflicts #[tokio::test] async fn test_concurrent_smart_sync_etag_conflicts() { let (_test_context, state, user_id) = create_smart_sync_test_state().await; let smart_sync_service = SmartSyncService::new(state.clone()); // Create a shared directory that multiple operations will try to update let shared_directory = CreateWebDAVDirectory { user_id, directory_path: "/shared".to_string(), directory_etag: "initial-shared-etag".to_string(), file_count: 10, total_size_bytes: 2048, }; state.db.create_or_update_webdav_directory(&shared_directory).await .expect("Failed to create shared directory"); // Create concurrent operations that all try to update the same directory let etag_conflict_operations = (0..10).map(|i| { let smart_sync_clone = smart_sync_service.clone(); let user_id = user_id; tokio::spawn(async move { println!("ETag conflict operation {} starting", i); // First, read the current directory state let current_dirs = smart_sync_clone.state().db.list_webdav_directories(user_id).await?; let shared_dir = current_dirs.iter() .find(|d| d.directory_path == "/shared") .ok_or_else(|| anyhow::anyhow!("Shared directory not found"))?; // Simulate discovery finding changes (each operation thinks it found different changes) sleep(Duration::from_millis((i % 5) * 20)).await; // Variable delay to create race conditions // Try to update with a new ETag (simulating discovered changes) let updated_directory = CreateWebDAVDirectory { user_id, directory_path: "/shared".to_string(), directory_etag: format!("conflict-etag-operation-{}", i), file_count: shared_dir.file_count + i as i64, total_size_bytes: shared_dir.total_size_bytes + (i as i64 * 100), }; let update_result = smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_directory).await; println!("ETag conflict operation {} completed: {:?}", i, update_result.is_ok()); Result::<_, anyhow::Error>::Ok((i, update_result.is_ok())) }) }); // Wait for all conflict operations let conflict_results: Vec<_> = join_all(etag_conflict_operations).await; // Analyze conflict resolution let mut successful_updates = 0; for result in conflict_results { assert!(result.is_ok(), "ETag conflict task should complete"); let operation_result = result.unwrap(); assert!(operation_result.is_ok(), "ETag conflict operation should not panic"); let (operation_id, update_success) = operation_result.unwrap(); if update_success { successful_updates += 1; } println!("ETag conflict operation {}: {}", operation_id, update_success); } println!("Successful ETag updates: {}/10", successful_updates); // Verify final state let final_directories = state.db.list_webdav_directories(user_id).await .expect("Failed to list final directories"); assert_eq!(final_directories.len(), 1, "Should have exactly one shared directory"); let final_shared_dir = &final_directories[0]; assert_eq!(final_shared_dir.directory_path, "/shared"); // The final ETag should be from one of the operations assert!(final_shared_dir.directory_etag.contains("conflict-etag-operation-") || final_shared_dir.directory_etag == "initial-shared-etag", "Final ETag should be from one of the operations: {}", final_shared_dir.directory_etag); // File count should have been updated by the successful operation if successful_updates > 0 { assert!(final_shared_dir.file_count >= 10, "File count should have been updated: {}", final_shared_dir.file_count); } // All operations should have succeeded (database should handle concurrency gracefully) assert!(successful_updates > 0, "At least some ETag updates should succeed"); }