// Readur/tests/simple_throttling_test.rs

/*!
* Simple Throttling Test - Use runtime database connection
*
* This test uses the same database configuration as the running server
* to validate the throttling mechanism works correctly.
*/
use anyhow::Result;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use tokio::time::{Duration, Instant, sleep};
use tracing::{info, warn, error};
use uuid::Uuid;
use readur::{
db::Database,
ocr_queue::OcrQueueService,
enhanced_ocr::EnhancedOcrService,
};
/// Resolve the database URL for this test run, matching the running server's
/// configuration.
///
/// Resolution order: `DATABASE_URL`, then `TEST_DATABASE_URL`, then a
/// hard-coded localhost default for local development.
fn get_test_db_url() -> String {
    const FALLBACK: &str = "postgresql://readur:readur@localhost:5432/readur";
    std::env::var("DATABASE_URL")
        .ok()
        .or_else(|| std::env::var("TEST_DATABASE_URL").ok())
        .unwrap_or_else(|| FALLBACK.to_string())
}
/// Shared fixture for the throttling tests: a direct connection pool for
/// setup/verification queries plus the throttled OCR queue service under test.
struct SimpleThrottleTest {
    /// Raw Postgres pool used by the test itself (inserts, result checks, cleanup).
    pool: PgPool,
    /// Queue service under test, constructed with a concurrency cap (see `new`).
    queue_service: Arc<OcrQueueService>,
}
impl SimpleThrottleTest {
    /// Connect to the test database and build an `OcrQueueService` capped at
    /// 15 concurrent jobs — the throttling behavior this file exercises.
    async fn new() -> Result<Self> {
        let db_url = get_test_db_url();
        // Pool (20 connections) is sized above the throttle limit (15) so the
        // queue's own throttling — not pool starvation — bounds concurrency.
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(20)
            .acquire_timeout(Duration::from_secs(10))
            .connect(&db_url)
            .await?;
        let db = Database::new(&db_url).await?;
        // Create queue service with throttling (max 15 concurrent jobs)
        let queue_service = Arc::new(OcrQueueService::new(
            db.clone(),
            pool.clone(),
            15 // This should prevent DB pool exhaustion
        ));
        Ok(Self {
            pool,
            queue_service,
        })
    }

    /// Insert one user row with collision-proof identifiers and return its id.
    ///
    /// Usernames/emails combine two fresh UUIDs and a nanosecond timestamp so
    /// concurrent or repeated test runs never violate uniqueness constraints.
    async fn create_test_user(&self) -> Result<Uuid> {
        let user_id = Uuid::new_v4();
        // Use UUID for guaranteed uniqueness across concurrent test execution
        let test_id = Uuid::new_v4().simple().to_string();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let username = format!("throttle_test_{}_{}_{}", test_id, nanos, Uuid::new_v4().simple());
        let email = format!("throttle_{}_{}@{}.example.com", test_id, nanos, Uuid::new_v4().simple());
        sqlx::query(
            r#"
INSERT INTO users (id, username, email, password_hash, role)
VALUES ($1, $2, $3, $4, 'user')
"#
        )
        .bind(user_id)
        .bind(&username)
        .bind(&email)
        // Placeholder hash — this user never authenticates during the test.
        .bind("test_hash")
        .execute(&self.pool)
        .await?;
        info!("✅ Created test user: {}", user_id);
        Ok(user_id)
    }

