/*!
* Performance and Load Testing Integration Tests
*
* Tests system performance under various load conditions including:
* - High-volume document uploads
* - Concurrent user operations
* - Database query performance
* - OCR processing throughput
* - Search performance with large datasets
* - Memory and resource usage patterns
* - Response time consistency
* - System scalability limits
*/
use reqwest::Client;
use serde_json::Value;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::time::sleep;
use uuid::Uuid;
use chrono;
use readur::models::{CreateUser, LoginRequest, LoginResponse, UserRole};
use readur::routes::documents::types::{DocumentUploadResponse, PaginatedDocumentsResponse};
fn get_base_url() -> String {
std::env::var("API_URL").unwrap_or_else(|_| "http://localhost:8000".to_string())
}
const LOAD_TEST_TIMEOUT: Duration = Duration::from_secs(300); // 5 minutes for load tests
/// Performance metrics tracker
#[derive(Debug, Clone)]
struct PerformanceMetrics {
total_requests: usize,
successful_requests: usize,
failed_requests: usize,
total_duration: Duration,
min_response_time: Duration,
max_response_time: Duration,
response_times: Vec<Duration>,
}
impl PerformanceMetrics {
fn new() -> Self {
Self {
total_requests: 0,
successful_requests: 0,
failed_requests: 0,
total_duration: Duration::ZERO,
min_response_time: Duration::from_secs(u64::MAX),
max_response_time: Duration::ZERO,
response_times: Vec::new(),
}
}
fn add_result(&mut self, success: bool, response_time: Duration) {
self.total_requests += 1;
if success {
self.successful_requests += 1;
} else {
self.failed_requests += 1;
}
self.response_times.push(response_time);
self.total_duration += response_time;
if response_time < self.min_response_time {
self.min_response_time = response_time;
}
if response_time > self.max_response_time {
self.max_response_time = response_time;
}
}
fn average_response_time(&self) -> Duration {
if self.total_requests > 0 {
self.total_duration / self.total_requests as u32
} else {
Duration::ZERO
}
}
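    /// Nearest-rank percentile of the recorded response times; `p` is in percent
    /// (e.g. 95.0 for the 95th percentile). Sorts a copy, so it is O(n log n) per call.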
fn percentile(&self, p: f64) -> Duration {
if self.response_times.is_empty() {
return Duration::ZERO;
}
let mut sorted_times = self.response_times.clone();
sorted_times.sort();
let index = ((sorted_times.len() as f64 - 1.0) * p / 100.0).round() as usize;
sorted_times[index.min(sorted_times.len() - 1)]
}
fn success_rate(&self) -> f64 {
if self.total_requests > 0 {
self.successful_requests as f64 / self.total_requests as f64
} else {
0.0
}
}
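    /// Throughput measured against the wall-clock time of the whole run, not the
    /// sum of per-request latencies (which overlap when requests run concurrently).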
fn requests_per_second(&self, total_elapsed: Duration) -> f64 {
if total_elapsed.as_secs_f64() > 0.0 {
self.total_requests as f64 / total_elapsed.as_secs_f64()
} else {
0.0
}
}
}
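// Sanity-check the metrics bookkeeping on a small, hand-computed sample set, so a
// regression in this helper cannot silently skew the load-test results reported by
// the scenarios below.
#[test]
fn performance_metrics_bookkeeping_sanity() {
    let mut m = PerformanceMetrics::new();
    m.add_result(true, Duration::from_millis(100));
    m.add_result(true, Duration::from_millis(200));
    m.add_result(false, Duration::from_millis(300));
    m.add_result(true, Duration::from_millis(400));
    assert_eq!(m.total_requests, 4);
    assert_eq!(m.successful_requests, 3);
    assert_eq!(m.failed_requests, 1);
    assert!((m.success_rate() - 0.75).abs() < 1e-9);
    assert_eq!(m.average_response_time(), Duration::from_millis(250));
    assert_eq!(m.min_response_time, Duration::from_millis(100));
    assert_eq!(m.max_response_time, Duration::from_millis(400));
    // Nearest-rank percentiles over [100, 200, 300, 400] ms
    assert_eq!(m.percentile(50.0), Duration::from_millis(300));
    assert_eq!(m.percentile(100.0), Duration::from_millis(400));
}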
/// Load test client with performance tracking
struct LoadTestClient {
client: Client,
token: Option<String>,
user_id: Option<String>,
}
impl LoadTestClient {
fn new() -> Self {
Self {
client: Client::builder()
.timeout(Duration::from_secs(60))
.build()
.expect("Failed to create load test client"),
token: None,
user_id: None,
}
}
/// Setup a test user for load testing
async fn setup_user(&mut self, user_index: usize) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
// Use UUID for guaranteed uniqueness across concurrent test execution
let test_id = Uuid::new_v4().simple().to_string();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let username = format!("load_test_user_{}_{}_{}_{}_{}", user_index, test_id, nanos, Uuid::new_v4().simple(), chrono::Utc::now().timestamp_millis());
let email = format!("load_test_{}_{}@{}.example.com", user_index, test_id, nanos);
let password = "loadtestpassword123";
// Register user
let user_data = CreateUser {
username: username.clone(),
email: email.clone(),
password: password.to_string(),
role: Some(UserRole::User),
};
let register_response = self.client
.post(&format!("{}/api/auth/register", get_base_url()))
.json(&user_data)
.send()
.await?;
if !register_response.status().is_success() {
return Err(format!("Registration failed: {}", register_response.text().await?).into());
}
// Login to get token
let login_data = LoginRequest {
username: username.clone(),
password: password.to_string(),
};
let login_response = self.client
.post(&format!("{}/api/auth/login", get_base_url()))
.json(&login_data)
.send()
.await?;
if !login_response.status().is_success() {
return Err(format!("Login failed: {}", login_response.text().await?).into());
}
let login_result: LoginResponse = login_response.json().await?;
self.token = Some(login_result.token.clone());
// Get user info
let me_response = self.client
.get(&format!("{}/api/auth/me", get_base_url()))
.header("Authorization", format!("Bearer {}", login_result.token))
.send()
.await?;
if me_response.status().is_success() {
let user_info: Value = me_response.json().await?;
self.user_id = user_info["id"].as_str().map(|s| s.to_string());
}
Ok(login_result.token)
}
/// Perform a timed document upload
async fn timed_upload(&self, content: &str, filename: &str) -> Result<(DocumentUploadResponse, Duration), Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
let token = self.token.as_ref().ok_or("Not authenticated")?;
let part = reqwest::multipart::Part::text(content.to_string())
.file_name(filename.to_string())
.mime_str("text/plain")?;
let form = reqwest::multipart::Form::new()
.part("file", part);
let response = self.client
.post(&format!("{}/api/documents", get_base_url()))
.header("Authorization", format!("Bearer {}", token))
.multipart(form)
.send()
.await?;
let elapsed = start.elapsed();
if !response.status().is_success() {
return Err(format!("Upload failed: {}", response.text().await?).into());
}
let document: DocumentUploadResponse = response.json().await?;
Ok((document, elapsed))
}
/// Perform a timed document list request
async fn timed_list_documents(&self) -> Result<(Vec<Value>, Duration), Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
let token = self.token.as_ref().ok_or("Not authenticated")?;
let response = self.client
.get(&format!("{}/api/documents", get_base_url()))
.header("Authorization", format!("Bearer {}", token))
.send()
.await?;
let elapsed = start.elapsed();
if !response.status().is_success() {
return Err(format!("List documents failed: {}", response.text().await?).into());
}
let paginated_response: PaginatedDocumentsResponse = response.json().await?;
let documents_array: Vec<serde_json::Value> = paginated_response.documents
.into_iter()
.map(|doc| serde_json::to_value(doc).unwrap())
.collect();
Ok((documents_array, elapsed))
}
/// Perform a timed search request
async fn timed_search(&self, query: &str) -> Result<(Value, Duration), Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
let token = self.token.as_ref().ok_or("Not authenticated")?;
let response = self.client
.get(&format!("{}/api/search", get_base_url()))
.header("Authorization", format!("Bearer {}", token))
.query(&[("query", query)])
.send()
.await?;
let elapsed = start.elapsed();
if !response.status().is_success() {
return Err(format!("Search failed: {}", response.text().await?).into());
}
let results: Value = response.json().await?;
Ok((results, elapsed))
}
}
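// reqwest::Client is reference-counted internally, so this Clone is cheap: each
// spawned task gets its own handle to the same connection pool plus a copy of the
// auth state.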
impl Clone for LoadTestClient {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
token: self.token.clone(),
user_id: self.user_id.clone(),
}
}
}
#[tokio::test]
async fn test_high_volume_document_uploads() {
println!("📤 Testing high-volume document uploads...");
let mut client = LoadTestClient::new();
client.setup_user(0).await
.expect("Failed to setup test user");
let upload_count = 50;
let concurrent_limit = 10;
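    // Every upload task is spawned immediately; the semaphore below caps how many
    // run at once, so the server never sees more than `concurrent_limit` parallel uploads.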
let semaphore = Arc::new(Semaphore::new(concurrent_limit));
let mut metrics = PerformanceMetrics::new();
let overall_start = Instant::now();
println!("🚀 Starting {} concurrent uploads with limit of {}", upload_count, concurrent_limit);
let mut handles = Vec::new();
for i in 0..upload_count {
let client_clone = client.clone();
let semaphore_clone = semaphore.clone();
let handle = tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.expect("Failed to acquire semaphore");
let content = format!(
"Load test document content for upload {}.\n\
This document contains multiple lines of text to provide meaningful content for OCR processing.\n\
Generated at: {}\n\
Document ID: LOAD-TEST-{}\n\
Content length should be sufficient for testing purposes.",
i,
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
Uuid::new_v4()
);
let filename = format!("load_test_{}.txt", i);
let result = client_clone.timed_upload(&content, &filename).await;
match result {
Ok((document, duration)) => (i, true, duration, Some(document.id.to_string())),
Err(_) => (i, false, Duration::ZERO, None),
}
});
handles.push(handle);
}
// Wait for all uploads to complete
let mut upload_results = Vec::new();
for handle in handles {
let result = handle.await.expect("Upload task should complete");
upload_results.push(result);
}
let overall_elapsed = overall_start.elapsed();
// Collect metrics
for (_, success, duration, _) in &upload_results {
metrics.add_result(*success, *duration);
}
// Print performance results
println!("📊 High-Volume Upload Performance Results:");
println!(" Total uploads: {}", metrics.total_requests);
println!(" Successful: {}", metrics.successful_requests);
println!(" Failed: {}", metrics.failed_requests);
println!(" Success rate: {:.2}%", metrics.success_rate() * 100.0);
println!(" Total time: {:?}", overall_elapsed);
println!(" Throughput: {:.2} uploads/sec", metrics.requests_per_second(overall_elapsed));
println!(" Average response time: {:?}", metrics.average_response_time());
println!(" Min response time: {:?}", metrics.min_response_time);
println!(" Max response time: {:?}", metrics.max_response_time);
println!(" 95th percentile: {:?}", metrics.percentile(95.0));
println!(" 99th percentile: {:?}", metrics.percentile(99.0));
// Performance assertions
assert!(metrics.success_rate() >= 0.9, "Success rate should be at least 90%");
assert!(metrics.average_response_time() < Duration::from_secs(10), "Average response time should be under 10 seconds");
assert!(metrics.percentile(95.0) < Duration::from_secs(20), "95th percentile should be under 20 seconds");
println!("🎉 High-volume document uploads test passed!");
}
#[tokio::test]
async fn test_concurrent_user_operations() {
println!("👥 Testing concurrent user operations...");
let user_count = 10;
let operations_per_user = 5;
// Setup multiple users
let mut clients = Vec::new();
for i in 0..user_count {
let mut client = LoadTestClient::new();
client.setup_user(i).await
.expect(&format!("Failed to setup user {}", i));
clients.push(client);
}
println!(" Setup {} concurrent users", user_count);
let overall_start = Instant::now();
let mut all_handles = Vec::new();
// Each user performs multiple operations concurrently
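    // (user_count x operations_per_user = up to 50 uploads in flight, with no semaphore cap)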
for (user_index, client) in clients.into_iter().enumerate() {
let handle = tokio::spawn(async move {
let mut user_metrics = PerformanceMetrics::new();
let mut operation_handles = Vec::new();
// Upload documents
for op_index in 0..operations_per_user {
let client_clone = client.clone();
let upload_handle = tokio::spawn(async move {
let content = format!("User {} operation {} content", user_index, op_index);
let filename = format!("user_{}_op_{}.txt", user_index, op_index);
client_clone.timed_upload(&content, &filename).await
});
operation_handles.push(upload_handle);
}
// Wait for all operations for this user
let mut successful_ops = 0;
let mut total_ops = 0;
let mut total_time = Duration::ZERO;
for handle in operation_handles {
total_ops += 1;
match handle.await.expect("Operation should complete") {
Ok((_, duration)) => {
successful_ops += 1;
total_time += duration;
user_metrics.add_result(true, duration);
}
Err(_) => {
user_metrics.add_result(false, Duration::ZERO);
}
}
}
(user_index, successful_ops, total_ops, user_metrics)
});
all_handles.push(handle);
}
// Wait for all users to complete their operations
let mut all_user_results = Vec::new();
for handle in all_handles {
let result = handle.await.expect("User operations should complete");
all_user_results.push(result);
}
let overall_elapsed = overall_start.elapsed();
// Aggregate metrics across all users
let mut global_metrics = PerformanceMetrics::new();
    for (user_index, successful_ops, total_ops, user_metrics) in &all_user_results {
        println!(" User {}: {}/{} operations successful", user_index, successful_ops, total_ops);
        // Merge user metrics into global metrics, carrying over success/failure
        // counts directly (the raw response_times alone cannot tell the two apart).
        global_metrics.total_requests += user_metrics.total_requests;
        global_metrics.successful_requests += user_metrics.successful_requests;
        global_metrics.failed_requests += user_metrics.failed_requests;
        global_metrics.total_duration += user_metrics.total_duration;
        global_metrics.response_times.extend(user_metrics.response_times.iter().copied());
        global_metrics.min_response_time = global_metrics.min_response_time.min(user_metrics.min_response_time);
        global_metrics.max_response_time = global_metrics.max_response_time.max(user_metrics.max_response_time);
    }
println!("📊 Concurrent User Operations Performance Results:");
println!(" Total users: {}", user_count);
println!(" Operations per user: {}", operations_per_user);
println!(" Total operations: {}", global_metrics.total_requests + global_metrics.failed_requests);
println!(" Successful operations: {}", global_metrics.successful_requests);
println!(" Failed operations: {}", global_metrics.failed_requests);
println!(" Overall success rate: {:.2}%", global_metrics.success_rate() * 100.0);
println!(" Total time: {:?}", overall_elapsed);
println!(" Throughput: {:.2} operations/sec", global_metrics.requests_per_second(overall_elapsed));
println!(" Average response time: {:?}", global_metrics.average_response_time());
println!(" 95th percentile: {:?}", global_metrics.percentile(95.0));
// Performance assertions
assert!(global_metrics.success_rate() >= 0.8, "Success rate should be at least 80% under load");
assert!(global_metrics.average_response_time() < Duration::from_secs(15), "Average response time should be reasonable under load");
println!("🎉 Concurrent user operations test passed!");
}
#[tokio::test]
async fn test_search_performance_with_load() {
println!("🔍 Testing search performance under load...");
let mut client = LoadTestClient::new();
client.setup_user(0).await
.expect("Failed to setup test user");
// First, upload several documents to create a searchable dataset
let document_count = 20;
println!("📤 Creating dataset with {} documents...", document_count);
let mut document_ids = Vec::new();
for i in 0..document_count {
let content = format!(
"Document {} for search performance testing.\n\
This document contains searchable keywords like: performance, test, document, search, load.\n\
Additional content: technology, system, user, data, processing.\n\
Unique identifier: SEARCH-PERF-{}\n\
Number: {}",
i, Uuid::new_v4(), i
);
let filename = format!("search_perf_doc_{}.txt", i);
match client.timed_upload(&content, &filename).await {
Ok((document, _)) => {
document_ids.push(document.id.to_string());
}
Err(e) => {
println!(" Failed to upload document {}: {}", i, e);
}
}
}
println!(" Created dataset with {} documents", document_ids.len());
// Wait a moment for documents to be indexed
sleep(Duration::from_secs(5)).await;
// Perform multiple search queries concurrently
let search_queries = vec![
"performance",
"test document",
"search load",
"technology system",
"user data",
"processing",
"unique identifier",
"SEARCH-PERF",
];
let searches_per_query = 5;
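    // 8 distinct queries x 5 repetitions = 40 searches, all spawned concurrently below.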
let mut search_metrics = PerformanceMetrics::new();
let search_start = Instant::now();
let mut search_handles = Vec::new();
for (query_index, query) in search_queries.iter().enumerate() {
for search_index in 0..searches_per_query {
let client_clone = client.clone();
let query_clone = query.to_string();
let handle = tokio::spawn(async move {
let result = client_clone.timed_search(&query_clone).await;
match result {
Ok((results, duration)) => {
let result_count = results["documents"].as_array()
.map(|arr| arr.len())
.unwrap_or(0);
(query_index, search_index, true, duration, result_count)
}
Err(_) => (query_index, search_index, false, Duration::ZERO, 0),
}
});
search_handles.push(handle);
}
}
// Wait for all search operations to complete
let mut search_results = Vec::new();
for handle in search_handles {
let result = handle.await.expect("Search task should complete");
search_results.push(result);
}
let search_elapsed = search_start.elapsed();
// Collect search metrics
for (_, _, success, duration, result_count) in &search_results {
search_metrics.add_result(*success, *duration);
if *success {
println!(" Search returned {} results in {:?}", result_count, duration);
}
}
println!("📊 Search Performance Results:");
println!(" Total searches: {}", search_metrics.total_requests);
println!(" Successful searches: {}", search_metrics.successful_requests);
println!(" Failed searches: {}", search_metrics.failed_requests);
println!(" Success rate: {:.2}%", search_metrics.success_rate() * 100.0);
println!(" Total time: {:?}", search_elapsed);
println!(" Search throughput: {:.2} searches/sec", search_metrics.requests_per_second(search_elapsed));
println!(" Average search time: {:?}", search_metrics.average_response_time());
println!(" Min search time: {:?}", search_metrics.min_response_time);
println!(" Max search time: {:?}", search_metrics.max_response_time);
println!(" 95th percentile: {:?}", search_metrics.percentile(95.0));
// Performance assertions for search
assert!(search_metrics.success_rate() >= 0.9, "Search success rate should be at least 90%");
assert!(search_metrics.average_response_time() < Duration::from_secs(5), "Average search time should be under 5 seconds");
assert!(search_metrics.percentile(95.0) < Duration::from_secs(10), "95th percentile search time should be under 10 seconds");
println!("🎉 Search performance under load test passed!");
}
#[tokio::test]
async fn test_database_query_performance() {
println!("🗄 Testing database query performance...");
let mut client = LoadTestClient::new();
client.setup_user(0).await
.expect("Failed to setup test user");
// Test repeated document list queries to stress database
let query_count = 100;
let concurrent_queries = 20;
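    // Same bounded-concurrency pattern as the upload test: spawn all 100 queries,
    // but allow only `concurrent_queries` of them to hit the API at a time.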
let semaphore = Arc::new(Semaphore::new(concurrent_queries));
let mut query_metrics = PerformanceMetrics::new();
let query_start = Instant::now();
println!("🚀 Starting {} database queries with concurrency {}", query_count, concurrent_queries);
let mut query_handles = Vec::new();
for i in 0..query_count {
let client_clone = client.clone();
let semaphore_clone = semaphore.clone();
let handle = tokio::spawn(async move {
let _permit = semaphore_clone.acquire().await.expect("Failed to acquire semaphore");
let result = client_clone.timed_list_documents().await;
match result {
Ok((documents, duration)) => (i, true, duration, documents.len()),
Err(_) => (i, false, Duration::ZERO, 0),
}
});
query_handles.push(handle);
}
// Wait for all queries to complete
let mut query_results = Vec::new();
for handle in query_handles {
let result = handle.await.expect("Query task should complete");
query_results.push(result);
}
let query_elapsed = query_start.elapsed();
// Collect query metrics
for (_, success, duration, doc_count) in &query_results {
query_metrics.add_result(*success, *duration);
if *success && doc_count > &0 {
println!(" Query returned {} documents in {:?}", doc_count, duration);
}
}
println!("📊 Database Query Performance Results:");
println!(" Total queries: {}", query_metrics.total_requests);
println!(" Successful queries: {}", query_metrics.successful_requests);
println!(" Failed queries: {}", query_metrics.failed_requests);
println!(" Success rate: {:.2}%", query_metrics.success_rate() * 100.0);
println!(" Total time: {:?}", query_elapsed);
println!(" Query throughput: {:.2} queries/sec", query_metrics.requests_per_second(query_elapsed));
println!(" Average query time: {:?}", query_metrics.average_response_time());
println!(" Min query time: {:?}", query_metrics.min_response_time);
println!(" Max query time: {:?}", query_metrics.max_response_time);
println!(" 95th percentile: {:?}", query_metrics.percentile(95.0));
println!(" 99th percentile: {:?}", query_metrics.percentile(99.0));
// Performance assertions for database queries
assert!(query_metrics.success_rate() >= 0.95, "Database query success rate should be at least 95%");
assert!(query_metrics.average_response_time() < Duration::from_secs(2), "Average query time should be under 2 seconds");
assert!(query_metrics.percentile(95.0) < Duration::from_secs(5), "95th percentile query time should be under 5 seconds");
println!("🎉 Database query performance test passed!");
}
#[tokio::test]
async fn test_system_stability_under_sustained_load() {
println!("🔄 Testing system stability under sustained load...");
let mut client = LoadTestClient::new();
client.setup_user(0).await
.expect("Failed to setup test user");
let test_duration = Duration::from_secs(60); // 1 minute sustained load
let operation_interval = Duration::from_millis(500); // Operation every 500ms
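    // One operation every 500ms over 60s targets ~120 operations, leaving headroom
    // for the `>= 100 operations` assertion at the end of the test.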
let mut stability_metrics = PerformanceMetrics::new();
let stability_start = Instant::now();
println!(" Running sustained load for {:?} with operations every {:?}", test_duration, operation_interval);
let mut operation_counter = 0;
let mut response_time_samples = Vec::new();
while stability_start.elapsed() < test_duration {
let operation_start = Instant::now();
// Alternate between different operation types
let operation_result = match operation_counter % 3 {
0 => {
// Document list operation
client.timed_list_documents().await
.map(|(docs, duration)| (format!("list({} docs)", docs.len()), duration))
}
1 => {
// Document upload operation
let content = format!("Stability test document {}", operation_counter);
let filename = format!("stability_{}.txt", operation_counter);
client.timed_upload(&content, &filename).await
.map(|(doc, duration)| (format!("upload({})", doc.id), duration))
}
_ => {
// Search operation
let queries = ["test", "document", "stability"];
let query = queries[operation_counter % queries.len()];
client.timed_search(query).await
.map(|(results, duration)| {
let count = results["documents"].as_array().map(|a| a.len()).unwrap_or(0);
(format!("search({} results)", count), duration)
})
}
};
let operation_elapsed = operation_start.elapsed();
match operation_result {
Ok((operation_desc, response_time)) => {
stability_metrics.add_result(true, response_time);
response_time_samples.push((stability_start.elapsed(), response_time));
println!(" {:?}: {} completed in {:?}",
stability_start.elapsed(), operation_desc, response_time);
}
Err(e) => {
stability_metrics.add_result(false, operation_elapsed);
println!(" {:?}: Operation failed: {}", stability_start.elapsed(), e);
}
}
operation_counter += 1;
// Sleep to maintain operation interval
if operation_elapsed < operation_interval {
sleep(operation_interval - operation_elapsed).await;
}
}
let total_elapsed = stability_start.elapsed();
// Analyze stability over time
let sample_windows = 6; // Divide test into 6 windows
let window_duration = test_duration / sample_windows as u32;
println!("📊 System Stability Results:");
println!(" Test duration: {:?}", total_elapsed);
println!(" Total operations: {}", stability_metrics.total_requests);
println!(" Successful operations: {}", stability_metrics.successful_requests);
println!(" Failed operations: {}", stability_metrics.failed_requests);
println!(" Overall success rate: {:.2}%", stability_metrics.success_rate() * 100.0);
println!(" Average throughput: {:.2} ops/sec", stability_metrics.requests_per_second(total_elapsed));
println!(" Average response time: {:?}", stability_metrics.average_response_time());
// Analyze response time stability across windows
for window in 0..sample_windows {
let window_start = window_duration * window as u32;
let window_end = window_duration * (window + 1) as u32;
let window_samples: Vec<_> = response_time_samples.iter()
.filter(|(elapsed, _)| *elapsed >= window_start && *elapsed < window_end)
.map(|(_, duration)| *duration)
.collect();
if !window_samples.is_empty() {
let window_avg = window_samples.iter().sum::<Duration>() / window_samples.len() as u32;
println!(" Window {} ({:?}-{:?}): {} ops, avg {:?}",
window + 1, window_start, window_end, window_samples.len(), window_avg);
}
}
// Stability assertions
assert!(stability_metrics.success_rate() >= 0.8, "Success rate should remain above 80% under sustained load");
assert!(operation_counter >= 100, "Should complete at least 100 operations during stability test");
println!("🎉 System stability under sustained load test passed!");
}