//! PR migration validation tests.
//!
//! When a PR adds new SQL migrations, these tests prefill the database with
//! representative data, run the migrations, and verify schema changes,
//! data-transformation integrity, absence of data loss, and performance.

use readur::test_utils::TestContext;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use std::process::Command;

#[cfg(test)]
mod pr_migration_validation_tests {
    use super::*;

    #[tokio::test]
    async fn test_new_migration_with_prefilled_data() {
        // Check if this PR introduces any new migrations
        let new_migrations = get_new_migrations_in_pr();

        if new_migrations.is_empty() {
            println!("✅ No new migrations in this PR - skipping prefilled data test");
            return;
        }

        println!("🔍 Found {} new migration(s) in this PR:", new_migrations.len());
        for migration in &new_migrations {
            println!(" - {}", migration);
        }

        // Run the comprehensive test with prefilled data
        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();

        // Step 1: Prefill database with comprehensive test data
        let test_data = prefill_comprehensive_test_data(pool).await;
        println!("✅ Prefilled database with {} test scenarios", test_data.scenarios.len());

        // Step 2: Verify all migrations run successfully with prefilled data
        verify_migrations_with_data(pool, &test_data).await;

        // Step 3: Test specific migration scenarios if they involve data transformation
        if migration_involves_data_transformation(&new_migrations) {
            test_data_transformation_integrity(pool, &test_data).await;
        }

        // Step 4: Verify no data loss occurred
        verify_no_data_loss(pool, &test_data).await;

        println!("✅ All new migrations passed validation with prefilled data");
    }

    #[tokio::test]
    async fn test_migration_rollback_safety() {
        let new_migrations = get_new_migrations_in_pr();

        if new_migrations.is_empty() {
            println!("✅ No new migrations in this PR - skipping rollback safety test");
            return;
        }

        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();

        // Create snapshot of schema before migrations
        let schema_before = capture_schema_snapshot(pool).await;

        // Run migrations
        let migration_result = sqlx::migrate!("./migrations").run(pool).await;
        assert!(migration_result.is_ok(), "Migrations should succeed");

        // Capture schema after migrations
        let schema_after = capture_schema_snapshot(pool).await;

        // Verify schema changes are intentional
        verify_schema_changes(&schema_before, &schema_after, &new_migrations);

        println!("✅ Migration rollback safety verified");
    }

    #[tokio::test]
    async fn test_migration_performance_impact() {
        let new_migrations = get_new_migrations_in_pr();

        if new_migrations.is_empty() {
            return;
        }

        let ctx = TestContext::new().await;
        let pool = ctx.state.db.get_pool();

        // Prefill with large dataset
        create_performance_test_data(pool, 10000).await;

        // Measure migration execution time
        let start = std::time::Instant::now();
        let result = sqlx::migrate!("./migrations").run(pool).await;
        let duration = start.elapsed();

        assert!(result.is_ok(), "Migrations should succeed");
        assert!(
            duration.as_secs() < 30,
            "Migrations took too long: {:?}. Consider optimizing for large datasets.",
            duration
        );
        println!("✅ Migration performance acceptable: {:?}", duration);
    }

    // Data structures for comprehensive testing

    struct ComprehensiveTestData {
        users: Vec<TestUser>,
        documents: Vec<TestDocument>,
        scenarios: Vec<TestScenario>,
        total_records: usize,
    }

    struct TestUser {
        id: Uuid,
        username: String,
        role: String,
    }

    struct TestDocument {
        id: Uuid,
        user_id: Uuid,
        filename: String,
        ocr_status: String,
        failure_reason: Option<String>,
        metadata: DocumentMetadata,
    }

    struct DocumentMetadata {
        file_size: i64,
        mime_type: String,
        has_ocr_text: bool,
        tags: Vec<String>,
    }

    struct TestScenario {
        name: String,
        description: String,
        affected_tables: Vec<String>,
        record_count: usize,
    }

    struct SchemaSnapshot {
        tables: Vec<TableInfo>,
        indexes: Vec<String>,
        constraints: Vec<String>,
        views: Vec<String>,
    }

    struct TableInfo {
        name: String,
        columns: Vec<ColumnInfo>,
        row_count: i64,
    }

    struct ColumnInfo {
        name: String,
        data_type: String,
        is_nullable: bool,
    }

    // Implementation functions

