use anyhow::Result; use chrono::Utc; use std::sync::Arc; use uuid::Uuid; use sha2::{Sha256, Digest}; use readur::{ AppState, db::Database, config::Config, models::{FileIngestionInfo, Document, Source, SourceType, SourceStatus}, }; // Helper function to calculate file hash fn calculate_file_hash(data: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(data); let result = hasher.finalize(); format!("{:x}", result) } // Helper function to create test file info fn create_test_file_info(name: &str, path: &str, content: &[u8]) -> FileIngestionInfo { FileIngestionInfo { name: name.to_string(), relative_path: path.to_string(), full_path: path.to_string(), #[allow(deprecated)] path: path.to_string(), size: content.len() as i64, last_modified: Some(Utc::now()), etag: "test-etag".to_string(), mime_type: "application/pdf".to_string(), is_directory: false, created_at: None, permissions: None, owner: None, group: None, metadata: None, } } // Helper function to create test document fn create_test_document(user_id: Uuid, filename: &str, file_hash: String) -> Document { Document { id: Uuid::new_v4(), filename: filename.to_string(), original_filename: filename.to_string(), file_path: format!("/tmp/{}", filename), file_size: 1024, mime_type: "application/pdf".to_string(), content: None, ocr_text: None, ocr_confidence: None, ocr_word_count: None, ocr_processing_time_ms: None, ocr_status: Some("pending".to_string()), ocr_error: None, ocr_completed_at: None, ocr_retry_count: None, ocr_failure_reason: None, tags: Vec::new(), created_at: Utc::now(), updated_at: Utc::now(), user_id, file_hash: Some(file_hash), original_created_at: None, original_modified_at: None, source_path: None, source_type: None, source_id: None, file_permissions: None, file_owner: None, file_group: None, source_metadata: None, } } // Helper function to create test source fn create_test_source(user_id: Uuid, source_type: SourceType) -> Source { Source { id: Uuid::new_v4(), user_id, name: "Test Source".to_string(), source_type, config: serde_json::json!({}), status: SourceStatus::Idle, enabled: true, last_sync_at: None, last_error: None, last_error_at: None, total_files_synced: 0, total_files_pending: 0, total_size_bytes: 0, created_at: Utc::now(), updated_at: Utc::now(), validation_status: None, last_validation_at: None, validation_score: None, validation_issues: None, } } // Helper function to create a test user with unique identifier async fn create_test_user(db: &Database, username: &str) -> Result { use readur::models::{CreateUser, UserRole}; let unique_suffix = Uuid::new_v4().simple(); let user = CreateUser { username: format!("{}_{}", username, unique_suffix), email: format!("{}_{}@example.com", username, unique_suffix), password: "password123".to_string(), role: Some(UserRole::User), }; let created_user = db.create_user(user).await?; Ok(created_user.id) } async fn create_test_app_state() -> Result> { let config = Config::from_env().unwrap_or_else(|_| { let database_url = std::env::var("DATABASE_URL") .or_else(|_| std::env::var("TEST_DATABASE_URL")) .unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string()); Config { database_url, server_address: "127.0.0.1:8000".to_string(), jwt_secret: "test-secret".to_string(), upload_path: "./test-uploads".to_string(), watch_folder: "./test-watch".to_string(), user_watch_base_dir: "./user_watch".to_string(), enable_per_user_watch: false, allowed_file_types: vec!["pdf".to_string(), "txt".to_string()], watch_interval_seconds: Some(30), file_stability_check_ms: Some(500), max_file_age_hours: None, ocr_language: "eng".to_string(), concurrent_ocr_jobs: 2, ocr_timeout_seconds: 60, max_file_size_mb: 10, memory_limit_mb: 256, cpu_priority: "normal".to_string(), oidc_enabled: false, oidc_client_id: None, oidc_client_secret: None, oidc_issuer_url: None, oidc_redirect_uri: None, oidc_auto_register: None, allow_local_auth: None, s3_enabled: false, s3_config: None, } }); let db = Database::new(&config.database_url).await?; // Create file service let storage_config = readur::storage::StorageConfig::Local { upload_path: config.upload_path.clone() }; let storage_backend = readur::storage::factory::create_storage_backend(storage_config).await?; let file_service = std::sync::Arc::new(readur::services::file_service::FileService::with_storage(config.upload_path.clone(), storage_backend)); let queue_service = std::sync::Arc::new( readur::ocr::queue::OcrQueueService::new(db.clone(), db.get_pool().clone(), 1, file_service.clone()) ); Ok(Arc::new(AppState { db: db.clone(), config, file_service, webdav_scheduler: None, source_scheduler: None, queue_service, oidc_client: None, sync_progress_tracker: std::sync::Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()), user_watch_service: None, webdav_metrics_collector: None, })) } #[tokio::test] async fn test_source_sync_duplicate_detection_skips_duplicate() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Test content let test_content = b"This is test content for source sync duplicate detection"; let file_hash = calculate_file_hash(test_content); // Create existing document with same hash let existing_doc = create_test_document(user_id, "existing.pdf", file_hash.clone()); state.db.create_document(existing_doc).await?; // Check if duplicate exists using the efficient method let duplicate_check = state.db.get_document_by_user_and_hash(user_id, &file_hash).await?; assert!(duplicate_check.is_some(), "Should find existing document with same hash"); let found_doc = duplicate_check.unwrap(); assert_eq!(found_doc.file_hash, Some(file_hash)); assert_eq!(found_doc.user_id, user_id); Ok(()) } #[tokio::test] async fn test_source_sync_duplicate_detection_processes_unique() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Test content let test_content = b"This is unique content that should be processed by source sync"; let file_hash = calculate_file_hash(test_content); // Verify no existing document with this hash let duplicate_check = state.db.get_document_by_user_and_hash(user_id, &file_hash).await?; assert!(duplicate_check.is_none(), "Should not find any existing document with this hash"); // This indicates the file would be processed normally Ok(()) } #[tokio::test] async fn test_source_sync_duplicate_different_users() -> Result<()> { let state = create_test_app_state().await?; let user1_id = create_test_user(&state.db, "source_sync_user1").await?; let user2_id = create_test_user(&state.db, "source_sync_user2").await?; // Test content let test_content = b"Shared content between different users in source sync"; let file_hash = calculate_file_hash(test_content); // Create document for user1 with this hash let user1_doc = create_test_document(user1_id, "user1.pdf", file_hash.clone()); state.db.create_document(user1_doc).await?; // Check that user2 doesn't see user1's document as duplicate let duplicate_check = state.db.get_document_by_user_and_hash(user2_id, &file_hash).await?; assert!(duplicate_check.is_none(), "User2 should not see user1's document as duplicate"); // User2 should be able to create their own document with same hash let user2_doc = create_test_document(user2_id, "user2.pdf", file_hash.clone()); let result = state.db.create_document(user2_doc).await; assert!(result.is_ok(), "User2 should be able to create document with same hash"); Ok(()) } #[tokio::test] async fn test_source_sync_hash_calculation_consistency() -> Result<()> { let test_content = b"Test content for hash consistency in source sync"; // Calculate hash multiple times let hash1 = calculate_file_hash(test_content); let hash2 = calculate_file_hash(test_content); let hash3 = calculate_file_hash(test_content); // All hashes should be identical assert_eq!(hash1, hash2); assert_eq!(hash2, hash3); // Hash should be 64 characters (SHA256 hex) assert_eq!(hash1.len(), 64); // Should be valid hex assert!(hash1.chars().all(|c| c.is_ascii_hexdigit())); Ok(()) } #[tokio::test] async fn test_source_sync_duplicate_detection_performance() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Create multiple documents with different hashes let mut created_hashes = Vec::new(); for i in 0..10 { let content = format!("Test content number {}", i); let hash = calculate_file_hash(content.as_bytes()); created_hashes.push(hash.clone()); let doc = create_test_document(user_id, &format!("test{}.pdf", i), hash); state.db.create_document(doc).await?; } // Test lookup performance - should be fast even with multiple documents let start = std::time::Instant::now(); for hash in &created_hashes { let result = state.db.get_document_by_user_and_hash(user_id, hash).await?; assert!(result.is_some(), "Should find document with hash: {}", hash); } let duration = start.elapsed(); assert!(duration.as_millis() < 1000, "Hash lookups should be fast: {:?}", duration); Ok(()) } #[tokio::test] async fn test_source_sync_file_modification_detection() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Original content let original_content = b"Original file content"; let original_hash = calculate_file_hash(original_content); // Modified content (same file, different content) let modified_content = b"Modified file content"; let modified_hash = calculate_file_hash(modified_content); // Create document with original content let original_doc = create_test_document(user_id, "test.pdf", original_hash.clone()); state.db.create_document(original_doc).await?; // Check original content is found let original_check = state.db.get_document_by_user_and_hash(user_id, &original_hash).await?; assert!(original_check.is_some(), "Should find document with original hash"); // Check modified content is not found (different hash) let modified_check = state.db.get_document_by_user_and_hash(user_id, &modified_hash).await?; assert!(modified_check.is_none(), "Should not find document with modified hash"); // Verify hashes are actually different assert_ne!(original_hash, modified_hash, "Original and modified content should have different hashes"); Ok(()) } #[tokio::test] async fn test_source_sync_edge_case_empty_files() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Empty file content let empty_content = b""; let empty_hash = calculate_file_hash(empty_content); // Create document with empty content let empty_doc = create_test_document(user_id, "empty.pdf", empty_hash.clone()); state.db.create_document(empty_doc).await?; // Check empty file is found let empty_check = state.db.get_document_by_user_and_hash(user_id, &empty_hash).await?; assert!(empty_check.is_some(), "Should find document with empty content hash"); // Verify empty hash is the known SHA256 empty string hash assert_eq!(empty_hash, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"); Ok(()) } #[tokio::test] async fn test_source_sync_large_file_hash_consistency() -> Result<()> { // Simulate large file content let large_content = vec![b'A'; 10_000_000]; // 10MB of 'A' characters // Calculate hash let hash = calculate_file_hash(&large_content); // Hash should still be 64 characters assert_eq!(hash.len(), 64); assert!(hash.chars().all(|c| c.is_ascii_hexdigit())); // Calculate same hash again to ensure consistency let hash2 = calculate_file_hash(&large_content); assert_eq!(hash, hash2); Ok(()) } #[tokio::test] async fn test_source_sync_binary_file_handling() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Binary content (PDF header + some binary data) let mut binary_content = b"%PDF-1.4\n".to_vec(); binary_content.extend_from_slice(&[0u8, 1u8, 2u8, 3u8, 255u8, 254u8, 253u8]); let binary_hash = calculate_file_hash(&binary_content); // Create document with binary content let binary_doc = create_test_document(user_id, "binary.pdf", binary_hash.clone()); state.db.create_document(binary_doc).await?; // Check binary file is found let binary_check = state.db.get_document_by_user_and_hash(user_id, &binary_hash).await?; assert!(binary_check.is_some(), "Should find document with binary content hash"); Ok(()) } #[tokio::test] async fn test_source_sync_unicode_filename_handling() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Unicode content and filename let unicode_content = "Test content with unicode: 测试内容 🚀 café".as_bytes(); let unicode_hash = calculate_file_hash(unicode_content); // Create document with unicode filename let unicode_doc = create_test_document(user_id, "测试文档🚀.pdf", unicode_hash.clone()); state.db.create_document(unicode_doc).await?; // Check unicode file is found let unicode_check = state.db.get_document_by_user_and_hash(user_id, &unicode_hash).await?; assert!(unicode_check.is_some(), "Should find document with unicode content hash"); let found_doc = unicode_check.unwrap(); assert_eq!(found_doc.filename, "测试文档🚀.pdf"); Ok(()) } #[tokio::test] async fn test_source_sync_concurrent_hash_operations() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; // Create multiple concurrent hash lookup operations let mut handles = Vec::new(); for i in 0..20 { let state_clone = state.clone(); let hash = format!("{}test_hash_concurrent_{}", "a".repeat(40), i); let handle = tokio::spawn(async move { state_clone.db.get_document_by_user_and_hash(user_id, &hash).await }); handles.push(handle); } // Wait for all concurrent operations let mut results = Vec::new(); for handle in handles { let result = handle.await??; results.push(result); } // All should return None (no documents exist with these hashes) for (i, result) in results.iter().enumerate() { assert!(result.is_none(), "Concurrent operation {} should return None", i); } Ok(()) } #[tokio::test] async fn test_source_sync_duplicate_prevention_race_condition() -> Result<()> { let state = create_test_app_state().await?; let user_id = create_test_user(&state.db, "source_sync_test").await?; let test_hash = "race_condition_test_hash_123456789012345678901234567890123456"; // Try to create multiple documents with same hash concurrently let mut handles = Vec::new(); for i in 0..5 { let state_clone = state.clone(); let hash_clone = test_hash.to_string(); let handle = tokio::spawn(async move { let doc = create_test_document(user_id, &format!("test{}.pdf", i), hash_clone); state_clone.db.create_document(doc).await }); handles.push(handle); } // Wait for all operations and count successes let mut success_count = 0; let mut error_count = 0; for handle in handles { match handle.await? { Ok(_) => success_count += 1, Err(_) => error_count += 1, } } // Only one should succeed due to unique constraint assert_eq!(success_count, 1, "Only one document should be created successfully"); assert_eq!(error_count, 4, "Four operations should fail due to duplicate hash"); Ok(()) }