From ee0eaad56234e36a197062673850372832e21238 Mon Sep 17 00:00:00 2001
From: perf3ct
Date: Mon, 16 Jun 2025 01:25:30 +0000
Subject: [PATCH] feat(unit): add more throttling/concurrency tests

---
 tests/simple_throttling_test.rs          | 361 +++++++++++++++++++++++
 tests/throttled_high_concurrency_test.rs |   2 +-
 2 files changed, 362 insertions(+), 1 deletion(-)
 create mode 100644 tests/simple_throttling_test.rs

diff --git a/tests/simple_throttling_test.rs b/tests/simple_throttling_test.rs
new file mode 100644
index 0000000..90823dc
--- /dev/null
+++ b/tests/simple_throttling_test.rs
@@ -0,0 +1,361 @@
+/*!
+ * Simple throttling test that reuses the runtime database connection.
+ *
+ * This test uses the same database configuration as the running server
+ * to validate that the throttling mechanism works correctly.
+ */
+
+use anyhow::Result;
+use sqlx::{PgPool, Row};
+use std::sync::Arc;
+use tokio::time::{Duration, Instant, sleep};
+use tracing::{error, info};
+use uuid::Uuid;
+
+use readur::{
+    db::Database,
+    ocr_queue::OcrQueueService,
+    enhanced_ocr::EnhancedOcrService,
+};
+
+// Use the same database URL as the running server.
+const DB_URL: &str = "postgresql://readur:readur@localhost:5432/readur";
+
+struct SimpleThrottleTest {
+    pool: PgPool,
+    queue_service: Arc<OcrQueueService>,
+}
+
+impl SimpleThrottleTest {
+    async fn new() -> Result<Self> {
+        let pool = sqlx::postgres::PgPoolOptions::new()
+            .max_connections(20)
+            .acquire_timeout(Duration::from_secs(10))
+            .connect(DB_URL)
+            .await?;
+
+        let db = Database::new(DB_URL).await?;
+
+        // Create the queue service with throttling (at most 15 concurrent
+        // jobs); this should prevent DB pool exhaustion.
+        let queue_service = Arc::new(OcrQueueService::new(db.clone(), pool.clone(), 15));
+
+        Ok(Self {
+            pool,
+            queue_service,
+        })
+    }
+
+    async fn create_test_user(&self) -> Result<Uuid> {
+        let user_id = Uuid::new_v4();
+        let timestamp = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_millis();
+
+        sqlx::query(
+            r#"
+            INSERT INTO users (id, username, email, password_hash, role)
+            VALUES ($1, $2, $3, $4, 'user')
+            "#
+        )
+        .bind(user_id)
+        .bind(format!("throttle_test_{}", timestamp))
+        .bind(format!("throttle_{}@example.com", timestamp))
+        .bind("test_hash")
+        .execute(&self.pool)
+        .await?;
+
+        info!("āœ… Created test user: {}", user_id);
+        Ok(user_id)
+    }
+
+    async fn create_test_documents(&self, user_id: Uuid, count: usize) -> Result<Vec<Uuid>> {
+        let mut doc_ids = Vec::new();
+
+        info!("šŸ“ Creating {} test documents for throttling test", count);
+
+        for i in 1..=count {
+            let content = format!("THROTTLE-TEST-CONTENT-{:03}-{}", i, Uuid::new_v4());
+            let filename = format!("throttle_test_{:03}.txt", i);
+            let doc_id = Uuid::new_v4();
+
+            // Create the document record.
+            sqlx::query(
+                r#"
+                INSERT INTO documents (
+                    id, filename, original_filename, file_path, file_size,
+                    mime_type, content, user_id, ocr_status, created_at, updated_at
+                )
+                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', NOW(), NOW())
+                "#
+            )
+            .bind(doc_id)
+            .bind(&filename)
+            .bind(&filename)
+            .bind(format!("/tmp/throttle_test_{}.txt", doc_id))
+            .bind(content.len() as i64)
+            .bind("text/plain")
+            .bind(&content)
+            .bind(user_id)
+            .execute(&self.pool)
+            .await?;
+
+            // Enqueue for OCR processing with a rotating priority.
+            let priority = 10 - (i % 5) as i32;
+            self.queue_service.enqueue_document(doc_id, priority, content.len() as i64).await?;
+
+            doc_ids.push(doc_id);
+
+            if i % 10 == 0 {
+                info!("  āœ… Created {} documents so far", i);
+            }
+        }
+
+        info!("āœ… All {} documents created and enqueued", count);
+        Ok(doc_ids)
+    }
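+
+    // Reviewer sketch (hypothetical helper, not part of the original commit):
+    // the tests below sleep a fixed 10-15 s after the workers stop. An
+    // alternative, assuming the documents table looks as used above, would be
+    // to poll until every document has left a non-terminal OCR state:
+    #[allow(dead_code)]
+    async fn count_unfinished(&self, doc_ids: &[Uuid]) -> Result<i64> {
+        // Documents still 'pending' or 'processing' have not reached a
+        // terminal OCR state yet.
+        let remaining: i64 = sqlx::query_scalar(
+            "SELECT COUNT(*) FROM documents
+             WHERE id = ANY($1) AND ocr_status IN ('pending', 'processing')",
+        )
+        .bind(doc_ids)
+        .fetch_one(&self.pool)
+        .await?;
+        Ok(remaining)
+    }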
+
+    async fn simulate_concurrent_processing(&self, workers: usize, max_time_seconds: u64) -> Result<()> {
+        info!("šŸ­ Starting {} concurrent workers for {} seconds", workers, max_time_seconds);
+
+        let mut handles = Vec::new();
+        let end_time = Instant::now() + Duration::from_secs(max_time_seconds);
+
+        for worker_id in 1..=workers {
+            let queue_service = self.queue_service.clone();
+            let worker_end_time = end_time;
+
+            let handle = tokio::spawn(async move {
+                let worker_name = format!("worker-{}", worker_id);
+                let ocr_service = EnhancedOcrService::new("/tmp".to_string());
+                let mut jobs_processed = 0;
+
+                info!("Worker {} starting", worker_name);
+
+                while Instant::now() < worker_end_time {
+                    match queue_service.dequeue().await {
+                        Ok(Some(item)) => {
+                            info!("Worker {} processing job {} for document {}",
+                                  worker_name, item.id, item.document_id);
+
+                            // Process with the queue's built-in throttling.
+                            if let Err(e) = queue_service.process_item(item, &ocr_service).await {
+                                error!("Worker {} processing error: {}", worker_name, e);
+                            } else {
+                                jobs_processed += 1;
+                            }
+                        }
+                        Ok(None) => {
+                            // No jobs available; wait a bit before polling again.
+                            sleep(Duration::from_millis(100)).await;
+                        }
+                        Err(e) => {
+                            error!("Worker {} dequeue error: {}", worker_name, e);
+                            sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+
+                info!("Worker {} completed, processed {} jobs", worker_name, jobs_processed);
+                jobs_processed
+            });
+
+            handles.push(handle);
+        }
+
+        // Wait for all workers to complete.
+        let mut total_processed = 0;
+        for handle in handles {
+            let jobs_processed = handle.await?;
+            total_processed += jobs_processed;
+        }
+
+        info!("šŸ All workers completed. Total jobs processed: {}", total_processed);
+        Ok(())
+    }
+
+    async fn check_results(&self, expected_docs: &[Uuid]) -> Result<TestResults> {
+        info!("šŸ” Checking results for {} documents", expected_docs.len());
+
+        let mut results = TestResults {
+            total: expected_docs.len(),
+            completed: 0,
+            failed: 0,
+            pending: 0,
+            processing: 0,
+            empty_content: 0,
+        };
+
+        for doc_id in expected_docs {
+            let row = sqlx::query(
+                "SELECT ocr_status, ocr_text FROM documents WHERE id = $1"
+            )
+            .bind(doc_id)
+            .fetch_one(&self.pool)
+            .await?;
+
+            let status: Option<String> = row.get("ocr_status");
+            let ocr_text: Option<String> = row.get("ocr_text");
+
+            match status.as_deref() {
+                Some("completed") => {
+                    results.completed += 1;
+                    if ocr_text.as_deref().unwrap_or("").is_empty() {
+                        results.empty_content += 1;
+                    }
+                }
+                Some("failed") => results.failed += 1,
+                Some("processing") => results.processing += 1,
+                Some("pending") => results.pending += 1,
+                _ => {}
+            }
+        }
+
+        Ok(results)
+    }
+
+    async fn cleanup(&self, user_id: Uuid) -> Result<()> {
+        // Clean up test data.
+        sqlx::query("DELETE FROM documents WHERE user_id = $1")
+            .bind(user_id)
+            .execute(&self.pool)
+            .await?;
+
+        sqlx::query("DELETE FROM users WHERE id = $1")
+            .bind(user_id)
+            .execute(&self.pool)
+            .await?;
+
+        info!("āœ… Cleanup completed");
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+struct TestResults {
+    total: usize,
+    completed: usize,
+    failed: usize,
+    pending: usize,
+    processing: usize,
+    empty_content: usize,
+}
+
+impl TestResults {
+    fn completion_rate(&self) -> f64 {
+        if self.total == 0 {
+            return 0.0;
+        }
+        ((self.completed + self.failed) as f64 / self.total as f64) * 100.0
+    }
+}
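+
+// Reviewer sketch (not part of the original commit): a quick sanity check of
+// the completion-rate arithmetic above, using hypothetical counts. Completed
+// and failed documents both count as terminal for throughput purposes.
+#[test]
+fn completion_rate_counts_terminal_states() {
+    let results = TestResults {
+        total: 10,
+        completed: 6,
+        failed: 2,
+        pending: 1,
+        processing: 1,
+        empty_content: 0,
+    };
+    // (6 completed + 2 failed) / 10 total = 80%.
+    assert!((results.completion_rate() - 80.0).abs() < 1e-9);
+}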
initialize test"); + + // Create test user + let user_id = test.create_test_user().await + .expect("Failed to create test user"); + + // Create 25 test documents (this previously caused empty content) + let doc_count = 25; + let doc_ids = test.create_test_documents(user_id, doc_count).await + .expect("Failed to create test documents"); + + // Start concurrent processing for 60 seconds + test.simulate_concurrent_processing(5, 60).await + .expect("Failed to process documents"); + + // Wait a bit more for any remaining jobs + sleep(Duration::from_secs(10)).await; + + // Check results + let results = test.check_results(&doc_ids).await + .expect("Failed to check results"); + + // Cleanup + test.cleanup(user_id).await.expect("Failed to cleanup"); + + // Print results + println!("\nšŸ† TEST RESULTS:"); + println!("================"); + println!("šŸ“Š Total Documents: {}", results.total); + println!("āœ… Completed: {}", results.completed); + println!("āŒ Failed: {}", results.failed); + println!("ā³ Pending: {}", results.pending); + println!("šŸ”„ Processing: {}", results.processing); + println!("🚫 Empty Content: {}", results.empty_content); + println!("šŸ“ˆ Completion Rate: {:.1}%", results.completion_rate()); + + // Key assertion: No empty content (this was the main issue before throttling) + assert_eq!(results.empty_content, 0, + "Found {} documents with empty content! Throttling failed to prevent DB pool exhaustion", + results.empty_content); + + // Should have reasonable completion rate + assert!(results.completion_rate() >= 70.0, + "Completion rate too low: {:.1}% (expected >= 70%)", results.completion_rate()); + + println!("šŸŽ‰ Throttling test PASSED! No empty content found."); +} + +#[tokio::test] +async fn test_throttling_with_50_documents() { + println!("šŸš€ THROTTLING TEST - 50 DOCUMENTS"); + println!("================================="); + + let test = SimpleThrottleTest::new().await + .expect("Failed to initialize test"); + + // Create test user + let user_id = test.create_test_user().await + .expect("Failed to create test user"); + + // Create 50 test documents (this should definitely test the throttling) + let doc_count = 50; + let doc_ids = test.create_test_documents(user_id, doc_count).await + .expect("Failed to create test documents"); + + // Start concurrent processing for 120 seconds (longer for more documents) + test.simulate_concurrent_processing(8, 120).await + .expect("Failed to process documents"); + + // Wait a bit more for any remaining jobs + sleep(Duration::from_secs(15)).await; + + // Check results + let results = test.check_results(&doc_ids).await + .expect("Failed to check results"); + + // Cleanup + test.cleanup(user_id).await.expect("Failed to cleanup"); + + // Print results + println!("\nšŸ† TEST RESULTS:"); + println!("================"); + println!("šŸ“Š Total Documents: {}", results.total); + println!("āœ… Completed: {}", results.completed); + println!("āŒ Failed: {}", results.failed); + println!("ā³ Pending: {}", results.pending); + println!("šŸ”„ Processing: {}", results.processing); + println!("🚫 Empty Content: {}", results.empty_content); + println!("šŸ“ˆ Completion Rate: {:.1}%", results.completion_rate()); + + // Key assertion: No empty content (this was the main issue before throttling) + assert_eq!(results.empty_content, 0, + "Found {} documents with empty content! 
+
+#[tokio::test]
+async fn test_throttling_with_50_documents() {
+    println!("šŸš€ THROTTLING TEST - 50 DOCUMENTS");
+    println!("=================================");
+
+    let test = SimpleThrottleTest::new().await
+        .expect("Failed to initialize test");
+
+    // Create a test user.
+    let user_id = test.create_test_user().await
+        .expect("Failed to create test user");
+
+    // Create 50 test documents to put real pressure on the throttle.
+    let doc_count = 50;
+    let doc_ids = test.create_test_documents(user_id, doc_count).await
+        .expect("Failed to create test documents");
+
+    // Run concurrent processing for 120 seconds (longer for more documents).
+    test.simulate_concurrent_processing(8, 120).await
+        .expect("Failed to process documents");
+
+    // Wait a bit longer for any remaining jobs.
+    sleep(Duration::from_secs(15)).await;
+
+    // Check results.
+    let results = test.check_results(&doc_ids).await
+        .expect("Failed to check results");
+
+    // Cleanup.
+    test.cleanup(user_id).await.expect("Failed to cleanup");
+
+    // Print results.
+    println!("\nšŸ† TEST RESULTS:");
+    println!("================");
+    println!("šŸ“Š Total Documents: {}", results.total);
+    println!("āœ… Completed: {}", results.completed);
+    println!("āŒ Failed: {}", results.failed);
+    println!("ā³ Pending: {}", results.pending);
+    println!("šŸ”„ Processing: {}", results.processing);
+    println!("🚫 Empty Content: {}", results.empty_content);
+    println!("šŸ“ˆ Completion Rate: {:.1}%", results.completion_rate());
+
+    // Key assertion: no empty content (the main symptom before throttling).
+    assert_eq!(results.empty_content, 0,
+               "Found {} documents with empty content! Throttling failed to prevent DB pool exhaustion",
+               results.empty_content);
+
+    // Should keep a reasonable completion rate even under high load.
+    assert!(results.completion_rate() >= 60.0,
+            "Completion rate too low: {:.1}% (expected >= 60%)", results.completion_rate());
+
+    println!("šŸŽ‰ High-load throttling test PASSED! No empty content found with 50 documents.");
+}
\ No newline at end of file
diff --git a/tests/throttled_high_concurrency_test.rs b/tests/throttled_high_concurrency_test.rs
index 92e414c..d7f9155 100644
--- a/tests/throttled_high_concurrency_test.rs
+++ b/tests/throttled_high_concurrency_test.rs
@@ -24,7 +24,7 @@ use readur::{
     request_throttler::RequestThrottler,
 };
 
-const TEST_DB_URL: &str = "postgresql://readur_user:readur_password@localhost:5432/readur";
+const TEST_DB_URL: &str = "postgresql://readur:readur@localhost:5432/readur";
 
 struct ThrottledTestHarness {
     db: Database,