    async fn prefill_comprehensive_test_data(pool: &PgPool) -> ComprehensiveTestData {
        let mut users = Vec::new();
        let mut documents = Vec::new();
        let mut scenarios = Vec::new();

        // Create diverse user types
        let user_types = vec![
            ("admin", "admin"),
            ("regular", "user"),
            ("readonly", "user"),
        ];

        for (user_type, role) in user_types {
            let user = create_test_user_with_role(pool, user_type, role).await;
            users.push(user);
        }

        // Create various document scenarios
        let document_scenarios = vec![
            // Successful documents
            ("success_high_conf.pdf", "completed", None, 0.95, true),
            ("success_medium_conf.pdf", "completed", None, 0.75, true),
            ("success_with_tags.pdf", "completed", None, 0.85, true),
            // Failed documents with different reasons
            ("fail_low_confidence.pdf", "failed", Some("low_ocr_confidence"), 0.3, true),
            ("fail_timeout.pdf", "failed", Some("timeout"), 0.0, false),
            ("fail_memory.pdf", "failed", Some("memory_limit"), 0.0, false),
            ("fail_corrupted.pdf", "failed", Some("file_corrupted"), 0.0, false),
            ("fail_unsupported.xyz", "failed", Some("unsupported_format"), 0.0, false),
            ("fail_access_denied.pdf", "failed", Some("access_denied"), 0.0, false),
            ("fail_parsing.pdf", "failed", Some("pdf_parsing_error"), 0.0, false),
            ("fail_unknown.pdf", "failed", Some("unknown_error"), 0.0, false),
            ("fail_null_reason.pdf", "failed", None, 0.0, false),
            // Pending documents
            ("pending_new.pdf", "pending", None, 0.0, false),
            ("pending_retry.pdf", "pending", None, 0.0, false),
            // Edge cases
            ("edge_empty_file.pdf", "failed", Some("file_corrupted"), 0.0, false),
            ("edge_huge_file.pdf", "failed", Some("file_too_large"), 0.0, false),
            ("edge_special_chars_§.pdf", "completed", None, 0.9, true),
        ];

        // Create documents for each user
        for user in &users {
            for (filename, status, failure_reason, confidence, has_text) in &document_scenarios {
                let doc = create_test_document(
                    pool,
                    user.id,
                    filename,
                    status,
                    failure_reason.as_deref(),
                    *confidence,
                    *has_text,
                ).await;
                documents.push(doc);
            }
        }

        // Create OCR queue entries for some documents
        for doc in documents.iter().filter(|d| d.ocr_status == "pending" || d.ocr_status == "failed") {
            create_ocr_queue_entry(pool, doc.id).await;
        }

        // Create scenarios description
        scenarios.push(TestScenario {
            name: "User Management".to_string(),
            description: "Different user roles and permissions".to_string(),
            affected_tables: vec!["users".to_string()],
            record_count: users.len(),
        });

        scenarios.push(TestScenario {
            name: "Document Processing".to_string(),
            description: "Various document states and failure scenarios".to_string(),
            affected_tables: vec!["documents".to_string(), "failed_documents".to_string()],
            record_count: documents.len(),
        });

        scenarios.push(TestScenario {
            name: "OCR Queue".to_string(),
            description: "OCR processing queue with retries".to_string(),
            affected_tables: vec!["ocr_queue".to_string()],
            record_count: documents.iter().filter(|d| d.ocr_status != "completed").count(),
        });

        let total_records = users.len() + documents.len();

        ComprehensiveTestData {
            users,
            documents,
            scenarios,
            total_records,
        }
    }

    async fn create_test_user_with_role(pool: &PgPool, user_type: &str, role: &str) -> TestUser {
        let id = Uuid::new_v4();
        let username = format!(
            "test_{}_{}",
            user_type,
            Uuid::new_v4().to_string().split('-').next().unwrap()
        );

        sqlx::query(
            "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
        )
        .bind(id)
        .bind(&username)
        .bind(format!("{}@test.com", username))
        .bind("test_hash")
        .bind(role)
        .execute(pool)
        .await
        .unwrap();

        TestUser { id, username, role: role.to_string() }
    }

