cleanup(tests): delete disabled tests
parent 0b9b935334
commit be31c14814

@@ -1,663 +0,0 @@
/*!
 * Debug OCR Pipeline Test - Trace every step to find corruption source
 */

use reqwest::Client;
use serde_json::Value;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use uuid::Uuid;
use futures;

use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};

fn get_base_url() -> String {
    std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
}
const TIMEOUT: Duration = Duration::from_secs(120);

struct PipelineDebugger {
    client: Client,
    token: String,
}

impl PipelineDebugger {
    async fn new() -> Self {
        let client = Client::new();

        // Debug: Print the base URL we're trying to connect to
        let base_url = get_base_url();
        println!("🔍 DEBUG: Attempting to connect to server at: {}", base_url);

        // Check server health with better error handling
        println!("🔍 DEBUG: Checking server health at: {}/api/health", base_url);

        let health_check_result = client
            .get(&format!("{}/api/health", base_url))
            .timeout(Duration::from_secs(5))
            .send()
            .await;

        match health_check_result {
            Ok(response) => {
                println!("🔍 DEBUG: Health check response status: {}", response.status());
                if !response.status().is_success() {
                    let status = response.status();
                    let body = response.text().await.unwrap_or_else(|_| "Unable to read response body".to_string());
                    panic!("Server not healthy. Status: {}, Body: {}", status, body);
                }
                println!("✅ DEBUG: Server health check passed");
            }
            Err(e) => {
                println!("❌ DEBUG: Failed to connect to server health endpoint");
                println!("❌ DEBUG: Error type: {:?}", e);
                if e.is_timeout() {
                    panic!("Health check timed out after 5 seconds");
                } else if e.is_connect() {
                    panic!("Could not connect to server at {}. Is the server running?", base_url);
                } else {
                    panic!("Health check failed with error: {}", e);
                }
            }
        }

        // Create test user
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();
        let username = format!("pipeline_debug_{}", timestamp);
        let email = format!("pipeline_debug_{}@test.com", timestamp);

        // Register user
        let user_data = CreateUser {
            username: username.clone(),
            email: email.clone(),
            password: "testpass123".to_string(),
            role: Some(readur::models::UserRole::User),
        };

        let register_response = client
            .post(&format!("{}/api/auth/register", get_base_url()))
            .json(&user_data)
            .send()
            .await
            .expect("Registration should work");

        if !register_response.status().is_success() {
            panic!("Registration failed: {}", register_response.text().await.unwrap_or_default());
        }

        // Login
        let login_data = LoginRequest {
            username: username.clone(),
            password: "testpass123".to_string(),
        };

        let login_response = client
            .post(&format!("{}/api/auth/login", get_base_url()))
            .json(&login_data)
            .send()
            .await
            .expect("Login should work");

        if !login_response.status().is_success() {
            panic!("Login failed: {}", login_response.text().await.unwrap_or_default());
        }

        let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
        let token = login_result.token;

        println!("✅ Pipeline debugger initialized for user: {}", username);

        Self { client, token }
    }

    async fn upload_document_with_debug(&self, content: &str, filename: &str) -> DocumentResponse {
        println!("\n📤 UPLOAD PHASE - Starting upload for: {}", filename);
        println!(" Content: {}", content);
        println!(" Content Length: {} bytes", content.len());

        let part = reqwest::multipart::Part::text(content.to_string())
            .file_name(filename.to_string())
            .mime_str("text/plain")
            .expect("Valid mime type");
        let form = reqwest::multipart::Form::new().part("file", part);

        let upload_start = Instant::now();
        let upload_url = format!("{}/api/documents", get_base_url());
        println!(" 🔍 DEBUG: Uploading to URL: {}", upload_url);
        println!(" 🔍 DEBUG: Using token (first 10 chars): {}...", &self.token[..10.min(self.token.len())]);

        let response_result = self.client
            .post(&upload_url)
            .header("Authorization", format!("Bearer {}", self.token))
            .multipart(form)
            .send()
            .await;

        let response = match response_result {
            Ok(resp) => {
                println!(" 🔍 DEBUG: Upload request sent successfully");
                resp
            }
            Err(e) => {
                println!(" ❌ DEBUG: Upload request failed");
                println!(" ❌ DEBUG: Error type: {:?}", e);
                if e.is_timeout() {
                    panic!("Upload request timed out");
                } else if e.is_connect() {
                    panic!("Could not connect to server for upload. Error: {}", e);
                } else if e.is_request() {
                    panic!("Request building failed: {}", e);
                } else {
                    panic!("Upload failed with network error: {}", e);
                }
            }
        };

        let upload_duration = upload_start.elapsed();
        println!(" 🔍 DEBUG: Upload response received. Status: {}", response.status());

        if !response.status().is_success() {
            let status = response.status();
            let headers = response.headers().clone();
            let body = response.text().await.unwrap_or_else(|_| "Unable to read response body".to_string());

            println!(" ❌ DEBUG: Upload failed with status: {}", status);
            println!(" ❌ DEBUG: Response headers: {:?}", headers);
            println!(" ❌ DEBUG: Response body: {}", body);

            panic!("Upload failed with status {}: {}", status, body);
        }

        let document: DocumentResponse = response.json().await.expect("Valid JSON");

        println!(" ✅ Upload completed in {:?}", upload_duration);
        println!(" 📄 Document ID: {}", document.id);
        println!(" 📂 Filename: {}", document.filename);
        println!(" 📏 File Size: {} bytes", document.file_size);
        println!(" 🏷️ MIME Type: {}", document.mime_type);
        println!(" 🔄 Initial OCR Status: {:?}", document.ocr_status);

        document
    }

    async fn trace_ocr_processing(&self, document_id: Uuid, expected_content: &str) -> Value {
        println!("\n🔍 OCR PROCESSING PHASE - Tracing for document: {}", document_id);

        let start = Instant::now();
        let mut last_status = String::new();
        let mut status_changes = Vec::new();
        let mut poll_count = 0;

        while start.elapsed() < TIMEOUT {
            poll_count += 1;

            let response = self.client
                .get(&format!("{}/api/documents/{}/ocr", get_base_url(), document_id))
                .header("Authorization", format!("Bearer {}", self.token))
                .send()
                .await
                .expect("OCR endpoint should work");

            if !response.status().is_success() {
                println!(" ❌ OCR endpoint error: {}", response.status());
                sleep(Duration::from_millis(100)).await;
                continue;
            }

            let ocr_data: Value = response.json().await.expect("Valid JSON");
            let current_status = ocr_data["ocr_status"].as_str().unwrap_or("unknown").to_string();

            // Track status changes
            if current_status != last_status {
                let elapsed = start.elapsed();
                status_changes.push((elapsed, current_status.clone()));
                println!(" 📋 Status Change #{}: {} -> {} (after {:?})",
                    status_changes.len(), last_status, current_status, elapsed);
                last_status = current_status.clone();
            }

            // Detailed logging every 10 polls or on status change
            if poll_count % 10 == 0 || status_changes.len() > 0 {
                println!(" 🔄 Poll #{}: Status={}, HasText={}, TextLen={}",
                    poll_count,
                    current_status,
                    ocr_data["has_ocr_text"].as_bool().unwrap_or(false),
                    ocr_data["ocr_text"].as_str().unwrap_or("").len()
                );

                if let Some(confidence) = ocr_data["ocr_confidence"].as_f64() {
                    println!(" 📊 Confidence: {:.1}%", confidence);
                }
                if let Some(word_count) = ocr_data["ocr_word_count"].as_i64() {
                    println!(" 📝 Word Count: {}", word_count);
                }
                if let Some(error) = ocr_data["ocr_error"].as_str() {
                    println!(" ❌ Error: {}", error);
                }
            }

            // Check if processing is complete
            match current_status.as_str() {
                "completed" => {
                    println!(" ✅ OCR Processing completed after {:?} and {} polls", start.elapsed(), poll_count);

                    // Detailed final analysis
                    let ocr_text = ocr_data["ocr_text"].as_str().unwrap_or("");
                    println!("\n 🔬 FINAL CONTENT ANALYSIS:");
                    println!(" Expected: {}", expected_content);
                    println!(" Actual: {}", ocr_text);
                    println!(" Match: {}", ocr_text == expected_content);
                    println!(" Expected Length: {} chars", expected_content.len());
                    println!(" Actual Length: {} chars", ocr_text.len());

                    if ocr_text != expected_content {
                        println!(" ⚠️ CONTENT MISMATCH DETECTED!");

                        // Character-by-character comparison
                        let expected_chars: Vec<char> = expected_content.chars().collect();
                        let actual_chars: Vec<char> = ocr_text.chars().collect();

                        for (i, (e, a)) in expected_chars.iter().zip(actual_chars.iter()).enumerate() {
                            if e != a {
                                println!(" Diff at position {}: expected '{}' got '{}'", i, e, a);
                                break;
                            }
                        }
                    }

                    return ocr_data;
                }
                "failed" => {
                    println!(" ❌ OCR Processing failed after {:?} and {} polls", start.elapsed(), poll_count);
                    return ocr_data;
                }
                _ => {
                    // Continue polling
                }
            }

            sleep(Duration::from_millis(50)).await;
        }

        panic!("OCR processing did not complete within {:?}", TIMEOUT);
    }

    async fn get_all_documents(&self) -> Vec<Value> {
        let response = self.client
            .get(&format!("{}/api/documents", get_base_url()))
            .header("Authorization", format!("Bearer {}", self.token))
            .send()
            .await
            .expect("Documents endpoint should work");

        if !response.status().is_success() {
            panic!("Failed to get documents: {}", response.status());
        }

        let data: Value = response.json().await.expect("Valid JSON");

        // Handle both paginated and non-paginated response formats
        match data {
            Value::Object(obj) if obj.contains_key("documents") => {
                obj["documents"].as_array().unwrap_or(&vec![]).clone()
            }
            Value::Array(arr) => arr,
            _ => vec![]
        }
    }
}

#[tokio::test]
#[ignore = "Debug test - run manually when needed"]
async fn debug_high_concurrency_pipeline() {
    println!("🚀 STARTING HIGH-CONCURRENCY PIPELINE DEBUG");
    println!("============================================");

    let debugger = PipelineDebugger::new().await;

    // Create 5 documents with unique, easily identifiable content
    let documents = vec![
        ("DOC-ALPHA-001-UNIQUE-SIGNATURE-ALPHA", "debug_alpha.txt"),
        ("DOC-BRAVO-002-UNIQUE-SIGNATURE-BRAVO", "debug_bravo.txt"),
        ("DOC-CHARLIE-003-UNIQUE-SIGNATURE-CHARLIE", "debug_charlie.txt"),
        ("DOC-DELTA-004-UNIQUE-SIGNATURE-DELTA", "debug_delta.txt"),
        ("DOC-ECHO-005-UNIQUE-SIGNATURE-ECHO", "debug_echo.txt"),
    ];

    println!("\n📝 TEST DOCUMENTS:");
    for (i, (content, filename)) in documents.iter().enumerate() {
        println!(" {}: {} -> {}", i+1, filename, content);
    }

    // Phase 1: Upload all documents simultaneously
    println!("\n🏁 PHASE 1: SIMULTANEOUS UPLOAD");
    println!("================================");

    let upload_start = Instant::now();

    // Execute all uploads concurrently
    let uploaded_docs = futures::future::join_all(
        documents.iter().map(|(content, filename)| {
            debugger.upload_document_with_debug(content, filename)
        }).collect::<Vec<_>>()
    ).await;
    let upload_duration = upload_start.elapsed();

    println!("\n✅ ALL UPLOADS COMPLETED in {:?}", upload_duration);

    // Phase 2: Trace OCR processing for each document
    println!("\n🔬 PHASE 2: OCR PROCESSING TRACE");
    println!("================================");

    let mut ocr_tasks = Vec::new();

    for (i, doc) in uploaded_docs.iter().enumerate() {
        let doc_id = doc.id;
        let expected_content = documents[i].0.to_string();
        let debugger_ref = &debugger;

        let task = async move {
            let result = debugger_ref.trace_ocr_processing(doc_id, &expected_content).await;
            (doc_id, expected_content, result)
        };

        ocr_tasks.push(task);
    }

    // Process all OCR traces concurrently
    let ocr_results = futures::future::join_all(ocr_tasks).await;

    // Phase 3: Comprehensive analysis
    println!("\n📊 PHASE 3: COMPREHENSIVE ANALYSIS");
    println!("===================================");

    let mut corrupted_docs = Vec::new();
    let mut successful_docs = Vec::new();

    for (doc_id, expected_content, ocr_result) in ocr_results {
        let actual_text = ocr_result["ocr_text"].as_str().unwrap_or("");
        let status = ocr_result["ocr_status"].as_str().unwrap_or("unknown");

        println!("\n📄 Document Analysis: {}", doc_id);
        println!(" Status: {}", status);
        println!(" Expected: {}", expected_content);
        println!(" Actual: {}", actual_text);

        if status == "completed" {
            if actual_text == expected_content {
                println!(" ✅ CONTENT CORRECT");
                successful_docs.push(doc_id);
            } else {
                println!(" ❌ CONTENT CORRUPTED");
                corrupted_docs.push((doc_id, expected_content.clone(), actual_text.to_string()));

                // Check if it contains any other document's content
                for (other_expected, _) in &documents {
                    if other_expected != &expected_content && actual_text.contains(other_expected) {
                        println!(" 🔄 Contains content from: {}", other_expected);
                    }
                }
            }
        } else {
            println!(" ⚠️ NON-COMPLETED STATUS: {}", status);
        }
    }

    // Phase 4: System state analysis
    println!("\n🏗️ PHASE 4: SYSTEM STATE ANALYSIS");
    println!("===================================");

    let all_docs = debugger.get_all_documents().await;
    println!("📋 Total documents in system: {}", all_docs.len());

    for doc in &all_docs {
        if let (Some(id), Some(filename), Some(status)) = (
            doc["id"].as_str(),
            doc["filename"].as_str(),
            doc["ocr_status"].as_str()
        ) {
            println!(" 📄 {}: {} -> {}", id, filename, status);
        }
    }

    // Final verdict
    println!("\n🏆 FINAL VERDICT");
    println!("================");
    println!("✅ Successful: {}", successful_docs.len());
    println!("❌ Corrupted: {}", corrupted_docs.len());

    if corrupted_docs.is_empty() {
        println!("🎉 NO CORRUPTION DETECTED!");
    } else {
        println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted_docs.len());
        for (doc_id, expected, actual) in &corrupted_docs {
            println!(" 📄 {}: expected '{}' got '{}'", doc_id, expected, actual);
        }

        // Try to identify patterns
        if corrupted_docs.iter().all(|(_, _, actual)| actual.is_empty()) {
            println!("🔍 PATTERN: All corrupted documents have EMPTY content");
        } else if corrupted_docs.iter().all(|(_, _, actual)| actual == &corrupted_docs[0].2) {
            println!("🔍 PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_docs[0].2);
        } else {
            println!("🔍 PATTERN: Mixed corruption types detected");
        }

        panic!("CORRUPTION DETECTED - see analysis above");
    }
}

#[tokio::test]
#[ignore = "Debug test - run manually when needed"]
async fn debug_extreme_high_concurrency_pipeline() {
    println!("🚀 STARTING EXTREME HIGH-CONCURRENCY PIPELINE STRESS TEST");
    println!("========================================================");

    let debugger = PipelineDebugger::new().await;

    // Create 50+ documents with unique, easily identifiable content
    let mut documents = Vec::new();
    for i in 1..=55 {
        let content = format!("STRESS-TEST-DOCUMENT-{:03}-UNIQUE-SIGNATURE-{:03}", i, i);
        let filename = format!("stress_test_{:03}.txt", i);
        documents.push((content, filename));
    }

    println!("\n📝 STRESS TEST SETUP:");
    println!(" 📊 Total Documents: {}", documents.len());
    println!(" 🔄 Concurrent Processing: All {} documents simultaneously", documents.len());
    println!(" 🎯 Goal: Zero corruption across all documents");

    // Phase 1: Upload all documents simultaneously
    println!("\n🏁 PHASE 1: SIMULTANEOUS UPLOAD");
    println!("================================");

    let upload_start = Instant::now();

    // Execute all uploads concurrently
    let uploaded_docs = futures::future::join_all(
        documents.iter().map(|(content, filename)| {
            debugger.upload_document_with_debug(content, filename)
        }).collect::<Vec<_>>()
    ).await;
    let upload_duration = upload_start.elapsed();

    println!("\n✅ ALL UPLOADS COMPLETED in {:?}", upload_duration);

    // Phase 2: Trace OCR processing for each document
    println!("\n🔬 PHASE 2: OCR PROCESSING TRACE");
    println!("================================");

    let mut ocr_tasks = Vec::new();

    for (i, doc) in uploaded_docs.iter().enumerate() {
        let doc_id = doc.id;
        let expected_content = documents[i].0.to_string();
        let debugger_ref = &debugger;

        let task = async move {
            let result = debugger_ref.trace_ocr_processing(doc_id, &expected_content).await;
            (doc_id, expected_content, result)
        };

        ocr_tasks.push(task);
    }

    // Process all OCR traces concurrently
    let ocr_results = futures::future::join_all(ocr_tasks).await;

    // Phase 3: Comprehensive analysis
    println!("\n📊 PHASE 3: COMPREHENSIVE ANALYSIS");
    println!("===================================");

    let mut corrupted_docs = Vec::new();
    let mut successful_docs = Vec::new();

    for (doc_id, expected_content, ocr_result) in ocr_results {
        let actual_text = ocr_result["ocr_text"].as_str().unwrap_or("");
        let status = ocr_result["ocr_status"].as_str().unwrap_or("unknown");

        println!("\n📄 Document Analysis: {}", doc_id);
        println!(" Status: {}", status);
        println!(" Expected: {}", expected_content);
        println!(" Actual: {}", actual_text);

        if status == "completed" {
            if actual_text == expected_content {
                println!(" ✅ CONTENT CORRECT");
                successful_docs.push(doc_id);
            } else {
                println!(" ❌ CONTENT CORRUPTED");
                corrupted_docs.push((doc_id, expected_content.clone(), actual_text.to_string()));

                // Check if it contains any other document's content
                for (other_expected, _) in &documents {
                    if other_expected != &expected_content && actual_text.contains(other_expected) {
                        println!(" 🔄 Contains content from: {}", other_expected);
                    }
                }
            }
        } else {
            println!(" ⚠️ NON-COMPLETED STATUS: {}", status);
        }
    }

    // Phase 4: System state analysis
    println!("\n🏗️ PHASE 4: SYSTEM STATE ANALYSIS");
    println!("===================================");

    let all_docs = debugger.get_all_documents().await;
    println!("📋 Total documents in system: {}", all_docs.len());

    for doc in &all_docs {
        if let (Some(id), Some(filename), Some(status)) = (
            doc["id"].as_str(),
            doc["filename"].as_str(),
            doc["ocr_status"].as_str()
        ) {
            println!(" 📄 {}: {} -> {}", id, filename, status);
        }
    }

    // Final verdict
    println!("\n🏆 FINAL VERDICT");
    println!("================");
    println!("✅ Successful: {}", successful_docs.len());
    println!("❌ Corrupted: {}", corrupted_docs.len());

    if corrupted_docs.is_empty() {
        println!("🎉 NO CORRUPTION DETECTED!");
    } else {
        println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted_docs.len());
        for (doc_id, expected, actual) in &corrupted_docs {
            println!(" 📄 {}: expected '{}' got '{}'", doc_id, expected, actual);
        }

        // Try to identify patterns
        if corrupted_docs.iter().all(|(_, _, actual)| actual.is_empty()) {
            println!("🔍 PATTERN: All corrupted documents have EMPTY content");
        } else if corrupted_docs.iter().all(|(_, _, actual)| actual == &corrupted_docs[0].2) {
            println!("🔍 PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_docs[0].2);
        } else {
            println!("🔍 PATTERN: Mixed corruption types detected");
        }

        panic!("CORRUPTION DETECTED - see analysis above");
    }
}

#[tokio::test]
#[ignore = "Debug test - run manually when needed"]
async fn debug_document_upload_race_conditions() {
    println!("🔍 DEBUGGING DOCUMENT UPLOAD PROCESS");
    println!("====================================");

    // First, let's do a basic connectivity test
    println!("🔍 DEBUG: Testing basic network connectivity...");
    let test_client = reqwest::Client::new();
    let base_url = get_base_url();
    println!("🔍 DEBUG: Base URL from environment: {}", base_url);

    // Try a simple GET request first
    match test_client.get(&base_url).send().await {
        Ok(resp) => {
            println!("✅ DEBUG: Basic connectivity test passed. Status: {}", resp.status());
        }
        Err(e) => {
            println!("❌ DEBUG: Basic connectivity test failed");
            println!("❌ DEBUG: Error: {:?}", e);
            panic!("Cannot connect to server at {}. Error: {}", base_url, e);
        }
    }

    let debugger = PipelineDebugger::new().await;

    // Upload same content with different filenames to test:
    // 1. Concurrent upload race condition handling (no 500 errors)
    // 2. Proper deduplication (identical content = same document ID)
    let same_content = "IDENTICAL-CONTENT-FOR-RACE-CONDITION-TEST";
    let task1 = debugger.upload_document_with_debug(same_content, "race1.txt");
    let task2 = debugger.upload_document_with_debug(same_content, "race2.txt");
    let task3 = debugger.upload_document_with_debug(same_content, "race3.txt");

    let (doc1, doc2, doc3) = futures::future::join3(task1, task2, task3).await;
    let docs = vec![doc1, doc2, doc3];

    println!("\n📊 UPLOAD RACE CONDITION ANALYSIS:");
    for (i, doc) in docs.iter().enumerate() {
        println!(" Doc {}: ID={}, Filename={}, Size={}",
            i+1, doc.id, doc.filename, doc.file_size);
    }

    // Check deduplication behavior: identical content should result in same document ID
    let mut ids: Vec<_> = docs.iter().map(|d| d.id).collect();
    ids.sort();
    ids.dedup();

    if ids.len() == 1 {
        println!("✅ Correct deduplication: All identical content maps to same document ID");
        println!("✅ Race condition handled properly: No 500 errors during concurrent uploads");
    } else if ids.len() == docs.len() {
        println!("❌ UNEXPECTED: All documents have unique IDs despite identical content");
        panic!("Deduplication not working - identical content should map to same document");
    } else {
        println!("❌ PARTIAL DEDUPLICATION: Some duplicates detected but not all");
        panic!("Inconsistent deduplication behavior");
    }

    // Verify all documents have the same content hash (should be identical)
    let content_hashes: Vec<_> = docs.iter().map(|d| {
        // We can't directly access file_hash from DocumentResponse, but we can verify
        // they all have the same file size as a proxy for same content
        d.file_size
    }).collect();

    if content_hashes.iter().all(|&size| size == content_hashes[0]) {
        println!("✅ All documents have same file size (content verification)");
    } else {
        println!("❌ Documents have different file sizes - test setup error");
    }
}

@@ -1,619 +0,0 @@
/*!
 * Manual Sync Triggering Unit Tests
 *
 * Tests for manual sync triggering functionality including:
 * - API endpoint testing
 * - Source status validation
 * - Conflict detection (already syncing)
 * - Permission and authentication checks
 * - Error handling and recovery
 * - Integration with source scheduler
 */

use std::sync::Arc;
use std::collections::HashMap;
use std::time::{SystemTime, Duration};
use uuid::Uuid;
use chrono::Utc;
use serde_json::json;
use axum::http::StatusCode;

use readur::{
    AppState,
    config::Config,
    db::Database,
    models::{Source, SourceType, SourceStatus, WebDAVSourceConfig, User, UserRole},
    auth::AuthUser,
    routes::sources,
};

/// Create a test app state
async fn create_test_app_state() -> Arc<AppState> {
    let config = Config {
        database_url: "sqlite::memory:".to_string(),
        server_address: "127.0.0.1:8080".to_string(),
        jwt_secret: "test_secret".to_string(),
        upload_path: "/tmp/test_uploads".to_string(),
        watch_folder: "/tmp/test_watch".to_string(),
        allowed_file_types: vec!["pdf".to_string(), "txt".to_string()],
        watch_interval_seconds: Some(30),
        file_stability_check_ms: Some(500),
        max_file_age_hours: None,
        ocr_language: "eng".to_string(),
        concurrent_ocr_jobs: 2,
        ocr_timeout_seconds: 60,
        max_file_size_mb: 10,
        memory_limit_mb: 256,
        cpu_priority: "normal".to_string(),
    };

    let db = Database::new(&config.database_url).await.unwrap();
    let queue_service = std::sync::Arc::new(readur::ocr_queue::OcrQueueService::new(db.clone(), db.pool.clone(), 2));

    Arc::new(AppState {
        db,
        config,
        webdav_scheduler: None,
        source_scheduler: None,
        queue_service,
    })
}

/// Create a test user
fn create_test_user() -> User {
    User {
        id: Uuid::new_v4(),
        username: "testuser".to_string(),
        email: "test@example.com".to_string(),
        password_hash: "hashed_password".to_string(),
        role: UserRole::User,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    }
}

/// Create a test source in various states
fn create_test_source_with_status(status: SourceStatus, user_id: Uuid) -> Source {
    Source {
        id: Uuid::new_v4(),
        user_id,
        name: "Test WebDAV Source".to_string(),
        source_type: SourceType::WebDAV,
        enabled: true,
        config: json!({
            "server_url": "https://cloud.example.com",
            "username": "testuser",
            "password": "testpass",
            "watch_folders": ["/Documents"],
            "file_extensions": [".pdf", ".txt"],
            "auto_sync": true,
            "sync_interval_minutes": 60,
            "server_type": "nextcloud"
        }),
        status,
        last_sync_at: None,
        last_error: None,
        last_error_at: None,
        total_files_synced: 0,
        total_files_pending: 0,
        total_size_bytes: 0,
        created_at: Utc::now(),
        updated_at: Utc::now(),
        validation_status: None,
        last_validation_at: None,
        validation_score: None,
        validation_issues: None,
    }
}

#[tokio::test]
async fn test_manual_sync_trigger_idle_source() {
    let state = create_test_app_state().await;
    let user = create_test_user();
    let source = create_test_source_with_status(SourceStatus::Idle, user.id);

    // Test that idle source can be triggered for sync
    let can_trigger = can_trigger_manual_sync(&source);
    assert!(can_trigger, "Idle source should be available for manual sync");

    // Test status update to syncing
    let updated_status = SourceStatus::Syncing;
    assert_ne!(source.status, updated_status);
    assert!(is_valid_sync_trigger_transition(&source.status, &updated_status));
}

#[tokio::test]
async fn test_manual_sync_trigger_already_syncing() {
    let state = create_test_app_state().await;
    let user = create_test_user();
    let source = create_test_source_with_status(SourceStatus::Syncing, user.id);

    // Test that already syncing source cannot be triggered again
    let can_trigger = can_trigger_manual_sync(&source);
    assert!(!can_trigger, "Already syncing source should not allow manual sync");

    // This should result in HTTP 409 Conflict
    let expected_status = StatusCode::CONFLICT;
    let result_status = get_expected_status_for_sync_trigger(&source);
    assert_eq!(result_status, expected_status);
}

#[tokio::test]
async fn test_manual_sync_trigger_error_state() {
    let state = create_test_app_state().await;
    let user = create_test_user();
    let mut source = create_test_source_with_status(SourceStatus::Error, user.id);
    source.last_error = Some("Previous sync failed".to_string());
    source.last_error_at = Some(Utc::now());

    // Test that source in error state can be triggered (retry)
    let can_trigger = can_trigger_manual_sync(&source);
    assert!(can_trigger, "Source in error state should allow manual sync retry");

    // Test status transition from error to syncing
    assert!(is_valid_sync_trigger_transition(&source.status, &SourceStatus::Syncing));
}

fn can_trigger_manual_sync(source: &Source) -> bool {
    match source.status {
        SourceStatus::Idle => true,
        SourceStatus::Error => true,
        SourceStatus::Syncing => false,
    }
}

fn is_valid_sync_trigger_transition(from: &SourceStatus, to: &SourceStatus) -> bool {
    match (from, to) {
        (SourceStatus::Idle, SourceStatus::Syncing) => true,
        (SourceStatus::Error, SourceStatus::Syncing) => true,
        _ => false,
    }
}

fn get_expected_status_for_sync_trigger(source: &Source) -> StatusCode {
    match source.status {
        SourceStatus::Idle => StatusCode::OK,
        SourceStatus::Error => StatusCode::OK,
        SourceStatus::Syncing => StatusCode::CONFLICT,
    }
}

#[tokio::test]
async fn test_source_ownership_validation() {
    let user_1 = create_test_user();
    let user_2 = User {
        id: Uuid::new_v4(),
        username: "otheruser".to_string(),
        email: "other@example.com".to_string(),
        password_hash: "other_hash".to_string(),
        role: UserRole::User,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    let source = create_test_source_with_status(SourceStatus::Idle, user_1.id);

    // Test that owner can trigger sync
    assert!(can_user_trigger_sync(&user_1, &source));

    // Test that non-owner cannot trigger sync
    assert!(!can_user_trigger_sync(&user_2, &source));

    // Test admin can trigger any sync
    let admin_user = User {
        id: Uuid::new_v4(),
        username: "admin".to_string(),
        email: "admin@example.com".to_string(),
        password_hash: "admin_hash".to_string(),
        role: UserRole::Admin,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    assert!(can_user_trigger_sync(&admin_user, &source));
}

fn can_user_trigger_sync(user: &User, source: &Source) -> bool {
    user.role == UserRole::Admin || user.id == source.user_id
}

#[test]
fn test_sync_trigger_request_validation() {
    // Test valid source IDs
    let valid_id = Uuid::new_v4();
    assert!(is_valid_source_id(&valid_id.to_string()));

    // Test invalid source IDs
    let invalid_ids = vec![
        "",
        "invalid-uuid",
        "12345",
        "not-a-uuid-at-all",
    ];

    for invalid_id in invalid_ids {
        assert!(!is_valid_source_id(invalid_id), "Should reject invalid UUID: {}", invalid_id);
    }
}

fn is_valid_source_id(id_str: &str) -> bool {
    Uuid::parse_str(id_str).is_ok()
}

#[test]
fn test_sync_trigger_rate_limiting() {
    // Test rate limiting for manual sync triggers
    let mut rate_limiter = SyncRateLimiter::new();
    let source_id = Uuid::new_v4();

    // First trigger should be allowed
    assert!(rate_limiter.can_trigger_sync(&source_id));
    rate_limiter.record_sync_trigger(&source_id);

    // Immediate second trigger should be blocked
    assert!(!rate_limiter.can_trigger_sync(&source_id));

    // After cooldown period, should be allowed again
    rate_limiter.advance_time(Duration::from_secs(61)); // Advance past cooldown
    assert!(rate_limiter.can_trigger_sync(&source_id));
}

struct SyncRateLimiter {
    last_triggers: HashMap<Uuid, SystemTime>,
    cooldown_period: Duration,
    current_time: SystemTime,
}

impl SyncRateLimiter {
    fn new() -> Self {
        Self {
            last_triggers: HashMap::new(),
            cooldown_period: Duration::from_secs(60), // 1 minute cooldown
            current_time: SystemTime::now(),
        }
    }

    fn can_trigger_sync(&self, source_id: &Uuid) -> bool {
        if let Some(&last_trigger) = self.last_triggers.get(source_id) {
            self.current_time.duration_since(last_trigger).unwrap_or(Duration::ZERO) >= self.cooldown_period
        } else {
            true // Never triggered before
        }
    }

    fn record_sync_trigger(&mut self, source_id: &Uuid) {
        self.last_triggers.insert(*source_id, self.current_time);
    }

    fn advance_time(&mut self, duration: Duration) {
        self.current_time += duration;
    }
}

#[tokio::test]
async fn test_sync_trigger_with_disabled_source() {
    let state = create_test_app_state().await;
    let user = create_test_user();
    let mut source = create_test_source_with_status(SourceStatus::Idle, user.id);
    source.enabled = false; // Disable the source

    // Test that disabled source cannot be triggered
    let can_trigger = can_trigger_disabled_source(&source);
    assert!(!can_trigger, "Disabled source should not allow manual sync");

    // This should result in HTTP 400 Bad Request
    let expected_status = if source.enabled {
        StatusCode::OK
    } else {
        StatusCode::BAD_REQUEST
    };

    assert_eq!(expected_status, StatusCode::BAD_REQUEST);
}

fn can_trigger_disabled_source(source: &Source) -> bool {
    source.enabled && can_trigger_manual_sync(source)
}

#[test]
fn test_sync_trigger_configuration_validation() {
    let user_id = Uuid::new_v4();

    // Test valid WebDAV configuration
    let valid_source = create_test_source_with_status(SourceStatus::Idle, user_id);
    let config_result: Result<WebDAVSourceConfig, _> = serde_json::from_value(valid_source.config.clone());
    assert!(config_result.is_ok(), "Valid configuration should parse successfully");

    // Test invalid configuration
    let mut invalid_source = create_test_source_with_status(SourceStatus::Idle, user_id);
    invalid_source.config = json!({
        "server_url": "", // Invalid empty URL
        "username": "test",
        "password": "test"
        // Missing required fields
    });

    let invalid_config_result: Result<WebDAVSourceConfig, _> = serde_json::from_value(invalid_source.config.clone());
    assert!(invalid_config_result.is_err(), "Invalid configuration should fail to parse");
}

#[test]
fn test_concurrent_sync_trigger_protection() {
    use std::sync::{Arc, Mutex};
    use std::collections::HashSet;
    use std::thread;

    let active_syncs: Arc<Mutex<HashSet<Uuid>>> = Arc::new(Mutex::new(HashSet::new()));
    let source_id = Uuid::new_v4();

    let mut handles = vec![];
    let results = Arc::new(Mutex::new(Vec::new()));

    // Simulate multiple concurrent trigger attempts
    for _ in 0..5 {
        let active_syncs = Arc::clone(&active_syncs);
        let results = Arc::clone(&results);

        let handle = thread::spawn(move || {
            let mut syncs = active_syncs.lock().unwrap();
            let was_inserted = syncs.insert(source_id);

            results.lock().unwrap().push(was_inserted);

            // Simulate some work
            std::thread::sleep(std::time::Duration::from_millis(10));
        });

        handles.push(handle);
    }

    // Wait for all threads
    for handle in handles {
        handle.join().unwrap();
    }

    let final_results = results.lock().unwrap();
    let successful_triggers = final_results.iter().filter(|&&success| success).count();

    // Only one thread should have successfully triggered the sync
    assert_eq!(successful_triggers, 1, "Only one concurrent sync trigger should succeed");
}

#[test]
fn test_sync_trigger_error_responses() {
    // Test various error scenarios and their expected HTTP responses
    let test_cases = vec![
        (SyncTriggerError::SourceNotFound, StatusCode::NOT_FOUND),
        (SyncTriggerError::AlreadySyncing, StatusCode::CONFLICT),
        (SyncTriggerError::SourceDisabled, StatusCode::BAD_REQUEST),
        (SyncTriggerError::InvalidConfiguration, StatusCode::BAD_REQUEST),
        (SyncTriggerError::PermissionDenied, StatusCode::FORBIDDEN),
        (SyncTriggerError::RateLimited, StatusCode::TOO_MANY_REQUESTS),
        (SyncTriggerError::InternalError, StatusCode::INTERNAL_SERVER_ERROR),
    ];

    for (error, expected_status) in test_cases {
        let status = error.to_status_code();
        assert_eq!(status, expected_status, "Wrong status code for error: {:?}", error);
    }
}

#[derive(Debug, Clone)]
enum SyncTriggerError {
    SourceNotFound,
    AlreadySyncing,
    SourceDisabled,
    InvalidConfiguration,
    PermissionDenied,
    RateLimited,
    InternalError,
}

impl SyncTriggerError {
    fn to_status_code(&self) -> StatusCode {
        match self {
            SyncTriggerError::SourceNotFound => StatusCode::NOT_FOUND,
            SyncTriggerError::AlreadySyncing => StatusCode::CONFLICT,
            SyncTriggerError::SourceDisabled => StatusCode::BAD_REQUEST,
            SyncTriggerError::InvalidConfiguration => StatusCode::BAD_REQUEST,
            SyncTriggerError::PermissionDenied => StatusCode::FORBIDDEN,
            SyncTriggerError::RateLimited => StatusCode::TOO_MANY_REQUESTS,
            SyncTriggerError::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}

#[test]
fn test_manual_sync_metrics() {
    // Test tracking of manual sync triggers vs automatic syncs
    let mut sync_metrics = ManualSyncMetrics::new();
    let source_id = Uuid::new_v4();

    // Record manual triggers
    sync_metrics.record_manual_trigger(source_id);
    sync_metrics.record_manual_trigger(source_id);

    // Record automatic syncs
    sync_metrics.record_automatic_sync(source_id);

    let stats = sync_metrics.get_stats_for_source(&source_id);
    assert_eq!(stats.manual_triggers, 2);
    assert_eq!(stats.automatic_syncs, 1);
    assert_eq!(stats.total_syncs(), 3);

    let manual_ratio = stats.manual_trigger_ratio();
    assert!((manual_ratio - 0.666).abs() < 0.01); // ~66.7%
}

struct ManualSyncMetrics {
    manual_triggers: HashMap<Uuid, u32>,
    automatic_syncs: HashMap<Uuid, u32>,
}

impl ManualSyncMetrics {
    fn new() -> Self {
        Self {
            manual_triggers: HashMap::new(),
            automatic_syncs: HashMap::new(),
        }
    }

    fn record_manual_trigger(&mut self, source_id: Uuid) {
        *self.manual_triggers.entry(source_id).or_insert(0) += 1;
    }

    fn record_automatic_sync(&mut self, source_id: Uuid) {
        *self.automatic_syncs.entry(source_id).or_insert(0) += 1;
    }

    fn get_stats_for_source(&self, source_id: &Uuid) -> SyncStats {
        SyncStats {
            manual_triggers: self.manual_triggers.get(source_id).copied().unwrap_or(0),
            automatic_syncs: self.automatic_syncs.get(source_id).copied().unwrap_or(0),
        }
    }
}

struct SyncStats {
    manual_triggers: u32,
    automatic_syncs: u32,
}

impl SyncStats {
    fn total_syncs(&self) -> u32 {
        self.manual_triggers + self.automatic_syncs
    }

    fn manual_trigger_ratio(&self) -> f64 {
        if self.total_syncs() == 0 {
            0.0
        } else {
            self.manual_triggers as f64 / self.total_syncs() as f64
        }
    }
}

#[test]
fn test_sync_trigger_audit_logging() {
    // Test audit logging for manual sync triggers
    let mut audit_log = SyncAuditLog::new();
    let user_id = Uuid::new_v4();
    let source_id = Uuid::new_v4();

    // Record successful trigger
    audit_log.log_sync_trigger(SyncTriggerEvent {
        user_id,
        source_id,
        timestamp: Utc::now(),
        result: SyncTriggerResult::Success,
        user_agent: Some("Mozilla/5.0 (Test Browser)".to_string()),
        ip_address: Some("192.168.1.100".to_string()),
    });

    // Record failed trigger
    audit_log.log_sync_trigger(SyncTriggerEvent {
        user_id,
        source_id,
        timestamp: Utc::now(),
        result: SyncTriggerResult::Failed("Already syncing".to_string()),
        user_agent: Some("Mozilla/5.0 (Test Browser)".to_string()),
        ip_address: Some("192.168.1.100".to_string()),
    });

    let events = audit_log.get_events_for_user(&user_id);
    assert_eq!(events.len(), 2);
    assert!(matches!(events[0].result, SyncTriggerResult::Success));
    assert!(matches!(events[1].result, SyncTriggerResult::Failed(_)));
}

struct SyncAuditLog {
    events: Vec<SyncTriggerEvent>,
}

impl SyncAuditLog {
    fn new() -> Self {
        Self {
            events: Vec::new(),
        }
    }

    fn log_sync_trigger(&mut self, event: SyncTriggerEvent) {
        self.events.push(event);
    }

    fn get_events_for_user(&self, user_id: &Uuid) -> Vec<&SyncTriggerEvent> {
        self.events.iter().filter(|e| e.user_id == *user_id).collect()
    }
}

#[derive(Debug, Clone)]
struct SyncTriggerEvent {
    user_id: Uuid,
    source_id: Uuid,
    timestamp: chrono::DateTime<Utc>,
    result: SyncTriggerResult,
    user_agent: Option<String>,
    ip_address: Option<String>,
}

#[derive(Debug, Clone)]
enum SyncTriggerResult {
    Success,
    Failed(String),
}

#[tokio::test]
async fn test_sync_trigger_with_scheduler_integration() {
    // Test integration with source scheduler
    let state = create_test_app_state().await;
    let user = create_test_user();
    let source = create_test_source_with_status(SourceStatus::Idle, user.id);

    // Test that trigger_sync method exists and handles the source
    let sync_request = ManualSyncRequest {
        source_id: source.id,
        user_id: user.id,
        force: false, // Don't force if already syncing
        priority: SyncPriority::Normal,
    };

    // Simulate what the actual API would do
    let can_proceed = validate_sync_request(&sync_request, &source);
    assert!(can_proceed, "Valid sync request should be allowed");
}

#[derive(Debug, Clone)]
struct ManualSyncRequest {
    source_id: Uuid,
    user_id: Uuid,
    force: bool,
    priority: SyncPriority,
}

#[derive(Debug, Clone)]
enum SyncPriority {
    Low,
    Normal,
    High,
    Urgent,
}

fn validate_sync_request(request: &ManualSyncRequest, source: &Source) -> bool {
    // Check ownership
    if request.user_id != source.user_id {
        return false;
    }

    // Check if source is enabled
    if !source.enabled {
        return false;
    }

    // Check status (allow force override)
    if !request.force && source.status == SourceStatus::Syncing {
        return false;
    }

    true
}

@@ -1,227 +0,0 @@
/*!
|
|
||||||
* Moderate Stress Test - 25 Documents for Complete Verification
|
|
||||||
*/
|
|
||||||
|
|
||||||
use reqwest::Client;
|
|
||||||
use serde_json::Value;
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use tokio::time::sleep;
|
|
||||||
use uuid::Uuid;
|
|
||||||
use futures;
|
|
||||||
|
|
||||||
use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};
|
|
||||||
|
|
||||||
fn get_base_url() -> String {
|
|
||||||
std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
|
|
||||||
}
|
|
||||||
const TIMEOUT: Duration = Duration::from_secs(120);
|
|
||||||
|
|
||||||
struct StressTester {
|
|
||||||
client: Client,
|
|
||||||
token: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StressTester {
|
|
||||||
async fn new() -> Self {
|
|
||||||
let client = Client::new();
|
|
||||||
|
|
||||||
// Check server health
|
|
||||||
client.get(&format!("{}/api/health", get_base_url()))
|
|
||||||
.timeout(Duration::from_secs(5))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.expect("Server should be running");
|
|
||||||
|
|
||||||
// Create test user
|
|
||||||
let timestamp = std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.unwrap()
|
|
||||||
.as_millis();
|
|
||||||
let username = format!("stress_25_{}", timestamp);
|
|
||||||
let email = format!("stress_25_{}@test.com", timestamp);
|
|
||||||
|
|
||||||
// Register user
|
|
||||||
let user_data = CreateUser {
|
|
||||||
username: username.clone(),
|
|
||||||
email: email.clone(),
|
|
||||||
password: "testpass123".to_string(),
|
|
||||||
role: Some(readur::models::UserRole::User),
|
|
||||||
};
|
|
||||||
|
|
||||||
client.post(&format!("{}/api/auth/register", get_base_url()))
|
|
||||||
.json(&user_data)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.expect("Registration should work");
|
|
||||||
|
|
||||||
// Login
|
|
||||||
let login_data = LoginRequest {
|
|
||||||
username: username.clone(),
|
|
||||||
password: "testpass123".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let login_response = client
|
|
||||||
.post(&format!("{}/api/auth/login", get_base_url()))
|
|
||||||
.json(&login_data)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.expect("Login should work");
|
|
||||||
|
|
||||||
let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
|
|
||||||
let token = login_result.token;
|
|
||||||
|
|
||||||
println!("✅ Stress tester initialized");
|
|
||||||
|
|
||||||
Self { client, token }
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse {
|
|
||||||
let part = reqwest::multipart::Part::text(content.to_string())
|
|
||||||
.file_name(filename.to_string())
|
|
||||||
.mime_str("text/plain")
|
|
||||||
.expect("Valid mime type");
|
|
||||||
let form = reqwest::multipart::Form::new().part("file", part);
|
|
||||||
|
|
||||||
let response = self.client
|
|
||||||
.post(&format!("{}/api/documents", get_base_url()))
|
|
||||||
.header("Authorization", format!("Bearer {}", self.token))
|
|
||||||
.multipart(form)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.expect("Upload should work");
|
|
||||||
|
|
||||||
response.json().await.expect("Valid JSON")
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_ocr_completion(&self, document_ids: &[Uuid]) -> Vec<Value> {
|
|
||||||
let start = Instant::now();
|
|
||||||
|
|
||||||
while start.elapsed() < TIMEOUT {
|
|
||||||
let all_docs = self.get_all_documents().await;
|
|
||||||
let completed = all_docs.iter()
|
|
||||||
.filter(|doc| {
|
|
||||||
let doc_id_str = doc["id"].as_str().unwrap_or("");
|
|
||||||
let status = doc["ocr_status"].as_str().unwrap_or("");
|
|
||||||
document_ids.iter().any(|id| id.to_string() == doc_id_str) && status == "completed"
|
|
||||||
})
|
|
||||||
.count();
|
|
||||||
|
|
||||||
if completed == document_ids.len() {
|
|
||||||
return all_docs.into_iter()
|
|
||||||
.filter(|doc| {
|
|
||||||
let doc_id_str = doc["id"].as_str().unwrap_or("");
|
|
||||||
document_ids.iter().any(|id| id.to_string() == doc_id_str)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(Duration::from_millis(500)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
panic!("OCR processing did not complete within timeout");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_all_documents(&self) -> Vec<Value> {
|
|
||||||
let response = self.client
|
|
||||||
.get(&format!("{}/api/documents", get_base_url()))
|
|
||||||
.header("Authorization", format!("Bearer {}", self.token))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.expect("Documents endpoint should work");
|
|
||||||
|
|
||||||
let data: Value = response.json().await.expect("Valid JSON");
|
|
||||||
|
|
||||||
match data {
|
|
||||||
Value::Object(obj) if obj.contains_key("documents") => {
|
|
||||||
obj["documents"].as_array().unwrap_or(&vec![]).clone()
|
|
||||||
}
|
|
||||||
Value::Array(arr) => arr,
|
|
||||||
_ => vec![]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
async fn stress_test_25_documents() {
    println!("🚀 MODERATE STRESS TEST: 25 DOCUMENTS");
    println!("======================================");

    let tester = StressTester::new().await;

    // Create 25 documents with unique content
    let mut documents = Vec::new();
    for i in 1..=25 {
        let content = format!("STRESS-DOC-{:02}-SIGNATURE-{:02}-UNIQUE-CONTENT", i, i);
        let filename = format!("stress_{:02}.txt", i);
        documents.push((content, filename));
    }

    println!("📊 Testing {} documents concurrently", documents.len());

    // Phase 1: Upload all documents concurrently
    println!("\n🏁 UPLOADING...");
    let upload_start = Instant::now();

    let uploaded_docs = futures::future::join_all(
        documents.iter().map(|(content, filename)| {
            tester.upload_document(content, filename)
        }).collect::<Vec<_>>()
    ).await;

    let upload_duration = upload_start.elapsed();
    println!("✅ {} uploads completed in {:?}", uploaded_docs.len(), upload_duration);

    // Phase 2: Wait for OCR completion
    println!("\n🔬 PROCESSING OCR...");
    let processing_start = Instant::now();
    let document_ids: Vec<Uuid> = uploaded_docs.iter().map(|doc| doc.id).collect();

    let final_docs = tester.wait_for_ocr_completion(&document_ids).await;
    let processing_duration = processing_start.elapsed();
    println!("✅ OCR processing completed in {:?}", processing_duration);

    // Phase 3: Corruption Analysis
    println!("\n📊 VERIFYING RESULTS...");
    let mut successful = 0;
    let mut corrupted = 0;
    let mut corruption_details = Vec::new();

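    // Note: this comparison assumes final_docs comes back in the same order as
    // the original uploads; an out-of-order response would be reported as
    // corruption even if every document's OCR text is intact.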
    for (i, doc) in final_docs.iter().enumerate() {
        let expected_content = &documents[i].0;
        let actual_text = doc["ocr_text"].as_str().unwrap_or("");
        let doc_id = doc["id"].as_str().unwrap_or("");

        if actual_text == expected_content {
            successful += 1;
        } else {
            corrupted += 1;
            corruption_details.push((doc_id.to_string(), expected_content.clone(), actual_text.to_string()));
        }
    }

    // Final Results
    println!("\n🏆 STRESS TEST RESULTS");
    println!("======================");
    println!("📊 Total Documents: {}", documents.len());
    println!("✅ Successful: {}", successful);
    println!("❌ Corrupted: {}", corrupted);
    println!("📈 Success Rate: {:.1}%", (successful as f64 / documents.len() as f64) * 100.0);
    println!("⏱️ Upload Time: {:?}", upload_duration);
    println!("⏱️ OCR Time: {:?}", processing_duration);
    println!("⏱️ Total Time: {:?}", upload_duration + processing_duration);

    if corrupted == 0 {
        println!("\n🎉 STRESS TEST PASSED!");
        println!("🎯 ALL {} DOCUMENTS PROCESSED WITHOUT CORRUPTION!", documents.len());
        println!("🚀 HIGH CONCURRENCY OCR CORRUPTION ISSUE IS FULLY RESOLVED!");
    } else {
        println!("\n🚨 STRESS TEST FAILED!");
        println!("❌ CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted);

        for (doc_id, expected, actual) in &corruption_details {
            println!("  📄 {}: expected '{}' got '{}'", doc_id, expected, actual);
        }

        panic!("CORRUPTION DETECTED in {} out of {} documents", corrupted, documents.len());
    }
}
@ -1,269 +0,0 @@
/*!
 * Simple High-Concurrency Stress Test - Focus on Results Only
 */

use reqwest::Client;
use serde_json::Value;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use uuid::Uuid;
use futures;

use readur::models::{DocumentResponse, CreateUser, LoginRequest, LoginResponse};

fn get_base_url() -> String {
    std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
}
const TIMEOUT: Duration = Duration::from_secs(180);

struct SimpleStressTester {
    client: Client,
    token: String,
}

impl SimpleStressTester {
    async fn new() -> Self {
        let client = Client::new();

        // Check server health
        let response = client
            .get(&format!("{}/api/health", get_base_url()))
            .timeout(Duration::from_secs(5))
            .send()
            .await
            .expect("Server should be running");

        if !response.status().is_success() {
            panic!("Server not healthy");
        }

        // Create test user
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();
        let username = format!("stress_test_{}", timestamp);
        let email = format!("stress_test_{}@test.com", timestamp);

        // Register user
        let user_data = CreateUser {
            username: username.clone(),
            email: email.clone(),
            password: "testpass123".to_string(),
            role: Some(readur::models::UserRole::User),
        };

        let register_response = client
            .post(&format!("{}/api/auth/register", get_base_url()))
            .json(&user_data)
            .send()
            .await
            .expect("Registration should work");

        if !register_response.status().is_success() {
            panic!("Registration failed: {}", register_response.text().await.unwrap_or_default());
        }

        // Login
        let login_data = LoginRequest {
            username: username.clone(),
            password: "testpass123".to_string(),
        };

        let login_response = client
            .post(&format!("{}/api/auth/login", get_base_url()))
            .json(&login_data)
            .send()
            .await
            .expect("Login should work");

        if !login_response.status().is_success() {
            panic!("Login failed: {}", login_response.text().await.unwrap_or_default());
        }

        let login_result: LoginResponse = login_response.json().await.expect("Login should return JSON");
        let token = login_result.token;

        println!("✅ Stress tester initialized for user: {}", username);

        Self { client, token }
    }

    async fn upload_document(&self, content: &str, filename: &str) -> DocumentResponse {
        let part = reqwest::multipart::Part::text(content.to_string())
            .file_name(filename.to_string())
            .mime_str("text/plain")
            .expect("Valid mime type");
        let form = reqwest::multipart::Form::new().part("file", part);

        let response = self.client
            .post(&format!("{}/api/documents", get_base_url()))
            .header("Authorization", format!("Bearer {}", self.token))
            .multipart(form)
            .send()
            .await
            .expect("Upload should work");

        if !response.status().is_success() {
            panic!("Upload failed: {}", response.text().await.unwrap_or_default());
        }

        response.json().await.expect("Valid JSON")
    }

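    // Polls once per second and logs progress whenever the completed count
    // changes; if TIMEOUT elapses it stops waiting and returns the documents
    // in whatever state they are in, leaving the analysis phase to report them.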
    async fn wait_for_completion(&self, document_ids: &[Uuid]) -> Vec<Value> {
        let start = Instant::now();
        let mut last_completed = 0;

        while start.elapsed() < TIMEOUT {
            let all_docs = self.get_all_documents().await;
            let completed = all_docs.iter()
                .filter(|doc| {
                    let doc_id_str = doc["id"].as_str().unwrap_or("");
                    let status = doc["ocr_status"].as_str().unwrap_or("");
                    document_ids.iter().any(|id| id.to_string() == doc_id_str) && status == "completed"
                })
                .count();

            if completed != last_completed {
                last_completed = completed;
                let progress = (completed as f64 / document_ids.len() as f64) * 100.0;
                println!("  📊 Progress: {}/{} documents completed ({:.1}%)",
                         completed, document_ids.len(), progress);
            }

            if completed == document_ids.len() {
                break;
            }

            sleep(Duration::from_secs(1)).await;
        }

        // Get final results
        let all_docs = self.get_all_documents().await;
        all_docs.into_iter()
            .filter(|doc| {
                let doc_id_str = doc["id"].as_str().unwrap_or("");
                document_ids.iter().any(|id| id.to_string() == doc_id_str)
            })
            .collect()
    }

    async fn get_all_documents(&self) -> Vec<Value> {
        let response = self.client
            .get(&format!("{}/api/documents", get_base_url()))
            .header("Authorization", format!("Bearer {}", self.token))
            .send()
            .await
            .expect("Documents endpoint should work");

        if !response.status().is_success() {
            panic!("Failed to get documents: {}", response.status());
        }

        let data: Value = response.json().await.expect("Valid JSON");

        match data {
            Value::Object(obj) if obj.contains_key("documents") => {
                obj["documents"].as_array().unwrap_or(&vec![]).clone()
            }
            Value::Array(arr) => arr,
            _ => vec![]
        }
    }
}

#[tokio::test]
async fn stress_test_50_plus_documents() {
    println!("🚀 EXTREME STRESS TEST: 50+ DOCUMENTS");
    println!("=====================================");

    let tester = SimpleStressTester::new().await;

    // Create 50+ documents with unique content
    let mut documents = Vec::new();
    for i in 1..=55 {
        let content = format!("STRESS-TEST-DOCUMENT-{:03}-UNIQUE-SIGNATURE-{:03}", i, i);
        let filename = format!("stress_test_{:03}.txt", i);
        documents.push((content, filename));
    }

    println!("📊 Total Documents: {}", documents.len());

    // Phase 1: Upload all documents concurrently
    println!("\n🏁 PHASE 1: SIMULTANEOUS UPLOAD");
    let upload_start = Instant::now();

    let uploaded_docs = futures::future::join_all(
        documents.iter().map(|(content, filename)| {
            tester.upload_document(content, filename)
        }).collect::<Vec<_>>()
    ).await;

    let upload_duration = upload_start.elapsed();
    println!("✅ All {} documents uploaded in {:?}", uploaded_docs.len(), upload_duration);

    // Phase 2: Wait for OCR completion
    println!("\n🔬 PHASE 2: OCR PROCESSING");
    let processing_start = Instant::now();
    let document_ids: Vec<Uuid> = uploaded_docs.iter().map(|doc| doc.id).collect();

    let final_docs = tester.wait_for_completion(&document_ids).await;
    let processing_duration = processing_start.elapsed();
    println!("✅ All OCR processing completed in {:?}", processing_duration);

    // Phase 3: Corruption Analysis
    println!("\n📊 PHASE 3: CORRUPTION ANALYSIS");
    let mut successful = 0;
    let mut corrupted = 0;
    let mut corrupted_details = Vec::new();

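    // As in the smaller stress test, matching by index assumes the filtered
    // results preserve upload order; a reordering would be counted as corruption.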
    for (i, doc) in final_docs.iter().enumerate() {
        let expected_content = &documents[i].0;
        let actual_text = doc["ocr_text"].as_str().unwrap_or("");
        let status = doc["ocr_status"].as_str().unwrap_or("");
        let doc_id = doc["id"].as_str().unwrap_or("");

        if status == "completed" {
            if actual_text == expected_content {
                successful += 1;
            } else {
                corrupted += 1;
                corrupted_details.push((doc_id.to_string(), expected_content.clone(), actual_text.to_string()));

                // Only show first few corruption details to avoid spam
                if corrupted <= 3 {
                    println!("  ❌ CORRUPTION: {} expected '{}' got '{}'", doc_id, expected_content, actual_text);
                }
            }
        } else {
            println!("  ⚠️ NON-COMPLETED: {} status={}", doc_id, status);
        }
    }

    // Final Results
    println!("\n🏆 FINAL RESULTS");
    println!("================");
    println!("📊 Total Documents: {}", documents.len());
    println!("✅ Successful: {}", successful);
    println!("❌ Corrupted: {}", corrupted);
    println!("📈 Success Rate: {:.1}%", (successful as f64 / documents.len() as f64) * 100.0);
    println!("⏱️ Total Time: {:?}", upload_duration + processing_duration);

    if corrupted == 0 {
        println!("🎉 NO CORRUPTION DETECTED! ALL {} DOCUMENTS PROCESSED CORRECTLY!", documents.len());
    } else {
        println!("🚨 CORRUPTION DETECTED IN {} DOCUMENTS:", corrupted);

        // Analyze corruption patterns
        if corrupted_details.iter().all(|(_, _, actual)| actual.is_empty()) {
            println!("🔍 PATTERN: All corrupted documents have EMPTY content");
        } else if corrupted_details.len() > 1 && corrupted_details.iter().all(|(_, _, actual)| actual == &corrupted_details[0].2) {
            println!("🔍 PATTERN: All corrupted documents have IDENTICAL content: '{}'", corrupted_details[0].2);
        } else {
            println!("🔍 PATTERN: Mixed corruption types detected");
        }

        panic!("CORRUPTION DETECTED in {} out of {} documents", corrupted, documents.len());
    }
}
@ -1,409 +0,0 @@
/*!
 * Throttled High Concurrency OCR Test
 *
 * This test verifies that our new throttling mechanism properly handles
 * high concurrency scenarios (50+ documents) without database connection
 * pool exhaustion or corrupting OCR results.
 */

use anyhow::Result;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use tracing::{info, warn, error};
use uuid::Uuid;

use readur::{
    config::Config,
    db::Database,
    models::{Document, Settings},
    file_service::FileService,
    enhanced_ocr::EnhancedOcrService,
    ocr_queue::OcrQueueService,
    db_guardrails_simple::DocumentTransactionManager,
    request_throttler::RequestThrottler,
};

fn get_test_db_url() -> String {
    std::env::var("DATABASE_URL")
        .or_else(|_| std::env::var("TEST_DATABASE_URL"))
        .unwrap_or_else(|_| "postgresql://postgres:postgres@localhost:5432/readur_test".to_string())
}

struct ThrottledTestHarness {
    db: Database,
    pool: PgPool,
    file_service: FileService,
    queue_service: Arc<OcrQueueService>,
    transaction_manager: DocumentTransactionManager,
}

impl ThrottledTestHarness {
    async fn new() -> Result<Self> {
        // Initialize database with proper connection limits
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(30) // Higher limit for stress testing
            .acquire_timeout(std::time::Duration::from_secs(15))
            .connect(&get_test_db_url())
            .await?;

        let db = Database::new(&get_test_db_url()).await?;

        // Initialize services
        let file_service = FileService::new("./test_uploads".to_string());

        // Create throttled queue service - this is the key improvement
        let queue_service = Arc::new(OcrQueueService::new(
            db.clone(),
            pool.clone(),
            15 // Limit to 15 concurrent OCR jobs to prevent DB pool exhaustion
        ));

        let transaction_manager = DocumentTransactionManager::new(pool.clone());

        // Ensure test upload directory exists
        std::fs::create_dir_all("./test_uploads").unwrap_or_default();

        Ok(Self {
            db,
            pool,
            file_service,
            queue_service,
            transaction_manager,
        })
    }

    async fn create_test_user(&self) -> Result<Uuid> {
        let user_id = Uuid::new_v4();
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        sqlx::query(
            r#"
            INSERT INTO users (id, username, email, password_hash, role)
            VALUES ($1, $2, $3, $4, 'user')
            "#
        )
        .bind(user_id)
        .bind(format!("throttle_test_user_{}", timestamp))
        .bind(format!("throttle_test_{}@example.com", timestamp))
        .bind("dummy_hash")
        .execute(&self.pool)
        .await?;

        info!("✅ Created test user: {}", user_id);
        Ok(user_id)
    }

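    // Creates documents by writing each payload to ./test_uploads and inserting
    // rows directly into the documents table (bypassing the HTTP API), then
    // enqueues each one for OCR with a priority derived from its index.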
    async fn create_test_documents(&self, user_id: Uuid, count: usize) -> Result<Vec<(Uuid, String)>> {
        let mut documents = Vec::new();

        info!("📝 Creating {} test documents", count);

        for i in 1..=count {
            let content = format!("THROTTLE-TEST-DOC-{:03}-UNIQUE-CONTENT-{}", i, Uuid::new_v4());
            let filename = format!("throttle_test_{:03}.txt", i);
            let doc_id = Uuid::new_v4();
            let file_path = format!("./test_uploads/{}.txt", doc_id);

            // Write content to file
            tokio::fs::write(&file_path, &content).await?;

            // Create document record
            sqlx::query(
                r#"
                INSERT INTO documents (
                    id, filename, original_filename, file_path, file_size,
                    mime_type, content, user_id, ocr_status, created_at, updated_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', NOW(), NOW())
                "#
            )
            .bind(doc_id)
            .bind(&filename)
            .bind(&filename)
            .bind(&file_path)
            .bind(content.len() as i64)
            .bind("text/plain")
            .bind(&content)
            .bind(user_id)
            .execute(&self.pool)
            .await?;

            // Enqueue for OCR processing with a varying priority
            let priority = 10 - (i % 5) as i32; // Priorities cycle through 6-10
            self.queue_service.enqueue_document(doc_id, priority, content.len() as i64).await?;

            documents.push((doc_id, content));

            if i % 10 == 0 {
                info!("  ✅ Created {} documents so far", i);
            }
        }

        info!("✅ All {} documents created and enqueued", count);
        Ok(documents)
    }

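    // Spawns `num_workers` background tasks that each dequeue and process OCR
    // jobs for at most five minutes; the JoinHandles are intentionally not
    // awaited, so the workers run concurrently with the waiting phase.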
    async fn start_throttled_workers(&self, num_workers: usize) -> Result<()> {
        info!("🏭 Starting {} throttled OCR workers", num_workers);

        let mut handles = Vec::new();

        for worker_num in 1..=num_workers {
            let queue_service = self.queue_service.clone();

            let handle = tokio::spawn(async move {
                let worker_id = format!("throttled-worker-{}", worker_num);
                info!("Worker {} starting", worker_id);

                // Each worker runs for a limited time to avoid infinite loops
                let start_time = Instant::now();
                let max_runtime = Duration::from_secs(300); // 5 minutes max

                // Run a simplified worker loop instead of calling start_worker
                // start_worker() consumes the Arc<Self>, so we can't call it multiple times
                loop {
                    if start_time.elapsed() > max_runtime {
                        break;
                    }

                    // Process a single job if available
                    match queue_service.dequeue().await {
                        Ok(Some(item)) => {
                            info!("Worker {} processing job {}", worker_id, item.id);
                            // Process item using the built-in throttling
                            let ocr_service = readur::enhanced_ocr::EnhancedOcrService::new("/tmp".to_string());
                            if let Err(e) = queue_service.process_item(item, &ocr_service).await {
                                error!("Worker {} processing error: {}", worker_id, e);
                            }
                        }
                        Ok(None) => {
                            // No jobs available, wait a bit
                            tokio::time::sleep(Duration::from_millis(100)).await;
                        }
                        Err(e) => {
                            error!("Worker {} dequeue error: {}", worker_id, e);
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                    }
                }

                info!("Worker {} completed", worker_id);
            });

            handles.push(handle);
        }

        // Don't wait for all workers to complete - they run in background
        Ok(())
    }

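    // Note: the status counts below are taken over the entire documents table,
    // not just the documents created by this test, so a shared test database
    // can skew both the logged numbers and the exit condition.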
    async fn wait_for_completion(&self, expected_docs: usize, timeout_minutes: u64) -> Result<()> {
        let start_time = Instant::now();
        let timeout = Duration::from_secs(timeout_minutes * 60);

        info!("⏳ Waiting for {} documents to complete (timeout: {} minutes)", expected_docs, timeout_minutes);

        loop {
            if start_time.elapsed() > timeout {
                warn!("⏰ Timeout reached waiting for OCR completion");
                break;
            }

            // Check completion status
            let completed_count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'completed'"
            )
            .fetch_one(&self.pool)
            .await?;

            let failed_count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
            )
            .fetch_one(&self.pool)
            .await?;

            let processing_count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'processing'"
            )
            .fetch_one(&self.pool)
            .await?;

            let pending_count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM documents WHERE ocr_status = 'pending'"
            )
            .fetch_one(&self.pool)
            .await?;

            info!("📊 Status: {} completed, {} failed, {} processing, {} pending",
                  completed_count, failed_count, processing_count, pending_count);

            if completed_count + failed_count >= expected_docs as i64 {
                info!("✅ All documents have been processed!");
                break;
            }

            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        Ok(())
    }

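    // Re-reads each document from the database and classifies its stored
    // ocr_text as correct (exact match with what was written), empty or NULL,
    // or corrupted, alongside failed and unexpectedly-statused documents.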
    async fn verify_results(&self, expected_documents: &[(Uuid, String)]) -> Result<ThrottleTestResults> {
        info!("🔍 Verifying OCR results for {} documents", expected_documents.len());

        let mut results = ThrottleTestResults {
            total_documents: expected_documents.len(),
            completed: 0,
            failed: 0,
            corrupted: 0,
            empty_content: 0,
            correct_content: 0,
        };

        for (doc_id, expected_content) in expected_documents {
            let row = sqlx::query(
                r#"
                SELECT ocr_status, ocr_text, ocr_error, filename
                FROM documents
                WHERE id = $1
                "#
            )
            .bind(doc_id)
            .fetch_one(&self.pool)
            .await?;

            let status: Option<String> = row.get("ocr_status");
            let ocr_text: Option<String> = row.get("ocr_text");
            let ocr_error: Option<String> = row.get("ocr_error");
            let filename: String = row.get("filename");

            match status.as_deref() {
                Some("completed") => {
                    results.completed += 1;

                    match ocr_text.as_deref() {
                        Some(text) if text.is_empty() => {
                            warn!("❌ Document {} ({}) has empty OCR content", doc_id, filename);
                            results.empty_content += 1;
                        }
                        Some(text) if text == expected_content => {
                            results.correct_content += 1;
                        }
                        Some(text) => {
                            warn!("❌ Document {} ({}) has corrupted content:", doc_id, filename);
                            warn!("   Expected: {}", expected_content);
                            warn!("   Got: {}", text);
                            results.corrupted += 1;
                        }
                        None => {
                            warn!("❌ Document {} ({}) has NULL OCR content", doc_id, filename);
                            results.empty_content += 1;
                        }
                    }
                }
                Some("failed") => {
                    results.failed += 1;
                    info!("⚠️ Document {} ({}) failed: {}", doc_id, filename,
                          ocr_error.as_deref().unwrap_or("Unknown error"));
                }
                other => {
                    warn!("❓ Document {} ({}) has unexpected status: {:?}", doc_id, filename, other);
                }
            }
        }

        Ok(results)
    }

    async fn cleanup(&self) -> Result<()> {
        // Clean up test files
        let _ = tokio::fs::remove_dir_all("./test_uploads").await;
        Ok(())
    }
}

#[derive(Debug)]
struct ThrottleTestResults {
    total_documents: usize,
    completed: usize,
    failed: usize,
    corrupted: usize,
    empty_content: usize,
    correct_content: usize,
}

impl ThrottleTestResults {
    fn success_rate(&self) -> f64 {
        if self.total_documents == 0 { return 0.0; }
        (self.correct_content as f64 / self.total_documents as f64) * 100.0
    }

    fn completion_rate(&self) -> f64 {
        if self.total_documents == 0 { return 0.0; }
        ((self.completed + self.failed) as f64 / self.total_documents as f64) * 100.0
    }
}

#[tokio::test]
async fn test_throttled_high_concurrency_50_documents() {
    println!("🚀 THROTTLED HIGH CONCURRENCY TEST - 50 DOCUMENTS");
    println!("================================================");

    let harness = ThrottledTestHarness::new().await
        .expect("Failed to initialize throttled test harness");

    // Create test user
    let user_id = harness.create_test_user().await
        .expect("Failed to create test user");

    // Create 50 test documents
    let document_count = 50;
    let test_documents = harness.create_test_documents(user_id, document_count).await
        .expect("Failed to create test documents");

    // Start multiple throttled workers
    harness.start_throttled_workers(5).await
        .expect("Failed to start throttled workers");

    // Wait for completion with generous timeout
    harness.wait_for_completion(document_count, 10).await
        .expect("Failed to wait for completion");

    // Verify results
    let results = harness.verify_results(&test_documents).await
        .expect("Failed to verify results");

    // Cleanup
    harness.cleanup().await.expect("Failed to cleanup");

    // Print detailed results
    println!("\n🏆 THROTTLED TEST RESULTS:");
    println!("========================");
    println!("📊 Total Documents: {}", results.total_documents);
    println!("✅ Completed: {}", results.completed);
    println!("❌ Failed: {}", results.failed);
    println!("🔧 Correct Content: {}", results.correct_content);
    println!("🚫 Empty Content: {}", results.empty_content);
    println!("💥 Corrupted Content: {}", results.corrupted);
    println!("📈 Success Rate: {:.1}%", results.success_rate());
    println!("📊 Completion Rate: {:.1}%", results.completion_rate());

    // Assertions
    assert!(results.completion_rate() >= 90.0,
            "Completion rate too low: {:.1}% (expected >= 90%)", results.completion_rate());

    assert!(results.empty_content == 0,
            "Found {} documents with empty content (should be 0 with throttling)", results.empty_content);

    assert!(results.corrupted == 0,
            "Found {} documents with corrupted content (should be 0 with throttling)", results.corrupted);

    assert!(results.success_rate() >= 80.0,
            "Success rate too low: {:.1}% (expected >= 80%)", results.success_rate());

    println!("🎉 Throttled high concurrency test PASSED!");
}