723 lines
31 KiB
Rust
723 lines
31 KiB
Rust
use std::{sync::Arc, time::Duration, collections::HashMap};
|
|
use tokio::time::sleep;
|
|
use uuid::Uuid;
|
|
use futures::future::join_all;
|
|
use anyhow::Result;
|
|
use readur::{
|
|
AppState,
|
|
models::{CreateWebDAVDirectory, FileIngestionInfo},
|
|
test_utils::{TestContext, TestAuthHelper},
|
|
services::webdav::{
|
|
SmartSyncService,
|
|
SmartSyncStrategy,
|
|
SyncProgress,
|
|
SyncPhase,
|
|
WebDAVDiscoveryResult,
|
|
},
|
|
};
|
|
|
|
/// Helper function to create test setup
|
|
async fn create_smart_sync_test_state() -> (TestContext, Arc<AppState>, Uuid) {
|
|
let test_context = TestContext::new().await;
|
|
|
|
let auth_helper = TestAuthHelper::new(test_context.app().clone());
|
|
let test_user = auth_helper.create_test_user().await;
|
|
|
|
let state = test_context.state().clone();
|
|
let user_id = test_user.user_response.id;
|
|
|
|
(test_context, state, user_id)
|
|
}
|
|
|
|
/// Mock WebDAV service for testing smart sync behavior
|
|
#[derive(Clone)]
|
|
struct MockWebDAVServiceForSmartSync {
|
|
discovery_results: Arc<std::sync::Mutex<HashMap<String, WebDAVDiscoveryResult>>>,
|
|
call_count: Arc<std::sync::Mutex<u32>>,
|
|
delay_ms: u64,
|
|
should_fail: bool,
|
|
}
|
|
|
|
impl MockWebDAVServiceForSmartSync {
|
|
fn new(delay_ms: u64, should_fail: bool) -> Self {
|
|
Self {
|
|
discovery_results: Arc::new(std::sync::Mutex::new(HashMap::new())),
|
|
call_count: Arc::new(std::sync::Mutex::new(0)),
|
|
delay_ms,
|
|
should_fail,
|
|
}
|
|
}
|
|
|
|
fn set_discovery_result(&self, path: &str, result: WebDAVDiscoveryResult) {
|
|
let mut results = self.discovery_results.lock().unwrap();
|
|
results.insert(path.to_string(), result);
|
|
}
|
|
|
|
async fn mock_discover_files_and_directories(
|
|
&self,
|
|
directory_path: &str,
|
|
_recursive: bool,
|
|
) -> Result<WebDAVDiscoveryResult> {
|
|
// Increment call count
|
|
{
|
|
let mut count = self.call_count.lock().unwrap();
|
|
*count += 1;
|
|
}
|
|
|
|
// Simulate network delay
|
|
sleep(Duration::from_millis(self.delay_ms)).await;
|
|
|
|
if self.should_fail {
|
|
return Err(anyhow::anyhow!("Mock WebDAV discovery failed for {}", directory_path));
|
|
}
|
|
|
|
// Return preset result or generate default
|
|
let results = self.discovery_results.lock().unwrap();
|
|
if let Some(result) = results.get(directory_path) {
|
|
Ok(result.clone())
|
|
} else {
|
|
// Generate default result
|
|
Ok(WebDAVDiscoveryResult {
|
|
files: vec![
|
|
FileIngestionInfo {
|
|
name: "default.pdf".to_string(),
|
|
relative_path: format!("{}/default.pdf", directory_path),
|
|
full_path: format!("{}/default.pdf", directory_path),
|
|
path: format!("{}/default.pdf", directory_path),
|
|
size: 1024,
|
|
mime_type: "application/pdf".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: format!("default-etag-{}", directory_path.replace('/', "-")),
|
|
is_directory: false,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
}
|
|
],
|
|
directories: vec![
|
|
FileIngestionInfo {
|
|
name: "subdir".to_string(),
|
|
relative_path: format!("{}/subdir", directory_path),
|
|
full_path: format!("{}/subdir", directory_path),
|
|
path: format!("{}/subdir", directory_path),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: format!("dir-etag-{}", directory_path.replace('/', "-")),
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
}
|
|
],
|
|
})
|
|
}
|
|
}
|
|
|
|
fn get_call_count(&self) -> u32 {
|
|
*self.call_count.lock().unwrap()
|
|
}
|
|
}
|
|
|
|
/// Test concurrent smart sync evaluations with different ETag scenarios
|
|
#[tokio::test]
|
|
async fn test_concurrent_smart_sync_etag_evaluation() {
|
|
let (_test_context, state, user_id) = create_smart_sync_test_state().await;
|
|
|
|
let smart_sync_service = SmartSyncService::new(state.clone());
|
|
|
|
// Set up initial directory state
|
|
let initial_directories = vec![
|
|
CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: "/test".to_string(),
|
|
directory_etag: "old-etag-1".to_string(),
|
|
file_count: 5,
|
|
total_size_bytes: 1024,
|
|
},
|
|
CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: "/test/subdir1".to_string(),
|
|
directory_etag: "old-etag-2".to_string(),
|
|
file_count: 3,
|
|
total_size_bytes: 512,
|
|
},
|
|
CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: "/test/subdir2".to_string(),
|
|
directory_etag: "old-etag-3".to_string(),
|
|
file_count: 2,
|
|
total_size_bytes: 256,
|
|
},
|
|
];
|
|
|
|
for dir in initial_directories {
|
|
state.db.create_or_update_webdav_directory(&dir).await
|
|
.expect("Failed to create initial directory");
|
|
}
|
|
|
|
// Create mock WebDAV services simulating different discovery scenarios
|
|
let mock_services = vec![
|
|
(MockWebDAVServiceForSmartSync::new(50, false), "unchanged"), // ETags unchanged
|
|
(MockWebDAVServiceForSmartSync::new(75, false), "changed"), // ETags changed
|
|
(MockWebDAVServiceForSmartSync::new(100, false), "new_dirs"), // New directories
|
|
(MockWebDAVServiceForSmartSync::new(125, false), "mixed"), // Mixed changes
|
|
(MockWebDAVServiceForSmartSync::new(25, true), "failed"), // Network failure
|
|
];
|
|
|
|
// Configure mock responses
|
|
for (mock_service, scenario) in &mock_services {
|
|
match *scenario {
|
|
"unchanged" => {
|
|
// Return same ETags as database
|
|
mock_service.set_discovery_result("/test", WebDAVDiscoveryResult {
|
|
files: vec![],
|
|
directories: vec![
|
|
FileIngestionInfo {
|
|
name: "subdir1".to_string(),
|
|
relative_path: "/test/subdir1".to_string(),
|
|
full_path: "/test/subdir1".to_string(),
|
|
path: "/test/subdir1".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "old-etag-2".to_string(), // Same as database
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
FileIngestionInfo {
|
|
name: "subdir2".to_string(),
|
|
relative_path: "/test/subdir2".to_string(),
|
|
full_path: "/test/subdir2".to_string(),
|
|
path: "/test/subdir2".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "old-etag-3".to_string(), // Same as database
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
],
|
|
});
|
|
}
|
|
"changed" => {
|
|
// Return different ETags
|
|
mock_service.set_discovery_result("/test", WebDAVDiscoveryResult {
|
|
files: vec![],
|
|
directories: vec![
|
|
FileIngestionInfo {
|
|
name: "subdir1".to_string(),
|
|
relative_path: "/test/subdir1".to_string(),
|
|
full_path: "/test/subdir1".to_string(),
|
|
path: "/test/subdir1".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "new-etag-2".to_string(), // Changed
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
],
|
|
});
|
|
}
|
|
"new_dirs" => {
|
|
// Return new directories
|
|
mock_service.set_discovery_result("/test", WebDAVDiscoveryResult {
|
|
files: vec![],
|
|
directories: vec![
|
|
FileIngestionInfo {
|
|
name: "new_subdir".to_string(),
|
|
relative_path: "/test/new_subdir".to_string(),
|
|
full_path: "/test/new_subdir".to_string(),
|
|
path: "/test/new_subdir".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "new-dir-etag".to_string(),
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
],
|
|
});
|
|
}
|
|
"mixed" => {
|
|
// Mix of changed and new
|
|
mock_service.set_discovery_result("/test", WebDAVDiscoveryResult {
|
|
files: vec![],
|
|
directories: vec![
|
|
FileIngestionInfo {
|
|
name: "subdir1".to_string(),
|
|
relative_path: "/test/subdir1".to_string(),
|
|
full_path: "/test/subdir1".to_string(),
|
|
path: "/test/subdir1".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "updated-etag-2".to_string(), // Changed
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
FileIngestionInfo {
|
|
name: "another_new_dir".to_string(),
|
|
relative_path: "/test/another_new_dir".to_string(),
|
|
full_path: "/test/another_new_dir".to_string(),
|
|
path: "/test/another_new_dir".to_string(),
|
|
size: 0,
|
|
mime_type: "inode/directory".to_string(),
|
|
last_modified: Some(chrono::Utc::now()),
|
|
etag: "another-new-etag".to_string(), // New
|
|
is_directory: true,
|
|
created_at: None,
|
|
permissions: None,
|
|
owner: None,
|
|
group: None,
|
|
metadata: None,
|
|
},
|
|
],
|
|
});
|
|
}
|
|
_ => {} // Failed case doesn't need setup
|
|
}
|
|
}
|
|
|
|
// Run concurrent smart sync evaluations
|
|
let concurrent_evaluations = mock_services.into_iter().enumerate().map(|(i, (mock_service, scenario))| {
|
|
let smart_sync_clone = smart_sync_service.clone();
|
|
let user_id = user_id;
|
|
let scenario = scenario.to_string();
|
|
tokio::spawn(async move {
|
|
println!("Starting smart sync evaluation {} ({})", i, scenario);
|
|
|
|
// Since we can't directly inject the mock into SmartSyncService,
|
|
// we'll simulate the evaluation logic by calling the database methods
|
|
// that SmartSyncService would call
|
|
|
|
// 1. Get known directories (what SmartSyncService.evaluate_sync_need does)
|
|
let known_dirs_result = smart_sync_clone.state().db.list_webdav_directories(user_id).await;
|
|
|
|
// 2. Simulate discovery with delay (mock WebDAV call)
|
|
let discovery_result = mock_service.mock_discover_files_and_directories("/test", false).await;
|
|
|
|
// 3. If discovery succeeded, update directory ETags (what perform_smart_sync would do)
|
|
let update_results = if let Ok(discovery) = &discovery_result {
|
|
let mut results = Vec::new();
|
|
for dir_info in &discovery.directories {
|
|
let update_dir = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: dir_info.relative_path.clone(),
|
|
directory_etag: dir_info.etag.clone(),
|
|
file_count: 0,
|
|
total_size_bytes: 0,
|
|
};
|
|
let result = smart_sync_clone.state().db.create_or_update_webdav_directory(&update_dir).await;
|
|
results.push(result.is_ok());
|
|
}
|
|
results
|
|
} else {
|
|
vec![]
|
|
};
|
|
|
|
println!("Completed smart sync evaluation {} ({})", i, scenario);
|
|
(i, scenario, known_dirs_result.is_ok(), discovery_result.is_ok(), update_results)
|
|
})
|
|
});
|
|
|
|
// Wait for all evaluations
|
|
let evaluation_results: Vec<_> = join_all(concurrent_evaluations).await;
|
|
|
|
// Analyze results
|
|
let mut scenario_stats = HashMap::new();
|
|
for result in evaluation_results {
|
|
assert!(result.is_ok(), "Evaluation task should complete without panicking");
|
|
let (task_id, scenario, db_read_ok, discovery_ok, updates) = result.unwrap();
|
|
|
|
*scenario_stats.entry(scenario.clone()).or_insert(0) += 1;
|
|
|
|
println!("Evaluation {}: {} - DB read: {}, Discovery: {}, Updates: {:?}",
|
|
task_id, scenario, db_read_ok, discovery_ok, updates);
|
|
|
|
// Database reads should always succeed
|
|
assert!(db_read_ok, "Database read should succeed for evaluation {}", task_id);
|
|
|
|
// Discovery should succeed unless it's the "failed" scenario
|
|
if scenario != "failed" {
|
|
assert!(discovery_ok, "Discovery should succeed for evaluation {} ({})", task_id, scenario);
|
|
}
|
|
}
|
|
|
|
println!("Scenario statistics: {:?}", scenario_stats);
|
|
|
|
// Verify final state consistency
|
|
let final_directories = state.db.list_webdav_directories(user_id).await
|
|
.expect("Failed to list final directories");
|
|
|
|
// Should have at least the original directories, possibly more from concurrent updates
|
|
assert!(final_directories.len() >= 3,
|
|
"Should have at least 3 directories, got {}", final_directories.len());
|
|
|
|
// Check that some directories were updated by successful evaluations
|
|
let updated_dirs = final_directories.iter()
|
|
.filter(|d| !d.directory_etag.starts_with("old-etag"))
|
|
.count();
|
|
|
|
println!("Updated directories: {}/{}", updated_dirs, final_directories.len());
|
|
|
|
// At least some directories should have been updated (unless all operations failed)
|
|
// This depends on the specific timing of concurrent operations
|
|
}
|
|
|
|
/// Test smart sync full deep scan vs targeted scan concurrency
|
|
#[tokio::test]
|
|
async fn test_concurrent_smart_sync_strategies() {
|
|
let (_test_context, state, user_id) = create_smart_sync_test_state().await;
|
|
|
|
let smart_sync_service = SmartSyncService::new(state.clone());
|
|
|
|
// Create extensive directory structure
|
|
let directories = vec![
|
|
("/project", "proj-etag-1"),
|
|
("/project/docs", "docs-etag-1"),
|
|
("/project/src", "src-etag-1"),
|
|
("/project/tests", "tests-etag-1"),
|
|
("/project/docs/api", "api-etag-1"),
|
|
("/project/docs/user", "user-etag-1"),
|
|
("/archive", "arch-etag-1"),
|
|
("/archive/2023", "2023-etag-1"),
|
|
("/archive/2024", "2024-etag-1"),
|
|
];
|
|
|
|
for (path, etag) in directories {
|
|
let directory = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: path.to_string(),
|
|
directory_etag: etag.to_string(),
|
|
file_count: 5,
|
|
total_size_bytes: 1024,
|
|
};
|
|
state.db.create_or_update_webdav_directory(&directory).await
|
|
.expect("Failed to create directory");
|
|
}
|
|
|
|
// Test concurrent operations with different strategies
|
|
let strategy_operations = vec![
|
|
(SmartSyncStrategy::FullDeepScan, "/project", "full_scan_1"),
|
|
(SmartSyncStrategy::TargetedScan(vec!["/project/docs".to_string()]), "/project", "targeted_1"),
|
|
(SmartSyncStrategy::FullDeepScan, "/archive", "full_scan_2"),
|
|
(SmartSyncStrategy::TargetedScan(vec!["/archive/2023".to_string(), "/archive/2024".to_string()]), "/archive", "targeted_2"),
|
|
(SmartSyncStrategy::FullDeepScan, "/project", "full_scan_3"), // Overlapping with targeted_1
|
|
];
|
|
|
|
let concurrent_strategy_tests = strategy_operations.into_iter().enumerate().map(|(i, (strategy, base_path, test_name))| {
|
|
let smart_sync_clone = smart_sync_service.clone();
|
|
let user_id = user_id;
|
|
let base_path = base_path.to_string();
|
|
let test_name = test_name.to_string();
|
|
tokio::spawn(async move {
|
|
println!("Starting strategy test {} ({}) for {}", i, test_name, base_path);
|
|
|
|
// Simulate what perform_smart_sync would do for each strategy
|
|
let result: Result<i32, anyhow::Error> = match strategy {
|
|
SmartSyncStrategy::FullDeepScan => {
|
|
// Simulate full deep scan - update all directories under base_path
|
|
let all_dirs = smart_sync_clone.state().db.list_webdav_directories(user_id).await?;
|
|
let relevant_dirs: Vec<_> = all_dirs.into_iter()
|
|
.filter(|d| d.directory_path.starts_with(&base_path))
|
|
.collect();
|
|
|
|
let mut update_count = 0;
|
|
for dir in relevant_dirs {
|
|
let updated_dir = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: dir.directory_path,
|
|
directory_etag: format!("{}-updated-by-{}", dir.directory_etag, test_name),
|
|
file_count: dir.file_count + 1,
|
|
total_size_bytes: dir.total_size_bytes + 100,
|
|
};
|
|
|
|
if smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_dir).await.is_ok() {
|
|
update_count += 1;
|
|
}
|
|
}
|
|
Ok(update_count)
|
|
}
|
|
SmartSyncStrategy::TargetedScan(target_dirs) => {
|
|
// Simulate targeted scan - only update specific directories
|
|
let mut update_count = 0;
|
|
for target_dir in target_dirs {
|
|
let updated_dir = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: target_dir.clone(),
|
|
directory_etag: format!("targeted-etag-{}", test_name),
|
|
file_count: 10,
|
|
total_size_bytes: 2048,
|
|
};
|
|
|
|
if smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_dir).await.is_ok() {
|
|
update_count += 1;
|
|
}
|
|
}
|
|
Ok(update_count)
|
|
}
|
|
};
|
|
|
|
println!("Completed strategy test {} ({})", i, test_name);
|
|
Result::<_, anyhow::Error>::Ok((i, test_name, result))
|
|
})
|
|
});
|
|
|
|
// Wait for all strategy tests
|
|
let strategy_results: Vec<_> = join_all(concurrent_strategy_tests).await;
|
|
|
|
// Analyze strategy execution results
|
|
let mut total_updates = 0;
|
|
for result in strategy_results {
|
|
assert!(result.is_ok(), "Strategy test task should complete");
|
|
let strategy_result = result.unwrap();
|
|
assert!(strategy_result.is_ok(), "Strategy test should not panic");
|
|
|
|
let (task_id, test_name, update_result) = strategy_result.unwrap();
|
|
match update_result {
|
|
Ok(update_count) => {
|
|
println!("Strategy test {}: {} updated {} directories", task_id, test_name, update_count);
|
|
total_updates += update_count;
|
|
}
|
|
Err(e) => {
|
|
println!("Strategy test {}: {} failed: {}", task_id, test_name, e);
|
|
}
|
|
}
|
|
}
|
|
|
|
println!("Total directory updates across all strategies: {}", total_updates);
|
|
|
|
// Verify final state
|
|
let final_directories = state.db.list_webdav_directories(user_id).await
|
|
.expect("Failed to list final directories");
|
|
|
|
// Should still have all directories
|
|
assert_eq!(final_directories.len(), 9, "Should have all 9 directories");
|
|
|
|
// Check for evidence of different strategy executions
|
|
let full_scan_updates = final_directories.iter()
|
|
.filter(|d| d.directory_etag.contains("updated-by-full_scan"))
|
|
.count();
|
|
|
|
let targeted_updates = final_directories.iter()
|
|
.filter(|d| d.directory_etag.contains("targeted-etag"))
|
|
.count();
|
|
|
|
println!("Full scan updates: {}, Targeted updates: {}", full_scan_updates, targeted_updates);
|
|
|
|
// At least some updates should have occurred
|
|
assert!(total_updates > 0, "At least some strategy operations should have updated directories");
|
|
}
|
|
|
|
/// Test smart sync progress tracking under concurrent operations
|
|
#[tokio::test]
|
|
async fn test_concurrent_smart_sync_progress_tracking() {
|
|
let (_test_context, state, user_id) = create_smart_sync_test_state().await;
|
|
|
|
let smart_sync_service = SmartSyncService::new(state.clone());
|
|
|
|
// Create multiple progress trackers for concurrent operations
|
|
let progress_operations = (0..5).map(|i| {
|
|
let smart_sync_clone = smart_sync_service.clone();
|
|
let user_id = user_id;
|
|
tokio::spawn(async move {
|
|
let progress = SyncProgress::new();
|
|
progress.set_phase(SyncPhase::Initializing);
|
|
|
|
println!("Progress operation {} starting", i);
|
|
|
|
// Simulate smart sync operation with progress tracking
|
|
progress.set_phase(SyncPhase::Evaluating);
|
|
progress.set_current_directory(&format!("/operation-{}", i));
|
|
|
|
// Simulate database operations
|
|
sleep(Duration::from_millis(50)).await;
|
|
|
|
progress.set_phase(SyncPhase::DiscoveringDirectories);
|
|
progress.set_current_directory(&format!("/operation-{}/subdir", i));
|
|
|
|
// Simulate discovery delay
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
progress.set_phase(SyncPhase::SavingMetadata);
|
|
|
|
// Update directory (simulates saving discovered metadata)
|
|
let directory = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: format!("/operation-{}", i),
|
|
directory_etag: format!("progress-etag-{}", i),
|
|
file_count: i as i64,
|
|
total_size_bytes: (i as i64) * 1024,
|
|
};
|
|
|
|
let db_result = smart_sync_clone.state().db.create_or_update_webdav_directory(&directory).await;
|
|
|
|
if db_result.is_ok() {
|
|
progress.set_phase(SyncPhase::Completed);
|
|
} else {
|
|
progress.set_phase(SyncPhase::Failed("Database update failed".to_string()));
|
|
}
|
|
|
|
// Get final progress stats
|
|
let stats = progress.get_stats();
|
|
|
|
println!("Progress operation {} completed", i);
|
|
(i, db_result.is_ok(), stats)
|
|
})
|
|
});
|
|
|
|
// Wait for all progress operations
|
|
let progress_results: Vec<_> = join_all(progress_operations).await;
|
|
|
|
// Verify progress tracking results
|
|
let mut successful_operations = 0;
|
|
for result in progress_results {
|
|
assert!(result.is_ok(), "Progress tracking task should complete");
|
|
let (operation_id, db_success, stats) = result.unwrap();
|
|
|
|
if db_success {
|
|
successful_operations += 1;
|
|
}
|
|
|
|
if let Some(stats) = stats {
|
|
println!("Operation {}: Success: {}, Elapsed: {:?}, Errors: {:?}",
|
|
operation_id, db_success, stats.elapsed_time, stats.errors);
|
|
}
|
|
}
|
|
|
|
println!("Successful progress operations: {}/5", successful_operations);
|
|
|
|
// Verify created directories
|
|
let final_directories = state.db.list_webdav_directories(user_id).await
|
|
.expect("Failed to list directories");
|
|
|
|
let progress_dirs = final_directories.iter()
|
|
.filter(|d| d.directory_etag.starts_with("progress-etag-"))
|
|
.count();
|
|
|
|
assert_eq!(progress_dirs, successful_operations,
|
|
"Number of progress directories should match successful operations");
|
|
|
|
// All operations should have completed (successfully or not)
|
|
assert!(successful_operations > 0, "At least some operations should succeed");
|
|
}
|
|
|
|
/// Test concurrent smart sync operations with simulated ETag conflicts
|
|
#[tokio::test]
|
|
async fn test_concurrent_smart_sync_etag_conflicts() {
|
|
let (_test_context, state, user_id) = create_smart_sync_test_state().await;
|
|
|
|
let smart_sync_service = SmartSyncService::new(state.clone());
|
|
|
|
// Create a shared directory that multiple operations will try to update
|
|
let shared_directory = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: "/shared".to_string(),
|
|
directory_etag: "initial-shared-etag".to_string(),
|
|
file_count: 10,
|
|
total_size_bytes: 2048,
|
|
};
|
|
|
|
state.db.create_or_update_webdav_directory(&shared_directory).await
|
|
.expect("Failed to create shared directory");
|
|
|
|
// Create concurrent operations that all try to update the same directory
|
|
let etag_conflict_operations = (0..10).map(|i| {
|
|
let smart_sync_clone = smart_sync_service.clone();
|
|
let user_id = user_id;
|
|
tokio::spawn(async move {
|
|
println!("ETag conflict operation {} starting", i);
|
|
|
|
// First, read the current directory state
|
|
let current_dirs = smart_sync_clone.state().db.list_webdav_directories(user_id).await?;
|
|
let shared_dir = current_dirs.iter()
|
|
.find(|d| d.directory_path == "/shared")
|
|
.ok_or_else(|| anyhow::anyhow!("Shared directory not found"))?;
|
|
|
|
// Simulate discovery finding changes (each operation thinks it found different changes)
|
|
sleep(Duration::from_millis((i % 5) * 20)).await; // Variable delay to create race conditions
|
|
|
|
// Try to update with a new ETag (simulating discovered changes)
|
|
let updated_directory = CreateWebDAVDirectory {
|
|
user_id,
|
|
directory_path: "/shared".to_string(),
|
|
directory_etag: format!("conflict-etag-operation-{}", i),
|
|
file_count: shared_dir.file_count + i as i64,
|
|
total_size_bytes: shared_dir.total_size_bytes + (i as i64 * 100),
|
|
};
|
|
|
|
let update_result = smart_sync_clone.state().db.create_or_update_webdav_directory(&updated_directory).await;
|
|
|
|
println!("ETag conflict operation {} completed: {:?}", i, update_result.is_ok());
|
|
Result::<_, anyhow::Error>::Ok((i, update_result.is_ok()))
|
|
})
|
|
});
|
|
|
|
// Wait for all conflict operations
|
|
let conflict_results: Vec<_> = join_all(etag_conflict_operations).await;
|
|
|
|
// Analyze conflict resolution
|
|
let mut successful_updates = 0;
|
|
for result in conflict_results {
|
|
assert!(result.is_ok(), "ETag conflict task should complete");
|
|
let operation_result = result.unwrap();
|
|
assert!(operation_result.is_ok(), "ETag conflict operation should not panic");
|
|
|
|
let (operation_id, update_success) = operation_result.unwrap();
|
|
if update_success {
|
|
successful_updates += 1;
|
|
}
|
|
println!("ETag conflict operation {}: {}", operation_id, update_success);
|
|
}
|
|
|
|
println!("Successful ETag updates: {}/10", successful_updates);
|
|
|
|
// Verify final state
|
|
let final_directories = state.db.list_webdav_directories(user_id).await
|
|
.expect("Failed to list final directories");
|
|
|
|
assert_eq!(final_directories.len(), 1, "Should have exactly one shared directory");
|
|
|
|
let final_shared_dir = &final_directories[0];
|
|
assert_eq!(final_shared_dir.directory_path, "/shared");
|
|
|
|
// The final ETag should be from one of the operations
|
|
assert!(final_shared_dir.directory_etag.contains("conflict-etag-operation-") ||
|
|
final_shared_dir.directory_etag == "initial-shared-etag",
|
|
"Final ETag should be from one of the operations: {}", final_shared_dir.directory_etag);
|
|
|
|
// File count should have been updated by the successful operation
|
|
if successful_updates > 0 {
|
|
assert!(final_shared_dir.file_count >= 10,
|
|
"File count should have been updated: {}", final_shared_dir.file_count);
|
|
}
|
|
|
|
// All operations should have succeeded (database should handle concurrency gracefully)
|
|
assert!(successful_updates > 0, "At least some ETag updates should succeed");
|
|
} |