    async fn create_test_document(
        pool: &PgPool,
        user_id: Uuid,
        filename: &str,
        status: &str,
        failure_reason: Option<&str>,
        confidence: f32,
        has_text: bool,
    ) -> TestDocument {
        let id = Uuid::new_v4();
        let file_size = match filename {
            f if f.contains("huge") => 104857600, // 100MB
            f if f.contains("empty") => 0,
            _ => 1024 * (1 + (id.as_bytes()[0] as i64)), // Variable size
        };

        let mime_type = if filename.ends_with(".pdf") {
            "application/pdf"
        } else {
            "application/octet-stream"
        };

        let tags = if filename.contains("tags") {
            vec!["important", "reviewed", "2024"]
        } else {
            vec![]
        };

        let ocr_text = if has_text {
            Some(format!("Sample OCR text for document {}", filename))
        } else {
            None
        };

        sqlx::query(
            r#"
            INSERT INTO documents (
                id, user_id, filename, original_filename, file_path, file_size,
                mime_type, ocr_status, ocr_failure_reason, ocr_confidence, ocr_text, tags
            ) VALUES (
                $1, $2, $3, $3, $4, $5, $6, $7, $8, $9, $10, $11
            )
            "#
        )
        .bind(id)
        .bind(user_id)
        .bind(filename)
        .bind(format!("/test/files/{}", filename))
        .bind(file_size)
        .bind(mime_type)
        .bind(status)
        .bind(failure_reason)
        .bind(if confidence > 0.0 { Some(confidence) } else { None })
        .bind(ocr_text)
        .bind(&tags)
        .execute(pool)
        .await
        .unwrap();

        TestDocument {
            id,
            user_id,
            filename: filename.to_string(),
            ocr_status: status.to_string(),
            failure_reason: failure_reason.map(|s| s.to_string()),
            metadata: DocumentMetadata {
                file_size,
                mime_type: mime_type.to_string(),
                has_ocr_text: has_text,
                tags: tags.iter().map(|s| s.to_string()).collect(),
            },
        }
    }

    async fn create_ocr_queue_entry(pool: &PgPool, document_id: Uuid) {
        sqlx::query(
            "INSERT INTO ocr_queue (document_id, priority, status) VALUES ($1, $2, $3)"
        )
        .bind(document_id)
        .bind(1)
        .bind("pending")
        .execute(pool)
        .await
        .unwrap();
    }

    async fn verify_migrations_with_data(pool: &PgPool, test_data: &ComprehensiveTestData) {
        // Count records before any potential data migration
        let doc_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
            .fetch_one(pool)
            .await
            .unwrap();

        let user_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
            .fetch_one(pool)
            .await
            .unwrap();

        println!("📊 Database state before migration verification:");
        println!(" - Users: {}", user_count_before);
        println!(" - Documents: {}", doc_count_before);

        // Verify failed document migration if applicable
        let failed_docs: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'"
        )
        .fetch_one(pool)
        .await
        .unwrap();