    /// Create `count` plain-text documents owned by `user_id`, enqueue each
    /// for OCR, and return their ids in creation order.
    ///
    /// NOTE(review): `file_path` points at a /tmp file that is never written;
    /// presumably OCR of `text/plain` documents reads the `content` column —
    /// verify against `EnhancedOcrService`.
    async fn create_test_documents(&self, user_id: Uuid, count: usize) -> Result<Vec<Uuid>> {
        let mut doc_ids = Vec::new();
        info!("📝 Creating {} test documents for throttling test", count);
        for i in 1..=count {
            // Unique marker content so a later check can detect lost/empty text.
            let content = format!("THROTTLE-TEST-CONTENT-{:03}-{}", i, Uuid::new_v4());
            let filename = format!("throttle_test_{:03}.txt", i);
            let doc_id = Uuid::new_v4();
            // Create document record
            sqlx::query(
                r#"
INSERT INTO documents (
id, filename, original_filename, file_path, file_size,
mime_type, content, user_id, ocr_status, created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', NOW(), NOW())
"#
            )
            .bind(doc_id)
            .bind(&filename)
            .bind(&filename)
            .bind(format!("/tmp/throttle_test_{}.txt", doc_id))
            .bind(content.len() as i64)
            .bind("text/plain")
            .bind(&content)
            .bind(user_id)
            .execute(&self.pool)
            .await?;
            // Enqueue for OCR processing
            // Priority cycles through 6..=10 so the queue sees mixed priorities.
            let priority = 10 - (i % 5) as i32;
            self.queue_service.enqueue_document(doc_id, priority, content.len() as i64).await?;
            doc_ids.push(doc_id);
            if i % 10 == 0 {
                info!(" ✅ Created {} documents so far", i);
            }
        }
        info!("✅ All {} documents created and enqueued", count);
        Ok(doc_ids)
    }

    /// Spawn `workers` tokio tasks that dequeue and process OCR jobs until
    /// `max_time_seconds` elapses, then wait for them and log the total.
    ///
    /// The deadline is checked between jobs only, so a job started just before
    /// the deadline may finish after it.
    async fn simulate_concurrent_processing(&self, workers: usize, max_time_seconds: u64) -> Result<()> {
        info!("🏭 Starting {} concurrent workers for {} seconds", workers, max_time_seconds);
        let mut handles = Vec::new();
        let end_time = Instant::now() + Duration::from_secs(max_time_seconds);
        for worker_id in 1..=workers {
            let queue_service = self.queue_service.clone();
            let worker_end_time = end_time;
            let handle = tokio::spawn(async move {
                let worker_name = format!("worker-{}", worker_id);
                let ocr_service = EnhancedOcrService::new("/tmp".to_string());
                let mut jobs_processed = 0;
                info!("Worker {} starting", worker_name);
                while Instant::now() < worker_end_time {
                    match queue_service.dequeue().await {
                        Ok(Some(item)) => {
                            info!("Worker {} processing job {} for document {}",
                                worker_name, item.id, item.document_id);
                            // Process with built-in throttling
                            if let Err(e) = queue_service.process_item(item, &ocr_service).await {
                                // Count only successful jobs; errors are logged and skipped.
                                error!("Worker {} processing error: {}", worker_name, e);
                            } else {
                                jobs_processed += 1;
                            }
                        }
                        Ok(None) => {
                            // No jobs available, wait a bit
                            sleep(Duration::from_millis(100)).await;
                        }
                        Err(e) => {
                            // Longer backoff after a dequeue error before retrying.
                            error!("Worker {} dequeue error: {}", worker_name, e);
                            sleep(Duration::from_secs(1)).await;
                        }
                    }
                }
                info!("Worker {} completed, processed {} jobs", worker_name, jobs_processed);
                // Task result: number of jobs this worker completed successfully.
                jobs_processed
            });
            handles.push(handle);
        }
        // Wait for all workers to complete
        let mut total_processed = 0;
        for handle in handles {
            // `?` propagates a JoinError if a worker task panicked.
            let jobs_processed = handle.await?;
            total_processed += jobs_processed;
        }
        info!("🏁 All workers completed. Total jobs processed: {}", total_processed);
        Ok(())
    }

    /// Query the final OCR status of every document and tally the outcomes.
    ///
    /// `empty_content` counts documents marked `completed` whose `ocr_text` is
    /// NULL or empty — the failure signature this test suite guards against.
    async fn check_results(&self, expected_docs: &[Uuid]) -> Result<TestResults> {
        info!("🔍 Checking results for {} documents", expected_docs.len());
        let mut results = TestResults {
            total: expected_docs.len(),
            completed: 0,
            failed: 0,
            pending: 0,
            processing: 0,
            empty_content: 0,
        };
        for doc_id in expected_docs {
            let row = sqlx::query(
                "SELECT ocr_status, ocr_text FROM documents WHERE id = $1"
            )
            .bind(doc_id)
            .fetch_one(&self.pool)
            .await?;
            let status: Option<String> = row.get("ocr_status");
            let ocr_text: Option<String> = row.get("ocr_text");
            match status.as_deref() {
                Some("completed") => {
                    results.completed += 1;
                    // A completed document with no text means OCR silently lost data.
                    if ocr_text.as_deref().unwrap_or("").is_empty() {
                        results.empty_content += 1;
                    }
                }
                Some("failed") => results.failed += 1,
                Some("processing") => results.processing += 1,
                Some("pending") => results.pending += 1,
                // Unknown or NULL status is ignored; it contributes to neither bucket.
                _ => {}
            }
        }
        Ok(results)
    }

    /// Delete the documents and user created by this test run.
    ///
    /// NOTE(review): rows in the OCR queue table referencing these documents
    /// are not deleted here — presumably removed on completion or via FK
    /// cascade; verify against the schema.
    async fn cleanup(&self, user_id: Uuid) -> Result<()> {
        // Clean up test data
        sqlx::query("DELETE FROM documents WHERE user_id = $1")
            .bind(user_id)
            .execute(&self.pool)
            .await?;
        sqlx::query("DELETE FROM users WHERE id = $1")
            .bind(user_id)
            .execute(&self.pool)
            .await?;
        info!("✅ Cleanup completed");
        Ok(())
    }
}
/// Aggregated OCR outcome counts for one batch of test documents.
#[derive(Debug)]
struct TestResults {
    /// Number of documents in the batch.
    total: usize,
    /// Documents whose OCR finished with status `completed`.
    completed: usize,
    /// Documents whose OCR finished with status `failed`.
    failed: usize,
    /// Documents still waiting in the queue.
    pending: usize,
    /// Documents mid-processing when results were sampled.
    processing: usize,
    /// Completed documents whose `ocr_text` came back NULL or empty.
    empty_content: usize,
}

impl TestResults {
    /// Percentage of documents that reached a terminal state (`completed` or
    /// `failed`). An empty batch yields 0.0 rather than dividing by zero.
    fn completion_rate(&self) -> f64 {
        match self.total {
            0 => 0.0,
            total => {
                let terminal = self.completed + self.failed;
                (terminal as f64 / total as f64) * 100.0
            }
        }
    }
}
/// Regression test: 25 documents processed by 5 concurrent workers must all
/// end with non-empty OCR content; previously this load produced empty
/// content (attributed to DB pool exhaustion before throttling was added).
///
/// Requires a reachable Postgres configured via `DATABASE_URL` /
/// `TEST_DATABASE_URL` (see `get_test_db_url`).
#[tokio::test]
async fn test_throttling_with_25_documents() {
    println!("🚀 THROTTLING TEST - 25 DOCUMENTS");
    println!("=================================");
    let test = SimpleThrottleTest::new().await
        .expect("Failed to initialize test");
    // Create test user
    let user_id = test.create_test_user().await
        .expect("Failed to create test user");
    // Create 25 test documents (this previously caused empty content)
    let doc_count = 25;
    let doc_ids = test.create_test_documents(user_id, doc_count).await
        .expect("Failed to create test documents");
    // Start concurrent processing for 60 seconds
    test.simulate_concurrent_processing(5, 60).await
        .expect("Failed to process documents");
    // Wait a bit more for any remaining jobs
    sleep(Duration::from_secs(10)).await;
    // Check results
    let results = test.check_results(&doc_ids).await
        .expect("Failed to check results");
    // Cleanup before the assertions so rows are removed even when we fail.
    // NOTE(review): if an earlier `expect` panics, cleanup never runs and rows
    // leak; the UUID-based identifiers keep reruns from colliding regardless.
    test.cleanup(user_id).await.expect("Failed to cleanup");
    // Print results
    println!("\n🏆 TEST RESULTS:");
    println!("================");
    println!("📊 Total Documents: {}", results.total);
    println!("✅ Completed: {}", results.completed);
    println!("❌ Failed: {}", results.failed);
    println!("⏳ Pending: {}", results.pending);
    println!("🔄 Processing: {}", results.processing);
    println!("🚫 Empty Content: {}", results.empty_content);
    println!("📈 Completion Rate: {:.1}%", results.completion_rate());
    // Key assertion: No empty content (this was the main issue before throttling)
    assert_eq!(results.empty_content, 0,
        "Found {} documents with empty content! Throttling failed to prevent DB pool exhaustion",
        results.empty_content);
    // Should have reasonable completion rate
    assert!(results.completion_rate() >= 70.0,
        "Completion rate too low: {:.1}% (expected >= 70%)", results.completion_rate());
    println!("🎉 Throttling test PASSED! No empty content found.");
}
/// High-load variant of the throttling regression test: 50 documents and 8
/// workers (both above the 15-job throttle), with a longer processing window
/// and a lower completion-rate bar than the 25-document test.
///
/// Requires a reachable Postgres configured via `DATABASE_URL` /
/// `TEST_DATABASE_URL` (see `get_test_db_url`).
#[tokio::test]
async fn test_throttling_with_50_documents() {
    println!("🚀 THROTTLING TEST - 50 DOCUMENTS");
    println!("=================================");
    let test = SimpleThrottleTest::new().await
        .expect("Failed to initialize test");
    // Create test user
    let user_id = test.create_test_user().await
        .expect("Failed to create test user");
    // Create 50 test documents (this should definitely test the throttling)
    let doc_count = 50;
    let doc_ids = test.create_test_documents(user_id, doc_count).await
        .expect("Failed to create test documents");
    // Start concurrent processing for 120 seconds (longer for more documents)
    test.simulate_concurrent_processing(8, 120).await
        .expect("Failed to process documents");
    // Wait a bit more for any remaining jobs
    sleep(Duration::from_secs(15)).await;
    // Check results
    let results = test.check_results(&doc_ids).await
        .expect("Failed to check results");
    // Cleanup before the assertions so rows are removed even when we fail.
    test.cleanup(user_id).await.expect("Failed to cleanup");
    // Print results
    println!("\n🏆 TEST RESULTS:");
    println!("================");
    println!("📊 Total Documents: {}", results.total);
    println!("✅ Completed: {}", results.completed);
    println!("❌ Failed: {}", results.failed);
    println!("⏳ Pending: {}", results.pending);
    println!("🔄 Processing: {}", results.processing);
    println!("🚫 Empty Content: {}", results.empty_content);
    println!("📈 Completion Rate: {:.1}%", results.completion_rate());
    // Key assertion: No empty content (this was the main issue before throttling)
    assert_eq!(results.empty_content, 0,
        "Found {} documents with empty content! Throttling failed to prevent DB pool exhaustion",
        results.empty_content);
    // Should have reasonable completion rate even with high load
    assert!(results.completion_rate() >= 60.0,
        "Completion rate too low: {:.1}% (expected >= 60%)", results.completion_rate());
    println!("🎉 High-load throttling test PASSED! No empty content found with 50 documents.");
}