feat(tests): create even more comprehensive migration tests, because I'm traumatized by Vikunja

perf3ct 2025-07-11 17:25:29 +00:00
parent 897c7ff15c
commit 69c94ab1e5
6 changed files with 3284 additions and 0 deletions


@@ -0,0 +1,640 @@
use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use std::collections::HashMap;
#[cfg(test)]
mod comprehensive_migration_tests {
use super::*;
#[tokio::test]
async fn test_migration_with_prefilled_data() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Step 1: Prefill the database with test data
let test_data = prefill_test_data(pool).await;
// Step 2: Verify the prefilled data exists
verify_prefilled_data(pool, &test_data).await;
// Step 3: Simulate and test the failed documents migration
test_failed_documents_migration(pool, &test_data).await;
// Step 4: Verify schema integrity after migration
verify_schema_integrity(pool).await;
// Step 5: Test data consistency after migration
verify_data_consistency_after_migration(pool, &test_data).await;
}
#[tokio::test]
async fn test_migration_preserves_data_integrity() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Create comprehensive test data covering all edge cases
let user_id = create_test_user(pool).await;
// Insert various types of documents
let document_scenarios = vec![
DocumentScenario {
filename: "normal_success.pdf",
ocr_status: "completed",
ocr_failure_reason: None,
ocr_error: None,
ocr_confidence: Some(0.95),
ocr_text: Some("This is a successful OCR"),
file_size: 1024,
},
DocumentScenario {
filename: "low_confidence_fail.pdf",
ocr_status: "failed",
ocr_failure_reason: Some("low_ocr_confidence"),
ocr_error: Some("OCR confidence below threshold"),
ocr_confidence: Some(0.3),
ocr_text: Some("Partially recognized text"),
file_size: 2048,
},
DocumentScenario {
filename: "timeout_fail.pdf",
ocr_status: "failed",
ocr_failure_reason: Some("timeout"),
ocr_error: Some("OCR processing timed out after 60 seconds"),
ocr_confidence: None,
ocr_text: None,
file_size: 10485760, // 10MB
},
DocumentScenario {
filename: "memory_fail.pdf",
ocr_status: "failed",
ocr_failure_reason: Some("memory_limit"),
ocr_error: Some("Memory limit exceeded"),
ocr_confidence: None,
ocr_text: None,
file_size: 52428800, // 50MB
},
DocumentScenario {
filename: "corrupted_file.pdf",
ocr_status: "failed",
ocr_failure_reason: Some("file_corrupted"),
ocr_error: Some("PDF file appears to be corrupted"),
ocr_confidence: None,
ocr_text: None,
file_size: 512,
},
DocumentScenario {
filename: "unsupported.xyz",
ocr_status: "failed",
ocr_failure_reason: Some("unsupported_format"),
ocr_error: Some("File format not supported"),
ocr_confidence: None,
ocr_text: None,
file_size: 256,
},
DocumentScenario {
filename: "pending_ocr.pdf",
ocr_status: "pending",
ocr_failure_reason: None,
ocr_error: None,
ocr_confidence: None,
ocr_text: None,
file_size: 4096,
},
];
// Insert all test documents
let mut document_ids = HashMap::new();
for scenario in &document_scenarios {
let doc_id = insert_test_document(pool, user_id, scenario).await;
document_ids.insert(scenario.filename, doc_id);
}
// Count documents before migration
let failed_count_before: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
)
.fetch_one(pool)
.await
.unwrap();
let successful_count_before: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM documents WHERE ocr_status = 'completed'"
)
.fetch_one(pool)
.await
.unwrap();
// Verify the migration query works correctly (simulate the migration)
let migration_preview = sqlx::query(
r#"
SELECT
d.filename,
d.ocr_failure_reason,
CASE
WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied'
ELSE 'other'
END as mapped_failure_reason
FROM documents d
WHERE d.ocr_status = 'failed'
"#
)
.fetch_all(pool)
.await
.unwrap();
// Verify mappings are correct
for row in migration_preview {
let filename: String = row.get("filename");
let original_reason: Option<String> = row.get("ocr_failure_reason");
let mapped_reason: String = row.get("mapped_failure_reason");
println!("Migration mapping: {} - {:?} -> {}", filename, original_reason, mapped_reason);
// Verify specific mappings
match original_reason.as_deref() {
Some("low_ocr_confidence") => assert_eq!(mapped_reason, "low_ocr_confidence"),
Some("timeout") => assert_eq!(mapped_reason, "ocr_timeout"),
Some("memory_limit") => assert_eq!(mapped_reason, "ocr_memory_limit"),
Some("file_corrupted") => assert_eq!(mapped_reason, "file_corrupted"),
Some("unsupported_format") => assert_eq!(mapped_reason, "unsupported_format"),
_ => assert_eq!(mapped_reason, "other"),
}
}
// Verify that successful and pending documents are not affected
assert_eq!(successful_count_before, 1, "Should have 1 successful document");
assert_eq!(failed_count_before, 5, "Should have 5 failed documents");
}
#[tokio::test]
async fn test_migration_with_ocr_queue_data() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let user_id = create_test_user(pool).await;
// Create a document with OCR queue history
let doc_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error)
VALUES ($1, $2, $3, $3, '/test/path', 1000, 'application/pdf', 'failed', 'timeout', 'OCR timeout after retries')
"#
)
.bind(doc_id)
.bind(user_id)
.bind("retry_test.pdf")
.execute(pool)
.await
.unwrap();
// Add OCR queue entries to simulate retry history
for i in 0..3 {
sqlx::query(
r#"
INSERT INTO ocr_queue (document_id, priority, status, error_message, created_at)
VALUES ($1, $2, $3, $4, NOW() - INTERVAL '1 hour' * $5)
"#
)
.bind(doc_id)
.bind(1)
.bind(if i < 2 { "failed" } else { "processing" })
.bind(if i < 2 { Some("Retry attempt failed") } else { None })
.bind((3 - i) as i32)
.execute(pool)
.await
.unwrap();
}
// Test the migration query with retry count
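// Only the two queue entries inserted with status 'failed' count toward
// retry_count; the 'processing' entry is excluded by the status filter below.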
let result = sqlx::query(
r#"
SELECT
d.filename,
d.ocr_failure_reason,
COALESCE(q.retry_count, 0) as retry_count
FROM documents d
LEFT JOIN (
SELECT document_id, COUNT(*) as retry_count
FROM ocr_queue
WHERE status IN ('failed', 'completed')
GROUP BY document_id
) q ON d.id = q.document_id
WHERE d.id = $1
"#
)
.bind(doc_id)
.fetch_one(pool)
.await
.unwrap();
let retry_count: i64 = result.get("retry_count");
assert_eq!(retry_count, 2, "Should have 2 failed retry attempts");
}
#[tokio::test]
async fn test_migration_handles_null_values() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let user_id = create_test_user(pool).await;
// Insert documents with various NULL values
let null_scenarios = vec![
("null_reason.pdf", None, Some("Error without reason")),
("null_error.pdf", Some("unknown"), None),
("all_nulls.pdf", None, None),
];
for (filename, reason, error) in &null_scenarios {
sqlx::query(
r#"
INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error)
VALUES ($1, $2, $2, '/test/path', 1000, 'application/pdf', 'failed', $3, $4)
"#
)
.bind(user_id)
.bind(filename)
.bind(reason)
.bind(error)
.execute(pool)
.await
.unwrap();
}
// Verify migration handles NULLs correctly
let migrated_data = sqlx::query(
r#"
SELECT
filename,
ocr_failure_reason,
CASE
WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied'
ELSE 'other'
END as mapped_reason,
ocr_error
FROM documents
WHERE user_id = $1 AND ocr_status = 'failed'
ORDER BY filename
"#
)
.bind(user_id)
.fetch_all(pool)
.await
.unwrap();
assert_eq!(migrated_data.len(), 3);
for row in migrated_data {
let mapped_reason: String = row.get("mapped_reason");
assert_eq!(mapped_reason, "other", "NULL or unknown reasons should map to 'other'");
}
}
#[tokio::test]
async fn test_migration_performance_with_large_dataset() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let user_id = create_test_user(pool).await;
// Insert a large number of failed documents
let batch_size = 100;
let start_time = std::time::Instant::now();
for batch in 0..10 {
let mut query = String::from(
"INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error) VALUES "
);
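// Build one multi-row INSERT per batch: only user_id is bound as $1, while the
// remaining values are literals generated by this test, so inlining them keeps
// each batch to a single round trip without taking untrusted input.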
for i in 0..batch_size {
let doc_num = batch * batch_size + i;
let filename = format!("bulk_doc_{}.pdf", doc_num);
let reason = match doc_num % 5 {
0 => "low_ocr_confidence",
1 => "timeout",
2 => "memory_limit",
3 => "file_corrupted",
_ => "unknown_error",
};
if i > 0 {
query.push_str(", ");
}
query.push_str(&format!("($1, '{}', '{}', '/test/path', 1000, 'application/pdf', 'failed', '{}', 'Test error')",
filename, filename, reason));
}
sqlx::query(&query)
.bind(user_id)
.execute(pool)
.await
.unwrap();
}
let insert_duration = start_time.elapsed();
println!("Inserted 1000 documents in {:?}", insert_duration);
// Measure migration query performance
let migration_start = std::time::Instant::now();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
)
.fetch_one(pool)
.await
.unwrap();
assert_eq!(count, 1000, "Should have 1000 failed documents");
// Simulate the migration SELECT
let _migration_data = sqlx::query(
r#"
SELECT * FROM documents WHERE ocr_status = 'failed'
"#
)
.fetch_all(pool)
.await
.unwrap();
let migration_duration = migration_start.elapsed();
println!("Migration query completed in {:?}", migration_duration);
// Performance assertion - migration should complete reasonably fast
assert!(migration_duration.as_secs() < 5, "Migration query should complete within 5 seconds");
}
// Helper functions
struct TestData {
user_id: Uuid,
document_ids: HashMap<String, Uuid>,
failure_scenarios: Vec<(String, String, String)>,
}
struct DocumentScenario {
filename: &'static str,
ocr_status: &'static str,
ocr_failure_reason: Option<&'static str>,
ocr_error: Option<&'static str>,
ocr_confidence: Option<f32>,
ocr_text: Option<&'static str>,
file_size: i64,
}
async fn create_test_user(pool: &PgPool) -> Uuid {
let user_id = Uuid::new_v4();
let unique_suffix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let username = format!("test_migration_user_{}", unique_suffix);
let email = format!("test_migration_{}@example.com", unique_suffix);
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(&username)
.bind(&email)
.bind("test_hash")
.bind("user")
.execute(pool)
.await
.unwrap();
user_id
}
async fn insert_test_document(pool: &PgPool, user_id: Uuid, scenario: &DocumentScenario) -> Uuid {
let doc_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO documents (
id, user_id, filename, original_filename, file_path, file_size,
mime_type, ocr_status, ocr_failure_reason, ocr_error,
ocr_confidence, ocr_text
) VALUES (
$1, $2, $3, $3, '/test/path', $4, $5, $6, $7, $8, $9, $10
)
"#
)
.bind(doc_id)
.bind(user_id)
.bind(scenario.filename)
.bind(scenario.file_size)
.bind(if scenario.filename.ends_with(".pdf") { "application/pdf" } else { "application/octet-stream" })
.bind(scenario.ocr_status)
.bind(scenario.ocr_failure_reason)
.bind(scenario.ocr_error)
.bind(scenario.ocr_confidence)
.bind(scenario.ocr_text)
.execute(pool)
.await
.unwrap();
doc_id
}
async fn prefill_test_data(pool: &PgPool) -> TestData {
let user_id = create_test_user(pool).await;
let mut document_ids = HashMap::new();
let failure_scenarios = vec![
("timeout_doc.pdf".to_string(), "timeout".to_string(), "OCR processing timed out".to_string()),
("memory_doc.pdf".to_string(), "memory_limit".to_string(), "Memory limit exceeded".to_string()),
("corrupt_doc.pdf".to_string(), "file_corrupted".to_string(), "File is corrupted".to_string()),
("low_conf_doc.pdf".to_string(), "low_ocr_confidence".to_string(), "Confidence too low".to_string()),
];
// Insert test documents
for (filename, reason, error) in &failure_scenarios {
let doc_id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO documents (
id, user_id, filename, original_filename, file_path, file_size,
mime_type, ocr_status, ocr_failure_reason, ocr_error
) VALUES (
$1, $2, $3, $3, '/test/path', 1000, 'application/pdf',
'failed', $4, $5
)
"#
)
.bind(doc_id)
.bind(user_id)
.bind(filename)
.bind(reason)
.bind(error)
.execute(pool)
.await
.unwrap();
document_ids.insert(filename.clone(), doc_id);
}
TestData {
user_id,
document_ids,
failure_scenarios,
}
}
async fn verify_prefilled_data(pool: &PgPool, test_data: &TestData) {
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM documents WHERE user_id = $1 AND ocr_status = 'failed'"
)
.bind(test_data.user_id)
.fetch_one(pool)
.await
.unwrap();
assert_eq!(count, test_data.failure_scenarios.len() as i64,
"All test documents should be inserted");
}
async fn test_failed_documents_migration(pool: &PgPool, test_data: &TestData) {
// Simulate the migration
let result = sqlx::query(
r#"
INSERT INTO failed_documents (
user_id, filename, original_filename, file_path, file_size,
mime_type, error_message, failure_reason, failure_stage, ingestion_source
)
SELECT
d.user_id, d.filename, d.original_filename, d.file_path, d.file_size,
d.mime_type, d.ocr_error,
CASE
WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied'
ELSE 'other'
END as failure_reason,
'ocr' as failure_stage,
'test_migration' as ingestion_source
FROM documents d
WHERE d.ocr_status = 'failed' AND d.user_id = $1
"#
)
.bind(test_data.user_id)
.execute(pool)
.await;
assert!(result.is_ok(), "Migration should succeed");
// Verify all documents were migrated
let migrated_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM failed_documents WHERE user_id = $1 AND ingestion_source = 'test_migration'"
)
.bind(test_data.user_id)
.fetch_one(pool)
.await
.unwrap();
assert_eq!(migrated_count, test_data.failure_scenarios.len() as i64,
"All failed documents should be migrated");
}
async fn verify_schema_integrity(pool: &PgPool) {
// Check that all expected tables exist
let tables = sqlx::query(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
)
.fetch_all(pool)
.await
.unwrap();
let table_names: Vec<String> = tables.iter()
.map(|row| row.get("table_name"))
.collect();
assert!(table_names.contains(&"documents".to_string()));
assert!(table_names.contains(&"failed_documents".to_string()));
assert!(table_names.contains(&"users".to_string()));
assert!(table_names.contains(&"ocr_queue".to_string()));
// Check that constraints exist on failed_documents
let constraints = sqlx::query(
r#"
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'failed_documents' AND constraint_type = 'CHECK'
"#
)
.fetch_all(pool)
.await
.unwrap();
let constraint_names: Vec<String> = constraints.iter()
.map(|row| row.get("constraint_name"))
.collect();
assert!(constraint_names.iter().any(|name| name.contains("failure_reason")),
"Should have check constraint for failure_reason");
assert!(constraint_names.iter().any(|name| name.contains("failure_stage")),
"Should have check constraint for failure_stage");
}
async fn verify_data_consistency_after_migration(pool: &PgPool, test_data: &TestData) {
// Verify specific failure reason mappings
let mappings = vec![
("timeout_doc.pdf", "ocr_timeout"),
("memory_doc.pdf", "ocr_memory_limit"),
("corrupt_doc.pdf", "file_corrupted"),
("low_conf_doc.pdf", "low_ocr_confidence"),
];
for (filename, expected_reason) in mappings {
let result = sqlx::query(
"SELECT failure_reason FROM failed_documents WHERE filename = $1 AND user_id = $2"
)
.bind(filename)
.bind(test_data.user_id)
.fetch_optional(pool)
.await
.unwrap();
assert!(result.is_some(), "Document {} should exist in failed_documents", filename);
let actual_reason: String = result.unwrap().get("failure_reason");
assert_eq!(actual_reason, expected_reason,
"Failure reason for {} should be mapped correctly", filename);
}
// Verify all migrated documents have proper metadata
let all_migrated = sqlx::query(
"SELECT * FROM failed_documents WHERE user_id = $1"
)
.bind(test_data.user_id)
.fetch_all(pool)
.await
.unwrap();
for row in all_migrated {
let failure_stage: String = row.get("failure_stage");
assert_eq!(failure_stage, "ocr", "All migrated documents should have 'ocr' as failure_stage");
let filename: String = row.get("filename");
assert!(test_data.document_ids.contains_key(&filename),
"Migrated document should be from our test data");
}
}
}


@@ -0,0 +1,755 @@
use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use std::process::Command;
use std::path::Path;
use sha2::{Sha256, Digest};
#[cfg(test)]
mod latest_migration_tests {
use super::*;
#[tokio::test]
async fn test_latest_migration_from_previous_state() {
// Step 1: Get the migration files and identify the latest two
let migration_files = get_sorted_migration_files();
if migration_files.len() < 2 {
println!("✅ Only one or no migrations found - skipping previous state test");
return;
}
let second_to_last = &migration_files[migration_files.len() - 2];
let latest = &migration_files[migration_files.len() - 1];
println!("🔄 Testing migration from second-to-last to latest:");
println!(" Previous: {}", extract_migration_name(second_to_last));
println!(" Latest: {}", extract_migration_name(latest));
// Step 2: Create a fresh database and apply migrations up to second-to-last
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Apply all migrations except the latest one using SQLx migration runner
let migration_files = get_sorted_migration_files();
let target_index = migration_files.iter()
.position(|f| f == second_to_last)
.expect("Second-to-last migration not found");
// Apply migrations up to target_index (excluding the latest)
apply_selected_migrations(pool, &migration_files[..target_index+1]).await;
// Step 3: Prefill the database with realistic data in the previous state
let test_data = prefill_database_for_previous_state(pool).await;
// Step 4: Apply the latest migration
apply_single_migration(pool, latest).await;
// Step 5: Validate the migration succeeded and data is intact
validate_latest_migration_success(pool, &test_data, latest).await;
println!("✅ Latest migration successfully applied from previous state");
}
#[tokio::test]
async fn test_latest_migration_with_edge_case_data() {
let migration_files = get_sorted_migration_files();
if migration_files.len() < 2 {
println!("✅ Only one or no migrations found - skipping edge case test");
return;
}
let second_to_last = &migration_files[migration_files.len() - 2];
let latest = &migration_files[migration_files.len() - 1];
println!("🧪 Testing latest migration with edge case data:");
println!(" Testing migration: {}", extract_migration_name(latest));
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Apply migrations up to second-to-last
let migration_files = get_sorted_migration_files();
let target_index = migration_files.iter()
.position(|f| f == second_to_last)
.expect("Second-to-last migration not found");
apply_selected_migrations(pool, &migration_files[..target_index+1]).await;
// Create edge case data that might break the migration
let edge_case_data = create_edge_case_data(pool).await;
// Apply the latest migration
let migration_result = apply_single_migration_safe(pool, latest).await;
match migration_result {
Ok(_) => {
println!("✅ Latest migration handled edge cases successfully");
validate_edge_case_migration(pool, &edge_case_data).await;
}
Err(e) => {
panic!("❌ Latest migration failed with edge case data: {:?}", e);
}
}
}
#[tokio::test]
async fn test_latest_migration_rollback_safety() {
let migration_files = get_sorted_migration_files();
if migration_files.len() < 2 {
println!("✅ Only one or no migrations found - skipping rollback safety test");
return;
}
let second_to_last = &migration_files[migration_files.len() - 2];
let latest = &migration_files[migration_files.len() - 1];
println!("🔒 Testing rollback safety for latest migration:");
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Apply migrations up to second-to-last
let migration_files = get_sorted_migration_files();
let target_index = migration_files.iter()
.position(|f| f == second_to_last)
.expect("Second-to-last migration not found");
apply_selected_migrations(pool, &migration_files[..target_index+1]).await;
// Capture schema snapshot before latest migration
let schema_before = capture_schema_snapshot(pool).await;
// Apply latest migration
apply_single_migration(pool, latest).await;
// Capture schema after latest migration
let schema_after = capture_schema_snapshot(pool).await;
// Validate schema changes are reasonable
validate_schema_changes(&schema_before, &schema_after, latest);
// Test that the migration doesn't break existing functionality
test_basic_database_operations(pool).await;
println!("✅ Latest migration rollback safety verified");
}
#[tokio::test]
async fn test_latest_migration_performance() {
let migration_files = get_sorted_migration_files();
if migration_files.len() < 1 {
println!("✅ No migrations found - skipping performance test");
return;
}
let latest = &migration_files[migration_files.len() - 1];
println!("⚡ Testing performance of latest migration:");
println!(" Migration: {}", extract_migration_name(latest));
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Apply all migrations except the latest
if migration_files.len() > 1 {
let second_to_last = &migration_files[migration_files.len() - 2];
let target_index = migration_files.iter()
.position(|f| f == second_to_last)
.expect("Second-to-last migration not found");
apply_selected_migrations(pool, &migration_files[..target_index+1]).await;
}
// Create a substantial amount of data
create_performance_test_data(pool, 1000).await;
// Measure migration time
let start_time = std::time::Instant::now();
apply_single_migration(pool, latest).await;
let migration_duration = start_time.elapsed();
println!("⏱️ Latest migration completed in: {:?}", migration_duration);
// Performance assertion - should complete reasonably fast even with data
assert!(
migration_duration.as_secs() < 10,
"Latest migration took too long: {:?}. Consider optimizing for larger datasets.",
migration_duration
);
// Verify data integrity after migration
verify_data_integrity_after_performance_test(pool).await;
println!("✅ Latest migration performance acceptable");
}
// Helper functions
struct TestData {
users: Vec<TestUser>,
documents: Vec<TestDocument>,
failed_documents: Vec<TestFailedDocument>,
metadata: DatabaseMetadata,
}
struct TestUser {
id: Uuid,
username: String,
email: String,
}
struct TestDocument {
id: Uuid,
user_id: Uuid,
filename: String,
status: String,
}
struct TestFailedDocument {
id: Uuid,
user_id: Uuid,
filename: String,
reason: String,
}
struct DatabaseMetadata {
table_count: usize,
total_records: usize,
schema_version: String,
}
struct SchemaSnapshot {
tables: Vec<String>,
columns: std::collections::HashMap<String, Vec<String>>,
constraints: Vec<String>,
}
fn get_sorted_migration_files() -> Vec<String> {
let migrations_dir = Path::new("migrations");
let mut files = Vec::new();
if let Ok(entries) = std::fs::read_dir(migrations_dir) {
for entry in entries {
if let Ok(entry) = entry {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("sql") {
files.push(path.to_string_lossy().to_string());
}
}
}
}
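// Lexicographic sort is assumed to match chronological order here, which holds
// as long as the numeric timestamp prefixes all have the same width.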
files.sort();
files
}
fn extract_migration_name(filepath: &str) -> String {
Path::new(filepath)
.file_name()
.unwrap()
.to_string_lossy()
.to_string()
}
async fn apply_selected_migrations(pool: &PgPool, migration_files: &[String]) {
// Create the migrations table if it doesn't exist
sqlx::query(
"CREATE TABLE IF NOT EXISTS _sqlx_migrations (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
installed_on TIMESTAMPTZ NOT NULL DEFAULT NOW(),
success BOOLEAN NOT NULL,
checksum BYTEA NOT NULL,
execution_time BIGINT NOT NULL
)"
)
.execute(pool)
.await
.expect("Failed to create migrations table");
for migration_file in migration_files {
let migration_name = extract_migration_name(migration_file);
// Extract version from filename
let version = migration_name
.split('_')
.next()
.and_then(|s| s.parse::<i64>().ok())
.expect(&format!("Failed to parse migration version from {}", migration_name));
// Check if this migration is already applied
let exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM _sqlx_migrations WHERE version = $1)"
)
.bind(version)
.fetch_one(pool)
.await
.unwrap_or(false);
if exists {
println!(" ⏭️ Skipped (already applied): {}", migration_name);
continue;
}
// Apply this migration
let content = std::fs::read_to_string(migration_file)
.expect(&format!("Failed to read migration file: {}", migration_file));
let start_time = std::time::Instant::now();
// Use raw SQL execution to handle complex PostgreSQL statements including functions
sqlx::raw_sql(&content)
.execute(pool)
.await
.expect(&format!("Failed to apply migration: {}", migration_name));
let execution_time = start_time.elapsed().as_millis() as i64;
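// The checksum recorded here is only for this test harness's bookkeeping; it is
// not guaranteed to match the checksum SQLx's own migrator would compute.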
let checksum = Sha256::digest(content.as_bytes()).to_vec();
// Record the migration as applied
sqlx::query(
"INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time)
VALUES ($1, $2, $3, $4, $5)"
)
.bind(version)
.bind(migration_name.clone())
.bind(true)
.bind(checksum)
.bind(execution_time)
.execute(pool)
.await
.expect("Failed to record migration");
println!(" ✓ Applied: {}", migration_name);
}
}
async fn apply_single_migration(pool: &PgPool, migration_file: &str) {
let result = apply_single_migration_safe(pool, migration_file).await;
result.expect(&format!("Failed to apply migration: {}", migration_file));
}
async fn apply_single_migration_safe(pool: &PgPool, migration_file: &str) -> Result<(), sqlx::Error> {
let content = std::fs::read_to_string(migration_file)
.expect(&format!("Failed to read migration file: {}", migration_file));
let migration_name = extract_migration_name(migration_file);
println!(" 🔄 Applying: {}", migration_name);
// Use raw SQL execution to handle complex PostgreSQL statements including functions
sqlx::raw_sql(&content).execute(pool).await?;
println!(" ✅ Applied: {}", migration_name);
Ok(())
}
async fn prefill_database_for_previous_state(pool: &PgPool) -> TestData {
let mut users = Vec::new();
let mut documents = Vec::new();
let mut failed_documents = Vec::new();
// Create test users
for i in 0..5 {
let user_id = Uuid::new_v4();
let username = format!("previous_state_user_{}", i);
let email = format!("previous_{}@test.com", i);
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(&username)
.bind(&email)
.bind("test_hash")
.bind("user")
.execute(pool)
.await
.expect("Failed to create test user");
users.push(TestUser { id: user_id, username, email });
}
// Create test documents for each user
for user in &users {
for j in 0..3 {
let doc_id = Uuid::new_v4();
let filename = format!("previous_doc_{}_{}.pdf", user.username, j);
let status = if j == 0 { "completed" } else { "failed" };
// Check if documents table exists before inserting
let table_exists = sqlx::query(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'documents')"
)
.fetch_one(pool)
.await
.unwrap()
.get::<bool, _>(0);
if table_exists {
// Check if original_filename column exists
let original_filename_exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'documents' AND column_name = 'original_filename')"
)
.fetch_one(pool)
.await
.unwrap_or(false);
if original_filename_exists {
sqlx::query(
"INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
)
.bind(doc_id)
.bind(user.id)
.bind(&filename)
.bind(&filename) // Use same filename for original_filename
.bind(format!("/test/{}", filename))
.bind(1024_i64)
.bind("application/pdf")
.bind(status)
.execute(pool)
.await
.expect("Failed to create test document");
} else {
sqlx::query(
"INSERT INTO documents (id, user_id, filename, file_path, file_size, mime_type, ocr_status)
VALUES ($1, $2, $3, $4, $5, $6, $7)"
)
.bind(doc_id)
.bind(user.id)
.bind(&filename)
.bind(format!("/test/{}", filename))
.bind(1024_i64)
.bind("application/pdf")
.bind(status)
.execute(pool)
.await
.expect("Failed to create test document");
}
}
documents.push(TestDocument {
id: doc_id,
user_id: user.id,
filename,
status: status.to_string(),
});
}
}
// Create failed documents if the table exists
let failed_docs_exists = sqlx::query(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'failed_documents')"
)
.fetch_one(pool)
.await
.unwrap()
.get::<bool, _>(0);
if failed_docs_exists {
for user in &users {
let failed_id = Uuid::new_v4();
let filename = format!("failed_previous_{}.pdf", user.username);
sqlx::query(
"INSERT INTO failed_documents (id, user_id, filename, failure_reason, failure_stage, ingestion_source)
VALUES ($1, $2, $3, $4, $5, $6)"
)
.bind(failed_id)
.bind(user.id)
.bind(&filename)
.bind("other")
.bind("ocr")
.bind("test")
.execute(pool)
.await
.expect("Failed to create test failed document");
failed_documents.push(TestFailedDocument {
id: failed_id,
user_id: user.id,
filename,
reason: "other".to_string(),
});
}
}
let total_records = users.len() + documents.len() + failed_documents.len();
TestData {
users,
documents,
failed_documents,
metadata: DatabaseMetadata {
table_count: get_table_count(pool).await,
total_records,
schema_version: "previous".to_string(),
},
}
}
async fn create_edge_case_data(pool: &PgPool) -> TestData {
let mut users = Vec::new();
let mut documents = Vec::new();
let mut failed_documents = Vec::new();
// Create edge case users
let long_string = "a".repeat(50);
let edge_cases = vec![
("edge_empty_", ""),
("edge_special_", "user@domain.com"),
("edge_unicode_", "test_ñäme@tëst.com"),
("edge_long_", long_string.as_str()),
];
for (prefix, suffix) in edge_cases {
let user_id = Uuid::new_v4();
let username = format!("{}{}", prefix, user_id.to_string().split('-').next().unwrap());
let email = if suffix.is_empty() {
format!("{}@test.com", username)
} else {
suffix.to_string()
};
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(&username)
.bind(&email)
.bind("test_hash")
.bind("user")
.execute(pool)
.await
.expect("Failed to create edge case user");
users.push(TestUser { id: user_id, username, email });
}
let total_records = users.len();
TestData {
users,
documents,
failed_documents,
metadata: DatabaseMetadata {
table_count: get_table_count(pool).await,
total_records,
schema_version: "edge_case".to_string(),
},
}
}
async fn validate_latest_migration_success(pool: &PgPool, test_data: &TestData, migration_file: &str) {
let migration_name = extract_migration_name(migration_file);
// Verify that our test data still exists
let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(pool)
.await
.unwrap();
assert!(
user_count >= test_data.users.len() as i64,
"User data lost after migration {}",
migration_name
);
// Check that the migration was applied successfully by verifying the schema
let current_table_count = get_table_count(pool).await;
println!(" 📊 Validation results:");
println!(" - Users preserved: {} / {}", user_count, test_data.users.len());
println!(" - Tables before: {}", test_data.metadata.table_count);
println!(" - Tables after: {}", current_table_count);
// Test basic database operations still work
test_basic_database_operations(pool).await;
}
async fn validate_edge_case_migration(pool: &PgPool, test_data: &TestData) {
// Verify edge case data survived migration
for user in &test_data.users {
let user_exists = sqlx::query(
"SELECT 1 FROM users WHERE id = $1"
)
.bind(user.id)
.fetch_optional(pool)
.await
.unwrap();
assert!(
user_exists.is_some(),
"Edge case user {} lost during migration",
user.username
);
}
println!(" ✅ All edge case data preserved");
}
async fn capture_schema_snapshot(pool: &PgPool) -> SchemaSnapshot {
// Get all tables
let tables = sqlx::query(
"SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
)
.fetch_all(pool)
.await
.unwrap();
let table_names: Vec<String> = tables.iter()
.map(|row| row.get("table_name"))
.collect();
// Get columns for each table
let mut columns = std::collections::HashMap::new();
for table in &table_names {
let table_columns = sqlx::query(
"SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1"
)
.bind(table)
.fetch_all(pool)
.await
.unwrap();
let column_names: Vec<String> = table_columns.iter()
.map(|row| row.get("column_name"))
.collect();
columns.insert(table.clone(), column_names);
}
// Get constraints
let constraints = sqlx::query(
"SELECT constraint_name FROM information_schema.table_constraints
WHERE table_schema = 'public'"
)
.fetch_all(pool)
.await
.unwrap();
let constraint_names: Vec<String> = constraints.iter()
.map(|row| row.get("constraint_name"))
.collect();
SchemaSnapshot {
tables: table_names,
columns,
constraints: constraint_names,
}
}
fn validate_schema_changes(before: &SchemaSnapshot, after: &SchemaSnapshot, migration_file: &str) {
let migration_name = extract_migration_name(migration_file);
// Check for new tables
let new_tables: Vec<_> = after.tables.iter()
.filter(|table| !before.tables.contains(table))
.collect();
if !new_tables.is_empty() {
println!(" 📋 New tables added by {}: {:?}", migration_name, new_tables);
}
// Check for removed tables (should be rare and carefully considered)
let removed_tables: Vec<_> = before.tables.iter()
.filter(|table| !after.tables.contains(table))
.collect();
if !removed_tables.is_empty() {
println!(" ⚠️ Tables removed by {}: {:?}", migration_name, removed_tables);
// Note: In production, you might want to assert this is intentional
}
// Check for new constraints
let new_constraints: Vec<_> = after.constraints.iter()
.filter(|constraint| !before.constraints.contains(constraint))
.collect();
if !new_constraints.is_empty() {
println!(" 🔒 New constraints added: {}", new_constraints.len());
}
println!(" ✅ Schema changes validated");
}
async fn test_basic_database_operations(pool: &PgPool) {
// Test that we can still perform basic operations
// Test user creation
let test_user_id = Uuid::new_v4();
let result = sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role)
VALUES ($1, $2, $3, $4, $5)"
)
.bind(test_user_id)
.bind("operation_test_user")
.bind("operation_test@test.com")
.bind("test_hash")
.bind("user")
.execute(pool)
.await;
assert!(result.is_ok(), "Basic user creation should still work");
// Clean up
sqlx::query("DELETE FROM users WHERE id = $1")
.bind(test_user_id)
.execute(pool)
.await
.unwrap();
println!(" ✅ Basic database operations verified");
}
async fn create_performance_test_data(pool: &PgPool, user_count: usize) {
println!(" 📊 Creating {} users for performance testing...", user_count);
for i in 0..user_count {
let user_id = Uuid::new_v4();
let username = format!("perf_user_{}", i);
let email = format!("perf_{}@test.com", i);
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(&username)
.bind(&email)
.bind("test_hash")
.bind("user")
.execute(pool)
.await
.expect("Failed to create performance test user");
}
println!(" ✅ Performance test data created");
}
async fn verify_data_integrity_after_performance_test(pool: &PgPool) {
let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(pool)
.await
.unwrap();
assert!(user_count > 0, "Performance test data should exist after migration");
println!(" ✅ Data integrity verified: {} users", user_count);
}
async fn get_table_count(pool: &PgPool) -> usize {
let tables = sqlx::query(
"SELECT COUNT(*) as count FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
)
.fetch_one(pool)
.await
.unwrap();
tables.get::<i64, _>("count") as usize
}
}


@@ -0,0 +1,364 @@
use sqlx::PgPool;
use std::path::Path;
use std::fs;
#[cfg(test)]
mod migration_ordering_tests {
use super::*;
#[test]
fn test_migration_files_have_unique_timestamps() {
let migration_files = get_migration_files();
let mut timestamps = Vec::new();
for file in &migration_files {
let timestamp = extract_timestamp(&file);
assert!(
!timestamps.contains(&timestamp),
"Duplicate migration timestamp found: {} in file {}",
timestamp, file
);
timestamps.push(timestamp);
}
println!("✅ All migration files have unique timestamps");
}
#[test]
fn test_migration_files_are_chronologically_ordered() {
let migration_files = get_migration_files();
let timestamps: Vec<u64> = migration_files.iter()
.map(|f| extract_timestamp(f).parse::<u64>().unwrap())
.collect();
let mut sorted_timestamps = timestamps.clone();
sorted_timestamps.sort();
assert_eq!(
timestamps, sorted_timestamps,
"Migration files are not in chronological order"
);
println!("✅ Migration files are chronologically ordered");
}
#[test]
fn test_migration_naming_convention() {
let migration_files = get_migration_files();
for file in &migration_files {
let filename = Path::new(&file).file_name().unwrap().to_str().unwrap();
// Check format: TIMESTAMP_description.sql
assert!(
filename.ends_with(".sql"),
"Migration file {} doesn't end with .sql",
filename
);
let parts: Vec<&str> = filename.split('_').collect();
assert!(
parts.len() >= 2,
"Migration file {} doesn't follow TIMESTAMP_description format",
filename
);
// Check timestamp format (should be 14-17 digits)
let timestamp = parts[0];
assert!(
timestamp.len() >= 14 && timestamp.len() <= 17,
"Migration timestamp {} has invalid length in file {}",
timestamp, filename
);
assert!(
timestamp.chars().all(|c| c.is_numeric()),
"Migration timestamp {} contains non-numeric characters in file {}",
timestamp, filename
);
// Check description
let description_parts = &parts[1..];
let description = description_parts.join("_");
let description_without_ext = description.trim_end_matches(".sql");
assert!(
!description_without_ext.is_empty(),
"Migration file {} has empty description",
filename
);
assert!(
description_without_ext.chars().all(|c| c.is_alphanumeric() || c == '_'),
"Migration description contains invalid characters in file {}",
filename
);
}
println!("✅ All migration files follow naming convention");
}
#[test]
fn test_migration_dependencies() {
let migration_contents = read_all_migrations();
// Check for common dependency patterns
for (i, (file, content)) in migration_contents.iter().enumerate() {
// Check if migration references tables that should exist
let referenced_tables = extract_referenced_tables(&content);
for table in &referenced_tables {
// Skip system tables
if table.starts_with("pg_") || table.starts_with("information_schema") {
continue;
}
// Check if table is created in current or previous migrations
let table_exists = table_exists_before_migration(&migration_contents, i, table);
// Special cases for tables that might be created in the same migration
let creates_table = content.to_lowercase().contains(&format!("create table {}", table.to_lowercase())) ||
content.to_lowercase().contains(&format!("create table if not exists {}", table.to_lowercase()));
if !creates_table && !table_exists {
println!("Warning: Migration {} references table '{}' that may not exist", file, table);
}
}
}
println!("✅ Migration dependencies checked");
}
#[test]
fn test_no_drop_statements_in_migrations() {
let migration_contents = read_all_migrations();
for (file, content) in &migration_contents {
let lowercase_content = content.to_lowercase();
// Check for dangerous DROP statements
assert!(
!lowercase_content.contains("drop table") || lowercase_content.contains("drop table if exists"),
"Migration {} contains DROP TABLE statement without IF EXISTS",
file
);
assert!(
!lowercase_content.contains("drop database"),
"Migration {} contains dangerous DROP DATABASE statement",
file
);
assert!(
!lowercase_content.contains("drop schema"),
"Migration {} contains DROP SCHEMA statement",
file
);
}
println!("✅ No dangerous DROP statements found");
}
#[test]
fn test_migration_transactions() {
let migration_contents = read_all_migrations();
for (file, content) in &migration_contents {
let lowercase_content = content.to_lowercase();
// Check that migrations don't contain explicit transaction statements
// (SQLx handles transactions automatically)
assert!(
!lowercase_content.contains("begin;") && !lowercase_content.contains("begin transaction"),
"Migration {} contains explicit BEGIN statement",
file
);
assert!(
!lowercase_content.contains("commit;"),
"Migration {} contains explicit COMMIT statement",
file
);
assert!(
!lowercase_content.contains("rollback;"),
"Migration {} contains explicit ROLLBACK statement",
file
);
}
println!("✅ Migrations don't contain explicit transaction statements");
}
#[tokio::test]
async fn test_migration_idempotency() {
// This test would be run in CI to ensure migrations can be run multiple times
// We'll create a simple check here
let migration_contents = read_all_migrations();
for (file, content) in &migration_contents {
// Check for CREATE statements with IF NOT EXISTS
if content.to_lowercase().contains("create table") {
let has_if_not_exists = content.to_lowercase().contains("create table if not exists");
if !has_if_not_exists {
println!("Warning: Migration {} creates table without IF NOT EXISTS", file);
}
}
if content.to_lowercase().contains("create index") {
let has_if_not_exists = content.to_lowercase().contains("create index if not exists");
if !has_if_not_exists {
println!("Warning: Migration {} creates index without IF NOT EXISTS", file);
}
}
}
println!("✅ Migration idempotency patterns checked");
}
#[test]
fn test_migration_comments() {
let migration_contents = read_all_migrations();
let mut undocumented_migrations = Vec::new();
for (file, content) in &migration_contents {
// Check if migration has comments explaining what it does
let has_comments = content.contains("--") || content.contains("/*");
if !has_comments {
undocumented_migrations.push(file.clone());
}
// Check for specific important migrations that should have detailed comments
if file.contains("failed_documents") {
assert!(
content.contains("--") && content.len() > 200,
"Migration {} dealing with failed_documents should have detailed comments",
file
);
}
}
if !undocumented_migrations.is_empty() {
println!("Warning: The following migrations lack comments: {:?}", undocumented_migrations);
}
println!("✅ Migration documentation checked");
}
#[test]
fn test_migration_file_consistency() {
let migration_files = get_migration_files();
for file in &migration_files {
let content = fs::read_to_string(&file).unwrap();
// Check for consistent line endings (flag files that mix CRLF and bare LF)
assert!(
!content.contains("\r\n") || !content.replace("\r\n", "").contains('\n'),
"Migration {} has mixed line endings",
file
);
// Check for trailing whitespace (optional check, can be disabled)
for (line_num, line) in content.lines().enumerate() {
if line.ends_with(' ') || line.ends_with('\t') {
println!("Note: Migration {} has trailing whitespace on line {} (style preference)", file, line_num + 1);
}
}
// Check file ends with newline (optional check, can be disabled)
if !content.ends_with('\n') {
println!("Note: Migration {} doesn't end with newline (style preference)", file);
}
}
println!("✅ Migration file consistency verified");
}
// Helper functions
fn get_migration_files() -> Vec<String> {
let migrations_dir = Path::new("migrations");
let mut files = Vec::new();
if let Ok(entries) = fs::read_dir(migrations_dir) {
for entry in entries {
if let Ok(entry) = entry {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("sql") {
files.push(path.to_string_lossy().to_string());
}
}
}
}
files.sort();
files
}
fn extract_timestamp(filepath: &str) -> String {
let filename = Path::new(filepath).file_name().unwrap().to_str().unwrap();
filename.split('_').next().unwrap().to_string()
}
fn read_all_migrations() -> Vec<(String, String)> {
let migration_files = get_migration_files();
let mut contents = Vec::new();
for file in migration_files {
if let Ok(content) = fs::read_to_string(&file) {
contents.push((file, content));
}
}
contents
}
fn extract_referenced_tables(content: &str) -> Vec<String> {
let mut tables = Vec::new();
// Simple regex-like patterns to find table references
let patterns = vec![
"references ", "from ", "join ", "into ", "update ", "delete from ",
"alter table ", "constraint.*references", "on delete", "on update"
];
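// Note: the whitespace-split token comparison below only ever matches
// single-word keywords, so the multi-word patterns above act as line filters
// rather than extracting a table name themselves.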
for line in content.lines() {
let lower_line = line.to_lowercase();
for pattern in &patterns {
if lower_line.contains(pattern) {
// Extract table name (simplified - real implementation would use regex)
let parts: Vec<&str> = lower_line.split_whitespace().collect();
for (i, part) in parts.iter().enumerate() {
if part == &pattern.trim() && i + 1 < parts.len() {
let table_name = parts[i + 1].trim_matches(|c: char| !c.is_alphanumeric() && c != '_');
if !table_name.is_empty() && !table_name.starts_with("$") {
tables.push(table_name.to_string());
}
}
}
}
}
}
tables.sort();
tables.dedup();
tables
}
fn table_exists_before_migration(migrations: &[(String, String)], current_index: usize, table_name: &str) -> bool {
for i in 0..current_index {
let (_, content) = &migrations[i];
if content.to_lowercase().contains(&format!("create table {}", table_name.to_lowercase())) ||
content.to_lowercase().contains(&format!("create table if not exists {}", table_name.to_lowercase())) {
return true;
}
}
// Check for base tables that should always exist
let base_tables = vec!["users", "documents", "settings"];
base_tables.contains(&table_name)
}
}


@@ -0,0 +1,521 @@
use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use std::collections::{HashMap, HashSet};
#[cfg(test)]
mod migration_schema_validation_tests {
use super::*;
#[tokio::test]
async fn test_all_expected_tables_exist() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let expected_tables = vec![
"users",
"documents",
"document_labels",
"failed_documents",
"ignored_files",
"labels",
"notifications",
"ocr_metrics",
"ocr_queue",
"ocr_retry_history",
"processed_images",
"settings",
"source_labels",
"sources",
"webdav_directories",
"webdav_files",
"webdav_sync_state",
"_sqlx_migrations",
];
let existing_tables = get_all_tables(pool).await;
for table in expected_tables {
assert!(
existing_tables.contains(table),
"Expected table '{}' not found in database schema",
table
);
}
println!("✅ All expected tables exist");
}
#[tokio::test]
async fn test_table_columns_and_types() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Define expected columns for critical tables
let table_schemas = vec![
TableSchema {
name: "documents",
columns: vec![
("id", "uuid", false),
("user_id", "uuid", false),
("filename", "text", false),
("original_filename", "text", true),
("file_path", "text", false),
("file_size", "bigint", false),
("file_hash", "character varying", true),
("mime_type", "text", false),
("content", "text", true),
("tags", "ARRAY", true),
("ocr_text", "text", true),
("ocr_status", "character varying", false),
("ocr_confidence", "real", true),
("ocr_failure_reason", "text", true),
("created_at", "timestamp with time zone", false),
("updated_at", "timestamp with time zone", false),
],
},
TableSchema {
name: "failed_documents",
columns: vec![
("id", "uuid", false),
("user_id", "uuid", true),
("filename", "text", false),
("failure_reason", "text", false),
("failure_stage", "text", false),
("ingestion_source", "text", false),
("error_message", "text", true),
("retry_count", "integer", true),
("created_at", "timestamp with time zone", true),
("updated_at", "timestamp with time zone", true),
],
},
TableSchema {
name: "ocr_queue",
columns: vec![
("id", "uuid", false),
("document_id", "uuid", false),
("priority", "integer", false),
("status", "character varying", false),
("error_message", "text", true),
("processing_started_at", "timestamp with time zone", true),
("processing_completed_at", "timestamp with time zone", true),
("created_at", "timestamp with time zone", false),
("updated_at", "timestamp with time zone", false),
],
},
];
for schema in table_schemas {
validate_table_schema(pool, &schema).await;
}
}
#[tokio::test]
async fn test_all_constraints_exist() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test primary keys
let primary_keys = vec![
("documents", "documents_pkey"),
("users", "users_pkey"),
("failed_documents", "failed_documents_pkey"),
("ocr_queue", "ocr_queue_pkey"),
("labels", "labels_pkey"),
("settings", "settings_pkey"),
];
for (table, constraint) in primary_keys {
let exists = constraint_exists(pool, table, constraint, "PRIMARY KEY").await;
assert!(exists, "Primary key '{}' not found on table '{}'", constraint, table);
}
// Test foreign keys
let foreign_keys = vec![
("documents", "documents_user_id_fkey"),
("failed_documents", "failed_documents_user_id_fkey"),
("failed_documents", "failed_documents_existing_document_id_fkey"),
("ocr_queue", "ocr_queue_document_id_fkey"),
("document_labels", "document_labels_document_id_fkey"),
("document_labels", "document_labels_label_id_fkey"),
];
for (table, constraint) in foreign_keys {
let exists = constraint_exists(pool, table, constraint, "FOREIGN KEY").await;
assert!(exists, "Foreign key '{}' not found on table '{}'", constraint, table);
}
// Test check constraints
let check_constraints = vec![
("failed_documents", "check_failure_reason"),
("failed_documents", "check_failure_stage"),
("documents", "check_ocr_status"),
("users", "check_role"),
];
for (table, constraint) in check_constraints {
let exists = constraint_exists(pool, table, constraint, "CHECK").await;
assert!(exists, "Check constraint '{}' not found on table '{}'", constraint, table);
}
println!("✅ All expected constraints exist");
}
#[tokio::test]
async fn test_indexes_for_performance() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let expected_indexes = vec![
("documents", "idx_documents_user_id"),
("documents", "idx_documents_created_at"),
("documents", "idx_documents_ocr_status"),
("failed_documents", "idx_failed_documents_user_id"),
("failed_documents", "idx_failed_documents_created_at"),
("failed_documents", "idx_failed_documents_failure_reason"),
("failed_documents", "idx_failed_documents_failure_stage"),
("ocr_queue", "idx_ocr_queue_status"),
("ocr_queue", "idx_ocr_queue_document_id"),
];
for (table, index) in expected_indexes {
let exists = index_exists(pool, table, index).await;
assert!(exists, "Performance index '{}' not found on table '{}'", index, table);
}
println!("✅ All performance indexes exist");
}
#[tokio::test]
async fn test_views_and_functions() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test views
let expected_views = vec![
"failed_documents_summary",
"legacy_failed_ocr_documents",
"ocr_analytics",
];
let existing_views = get_all_views(pool).await;
for view in expected_views {
assert!(
existing_views.contains(view),
"Expected view '{}' not found in database",
view
);
}
// Test functions
let expected_functions = vec![
"add_document_to_ocr_queue",
"get_ocr_queue_stats",
];
let existing_functions = get_all_functions(pool).await;
for func in expected_functions {
assert!(
existing_functions.contains(func),
"Expected function '{}' not found in database",
func
);
}
println!("✅ All views and functions exist");
}
#[tokio::test]
async fn test_enum_values_match_constraints() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test failure_reason enum values
let failure_reasons = vec![
"duplicate_content", "duplicate_filename", "unsupported_format",
"file_too_large", "file_corrupted", "access_denied",
"low_ocr_confidence", "ocr_timeout", "ocr_memory_limit",
"pdf_parsing_error", "storage_quota_exceeded", "network_error",
"permission_denied", "virus_detected", "invalid_structure",
"policy_violation", "other"
];
for reason in &failure_reasons {
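// enum_range(NULL::text) errors unless the target is a real Postgres enum
// type, so for a plain text column this probe fails and the CHECK-constraint
// fallback below is exercised instead.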
let result = sqlx::query(
"SELECT 1 WHERE $1::text IN (SELECT unnest(enum_range(NULL::text)::text[]))"
)
.bind(reason)
.fetch_optional(pool)
.await;
// If this is not an enum type, test the CHECK constraint instead
if result.is_err() || result.unwrap().is_none() {
// Test by attempting insert with valid value (should succeed)
// We'll use a transaction that we rollback to avoid polluting test data
let mut tx = pool.begin().await.unwrap();
// First create a test user
let test_user_id = uuid::Uuid::new_v4();
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(test_user_id)
.bind(format!("enum_test_{}", uuid::Uuid::new_v4()))
.bind(format!("enum_test_{}@test.com", uuid::Uuid::new_v4()))
.bind("test")
.bind("user")
.execute(&mut *tx)
.await
.unwrap();
let insert_result = sqlx::query(
"INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source)
VALUES ($1, 'test.pdf', $2, 'ocr', 'test')"
)
.bind(test_user_id)
.bind(reason)
.execute(&mut *tx)
.await;
assert!(insert_result.is_ok(),
"Valid failure_reason '{}' should be accepted by constraint", reason);
tx.rollback().await.unwrap();
}
}
// Test failure_stage enum values
let failure_stages = vec![
"ingestion", "validation", "ocr", "storage", "processing", "sync"
];
for stage in &failure_stages {
let mut tx = pool.begin().await.unwrap();
// Create test user
let test_user_id = uuid::Uuid::new_v4();
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(test_user_id)
.bind(format!("stage_test_{}", uuid::Uuid::new_v4()))
.bind(format!("stage_test_{}@test.com", uuid::Uuid::new_v4()))
.bind("test")
.bind("user")
.execute(&mut *tx)
.await
.unwrap();
let insert_result = sqlx::query(
"INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source)
VALUES ($1, 'test.pdf', 'other', $2, 'test')"
)
.bind(test_user_id)
.bind(stage)
.execute(&mut *tx)
.await;
assert!(insert_result.is_ok(),
"Valid failure_stage '{}' should be accepted by constraint", stage);
tx.rollback().await.unwrap();
}
println!("✅ All enum values match constraints");
}
#[tokio::test]
async fn test_migration_specific_changes() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test that failed_documents table has all columns from migration
let failed_docs_columns = get_table_columns(pool, "failed_documents").await;
let migration_columns = vec![
"id", "user_id", "filename", "original_filename", "original_path",
"file_path", "file_size", "file_hash", "mime_type", "content", "tags",
"ocr_text", "ocr_confidence", "ocr_word_count", "ocr_processing_time_ms",
"failure_reason", "failure_stage", "existing_document_id",
"ingestion_source", "error_message", "retry_count", "last_retry_at",
"created_at", "updated_at"
];
for col in migration_columns {
assert!(
failed_docs_columns.contains(&col.to_string()),
"Column '{}' not found in failed_documents table",
col
);
}
// Test that documents table has ocr_failure_reason column
let docs_columns = get_table_columns(pool, "documents").await;
assert!(
docs_columns.contains(&"ocr_failure_reason".to_string()),
"ocr_failure_reason column not found in documents table"
);
// Test that the legacy view exists
let views = get_all_views(pool).await;
assert!(
views.contains("legacy_failed_ocr_documents"),
"legacy_failed_ocr_documents view not found"
);
println!("✅ Migration-specific changes verified");
}
// Helper functions
use std::collections::HashSet;
struct TableSchema {
name: &'static str,
columns: Vec<(&'static str, &'static str, bool)>, // (name, type, nullable)
}
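// Example usage (hypothetical column list -- the authoritative schema is whatever the
// migrations actually create):
//
//     let schema = TableSchema {
//         name: "failed_documents",
//         columns: vec![("id", "uuid", false), ("error_message", "text", true)],
//     };
//     validate_table_schema(pool, &schema).await;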
async fn get_all_tables(pool: &PgPool) -> HashSet<String> {
let rows = sqlx::query(
"SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
)
.fetch_all(pool)
.await
.unwrap();
rows.into_iter()
.map(|row| row.get("table_name"))
.collect()
}
async fn get_all_views(pool: &PgPool) -> HashSet<String> {
let rows = sqlx::query(
"SELECT table_name FROM information_schema.views WHERE table_schema = 'public'"
)
.fetch_all(pool)
.await
.unwrap();
rows.into_iter()
.map(|row| row.get("table_name"))
.collect()
}
async fn get_all_functions(pool: &PgPool) -> HashSet<String> {
let rows = sqlx::query(
"SELECT routine_name FROM information_schema.routines
WHERE routine_schema = 'public' AND routine_type = 'FUNCTION'"
)
.fetch_all(pool)
.await
.unwrap();
rows.into_iter()
.map(|row| row.get("routine_name"))
.collect()
}
async fn get_table_columns(pool: &PgPool, table_name: &str) -> Vec<String> {
let rows = sqlx::query(
"SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1"
)
.bind(table_name)
.fetch_all(pool)
.await
.unwrap();
rows.into_iter()
.map(|row| row.get("column_name"))
.collect()
}
async fn validate_table_schema(pool: &PgPool, schema: &TableSchema) {
let columns = sqlx::query(
"SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1"
)
.bind(schema.name)
.fetch_all(pool)
.await
.unwrap();
let column_map: HashMap<String, (String, bool)> = columns.into_iter()
.map(|row| {
let name: String = row.get("column_name");
let data_type: String = row.get("data_type");
let is_nullable: String = row.get("is_nullable");
(name, (data_type, is_nullable == "YES"))
})
.collect();
for (col_name, expected_type, nullable) in &schema.columns {
let column_info = column_map.get(*col_name);
assert!(
column_info.is_some(),
"Column '{}' not found in table '{}'",
col_name, schema.name
);
let (actual_type, actual_nullable) = column_info.unwrap();
// Type checking (handle array types specially)
if expected_type == &"ARRAY" {
assert!(
actual_type.contains("ARRAY") || actual_type.contains("[]"),
"Column '{}' in table '{}' expected array type but got '{}'",
col_name, schema.name, actual_type
);
} else {
assert!(
actual_type.to_lowercase().contains(&expected_type.to_lowercase()),
"Column '{}' in table '{}' expected type '{}' but got '{}'",
col_name, schema.name, expected_type, actual_type
);
}
assert_eq!(
actual_nullable, nullable,
"Column '{}' in table '{}' nullable mismatch",
col_name, schema.name
);
}
println!("✅ Schema validated for table '{}'", schema.name);
}
async fn constraint_exists(pool: &PgPool, table: &str, constraint: &str, constraint_type: &str) -> bool {
let result = sqlx::query(
"SELECT 1 FROM information_schema.table_constraints
WHERE table_schema = 'public'
AND table_name = $1
AND constraint_name = $2
AND constraint_type = $3"
)
.bind(table)
.bind(constraint)
.bind(constraint_type)
.fetch_optional(pool)
.await
.unwrap();
result.is_some()
}
async fn index_exists(pool: &PgPool, table: &str, index: &str) -> bool {
let result = sqlx::query(
"SELECT 1 FROM pg_indexes
WHERE schemaname = 'public'
AND tablename = $1
AND indexname = $2"
)
.bind(table)
.bind(index)
.fetch_optional(pool)
.await
.unwrap();
result.is_some()
}
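// Example checks (constraint/index names below are assumptions based on Postgres'
// default naming conventions; adjust to what the migration actually creates):
//
//     assert!(constraint_exists(pool, "failed_documents", "failed_documents_pkey", "PRIMARY KEY").await);
//     assert!(index_exists(pool, "failed_documents", "idx_failed_documents_user_id").await);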
}

View File

@ -0,0 +1,795 @@
use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use std::process::Command;
#[cfg(test)]
mod pr_migration_validation_tests {
use super::*;
#[tokio::test]
async fn test_new_migration_with_prefilled_data() {
// Check if this PR introduces any new migrations
let new_migrations = get_new_migrations_in_pr();
if new_migrations.is_empty() {
println!("✅ No new migrations in this PR - skipping prefilled data test");
return;
}
println!("🔍 Found {} new migration(s) in this PR:", new_migrations.len());
for migration in &new_migrations {
println!(" - {}", migration);
}
// Run the comprehensive test with prefilled data
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Step 1: Prefill database with comprehensive test data
let test_data = prefill_comprehensive_test_data(pool).await;
println!("✅ Prefilled database with {} test scenarios", test_data.scenarios.len());
// Step 2: Verify all migrations run successfully with prefilled data
verify_migrations_with_data(pool, &test_data).await;
// Step 3: Test specific migration scenarios if they involve data transformation
if migration_involves_data_transformation(&new_migrations) {
test_data_transformation_integrity(pool, &test_data).await;
}
// Step 4: Verify no data loss occurred
verify_no_data_loss(pool, &test_data).await;
println!("✅ All new migrations passed validation with prefilled data");
}
#[tokio::test]
async fn test_migration_rollback_safety() {
let new_migrations = get_new_migrations_in_pr();
if new_migrations.is_empty() {
println!("✅ No new migrations in this PR - skipping rollback safety test");
return;
}
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Create snapshot of schema before migrations
let schema_before = capture_schema_snapshot(pool).await;
// Run migrations
let migration_result = sqlx::migrate!("./migrations").run(pool).await;
assert!(migration_result.is_ok(), "Migrations should succeed");
// Capture schema after migrations
let schema_after = capture_schema_snapshot(pool).await;
// Verify schema changes are intentional
verify_schema_changes(&schema_before, &schema_after, &new_migrations);
println!("✅ Migration rollback safety verified");
}
#[tokio::test]
async fn test_migration_performance_impact() {
let new_migrations = get_new_migrations_in_pr();
if new_migrations.is_empty() {
return;
}
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Prefill with large dataset
create_performance_test_data(pool, 10000).await;
// Measure migration execution time
let start = std::time::Instant::now();
let result = sqlx::migrate!("./migrations").run(pool).await;
let duration = start.elapsed();
assert!(result.is_ok(), "Migrations should succeed");
assert!(
duration.as_secs() < 30,
"Migrations took too long: {:?}. Consider optimizing for large datasets.",
duration
);
println!("✅ Migration performance acceptable: {:?}", duration);
}
// Data structures for comprehensive testing
struct ComprehensiveTestData {
users: Vec<TestUser>,
documents: Vec<TestDocument>,
scenarios: Vec<TestScenario>,
total_records: usize,
}
struct TestUser {
id: Uuid,
username: String,
role: String,
}
struct TestDocument {
id: Uuid,
user_id: Uuid,
filename: String,
ocr_status: String,
failure_reason: Option<String>,
metadata: DocumentMetadata,
}
struct DocumentMetadata {
file_size: i64,
mime_type: String,
has_ocr_text: bool,
tags: Vec<String>,
}
struct TestScenario {
name: String,
description: String,
affected_tables: Vec<String>,
record_count: usize,
}
struct SchemaSnapshot {
tables: Vec<TableInfo>,
indexes: Vec<String>,
constraints: Vec<String>,
views: Vec<String>,
}
struct TableInfo {
name: String,
columns: Vec<ColumnInfo>,
row_count: i64,
}
struct ColumnInfo {
name: String,
data_type: String,
is_nullable: bool,
}
// Implementation functions
async fn prefill_comprehensive_test_data(pool: &PgPool) -> ComprehensiveTestData {
let mut users = Vec::new();
let mut documents = Vec::new();
let mut scenarios = Vec::new();
// Create diverse user types
let user_types = vec![
("admin", "admin"),
("regular", "user"),
("readonly", "user"),
];
for (user_type, role) in user_types {
let user = create_test_user_with_role(pool, user_type, role).await;
users.push(user);
}
// Create various document scenarios
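// Each tuple is (filename, ocr_status, ocr_failure_reason, ocr_confidence, has_ocr_text),
// mirroring the parameters of create_test_document below.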
let document_scenarios = vec![
// Successful documents
("success_high_conf.pdf", "completed", None, 0.95, true),
("success_medium_conf.pdf", "completed", None, 0.75, true),
("success_with_tags.pdf", "completed", None, 0.85, true),
// Failed documents with different reasons
("fail_low_confidence.pdf", "failed", Some("low_ocr_confidence"), 0.3, true),
("fail_timeout.pdf", "failed", Some("timeout"), 0.0, false),
("fail_memory.pdf", "failed", Some("memory_limit"), 0.0, false),
("fail_corrupted.pdf", "failed", Some("file_corrupted"), 0.0, false),
("fail_unsupported.xyz", "failed", Some("unsupported_format"), 0.0, false),
("fail_access_denied.pdf", "failed", Some("access_denied"), 0.0, false),
("fail_parsing.pdf", "failed", Some("pdf_parsing_error"), 0.0, false),
("fail_unknown.pdf", "failed", Some("unknown_error"), 0.0, false),
("fail_null_reason.pdf", "failed", None, 0.0, false),
// Pending documents
("pending_new.pdf", "pending", None, 0.0, false),
("pending_retry.pdf", "pending", None, 0.0, false),
// Edge cases
("edge_empty_file.pdf", "failed", Some("file_corrupted"), 0.0, false),
("edge_huge_file.pdf", "failed", Some("file_too_large"), 0.0, false),
("edge_special_chars_§.pdf", "completed", None, 0.9, true),
];
// Create documents for each user
for user in &users {
for (filename, status, failure_reason, confidence, has_text) in &document_scenarios {
let doc = create_test_document(
pool,
user.id,
filename,
status,
failure_reason.as_deref(),
*confidence,
*has_text
).await;
documents.push(doc);
}
}
// Create OCR queue entries for some documents
for doc in documents.iter().filter(|d| d.ocr_status == "pending" || d.ocr_status == "failed") {
create_ocr_queue_entry(pool, doc.id).await;
}
// Create scenarios description
scenarios.push(TestScenario {
name: "User Management".to_string(),
description: "Different user roles and permissions".to_string(),
affected_tables: vec!["users".to_string()],
record_count: users.len(),
});
scenarios.push(TestScenario {
name: "Document Processing".to_string(),
description: "Various document states and failure scenarios".to_string(),
affected_tables: vec!["documents".to_string(), "failed_documents".to_string()],
record_count: documents.len(),
});
scenarios.push(TestScenario {
name: "OCR Queue".to_string(),
description: "OCR processing queue with retries".to_string(),
affected_tables: vec!["ocr_queue".to_string()],
record_count: documents.iter().filter(|d| d.ocr_status != "completed").count(),
});
let total_records = users.len() + documents.len();
ComprehensiveTestData {
users,
documents,
scenarios,
total_records,
}
}
async fn create_test_user_with_role(pool: &PgPool, user_type: &str, role: &str) -> TestUser {
let id = Uuid::new_v4();
let username = format!("test_{}_{}", user_type, Uuid::new_v4().to_string().split('-').next().unwrap());
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(id)
.bind(&username)
.bind(format!("{}@test.com", username))
.bind("test_hash")
.bind(role)
.execute(pool)
.await
.unwrap();
TestUser { id, username, role: role.to_string() }
}
async fn create_test_document(
pool: &PgPool,
user_id: Uuid,
filename: &str,
status: &str,
failure_reason: Option<&str>,
confidence: f32,
has_text: bool,
) -> TestDocument {
let id = Uuid::new_v4();
let file_size = match filename {
f if f.contains("huge") => 104857600, // 100MB
f if f.contains("empty") => 0,
_ => 1024 * (1 + (id.as_bytes()[0] as i64)), // Variable size
};
let mime_type = if filename.ends_with(".pdf") {
"application/pdf"
} else {
"application/octet-stream"
};
let tags = if filename.contains("tags") {
vec!["important", "reviewed", "2024"]
} else {
vec![]
};
let ocr_text = if has_text {
Some(format!("Sample OCR text for document {}", filename))
} else {
None
};
sqlx::query(
r#"
INSERT INTO documents (
id, user_id, filename, original_filename, file_path, file_size,
mime_type, ocr_status, ocr_failure_reason, ocr_confidence, ocr_text, tags
) VALUES (
$1, $2, $3, $3, $4, $5, $6, $7, $8, $9, $10, $11
)
"#
)
.bind(id)
.bind(user_id)
.bind(filename)
.bind(format!("/test/files/{}", filename))
.bind(file_size)
.bind(mime_type)
.bind(status)
.bind(failure_reason)
.bind(if confidence > 0.0 { Some(confidence) } else { None })
.bind(ocr_text)
.bind(&tags)
.execute(pool)
.await
.unwrap();
TestDocument {
id,
user_id,
filename: filename.to_string(),
ocr_status: status.to_string(),
failure_reason: failure_reason.map(|s| s.to_string()),
metadata: DocumentMetadata {
file_size,
mime_type: mime_type.to_string(),
has_ocr_text: has_text,
tags: tags.iter().map(|s| s.to_string()).collect(),
},
}
}
async fn create_ocr_queue_entry(pool: &PgPool, document_id: Uuid) {
sqlx::query(
"INSERT INTO ocr_queue (document_id, priority, status) VALUES ($1, $2, $3)"
)
.bind(document_id)
.bind(1)
.bind("pending")
.execute(pool)
.await
.unwrap();
}
async fn verify_migrations_with_data(pool: &PgPool, test_data: &ComprehensiveTestData) {
// Count records before any potential data migration
let doc_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
.fetch_one(pool)
.await
.unwrap();
let user_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(pool)
.await
.unwrap();
println!("📊 Database state before migration verification:");
println!(" - Users: {}", user_count_before);
println!(" - Documents: {}", doc_count_before);
// Verify failed document migration if applicable
let failed_docs: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
)
.fetch_one(pool)
.await
.unwrap();
if failed_docs > 0 {
println!(" - Failed documents to migrate: {}", failed_docs);
// Verify migration mapping works correctly
let mapping_test = sqlx::query(
r#"
SELECT
ocr_failure_reason,
COUNT(*) as count,
CASE
WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied'
ELSE 'other'
END as mapped_reason
FROM documents
WHERE ocr_status = 'failed'
GROUP BY ocr_failure_reason
"#
)
.fetch_all(pool)
.await
.unwrap();
println!(" - Failure reason mappings:");
for row in mapping_test {
let original: Option<String> = row.get("ocr_failure_reason");
let mapped: String = row.get("mapped_reason");
let count: i64 = row.get("count");
println!(" {:?} -> {} ({} documents)", original, mapped, count);
}
}
}
async fn test_data_transformation_integrity(pool: &PgPool, test_data: &ComprehensiveTestData) {
// Test that data transformations maintain integrity
println!("🔄 Testing data transformation integrity...");
// Check if failed_documents table exists (indicating migration ran)
let failed_docs_exists = sqlx::query(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'failed_documents')"
)
.fetch_one(pool)
.await
.unwrap()
.get::<bool, _>(0);
if failed_docs_exists {
// Verify all failed documents were migrated correctly
let migrated_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM failed_documents WHERE failure_stage = 'ocr'"
)
.fetch_one(pool)
.await
.unwrap();
let expected_failed = test_data.documents.iter()
.filter(|d| d.ocr_status == "failed")
.count();
assert!(
migrated_count >= expected_failed as i64,
"Not all failed documents were migrated: expected at least {}, got {}",
expected_failed, migrated_count
);
// Verify data integrity for specific test cases
for doc in test_data.documents.iter().filter(|d| d.ocr_status == "failed") {
let migrated = sqlx::query(
"SELECT * FROM failed_documents WHERE filename = $1"
)
.bind(&doc.filename)
.fetch_optional(pool)
.await
.unwrap();
assert!(
migrated.is_some(),
"Failed document '{}' was not migrated",
doc.filename
);
if let Some(row) = migrated {
let failure_reason: String = row.get("failure_reason");
// Verify reason mapping
match doc.failure_reason.as_deref() {
Some("timeout") => assert_eq!(failure_reason, "ocr_timeout"),
Some("memory_limit") => assert_eq!(failure_reason, "ocr_memory_limit"),
Some("file_corrupted") => assert_eq!(failure_reason, "file_corrupted"),
Some("low_ocr_confidence") => assert_eq!(failure_reason, "low_ocr_confidence"),
Some("unknown_error") | None => assert_eq!(failure_reason, "other"),
_ => {}
}
}
}
}
println!("✅ Data transformation integrity verified");
}
async fn verify_no_data_loss(pool: &PgPool, test_data: &ComprehensiveTestData) {
println!("🔍 Verifying no data loss occurred...");
// Check user count
let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(pool)
.await
.unwrap();
assert!(
user_count >= test_data.users.len() as i64,
"User data loss detected: expected at least {}, got {}",
test_data.users.len(), user_count
);
// Check total document count (including migrated)
let doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
.fetch_one(pool)
.await
.unwrap();
let failed_doc_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM failed_documents WHERE ingestion_source IS NOT NULL"
)
.fetch_one(pool)
.await
.unwrap_or(0);
let total_docs = doc_count + failed_doc_count;
let expected_docs = test_data.documents.len() as i64;
assert!(
total_docs >= expected_docs,
"Document data loss detected: expected at least {}, got {} (documents: {}, failed_documents: {})",
expected_docs, total_docs, doc_count, failed_doc_count
);
println!("✅ No data loss detected");
}
async fn capture_schema_snapshot(pool: &PgPool) -> SchemaSnapshot {
let tables = sqlx::query(
r#"
SELECT
t.table_name,
COUNT(c.column_name) as column_count
FROM information_schema.tables t
LEFT JOIN information_schema.columns c
ON t.table_name = c.table_name
AND t.table_schema = c.table_schema
WHERE t.table_schema = 'public'
AND t.table_type = 'BASE TABLE'
GROUP BY t.table_name
ORDER BY t.table_name
"#
)
.fetch_all(pool)
.await
.unwrap();
let mut table_infos = Vec::new();
for table_row in tables {
let table_name: String = table_row.get("table_name");
// Get columns for this table
let columns = sqlx::query(
r#"
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1
ORDER BY ordinal_position
"#
)
.bind(&table_name)
.fetch_all(pool)
.await
.unwrap();
let column_infos: Vec<ColumnInfo> = columns.into_iter()
.map(|col| ColumnInfo {
name: col.get("column_name"),
data_type: col.get("data_type"),
is_nullable: col.get::<String, _>("is_nullable") == "YES",
})
.collect();
// Get row count
let count_query = format!("SELECT COUNT(*) FROM {}", table_name);
let row_count: i64 = sqlx::query_scalar(&count_query)
.fetch_one(pool)
.await
.unwrap_or(0);
table_infos.push(TableInfo {
name: table_name,
columns: column_infos,
row_count,
});
}
// Get indexes
let indexes = sqlx::query(
"SELECT indexname FROM pg_indexes WHERE schemaname = 'public'"
)
.fetch_all(pool)
.await
.unwrap()
.into_iter()
.map(|row| row.get("indexname"))
.collect();
// Get constraints
let constraints = sqlx::query(
r#"
SELECT constraint_name || ' (' || constraint_type || ')' as constraint_info
FROM information_schema.table_constraints
WHERE constraint_schema = 'public'
"#
)
.fetch_all(pool)
.await
.unwrap()
.into_iter()
.map(|row| row.get("constraint_info"))
.collect();
// Get views
let views = sqlx::query(
"SELECT table_name FROM information_schema.views WHERE table_schema = 'public'"
)
.fetch_all(pool)
.await
.unwrap()
.into_iter()
.map(|row| row.get("table_name"))
.collect();
SchemaSnapshot {
tables: table_infos,
indexes,
constraints,
views,
}
}
fn verify_schema_changes(before: &SchemaSnapshot, after: &SchemaSnapshot, migrations: &[String]) {
println!("📋 Verifying schema changes...");
// Check for new tables
let before_tables: std::collections::HashSet<_> = before.tables.iter().map(|t| &t.name).collect();
let after_tables: std::collections::HashSet<_> = after.tables.iter().map(|t| &t.name).collect();
let new_tables: Vec<_> = after_tables.difference(&before_tables).collect();
if !new_tables.is_empty() {
println!(" New tables added: {:?}", new_tables);
}
// Check for removed tables (should not happen in migrations)
let removed_tables: Vec<_> = before_tables.difference(&after_tables).collect();
assert!(
removed_tables.is_empty(),
"Tables were removed in migration: {:?}",
removed_tables
);
// Check for column changes
for after_table in &after.tables {
if let Some(before_table) = before.tables.iter().find(|t| t.name == after_table.name) {
let before_cols: std::collections::HashSet<_> = before_table.columns.iter().map(|c| &c.name).collect();
let after_cols: std::collections::HashSet<_> = after_table.columns.iter().map(|c| &c.name).collect();
let new_cols: Vec<_> = after_cols.difference(&before_cols).collect();
if !new_cols.is_empty() {
println!(" New columns in {}: {:?}", after_table.name, new_cols);
}
let removed_cols: Vec<_> = before_cols.difference(&after_cols).collect();
if !removed_cols.is_empty() {
println!(" ⚠️ Removed columns in {}: {:?}", after_table.name, removed_cols);
}
}
}
println!("✅ Schema changes verified");
}
async fn create_performance_test_data(pool: &PgPool, count: usize) {
println!("🏃 Creating {} records for performance testing...", count);
// Create a test user
let user_id = Uuid::new_v4();
sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind("perf_test_user")
.bind("perf@test.com")
.bind("test")
.bind("user")
.execute(pool)
.await
.unwrap();
// Batch insert documents
let batch_size = 100;
for batch_start in (0..count).step_by(batch_size) {
let batch_end = (batch_start + batch_size).min(count);
let mut query = String::from(
"INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason) VALUES "
);
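// Note: values are interpolated directly into the SQL string. That is acceptable here
// because every value is generated locally (UUIDs, a loop counter, fixed literals); for
// anything user-supplied, bound parameters or UNNEST'ed arrays would be the safer pattern.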
for i in batch_start..batch_end {
if i > batch_start {
query.push_str(", ");
}
let doc_id = Uuid::new_v4();
let status = if i % 3 == 0 { "failed" } else { "completed" };
let failure_reason = if status == "failed" {
match i % 5 {
0 => "'timeout'",
1 => "'memory_limit'",
2 => "'file_corrupted'",
3 => "'low_ocr_confidence'",
_ => "'unknown_error'",
}
} else {
"NULL"
};
query.push_str(&format!(
"('{}', '{}', 'perf_doc_{}.pdf', 'perf_doc_{}.pdf', '/test/perf_{}.pdf', 1024, 'application/pdf', '{}', {})",
doc_id, user_id, i, i, i, status, failure_reason
));
}
sqlx::query(&query).execute(pool).await.unwrap();
}
println!("✅ Created {} test documents", count);
}
fn get_new_migrations_in_pr() -> Vec<String> {
// Check if we're in a CI environment or have a base branch to compare against
let base_branch = std::env::var("GITHUB_BASE_REF")
.or_else(|_| std::env::var("BASE_BRANCH"))
.unwrap_or_else(|_| "main".to_string());
let output = Command::new("git")
.args(["diff", "--name-only", &format!("origin/{}", base_branch), "HEAD", "--", "migrations/"])
.output();
match output {
Ok(output) if output.status.success() => {
let files = String::from_utf8_lossy(&output.stdout);
files
.lines()
.filter(|line| line.ends_with(".sql"))
.map(|s| s.to_string())
.collect()
}
_ => {
// Fallback: check for uncommitted migration files
let output = Command::new("git")
.args(["status", "--porcelain", "migrations/"])
.output()
.unwrap_or_else(|_| panic!("Failed to run git status"));
if output.status.success() {
let files = String::from_utf8_lossy(&output.stdout);
files
.lines()
.filter(|line| line.contains(".sql") && (line.starts_with("A ") || line.starts_with("??")))
.map(|line| line.split_whitespace().last().unwrap_or("").to_string())
.filter(|f| !f.is_empty())
.collect()
} else {
Vec::new()
}
}
}
}
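// When GITHUB_BASE_REF is unset (i.e. outside CI), the comparison falls back to
// BASE_BRANCH and then to origin/main, with `git status --porcelain` as a last resort
// for uncommitted migration files. Assumed local invocation:
//
//     BASE_BRANCH=develop cargo test pr_migration_validation_tests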
fn migration_involves_data_transformation(migrations: &[String]) -> bool {
// Check if any migration file contains data transformation keywords
for migration_file in migrations {
if let Ok(content) = std::fs::read_to_string(migration_file) {
let lowercase = content.to_lowercase();
if lowercase.contains("insert into") && lowercase.contains("select") ||
lowercase.contains("update") && lowercase.contains("set") ||
lowercase.contains("migrate") ||
lowercase.contains("transform") ||
lowercase.contains("failed_documents") {
return true;
}
}
}
false
}
}

View File

@ -0,0 +1,209 @@
use readur::test_utils::TestContext;
use sqlx::Row;
use std::collections::HashSet;
#[cfg(test)]
mod simplified_migration_schema_validation_tests {
use super::*;
#[tokio::test]
async fn test_core_tables_exist() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
let core_tables = vec![
"users",
"documents",
"failed_documents",
"ocr_queue",
"settings",
];
let existing_tables = get_all_tables(pool).await;
for table in core_tables {
assert!(
existing_tables.contains(table),
"Core table '{}' not found in database schema",
table
);
}
println!("✅ All core tables exist");
println!("Found {} total tables in database", existing_tables.len());
}
#[tokio::test]
async fn test_basic_schema_integrity() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test that we can query key tables without errors
let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
.fetch_one(pool)
.await
.unwrap();
let doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
.fetch_one(pool)
.await
.unwrap();
let failed_doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM failed_documents")
.fetch_one(pool)
.await
.unwrap();
println!("✅ Basic schema integrity verified");
println!(" - Users: {}", user_count);
println!(" - Documents: {}", doc_count);
println!(" - Failed documents: {}", failed_doc_count);
// All counts should be non-negative (basic sanity check)
assert!(user_count >= 0);
assert!(doc_count >= 0);
assert!(failed_doc_count >= 0);
}
#[tokio::test]
async fn test_migration_tables_structure() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test that failed_documents table has the expected columns for migration
let columns = sqlx::query(
"SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'failed_documents'"
)
.fetch_all(pool)
.await
.unwrap();
let column_names: Vec<String> = columns.iter()
.map(|row| row.get("column_name"))
.collect();
let migration_critical_columns = vec![
"id", "user_id", "filename", "failure_reason", "failure_stage", "ingestion_source"
];
for col in migration_critical_columns {
assert!(
column_names.contains(&col.to_string()),
"Critical column '{}' not found in failed_documents table",
col
);
}
println!("✅ Migration-critical table structure verified");
println!(" failed_documents has {} columns", column_names.len());
}
#[tokio::test]
async fn test_constraint_sampling() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test a few key constraints exist
let constraints = sqlx::query(
"SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_schema = 'public'"
)
.fetch_all(pool)
.await
.unwrap();
let primary_keys: Vec<String> = constraints.iter()
.filter(|row| row.get::<String, _>("constraint_type") == "PRIMARY KEY")
.map(|row| row.get("constraint_name"))
.collect();
let foreign_keys: Vec<String> = constraints.iter()
.filter(|row| row.get::<String, _>("constraint_type") == "FOREIGN KEY")
.map(|row| row.get("constraint_name"))
.collect();
let check_constraints: Vec<String> = constraints.iter()
.filter(|row| row.get::<String, _>("constraint_type") == "CHECK")
.map(|row| row.get("constraint_name"))
.collect();
println!("✅ Database constraints verified");
println!(" - Primary keys: {}", primary_keys.len());
println!(" - Foreign keys: {}", foreign_keys.len());
println!(" - Check constraints: {}", check_constraints.len());
// Basic sanity checks
assert!(!primary_keys.is_empty(), "Should have at least one primary key");
assert!(!foreign_keys.is_empty(), "Should have at least one foreign key");
}
#[tokio::test]
async fn test_migration_workflow_readiness() {
let ctx = TestContext::new().await;
let pool = ctx.state.db.get_pool();
// Test that the database is ready for the migration workflow we test
// This includes checking that we can insert test data successfully
// Create a test user
let user_id = uuid::Uuid::new_v4();
let username = format!("migration_test_{}", user_id.to_string().split('-').next().unwrap());
let user_result = sqlx::query(
"INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(&username)
.bind(format!("{}@test.com", username))
.bind("test_hash")
.bind("user")
.execute(pool)
.await;
assert!(user_result.is_ok(), "Should be able to create test user");
// Test that failed_documents accepts valid data
let failed_doc_result = sqlx::query(
"INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source)
VALUES ($1, 'test.pdf', 'other', 'ocr', 'test')"
)
.bind(user_id)
.execute(pool)
.await;
assert!(failed_doc_result.is_ok(), "Should be able to insert into failed_documents");
// Clean up
sqlx::query("DELETE FROM failed_documents WHERE user_id = $1")
.bind(user_id)
.execute(pool)
.await
.unwrap();
sqlx::query("DELETE FROM users WHERE id = $1")
.bind(user_id)
.execute(pool)
.await
.unwrap();
println!("✅ Migration workflow readiness verified");
}
// Helper functions
async fn get_all_tables(pool: &sqlx::PgPool) -> HashSet<String> {
let rows = sqlx::query(
"SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
)
.fetch_all(pool)
.await
.unwrap();
rows.into_iter()
.map(|row| row.get("table_name"))
.collect()
}
}