        if failed_docs > 0 {
            println!(" - Failed documents to migrate: {}", failed_docs);

            // Verify migration mapping works correctly
            let mapping_test = sqlx::query(
r#" SELECT ocr_failure_reason, COUNT(*) as count, CASE WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence' WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout' WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit' WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error' WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted' WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format' WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied' ELSE 'other' END as mapped_reason FROM documents WHERE ocr_status = 'failed' GROUP BY ocr_failure_reason "# ) .fetch_all(pool) .await .unwrap(); println!(" - Failure reason mappings:"); for row in mapping_test { let original: Option = row.get("ocr_failure_reason"); let mapped: String = row.get("mapped_reason"); let count: i64 = row.get("count"); println!(" {:?} -> {} ({} documents)", original, mapped, count); } } } async fn test_data_transformation_integrity(pool: &PgPool, test_data: &ComprehensiveTestData) { // Test that data transformations maintain integrity println!("๐Ÿ”„ Testing data transformation integrity..."); // Check if failed_documents table exists (indicating migration ran) let failed_docs_exists = sqlx::query( "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'failed_documents')" ) .fetch_one(pool) .await .unwrap() .get::(0); if failed_docs_exists { // Verify all failed documents were migrated correctly let migrated_count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM failed_documents WHERE failure_stage = 'ocr'" ) .fetch_one(pool) .await .unwrap(); let expected_failed = test_data.documents.iter() .filter(|d| d.ocr_status == "failed") .count(); assert!( migrated_count >= expected_failed as i64, "Not all failed documents were migrated: expected at least {}, got {}", expected_failed, migrated_count ); // Verify data integrity for specific test cases for doc in test_data.documents.iter().filter(|d| d.ocr_status == "failed") { let migrated = sqlx::query( "SELECT * FROM failed_documents WHERE filename = $1" ) .bind(&doc.filename) .fetch_optional(pool) .await .unwrap(); assert!( migrated.is_some(), "Failed document '{}' was not migrated", doc.filename ); if let Some(row) = migrated { let failure_reason: String = row.get("failure_reason"); // Verify reason mapping match doc.failure_reason.as_deref() { Some("timeout") => assert_eq!(failure_reason, "ocr_timeout"), Some("memory_limit") => assert_eq!(failure_reason, "ocr_memory_limit"), Some("file_corrupted") => assert_eq!(failure_reason, "file_corrupted"), Some("low_ocr_confidence") => assert_eq!(failure_reason, "low_ocr_confidence"), Some("unknown_error") | None => assert_eq!(failure_reason, "other"), _ => {} } } } } println!("โœ… Data transformation integrity verified"); } async fn verify_no_data_loss(pool: &PgPool, test_data: &ComprehensiveTestData) { println!("๐Ÿ” Verifying no data loss occurred..."); // Check user count let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users") .fetch_one(pool) .await .unwrap(); assert!( user_count >= test_data.users.len() as i64, "User data loss detected: expected at least {}, got {}", test_data.users.len(), user_count ); // Check total document count (including migrated) let doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents") .fetch_one(pool) .await .unwrap(); let failed_doc_count: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM failed_documents WHERE ingestion_source IS NOT NULL" ) 
        .fetch_one(pool)
        .await
        .unwrap_or(0);

        let total_docs = doc_count + failed_doc_count;
        let expected_docs = test_data.documents.len() as i64;

        assert!(
            total_docs >= expected_docs,
            "Document data loss detected: expected at least {}, got {} (documents: {}, failed_documents: {})",
            expected_docs,
            total_docs,
            doc_count,
            failed_doc_count
        );

        println!("✅ No data loss detected");
    }

    async fn capture_schema_snapshot(pool: &PgPool) -> SchemaSnapshot {
        let tables = sqlx::query(
            r#"
            SELECT t.table_name, COUNT(c.column_name) as column_count
            FROM information_schema.tables t
            LEFT JOIN information_schema.columns c
                ON t.table_name = c.table_name AND t.table_schema = c.table_schema
            WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
            GROUP BY t.table_name
            ORDER BY t.table_name
            "#
        )
        .fetch_all(pool)
        .await
        .unwrap();

        let mut table_infos = Vec::new();

        for table_row in tables {
            let table_name: String = table_row.get("table_name");

            // Get columns for this table
            let columns = sqlx::query(
                r#"
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_schema = 'public' AND table_name = $1
                ORDER BY ordinal_position
                "#
            )
            .bind(&table_name)
            .fetch_all(pool)
            .await
            .unwrap();

            let column_infos: Vec<ColumnInfo> = columns.into_iter()
                .map(|col| ColumnInfo {
                    name: col.get("column_name"),
                    data_type: col.get("data_type"),
                    is_nullable: col.get::<String, _>("is_nullable") == "YES",
                })
                .collect();

            // Get row count
            let count_query = format!("SELECT COUNT(*) FROM {}", table_name);
            let row_count: i64 = sqlx::query_scalar(&count_query)
                .fetch_one(pool)
                .await
                .unwrap_or(0);

            table_infos.push(TableInfo {
                name: table_name,
                columns: column_infos,
                row_count,
            });
        }

        // Get indexes
        let indexes: Vec<String> = sqlx::query(
            "SELECT indexname FROM pg_indexes WHERE schemaname = 'public'"
        )
        .fetch_all(pool)
        .await
        .unwrap()
        .into_iter()
        .map(|row| row.get("indexname"))
        .collect();

        // Get constraints
        let constraints: Vec<String> = sqlx::query(
            r#"
            SELECT constraint_name || ' (' || constraint_type || ')' as constraint_info
            FROM information_schema.table_constraints
            WHERE constraint_schema = 'public'
            "#
        )
        .fetch_all(pool)
        .await
        .unwrap()
        .into_iter()
        .map(|row| row.get("constraint_info"))
        .collect();

        // Get views
        let views: Vec<String> = sqlx::query(
            "SELECT table_name FROM information_schema.views WHERE table_schema = 'public'"
        )
        .fetch_all(pool)
        .await
        .unwrap()
        .into_iter()
        .map(|row| row.get("table_name"))
        .collect();

        SchemaSnapshot {
            tables: table_infos,
            indexes,
            constraints,
            views,
        }
    }

    fn verify_schema_changes(before: &SchemaSnapshot, after: &SchemaSnapshot, migrations: &[String]) {
        println!("📋 Verifying schema changes...");

        // Check for new tables
        let before_tables: std::collections::HashSet<_> = before.tables.iter().map(|t| &t.name).collect();
        let after_tables: std::collections::HashSet<_> = after.tables.iter().map(|t| &t.name).collect();

        let new_tables: Vec<_> = after_tables.difference(&before_tables).collect();
        if !new_tables.is_empty() {
            println!(" New tables added: {:?}", new_tables);
        }

        // Check for removed tables (should not happen in migrations)
        let removed_tables: Vec<_> = before_tables.difference(&after_tables).collect();
        assert!(
            removed_tables.is_empty(),
            "Tables were removed in migration: {:?}",
            removed_tables
        );

        // Check for column changes
        for after_table in &after.tables {
            if let Some(before_table) = before.tables.iter().find(|t| t.name == after_table.name) {
                let before_cols: std::collections::HashSet<_> = before_table.columns.iter().map(|c| &c.name).collect();
                let after_cols: std::collections::HashSet<_> = after_table.columns.iter().map(|c| &c.name).collect();
&c.name).collect(); let new_cols: Vec<_> = after_cols.difference(&before_cols).collect(); if !new_cols.is_empty() { println!(" New columns in {}: {:?}", after_table.name, new_cols); } let removed_cols: Vec<_> = before_cols.difference(&after_cols).collect(); if !removed_cols.is_empty() { println!(" โš ๏ธ Removed columns in {}: {:?}", after_table.name, removed_cols); } } } println!("โœ… Schema changes verified"); } async fn create_performance_test_data(pool: &PgPool, count: usize) { println!("๐Ÿƒ Creating {} records for performance testing...", count); // Create a test user let user_id = Uuid::new_v4(); sqlx::query( "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" ) .bind(user_id) .bind("perf_test_user") .bind("perf@test.com") .bind("test") .bind("user") .execute(pool) .await .unwrap(); // Batch insert documents let batch_size = 100; for batch_start in (0..count).step_by(batch_size) { let batch_end = (batch_start + batch_size).min(count); let mut query = String::from( "INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason) VALUES " ); for i in batch_start..batch_end { if i > batch_start { query.push_str(", "); } let doc_id = Uuid::new_v4(); let status = if i % 3 == 0 { "failed" } else { "completed" }; let failure_reason = if status == "failed" { match i % 5 { 0 => "'timeout'", 1 => "'memory_limit'", 2 => "'file_corrupted'", 3 => "'low_ocr_confidence'", _ => "'unknown_error'", } } else { "NULL" }; query.push_str(&format!( "('{}', '{}', 'perf_doc_{}.pdf', 'perf_doc_{}.pdf', '/test/perf_{}.pdf', 1024, 'application/pdf', '{}', {})", doc_id, user_id, i, i, i, status, failure_reason )); } sqlx::query(&query).execute(pool).await.unwrap(); } println!("โœ… Created {} test documents", count); } fn get_new_migrations_in_pr() -> Vec { // Check if we're in a CI environment or have a base branch to compare against let base_branch = std::env::var("GITHUB_BASE_REF") .or_else(|_| std::env::var("BASE_BRANCH")) .unwrap_or_else(|_| "main".to_string()); let output = Command::new("git") .args(["diff", "--name-only", &format!("origin/{}", base_branch), "HEAD", "--", "migrations/"]) .output(); match output { Ok(output) if output.status.success() => { let files = String::from_utf8_lossy(&output.stdout); files .lines() .filter(|line| line.ends_with(".sql") && !line.is_empty()) .map(|s| s.to_string()) .collect() } _ => { // Fallback: check for uncommitted migration files let output = Command::new("git") .args(["status", "--porcelain", "migrations/"]) .output() .unwrap_or_else(|_| panic!("Failed to run git status")); if output.status.success() { let files = String::from_utf8_lossy(&output.stdout); files .lines() .filter(|line| line.contains(".sql") && (line.starts_with("A ") || line.starts_with("??"))) .map(|line| line.split_whitespace().last().unwrap_or("").to_string()) .filter(|f| !f.is_empty()) .collect() } else { Vec::new() } } } } fn migration_involves_data_transformation(migrations: &[String]) -> bool { // Check if any migration file contains data transformation keywords for migration_file in migrations { if let Ok(content) = std::fs::read_to_string(migration_file) { let lowercase = content.to_lowercase(); if lowercase.contains("insert into") && lowercase.contains("select") || lowercase.contains("update") && lowercase.contains("set") || lowercase.contains("migrate") || lowercase.contains("transform") || lowercase.contains("failed_documents") { return true; } } } false } }