diff --git a/tests/debug_pipeline_test.rs.disabled b/tests/debug_pipeline_test.rs.disabled
deleted file mode 100644
index 58dde66..0000000
--- a/tests/debug_pipeline_test.rs.disabled
+++ /dev/null
@@ -1,663 +0,0 @@
-/*!
- * Debug OCR Pipeline Test - Trace every step to find corruption source
- */
-
-use reqwest::Client;
-use serde_json::Value;
-use std::time::{Duration, Instant};
-use tokio::time::sleep;
-use uuid::Uuid;
-use futures;
-
-use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};
-
-fn get_base_url() -> String {
-    std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
-}
-const TIMEOUT: Duration = Duration::from_secs(120);
-
-struct PipelineDebugger {
-    client: Client,
-    token: String,
-}
-
-impl PipelineDebugger {
-    async fn new() -> Self {
-        let client = Client::new();
-
-        // Debug: Print the base URL we're trying to connect to
-        let base_url = get_base_url();
-        println!("šŸ” DEBUG: Attempting to connect to server at: {}", base_url);
-
-        // Check server health with better error handling
-        println!("šŸ” DEBUG: Checking server health at: {}/api/health", base_url);
-
-        let health_check_result = client
-            .get(&format!("{}/api/health", base_url))
-            .timeout(Duration::from_secs(5))
-            .send()
-            .await;
-
-        match health_check_result {
-            Ok(response) => {
-                println!("šŸ” DEBUG: Health check response status: {}", response.status());
-                if !response.status().is_success() {
-                    let status = response.status();
-                    let body = response.text().await.unwrap_or_else(|_| "Unable to read response body".to_string());
-                    panic!("Server not healthy. Status: {}, Body: {}", status, body);
-                }
-                println!("āœ… DEBUG: Server health check passed");
-            }
-            Err(e) => {
-                println!("āŒ DEBUG: Failed to connect to server health endpoint");
-                println!("āŒ DEBUG: Error type: {:?}", e);
-                if e.is_timeout() {
-                    panic!("Health check timed out after 5 seconds");
-                } else if e.is_connect() {
-                    panic!("Could not connect to server at {}. Is the server running?", base_url);
-                } else {
-                    panic!("Health check failed with error: {}", e);
-                }
-            }
-        }
-
-        // Create test user
-        let timestamp = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)
-            .unwrap()
-            .as_millis();
-        let username = format!("pipeline_debug_{}", timestamp);
-        let email = format!("pipeline_debug_{}@test.com", timestamp);
-
-        // Register user
-        let user_data = CreateUser {
-            username: username.clone(),
-            email: email.clone(),
-            password: "testpass123".to_string(),
-            role: Some(readur::models::UserRole::User),
-        };
-
-        let register_response = client
-            .post(&format!("{}/api/auth/register", get_base_url()))
-            .json(&user_data)
-            .send()
-            .await
-            .expect("Registration should work");
-
-        if !register_response.status().is_success() {
-            panic!("Registration failed: {}", register_response.text().await.unwrap_or_default());
-        }
-
-        // Login
-        let login_data = LoginRequest {
-            username: username.clone(),
-            password: "testpass123".to_string(),
-        };
-
-        let login_response = client
-            .post(&format!("{}/api/auth/login", get_base_url()))
-            .json(&login_data)
-            .send()
-            .await
-            .expect("Login should work");
-
-        if !login_response.status().is_success() {
-            panic!("Login failed: {}", login_response.text().await.unwrap_or_default());
-        }
-
-        let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
-        let token = login_result.token;
-
-        println!("āœ… Pipeline debugger initialized for user: {}", username);
-
-        Self { client, token }
-    }
-
-    async fn upload_document_with_debug(&self, content: &str, filename: &str) -> DocumentResponse {
-        println!("\nšŸ“¤ UPLOAD PHASE - Starting upload for: {}", filename);
-        println!(" Content: {}", content);
-        println!(" Content Length: {} bytes", content.len());
-
-        let part = reqwest::multipart::Part::text(content.to_string())
-            .file_name(filename.to_string())
-            .mime_str("text/plain")
-            .expect("Valid mime type");
-        let form = reqwest::multipart::Form::new().part("file", part);
-
-        let upload_start = Instant::now();
-        let upload_url = format!("{}/api/documents", get_base_url());
-        println!(" šŸ” DEBUG: Uploading to URL: {}", upload_url);
-        println!(" šŸ” DEBUG: Using token (first 10 chars): {}...", &self.token[..10.min(self.token.len())]);
-
-        let response_result = self.client
-            .post(&upload_url)
-            .header("Authorization", format!("Bearer {}", self.token))
-            .multipart(form)
-            .send()
-            .await;
-
-        let response = match response_result {
-            Ok(resp) => {
-                println!(" šŸ” DEBUG: Upload request sent successfully");
-                resp
-            }
-            Err(e) => {
-                println!(" āŒ DEBUG: Upload request failed");
-                println!(" āŒ DEBUG: Error type: {:?}", e);
-                if e.is_timeout() {
-                    panic!("Upload request timed out");
-                } else if e.is_connect() {
-                    panic!("Could not connect to server for upload. Error: {}", e);
-                } else if e.is_request() {
-                    panic!("Request building failed: {}", e);
-                } else {
-                    panic!("Upload failed with network error: {}", e);
-                }
-            }
-        };
-
-        let upload_duration = upload_start.elapsed();
-        println!(" šŸ” DEBUG: Upload response received. Status: {}", response.status());
-
-        if !response.status().is_success() {
-            let status = response.status();
-            let headers = response.headers().clone();
-            let body = response.text().await.unwrap_or_else(|_| "Unable to read response body".to_string());
-
-            println!(" āŒ DEBUG: Upload failed with status: {}", status);
-            println!(" āŒ DEBUG: Response headers: {:?}", headers);
-            println!(" āŒ DEBUG: Response body: {}", body);
-
-            panic!("Upload failed with status {}: {}", status, body);
-        }
-
-        let document: DocumentResponse = response.json().await.expect("Valid JSON");
-
-        println!(" āœ… Upload completed in {:?}", upload_duration);
-        println!(" šŸ“„ Document ID: {}", document.id);
-        println!(" šŸ“‚ Filename: {}", document.filename);
-        println!(" šŸ“ File Size: {} bytes", document.file_size);
-        println!(" šŸ·ļø MIME Type: {}", document.mime_type);
-        println!(" šŸ”„ Initial OCR Status: {:?}", document.ocr_status);
-
-        document
-    }
-
-    async fn trace_ocr_processing(&self, document_id: Uuid, expected_content: &str) -> Value {
-        println!("\nšŸ” OCR PROCESSING PHASE - Tracing for document: {}", document_id);
-
-        let start = Instant::now();
-        let mut last_status = String::new();
-        let mut status_changes = Vec::new();
-        let mut poll_count = 0;
-
-        while start.elapsed() < TIMEOUT {
-            poll_count += 1;
-
-            let response = self.client
-                .get(&format!("{}/api/documents/{}/ocr", get_base_url(), document_id))
-                .header("Authorization", format!("Bearer {}", self.token))
-                .send()
-                .await
-                .expect("OCR endpoint should work");
-
-            if !response.status().is_success() {
-                println!(" āŒ OCR endpoint error: {}", response.status());
-                sleep(Duration::from_millis(100)).await;
-                continue;
-            }
-
-            let ocr_data: Value = response.json().await.expect("Valid JSON");
-            let current_status = ocr_data["ocr_status"].as_str().unwrap_or("unknown").to_string();
-
-            // Track status changes
-            if current_status != last_status {
-                let elapsed = start.elapsed();
-                status_changes.push((elapsed, current_status.clone()));
-                println!(" šŸ“‹ Status Change #{}: {} -> {} (after {:?})",
-                    status_changes.len(), last_status, current_status, elapsed);
-                last_status = current_status.clone();
-            }
-
-            // Detailed logging every 10 polls or on status change
-            if poll_count % 10 == 0 || status_changes.len() > 0 {
-                println!(" šŸ”„ Poll #{}: Status={}, HasText={}, TextLen={}",
-                    poll_count,
-                    current_status,
-                    ocr_data["has_ocr_text"].as_bool().unwrap_or(false),
-                    ocr_data["ocr_text"].as_str().unwrap_or("").len()
-                );
-
-                if let Some(confidence) = ocr_data["ocr_confidence"].as_f64() {
-                    println!(" šŸ“Š Confidence: {:.1}%", confidence);
-                }
-                if let Some(word_count) = ocr_data["ocr_word_count"].as_i64() {
-                    println!(" šŸ“ Word Count: {}", word_count);
-                }
-                if let Some(error) = ocr_data["ocr_error"].as_str() {
-                    println!(" āŒ Error: {}", error);
-                }
-            }
-
-            // Check if processing is complete
-            match current_status.as_str() {
-                "completed" => {
-                    println!(" āœ… OCR Processing completed after {:?} and {} polls", start.elapsed(), poll_count);
-
-                    // Detailed final analysis
-                    let ocr_text = ocr_data["ocr_text"].as_str().unwrap_or("");
-                    println!("\n šŸ”¬ FINAL CONTENT ANALYSIS:");
-                    println!(" Expected: {}", expected_content);
-                    println!(" Actual: {}", ocr_text);
-                    println!(" Match: {}", ocr_text == expected_content);
-                    println!(" Expected Length: {} chars", expected_content.len());
-                    println!(" Actual Length: {} chars", ocr_text.len());
-
-                    if ocr_text != expected_content {
-                        println!(" āš ļø CONTENT MISMATCH DETECTED!");
-
-                        // Character-by-character comparison
-                        let expected_chars: Vec<char> = expected_content.chars().collect();
-                        let actual_chars: Vec<char> = ocr_text.chars().collect();
-
-                        for (i, (e, a)) in expected_chars.iter().zip(actual_chars.iter()).enumerate() {
-                            if e != a {
-                                println!(" Diff at position {}: expected '{}' got '{}'", i, e, a);
-                                break;
-                            }
-                        }
-                    }
-
-                    return ocr_data;
-                }
-                "failed" => {
-                    println!(" āŒ OCR Processing failed after {:?} and {} polls", start.elapsed(), poll_count);
-                    return ocr_data;
-                }
-                _ => {
-                    // Continue polling
-                }
-            }
-
-            sleep(Duration::from_millis(50)).await;
-        }
-
-        panic!("OCR processing did not complete within {:?}", TIMEOUT);
-    }
-
-    async fn get_all_documents(&self) -> Vec<Value> {
-        let response = self.client
-            .get(&format!("{}/api/documents", get_base_url()))
-            .header("Authorization", format!("Bearer {}", self.token))
-            .send()
-            .await
-            .expect("Documents endpoint should work");
-
-        if !response.status().is_success() {
-            panic!("Failed to get documents: {}", response.status());
-        }
-
-        let data: Value = response.json().await.expect("Valid JSON");
-
-        // Handle both paginated and non-paginated response formats
-        match data {
-            Value::Object(obj) if obj.contains_key("documents") => {
-                obj["documents"].as_array().unwrap_or(&vec![]).clone()
-            }
-            Value::Array(arr) => arr,
-            _ => vec![]
-        }
-    }
-}
-
-#[tokio::test]
-#[ignore = "Debug test - run manually when needed"]
-async fn debug_high_concurrency_pipeline() {
-    println!("šŸš€ STARTING HIGH-CONCURRENCY PIPELINE DEBUG");
-    println!("============================================");
-
-    let debugger = PipelineDebugger::new().await;
-
-    // Create 5 documents with unique, easily identifiable content
-    let documents = vec![
-        ("DOC-ALPHA-001-UNIQUE-SIGNATURE-ALPHA", "debug_alpha.txt"),
-        ("DOC-BRAVO-002-UNIQUE-SIGNATURE-BRAVO", "debug_bravo.txt"),
-        ("DOC-CHARLIE-003-UNIQUE-SIGNATURE-CHARLIE", "debug_charlie.txt"),
-        ("DOC-DELTA-004-UNIQUE-SIGNATURE-DELTA", "debug_delta.txt"),
-        ("DOC-ECHO-005-UNIQUE-SIGNATURE-ECHO", "debug_echo.txt"),
-    ];
-
-    println!("\nšŸ“ TEST DOCUMENTS:");
-    for (i, (content, filename)) in documents.iter().enumerate() {
-        println!(" {}: {} -> {}", i+1, filename, content);
-    }
-
-    // Phase 1: Upload all documents simultaneously
-    println!("\nšŸ PHASE 1: SIMULTANEOUS UPLOAD");
-    println!("================================");
-
-    let upload_start = Instant::now();
-
-    // Execute all uploads concurrently
-    let uploaded_docs = futures::future::join_all(
-        documents.iter().map(|(content, filename)| {
-            debugger.upload_document_with_debug(content, filename)
-        }).collect::<Vec<_>>()
-    ).await;
-    let upload_duration = upload_start.elapsed();
-
-    println!("\nāœ… ALL UPLOADS COMPLETED in {:?}", upload_duration);
-
-    // Phase 2: Trace OCR processing for each document
-    println!("\nšŸ”¬ PHASE 2: OCR PROCESSING TRACE");
-    println!("================================");
-
-    let mut ocr_tasks = Vec::new();
-
-    for (i, doc) in uploaded_docs.iter().enumerate() {
-        let doc_id = doc.id;
-        let expected_content = documents[i].0.to_string();
-        let debugger_ref = &debugger;
-
-        let task = async move {
-            let result = debugger_ref.trace_ocr_processing(doc_id, &expected_content).await;
-            (doc_id, expected_content, result)
-        };
-
-        ocr_tasks.push(task);
-    }
-
-    // Process all OCR traces concurrently
-    let ocr_results = futures::future::join_all(ocr_tasks).await;
-
-    // Phase 3: Comprehensive analysis
-    println!("\nšŸ“Š PHASE 3: COMPREHENSIVE ANALYSIS");
-    println!("===================================");
-
-    let mut corrupted_docs = Vec::new();
-    let mut successful_docs = Vec::new();
-
-    for (doc_id, expected_content, ocr_result) in ocr_results {
-        let actual_text = ocr_result["ocr_text"].as_str().unwrap_or("");
-        let status = ocr_result["ocr_status"].as_str().unwrap_or("unknown");
-
-        println!("\nšŸ“„ Document Analysis: {}", doc_id);
-        println!(" Status: {}", status);
-        println!(" Expected: {}", expected_content);
-        println!(" Actual: {}", actual_text);
-
-        if status == "completed" {
-            if actual_text == expected_content {
-                println!(" āœ… CONTENT CORRECT");
-                successful_docs.push(doc_id);
-            } else {
-                println!(" āŒ CONTENT CORRUPTED");
-                corrupted_docs.push((doc_id, expected_content.clone(), actual_text.to_string()));
-
-                // Check if it contains any other document's content
-                for (other_expected, _) in &documents {
-                    if other_expected != &expected_content && actual_text.contains(other_expected) {
-                        println!(" šŸ”„ Contains content from: {}", other_expected);
-                    }
-                }
-            }
-        } else {
-            println!(" āš ļø NON-COMPLETED STATUS: {}", status);
-        }
-    }
-
-    // Phase 4: System state analysis
-    println!("\nšŸ—ļø PHASE 4: SYSTEM STATE ANALYSIS");
-    println!("===================================");
-
-    let all_docs = debugger.get_all_documents().await;
-    println!("šŸ“‹ Total documents in system: {}", all_docs.len());
-
-    for doc in &all_docs {
-        if let (Some(id), Some(filename), Some(status)) = (
-            doc["id"].as_str(),
-            doc["filename"].as_str(),
-            doc["ocr_status"].as_str()
-        ) {
-            println!(" šŸ“„ {}: {} -> {}", id, filename, status);
-        }
-    }
-
-    // Final verdict
-    println!("\nšŸ† FINAL VERDICT");
-    println!("================");
-    println!("āœ… Successful: {}", successful_docs.len());
-    println!("āŒ Corrupted: {}", corrupted_docs.len());
-
-    if corrupted_docs.is_empty() {
-        println!("šŸŽ‰ NO CORRUPTION DETECTED!");
-    } else {
-        println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted_docs.len());
-        for (doc_id, expected, actual) in &corrupted_docs {
-            println!(" šŸ“„ {}: expected '{}' got '{}'", doc_id, expected, actual);
-        }
-
-        // Try to identify patterns
-        if corrupted_docs.iter().all(|(_, _, actual)| actual.is_empty()) {
-            println!("šŸ” PATTERN: All corrupted documents have EMPTY content");
-        } else if corrupted_docs.iter().all(|(_, _, actual)| actual == &corrupted_docs[0].2) {
-            println!("šŸ” PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_docs[0].2);
-        } else {
-            println!("šŸ” PATTERN: Mixed corruption types detected");
-        }
-
-        panic!("CORRUPTION DETECTED - see analysis above");
-    }
-}
-
-#[tokio::test]
-#[ignore = "Debug test - run manually when needed"]
-async fn debug_extreme_high_concurrency_pipeline() {
-    println!("šŸš€ STARTING EXTREME HIGH-CONCURRENCY PIPELINE STRESS TEST");
-    println!("========================================================");
-
-    let debugger = PipelineDebugger::new().await;
-
-    // Create 50+ documents with unique, easily identifiable content
-    let mut documents = Vec::new();
-    for i in 1..=55 {
-        let content = format!("STRESS-TEST-DOCUMENT-{:03}-UNIQUE-SIGNATURE-{:03}", i, i);
-        let filename = format!("stress_test_{:03}.txt", i);
-        documents.push((content, filename));
-    }
-
-    println!("\nšŸ“ STRESS TEST SETUP:");
-    println!(" šŸ“Š Total Documents: {}", documents.len());
-    println!(" šŸ”„ Concurrent Processing: All {} documents simultaneously", documents.len());
-    println!(" šŸŽÆ Goal: Zero corruption across all documents");
-
-    // Phase 1: Upload all documents simultaneously
-    println!("\nšŸ PHASE 1: SIMULTANEOUS UPLOAD");
-    println!("================================");
-
-    let upload_start = Instant::now();
-
-    // Execute all uploads concurrently
-    let uploaded_docs = futures::future::join_all(
-        documents.iter().map(|(content, filename)| {
-            debugger.upload_document_with_debug(content, filename)
-        }).collect::<Vec<_>>()
-    ).await;
-    let upload_duration = upload_start.elapsed();
-
-    println!("\nāœ… ALL UPLOADS COMPLETED in {:?}", upload_duration);
-
-    // Phase 2: Trace OCR processing for each document
-    println!("\nšŸ”¬ PHASE 2: OCR PROCESSING TRACE");
-    println!("================================");
-
-    let mut ocr_tasks = Vec::new();
-
-    for (i, doc) in uploaded_docs.iter().enumerate() {
-        let doc_id = doc.id;
-        let expected_content = documents[i].0.to_string();
-        let debugger_ref = &debugger;
-
-        let task = async move {
-            let result = debugger_ref.trace_ocr_processing(doc_id, &expected_content).await;
-            (doc_id, expected_content, result)
-        };
-
-        ocr_tasks.push(task);
-    }
-
-    // Process all OCR traces concurrently
-    let ocr_results = futures::future::join_all(ocr_tasks).await;
-
-    // Phase 3: Comprehensive analysis
-    println!("\nšŸ“Š PHASE 3: COMPREHENSIVE ANALYSIS");
-    println!("===================================");
-
-    let mut corrupted_docs = Vec::new();
-    let mut successful_docs = Vec::new();
-
-    for (doc_id, expected_content, ocr_result) in ocr_results {
-        let actual_text = ocr_result["ocr_text"].as_str().unwrap_or("");
-        let status = ocr_result["ocr_status"].as_str().unwrap_or("unknown");
-
-        println!("\nšŸ“„ Document Analysis: {}", doc_id);
-        println!(" Status: {}", status);
-        println!(" Expected: {}", expected_content);
-        println!(" Actual: {}", actual_text);
-
-        if status == "completed" {
-            if actual_text == expected_content {
-                println!(" āœ… CONTENT CORRECT");
-                successful_docs.push(doc_id);
-            } else {
-                println!(" āŒ CONTENT CORRUPTED");
-                corrupted_docs.push((doc_id, expected_content.clone(), actual_text.to_string()));
-
-                // Check if it contains any other document's content
-                for (other_expected, _) in &documents {
-                    if other_expected != &expected_content && actual_text.contains(other_expected) {
-                        println!(" šŸ”„ Contains content from: {}", other_expected);
-                    }
-                }
-            }
-        } else {
-            println!(" āš ļø NON-COMPLETED STATUS: {}", status);
-        }
-    }
-
-    // Phase 4: System state analysis
-    println!("\nšŸ—ļø PHASE 4: SYSTEM STATE ANALYSIS");
-    println!("===================================");
-
-    let all_docs = debugger.get_all_documents().await;
-    println!("šŸ“‹ Total documents in system: {}", all_docs.len());
-
-    for doc in &all_docs {
-        if let (Some(id), Some(filename), Some(status)) = (
-            doc["id"].as_str(),
-            doc["filename"].as_str(),
-            doc["ocr_status"].as_str()
-        ) {
-            println!(" šŸ“„ {}: {} -> {}", id, filename, status);
-        }
-    }
-
-    // Final verdict
-    println!("\nšŸ† FINAL VERDICT");
-    println!("================");
-    println!("āœ… Successful: {}", successful_docs.len());
-    println!("āŒ Corrupted: {}", corrupted_docs.len());
-
-    if corrupted_docs.is_empty() {
-        println!("šŸŽ‰ NO CORRUPTION DETECTED!");
-    } else {
-        println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted_docs.len());
-        for (doc_id, expected, actual) in &corrupted_docs {
-            println!(" šŸ“„ {}: expected '{}' got '{}'", doc_id, expected, actual);
-        }
-
-        // Try to identify patterns
-        if corrupted_docs.iter().all(|(_, _, actual)| actual.is_empty()) {
-            println!("šŸ” PATTERN: All corrupted documents have EMPTY content");
-        } else if corrupted_docs.iter().all(|(_, _, actual)| actual == &corrupted_docs[0].2) {
-            println!("šŸ” PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_docs[0].2);
-        } else {
-            println!("šŸ” PATTERN: Mixed corruption types detected");
-        }
-
-        panic!("CORRUPTION DETECTED - see analysis above");
-    }
-}
-
-#[tokio::test]
-#[ignore = "Debug test - run manually when needed"]
-async fn debug_document_upload_race_conditions() {
-    println!("šŸ” DEBUGGING DOCUMENT UPLOAD PROCESS");
-    println!("====================================");
-
-    // First, let's do a basic connectivity test
-    println!("šŸ” DEBUG: Testing basic network connectivity...");
-    let test_client = reqwest::Client::new();
-    let base_url = get_base_url();
-    println!("šŸ” DEBUG: Base URL from environment: {}", base_url);
-
-    // Try a simple GET request first
-    match test_client.get(&base_url).send().await {
-        Ok(resp) => {
-            println!("āœ… DEBUG: Basic connectivity test passed. Status: {}", resp.status());
-        }
-        Err(e) => {
-            println!("āŒ DEBUG: Basic connectivity test failed");
-            println!("āŒ DEBUG: Error: {:?}", e);
-            panic!("Cannot connect to server at {}. Error: {}", base_url, e);
-        }
-    }
-
-    let debugger = PipelineDebugger::new().await;
-
-    // Upload same content with different filenames to test:
-    // 1. Concurrent upload race condition handling (no 500 errors)
-    // 2. Proper deduplication (identical content = same document ID)
-    let same_content = "IDENTICAL-CONTENT-FOR-RACE-CONDITION-TEST";
-    let task1 = debugger.upload_document_with_debug(same_content, "race1.txt");
-    let task2 = debugger.upload_document_with_debug(same_content, "race2.txt");
-    let task3 = debugger.upload_document_with_debug(same_content, "race3.txt");
-
-    let (doc1, doc2, doc3) = futures::future::join3(task1, task2, task3).await;
-    let docs = vec![doc1, doc2, doc3];
-
-    println!("\nšŸ“Š UPLOAD RACE CONDITION ANALYSIS:");
-    for (i, doc) in docs.iter().enumerate() {
-        println!(" Doc {}: ID={}, Filename={}, Size={}",
-            i+1, doc.id, doc.filename, doc.file_size);
-    }
-
-    // Check deduplication behavior: identical content should result in same document ID
-    let mut ids: Vec<_> = docs.iter().map(|d| d.id).collect();
-    ids.sort();
-    ids.dedup();
-
-    if ids.len() == 1 {
-        println!("āœ… Correct deduplication: All identical content maps to same document ID");
-        println!("āœ… Race condition handled properly: No 500 errors during concurrent uploads");
-    } else if ids.len() == docs.len() {
-        println!("āŒ UNEXPECTED: All documents have unique IDs despite identical content");
-        panic!("Deduplication not working - identical content should map to same document");
-    } else {
-        println!("āŒ PARTIAL DEDUPLICATION: Some duplicates detected but not all");
-        panic!("Inconsistent deduplication behavior");
-    }
-
-    // Verify all documents have the same content hash (should be identical)
-    let content_hashes: Vec<_> = docs.iter().map(|d| {
-        // We can't directly access file_hash from DocumentResponse, but we can verify
-        // they all have the same file size as a proxy for same content
-        d.file_size
-    }).collect();
-
-    if content_hashes.iter().all(|&size| size == content_hashes[0]) {
-        println!("āœ… All documents have same file size (content verification)");
-    } else {
-        println!("āŒ Documents have different file sizes - test setup error");
-    }
-}
\ No newline at end of file
diff --git a/tests/manual_sync_tests.disabled b/tests/manual_sync_tests.disabled
deleted file mode 100644
index 419e2f9..0000000
--- a/tests/manual_sync_tests.disabled
+++ /dev/null
@@ -1,619 +0,0 @@
-/*!
- * Manual Sync Triggering Unit Tests
- *
- * Tests for manual sync triggering functionality including:
- * - API endpoint testing
- * - Source status validation
- * - Conflict detection (already syncing)
- * - Permission and authentication checks
- * - Error handling and recovery
- * - Integration with source scheduler
- */
-
-use std::sync::Arc;
-use std::collections::HashMap;
-use std::time::{SystemTime, Duration};
-use uuid::Uuid;
-use chrono::Utc;
-use serde_json::json;
-use axum::http::StatusCode;
-
-use readur::{
-    AppState,
-    config::Config,
-    db::Database,
-    models::{Source, SourceType, SourceStatus, WebDAVSourceConfig, User, UserRole},
-    auth::AuthUser,
-    routes::sources,
-};
-
-/// Create a test app state
-async fn create_test_app_state() -> Arc<AppState> {
-    let config = Config {
-        database_url: "sqlite::memory:".to_string(),
-        server_address: "127.0.0.1:8080".to_string(),
-        jwt_secret: "test_secret".to_string(),
-        upload_path: "/tmp/test_uploads".to_string(),
-        watch_folder: "/tmp/test_watch".to_string(),
-        allowed_file_types: vec!["pdf".to_string(), "txt".to_string()],
-        watch_interval_seconds: Some(30),
-        file_stability_check_ms: Some(500),
-        max_file_age_hours: None,
-        ocr_language: "eng".to_string(),
-        concurrent_ocr_jobs: 2,
-        ocr_timeout_seconds: 60,
-        max_file_size_mb: 10,
-        memory_limit_mb: 256,
-        cpu_priority: "normal".to_string(),
-    };
-
-    let db = Database::new(&config.database_url).await.unwrap();
-    let queue_service = std::sync::Arc::new(readur::ocr_queue::OcrQueueService::new(db.clone(), db.pool.clone(), 2));
-
-    Arc::new(AppState {
-        db,
-        config,
-        webdav_scheduler: None,
-        source_scheduler: None,
-        queue_service,
-    })
-}
-
-/// Create a test user
-fn create_test_user() -> User {
-    User {
-        id: Uuid::new_v4(),
-        username: "testuser".to_string(),
-        email: "test@example.com".to_string(),
-        password_hash: "hashed_password".to_string(),
-        role: UserRole::User,
-        created_at: Utc::now(),
-        updated_at: Utc::now(),
-    }
-}
-
-/// Create a test source in various states
-fn create_test_source_with_status(status: SourceStatus, user_id: Uuid) -> Source {
-    Source {
-        id: Uuid::new_v4(),
-        user_id,
-        name: "Test WebDAV Source".to_string(),
-        source_type: SourceType::WebDAV,
-        enabled: true,
-        config: json!({
-            "server_url": "https://cloud.example.com",
-            "username": "testuser",
-            "password": "testpass",
-            "watch_folders": ["/Documents"],
-            "file_extensions": [".pdf", ".txt"],
-            "auto_sync": true,
-            "sync_interval_minutes": 60,
-            "server_type": "nextcloud"
-        }),
-        status,
-        last_sync_at: None,
-        last_error: None,
-        last_error_at: None,
-        total_files_synced: 0,
-        total_files_pending: 0,
-        total_size_bytes: 0,
-        created_at: Utc::now(),
-        updated_at: Utc::now(),
-        validation_status: None,
-        last_validation_at: None,
-        validation_score: None,
-        validation_issues: None,
-    }
-}
-
-#[tokio::test]
-async fn test_manual_sync_trigger_idle_source() {
-    let state = create_test_app_state().await;
-    let user = create_test_user();
-    let source = create_test_source_with_status(SourceStatus::Idle, user.id);
-
-    // Test that idle source can be triggered for sync
-    let can_trigger = can_trigger_manual_sync(&source);
-    assert!(can_trigger, "Idle source should be available for manual sync");
-
-    // Test status update to syncing
-    let updated_status = SourceStatus::Syncing;
-    assert_ne!(source.status, updated_status);
-    assert!(is_valid_sync_trigger_transition(&source.status, &updated_status));
-}
-
-#[tokio::test]
-async fn test_manual_sync_trigger_already_syncing() {
-    let state = create_test_app_state().await;
-    let user = create_test_user();
-    let source = create_test_source_with_status(SourceStatus::Syncing, user.id);
-
-    // Test that already syncing source cannot be triggered again
-    let can_trigger = can_trigger_manual_sync(&source);
-    assert!(!can_trigger, "Already syncing source should not allow manual sync");
-
-    // This should result in HTTP 409 Conflict
-    let expected_status = StatusCode::CONFLICT;
-    let result_status = get_expected_status_for_sync_trigger(&source);
-    assert_eq!(result_status, expected_status);
-}
-
-#[tokio::test]
-async fn test_manual_sync_trigger_error_state() {
-    let state = create_test_app_state().await;
-    let user = create_test_user();
-    let mut source = create_test_source_with_status(SourceStatus::Error, user.id);
-    source.last_error = Some("Previous sync failed".to_string());
-    source.last_error_at = Some(Utc::now());
-
-    // Test that source in error state can be triggered (retry)
-    let can_trigger = can_trigger_manual_sync(&source);
-    assert!(can_trigger, "Source in error state should allow manual sync retry");
-
-    // Test status transition from error to syncing
-    assert!(is_valid_sync_trigger_transition(&source.status, &SourceStatus::Syncing));
-}
-
-fn can_trigger_manual_sync(source: &Source) -> bool {
-    match source.status {
-        SourceStatus::Idle => true,
-        SourceStatus::Error => true,
-        SourceStatus::Syncing => false,
-    }
-}
-
-fn is_valid_sync_trigger_transition(from: &SourceStatus, to: &SourceStatus) -> bool {
-    match (from, to) {
-        (SourceStatus::Idle, SourceStatus::Syncing) => true,
-        (SourceStatus::Error, SourceStatus::Syncing) => true,
-        _ => false,
-    }
-}
-
-fn get_expected_status_for_sync_trigger(source: &Source) -> StatusCode {
-    match source.status {
-        SourceStatus::Idle => StatusCode::OK,
-        SourceStatus::Error => StatusCode::OK,
-        SourceStatus::Syncing => StatusCode::CONFLICT,
-    }
-}
-
-#[tokio::test]
-async fn test_source_ownership_validation() {
-    let user_1 = create_test_user();
-    let user_2 = User {
-        id: Uuid::new_v4(),
-        username: "otheruser".to_string(),
-        email: "other@example.com".to_string(),
-        password_hash: "other_hash".to_string(),
-        role: UserRole::User,
-        created_at: Utc::now(),
-        updated_at: Utc::now(),
-    };
-
-    let source = create_test_source_with_status(SourceStatus::Idle, user_1.id);
-
-    // Test that owner can trigger sync
-    assert!(can_user_trigger_sync(&user_1, &source));
-
-    // Test that non-owner cannot trigger sync
-    assert!(!can_user_trigger_sync(&user_2, &source));
-
-    // Test admin can trigger any sync
-    let admin_user = User {
-        id: Uuid::new_v4(),
-        username: "admin".to_string(),
-        email: "admin@example.com".to_string(),
-        password_hash: "admin_hash".to_string(),
-        role: UserRole::Admin,
-        created_at: Utc::now(),
-        updated_at: Utc::now(),
-    };
-
-    assert!(can_user_trigger_sync(&admin_user, &source));
-}
-
-fn can_user_trigger_sync(user: &User, source: &Source) -> bool {
-    user.role == UserRole::Admin || user.id == source.user_id
-}
-
-#[test]
-fn test_sync_trigger_request_validation() {
-    // Test valid source IDs
-    let valid_id = Uuid::new_v4();
-    assert!(is_valid_source_id(&valid_id.to_string()));
-
-    // Test invalid source IDs
-    let invalid_ids = vec![
-        "",
-        "invalid-uuid",
-        "12345",
-        "not-a-uuid-at-all",
-    ];
-
-    for invalid_id in invalid_ids {
-        assert!(!is_valid_source_id(invalid_id), "Should reject invalid UUID: {}", invalid_id);
-    }
-}
-
-fn is_valid_source_id(id_str: &str) -> bool {
-    Uuid::parse_str(id_str).is_ok()
-}
-
-#[test]
-fn test_sync_trigger_rate_limiting() {
-
-    // Test rate limiting for manual sync triggers
-    let mut rate_limiter = SyncRateLimiter::new();
-    let source_id = Uuid::new_v4();
-
-    // First trigger should be allowed
-    assert!(rate_limiter.can_trigger_sync(&source_id));
-    rate_limiter.record_sync_trigger(&source_id);
-
-    // Immediate second trigger should be blocked
-    assert!(!rate_limiter.can_trigger_sync(&source_id));
-
-    // After cooldown period, should be allowed again
-    rate_limiter.advance_time(Duration::from_secs(61)); // Advance past cooldown
-    assert!(rate_limiter.can_trigger_sync(&source_id));
-}
-
-struct SyncRateLimiter {
-    last_triggers: HashMap<Uuid, SystemTime>,
-    cooldown_period: Duration,
-    current_time: SystemTime,
-}
-
-impl SyncRateLimiter {
-    fn new() -> Self {
-        Self {
-            last_triggers: HashMap::new(),
-            cooldown_period: Duration::from_secs(60), // 1 minute cooldown
-            current_time: SystemTime::now(),
-        }
-    }
-
-    fn can_trigger_sync(&self, source_id: &Uuid) -> bool {
-        if let Some(&last_trigger) = self.last_triggers.get(source_id) {
-            self.current_time.duration_since(last_trigger).unwrap_or(Duration::ZERO) >= self.cooldown_period
-        } else {
-            true // Never triggered before
-        }
-    }
-
-    fn record_sync_trigger(&mut self, source_id: &Uuid) {
-        self.last_triggers.insert(*source_id, self.current_time);
-    }
-
-    fn advance_time(&mut self, duration: Duration) {
-        self.current_time += duration;
-    }
-}
-
-#[tokio::test]
-async fn test_sync_trigger_with_disabled_source() {
-    let state = create_test_app_state().await;
-    let user = create_test_user();
-    let mut source = create_test_source_with_status(SourceStatus::Idle, user.id);
-    source.enabled = false; // Disable the source
-
-    // Test that disabled source cannot be triggered
-    let can_trigger = can_trigger_disabled_source(&source);
-    assert!(!can_trigger, "Disabled source should not allow manual sync");
-
-    // This should result in HTTP 400 Bad Request
-    let expected_status = if source.enabled {
-        StatusCode::OK
-    } else {
-        StatusCode::BAD_REQUEST
-    };
-
-    assert_eq!(expected_status, StatusCode::BAD_REQUEST);
-}
-
-fn can_trigger_disabled_source(source: &Source) -> bool {
-    source.enabled && can_trigger_manual_sync(source)
-}
-
-#[test]
-fn test_sync_trigger_configuration_validation() {
-    let user_id = Uuid::new_v4();
-
-    // Test valid WebDAV configuration
-    let valid_source = create_test_source_with_status(SourceStatus::Idle, user_id);
-    let config_result: Result<WebDAVSourceConfig, _> = serde_json::from_value(valid_source.config.clone());
-    assert!(config_result.is_ok(), "Valid configuration should parse successfully");
-
-    // Test invalid configuration
-    let mut invalid_source = create_test_source_with_status(SourceStatus::Idle, user_id);
-    invalid_source.config = json!({
-        "server_url": "", // Invalid empty URL
-        "username": "test",
-        "password": "test"
-        // Missing required fields
-    });
-
-    let invalid_config_result: Result<WebDAVSourceConfig, _> = serde_json::from_value(invalid_source.config.clone());
-    assert!(invalid_config_result.is_err(), "Invalid configuration should fail to parse");
-}
-
-#[test]
-fn test_concurrent_sync_trigger_protection() {
-    use std::sync::{Arc, Mutex};
-    use std::collections::HashSet;
-    use std::thread;
-
-    let active_syncs: Arc<Mutex<HashSet<Uuid>>> = Arc::new(Mutex::new(HashSet::new()));
-    let source_id = Uuid::new_v4();
-
-    let mut handles = vec![];
-    let results = Arc::new(Mutex::new(Vec::new()));
-
-    // Simulate multiple concurrent trigger attempts
-    for _ in 0..5 {
-        let active_syncs = Arc::clone(&active_syncs);
-        let results = Arc::clone(&results);
-
-        let handle = thread::spawn(move || {
-            let mut syncs = active_syncs.lock().unwrap();
-            let was_inserted = syncs.insert(source_id);
-
-            results.lock().unwrap().push(was_inserted);
-
-            // Simulate some work
-            std::thread::sleep(std::time::Duration::from_millis(10));
-        });
-
-        handles.push(handle);
-    }
-
-    // Wait for all threads
-    for handle in handles {
-        handle.join().unwrap();
-    }
-
-    let final_results = results.lock().unwrap();
-    let successful_triggers = final_results.iter().filter(|&&success| success).count();
-
-    // Only one thread should have successfully triggered the sync
-    assert_eq!(successful_triggers, 1, "Only one concurrent sync trigger should succeed");
-}
-
-#[test]
-fn test_sync_trigger_error_responses() {
-    // Test various error scenarios and their expected HTTP responses
-    let test_cases = vec![
-        (SyncTriggerError::SourceNotFound, StatusCode::NOT_FOUND),
-        (SyncTriggerError::AlreadySyncing, StatusCode::CONFLICT),
-        (SyncTriggerError::SourceDisabled, StatusCode::BAD_REQUEST),
-        (SyncTriggerError::InvalidConfiguration, StatusCode::BAD_REQUEST),
-        (SyncTriggerError::PermissionDenied, StatusCode::FORBIDDEN),
-        (SyncTriggerError::RateLimited, StatusCode::TOO_MANY_REQUESTS),
-        (SyncTriggerError::InternalError, StatusCode::INTERNAL_SERVER_ERROR),
-    ];
-
-    for (error, expected_status) in test_cases {
-        let status = error.to_status_code();
-        assert_eq!(status, expected_status, "Wrong status code for error: {:?}", error);
-    }
-}
-
-#[derive(Debug, Clone)]
-enum SyncTriggerError {
-    SourceNotFound,
-    AlreadySyncing,
-    SourceDisabled,
-    InvalidConfiguration,
-    PermissionDenied,
-    RateLimited,
-    InternalError,
-}
-
-impl SyncTriggerError {
-    fn to_status_code(&self) -> StatusCode {
-        match self {
-            SyncTriggerError::SourceNotFound => StatusCode::NOT_FOUND,
-            SyncTriggerError::AlreadySyncing => StatusCode::CONFLICT,
-            SyncTriggerError::SourceDisabled => StatusCode::BAD_REQUEST,
-            SyncTriggerError::InvalidConfiguration => StatusCode::BAD_REQUEST,
-            SyncTriggerError::PermissionDenied => StatusCode::FORBIDDEN,
-            SyncTriggerError::RateLimited => StatusCode::TOO_MANY_REQUESTS,
-            SyncTriggerError::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
-        }
-    }
-}
-
-#[test]
-fn test_manual_sync_metrics() {
-    // Test tracking of manual sync triggers vs automatic syncs
-    let mut sync_metrics = ManualSyncMetrics::new();
-    let source_id = Uuid::new_v4();
-
-    // Record manual triggers
-    sync_metrics.record_manual_trigger(source_id);
-    sync_metrics.record_manual_trigger(source_id);
-
-    // Record automatic syncs
-    sync_metrics.record_automatic_sync(source_id);
-
-    let stats = sync_metrics.get_stats_for_source(&source_id);
-    assert_eq!(stats.manual_triggers, 2);
-    assert_eq!(stats.automatic_syncs, 1);
-    assert_eq!(stats.total_syncs(), 3);
-
-    let manual_ratio = stats.manual_trigger_ratio();
-    assert!((manual_ratio - 0.666).abs() < 0.01); // ~66.7%
-}
-
-struct ManualSyncMetrics {
-    manual_triggers: HashMap<Uuid, u32>,
-    automatic_syncs: HashMap<Uuid, u32>,
-}
-
-impl ManualSyncMetrics {
-    fn new() -> Self {
-        Self {
-            manual_triggers: HashMap::new(),
-            automatic_syncs: HashMap::new(),
-        }
-    }
-
-    fn record_manual_trigger(&mut self, source_id: Uuid) {
-        *self.manual_triggers.entry(source_id).or_insert(0) += 1;
-    }
-
-    fn record_automatic_sync(&mut self, source_id: Uuid) {
-        *self.automatic_syncs.entry(source_id).or_insert(0) += 1;
-    }
-
-    fn get_stats_for_source(&self, source_id: &Uuid) -> SyncStats {
-        SyncStats {
-            manual_triggers: self.manual_triggers.get(source_id).copied().unwrap_or(0),
-            automatic_syncs: self.automatic_syncs.get(source_id).copied().unwrap_or(0),
-        }
-    }
-}
-
-struct SyncStats {
-    manual_triggers: u32,
-    automatic_syncs: u32,
-}
-
-impl SyncStats {
-    fn total_syncs(&self) -> u32 {
-        self.manual_triggers + self.automatic_syncs
-    }
-
-    fn manual_trigger_ratio(&self) -> f64 {
-        if self.total_syncs() == 0 {
-            0.0
-        } else {
-            self.manual_triggers as f64 / self.total_syncs() as f64
-        }
-    }
-}
-
-#[test]
-fn test_sync_trigger_audit_logging() {
-    // Test audit logging for manual sync triggers
-    let mut audit_log = SyncAuditLog::new();
-    let user_id = Uuid::new_v4();
-    let source_id = Uuid::new_v4();
-
-    // Record successful trigger
-    audit_log.log_sync_trigger(SyncTriggerEvent {
-        user_id,
-        source_id,
-        timestamp: Utc::now(),
-        result: SyncTriggerResult::Success,
-        user_agent: Some("Mozilla/5.0 (Test Browser)".to_string()),
-        ip_address: Some("192.168.1.100".to_string()),
-    });
-
-    // Record failed trigger
-    audit_log.log_sync_trigger(SyncTriggerEvent {
-        user_id,
-        source_id,
-        timestamp: Utc::now(),
-        result: SyncTriggerResult::Failed("Already syncing".to_string()),
-        user_agent: Some("Mozilla/5.0 (Test Browser)".to_string()),
-        ip_address: Some("192.168.1.100".to_string()),
-    });
-
-    let events = audit_log.get_events_for_user(&user_id);
-    assert_eq!(events.len(), 2);
-    assert!(matches!(events[0].result, SyncTriggerResult::Success));
-    assert!(matches!(events[1].result, SyncTriggerResult::Failed(_)));
-}
-
-struct SyncAuditLog {
-    events: Vec<SyncTriggerEvent>,
-}
-
-impl SyncAuditLog {
-    fn new() -> Self {
-        Self {
-            events: Vec::new(),
-        }
-    }
-
-    fn log_sync_trigger(&mut self, event: SyncTriggerEvent) {
-        self.events.push(event);
-    }
-
-    fn get_events_for_user(&self, user_id: &Uuid) -> Vec<&SyncTriggerEvent> {
-        self.events.iter().filter(|e| e.user_id == *user_id).collect()
-    }
-}
-
-#[derive(Debug, Clone)]
-struct SyncTriggerEvent {
-    user_id: Uuid,
-    source_id: Uuid,
-    timestamp: chrono::DateTime<Utc>,
-    result: SyncTriggerResult,
-    user_agent: Option<String>,
-    ip_address: Option<String>,
-}
-
-#[derive(Debug, Clone)]
-enum SyncTriggerResult {
-    Success,
-    Failed(String),
-}
-
-#[tokio::test]
-async fn test_sync_trigger_with_scheduler_integration() {
-    // Test integration with source scheduler
-    let state = create_test_app_state().await;
-    let user = create_test_user();
-    let source = create_test_source_with_status(SourceStatus::Idle, user.id);
-
-    // Test that trigger_sync method exists and handles the source
-    let sync_request = ManualSyncRequest {
-        source_id: source.id,
-        user_id: user.id,
-        force: false, // Don't force if already syncing
-        priority: SyncPriority::Normal,
-    };
-
-    // Simulate what the actual API would do
-    let can_proceed = validate_sync_request(&sync_request, &source);
-    assert!(can_proceed, "Valid sync request should be allowed");
-}
-
-#[derive(Debug, Clone)]
-struct ManualSyncRequest {
-    source_id: Uuid,
-    user_id: Uuid,
-    force: bool,
-    priority: SyncPriority,
-}
-
-#[derive(Debug, Clone)]
-enum SyncPriority {
-    Low,
-    Normal,
-    High,
-    Urgent,
-}
-
-fn validate_sync_request(request: &ManualSyncRequest, source: &Source) -> bool {
-    // Check ownership
-    if request.user_id != source.user_id {
-        return false;
-    }
-
-    // Check if source is enabled
-    if !source.enabled {
-        return false;
-    }
-
-    // Check status (allow force override)
-    if !request.force && source.status == SourceStatus::Syncing {
-        return false;
-    }
-
-    true
-}
\ No newline at end of file
diff --git a/tests/stress_test_25.rs.disabled b/tests/stress_test_25.rs.disabled
deleted file mode 100644
index 4d7d385..0000000
--- a/tests/stress_test_25.rs.disabled
+++ /dev/null
@@ -1,227 +0,0 @@
-/*!
- * Moderate Stress Test - 25 Documents for Complete Verification
- */
-
-use reqwest::Client;
-use serde_json::Value;
-use std::time::{Duration, Instant};
-use tokio::time::sleep;
-use uuid::Uuid;
-use futures;
-
-use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};
-
-fn get_base_url() -> String {
-    std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
-}
-const TIMEOUT: Duration = Duration::from_secs(120);
-
-struct StressTester {
-    client: Client,
-    token: String,
-}
-
-impl StressTester {
-    async fn new() -> Self {
-        let client = Client::new();
-
-        // Check server health
-        client.get(&format!("{}/api/health", get_base_url()))
-            .timeout(Duration::from_secs(5))
-            .send()
-            .await
-            .expect("Server should be running");
-
-        // Create test user
-        let timestamp = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)
-            .unwrap()
-            .as_millis();
-        let username = format!("stress_25_{}", timestamp);
-        let email = format!("stress_25_{}@test.com", timestamp);
-
-        // Register user
-        let user_data = CreateUser {
-            username: username.clone(),
-            email: email.clone(),
-            password: "testpass123".to_string(),
-            role: Some(readur::models::UserRole::User),
-        };
-
-        client.post(&format!("{}/api/auth/register", get_base_url()))
-            .json(&user_data)
-            .send()
-            .await
-            .expect("Registration should work");
-
-        // Login
-        let login_data = LoginRequest {
-            username: username.clone(),
-            password: "testpass123".to_string(),
-        };
-
-        let login_response = client
-            .post(&format!("{}/api/auth/login", get_base_url()))
-            .json(&login_data)
-            .send()
-            .await
-            .expect("Login should work");
-
-        let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
-        let token = login_result.token;
-
-        println!("āœ… Stress tester initialized");
-
-        Self { client, token }
-    }
-
-    async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse {
-        let part = reqwest::multipart::Part::text(content.to_string())
-            .file_name(filename.to_string())
-            .mime_str("text/plain")
-            .expect("Valid mime type");
-        let form = reqwest::multipart::Form::new().part("file", part);
-
-        let response = self.client
-            .post(&format!("{}/api/documents", get_base_url()))
-            .header("Authorization", format!("Bearer {}", self.token))
-            .multipart(form)
-            .send()
-            .await
-            .expect("Upload should work");
-
-        response.json().await.expect("Valid JSON")
-    }
-
-    async fn wait_for_ocr_completion(&self, document_ids: &[Uuid]) -> Vec<Value> {
-        let start = Instant::now();
-
-        while start.elapsed() < TIMEOUT {
-            let all_docs = self.get_all_documents().await;
-            let completed = all_docs.iter()
-                .filter(|doc| {
-                    let doc_id_str = doc["id"].as_str().unwrap_or("");
-                    let status = doc["ocr_status"].as_str().unwrap_or("");
-                    document_ids.iter().any(|id| id.to_string() == doc_id_str) && status == "completed"
-                })
-                .count();
-
-            if completed == document_ids.len() {
-                return all_docs.into_iter()
-                    .filter(|doc| {
-                        let doc_id_str = doc["id"].as_str().unwrap_or("");
-                        document_ids.iter().any(|id| id.to_string() == doc_id_str)
-                    })
-                    .collect();
-            }
-
-            sleep(Duration::from_millis(500)).await;
-        }
-
-        panic!("OCR processing did not complete within timeout");
-    }
-
-    async fn get_all_documents(&self) -> Vec<Value> {
-        let response = self.client
-            .get(&format!("{}/api/documents", get_base_url()))
-            .header("Authorization", format!("Bearer {}", self.token))
-            .send()
-            .await
-            .expect("Documents endpoint should work");
-
-        let data: Value = response.json().await.expect("Valid JSON");
-
-        match data {
-            Value::Object(obj) if obj.contains_key("documents") => {
-                obj["documents"].as_array().unwrap_or(&vec![]).clone()
-            }
-            Value::Array(arr) => arr,
-            _ => vec![]
-        }
-    }
-}
-
-#[tokio::test]
-async fn stress_test_25_documents() {
-    println!("šŸš€ MODERATE STRESS TEST: 25 DOCUMENTS");
-    println!("======================================");
-
-    let tester = StressTester::new().await;
-
-    // Create 25 documents with unique content
-    let mut documents = Vec::new();
-    for i in 1..=25 {
-        let content = format!("STRESS-DOC-{:02}-SIGNATURE-{:02}-UNIQUE-CONTENT", i, i);
-        let filename = format!("stress_{:02}.txt", i);
-        documents.push((content, filename));
-    }
-
-    println!("šŸ“Š Testing {} documents concurrently", documents.len());
-
-    // Phase 1: Upload all documents concurrently
-    println!("\nšŸ UPLOADING...");
-    let upload_start = Instant::now();
-
-    let uploaded_docs = futures::future::join_all(
-        documents.iter().map(|(content, filename)| {
-            tester.upload_document(content, filename)
-        }).collect::<Vec<_>>()
-    ).await;
-
-    let upload_duration = upload_start.elapsed();
-    println!("āœ… {} uploads completed in {:?}", uploaded_docs.len(), upload_duration);
-
-    // Phase 2: Wait for OCR completion
-    println!("\nšŸ”¬ PROCESSING OCR...");
-    let processing_start = Instant::now();
-    let document_ids: Vec<Uuid> = uploaded_docs.iter().map(|doc| doc.id).collect();
-
-    let final_docs = tester.wait_for_ocr_completion(&document_ids).await;
-    let processing_duration = processing_start.elapsed();
-    println!("āœ… OCR processing completed in {:?}", processing_duration);
-
-    // Phase 3: Corruption Analysis
-    println!("\nšŸ“Š VERIFYING RESULTS...");
-    let mut successful = 0;
-    let mut corrupted = 0;
-    let mut corruption_details = Vec::new();
-
-    for (i, doc) in final_docs.iter().enumerate() {
-        let expected_content = &documents[i].0;
-        let actual_text = doc["ocr_text"].as_str().unwrap_or("");
-        let doc_id = doc["id"].as_str().unwrap_or("");
-
-        if actual_text == expected_content {
-            successful += 1;
-        } else {
-            corrupted += 1;
-            corruption_details.push((doc_id.to_string(), expected_content.clone(), actual_text.to_string()));
-        }
-    }
-
-    // Final Results
-    println!("\nšŸ† STRESS TEST RESULTS");
-    println!("======================");
-    println!("šŸ“Š Total Documents: {}", documents.len());
-    println!("āœ… Successful: {}", successful);
-    println!("āŒ Corrupted: {}", corrupted);
-    println!("šŸ“ˆ Success Rate: {:.1}%", (successful as f64 / documents.len() as f64) * 100.0);
-    println!("ā±ļø Upload Time: {:?}", upload_duration);
-    println!("ā±ļø OCR Time: {:?}", processing_duration);
-    println!("ā±ļø Total Time: {:?}", upload_duration + processing_duration);
-
-    if corrupted == 0 {
-        println!("\nšŸŽ‰ STRESS TEST PASSED!");
-        println!("šŸŽÆ ALL {} DOCUMENTS PROCESSED WITHOUT CORRUPTION!", documents.len());
-        println!("šŸš€ HIGH CONCURRENCY OCR CORRUPTION ISSUE IS FULLY RESOLVED!");
-    } else {
-        println!("\n🚨 STRESS TEST FAILED!");
-        println!("āŒ CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted);
-
-        for (doc_id, expected, actual) in &corruption_details {
-            println!(" šŸ“„ {}: expected '{}' got '{}'", doc_id, expected, actual);
-        }
-
-        panic!("CORRUPTION DETECTED in {} out of {} documents", corrupted, documents.len());
-    }
-}
\ No newline at end of file
diff --git a/tests/stress_test_simple.rs.disabled b/tests/stress_test_simple.rs.disabled
deleted file mode 100644
index b9c2957..0000000
--- a/tests/stress_test_simple.rs.disabled
+++ /dev/null
@@ -1,269 +0,0 @@
-/*!
- * Simple High-Concurrency Stress Test - Focus on Results Only
- */
-
-use reqwest::Client;
-use serde_json::Value;
-use std::time::{Duration, Instant};
-use tokio::time::sleep;
-use uuid::Uuid;
-use futures;
-
-use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};
-
-fn get_base_url() -> String {
-    std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
-}
-const TIMEOUT: Duration = Duration::from_secs(180);
-
-struct SimpleStressTester {
-    client: Client,
-    token: String,
-}
-
-impl SimpleStressTester {
-    async fn new() -> Self {
-        let client = Client::new();
-
-        // Check server health
-        let response = client
-            .get(&format!("{}/api/health", get_base_url()))
-            .timeout(Duration::from_secs(5))
-            .send()
-            .await
-            .expect("Server should be running");
-
-        if !response.status().is_success() {
-            panic!("Server not healthy");
-        }
-
-        // Create test user
-        let timestamp = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)
-            .unwrap()
-            .as_millis();
-        let username = format!("stress_test_{}", timestamp);
-        let email = format!("stress_test_{}@test.com", timestamp);
-
-        // Register user
-        let user_data = CreateUser {
-            username: username.clone(),
-            email: email.clone(),
-            password: "testpass123".to_string(),
-            role: Some(readur::models::UserRole::User),
-        };
-
-        let register_response = client
-            .post(&format!("{}/api/auth/register", get_base_url()))
-            .json(&user_data)
-            .send()
-            .await
-            .expect("Registration should work");
-
-        if !register_response.status().is_success() {
-            panic!("Registration failed: {}", register_response.text().await.unwrap_or_default());
-        }
-
-        // Login
-        let login_data = LoginRequest {
-            username: username.clone(),
-            password: "testpass123".to_string(),
-        };
-
-        let login_response = client
-            .post(&format!("{}/api/auth/login", get_base_url()))
-            .json(&login_data)
-            .send()
-            .await
-            .expect("Login should work");
-
-        if !login_response.status().is_success() {
-            panic!("Login failed: {}", login_response.text().await.unwrap_or_default());
-        }
-
-        let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
-        let token = login_result.token;
-
-        println!("āœ… Stress tester initialized for user: {}", username);
-
-        Self { client, token }
-    }
-
-    async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse {
-        let part = reqwest::multipart::Part::text(content.to_string())
-            .file_name(filename.to_string())
-            .mime_str("text/plain")
-            .expect("Valid mime type");
-        let form = reqwest::multipart::Form::new().part("file", part);
-
-        let response = self.client
-            .post(&format!("{}/api/documents", get_base_url()))
-            .header("Authorization", format!("Bearer {}", self.token))
-            .multipart(form)
-            .send()
-            .await
-            .expect("Upload should work");
-
-        if !response.status().is_success() {
-            panic!("Upload failed: {}", response.text().await.unwrap_or_default());
-        }
-
-        response.json().await.expect("Valid JSON")
-    }
-
-    async fn wait_for_completion(&self, document_ids: &[Uuid]) -> Vec<Value> {
-        let start = Instant::now();
-        let mut last_completed = 0;
-
-        while start.elapsed() < TIMEOUT {
-            let all_docs = self.get_all_documents().await;
-            let completed = all_docs.iter()
-                .filter(|doc| {
-                    let doc_id_str = doc["id"].as_str().unwrap_or("");
-                    let status = doc["ocr_status"].as_str().unwrap_or("");
-                    document_ids.iter().any(|id| id.to_string() == doc_id_str) && status == "completed"
-                })
-                .count();
-
-            if completed != last_completed {
-                last_completed = completed;
-                let progress = (completed as f64 / document_ids.len() as f64) * 100.0;
-                println!(" šŸ“Š Progress: {}/{} documents completed ({:.1}%)",
-                    completed, document_ids.len(), progress);
-            }
-
-            if completed == document_ids.len() {
-                break;
-            }
-
-            sleep(Duration::from_secs(1)).await;
-        }
-
-        // Get final results
-        let all_docs = self.get_all_documents().await;
-        all_docs.into_iter()
-            .filter(|doc| {
-                let doc_id_str = doc["id"].as_str().unwrap_or("");
-                document_ids.iter().any(|id| id.to_string() == doc_id_str)
-            })
-            .collect()
-    }
-
-    async fn get_all_documents(&self) -> Vec<Value> {
-        let response = self.client
-            .get(&format!("{}/api/documents", get_base_url()))
-            .header("Authorization", format!("Bearer {}", self.token))
-            .send()
-            .await
-            .expect("Documents endpoint should work");
-
-        if !response.status().is_success() {
-            panic!("Failed to get documents: {}", response.status());
-        }
-
-        let data: Value = response.json().await.expect("Valid JSON");
-
-        match data {
-            Value::Object(obj) if obj.contains_key("documents") => {
-                obj["documents"].as_array().unwrap_or(&vec![]).clone()
-            }
-            Value::Array(arr) => arr,
-            _ => vec![]
-        }
-    }
-}
-
-#[tokio::test]
-async fn stress_test_50_plus_documents() {
-    println!("šŸš€ EXTREME STRESS TEST: 50+ DOCUMENTS");
-    println!("=====================================");
-
-    let tester = SimpleStressTester::new().await;
-
-    // Create 50+ documents with unique content
-    let mut documents = Vec::new();
-    for i in 1..=55 {
-        let content = format!("STRESS-TEST-DOCUMENT-{:03}-UNIQUE-SIGNATURE-{:03}", i, i);
-        let filename = format!("stress_test_{:03}.txt", i);
-        documents.push((content, filename));
-    }
-
-    println!("šŸ“Š Total Documents: {}", documents.len());
-
-    // Phase 1: Upload all documents concurrently
-    println!("\nšŸ PHASE 1: SIMULTANEOUS UPLOAD");
-    let upload_start = Instant::now();
-
-    let uploaded_docs = futures::future::join_all(
-        documents.iter().map(|(content, filename)| {
-            tester.upload_document(content, filename)
-        }).collect::<Vec<_>>()
-    ).await;
-
-    let upload_duration = upload_start.elapsed();
-    println!("āœ… All {} documents uploaded in {:?}", uploaded_docs.len(), upload_duration);
-
-    // Phase 2: Wait for OCR completion
-    println!("\nšŸ”¬ PHASE 2: OCR PROCESSING");
-    let processing_start = Instant::now();
-    let document_ids: Vec<Uuid> = uploaded_docs.iter().map(|doc| doc.id).collect();
-
-    let final_docs = tester.wait_for_completion(&document_ids).await;
-    let processing_duration = processing_start.elapsed();
-    println!("āœ… All OCR processing completed in {:?}", processing_duration);
-
-    // Phase 3: Corruption Analysis
-    println!("\nšŸ“Š PHASE 3: CORRUPTION ANALYSIS");
-    let mut successful = 0;
-    let mut corrupted = 0;
-    let mut corrupted_details = Vec::new();
-
-    for (i, doc) in final_docs.iter().enumerate() {
-        let expected_content = &documents[i].0;
-        let actual_text = doc["ocr_text"].as_str().unwrap_or("");
-        let status = doc["ocr_status"].as_str().unwrap_or("");
-        let doc_id = doc["id"].as_str().unwrap_or("");
-
-        if status == "completed" {
-            if actual_text == expected_content {
-                successful += 1;
-            } else {
-                corrupted += 1;
-                corrupted_details.push((doc_id.to_string(), expected_content.clone(), actual_text.to_string()));
-
-                // Only show first few corruption details to avoid spam
-                if corrupted <= 3 {
-                    println!(" āŒ CORRUPTION: {} expected '{}' got '{}'", doc_id, expected_content, actual_text);
-                }
-            }
-        } else {
-            println!(" āš ļø NON-COMPLETED: {} status={}", doc_id, status);
-        }
-    }
-
-    // Final Results
-    println!("\nšŸ† FINAL RESULTS");
- println!("================"); - println!("šŸ“Š Total Documents: {}", documents.len()); - println!("āœ… Successful: {}", successful); - println!("āŒ Corrupted: {}", corrupted); - println!("šŸ“ˆ Success Rate: {:.1}%", (successful as f64 / documents.len() as f64) * 100.0); - println!("ā±ļø Total Time: {:?}", upload_duration + processing_duration); - - if corrupted == 0 { - println!("šŸŽ‰ NO CORRUPTION DETECTED! ALL {} DOCUMENTS PROCESSED CORRECTLY!", documents.len()); - } else { - println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted); - - // Analyze corruption patterns - if corrupted_details.iter().all(|(_, _, actual)| actual.is_empty()) { - println!("šŸ” PATTERN: All corrupted documents have EMPTY content"); - } else if corrupted_details.len() > 1 && corrupted_details.iter().all(|(_, _, actual)| actual == &corrupted_details[0].2) { - println!("šŸ” PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_details[0].2); - } else { - println!("šŸ” PATTERN: Mixed corruption types detected"); - } - - panic!("CORRUPTION DETECTED in {} out of {} documents", corrupted, documents.len()); - } -} \ No newline at end of file diff --git a/tests/throttled_high_concurrency_test.rs.disabled b/tests/throttled_high_concurrency_test.rs.disabled deleted file mode 100644 index 3854d66..0000000 --- a/tests/throttled_high_concurrency_test.rs.disabled +++ /dev/null @@ -1,409 +0,0 @@ -/*! - * Throttled High Concurrency OCR Test - * - * This test verifies that our new throttling mechanism properly handles - * high concurrency scenarios (50+ documents) without database connection - * pool exhaustion or corrupting OCR results. - */ - -use anyhow::Result; -use sqlx::{PgPool, Row}; -use std::sync::Arc; -use tokio::time::{Duration, Instant}; -use tracing::{info, warn, error}; -use uuid::Uuid; - -use readur::{ - config::Config, - db::Database, - models::{Document, Settings}, - file_service::FileService, - enhanced_ocr::EnhancedOcrService, - ocr_queue::OcrQueueService, - db_guardrails_simple::DocumentTransactionManager, - request_throttler::RequestThrottler, -}; - -fn get_test_db_url() -> String { - std::env::var("DATABASE_URL") - .or_else(|_| std::env::var("TEST_DATABASE_URL")) - .unwrap_or_else(|_| "postgresql://postgres:postgres@localhost:5432/readur_test".to_string()) -} - -struct ThrottledTestHarness { - db: Database, - pool: PgPool, - file_service: FileService, - queue_service: Arc, - transaction_manager: DocumentTransactionManager, -} - -impl ThrottledTestHarness { - async fn new() -> Result { - // Initialize database with proper connection limits - let pool = sqlx::postgres::PgPoolOptions::new() - .max_connections(30) // Higher limit for stress testing - .acquire_timeout(std::time::Duration::from_secs(15)) - .connect(&get_test_db_url()) - .await?; - - let db = Database::new(&get_test_db_url()).await?; - - // Initialize services - let file_service = FileService::new("./test_uploads".to_string()); - - // Create throttled queue service - this is the key improvement - let queue_service = Arc::new(OcrQueueService::new( - db.clone(), - pool.clone(), - 15 // Limit to 15 concurrent OCR jobs to prevent DB pool exhaustion - )); - - let transaction_manager = DocumentTransactionManager::new(pool.clone()); - - // Ensure test upload directory exists - std::fs::create_dir_all("./test_uploads").unwrap_or_default(); - - Ok(Self { - db, - pool, - file_service, - queue_service, - transaction_manager, - }) - } - - async fn create_test_user(&self) -> Result { - let user_id = Uuid::new_v4(); - let 
-
-    async fn create_test_user(&self) -> Result<Uuid> {
-        let user_id = Uuid::new_v4();
-        let timestamp = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)
-            .unwrap()
-            .as_millis();
-
-        sqlx::query(
-            r#"
-            INSERT INTO users (id, username, email, password_hash, role)
-            VALUES ($1, $2, $3, $4, 'user')
-            "#
-        )
-        .bind(user_id)
-        .bind(format!("throttle_test_user_{}", timestamp))
-        .bind(format!("throttle_test_{}@example.com", timestamp))
-        .bind("dummy_hash")
-        .execute(&self.pool)
-        .await?;
-
-        info!("āœ… Created test user: {}", user_id);
-        Ok(user_id)
-    }
-
-    async fn create_test_documents(&self, user_id: Uuid, count: usize) -> Result<Vec<(Uuid, String)>> {
-        let mut documents = Vec::new();
-
-        info!("šŸ“ Creating {} test documents", count);
-
-        for i in 1..=count {
-            let content = format!("THROTTLE-TEST-DOC-{:03}-UNIQUE-CONTENT-{}", i, Uuid::new_v4());
-            let filename = format!("throttle_test_{:03}.txt", i);
-            let doc_id = Uuid::new_v4();
-            let file_path = format!("./test_uploads/{}.txt", doc_id);
-
-            // Write content to file
-            tokio::fs::write(&file_path, &content).await?;
-
-            // Create document record
-            sqlx::query(
-                r#"
-                INSERT INTO documents (
-                    id, filename, original_filename, file_path, file_size,
-                    mime_type, content, user_id, ocr_status, created_at, updated_at
-                )
-                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', NOW(), NOW())
-                "#
-            )
-            .bind(doc_id)
-            .bind(&filename)
-            .bind(&filename)
-            .bind(&file_path)
-            .bind(content.len() as i64)
-            .bind("text/plain")
-            .bind(&content)
-            .bind(user_id)
-            .execute(&self.pool)
-            .await?;
-
-            // Enqueue for OCR processing, cycling priorities from 6 to 10
-            let priority = 10 - (i % 5) as i32;
-            self.queue_service.enqueue_document(doc_id, priority, content.len() as i64).await?;
-
-            documents.push((doc_id, content));
-
-            if i % 10 == 0 {
-                info!("   āœ… Created {} documents so far", i);
-            }
-        }
-
-        info!("āœ… All {} documents created and enqueued", count);
-        Ok(documents)
-    }
-
-    async fn start_throttled_workers(&self, num_workers: usize) -> Result<()> {
-        info!("šŸ­ Starting {} throttled OCR workers", num_workers);
-
-        let mut handles = Vec::new();
-
-        for worker_num in 1..=num_workers {
-            let queue_service = self.queue_service.clone();
-
-            let handle = tokio::spawn(async move {
-                let worker_id = format!("throttled-worker-{}", worker_num);
-                info!("Worker {} starting", worker_id);
-
-                // Each worker runs for a limited time to avoid infinite loops
-                let start_time = Instant::now();
-                let max_runtime = Duration::from_secs(300); // 5 minutes max
-
-                // Run a simplified worker loop instead of calling start_worker():
-                // start_worker() consumes the Arc, so it cannot be called once per worker.
-                loop {
-                    if start_time.elapsed() > max_runtime {
-                        break;
-                    }
-
-                    // Process a single job if one is available
-                    match queue_service.dequeue().await {
-                        Ok(Some(item)) => {
-                            info!("Worker {} processing job {}", worker_id, item.id);
-                            // Process the item using the queue's built-in throttling
-                            let ocr_service = readur::enhanced_ocr::EnhancedOcrService::new("/tmp".to_string());
-                            if let Err(e) = queue_service.process_item(item, &ocr_service).await {
-                                error!("Worker {} processing error: {}", worker_id, e);
-                            }
-                        }
-                        Ok(None) => {
-                            // No jobs available, wait a bit
-                            tokio::time::sleep(Duration::from_millis(100)).await;
-                        }
-                        Err(e) => {
-                            error!("Worker {} dequeue error: {}", worker_id, e);
-                            tokio::time::sleep(Duration::from_secs(1)).await;
-                        }
-                    }
-                }
-
-                info!("Worker {} completed", worker_id);
-            });
-
-            handles.push(handle);
-        }
-
-        // Don't wait for the workers here - they run in the background
-        Ok(())
-    }
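
// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): what the priority formula in
// create_test_documents above actually produces. For i = 1..=count,
// 10 - (i % 5) cycles through 9, 8, 7, 6, 10, so the range is 6..=10
// (the original comment claimed 5-10, and the assignment is cyclic, not random).
fn priority(i: usize) -> i32 {
    10 - (i % 5) as i32
}

fn main() {
    assert_eq!((1..=5).map(priority).collect::<Vec<_>>(), vec![9, 8, 7, 6, 10]);
    assert!((1..=55).map(priority).all(|p| (6..=10).contains(&p)));
}
// ---------------------------------------------------------------------------
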
-
-    async fn wait_for_completion(&self, expected_docs: usize, timeout_minutes: u64) -> Result<()> {
-        let start_time = Instant::now();
-        let timeout = Duration::from_secs(timeout_minutes * 60);
-
-        info!("ā³ Waiting for {} documents to complete (timeout: {} minutes)", expected_docs, timeout_minutes);
-
-        loop {
-            if start_time.elapsed() > timeout {
-                warn!("ā° Timeout reached waiting for OCR completion");
-                break;
-            }
-
-            // Check completion status
-            let completed_count: i64 = sqlx::query_scalar(
-                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'completed'"
-            )
-            .fetch_one(&self.pool)
-            .await?;
-
-            let failed_count: i64 = sqlx::query_scalar(
-                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
-            )
-            .fetch_one(&self.pool)
-            .await?;
-
-            let processing_count: i64 = sqlx::query_scalar(
-                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'processing'"
-            )
-            .fetch_one(&self.pool)
-            .await?;
-
-            let pending_count: i64 = sqlx::query_scalar(
-                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'pending'"
-            )
-            .fetch_one(&self.pool)
-            .await?;
-
-            info!("šŸ“Š Status: {} completed, {} failed, {} processing, {} pending",
-                completed_count, failed_count, processing_count, pending_count);
-
-            if completed_count + failed_count >= expected_docs as i64 {
-                info!("āœ… All documents have been processed!");
-                break;
-            }
-
-            tokio::time::sleep(Duration::from_secs(5)).await;
-        }
-
-        Ok(())
-    }
-
-    async fn verify_results(&self, expected_documents: &[(Uuid, String)]) -> Result<ThrottleTestResults> {
-        info!("šŸ” Verifying OCR results for {} documents", expected_documents.len());
-
-        let mut results = ThrottleTestResults {
-            total_documents: expected_documents.len(),
-            completed: 0,
-            failed: 0,
-            corrupted: 0,
-            empty_content: 0,
-            correct_content: 0,
-        };
-
-        for (doc_id, expected_content) in expected_documents {
-            let row = sqlx::query(
-                r#"
-                SELECT ocr_status, ocr_text, ocr_error, filename
-                FROM documents
-                WHERE id = $1
-                "#
-            )
-            .bind(doc_id)
-            .fetch_one(&self.pool)
-            .await?;
-
-            let status: Option<String> = row.get("ocr_status");
-            let ocr_text: Option<String> = row.get("ocr_text");
-            let ocr_error: Option<String> = row.get("ocr_error");
-            let filename: String = row.get("filename");
-
-            match status.as_deref() {
-                Some("completed") => {
-                    results.completed += 1;
-
-                    match ocr_text.as_deref() {
-                        Some(text) if text.is_empty() => {
-                            warn!("āŒ Document {} ({}) has empty OCR content", doc_id, filename);
-                            results.empty_content += 1;
-                        }
-                        Some(text) if text == expected_content => {
-                            results.correct_content += 1;
-                        }
-                        Some(text) => {
-                            warn!("āŒ Document {} ({}) has corrupted content:", doc_id, filename);
-                            warn!("   Expected: {}", expected_content);
-                            warn!("   Got: {}", text);
-                            results.corrupted += 1;
-                        }
-                        None => {
-                            warn!("āŒ Document {} ({}) has NULL OCR content", doc_id, filename);
-                            results.empty_content += 1;
-                        }
-                    }
-                }
-                Some("failed") => {
-                    results.failed += 1;
-                    info!("āš ļø Document {} ({}) failed: {}", doc_id, filename,
-                        ocr_error.as_deref().unwrap_or("Unknown error"));
-                }
-                other => {
-                    warn!("ā“ Document {} ({}) has unexpected status: {:?}", doc_id, filename, other);
-                }
-            }
-        }
-
-        Ok(results)
-    }
-
-    async fn cleanup(&self) -> Result<()> {
-        // Clean up test files
-        let _ = tokio::fs::remove_dir_all("./test_uploads").await;
-        Ok(())
-    }
-}
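
// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): wait_for_completion above issues four
// separate COUNT queries per poll. A hypothetical alternative is one round trip
// using Postgres filtered aggregates; this is a sketch only, not code from this
// repository.
use sqlx::PgPool;

async fn status_counts(pool: &PgPool) -> Result<(i64, i64, i64, i64), sqlx::Error> {
    sqlx::query_as(
        "SELECT \
            COUNT(*) FILTER (WHERE ocr_status = 'completed'), \
            COUNT(*) FILTER (WHERE ocr_status = 'failed'), \
            COUNT(*) FILTER (WHERE ocr_status = 'processing'), \
            COUNT(*) FILTER (WHERE ocr_status = 'pending') \
         FROM documents",
    )
    .fetch_one(pool)
    .await
}
// ---------------------------------------------------------------------------
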
-
-#[derive(Debug)]
-struct ThrottleTestResults {
-    total_documents: usize,
-    completed: usize,
-    failed: usize,
-    corrupted: usize,
-    empty_content: usize,
-    correct_content: usize,
-}
-
-impl ThrottleTestResults {
-    fn success_rate(&self) -> f64 {
-        if self.total_documents == 0 { return 0.0; }
-        (self.correct_content as f64 / self.total_documents as f64) * 100.0
-    }
-
-    fn completion_rate(&self) -> f64 {
-        if self.total_documents == 0 { return 0.0; }
-        ((self.completed + self.failed) as f64 / self.total_documents as f64) * 100.0
-    }
-}
-
-#[tokio::test]
-async fn test_throttled_high_concurrency_50_documents() {
-    println!("šŸš€ THROTTLED HIGH CONCURRENCY TEST - 50 DOCUMENTS");
-    println!("================================================");
-
-    let harness = ThrottledTestHarness::new().await
-        .expect("Failed to initialize throttled test harness");
-
-    // Create test user
-    let user_id = harness.create_test_user().await
-        .expect("Failed to create test user");
-
-    // Create 50 test documents
-    let document_count = 50;
-    let test_documents = harness.create_test_documents(user_id, document_count).await
-        .expect("Failed to create test documents");
-
-    // Start multiple throttled workers
-    harness.start_throttled_workers(5).await
-        .expect("Failed to start throttled workers");
-
-    // Wait for completion with generous timeout
-    harness.wait_for_completion(document_count, 10).await
-        .expect("Failed to wait for completion");
-
-    // Verify results
-    let results = harness.verify_results(&test_documents).await
-        .expect("Failed to verify results");
-
-    // Cleanup
-    harness.cleanup().await.expect("Failed to cleanup");
-
-    // Print detailed results
-    println!("\nšŸ† THROTTLED TEST RESULTS:");
-    println!("========================");
-    println!("šŸ“Š Total Documents: {}", results.total_documents);
-    println!("āœ… Completed: {}", results.completed);
-    println!("āŒ Failed: {}", results.failed);
-    println!("šŸ”§ Correct Content: {}", results.correct_content);
-    println!("🚫 Empty Content: {}", results.empty_content);
-    println!("šŸ’„ Corrupted Content: {}", results.corrupted);
-    println!("šŸ“ˆ Success Rate: {:.1}%", results.success_rate());
-    println!("šŸ“Š Completion Rate: {:.1}%", results.completion_rate());
-
-    // Assertions
-    assert!(results.completion_rate() >= 90.0,
-        "Completion rate too low: {:.1}% (expected >= 90%)", results.completion_rate());
-
-    assert!(results.empty_content == 0,
-        "Found {} documents with empty content (should be 0 with throttling)", results.empty_content);
-
-    assert!(results.corrupted == 0,
-        "Found {} documents with corrupted content (should be 0 with throttling)", results.corrupted);
-
-    assert!(results.success_rate() >= 80.0,
-        "Success rate too low: {:.1}% (expected >= 80%)", results.success_rate());
-
-    println!("šŸŽ‰ Throttled high concurrency test PASSED!");
-}
\ No newline at end of file
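
// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the diff): the rate math behind the assertions
// above, with illustrative numbers. With 50 documents, 48 completed, 2 failed,
// and 46 byte-identical OCR results:
//   completion_rate = (48 + 2) / 50 * 100 = 100.0  (>= 90% gate passes)
//   success_rate    = 46 / 50 * 100       =  92.0  (>= 80% gate passes)
fn main() {
    let (total, correct, completed, failed) = (50_f64, 46_f64, 48_f64, 2_f64);
    let completion_rate = (completed + failed) / total * 100.0;
    let success_rate = correct / total * 100.0;
    assert!(completion_rate >= 90.0 && success_rate >= 80.0);
    println!("completion {completion_rate:.1}%, success {success_rate:.1}%");
}
// ---------------------------------------------------------------------------
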