/*! * Thread Separation and Performance Unit Tests * * Tests for thread separation and performance optimization including: * - Dedicated runtime separation (OCR, Background, DB) * - Thread pool isolation and resource allocation * - Performance monitoring and metrics * - Memory usage and CPU utilization * - Contention prevention between sync and OCR * - Database connection pool separation */ use std::sync::{Arc, Mutex, atomic::{AtomicU64, AtomicU32, Ordering}}; use std::time::{Duration, Instant, SystemTime}; use std::thread; use uuid::Uuid; use tokio::runtime::{Builder, Runtime}; use tokio::time::{sleep, timeout}; /// Test runtime configuration and separation #[test] fn test_runtime_configuration() { // Test OCR runtime configuration let ocr_runtime = Builder::new_multi_thread() .worker_threads(3) .thread_name("readur-ocr") .enable_all() .build(); assert!(ocr_runtime.is_ok(), "OCR runtime should be created successfully"); // Test background runtime configuration let background_runtime = Builder::new_multi_thread() .worker_threads(2) .thread_name("readur-background") .enable_all() .build(); assert!(background_runtime.is_ok(), "Background runtime should be created successfully"); // Test DB runtime configuration let db_runtime = Builder::new_multi_thread() .worker_threads(2) .thread_name("readur-db") .enable_all() .build(); assert!(db_runtime.is_ok(), "DB runtime should be created successfully"); } #[test] fn test_thread_pool_isolation() { // Test that different thread pools don't interfere with each other let ocr_counter = Arc::new(AtomicU32::new(0)); let background_counter = Arc::new(AtomicU32::new(0)); let db_counter = Arc::new(AtomicU32::new(0)); // Create separate runtimes let ocr_rt = Builder::new_multi_thread() .worker_threads(2) // Reduced thread count .thread_name("test-ocr") .enable_time() // Enable timers .build() .unwrap(); let bg_rt = Builder::new_multi_thread() .worker_threads(2) .thread_name("test-bg") .enable_time() // Enable timers .build() .unwrap(); let db_rt = Builder::new_multi_thread() .worker_threads(2) .thread_name("test-db") .enable_time() // Enable timers .build() .unwrap(); // Use scoped threads to avoid deadlocks std::thread::scope(|s| { let ocr_counter_clone = Arc::clone(&ocr_counter); let ocr_handle = s.spawn(move || { ocr_rt.block_on(async { for _ in 0..50 { // Reduced iterations ocr_counter_clone.fetch_add(1, Ordering::Relaxed); sleep(Duration::from_millis(1)).await; } }); }); let bg_counter_clone = Arc::clone(&background_counter); let bg_handle = s.spawn(move || { bg_rt.block_on(async { for _ in 0..50 { // Reduced iterations bg_counter_clone.fetch_add(1, Ordering::Relaxed); sleep(Duration::from_millis(1)).await; } }); }); let db_counter_clone = Arc::clone(&db_counter); let db_handle = s.spawn(move || { db_rt.block_on(async { for _ in 0..50 { // Reduced iterations db_counter_clone.fetch_add(1, Ordering::Relaxed); sleep(Duration::from_millis(1)).await; } }); }); // Wait for all threads to complete ocr_handle.join().unwrap(); bg_handle.join().unwrap(); db_handle.join().unwrap(); }); // Verify all work completed assert_eq!(ocr_counter.load(Ordering::Relaxed), 50); assert_eq!(background_counter.load(Ordering::Relaxed), 50); assert_eq!(db_counter.load(Ordering::Relaxed), 50); } #[tokio::test] async fn test_database_connection_pool_separation() { // Test separate connection pools for different workloads let web_pool = DatabaseConnectionPool::new("web", 20, 2); let background_pool = DatabaseConnectionPool::new("background", 30, 3); assert_eq!(web_pool.max_connections, 20); assert_eq!(web_pool.min_connections, 2); assert_eq!(web_pool.pool_name, "web"); assert_eq!(background_pool.max_connections, 30); assert_eq!(background_pool.min_connections, 3); assert_eq!(background_pool.pool_name, "background"); // Test connection acquisition let web_conn = web_pool.acquire_connection().await; assert!(web_conn.is_ok(), "Should acquire web connection"); let bg_conn = background_pool.acquire_connection().await; assert!(bg_conn.is_ok(), "Should acquire background connection"); // Test that pools are isolated assert_ne!(web_pool.pool_id, background_pool.pool_id); } #[derive(Debug, Clone)] struct DatabaseConnectionPool { pool_name: String, max_connections: u32, min_connections: u32, pool_id: Uuid, active_connections: Arc, } impl DatabaseConnectionPool { fn new(name: &str, max_conn: u32, min_conn: u32) -> Self { Self { pool_name: name.to_string(), max_connections: max_conn, min_connections: min_conn, pool_id: Uuid::new_v4(), active_connections: Arc::new(AtomicU32::new(0)), } } async fn acquire_connection(&self) -> Result { let current = self.active_connections.load(Ordering::Relaxed); if current >= self.max_connections { return Err("Max connections reached".to_string()); } self.active_connections.fetch_add(1, Ordering::Relaxed); Ok(DatabaseConnection { id: Uuid::new_v4(), pool_name: self.pool_name.clone(), }) } } #[derive(Debug)] struct DatabaseConnection { id: Uuid, pool_name: String, } #[test] fn test_performance_metrics_collection() { let metrics = PerformanceMetrics::new(); // Test OCR metrics metrics.record_ocr_operation(Duration::from_millis(150), true); metrics.record_ocr_operation(Duration::from_millis(200), true); metrics.record_ocr_operation(Duration::from_millis(100), false); // Failed let ocr_stats = metrics.get_ocr_stats(); assert_eq!(ocr_stats.total_operations, 3); assert_eq!(ocr_stats.successful_operations, 2); assert_eq!(ocr_stats.failed_operations, 1); let avg_duration = ocr_stats.average_duration(); assert!(avg_duration > Duration::from_millis(100)); assert!(avg_duration < Duration::from_millis(200)); // Test sync metrics metrics.record_sync_operation(Duration::from_secs(30), 50, 45); metrics.record_sync_operation(Duration::from_secs(45), 100, 95); let sync_stats = metrics.get_sync_stats(); assert_eq!(sync_stats.total_operations, 2); assert_eq!(sync_stats.total_files_processed, 150); assert_eq!(sync_stats.total_files_successful, 140); } struct PerformanceMetrics { ocr_operations: Arc>>, sync_operations: Arc>>, memory_samples: Arc>>, } impl PerformanceMetrics { fn new() -> Self { Self { ocr_operations: Arc::new(Mutex::new(Vec::new())), sync_operations: Arc::new(Mutex::new(Vec::new())), memory_samples: Arc::new(Mutex::new(Vec::new())), } } fn record_ocr_operation(&self, duration: Duration, success: bool) { let mut ops = self.ocr_operations.lock().unwrap(); ops.push(OcrOperation { duration, success, timestamp: SystemTime::now(), }); } fn record_sync_operation(&self, duration: Duration, files_found: u32, files_processed: u32) { let mut ops = self.sync_operations.lock().unwrap(); ops.push(SyncOperation { duration, files_found, files_processed, timestamp: SystemTime::now(), }); } fn get_ocr_stats(&self) -> OcrStats { let ops = self.ocr_operations.lock().unwrap(); let total = ops.len() as u32; let successful = ops.iter().filter(|op| op.success).count() as u32; let failed = total - successful; let total_duration: Duration = ops.iter().map(|op| op.duration).sum(); OcrStats { total_operations: total, successful_operations: successful, failed_operations: failed, total_duration, } } fn get_sync_stats(&self) -> SyncStats { let ops = self.sync_operations.lock().unwrap(); let total = ops.len() as u32; let total_files = ops.iter().map(|op| op.files_found).sum(); let successful_files = ops.iter().map(|op| op.files_processed).sum(); SyncStats { total_operations: total, total_files_processed: total_files, total_files_successful: successful_files, } } } #[derive(Debug, Clone)] struct OcrOperation { duration: Duration, success: bool, timestamp: SystemTime, } #[derive(Debug, Clone)] struct SyncOperation { duration: Duration, files_found: u32, files_processed: u32, timestamp: SystemTime, } #[derive(Debug, Clone)] struct MemorySample { heap_usage_mb: f64, stack_usage_mb: f64, timestamp: SystemTime, } #[derive(Debug)] struct OcrStats { total_operations: u32, successful_operations: u32, failed_operations: u32, total_duration: Duration, } impl OcrStats { fn success_rate(&self) -> f64 { if self.total_operations == 0 { 0.0 } else { self.successful_operations as f64 / self.total_operations as f64 } } fn average_duration(&self) -> Duration { if self.total_operations == 0 { Duration::ZERO } else { self.total_duration / self.total_operations } } } #[derive(Debug)] struct SyncStats { total_operations: u32, total_files_processed: u32, total_files_successful: u32, } #[test] fn test_memory_usage_monitoring() { let memory_monitor = MemoryMonitor::new(); // Simulate memory allocation let mut allocations = Vec::new(); for i in 0..100 { let allocation = vec![0u8; 1024 * 1024]; // 1MB allocation allocations.push(allocation); if i % 10 == 0 { memory_monitor.record_sample(); } } let stats = memory_monitor.get_stats(); assert!(stats.samples.len() >= 10, "Should have memory samples"); // Check that memory usage increased let first_sample = &stats.samples[0]; let last_sample = &stats.samples[stats.samples.len() - 1]; assert!(last_sample.heap_usage_mb >= first_sample.heap_usage_mb, "Memory usage should increase with allocations"); } struct MemoryMonitor { samples: Arc>>, } impl MemoryMonitor { fn new() -> Self { Self { samples: Arc::new(Mutex::new(Vec::new())), } } fn record_sample(&self) { let usage = self.get_current_memory_usage(); let mut samples = self.samples.lock().unwrap(); samples.push(usage); } fn get_current_memory_usage(&self) -> MemoryUsage { // In a real implementation, this would use system APIs to get actual memory usage // For testing, we simulate increasing usage let samples_count = self.samples.lock().unwrap().len(); MemoryUsage { heap_usage_mb: 50.0 + (samples_count as f64 * 5.0), // Simulated growth rss_mb: 100.0 + (samples_count as f64 * 10.0), timestamp: SystemTime::now(), } } fn get_stats(&self) -> MemoryStats { let samples = self.samples.lock().unwrap().clone(); MemoryStats { samples, } } } #[derive(Debug, Clone)] struct MemoryUsage { heap_usage_mb: f64, rss_mb: f64, timestamp: SystemTime, } #[derive(Debug)] struct MemoryStats { samples: Vec, } #[test] fn test_cpu_utilization_monitoring() { let cpu_monitor = CpuMonitor::new(); // Simulate CPU-intensive work let start = Instant::now(); let mut counter = 0u64; while start.elapsed() < Duration::from_millis(100) { counter += 1; if counter % 10000 == 0 { cpu_monitor.record_sample(); } } let stats = cpu_monitor.get_stats(); assert!(!stats.samples.is_empty(), "Should have CPU samples"); // Verify that CPU usage was recorded let avg_usage = stats.average_usage(); assert!(avg_usage >= 0.0 && avg_usage <= 100.0, "CPU usage should be between 0-100%"); } struct CpuMonitor { samples: Arc>>, } impl CpuMonitor { fn new() -> Self { Self { samples: Arc::new(Mutex::new(Vec::new())), } } fn record_sample(&self) { let usage = self.get_current_cpu_usage(); let mut samples = self.samples.lock().unwrap(); samples.push(usage); } fn get_current_cpu_usage(&self) -> CpuUsage { // Simulate CPU usage measurement let samples_count = self.samples.lock().unwrap().len(); CpuUsage { overall_percent: 25.0 + (samples_count as f64 * 2.0).min(75.0), user_percent: 20.0 + (samples_count as f64 * 1.5).min(60.0), system_percent: 5.0 + (samples_count as f64 * 0.5).min(15.0), timestamp: SystemTime::now(), } } fn get_stats(&self) -> CpuStats { let samples = self.samples.lock().unwrap().clone(); CpuStats { samples, } } } #[derive(Debug, Clone)] struct CpuUsage { overall_percent: f64, user_percent: f64, system_percent: f64, timestamp: SystemTime, } #[derive(Debug)] struct CpuStats { samples: Vec, } impl CpuStats { fn average_usage(&self) -> f64 { if self.samples.is_empty() { 0.0 } else { let total: f64 = self.samples.iter().map(|s| s.overall_percent).sum(); total / self.samples.len() as f64 } } } #[tokio::test] async fn test_thread_contention_prevention() { // Test that OCR and sync operations don't contend for resources let shared_resource = Arc::new(Mutex::new(SharedResource::new())); let ocr_completed = Arc::new(AtomicU32::new(0)); let sync_completed = Arc::new(AtomicU32::new(0)); let resource_clone1 = Arc::clone(&shared_resource); let ocr_counter = Arc::clone(&ocr_completed); // Simulate OCR work let ocr_handle = tokio::spawn(async move { for _ in 0..10 { { let mut resource = resource_clone1.lock().unwrap(); resource.ocr_operations += 1; } sleep(Duration::from_millis(10)).await; ocr_counter.fetch_add(1, Ordering::Relaxed); } }); let resource_clone2 = Arc::clone(&shared_resource); let sync_counter = Arc::clone(&sync_completed); // Simulate sync work let sync_handle = tokio::spawn(async move { for _ in 0..10 { { let mut resource = resource_clone2.lock().unwrap(); resource.sync_operations += 1; } sleep(Duration::from_millis(10)).await; sync_counter.fetch_add(1, Ordering::Relaxed); } }); // Wait for both to complete tokio::try_join!(ocr_handle, sync_handle).unwrap(); // Verify both completed successfully assert_eq!(ocr_completed.load(Ordering::Relaxed), 10); assert_eq!(sync_completed.load(Ordering::Relaxed), 10); let resource = shared_resource.lock().unwrap(); assert_eq!(resource.ocr_operations, 10); assert_eq!(resource.sync_operations, 10); } #[derive(Debug)] struct SharedResource { ocr_operations: u32, sync_operations: u32, } impl SharedResource { fn new() -> Self { Self { ocr_operations: 0, sync_operations: 0, } } } #[test] fn test_performance_degradation_detection() { let performance_tracker = PerformanceTracker::new(); // Record baseline performance for _ in 0..10 { performance_tracker.record_operation(Duration::from_millis(100), OperationType::Sync); } // Record degraded performance for _ in 0..5 { performance_tracker.record_operation(Duration::from_millis(300), OperationType::Sync); } let degradation = performance_tracker.detect_performance_degradation(OperationType::Sync); assert!(degradation.is_some(), "Should detect performance degradation"); let degradation = degradation.unwrap(); assert!(degradation.severity > 1.0, "Should show significant degradation"); assert!(degradation.baseline_duration < degradation.current_duration); } struct PerformanceTracker { operations: Arc>>, } impl PerformanceTracker { fn new() -> Self { Self { operations: Arc::new(Mutex::new(Vec::new())), } } fn record_operation(&self, duration: Duration, op_type: OperationType) { let mut ops = self.operations.lock().unwrap(); ops.push(PerformanceOperation { duration, op_type, timestamp: SystemTime::now(), }); } fn detect_performance_degradation(&self, op_type: OperationType) -> Option { let ops = self.operations.lock().unwrap(); let relevant_ops: Vec<_> = ops.iter() .filter(|op| op.op_type == op_type) .collect(); if relevant_ops.len() < 10 { return None; // Need more data } // Calculate baseline (first 10 operations) let baseline_duration: Duration = relevant_ops.iter() .take(10) .map(|op| op.duration) .sum::() / 10; // Calculate recent performance (last 5 operations) let recent_duration: Duration = relevant_ops.iter() .rev() .take(5) .map(|op| op.duration) .sum::() / 5; let severity = recent_duration.as_millis() as f64 / baseline_duration.as_millis() as f64; if severity > 1.5 { // 50% degradation threshold Some(PerformanceDegradation { baseline_duration, current_duration: recent_duration, severity, operation_type: op_type, }) } else { None } } } #[derive(Debug, Clone, PartialEq)] enum OperationType { Sync, Ocr, Database, } #[derive(Debug, Clone)] struct PerformanceOperation { duration: Duration, op_type: OperationType, timestamp: SystemTime, } #[derive(Debug)] struct PerformanceDegradation { baseline_duration: Duration, current_duration: Duration, severity: f64, operation_type: OperationType, } #[tokio::test] async fn test_backpressure_handling() { let queue = Arc::new(Mutex::new(TaskQueue::new(10))); // Max 10 items let processed_count = Arc::new(AtomicU32::new(0)); let stop_signal = Arc::new(AtomicU32::new(0)); let queue_clone = Arc::clone(&queue); let count_clone = Arc::clone(&processed_count); let stop_clone = Arc::clone(&stop_signal); // Start processor with timeout let processor_handle = tokio::spawn(async move { let start_time = Instant::now(); loop { // Exit if timeout exceeded (30 seconds) if start_time.elapsed() > Duration::from_secs(30) { break; } // Exit if stop signal received if stop_clone.load(Ordering::Relaxed) > 0 { break; } let task = { let mut q = queue_clone.lock().unwrap(); q.pop() }; match task { Some(_task) => { // Simulate processing sleep(Duration::from_millis(5)).await; // Faster processing count_clone.fetch_add(1, Ordering::Relaxed); }, None => { sleep(Duration::from_millis(2)).await; // Shorter sleep } } } }); // Try to add more tasks than queue capacity let mut successful_adds = 0; let mut backpressure_hits = 0; // Add tasks more aggressively to trigger backpressure for i in 0..25 { let mut queue_ref = queue.lock().unwrap(); if queue_ref.try_push(Task { id: i }) { successful_adds += 1; } else { backpressure_hits += 1; } drop(queue_ref); sleep(Duration::from_millis(1)).await; } // Wait a bit for processing, then signal stop sleep(Duration::from_millis(200)).await; stop_signal.store(1, Ordering::Relaxed); // Wait for processor with timeout let _ = timeout(Duration::from_secs(5), processor_handle).await; println!("Successful adds: {}, Backpressure hits: {}, Processed: {}", successful_adds, backpressure_hits, processed_count.load(Ordering::Relaxed)); assert!(backpressure_hits > 0, "Should hit backpressure when queue is full"); assert!(successful_adds > 0, "Should successfully add some tasks"); // Don't require exact equality since processing may not complete all tasks assert!(processed_count.load(Ordering::Relaxed) > 0, "Should process some tasks"); } #[derive(Debug, Clone)] struct Task { id: u32, } struct TaskQueue { items: Vec, max_size: usize, } impl TaskQueue { fn new(max_size: usize) -> Self { Self { items: Vec::new(), max_size, } } fn try_push(&mut self, task: Task) -> bool { if self.items.len() >= self.max_size { false // Backpressure } else { self.items.push(task); true } } fn pop(&mut self) -> Option { if self.items.is_empty() { None } else { Some(self.items.remove(0)) } } }