/*! * Source Sync Cancellation Workflow Integration Tests * * Comprehensive end-to-end integration tests for source sync cancellation functionality: * - Full sync cancellation workflow via API endpoints * - Cancellation during different sync phases * - Multiple cancellation request handling * - Status monitoring and transitions * - Resource cleanup verification * - Database state consistency */ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; use chrono::Utc; use serde_json::json; use tokio::time::sleep; use axum::{ body::Body, extract::Path, http::{Request, StatusCode}, Router, }; use tower::ServiceExt; use readur::{ AppState, config::Config, db::Database, models::{Source, SourceType, SourceStatus, User, CreateSource, CreateUser, UserRole}, auth::Claims, }; /// Create a test app state with database and real source scheduler async fn create_test_app_state() -> Arc { let database_url = std::env::var("TEST_DATABASE_URL") .or_else(|_| std::env::var("DATABASE_URL")) .unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string()); let config = Config { database_url, server_address: "127.0.0.1:8080".to_string(), jwt_secret: "test_secret_for_sync_cancellation".to_string(), upload_path: "/tmp/test_uploads_sync_cancel".to_string(), watch_folder: "/tmp/watch_sync_cancel".to_string(), user_watch_base_dir: "./user_watch".to_string(), enable_per_user_watch: false, allowed_file_types: vec!["pdf".to_string(), "txt".to_string(), "jpg".to_string(), "png".to_string()], watch_interval_seconds: Some(30), file_stability_check_ms: Some(500), max_file_age_hours: Some(24), ocr_language: "eng".to_string(), concurrent_ocr_jobs: 2, ocr_timeout_seconds: 60, max_file_size_mb: 50, memory_limit_mb: 256, cpu_priority: "normal".to_string(), oidc_enabled: false, oidc_client_id: None, oidc_client_secret: None, oidc_issuer_url: None, oidc_redirect_uri: None, s3_enabled: false, s3_config: None, }; let db = Database::new(&config.database_url).await.unwrap(); // Create file service let storage_config = readur::storage::StorageConfig::Local { upload_path: config.upload_path.clone() }; let storage_backend = readur::storage::factory::create_storage_backend(storage_config).await.unwrap(); let file_service = Arc::new(readur::services::file_service::FileService::with_storage(config.upload_path.clone(), storage_backend)); let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new( db.clone(), db.pool.clone(), 2, file_service.clone(), )); let sync_progress_tracker = Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()); // Create initial app state let app_state = AppState { db: db.clone(), config, file_service: file_service.clone(), webdav_scheduler: None, source_scheduler: None, queue_service, oidc_client: None, sync_progress_tracker, user_watch_service: None, }; // Wrap in Arc for sharing let state_arc = Arc::new(app_state); // Create the real source scheduler let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(state_arc.clone())); // Now we need to update the AppState with the scheduler // Since AppState is already wrapped in Arc, we need to use a different approach // Let's create a new AppState with the scheduler Arc::new(AppState { db: state_arc.db.clone(), config: state_arc.config.clone(), file_service: state_arc.file_service.clone(), webdav_scheduler: None, source_scheduler: Some(source_scheduler), queue_service: state_arc.queue_service.clone(), oidc_client: None, sync_progress_tracker: state_arc.sync_progress_tracker.clone(), user_watch_service: None, }) } /// Create a test user for sync cancellation tests async fn create_test_user(state: &AppState) -> User { let user_id = Uuid::new_v4(); let create_user = CreateUser { username: format!("testuser_sync_cancel_{}", user_id), email: format!("testuser_sync_cancel_{}@example.com", user_id), password: "test_password".to_string(), role: Some(UserRole::Admin), }; state.db.create_user(create_user).await.unwrap() } /// Create a test WebDAV source for cancellation testing /// Uses a non-existent server so sync will fail, but we can test the cancellation workflow async fn create_test_webdav_source(state: &AppState, user_id: Uuid, name: &str) -> Source { let create_source = CreateSource { name: name.to_string(), source_type: SourceType::WebDAV, enabled: Some(true), config: json!({ "server_url": "https://test-webdav-server-for-cancellation-testing.example.com/remote.php/webdav", "username": "test_user", "password": "test_password", "watch_folders": ["/TestFolder"], "file_extensions": [".pdf", ".txt", ".jpg", ".png"], "auto_sync": false, "sync_interval_minutes": 60, "server_type": "nextcloud" }), }; state.db.create_source(user_id, &create_source).await.unwrap() } /// Wait for a source to reach a specific status with timeout async fn wait_for_source_status( state: &AppState, user_id: Uuid, source_id: Uuid, expected_status: SourceStatus, timeout_ms: u64 ) -> bool { let start_time = std::time::Instant::now(); let timeout_duration = Duration::from_millis(timeout_ms); while start_time.elapsed() < timeout_duration { if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await { if source.status == expected_status { return true; } } sleep(Duration::from_millis(100)).await; } false } /// Wait for sync to actually start (status becomes Syncing) async fn wait_for_sync_to_start( state: &AppState, user_id: Uuid, source_id: Uuid, timeout_ms: u64 ) -> bool { wait_for_source_status(state, user_id, source_id, SourceStatus::Syncing, timeout_ms).await } /// Wait for sync to stop (status becomes Idle or Error) async fn wait_for_sync_to_stop( state: &AppState, user_id: Uuid, source_id: Uuid, timeout_ms: u64 ) -> bool { let start_time = std::time::Instant::now(); let timeout_duration = Duration::from_millis(timeout_ms); while start_time.elapsed() < timeout_duration { if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await { if matches!(source.status, SourceStatus::Idle | SourceStatus::Error) { return true; } } sleep(Duration::from_millis(100)).await; } false } /// Create HTTP client for API testing fn create_test_app(state: Arc) -> Router { use axum::{routing::get, Router}; Router::new() .route("/api/health", get(readur::health_check)) .nest("/api/auth", readur::routes::auth::router()) .nest("/api/documents", readur::routes::documents::router()) .nest("/api/ignored-files", readur::routes::ignored_files::ignored_files_routes()) .nest("/api/labels", readur::routes::labels::router()) .nest("/api/metrics", readur::routes::metrics::router()) .nest("/metrics", readur::routes::prometheus_metrics::router()) .nest("/api/notifications", readur::routes::notifications::router()) .nest("/api/ocr", readur::routes::ocr::router()) .nest("/api/queue", readur::routes::queue::router()) .nest("/api/search", readur::routes::search::router()) .nest("/api/settings", readur::routes::settings::router()) .nest("/api/sources", readur::routes::sources::router()) .nest("/api/users", readur::routes::users::router()) .nest("/api/webdav", readur::routes::webdav::router()) .with_state(state) } /// Create authorization header for test user fn create_auth_header(user: &User, jwt_secret: &str) -> String { let claims = Claims { sub: user.id, username: user.username.clone(), exp: (chrono::Utc::now() + chrono::Duration::hours(24)).timestamp() as usize, }; let token = jsonwebtoken::encode( &jsonwebtoken::Header::default(), &claims, &jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_ref()), ).unwrap(); format!("Bearer {}", token) } #[tokio::test] async fn test_complete_sync_cancellation_workflow() { println!("๐Ÿงช Testing complete sync cancellation workflow"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Test Cancellation Source").await; let app = create_test_app(state.clone()); // Create auth header let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test user and source: {}", source.id); // Step 1: Verify source is initially idle let response = app .clone() .oneshot( Request::builder() .method("GET") .uri(format!("/api/sources/{}/sync/status", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Source is initially idle"); // Step 2: Start sync using the real scheduler let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); let actual_status = response.status(); println!("๐Ÿ” Sync start actual status: {}", actual_status); // With real scheduler, should return OK (unless already running) assert!(matches!(actual_status, StatusCode::OK | StatusCode::CONFLICT)); println!("โœ… Sync start request completed with status: {}", actual_status); // Step 3: Wait for sync to actually start (with real scheduler) let sync_started = wait_for_sync_to_start(&state, user.id, source.id, 5000).await; if sync_started { println!("โœ… Sync actually started - status changed to Syncing"); // Give it a moment to establish the sync sleep(Duration::from_millis(500)).await; } else { println!("โš ๏ธ Sync did not start within timeout (may fail quickly due to invalid server)"); // The sync might fail immediately due to invalid server, which is fine for testing cancellation } // Step 4: Cancel the sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync cancellation request successful"); // Step 5: Wait for sync to actually stop with real scheduler let sync_stopped = wait_for_sync_to_stop(&state, user.id, source.id, 10000).await; if sync_stopped { println!("โœ… Sync actually stopped - status changed to Idle/Error"); } else { println!("โš ๏ธ Sync did not stop within timeout, checking current status"); } let source_after_cancel = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("โœ… Source status after cancellation: {:?}", source_after_cancel.status); // With real scheduler, we should see proper status transitions assert!(matches!(source_after_cancel.status, SourceStatus::Idle | SourceStatus::Error), "Source should be Idle or Error after cancellation, got: {:?}", source_after_cancel.status); // Step 6: Verify sync status API shows no active sync let response = app .clone() .oneshot( Request::builder() .method("GET") .uri(format!("/api/sources/{}/sync/status", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync status API accessible after cancellation"); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Complete sync cancellation workflow test passed"); } #[tokio::test] async fn test_multiple_cancellation_requests() { println!("๐Ÿงช Testing multiple cancellation requests handling"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Multiple Cancel Test Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for multiple cancellation test"); // Start sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Sync started with status: {}", response.status()); // Wait briefly sleep(Duration::from_millis(200)).await; // Send multiple cancellation requests concurrently let mut cancel_handles = Vec::new(); for i in 0..3 { let app_clone = app.clone(); let auth_header_clone = auth_header.clone(); let source_id = source.id; let handle = tokio::spawn(async move { let response = app_clone .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source_id)) .header("Authorization", auth_header_clone) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Cancellation request {} completed with status: {}", i + 1, response.status()); response.status() }); cancel_handles.push(handle); } // Wait for all cancellation requests to complete let mut success_count = 0; for handle in cancel_handles { let status = handle.await.unwrap(); if status == StatusCode::OK { success_count += 1; } } // All cancellation requests should succeed (idempotent) assert_eq!(success_count, 3); println!("โœ… All {} cancellation requests succeeded", success_count); // Verify final state sleep(Duration::from_millis(1000)).await; let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("โœ… Final source status: {:?}", final_source.status); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Multiple cancellation requests test passed"); } #[tokio::test] async fn test_cancellation_without_active_sync() { println!("๐Ÿงช Testing cancellation when no sync is active"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "No Active Sync Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for no active sync test"); // Verify source is idle let initial_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); assert_eq!(initial_source.status, SourceStatus::Idle); println!("โœ… Source is initially idle: {:?}", initial_source.status); // Try to cancel sync when none is active let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); // Should succeed (idempotent behavior) assert_eq!(response.status(), StatusCode::OK); println!("โœ… Cancellation without active sync succeeded"); // Verify source remains idle let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); assert_eq!(final_source.status, SourceStatus::Idle); println!("โœ… Source remains idle after cancellation: {:?}", final_source.status); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Cancellation without active sync test passed"); } #[tokio::test] async fn test_sync_status_monitoring_during_cancellation() { println!("๐Ÿงช Testing sync status monitoring during cancellation"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Status Monitor Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for status monitoring test"); // Start sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Sync started with status: {}", response.status()); // Monitor sync status before cancellation let response = app .clone() .oneshot( Request::builder() .method("GET") .uri(format!("/api/sources/{}/sync/status", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync status API accessible before cancellation"); // Cancel sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync cancellation successful"); // Monitor sync status after cancellation sleep(Duration::from_millis(500)).await; let response = app .clone() .oneshot( Request::builder() .method("GET") .uri(format!("/api/sources/{}/sync/status", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync status API accessible after cancellation"); // Check database state let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("โœ… Final database status: {:?}", final_source.status); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Sync status monitoring during cancellation test passed"); } #[tokio::test] async fn test_cancellation_with_unauthorized_user() { println!("๐Ÿงช Testing cancellation with unauthorized user"); let state = create_test_app_state().await; let owner_user = create_test_user(&state).await; let unauthorized_user = create_test_user(&state).await; let source = create_test_webdav_source(&state, owner_user.id, "Unauthorized Test Source").await; let app = create_test_app(state.clone()); let unauthorized_auth_header = create_auth_header(&unauthorized_user, &state.config.jwt_secret); println!("โœ… Created test setup with unauthorized user"); // Try to cancel sync with unauthorized user let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &unauthorized_auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); // Should return 404 (source not found for this user) assert_eq!(response.status(), StatusCode::NOT_FOUND); println!("โœ… Unauthorized cancellation properly rejected with 404"); // Cleanup state.db.delete_source(owner_user.id, source.id).await.unwrap(); state.db.delete_user(owner_user.id).await.unwrap(); state.db.delete_user(unauthorized_user.id).await.unwrap(); println!("๐ŸŽ‰ Unauthorized user cancellation test passed"); } #[tokio::test] async fn test_cancellation_of_nonexistent_source() { println!("๐Ÿงช Testing cancellation of nonexistent source"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); let nonexistent_source_id = Uuid::new_v4(); println!("โœ… Created test setup for nonexistent source test"); // Try to cancel sync for nonexistent source let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", nonexistent_source_id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); // Should return 404 assert_eq!(response.status(), StatusCode::NOT_FOUND); println!("โœ… Nonexistent source cancellation properly rejected with 404"); // Cleanup state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Nonexistent source cancellation test passed"); } #[tokio::test] async fn test_sync_start_cancel_start_sequence() { println!("๐Ÿงช Testing sync start -> cancel -> start sequence"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Start Cancel Start Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for start-cancel-start sequence"); // Step 1: Start sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… First sync start: {}", response.status()); // Step 2: Wait briefly then cancel sleep(Duration::from_millis(300)).await; let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Sync cancellation successful"); // Step 3: Wait for cancellation to complete sleep(Duration::from_millis(1000)).await; // Step 4: Start sync again let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); // Should succeed or return expected error status let status = response.status(); assert!(matches!(status, StatusCode::OK | StatusCode::CONFLICT | StatusCode::INTERNAL_SERVER_ERROR)); println!("โœ… Second sync start after cancellation: {}", status); // Step 5: Cancel the second sync sleep(Duration::from_millis(300)).await; let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); println!("โœ… Second cancellation successful"); // Verify final state sleep(Duration::from_millis(1000)).await; let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("โœ… Final source status: {:?}", final_source.status); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Start-cancel-start sequence test passed"); } /// Test that validates sync actually stops working, not just changes status #[tokio::test] async fn test_sync_actually_stops_working() { println!("๐Ÿงช Testing that sync cancellation actually stops sync work"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Actual Stop Test Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for actual sync stop validation"); // First check that progress tracker shows no active syncs let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); assert!(initial_active_syncs.is_empty(), "Should have no active syncs initially"); assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing initially"); // Step 1: Start sync and verify it's actually registered as active let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Sync start response: {}", response.status()); // Step 2: Wait for sync to actually start and be registered let mut sync_became_active = false; for attempt in 1..=20 { // Wait up to 2 seconds sleep(Duration::from_millis(100)).await; if state.sync_progress_tracker.is_syncing(source.id) { sync_became_active = true; println!("โœ… Sync became active after {} attempts ({}ms)", attempt, attempt * 100); break; } } // Verify sync actually became active if !sync_became_active { println!("โš ๏ธ Sync never became active - checking database status"); let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("Database source status: {:?}", db_source.status); // If sync didn't start due to no scheduler or other issues, that's fine for this test // The important part is that we test stopping when sync IS active if db_source.status != SourceStatus::Syncing { println!("โš ๏ธ Skipping actual stop test - sync never started (likely no scheduler available)"); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); return; } } // Step 3: Verify sync is tracked in multiple places let active_syncs_before_stop = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Active syncs before stop: {:?}", active_syncs_before_stop); let progress_before_stop = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Progress before stop: {:?}", progress_before_stop); let db_source_before_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("๐Ÿ“Š Database status before stop: {:?}", db_source_before_stop.status); // Step 4: Stop the sync let stop_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(stop_response.status(), StatusCode::OK); println!("โœ… Stop sync request successful"); // Step 5: Verify sync actually stops working // Check progress tracker immediately (should be unregistered) let progress_after_stop_immediate = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Progress immediately after stop: {:?}", progress_after_stop_immediate); // Wait a bit for all cleanup to complete sleep(Duration::from_millis(500)).await; let active_syncs_after_stop = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Active syncs after stop: {:?}", active_syncs_after_stop); let progress_after_stop = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Progress after stop with delay: {:?}", progress_after_stop); let db_source_after_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("๐Ÿ“Š Database status after stop: {:?}", db_source_after_stop.status); // Step 6: Assertions to verify sync actually stopped // The source should no longer be tracked as actively syncing assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be tracked as syncing after stop"); // The source should not be in the active syncs list assert!(!active_syncs_after_stop.contains(&source.id), "Source should not be in active syncs list after stop"); // Database status should be Idle (not Syncing) assert_eq!(db_source_after_stop.status, SourceStatus::Idle, "Database status should be Idle after stop"); // Progress should either be None or show as not active if let Some(progress) = progress_after_stop { assert!(!progress.is_active, "Progress should show as not active after stop"); } println!("โœ… All sync stop validations passed"); // Step 7: Test that sync can be restarted after stop sleep(Duration::from_millis(1000)).await; // Wait for complete cleanup let restart_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Restart after stop response: {}", restart_response.status()); // Stop the restarted sync for cleanup sleep(Duration::from_millis(200)).await; let final_stop_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Final stop response: {}", final_stop_response.status()); // Cleanup sleep(Duration::from_millis(500)).await; state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Actual sync stop validation test passed"); } /// Test that validates sync cancellation during different phases #[tokio::test] async fn test_sync_cancellation_during_different_phases() { println!("๐Ÿงช Testing sync cancellation during different phases"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Phase Cancellation Test Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for phase-based cancellation"); // Test cancellation at different timing intervals to catch different phases let test_delays = vec![50, 150, 300, 500]; // Different delays in milliseconds for (i, delay_ms) in test_delays.iter().enumerate() { println!("๐Ÿ”„ Testing cancellation after {}ms delay (iteration {})", delay_ms, i + 1); // Start sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!(" ๐Ÿ“ก Sync start status: {}", response.status()); // Wait for the specified delay to let sync progress sleep(Duration::from_millis(*delay_ms)).await; // Check what phase we might be in (if any) let progress_info = state.sync_progress_tracker.get_progress(source.id); if let Some(progress) = &progress_info { println!(" ๐Ÿ“Š Cancelling during phase: {} ({})", progress.phase, progress.phase_description); } else { println!(" ๐Ÿ“Š No progress info available - sync may not have started or already completed"); } // Cancel the sync let cancel_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(cancel_response.status(), StatusCode::OK); println!(" โœ… Cancellation successful"); // Verify cleanup sleep(Duration::from_millis(300)).await; assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing after cancellation in iteration {}", i + 1); let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); assert_eq!(db_source.status, SourceStatus::Idle, "Source should be idle after cancellation in iteration {}", i + 1); println!(" โœ… Cleanup verified for iteration {}", i + 1); // Wait before next iteration to ensure complete cleanup sleep(Duration::from_millis(500)).await; } // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Phase-based cancellation test passed"); } /// Test resource cleanup validation after sync cancellation #[tokio::test] async fn test_resource_cleanup_after_cancellation() { println!("๐Ÿงช Testing resource cleanup after sync cancellation"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Resource Cleanup Test Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created test setup for resource cleanup validation"); // Record initial state let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); let initial_progress = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Initial active syncs: {:?}", initial_active_syncs); println!("๐Ÿ“Š Initial progress: {:?}", initial_progress); // Start sync let response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("โœ… Sync started with status: {}", response.status()); // Wait for sync to become active sleep(Duration::from_millis(200)).await; // Record active state let active_syncs_during = state.sync_progress_tracker.get_active_source_ids(); let progress_during = state.sync_progress_tracker.get_progress(source.id); let is_syncing_during = state.sync_progress_tracker.is_syncing(source.id); println!("๐Ÿ“Š Active syncs during: {:?}", active_syncs_during); println!("๐Ÿ“Š Progress during: {:?}", progress_during); println!("๐Ÿ“Š Is syncing during: {}", is_syncing_during); // Cancel sync let cancel_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(cancel_response.status(), StatusCode::OK); println!("โœ… Sync cancellation successful"); // Wait for cleanup to complete sleep(Duration::from_millis(1000)).await; // Verify complete cleanup let final_active_syncs = state.sync_progress_tracker.get_active_source_ids(); let final_progress = state.sync_progress_tracker.get_progress(source.id); let is_syncing_final = state.sync_progress_tracker.is_syncing(source.id); let db_source_final = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); println!("๐Ÿ“Š Final active syncs: {:?}", final_active_syncs); println!("๐Ÿ“Š Final progress: {:?}", final_progress); println!("๐Ÿ“Š Is syncing final: {}", is_syncing_final); println!("๐Ÿ“Š Final DB status: {:?}", db_source_final.status); // Assertions for complete cleanup assert!(!final_active_syncs.contains(&source.id), "Source should be removed from active syncs list"); assert!(!is_syncing_final, "Progress tracker should not show source as syncing"); assert_eq!(db_source_final.status, SourceStatus::Idle, "Database should show source as Idle"); // If progress exists, it should not be active if let Some(progress) = final_progress { assert!(!progress.is_active, "Any remaining progress should show as inactive"); } // Test multiple rapid start/stop cycles to stress test cleanup println!("๐Ÿ”„ Testing rapid start/stop cycles"); for cycle in 1..=3 { println!(" ๐Ÿ”„ Cycle {}", cycle); // Start let start_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!(" ๐Ÿ“ก Start: {}", start_response.status()); // Brief wait sleep(Duration::from_millis(100)).await; // Stop let stop_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!(" ๐Ÿ›‘ Stop: {}", stop_response.status()); // Verify cleanup after each cycle sleep(Duration::from_millis(300)).await; assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing after cycle {}", cycle); let db_check = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); assert_eq!(db_check.status, SourceStatus::Idle, "Source should be idle after cycle {}", cycle); } println!("โœ… Rapid cycle cleanup verified"); // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Resource cleanup validation test passed"); } /// Test that validates cancellation token propagation through sync layers #[tokio::test] async fn test_cancellation_token_propagation() { println!("๐Ÿงช Testing cancellation token propagation through sync layers"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Token Propagation Test Source").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); // Create multiple sources to test concurrent cancellation handling let source2 = create_test_webdav_source(&state, user.id, "Second Token Test Source").await; let source3 = create_test_webdav_source(&state, user.id, "Third Token Test Source").await; println!("โœ… Created test setup with multiple sources for token propagation"); // Start multiple syncs concurrently let sync_futures = vec![ app.clone().oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ), app.clone().oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source2.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ), app.clone().oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source3.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ), ]; let results = futures::future::join_all(sync_futures).await; for (i, result) in results.iter().enumerate() { if let Ok(response) = result { println!("โœ… Source {} sync start: {}", i + 1, response.status()); } } // Wait for syncs to potentially start sleep(Duration::from_millis(300)).await; // Record which sources are actually active let active_before = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Active syncs before cancellation: {:?}", active_before); // Test individual cancellation (should only affect specific source) let cancel_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(cancel_response.status(), StatusCode::OK); println!("โœ… Individual source cancellation successful"); // Wait for cancellation to propagate sleep(Duration::from_millis(500)).await; // Verify that only the cancelled source stopped let active_after_individual = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Active syncs after individual cancellation: {:?}", active_after_individual); // The cancelled source should not be active assert!(!state.sync_progress_tracker.is_syncing(source.id), "Cancelled source should not be syncing"); // Other sources might still be active (depending on implementation) // The key test is that the cancellation was isolated to the correct source // Cancel the remaining sources let remaining_cancel_futures = vec![ app.clone().oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source2.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ), app.clone().oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source3.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ), ]; let cancel_results = futures::future::join_all(remaining_cancel_futures).await; for (i, result) in cancel_results.iter().enumerate() { if let Ok(response) = result { println!("โœ… Remaining source {} cancel: {}", i + 2, response.status()); } } // Wait for all cancellations to complete sleep(Duration::from_millis(1000)).await; // Verify all sources are now idle let final_active = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Final active syncs: {:?}", final_active); assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source 1 should not be syncing"); assert!(!state.sync_progress_tracker.is_syncing(source2.id), "Source 2 should not be syncing"); assert!(!state.sync_progress_tracker.is_syncing(source3.id), "Source 3 should not be syncing"); // Verify database states let db_sources = vec![ state.db.get_source(user.id, source.id).await.unwrap().unwrap(), state.db.get_source(user.id, source2.id).await.unwrap().unwrap(), state.db.get_source(user.id, source3.id).await.unwrap().unwrap(), ]; for (i, db_source) in db_sources.iter().enumerate() { assert_eq!(db_source.status, SourceStatus::Idle, "Database source {} should be idle", i + 1); println!("๐Ÿ“Š Database source {} status: {:?}", i + 1, db_source.status); } // Cleanup state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_source(user.id, source2.id).await.unwrap(); state.db.delete_source(user.id, source3.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ Cancellation token propagation test passed"); } /// Comprehensive test that validates the complete sync cancellation workflow /// This is the main test that covers all aspects of sync cancellation #[tokio::test] async fn test_comprehensive_sync_cancellation_validation() { println!("๐Ÿงช COMPREHENSIVE TEST: Complete sync cancellation validation"); let state = create_test_app_state().await; let user = create_test_user(&state).await; let source = create_test_webdav_source(&state, user.id, "Comprehensive Cancellation Test").await; let app = create_test_app(state.clone()); let auth_header = create_auth_header(&user, &state.config.jwt_secret); println!("โœ… Created comprehensive test environment"); // PHASE 1: Validate initial state println!("๐Ÿ“ PHASE 1: Initial state validation"); let initial_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); let initial_is_syncing = state.sync_progress_tracker.is_syncing(source.id); let initial_progress = state.sync_progress_tracker.get_progress(source.id); assert_eq!(initial_db_source.status, SourceStatus::Idle, "Initial DB status should be Idle"); assert!(initial_active_syncs.is_empty(), "Initial active syncs should be empty"); assert!(!initial_is_syncing, "Initial sync state should be false"); assert!(initial_progress.is_none(), "Initial progress should be None"); println!("โœ… PHASE 1 PASSED: All initial states correct"); // PHASE 2: Start sync and validate activation println!("๐Ÿ“ PHASE 2: Sync activation validation"); let start_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("๐Ÿ“ก Sync start response: {}", start_response.status()); // Wait for sync to activate and check multiple indicators let mut sync_activation_verified = false; for attempt in 1..=30 { // Wait up to 3 seconds sleep(Duration::from_millis(100)).await; let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); let is_syncing = state.sync_progress_tracker.is_syncing(source.id); let active_syncs = state.sync_progress_tracker.get_active_source_ids(); if db_source.status == SourceStatus::Syncing || is_syncing || active_syncs.contains(&source.id) { sync_activation_verified = true; println!("โœ… Sync activation verified after {} attempts:", attempt); println!(" ๐Ÿ“Š DB Status: {:?}", db_source.status); println!(" ๐Ÿ“Š Is Syncing: {}", is_syncing); println!(" ๐Ÿ“Š Active Syncs: {:?}", active_syncs); break; } } if !sync_activation_verified { println!("โš ๏ธ PHASE 2 CONDITIONAL PASS: Sync never activated (likely no scheduler)"); // Cleanup and exit gracefully state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); return; } println!("โœ… PHASE 2 PASSED: Sync activation verified"); // PHASE 3: Validate active sync state across all systems println!("๐Ÿ“ PHASE 3: Active sync state validation"); let active_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); let active_is_syncing = state.sync_progress_tracker.is_syncing(source.id); let active_syncs_list = state.sync_progress_tracker.get_active_source_ids(); let active_progress = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Active state summary:"); println!(" ๐Ÿ“Š DB Status: {:?}", active_db_source.status); println!(" ๐Ÿ“Š Is Syncing: {}", active_is_syncing); println!(" ๐Ÿ“Š Active Syncs: {:?}", active_syncs_list); println!(" ๐Ÿ“Š Progress Active: {:?}", active_progress.as_ref().map(|p| p.is_active)); // At least one indicator should show sync is active let sync_indicators_active = active_db_source.status == SourceStatus::Syncing || active_is_syncing || active_syncs_list.contains(&source.id); assert!(sync_indicators_active, "At least one sync indicator should show active state"); println!("โœ… PHASE 3 PASSED: Active sync state validated"); // PHASE 4: Cancel sync and validate immediate response println!("๐Ÿ“ PHASE 4: Sync cancellation execution"); let cancel_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(cancel_response.status(), StatusCode::OK); println!("โœ… PHASE 4 PASSED: Cancellation request successful"); // PHASE 5: Validate cancellation propagation and cleanup println!("๐Ÿ“ PHASE 5: Cancellation cleanup validation"); // Check immediate state (some cleanup might be instant) let immediate_is_syncing = state.sync_progress_tracker.is_syncing(source.id); let immediate_active_syncs = state.sync_progress_tracker.get_active_source_ids(); println!("๐Ÿ“Š Immediate post-cancel state:"); println!(" ๐Ÿ“Š Is Syncing: {}", immediate_is_syncing); println!(" ๐Ÿ“Š Active Syncs: {:?}", immediate_active_syncs); // Wait for complete cleanup sleep(Duration::from_millis(1500)).await; let final_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); let final_is_syncing = state.sync_progress_tracker.is_syncing(source.id); let final_active_syncs = state.sync_progress_tracker.get_active_source_ids(); let final_progress = state.sync_progress_tracker.get_progress(source.id); println!("๐Ÿ“Š Final post-cancel state:"); println!(" ๐Ÿ“Š DB Status: {:?}", final_db_source.status); println!(" ๐Ÿ“Š Is Syncing: {}", final_is_syncing); println!(" ๐Ÿ“Š Active Syncs: {:?}", final_active_syncs); println!(" ๐Ÿ“Š Progress: {:?}", final_progress.as_ref().map(|p| (p.is_active, &p.phase))); // CRITICAL ASSERTIONS: These must all pass for proper cancellation assert_eq!(final_db_source.status, SourceStatus::Idle, "CRITICAL: Database status must be Idle after cancellation"); assert!(!final_is_syncing, "CRITICAL: Progress tracker must not show source as syncing"); assert!(!final_active_syncs.contains(&source.id), "CRITICAL: Source must not be in active syncs list"); if let Some(progress) = final_progress { assert!(!progress.is_active, "CRITICAL: Any remaining progress must show as inactive"); } println!("โœ… PHASE 5 PASSED: Complete cancellation cleanup verified"); // PHASE 6: Validate restart capability after cancellation println!("๐Ÿ“ PHASE 6: Post-cancellation restart validation"); sleep(Duration::from_millis(500)).await; // Ensure complete cleanup let restart_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("๐Ÿ“ก Restart response: {}", restart_response.status()); // The restart should succeed (or fail with expected reasons, not due to lingering state) let acceptable_restart_statuses = [StatusCode::OK, StatusCode::CONFLICT, StatusCode::INTERNAL_SERVER_ERROR, StatusCode::NOT_IMPLEMENTED]; assert!(acceptable_restart_statuses.contains(&restart_response.status()), "Restart should succeed or fail with expected status, got: {}", restart_response.status()); // Clean up the restarted sync sleep(Duration::from_millis(200)).await; let final_cleanup_response = app .clone() .oneshot( Request::builder() .method("POST") .uri(format!("/api/sources/{}/sync/stop", source.id)) .header("Authorization", &auth_header) .body(Body::empty()) .unwrap(), ) .await .unwrap(); println!("๐Ÿ“ก Final cleanup response: {}", final_cleanup_response.status()); println!("โœ… PHASE 6 PASSED: Restart capability validated"); // Final cleanup sleep(Duration::from_millis(500)).await; state.db.delete_source(user.id, source.id).await.unwrap(); state.db.delete_user(user.id).await.unwrap(); println!("๐ŸŽ‰ COMPREHENSIVE TEST PASSED: Complete sync cancellation validation successful"); println!(" โœ… All 6 phases validated successfully"); println!(" โœ… Sync actually stops working (not just status changes)"); println!(" โœ… Resources properly cleaned up"); println!(" โœ… System remains in consistent state"); }