diff --git a/tests/comprehensive_migration_tests.rs b/tests/comprehensive_migration_tests.rs new file mode 100644 index 0000000..8bed356 --- /dev/null +++ b/tests/comprehensive_migration_tests.rs @@ -0,0 +1,640 @@ +use readur::test_utils::TestContext; +use sqlx::{PgPool, Row}; +use uuid::Uuid; +use std::collections::HashMap; + +#[cfg(test)] +mod comprehensive_migration_tests { + use super::*; + + #[tokio::test] + async fn test_migration_with_prefilled_data() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Step 1: Prefill the database with test data + let test_data = prefill_test_data(pool).await; + + // Step 2: Verify the prefilled data exists + verify_prefilled_data(pool, &test_data).await; + + // Step 3: Simulate and test the failed documents migration + test_failed_documents_migration(pool, &test_data).await; + + // Step 4: Verify schema integrity after migration + verify_schema_integrity(pool).await; + + // Step 5: Test data consistency after migration + verify_data_consistency_after_migration(pool, &test_data).await; + } + + #[tokio::test] + async fn test_migration_preserves_data_integrity() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Create comprehensive test data covering all edge cases + let user_id = create_test_user(pool).await; + + // Insert various types of documents + let document_scenarios = vec![ + DocumentScenario { + filename: "normal_success.pdf", + ocr_status: "completed", + ocr_failure_reason: None, + ocr_error: None, + ocr_confidence: Some(0.95), + ocr_text: Some("This is a successful OCR"), + file_size: 1024, + }, + DocumentScenario { + filename: "low_confidence_fail.pdf", + ocr_status: "failed", + ocr_failure_reason: Some("low_ocr_confidence"), + ocr_error: Some("OCR confidence below threshold"), + ocr_confidence: Some(0.3), + ocr_text: Some("Partially recognized text"), + file_size: 2048, + }, + DocumentScenario { + filename: "timeout_fail.pdf", + ocr_status: "failed", + ocr_failure_reason: Some("timeout"), + ocr_error: Some("OCR processing timed out after 60 seconds"), + ocr_confidence: None, + ocr_text: None, + file_size: 10485760, // 10MB + }, + DocumentScenario { + filename: "memory_fail.pdf", + ocr_status: "failed", + ocr_failure_reason: Some("memory_limit"), + ocr_error: Some("Memory limit exceeded"), + ocr_confidence: None, + ocr_text: None, + file_size: 52428800, // 50MB + }, + DocumentScenario { + filename: "corrupted_file.pdf", + ocr_status: "failed", + ocr_failure_reason: Some("file_corrupted"), + ocr_error: Some("PDF file appears to be corrupted"), + ocr_confidence: None, + ocr_text: None, + file_size: 512, + }, + DocumentScenario { + filename: "unsupported.xyz", + ocr_status: "failed", + ocr_failure_reason: Some("unsupported_format"), + ocr_error: Some("File format not supported"), + ocr_confidence: None, + ocr_text: None, + file_size: 256, + }, + DocumentScenario { + filename: "pending_ocr.pdf", + ocr_status: "pending", + ocr_failure_reason: None, + ocr_error: None, + ocr_confidence: None, + ocr_text: None, + file_size: 4096, + }, + ]; + + // Insert all test documents + let mut document_ids = HashMap::new(); + for scenario in &document_scenarios { + let doc_id = insert_test_document(pool, user_id, scenario).await; + document_ids.insert(scenario.filename, doc_id); + } + + // Count documents before migration + let failed_count_before: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'" + ) + .fetch_one(pool) + .await + .unwrap(); + + let 
successful_count_before: i64 = sqlx::query_scalar(
+            "SELECT COUNT(*) FROM documents WHERE ocr_status = 'completed'"
+        )
+        .fetch_one(pool)
+        .await
+        .unwrap();
+
+        // Verify the migration query works correctly (simulate the migration)
+        let migration_preview = sqlx::query(
+            r#"
+            SELECT
+                d.filename,
+                d.ocr_failure_reason,
+                CASE
+                    WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence'
+                    WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout'
+                    WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit'
+                    WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error'
+                    WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted'
+                    WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format'
+                    WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied'
+                    ELSE 'other'
+                END as mapped_failure_reason
+            FROM documents d
+            WHERE d.ocr_status = 'failed'
+            "#
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        // Verify mappings are correct
+        for row in migration_preview {
+            let filename: String = row.get("filename");
+            let original_reason: Option<String> = row.get("ocr_failure_reason");
+            let mapped_reason: String = row.get("mapped_failure_reason");
+
+            println!("Migration mapping: {} - {:?} -> {}", filename, original_reason, mapped_reason);
+
+            // Verify specific mappings
+            match original_reason.as_deref() {
+                Some("low_ocr_confidence") => assert_eq!(mapped_reason, "low_ocr_confidence"),
+                Some("timeout") => assert_eq!(mapped_reason, "ocr_timeout"),
+                Some("memory_limit") => assert_eq!(mapped_reason, "ocr_memory_limit"),
+                Some("file_corrupted") => assert_eq!(mapped_reason, "file_corrupted"),
+                Some("unsupported_format") => assert_eq!(mapped_reason, "unsupported_format"),
+                _ => assert_eq!(mapped_reason, "other"),
+            }
+        }
+
+        // Verify that successful and pending documents are not affected
+        assert_eq!(successful_count_before, 1, "Should have 1 successful document");
+        assert_eq!(failed_count_before, 5, "Should have 5 failed documents");
+    }
+
+    #[tokio::test]
+    async fn test_migration_with_ocr_queue_data() {
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        let user_id = create_test_user(pool).await;
+
+        // Create a document with OCR queue history
+        let doc_id = Uuid::new_v4();
+        sqlx::query(
+            r#"
+            INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error)
+            VALUES ($1, $2, $3, $3, '/test/path', 1000, 'application/pdf', 'failed', 'timeout', 'OCR timeout after retries')
+            "#
+        )
+        .bind(doc_id)
+        .bind(user_id)
+        .bind("retry_test.pdf")
+        .execute(pool)
+        .await
+        .unwrap();
+
+        // Add OCR queue entries to simulate retry history
+        for i in 0..3 {
+            sqlx::query(
+                r#"
+                INSERT INTO ocr_queue (document_id, priority, status, error_message, created_at)
+                VALUES ($1, $2, $3, $4, NOW() - INTERVAL '1 hour' * $5)
+                "#
+            )
+            .bind(doc_id)
+            .bind(1)
+            .bind(if i < 2 { "failed" } else { "processing" })
+            .bind(if i < 2 { Some("Retry attempt failed") } else { None })
+            .bind((3 - i) as i32)
+            .execute(pool)
+            .await
+            .unwrap();
+        }
+
+        // Test the migration query with retry count
+        let result = sqlx::query(
+            r#"
+            SELECT
+                d.filename,
+                d.ocr_failure_reason,
+                COALESCE(q.retry_count, 0) as retry_count
+            FROM documents d
+            LEFT JOIN (
+                SELECT document_id, COUNT(*) as retry_count
+                FROM ocr_queue
+                WHERE status IN ('failed', 'completed')
+                GROUP BY document_id
+            ) q ON d.id = q.document_id
+            WHERE 
d.id = $1 + "# + ) + .bind(doc_id) + .fetch_one(pool) + .await + .unwrap(); + + let retry_count: i64 = result.get("retry_count"); + assert_eq!(retry_count, 2, "Should have 2 failed retry attempts"); + } + + #[tokio::test] + async fn test_migration_handles_null_values() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + let user_id = create_test_user(pool).await; + + // Insert documents with various NULL values + let null_scenarios = vec![ + ("null_reason.pdf", None, Some("Error without reason")), + ("null_error.pdf", Some("unknown"), None), + ("all_nulls.pdf", None, None), + ]; + + for (filename, reason, error) in &null_scenarios { + sqlx::query( + r#" + INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error) + VALUES ($1, $2, $2, '/test/path', 1000, 'application/pdf', 'failed', $3, $4) + "# + ) + .bind(user_id) + .bind(filename) + .bind(reason) + .bind(error) + .execute(pool) + .await + .unwrap(); + } + + // Verify migration handles NULLs correctly + let migrated_data = sqlx::query( + r#" + SELECT + filename, + ocr_failure_reason, + CASE + WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence' + WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout' + WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit' + WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error' + WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted' + WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format' + WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied' + ELSE 'other' + END as mapped_reason, + ocr_error + FROM documents + WHERE user_id = $1 AND ocr_status = 'failed' + ORDER BY filename + "# + ) + .bind(user_id) + .fetch_all(pool) + .await + .unwrap(); + + assert_eq!(migrated_data.len(), 3); + for row in migrated_data { + let mapped_reason: String = row.get("mapped_reason"); + assert_eq!(mapped_reason, "other", "NULL or unknown reasons should map to 'other'"); + } + } + + #[tokio::test] + async fn test_migration_performance_with_large_dataset() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + let user_id = create_test_user(pool).await; + + // Insert a large number of failed documents + let batch_size = 100; + let start_time = std::time::Instant::now(); + + for batch in 0..10 { + let mut query = String::from( + "INSERT INTO documents (user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason, ocr_error) VALUES " + ); + let mut _values: Vec = Vec::new(); + + for i in 0..batch_size { + let doc_num = batch * batch_size + i; + let filename = format!("bulk_doc_{}.pdf", doc_num); + let reason = match doc_num % 5 { + 0 => "low_ocr_confidence", + 1 => "timeout", + 2 => "memory_limit", + 3 => "file_corrupted", + _ => "unknown_error", + }; + + if i > 0 { + query.push_str(", "); + } + query.push_str(&format!("($1, '{}', '{}', '/test/path', 1000, 'application/pdf', 'failed', '{}', 'Test error')", + filename, filename, reason)); + } + + sqlx::query(&query) + .bind(user_id) + .execute(pool) + .await + .unwrap(); + } + + let insert_duration = start_time.elapsed(); + println!("Inserted 1000 documents in {:?}", insert_duration); + + // Measure migration query performance + let migration_start = std::time::Instant::now(); + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'" + ) + 
.fetch_one(pool) + .await + .unwrap(); + + assert_eq!(count, 1000, "Should have 1000 failed documents"); + + // Simulate the migration SELECT + let _migration_data = sqlx::query( + r#" + SELECT * FROM documents WHERE ocr_status = 'failed' + "# + ) + .fetch_all(pool) + .await + .unwrap(); + + let migration_duration = migration_start.elapsed(); + println!("Migration query completed in {:?}", migration_duration); + + // Performance assertion - migration should complete reasonably fast + assert!(migration_duration.as_secs() < 5, "Migration query should complete within 5 seconds"); + } + + // Helper functions + + struct TestData { + user_id: Uuid, + document_ids: HashMap, + failure_scenarios: Vec<(String, String, String)>, + } + + struct DocumentScenario { + filename: &'static str, + ocr_status: &'static str, + ocr_failure_reason: Option<&'static str>, + ocr_error: Option<&'static str>, + ocr_confidence: Option, + ocr_text: Option<&'static str>, + file_size: i64, + } + + async fn create_test_user(pool: &PgPool) -> Uuid { + let user_id = Uuid::new_v4(); + let unique_suffix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let username = format!("test_migration_user_{}", unique_suffix); + let email = format!("test_migration_{}@example.com", unique_suffix); + + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(user_id) + .bind(&username) + .bind(&email) + .bind("test_hash") + .bind("user") + .execute(pool) + .await + .unwrap(); + + user_id + } + + async fn insert_test_document(pool: &PgPool, user_id: Uuid, scenario: &DocumentScenario) -> Uuid { + let doc_id = Uuid::new_v4(); + + sqlx::query( + r#" + INSERT INTO documents ( + id, user_id, filename, original_filename, file_path, file_size, + mime_type, ocr_status, ocr_failure_reason, ocr_error, + ocr_confidence, ocr_text + ) VALUES ( + $1, $2, $3, $3, '/test/path', $4, $5, $6, $7, $8, $9, $10 + ) + "# + ) + .bind(doc_id) + .bind(user_id) + .bind(scenario.filename) + .bind(scenario.file_size) + .bind(if scenario.filename.ends_with(".pdf") { "application/pdf" } else { "application/octet-stream" }) + .bind(scenario.ocr_status) + .bind(scenario.ocr_failure_reason) + .bind(scenario.ocr_error) + .bind(scenario.ocr_confidence) + .bind(scenario.ocr_text) + .execute(pool) + .await + .unwrap(); + + doc_id + } + + async fn prefill_test_data(pool: &PgPool) -> TestData { + let user_id = create_test_user(pool).await; + let mut document_ids = HashMap::new(); + + let failure_scenarios = vec![ + ("timeout_doc.pdf".to_string(), "timeout".to_string(), "OCR processing timed out".to_string()), + ("memory_doc.pdf".to_string(), "memory_limit".to_string(), "Memory limit exceeded".to_string()), + ("corrupt_doc.pdf".to_string(), "file_corrupted".to_string(), "File is corrupted".to_string()), + ("low_conf_doc.pdf".to_string(), "low_ocr_confidence".to_string(), "Confidence too low".to_string()), + ]; + + // Insert test documents + for (filename, reason, error) in &failure_scenarios { + let doc_id = Uuid::new_v4(); + sqlx::query( + r#" + INSERT INTO documents ( + id, user_id, filename, original_filename, file_path, file_size, + mime_type, ocr_status, ocr_failure_reason, ocr_error + ) VALUES ( + $1, $2, $3, $3, '/test/path', 1000, 'application/pdf', + 'failed', $4, $5 + ) + "# + ) + .bind(doc_id) + .bind(user_id) + .bind(filename) + .bind(reason) + .bind(error) + .execute(pool) + .await + .unwrap(); + + document_ids.insert(filename.clone(), doc_id); + } + + 
TestData { + user_id, + document_ids, + failure_scenarios, + } + } + + async fn verify_prefilled_data(pool: &PgPool, test_data: &TestData) { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM documents WHERE user_id = $1 AND ocr_status = 'failed'" + ) + .bind(test_data.user_id) + .fetch_one(pool) + .await + .unwrap(); + + assert_eq!(count, test_data.failure_scenarios.len() as i64, + "All test documents should be inserted"); + } + + async fn test_failed_documents_migration(pool: &PgPool, test_data: &TestData) { + // Simulate the migration + let result = sqlx::query( + r#" + INSERT INTO failed_documents ( + user_id, filename, original_filename, file_path, file_size, + mime_type, error_message, failure_reason, failure_stage, ingestion_source + ) + SELECT + d.user_id, d.filename, d.original_filename, d.file_path, d.file_size, + d.mime_type, d.ocr_error, + CASE + WHEN d.ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence' + WHEN d.ocr_failure_reason = 'timeout' THEN 'ocr_timeout' + WHEN d.ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit' + WHEN d.ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error' + WHEN d.ocr_failure_reason = 'corrupted' OR d.ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted' + WHEN d.ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format' + WHEN d.ocr_failure_reason = 'access_denied' THEN 'access_denied' + ELSE 'other' + END as failure_reason, + 'ocr' as failure_stage, + 'test_migration' as ingestion_source + FROM documents d + WHERE d.ocr_status = 'failed' AND d.user_id = $1 + "# + ) + .bind(test_data.user_id) + .execute(pool) + .await; + + assert!(result.is_ok(), "Migration should succeed"); + + // Verify all documents were migrated + let migrated_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM failed_documents WHERE user_id = $1 AND ingestion_source = 'test_migration'" + ) + .bind(test_data.user_id) + .fetch_one(pool) + .await + .unwrap(); + + assert_eq!(migrated_count, test_data.failure_scenarios.len() as i64, + "All failed documents should be migrated"); + } + + async fn verify_schema_integrity(pool: &PgPool) { + // Check that all expected tables exist + let tables = sqlx::query( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" + ) + .fetch_all(pool) + .await + .unwrap(); + + let table_names: Vec = tables.iter() + .map(|row| row.get("table_name")) + .collect(); + + assert!(table_names.contains(&"documents".to_string())); + assert!(table_names.contains(&"failed_documents".to_string())); + assert!(table_names.contains(&"users".to_string())); + assert!(table_names.contains(&"ocr_queue".to_string())); + + // Check that constraints exist on failed_documents + let constraints = sqlx::query( + r#" + SELECT constraint_name, constraint_type + FROM information_schema.table_constraints + WHERE table_name = 'failed_documents' AND constraint_type = 'CHECK' + "# + ) + .fetch_all(pool) + .await + .unwrap(); + + let constraint_names: Vec = constraints.iter() + .map(|row| row.get("constraint_name")) + .collect(); + + assert!(constraint_names.iter().any(|name| name.contains("failure_reason")), + "Should have check constraint for failure_reason"); + assert!(constraint_names.iter().any(|name| name.contains("failure_stage")), + "Should have check constraint for failure_stage"); + } + + async fn verify_data_consistency_after_migration(pool: &PgPool, test_data: &TestData) { + // Verify specific failure reason mappings + let mappings = vec![ + ("timeout_doc.pdf", "ocr_timeout"), + 
("memory_doc.pdf", "ocr_memory_limit"), + ("corrupt_doc.pdf", "file_corrupted"), + ("low_conf_doc.pdf", "low_ocr_confidence"), + ]; + + for (filename, expected_reason) in mappings { + let result = sqlx::query( + "SELECT failure_reason FROM failed_documents WHERE filename = $1 AND user_id = $2" + ) + .bind(filename) + .bind(test_data.user_id) + .fetch_optional(pool) + .await + .unwrap(); + + assert!(result.is_some(), "Document {} should exist in failed_documents", filename); + + let actual_reason: String = result.unwrap().get("failure_reason"); + assert_eq!(actual_reason, expected_reason, + "Failure reason for {} should be mapped correctly", filename); + } + + // Verify all migrated documents have proper metadata + let all_migrated = sqlx::query( + "SELECT * FROM failed_documents WHERE user_id = $1" + ) + .bind(test_data.user_id) + .fetch_all(pool) + .await + .unwrap(); + + for row in all_migrated { + let failure_stage: String = row.get("failure_stage"); + assert_eq!(failure_stage, "ocr", "All migrated documents should have 'ocr' as failure_stage"); + + let filename: String = row.get("filename"); + assert!(test_data.document_ids.contains_key(&filename), + "Migrated document should be from our test data"); + } + } +} \ No newline at end of file diff --git a/tests/latest_migration_tests.rs b/tests/latest_migration_tests.rs new file mode 100644 index 0000000..efd174b --- /dev/null +++ b/tests/latest_migration_tests.rs @@ -0,0 +1,755 @@ +use readur::test_utils::TestContext; +use sqlx::{PgPool, Row}; +use uuid::Uuid; +use std::process::Command; +use std::path::Path; +use sha2::{Sha256, Digest}; + +#[cfg(test)] +mod latest_migration_tests { + use super::*; + + #[tokio::test] + async fn test_latest_migration_from_previous_state() { + // Step 1: Get the migration files and identify the latest two + let migration_files = get_sorted_migration_files(); + + if migration_files.len() < 2 { + println!("βœ… Only one or no migrations found - skipping previous state test"); + return; + } + + let second_to_last = &migration_files[migration_files.len() - 2]; + let latest = &migration_files[migration_files.len() - 1]; + + println!("πŸ”„ Testing migration from second-to-last to latest:"); + println!(" Previous: {}", extract_migration_name(second_to_last)); + println!(" Latest: {}", extract_migration_name(latest)); + + // Step 2: Create a fresh database and apply migrations up to second-to-last + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Apply all migrations except the latest one using SQLx migration runner + let migration_files = get_sorted_migration_files(); + let target_index = migration_files.iter() + .position(|f| f == second_to_last) + .expect("Second-to-last migration not found"); + + // Apply migrations up to target_index (excluding the latest) + apply_selected_migrations(pool, &migration_files[..target_index+1]).await; + + // Step 3: Prefill the database with realistic data in the previous state + let test_data = prefill_database_for_previous_state(pool).await; + + // Step 4: Apply the latest migration + apply_single_migration(pool, latest).await; + + // Step 5: Validate the migration succeeded and data is intact + validate_latest_migration_success(pool, &test_data, latest).await; + + println!("βœ… Latest migration successfully applied from previous state"); + } + + #[tokio::test] + async fn test_latest_migration_with_edge_case_data() { + let migration_files = get_sorted_migration_files(); + + if migration_files.len() < 2 { + println!("βœ… Only one or no migrations 
found - skipping edge case test"); + return; + } + + let second_to_last = &migration_files[migration_files.len() - 2]; + let latest = &migration_files[migration_files.len() - 1]; + + println!("πŸ§ͺ Testing latest migration with edge case data:"); + println!(" Testing migration: {}", extract_migration_name(latest)); + + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Apply migrations up to second-to-last + let migration_files = get_sorted_migration_files(); + let target_index = migration_files.iter() + .position(|f| f == second_to_last) + .expect("Second-to-last migration not found"); + apply_selected_migrations(pool, &migration_files[..target_index+1]).await; + + // Create edge case data that might break the migration + let edge_case_data = create_edge_case_data(pool).await; + + // Apply the latest migration + let migration_result = apply_single_migration_safe(pool, latest).await; + + match migration_result { + Ok(_) => { + println!("βœ… Latest migration handled edge cases successfully"); + validate_edge_case_migration(pool, &edge_case_data).await; + } + Err(e) => { + panic!("❌ Latest migration failed with edge case data: {:?}", e); + } + } + } + + #[tokio::test] + async fn test_latest_migration_rollback_safety() { + let migration_files = get_sorted_migration_files(); + + if migration_files.len() < 2 { + println!("βœ… Only one or no migrations found - skipping rollback safety test"); + return; + } + + let second_to_last = &migration_files[migration_files.len() - 2]; + let latest = &migration_files[migration_files.len() - 1]; + + println!("πŸ”’ Testing rollback safety for latest migration:"); + + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Apply migrations up to second-to-last + let migration_files = get_sorted_migration_files(); + let target_index = migration_files.iter() + .position(|f| f == second_to_last) + .expect("Second-to-last migration not found"); + apply_selected_migrations(pool, &migration_files[..target_index+1]).await; + + // Capture schema snapshot before latest migration + let schema_before = capture_schema_snapshot(pool).await; + + // Apply latest migration + apply_single_migration(pool, latest).await; + + // Capture schema after latest migration + let schema_after = capture_schema_snapshot(pool).await; + + // Validate schema changes are reasonable + validate_schema_changes(&schema_before, &schema_after, latest); + + // Test that the migration doesn't break existing functionality + test_basic_database_operations(pool).await; + + println!("βœ… Latest migration rollback safety verified"); + } + + #[tokio::test] + async fn test_latest_migration_performance() { + let migration_files = get_sorted_migration_files(); + + if migration_files.len() < 1 { + println!("βœ… No migrations found - skipping performance test"); + return; + } + + let latest = &migration_files[migration_files.len() - 1]; + + println!("⚑ Testing performance of latest migration:"); + println!(" Migration: {}", extract_migration_name(latest)); + + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Apply all migrations except the latest + if migration_files.len() > 1 { + let second_to_last = &migration_files[migration_files.len() - 2]; + let target_index = migration_files.iter() + .position(|f| f == second_to_last) + .expect("Second-to-last migration not found"); + apply_selected_migrations(pool, &migration_files[..target_index+1]).await; + } + + // Create a substantial amount of data + 
create_performance_test_data(pool, 1000).await;
+
+        // Measure migration time
+        let start_time = std::time::Instant::now();
+        apply_single_migration(pool, latest).await;
+        let migration_duration = start_time.elapsed();
+
+        println!("⏱️ Latest migration completed in: {:?}", migration_duration);
+
+        // Performance assertion - should complete reasonably fast even with data
+        assert!(
+            migration_duration.as_secs() < 10,
+            "Latest migration took too long: {:?}. Consider optimizing for larger datasets.",
+            migration_duration
+        );
+
+        // Verify data integrity after migration
+        verify_data_integrity_after_performance_test(pool).await;
+
+        println!("βœ… Latest migration performance acceptable");
+    }
+
+    // Helper functions
+
+    struct TestData {
+        users: Vec<TestUser>,
+        documents: Vec<TestDocument>,
+        failed_documents: Vec<TestFailedDocument>,
+        metadata: DatabaseMetadata,
+    }
+
+    struct TestUser {
+        id: Uuid,
+        username: String,
+        email: String,
+    }
+
+    struct TestDocument {
+        id: Uuid,
+        user_id: Uuid,
+        filename: String,
+        status: String,
+    }
+
+    struct TestFailedDocument {
+        id: Uuid,
+        user_id: Uuid,
+        filename: String,
+        reason: String,
+    }
+
+    struct DatabaseMetadata {
+        table_count: usize,
+        total_records: usize,
+        schema_version: String,
+    }
+
+    struct SchemaSnapshot {
+        tables: Vec<String>,
+        columns: std::collections::HashMap<String, Vec<String>>,
+        constraints: Vec<String>,
+    }
+
+    fn get_sorted_migration_files() -> Vec<String> {
+        let migrations_dir = Path::new("migrations");
+        let mut files = Vec::new();
+
+        if let Ok(entries) = std::fs::read_dir(migrations_dir) {
+            for entry in entries {
+                if let Ok(entry) = entry {
+                    let path = entry.path();
+                    if path.extension().and_then(|s| s.to_str()) == Some("sql") {
+                        files.push(path.to_string_lossy().to_string());
+                    }
+                }
+            }
+        }
+
+        files.sort();
+        files
+    }
+
+    fn extract_migration_name(filepath: &str) -> String {
+        Path::new(filepath)
+            .file_name()
+            .unwrap()
+            .to_string_lossy()
+            .to_string()
+    }
+
+    async fn apply_selected_migrations(pool: &PgPool, migration_files: &[String]) {
+        // Create the migrations table if it doesn't exist
+        sqlx::query(
+            "CREATE TABLE IF NOT EXISTS _sqlx_migrations (
+                version BIGINT PRIMARY KEY,
+                description TEXT NOT NULL,
+                installed_on TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+                success BOOLEAN NOT NULL,
+                checksum BYTEA NOT NULL,
+                execution_time BIGINT NOT NULL
+            )"
+        )
+        .execute(pool)
+        .await
+        .expect("Failed to create migrations table");
+
+        for migration_file in migration_files {
+            let migration_name = extract_migration_name(migration_file);
+
+            // Extract version from filename
+            let version = migration_name
+                .split('_')
+                .next()
+                .and_then(|s| s.parse::<i64>().ok())
+                .expect(&format!("Failed to parse migration version from {}", migration_name));
+
+            // Check if this migration is already applied
+            let exists = sqlx::query_scalar::<_, bool>(
+                "SELECT EXISTS(SELECT 1 FROM _sqlx_migrations WHERE version = $1)"
+            )
+            .bind(version)
+            .fetch_one(pool)
+            .await
+            .unwrap_or(false);
+
+            if exists {
+                println!(" ⏭️ Skipped (already applied): {}", migration_name);
+                continue;
+            }
+
+            // Apply this migration
+            let content = std::fs::read_to_string(migration_file)
+                .expect(&format!("Failed to read migration file: {}", migration_file));
+
+            let start_time = std::time::Instant::now();
+
+            // Use raw SQL execution to handle complex PostgreSQL statements including functions
+            sqlx::raw_sql(&content)
+                .execute(pool)
+                .await
+                .expect(&format!("Failed to apply migration: {}", migration_name));
+
+            let execution_time = start_time.elapsed().as_millis() as i64;
+            let checksum = 
Sha256::digest(content.as_bytes()).to_vec(); + + // Record the migration as applied + sqlx::query( + "INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) + VALUES ($1, $2, $3, $4, $5)" + ) + .bind(version) + .bind(migration_name.clone()) + .bind(true) + .bind(checksum) + .bind(execution_time) + .execute(pool) + .await + .expect("Failed to record migration"); + + println!(" βœ“ Applied: {}", migration_name); + } + } + + async fn apply_single_migration(pool: &PgPool, migration_file: &str) { + let result = apply_single_migration_safe(pool, migration_file).await; + result.expect(&format!("Failed to apply migration: {}", migration_file)); + } + + async fn apply_single_migration_safe(pool: &PgPool, migration_file: &str) -> Result<(), sqlx::Error> { + let content = std::fs::read_to_string(migration_file) + .expect(&format!("Failed to read migration file: {}", migration_file)); + + let migration_name = extract_migration_name(migration_file); + println!(" πŸ”„ Applying: {}", migration_name); + + // Use raw SQL execution to handle complex PostgreSQL statements including functions + sqlx::raw_sql(&content).execute(pool).await?; + + println!(" βœ… Applied: {}", migration_name); + Ok(()) + } + + async fn prefill_database_for_previous_state(pool: &PgPool) -> TestData { + let mut users = Vec::new(); + let mut documents = Vec::new(); + let mut failed_documents = Vec::new(); + + // Create test users + for i in 0..5 { + let user_id = Uuid::new_v4(); + let username = format!("previous_state_user_{}", i); + let email = format!("previous_{}@test.com", i); + + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(user_id) + .bind(&username) + .bind(&email) + .bind("test_hash") + .bind("user") + .execute(pool) + .await + .expect("Failed to create test user"); + + users.push(TestUser { id: user_id, username, email }); + } + + // Create test documents for each user + for user in &users { + for j in 0..3 { + let doc_id = Uuid::new_v4(); + let filename = format!("previous_doc_{}_{}.pdf", user.username, j); + let status = if j == 0 { "completed" } else { "failed" }; + + // Check if documents table exists before inserting + let table_exists = sqlx::query( + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'documents')" + ) + .fetch_one(pool) + .await + .unwrap() + .get::(0); + + if table_exists { + // Check if original_filename column exists + let original_filename_exists = sqlx::query_scalar::<_, bool>( + "SELECT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'documents' AND column_name = 'original_filename')" + ) + .fetch_one(pool) + .await + .unwrap_or(false); + + if original_filename_exists { + sqlx::query( + "INSERT INTO documents (id, user_id, filename, original_filename, file_path, file_size, mime_type, ocr_status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + ) + .bind(doc_id) + .bind(user.id) + .bind(&filename) + .bind(&filename) // Use same filename for original_filename + .bind(format!("/test/{}", filename)) + .bind(1024_i64) + .bind("application/pdf") + .bind(status) + .execute(pool) + .await + .expect("Failed to create test document"); + } else { + sqlx::query( + "INSERT INTO documents (id, user_id, filename, file_path, file_size, mime_type, ocr_status) + VALUES ($1, $2, $3, $4, $5, $6, $7)" + ) + .bind(doc_id) + .bind(user.id) + .bind(&filename) + .bind(format!("/test/{}", filename)) + .bind(1024_i64) + .bind("application/pdf") + .bind(status) + .execute(pool) 
+ .await + .expect("Failed to create test document"); + } + } + + documents.push(TestDocument { + id: doc_id, + user_id: user.id, + filename, + status: status.to_string(), + }); + } + } + + // Create failed documents if the table exists + let failed_docs_exists = sqlx::query( + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'failed_documents')" + ) + .fetch_one(pool) + .await + .unwrap() + .get::(0); + + if failed_docs_exists { + for user in &users { + let failed_id = Uuid::new_v4(); + let filename = format!("failed_previous_{}.pdf", user.username); + + sqlx::query( + "INSERT INTO failed_documents (id, user_id, filename, failure_reason, failure_stage, ingestion_source) + VALUES ($1, $2, $3, $4, $5, $6)" + ) + .bind(failed_id) + .bind(user.id) + .bind(&filename) + .bind("other") + .bind("ocr") + .bind("test") + .execute(pool) + .await + .expect("Failed to create test failed document"); + + failed_documents.push(TestFailedDocument { + id: failed_id, + user_id: user.id, + filename, + reason: "other".to_string(), + }); + } + } + + let total_records = users.len() + documents.len() + failed_documents.len(); + + TestData { + users, + documents, + failed_documents, + metadata: DatabaseMetadata { + table_count: get_table_count(pool).await, + total_records, + schema_version: "previous".to_string(), + }, + } + } + + async fn create_edge_case_data(pool: &PgPool) -> TestData { + let mut users = Vec::new(); + let mut documents = Vec::new(); + let mut failed_documents = Vec::new(); + + // Create edge case users + let long_string = "a".repeat(50); + let edge_cases = vec![ + ("edge_empty_", ""), + ("edge_special_", "user@domain.com"), + ("edge_unicode_", "test_Γ±Γ€me@tΓ«st.com"), + ("edge_long_", long_string.as_str()), + ]; + + for (prefix, suffix) in edge_cases { + let user_id = Uuid::new_v4(); + let username = format!("{}{}", prefix, user_id.to_string().split('-').next().unwrap()); + let email = if suffix.is_empty() { + format!("{}@test.com", username) + } else { + suffix.to_string() + }; + + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(user_id) + .bind(&username) + .bind(&email) + .bind("test_hash") + .bind("user") + .execute(pool) + .await + .expect("Failed to create edge case user"); + + users.push(TestUser { id: user_id, username, email }); + } + + let total_records = users.len(); + + TestData { + users, + documents, + failed_documents, + metadata: DatabaseMetadata { + table_count: get_table_count(pool).await, + total_records, + schema_version: "edge_case".to_string(), + }, + } + } + + async fn validate_latest_migration_success(pool: &PgPool, test_data: &TestData, migration_file: &str) { + let migration_name = extract_migration_name(migration_file); + + // Verify that our test data still exists + let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users") + .fetch_one(pool) + .await + .unwrap(); + + assert!( + user_count >= test_data.users.len() as i64, + "User data lost after migration {}", + migration_name + ); + + // Check that the migration was applied successfully by verifying the schema + let current_table_count = get_table_count(pool).await; + + println!(" πŸ“Š Validation results:"); + println!(" - Users preserved: {} / {}", user_count, test_data.users.len()); + println!(" - Tables before: {}", test_data.metadata.table_count); + println!(" - Tables after: {}", current_table_count); + + // Test basic database operations still work + test_basic_database_operations(pool).await; + } + + 
async fn validate_edge_case_migration(pool: &PgPool, test_data: &TestData) {
+        // Verify edge case data survived migration
+        for user in &test_data.users {
+            let user_exists = sqlx::query(
+                "SELECT 1 FROM users WHERE id = $1"
+            )
+            .bind(user.id)
+            .fetch_optional(pool)
+            .await
+            .unwrap();
+
+            assert!(
+                user_exists.is_some(),
+                "Edge case user {} lost during migration",
+                user.username
+            );
+        }
+
+        println!(" βœ… All edge case data preserved");
+    }
+
+    async fn capture_schema_snapshot(pool: &PgPool) -> SchemaSnapshot {
+        // Get all tables
+        let tables = sqlx::query(
+            "SELECT table_name FROM information_schema.tables
+             WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        let table_names: Vec<String> = tables.iter()
+            .map(|row| row.get("table_name"))
+            .collect();
+
+        // Get columns for each table
+        let mut columns = std::collections::HashMap::new();
+        for table in &table_names {
+            let table_columns = sqlx::query(
+                "SELECT column_name FROM information_schema.columns
+                 WHERE table_schema = 'public' AND table_name = $1"
+            )
+            .bind(table)
+            .fetch_all(pool)
+            .await
+            .unwrap();
+
+            let column_names: Vec<String> = table_columns.iter()
+                .map(|row| row.get("column_name"))
+                .collect();
+
+            columns.insert(table.clone(), column_names);
+        }
+
+        // Get constraints
+        let constraints = sqlx::query(
+            "SELECT constraint_name FROM information_schema.table_constraints
+             WHERE table_schema = 'public'"
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        let constraint_names: Vec<String> = constraints.iter()
+            .map(|row| row.get("constraint_name"))
+            .collect();
+
+        SchemaSnapshot {
+            tables: table_names,
+            columns,
+            constraints: constraint_names,
+        }
+    }
+
+    fn validate_schema_changes(before: &SchemaSnapshot, after: &SchemaSnapshot, migration_file: &str) {
+        let migration_name = extract_migration_name(migration_file);
+
+        // Check for new tables
+        let new_tables: Vec<_> = after.tables.iter()
+            .filter(|table| !before.tables.contains(table))
+            .collect();
+
+        if !new_tables.is_empty() {
+            println!(" πŸ“‹ New tables added by {}: {:?}", migration_name, new_tables);
+        }
+
+        // Check for removed tables (should be rare and carefully considered)
+        let removed_tables: Vec<_> = before.tables.iter()
+            .filter(|table| !after.tables.contains(table))
+            .collect();
+
+        if !removed_tables.is_empty() {
+            println!(" ⚠️ Tables removed by {}: {:?}", migration_name, removed_tables);
+            // Note: In production, you might want to assert this is intentional
+        }
+
+        // Check for new constraints
+        let new_constraints: Vec<_> = after.constraints.iter()
+            .filter(|constraint| !before.constraints.contains(constraint))
+            .collect();
+
+        if !new_constraints.is_empty() {
+            println!(" πŸ”’ New constraints added: {}", new_constraints.len());
+        }
+
+        println!(" βœ… Schema changes validated");
+    }
+
+    async fn test_basic_database_operations(pool: &PgPool) {
+        // Test that we can still perform basic operations
+
+        // Test user creation
+        let test_user_id = Uuid::new_v4();
+        let result = sqlx::query(
+            "INSERT INTO users (id, username, email, password_hash, role)
+             VALUES ($1, $2, $3, $4, $5)"
+        )
+        .bind(test_user_id)
+        .bind("operation_test_user")
+        .bind("operation_test@test.com")
+        .bind("test_hash")
+        .bind("user")
+        .execute(pool)
+        .await;
+
+        assert!(result.is_ok(), "Basic user creation should still work");
+
+        // Clean up
+        sqlx::query("DELETE FROM users WHERE id = $1")
+            .bind(test_user_id)
+            .execute(pool)
+            .await
+            .unwrap();
+
+        println!(" βœ… Basic database operations verified");
+    }
+
+    
async fn create_performance_test_data(pool: &PgPool, user_count: usize) {
+        println!(" πŸ“Š Creating {} users for performance testing...", user_count);
+
+        for i in 0..user_count {
+            let user_id = Uuid::new_v4();
+            let username = format!("perf_user_{}", i);
+            let email = format!("perf_{}@test.com", i);
+
+            sqlx::query(
+                "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
+            )
+            .bind(user_id)
+            .bind(&username)
+            .bind(&email)
+            .bind("test_hash")
+            .bind("user")
+            .execute(pool)
+            .await
+            .expect("Failed to create performance test user");
+        }
+
+        println!(" βœ… Performance test data created");
+    }
+
+    async fn verify_data_integrity_after_performance_test(pool: &PgPool) {
+        let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
+            .fetch_one(pool)
+            .await
+            .unwrap();
+
+        assert!(user_count > 0, "Performance test data should exist after migration");
+
+        println!(" βœ… Data integrity verified: {} users", user_count);
+    }
+
+    async fn get_table_count(pool: &PgPool) -> usize {
+        let tables = sqlx::query(
+            "SELECT COUNT(*) as count FROM information_schema.tables
+             WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
+        )
+        .fetch_one(pool)
+        .await
+        .unwrap();
+
+        tables.get::<i64, _>("count") as usize
+    }
+}
\ No newline at end of file
diff --git a/tests/migration_ordering_tests.rs b/tests/migration_ordering_tests.rs
new file mode 100644
index 0000000..fda5124
--- /dev/null
+++ b/tests/migration_ordering_tests.rs
@@ -0,0 +1,364 @@
+use sqlx::PgPool;
+use std::path::Path;
+use std::fs;
+
+#[cfg(test)]
+mod migration_ordering_tests {
+    use super::*;
+
+    #[test]
+    fn test_migration_files_have_unique_timestamps() {
+        let migration_files = get_migration_files();
+        let mut timestamps = Vec::new();
+
+        for file in &migration_files {
+            let timestamp = extract_timestamp(&file);
+            assert!(
+                !timestamps.contains(&timestamp),
+                "Duplicate migration timestamp found: {} in file {}",
+                timestamp, file
+            );
+            timestamps.push(timestamp);
+        }
+
+        println!("βœ… All migration files have unique timestamps");
+    }
+
+    #[test]
+    fn test_migration_files_are_chronologically_ordered() {
+        let migration_files = get_migration_files();
+        let timestamps: Vec<i64> = migration_files.iter()
+            .map(|f| extract_timestamp(f).parse::<i64>().unwrap())
+            .collect();
+
+        let mut sorted_timestamps = timestamps.clone();
+        sorted_timestamps.sort();
+
+        assert_eq!(
+            timestamps, sorted_timestamps,
+            "Migration files are not in chronological order"
+        );
+
+        println!("βœ… Migration files are chronologically ordered");
+    }
+
+    #[test]
+    fn test_migration_naming_convention() {
+        let migration_files = get_migration_files();
+
+        for file in &migration_files {
+            let filename = Path::new(&file).file_name().unwrap().to_str().unwrap();
+
+            // Check format: TIMESTAMP_description.sql
+            assert!(
+                filename.ends_with(".sql"),
+                "Migration file {} doesn't end with .sql",
+                filename
+            );
+
+            let parts: Vec<&str> = filename.split('_').collect();
+            assert!(
+                parts.len() >= 2,
+                "Migration file {} doesn't follow TIMESTAMP_description format",
+                filename
+            );
+
+            // Check timestamp format (should be 14-17 digits)
+            let timestamp = parts[0];
+            assert!(
+                timestamp.len() >= 14 && timestamp.len() <= 17,
+                "Migration timestamp {} has invalid length in file {}",
+                timestamp, filename
+            );
+
+            assert!(
+                timestamp.chars().all(|c| c.is_numeric()),
+                "Migration timestamp {} contains non-numeric characters in file {}",
+                timestamp, filename
+            );
+
+            // Check description
+            let description_parts = &parts[1..];
+            let 
description = description_parts.join("_"); + let description_without_ext = description.trim_end_matches(".sql"); + + assert!( + !description_without_ext.is_empty(), + "Migration file {} has empty description", + filename + ); + + assert!( + description_without_ext.chars().all(|c| c.is_alphanumeric() || c == '_'), + "Migration description contains invalid characters in file {}", + filename + ); + } + + println!("βœ… All migration files follow naming convention"); + } + + #[test] + fn test_migration_dependencies() { + let migration_files = get_migration_files(); + let migration_contents = read_all_migrations(); + + // Check for common dependency patterns + for (i, (file, content)) in migration_contents.iter().enumerate() { + // Check if migration references tables that should exist + let referenced_tables = extract_referenced_tables(&content); + + for table in &referenced_tables { + // Skip system tables + if table.starts_with("pg_") || table.starts_with("information_schema") { + continue; + } + + // Check if table is created in current or previous migrations + let table_exists = table_exists_before_migration(&migration_contents, i, table); + + // Special cases for tables that might be created in the same migration + let creates_table = content.to_lowercase().contains(&format!("create table {}", table.to_lowercase())) || + content.to_lowercase().contains(&format!("create table if not exists {}", table.to_lowercase())); + + if !creates_table && !table_exists { + println!("Warning: Migration {} references table '{}' that may not exist", file, table); + } + } + } + + println!("βœ… Migration dependencies checked"); + } + + #[test] + fn test_no_drop_statements_in_migrations() { + let migration_contents = read_all_migrations(); + + for (file, content) in &migration_contents { + let lowercase_content = content.to_lowercase(); + + // Check for dangerous DROP statements + assert!( + !lowercase_content.contains("drop table") || lowercase_content.contains("drop table if exists"), + "Migration {} contains DROP TABLE statement without IF EXISTS", + file + ); + + assert!( + !lowercase_content.contains("drop database"), + "Migration {} contains dangerous DROP DATABASE statement", + file + ); + + assert!( + !lowercase_content.contains("drop schema"), + "Migration {} contains DROP SCHEMA statement", + file + ); + } + + println!("βœ… No dangerous DROP statements found"); + } + + #[test] + fn test_migration_transactions() { + let migration_contents = read_all_migrations(); + + for (file, content) in &migration_contents { + let lowercase_content = content.to_lowercase(); + + // Check that migrations don't contain explicit transaction statements + // (SQLx handles transactions automatically) + assert!( + !lowercase_content.contains("begin;") && !lowercase_content.contains("begin transaction"), + "Migration {} contains explicit BEGIN statement", + file + ); + + assert!( + !lowercase_content.contains("commit;"), + "Migration {} contains explicit COMMIT statement", + file + ); + + assert!( + !lowercase_content.contains("rollback;"), + "Migration {} contains explicit ROLLBACK statement", + file + ); + } + + println!("βœ… Migrations don't contain explicit transaction statements"); + } + + #[tokio::test] + async fn test_migration_idempotency() { + // This test would be run in CI to ensure migrations can be run multiple times + // We'll create a simple check here + let migration_contents = read_all_migrations(); + + for (file, content) in &migration_contents { + // Check for CREATE statements with IF NOT EXISTS + if 
content.to_lowercase().contains("create table") { + let has_if_not_exists = content.to_lowercase().contains("create table if not exists"); + if !has_if_not_exists { + println!("Warning: Migration {} creates table without IF NOT EXISTS", file); + } + } + + if content.to_lowercase().contains("create index") { + let has_if_not_exists = content.to_lowercase().contains("create index if not exists"); + if !has_if_not_exists { + println!("Warning: Migration {} creates index without IF NOT EXISTS", file); + } + } + } + + println!("βœ… Migration idempotency patterns checked"); + } + + #[test] + fn test_migration_comments() { + let migration_contents = read_all_migrations(); + let mut undocumented_migrations = Vec::new(); + + for (file, content) in &migration_contents { + // Check if migration has comments explaining what it does + let has_comments = content.contains("--") || content.contains("/*"); + + if !has_comments { + undocumented_migrations.push(file.clone()); + } + + // Check for specific important migrations that should have detailed comments + if file.contains("failed_documents") { + assert!( + content.contains("--") && content.len() > 200, + "Migration {} dealing with failed_documents should have detailed comments", + file + ); + } + } + + if !undocumented_migrations.is_empty() { + println!("Warning: The following migrations lack comments: {:?}", undocumented_migrations); + } + + println!("βœ… Migration documentation checked"); + } + + #[test] + fn test_migration_file_consistency() { + let migration_files = get_migration_files(); + + for file in &migration_files { + let content = fs::read_to_string(&file).unwrap(); + + // Check for consistent line endings + assert!( + !content.contains("\r\n") || !content.contains("\n"), + "Migration {} has mixed line endings", + file + ); + + // Check for trailing whitespace (optional check, can be disabled) + for (line_num, line) in content.lines().enumerate() { + if line.ends_with(' ') || line.ends_with('\t') { + println!("Note: Migration {} has trailing whitespace on line {} (style preference)", file, line_num + 1); + } + } + + // Check file ends with newline (optional check, can be disabled) + if !content.ends_with('\n') { + println!("Note: Migration {} doesn't end with newline (style preference)", file); + } + } + + println!("βœ… Migration file consistency verified"); + } + + // Helper functions + + fn get_migration_files() -> Vec { + let migrations_dir = Path::new("migrations"); + let mut files = Vec::new(); + + if let Ok(entries) = fs::read_dir(migrations_dir) { + for entry in entries { + if let Ok(entry) = entry { + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) == Some("sql") { + files.push(path.to_string_lossy().to_string()); + } + } + } + } + + files.sort(); + files + } + + fn extract_timestamp(filepath: &str) -> String { + let filename = Path::new(filepath).file_name().unwrap().to_str().unwrap(); + filename.split('_').next().unwrap().to_string() + } + + fn read_all_migrations() -> Vec<(String, String)> { + let migration_files = get_migration_files(); + let mut contents = Vec::new(); + + for file in migration_files { + if let Ok(content) = fs::read_to_string(&file) { + contents.push((file, content)); + } + } + + contents + } + + fn extract_referenced_tables(content: &str) -> Vec { + let mut tables = Vec::new(); + + // Simple regex-like patterns to find table references + let patterns = vec![ + "references ", "from ", "join ", "into ", "update ", "delete from ", + "alter table ", "constraint.*references", "on delete", 
"on update" + ]; + + for line in content.lines() { + let lower_line = line.to_lowercase(); + for pattern in &patterns { + if lower_line.contains(pattern) { + // Extract table name (simplified - real implementation would use regex) + let parts: Vec<&str> = lower_line.split_whitespace().collect(); + for (i, part) in parts.iter().enumerate() { + if part == &pattern.trim() && i + 1 < parts.len() { + let table_name = parts[i + 1].trim_matches(|c: char| !c.is_alphanumeric() && c != '_'); + if !table_name.is_empty() && !table_name.starts_with("$") { + tables.push(table_name.to_string()); + } + } + } + } + } + } + + tables.sort(); + tables.dedup(); + tables + } + + fn table_exists_before_migration(migrations: &[(String, String)], current_index: usize, table_name: &str) -> bool { + for i in 0..current_index { + let (_, content) = &migrations[i]; + if content.to_lowercase().contains(&format!("create table {}", table_name.to_lowercase())) || + content.to_lowercase().contains(&format!("create table if not exists {}", table_name.to_lowercase())) { + return true; + } + } + + // Check for base tables that should always exist + let base_tables = vec!["users", "documents", "settings"]; + base_tables.contains(&table_name) + } +} \ No newline at end of file diff --git a/tests/migration_schema_validation_tests.rs b/tests/migration_schema_validation_tests.rs new file mode 100644 index 0000000..2b7e76d --- /dev/null +++ b/tests/migration_schema_validation_tests.rs @@ -0,0 +1,521 @@ +use readur::test_utils::TestContext; +use sqlx::{PgPool, Row}; +use std::collections::{HashMap, HashSet}; + +#[cfg(test)] +mod migration_schema_validation_tests { + use super::*; + + #[tokio::test] + async fn test_all_expected_tables_exist() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + let expected_tables = vec![ + "users", + "documents", + "document_labels", + "failed_documents", + "ignored_files", + "labels", + "notifications", + "ocr_metrics", + "ocr_queue", + "ocr_retry_history", + "processed_images", + "settings", + "source_labels", + "sources", + "webdav_directories", + "webdav_files", + "webdav_sync_state", + "_sqlx_migrations", + ]; + + let existing_tables = get_all_tables(pool).await; + + for table in expected_tables { + assert!( + existing_tables.contains(table), + "Expected table '{}' not found in database schema", + table + ); + } + + println!("βœ… All expected tables exist"); + } + + #[tokio::test] + async fn test_table_columns_and_types() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Define expected columns for critical tables + let table_schemas = vec![ + TableSchema { + name: "documents", + columns: vec![ + ("id", "uuid", false), + ("user_id", "uuid", false), + ("filename", "text", false), + ("original_filename", "text", true), + ("file_path", "text", false), + ("file_size", "bigint", false), + ("file_hash", "character varying", true), + ("mime_type", "text", false), + ("content", "text", true), + ("tags", "ARRAY", true), + ("ocr_text", "text", true), + ("ocr_status", "character varying", false), + ("ocr_confidence", "real", true), + ("ocr_failure_reason", "text", true), + ("created_at", "timestamp with time zone", false), + ("updated_at", "timestamp with time zone", false), + ], + }, + TableSchema { + name: "failed_documents", + columns: vec![ + ("id", "uuid", false), + ("user_id", "uuid", true), + ("filename", "text", false), + ("failure_reason", "text", false), + ("failure_stage", "text", false), + ("ingestion_source", "text", false), + 
("error_message", "text", true), + ("retry_count", "integer", true), + ("created_at", "timestamp with time zone", true), + ("updated_at", "timestamp with time zone", true), + ], + }, + TableSchema { + name: "ocr_queue", + columns: vec![ + ("id", "uuid", false), + ("document_id", "uuid", false), + ("priority", "integer", false), + ("status", "character varying", false), + ("error_message", "text", true), + ("processing_started_at", "timestamp with time zone", true), + ("processing_completed_at", "timestamp with time zone", true), + ("created_at", "timestamp with time zone", false), + ("updated_at", "timestamp with time zone", false), + ], + }, + ]; + + for schema in table_schemas { + validate_table_schema(pool, &schema).await; + } + } + + #[tokio::test] + async fn test_all_constraints_exist() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Test primary keys + let primary_keys = vec![ + ("documents", "documents_pkey"), + ("users", "users_pkey"), + ("failed_documents", "failed_documents_pkey"), + ("ocr_queue", "ocr_queue_pkey"), + ("labels", "labels_pkey"), + ("settings", "settings_pkey"), + ]; + + for (table, constraint) in primary_keys { + let exists = constraint_exists(pool, table, constraint, "PRIMARY KEY").await; + assert!(exists, "Primary key '{}' not found on table '{}'", constraint, table); + } + + // Test foreign keys + let foreign_keys = vec![ + ("documents", "documents_user_id_fkey"), + ("failed_documents", "failed_documents_user_id_fkey"), + ("failed_documents", "failed_documents_existing_document_id_fkey"), + ("ocr_queue", "ocr_queue_document_id_fkey"), + ("document_labels", "document_labels_document_id_fkey"), + ("document_labels", "document_labels_label_id_fkey"), + ]; + + for (table, constraint) in foreign_keys { + let exists = constraint_exists(pool, table, constraint, "FOREIGN KEY").await; + assert!(exists, "Foreign key '{}' not found on table '{}'", constraint, table); + } + + // Test check constraints + let check_constraints = vec![ + ("failed_documents", "check_failure_reason"), + ("failed_documents", "check_failure_stage"), + ("documents", "check_ocr_status"), + ("users", "check_role"), + ]; + + for (table, constraint) in check_constraints { + let exists = constraint_exists(pool, table, constraint, "CHECK").await; + assert!(exists, "Check constraint '{}' not found on table '{}'", constraint, table); + } + + println!("βœ… All expected constraints exist"); + } + + #[tokio::test] + async fn test_indexes_for_performance() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + let expected_indexes = vec![ + ("documents", "idx_documents_user_id"), + ("documents", "idx_documents_created_at"), + ("documents", "idx_documents_ocr_status"), + ("failed_documents", "idx_failed_documents_user_id"), + ("failed_documents", "idx_failed_documents_created_at"), + ("failed_documents", "idx_failed_documents_failure_reason"), + ("failed_documents", "idx_failed_documents_failure_stage"), + ("ocr_queue", "idx_ocr_queue_status"), + ("ocr_queue", "idx_ocr_queue_document_id"), + ]; + + for (table, index) in expected_indexes { + let exists = index_exists(pool, table, index).await; + assert!(exists, "Performance index '{}' not found on table '{}'", index, table); + } + + println!("βœ… All performance indexes exist"); + } + + #[tokio::test] + async fn test_views_and_functions() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Test views + let expected_views = vec![ + "failed_documents_summary", + 
"legacy_failed_ocr_documents", + "ocr_analytics", + ]; + + let existing_views = get_all_views(pool).await; + + for view in expected_views { + assert!( + existing_views.contains(view), + "Expected view '{}' not found in database", + view + ); + } + + // Test functions + let expected_functions = vec![ + "add_document_to_ocr_queue", + "get_ocr_queue_stats", + ]; + + let existing_functions = get_all_functions(pool).await; + + for func in expected_functions { + assert!( + existing_functions.contains(func), + "Expected function '{}' not found in database", + func + ); + } + + println!("βœ… All views and functions exist"); + } + + #[tokio::test] + async fn test_enum_values_match_constraints() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Test failure_reason enum values + let failure_reasons = vec![ + "duplicate_content", "duplicate_filename", "unsupported_format", + "file_too_large", "file_corrupted", "access_denied", + "low_ocr_confidence", "ocr_timeout", "ocr_memory_limit", + "pdf_parsing_error", "storage_quota_exceeded", "network_error", + "permission_denied", "virus_detected", "invalid_structure", + "policy_violation", "other" + ]; + + for reason in &failure_reasons { + let result = sqlx::query( + "SELECT 1 WHERE $1::text IN (SELECT unnest(enum_range(NULL::text)::text[]))" + ) + .bind(reason) + .fetch_optional(pool) + .await; + + // If this is not an enum type, test the CHECK constraint instead + if result.is_err() || result.unwrap().is_none() { + // Test by attempting insert with valid value (should succeed) + // We'll use a transaction that we rollback to avoid polluting test data + let mut tx = pool.begin().await.unwrap(); + + // First create a test user + let test_user_id = uuid::Uuid::new_v4(); + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(test_user_id) + .bind(format!("enum_test_{}", uuid::Uuid::new_v4())) + .bind(format!("enum_test_{}@test.com", uuid::Uuid::new_v4())) + .bind("test") + .bind("user") + .execute(&mut *tx) + .await + .unwrap(); + + let insert_result = sqlx::query( + "INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source) + VALUES ($1, 'test.pdf', $2, 'ocr', 'test')" + ) + .bind(test_user_id) + .bind(reason) + .execute(&mut *tx) + .await; + + assert!(insert_result.is_ok(), + "Valid failure_reason '{}' should be accepted by constraint", reason); + + tx.rollback().await.unwrap(); + } + } + + // Test failure_stage enum values + let failure_stages = vec![ + "ingestion", "validation", "ocr", "storage", "processing", "sync" + ]; + + for stage in &failure_stages { + let mut tx = pool.begin().await.unwrap(); + + // Create test user + let test_user_id = uuid::Uuid::new_v4(); + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(test_user_id) + .bind(format!("stage_test_{}", uuid::Uuid::new_v4())) + .bind(format!("stage_test_{}@test.com", uuid::Uuid::new_v4())) + .bind("test") + .bind("user") + .execute(&mut *tx) + .await + .unwrap(); + + let insert_result = sqlx::query( + "INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source) + VALUES ($1, 'test.pdf', 'other', $2, 'test')" + ) + .bind(test_user_id) + .bind(stage) + .execute(&mut *tx) + .await; + + assert!(insert_result.is_ok(), + "Valid failure_stage '{}' should be accepted by constraint", stage); + + tx.rollback().await.unwrap(); + } + + println!("βœ… All enum values 
match constraints"); + } + + #[tokio::test] + async fn test_migration_specific_changes() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Test that failed_documents table has all columns from migration + let failed_docs_columns = get_table_columns(pool, "failed_documents").await; + + let migration_columns = vec![ + "id", "user_id", "filename", "original_filename", "original_path", + "file_path", "file_size", "file_hash", "mime_type", "content", "tags", + "ocr_text", "ocr_confidence", "ocr_word_count", "ocr_processing_time_ms", + "failure_reason", "failure_stage", "existing_document_id", + "ingestion_source", "error_message", "retry_count", "last_retry_at", + "created_at", "updated_at" + ]; + + for col in migration_columns { + assert!( + failed_docs_columns.contains(&col.to_string()), + "Column '{}' not found in failed_documents table", + col + ); + } + + // Test that documents table has ocr_failure_reason column + let docs_columns = get_table_columns(pool, "documents").await; + assert!( + docs_columns.contains(&"ocr_failure_reason".to_string()), + "ocr_failure_reason column not found in documents table" + ); + + // Test that the legacy view exists + let views = get_all_views(pool).await; + assert!( + views.contains("legacy_failed_ocr_documents"), + "legacy_failed_ocr_documents view not found" + ); + + println!("βœ… Migration-specific changes verified"); + } + + // Helper functions + + struct TableSchema { + name: &'static str, + columns: Vec<(&'static str, &'static str, bool)>, // (name, type, nullable) + } + + async fn get_all_tables(pool: &PgPool) -> HashSet { + let rows = sqlx::query( + "SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' AND table_type = 'BASE TABLE'" + ) + .fetch_all(pool) + .await + .unwrap(); + + rows.into_iter() + .map(|row| row.get("table_name")) + .collect() + } + + async fn get_all_views(pool: &PgPool) -> HashSet { + let rows = sqlx::query( + "SELECT table_name FROM information_schema.views WHERE table_schema = 'public'" + ) + .fetch_all(pool) + .await + .unwrap(); + + rows.into_iter() + .map(|row| row.get("table_name")) + .collect() + } + + async fn get_all_functions(pool: &PgPool) -> HashSet { + let rows = sqlx::query( + "SELECT routine_name FROM information_schema.routines + WHERE routine_schema = 'public' AND routine_type = 'FUNCTION'" + ) + .fetch_all(pool) + .await + .unwrap(); + + rows.into_iter() + .map(|row| row.get("routine_name")) + .collect() + } + + async fn get_table_columns(pool: &PgPool, table_name: &str) -> Vec { + let rows = sqlx::query( + "SELECT column_name FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = $1" + ) + .bind(table_name) + .fetch_all(pool) + .await + .unwrap(); + + rows.into_iter() + .map(|row| row.get("column_name")) + .collect() + } + + async fn validate_table_schema(pool: &PgPool, schema: &TableSchema) { + let columns = sqlx::query( + "SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = $1" + ) + .bind(schema.name) + .fetch_all(pool) + .await + .unwrap(); + + let column_map: HashMap = columns.into_iter() + .map(|row| { + let name: String = row.get("column_name"); + let data_type: String = row.get("data_type"); + let is_nullable: String = row.get("is_nullable"); + (name, (data_type, is_nullable == "YES")) + }) + .collect(); + + for (col_name, expected_type, nullable) in &schema.columns { + let column_info = column_map.get(*col_name); + assert!( + 
column_info.is_some(), + "Column '{}' not found in table '{}'", + col_name, schema.name + ); + + let (actual_type, actual_nullable) = column_info.unwrap(); + + // Type checking (handle array types specially) + if expected_type == &"ARRAY" { + assert!( + actual_type.contains("ARRAY") || actual_type.contains("[]"), + "Column '{}' in table '{}' expected array type but got '{}'", + col_name, schema.name, actual_type + ); + } else { + assert!( + actual_type.to_lowercase().contains(&expected_type.to_lowercase()), + "Column '{}' in table '{}' expected type '{}' but got '{}'", + col_name, schema.name, expected_type, actual_type + ); + } + + assert_eq!( + actual_nullable, nullable, + "Column '{}' in table '{}' nullable mismatch", + col_name, schema.name + ); + } + + println!("βœ… Schema validated for table '{}'", schema.name); + } + + async fn constraint_exists(pool: &PgPool, table: &str, constraint: &str, constraint_type: &str) -> bool { + let result = sqlx::query( + "SELECT 1 FROM information_schema.table_constraints + WHERE table_schema = 'public' + AND table_name = $1 + AND constraint_name = $2 + AND constraint_type = $3" + ) + .bind(table) + .bind(constraint) + .bind(constraint_type) + .fetch_optional(pool) + .await + .unwrap(); + + result.is_some() + } + + async fn index_exists(pool: &PgPool, table: &str, index: &str) -> bool { + let result = sqlx::query( + "SELECT 1 FROM pg_indexes + WHERE schemaname = 'public' + AND tablename = $1 + AND indexname = $2" + ) + .bind(table) + .bind(index) + .fetch_optional(pool) + .await + .unwrap(); + + result.is_some() + } +} \ No newline at end of file diff --git a/tests/pr_migration_validation_tests.rs b/tests/pr_migration_validation_tests.rs new file mode 100644 index 0000000..7845bce --- /dev/null +++ b/tests/pr_migration_validation_tests.rs @@ -0,0 +1,795 @@ +use readur::test_utils::TestContext; +use sqlx::{PgPool, Row}; +use uuid::Uuid; +use std::process::Command; + +#[cfg(test)] +mod pr_migration_validation_tests { + use super::*; + + #[tokio::test] + async fn test_new_migration_with_prefilled_data() { + // Check if this PR introduces any new migrations + let new_migrations = get_new_migrations_in_pr(); + + if new_migrations.is_empty() { + println!("βœ… No new migrations in this PR - skipping prefilled data test"); + return; + } + + println!("πŸ” Found {} new migration(s) in this PR:", new_migrations.len()); + for migration in &new_migrations { + println!(" - {}", migration); + } + + // Run the comprehensive test with prefilled data + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + // Step 1: Prefill database with comprehensive test data + let test_data = prefill_comprehensive_test_data(pool).await; + println!("βœ… Prefilled database with {} test scenarios", test_data.scenarios.len()); + + // Step 2: Verify all migrations run successfully with prefilled data + verify_migrations_with_data(pool, &test_data).await; + + // Step 3: Test specific migration scenarios if they involve data transformation + if migration_involves_data_transformation(&new_migrations) { + test_data_transformation_integrity(pool, &test_data).await; + } + + // Step 4: Verify no data loss occurred + verify_no_data_loss(pool, &test_data).await; + + println!("βœ… All new migrations passed validation with prefilled data"); + } + + #[tokio::test] + async fn test_migration_rollback_safety() { + let new_migrations = get_new_migrations_in_pr(); + + if new_migrations.is_empty() { + println!("βœ… No new migrations in this PR - skipping rollback safety test"); + 
return;
+        }
+
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Create snapshot of schema before migrations
+        let schema_before = capture_schema_snapshot(pool).await;
+
+        // Run migrations
+        let migration_result = sqlx::migrate!("./migrations").run(pool).await;
+        assert!(migration_result.is_ok(), "Migrations should succeed");
+
+        // Capture schema after migrations
+        let schema_after = capture_schema_snapshot(pool).await;
+
+        // Verify schema changes are intentional
+        verify_schema_changes(&schema_before, &schema_after, &new_migrations);
+
+        println!("βœ… Migration rollback safety verified");
+    }
+
+    #[tokio::test]
+    async fn test_migration_performance_impact() {
+        let new_migrations = get_new_migrations_in_pr();
+
+        if new_migrations.is_empty() {
+            return;
+        }
+
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Prefill with large dataset
+        create_performance_test_data(pool, 10000).await;
+
+        // Measure migration execution time
+        let start = std::time::Instant::now();
+        let result = sqlx::migrate!("./migrations").run(pool).await;
+        let duration = start.elapsed();
+
+        assert!(result.is_ok(), "Migrations should succeed");
+        assert!(
+            duration.as_secs() < 30,
+            "Migrations took too long: {:?}. Consider optimizing for large datasets.",
+            duration
+        );
+
+        println!("βœ… Migration performance acceptable: {:?}", duration);
+    }
+
+    // Data structures for comprehensive testing
+
+    struct ComprehensiveTestData {
+        users: Vec<TestUser>,
+        documents: Vec<TestDocument>,
+        scenarios: Vec<TestScenario>,
+        total_records: usize,
+    }
+
+    struct TestUser {
+        id: Uuid,
+        username: String,
+        role: String,
+    }
+
+    struct TestDocument {
+        id: Uuid,
+        user_id: Uuid,
+        filename: String,
+        ocr_status: String,
+        failure_reason: Option<String>,
+        metadata: DocumentMetadata,
+    }
+
+    struct DocumentMetadata {
+        file_size: i64,
+        mime_type: String,
+        has_ocr_text: bool,
+        tags: Vec<String>,
+    }
+
+    struct TestScenario {
+        name: String,
+        description: String,
+        affected_tables: Vec<String>,
+        record_count: usize,
+    }
+
+    struct SchemaSnapshot {
+        tables: Vec<TableInfo>,
+        indexes: Vec<String>,
+        constraints: Vec<String>,
+        views: Vec<String>,
+    }
+
+    struct TableInfo {
+        name: String,
+        columns: Vec<ColumnInfo>,
+        row_count: i64,
+    }
+
+    struct ColumnInfo {
+        name: String,
+        data_type: String,
+        is_nullable: bool,
+    }
+
+    // Implementation functions
+
+    async fn prefill_comprehensive_test_data(pool: &PgPool) -> ComprehensiveTestData {
+        let mut users = Vec::new();
+        let mut documents = Vec::new();
+        let mut scenarios = Vec::new();
+
+        // Create diverse user types
+        let user_types = vec![
+            ("admin", "admin"),
+            ("regular", "user"),
+            ("readonly", "user"),
+        ];
+
+        for (user_type, role) in user_types {
+            let user = create_test_user_with_role(pool, user_type, role).await;
+            users.push(user);
+        }
+
+        // Create various document scenarios
+        let document_scenarios = vec![
+            // Successful documents
+            ("success_high_conf.pdf", "completed", None, 0.95, true),
+            ("success_medium_conf.pdf", "completed", None, 0.75, true),
+            ("success_with_tags.pdf", "completed", None, 0.85, true),
+
+            // Failed documents with different reasons
+            ("fail_low_confidence.pdf", "failed", Some("low_ocr_confidence"), 0.3, true),
+            ("fail_timeout.pdf", "failed", Some("timeout"), 0.0, false),
+            ("fail_memory.pdf", "failed", Some("memory_limit"), 0.0, false),
+            ("fail_corrupted.pdf", "failed", Some("file_corrupted"), 0.0, false),
+            ("fail_unsupported.xyz", "failed", Some("unsupported_format"), 0.0, false),
+            ("fail_access_denied.pdf", "failed", Some("access_denied"), 0.0, false),
+
("fail_parsing.pdf", "failed", Some("pdf_parsing_error"), 0.0, false), + ("fail_unknown.pdf", "failed", Some("unknown_error"), 0.0, false), + ("fail_null_reason.pdf", "failed", None, 0.0, false), + + // Pending documents + ("pending_new.pdf", "pending", None, 0.0, false), + ("pending_retry.pdf", "pending", None, 0.0, false), + + // Edge cases + ("edge_empty_file.pdf", "failed", Some("file_corrupted"), 0.0, false), + ("edge_huge_file.pdf", "failed", Some("file_too_large"), 0.0, false), + ("edge_special_chars_Β§.pdf", "completed", None, 0.9, true), + ]; + + // Create documents for each user + for user in &users { + for (filename, status, failure_reason, confidence, has_text) in &document_scenarios { + let doc = create_test_document( + pool, + user.id, + filename, + status, + failure_reason.as_deref(), + *confidence, + *has_text + ).await; + documents.push(doc); + } + } + + // Create OCR queue entries for some documents + for doc in documents.iter().filter(|d| d.ocr_status == "pending" || d.ocr_status == "failed") { + create_ocr_queue_entry(pool, doc.id).await; + } + + // Create scenarios description + scenarios.push(TestScenario { + name: "User Management".to_string(), + description: "Different user roles and permissions".to_string(), + affected_tables: vec!["users".to_string()], + record_count: users.len(), + }); + + scenarios.push(TestScenario { + name: "Document Processing".to_string(), + description: "Various document states and failure scenarios".to_string(), + affected_tables: vec!["documents".to_string(), "failed_documents".to_string()], + record_count: documents.len(), + }); + + scenarios.push(TestScenario { + name: "OCR Queue".to_string(), + description: "OCR processing queue with retries".to_string(), + affected_tables: vec!["ocr_queue".to_string()], + record_count: documents.iter().filter(|d| d.ocr_status != "completed").count(), + }); + + let total_records = users.len() + documents.len(); + + ComprehensiveTestData { + users, + documents, + scenarios, + total_records, + } + } + + async fn create_test_user_with_role(pool: &PgPool, user_type: &str, role: &str) -> TestUser { + let id = Uuid::new_v4(); + let username = format!("test_{}_{}", user_type, Uuid::new_v4().to_string().split('-').next().unwrap()); + + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(id) + .bind(&username) + .bind(format!("{}@test.com", username)) + .bind("test_hash") + .bind(role) + .execute(pool) + .await + .unwrap(); + + TestUser { id, username, role: role.to_string() } + } + + async fn create_test_document( + pool: &PgPool, + user_id: Uuid, + filename: &str, + status: &str, + failure_reason: Option<&str>, + confidence: f32, + has_text: bool, + ) -> TestDocument { + let id = Uuid::new_v4(); + let file_size = match filename { + f if f.contains("huge") => 104857600, // 100MB + f if f.contains("empty") => 0, + _ => 1024 * (1 + (id.as_bytes()[0] as i64)), // Variable size + }; + + let mime_type = if filename.ends_with(".pdf") { + "application/pdf" + } else { + "application/octet-stream" + }; + + let tags = if filename.contains("tags") { + vec!["important", "reviewed", "2024"] + } else { + vec![] + }; + + let ocr_text = if has_text { + Some(format!("Sample OCR text for document {}", filename)) + } else { + None + }; + + sqlx::query( + r#" + INSERT INTO documents ( + id, user_id, filename, original_filename, file_path, file_size, + mime_type, ocr_status, ocr_failure_reason, ocr_confidence, ocr_text, tags + ) VALUES ( + $1, $2, $3, $3, $4, $5, 
$6, $7, $8, $9, $10, $11 + ) + "# + ) + .bind(id) + .bind(user_id) + .bind(filename) + .bind(format!("/test/files/{}", filename)) + .bind(file_size) + .bind(mime_type) + .bind(status) + .bind(failure_reason) + .bind(if confidence > 0.0 { Some(confidence) } else { None }) + .bind(ocr_text) + .bind(&tags) + .execute(pool) + .await + .unwrap(); + + TestDocument { + id, + user_id, + filename: filename.to_string(), + ocr_status: status.to_string(), + failure_reason: failure_reason.map(|s| s.to_string()), + metadata: DocumentMetadata { + file_size, + mime_type: mime_type.to_string(), + has_ocr_text: has_text, + tags: tags.iter().map(|s| s.to_string()).collect(), + }, + } + } + + async fn create_ocr_queue_entry(pool: &PgPool, document_id: Uuid) { + sqlx::query( + "INSERT INTO ocr_queue (document_id, priority, status) VALUES ($1, $2, $3)" + ) + .bind(document_id) + .bind(1) + .bind("pending") + .execute(pool) + .await + .unwrap(); + } + + async fn verify_migrations_with_data(pool: &PgPool, test_data: &ComprehensiveTestData) { + // Count records before any potential data migration + let doc_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents") + .fetch_one(pool) + .await + .unwrap(); + + let user_count_before: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users") + .fetch_one(pool) + .await + .unwrap(); + + println!("πŸ“Š Database state before migration verification:"); + println!(" - Users: {}", user_count_before); + println!(" - Documents: {}", doc_count_before); + + // Verify failed document migration if applicable + let failed_docs: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM documents WHERE ocr_status = 'failed'" + ) + .fetch_one(pool) + .await + .unwrap(); + + if failed_docs > 0 { + println!(" - Failed documents to migrate: {}", failed_docs); + + // Verify migration mapping works correctly + let mapping_test = sqlx::query( + r#" + SELECT + ocr_failure_reason, + COUNT(*) as count, + CASE + WHEN ocr_failure_reason = 'low_ocr_confidence' THEN 'low_ocr_confidence' + WHEN ocr_failure_reason = 'timeout' THEN 'ocr_timeout' + WHEN ocr_failure_reason = 'memory_limit' THEN 'ocr_memory_limit' + WHEN ocr_failure_reason = 'pdf_parsing_error' THEN 'pdf_parsing_error' + WHEN ocr_failure_reason = 'corrupted' OR ocr_failure_reason = 'file_corrupted' THEN 'file_corrupted' + WHEN ocr_failure_reason = 'unsupported_format' THEN 'unsupported_format' + WHEN ocr_failure_reason = 'access_denied' THEN 'access_denied' + ELSE 'other' + END as mapped_reason + FROM documents + WHERE ocr_status = 'failed' + GROUP BY ocr_failure_reason + "# + ) + .fetch_all(pool) + .await + .unwrap(); + + println!(" - Failure reason mappings:"); + for row in mapping_test { + let original: Option = row.get("ocr_failure_reason"); + let mapped: String = row.get("mapped_reason"); + let count: i64 = row.get("count"); + println!(" {:?} -> {} ({} documents)", original, mapped, count); + } + } + } + + async fn test_data_transformation_integrity(pool: &PgPool, test_data: &ComprehensiveTestData) { + // Test that data transformations maintain integrity + println!("πŸ”„ Testing data transformation integrity..."); + + // Check if failed_documents table exists (indicating migration ran) + let failed_docs_exists = sqlx::query( + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'failed_documents')" + ) + .fetch_one(pool) + .await + .unwrap() + .get::(0); + + if failed_docs_exists { + // Verify all failed documents were migrated correctly + let migrated_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) 
FROM failed_documents WHERE failure_stage = 'ocr'" + ) + .fetch_one(pool) + .await + .unwrap(); + + let expected_failed = test_data.documents.iter() + .filter(|d| d.ocr_status == "failed") + .count(); + + assert!( + migrated_count >= expected_failed as i64, + "Not all failed documents were migrated: expected at least {}, got {}", + expected_failed, migrated_count + ); + + // Verify data integrity for specific test cases + for doc in test_data.documents.iter().filter(|d| d.ocr_status == "failed") { + let migrated = sqlx::query( + "SELECT * FROM failed_documents WHERE filename = $1" + ) + .bind(&doc.filename) + .fetch_optional(pool) + .await + .unwrap(); + + assert!( + migrated.is_some(), + "Failed document '{}' was not migrated", + doc.filename + ); + + if let Some(row) = migrated { + let failure_reason: String = row.get("failure_reason"); + + // Verify reason mapping + match doc.failure_reason.as_deref() { + Some("timeout") => assert_eq!(failure_reason, "ocr_timeout"), + Some("memory_limit") => assert_eq!(failure_reason, "ocr_memory_limit"), + Some("file_corrupted") => assert_eq!(failure_reason, "file_corrupted"), + Some("low_ocr_confidence") => assert_eq!(failure_reason, "low_ocr_confidence"), + Some("unknown_error") | None => assert_eq!(failure_reason, "other"), + _ => {} + } + } + } + } + + println!("βœ… Data transformation integrity verified"); + } + + async fn verify_no_data_loss(pool: &PgPool, test_data: &ComprehensiveTestData) { + println!("πŸ” Verifying no data loss occurred..."); + + // Check user count + let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users") + .fetch_one(pool) + .await + .unwrap(); + + assert!( + user_count >= test_data.users.len() as i64, + "User data loss detected: expected at least {}, got {}", + test_data.users.len(), user_count + ); + + // Check total document count (including migrated) + let doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents") + .fetch_one(pool) + .await + .unwrap(); + + let failed_doc_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM failed_documents WHERE ingestion_source IS NOT NULL" + ) + .fetch_one(pool) + .await + .unwrap_or(0); + + let total_docs = doc_count + failed_doc_count; + let expected_docs = test_data.documents.len() as i64; + + assert!( + total_docs >= expected_docs, + "Document data loss detected: expected at least {}, got {} (documents: {}, failed_documents: {})", + expected_docs, total_docs, doc_count, failed_doc_count + ); + + println!("βœ… No data loss detected"); + } + + async fn capture_schema_snapshot(pool: &PgPool) -> SchemaSnapshot { + let tables = sqlx::query( + r#" + SELECT + t.table_name, + COUNT(c.column_name) as column_count + FROM information_schema.tables t + LEFT JOIN information_schema.columns c + ON t.table_name = c.table_name + AND t.table_schema = c.table_schema + WHERE t.table_schema = 'public' + AND t.table_type = 'BASE TABLE' + GROUP BY t.table_name + ORDER BY t.table_name + "# + ) + .fetch_all(pool) + .await + .unwrap(); + + let mut table_infos = Vec::new(); + for table_row in tables { + let table_name: String = table_row.get("table_name"); + + // Get columns for this table + let columns = sqlx::query( + r#" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = $1 + ORDER BY ordinal_position + "# + ) + .bind(&table_name) + .fetch_all(pool) + .await + .unwrap(); + + let column_infos: Vec = columns.into_iter() + .map(|col| ColumnInfo { + name: col.get("column_name"), + data_type: 
col.get("data_type"), + is_nullable: col.get::("is_nullable") == "YES", + }) + .collect(); + + // Get row count + let count_query = format!("SELECT COUNT(*) FROM {}", table_name); + let row_count: i64 = sqlx::query_scalar(&count_query) + .fetch_one(pool) + .await + .unwrap_or(0); + + table_infos.push(TableInfo { + name: table_name, + columns: column_infos, + row_count, + }); + } + + // Get indexes + let indexes = sqlx::query( + "SELECT indexname FROM pg_indexes WHERE schemaname = 'public'" + ) + .fetch_all(pool) + .await + .unwrap() + .into_iter() + .map(|row| row.get("indexname")) + .collect(); + + // Get constraints + let constraints = sqlx::query( + r#" + SELECT constraint_name || ' (' || constraint_type || ')' as constraint_info + FROM information_schema.table_constraints + WHERE constraint_schema = 'public' + "# + ) + .fetch_all(pool) + .await + .unwrap() + .into_iter() + .map(|row| row.get("constraint_info")) + .collect(); + + // Get views + let views = sqlx::query( + "SELECT table_name FROM information_schema.views WHERE table_schema = 'public'" + ) + .fetch_all(pool) + .await + .unwrap() + .into_iter() + .map(|row| row.get("table_name")) + .collect(); + + SchemaSnapshot { + tables: table_infos, + indexes, + constraints, + views, + } + } + + fn verify_schema_changes(before: &SchemaSnapshot, after: &SchemaSnapshot, migrations: &[String]) { + println!("πŸ“‹ Verifying schema changes..."); + + // Check for new tables + let before_tables: std::collections::HashSet<_> = before.tables.iter().map(|t| &t.name).collect(); + let after_tables: std::collections::HashSet<_> = after.tables.iter().map(|t| &t.name).collect(); + + let new_tables: Vec<_> = after_tables.difference(&before_tables).collect(); + if !new_tables.is_empty() { + println!(" New tables added: {:?}", new_tables); + } + + // Check for removed tables (should not happen in migrations) + let removed_tables: Vec<_> = before_tables.difference(&after_tables).collect(); + assert!( + removed_tables.is_empty(), + "Tables were removed in migration: {:?}", + removed_tables + ); + + // Check for column changes + for after_table in &after.tables { + if let Some(before_table) = before.tables.iter().find(|t| t.name == after_table.name) { + let before_cols: std::collections::HashSet<_> = before_table.columns.iter().map(|c| &c.name).collect(); + let after_cols: std::collections::HashSet<_> = after_table.columns.iter().map(|c| &c.name).collect(); + + let new_cols: Vec<_> = after_cols.difference(&before_cols).collect(); + if !new_cols.is_empty() { + println!(" New columns in {}: {:?}", after_table.name, new_cols); + } + + let removed_cols: Vec<_> = before_cols.difference(&after_cols).collect(); + if !removed_cols.is_empty() { + println!(" ⚠️ Removed columns in {}: {:?}", after_table.name, removed_cols); + } + } + } + + println!("βœ… Schema changes verified"); + } + + async fn create_performance_test_data(pool: &PgPool, count: usize) { + println!("πŸƒ Creating {} records for performance testing...", count); + + // Create a test user + let user_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(user_id) + .bind("perf_test_user") + .bind("perf@test.com") + .bind("test") + .bind("user") + .execute(pool) + .await + .unwrap(); + + // Batch insert documents + let batch_size = 100; + for batch_start in (0..count).step_by(batch_size) { + let batch_end = (batch_start + batch_size).min(count); + + let mut query = String::from( + "INSERT INTO documents (id, user_id, 
filename, original_filename, file_path, file_size, mime_type, ocr_status, ocr_failure_reason) VALUES " + ); + + for i in batch_start..batch_end { + if i > batch_start { + query.push_str(", "); + } + + let doc_id = Uuid::new_v4(); + let status = if i % 3 == 0 { "failed" } else { "completed" }; + let failure_reason = if status == "failed" { + match i % 5 { + 0 => "'timeout'", + 1 => "'memory_limit'", + 2 => "'file_corrupted'", + 3 => "'low_ocr_confidence'", + _ => "'unknown_error'", + } + } else { + "NULL" + }; + + query.push_str(&format!( + "('{}', '{}', 'perf_doc_{}.pdf', 'perf_doc_{}.pdf', '/test/perf_{}.pdf', 1024, 'application/pdf', '{}', {})", + doc_id, user_id, i, i, i, status, failure_reason + )); + } + + sqlx::query(&query).execute(pool).await.unwrap(); + } + + println!("βœ… Created {} test documents", count); + } + + fn get_new_migrations_in_pr() -> Vec { + // Check if we're in a CI environment or have a base branch to compare against + let base_branch = std::env::var("GITHUB_BASE_REF") + .or_else(|_| std::env::var("BASE_BRANCH")) + .unwrap_or_else(|_| "main".to_string()); + + let output = Command::new("git") + .args(["diff", "--name-only", &format!("origin/{}", base_branch), "HEAD", "--", "migrations/"]) + .output(); + + match output { + Ok(output) if output.status.success() => { + let files = String::from_utf8_lossy(&output.stdout); + files + .lines() + .filter(|line| line.ends_with(".sql") && !line.is_empty()) + .map(|s| s.to_string()) + .collect() + } + _ => { + // Fallback: check for uncommitted migration files + let output = Command::new("git") + .args(["status", "--porcelain", "migrations/"]) + .output() + .unwrap_or_else(|_| panic!("Failed to run git status")); + + if output.status.success() { + let files = String::from_utf8_lossy(&output.stdout); + files + .lines() + .filter(|line| line.contains(".sql") && (line.starts_with("A ") || line.starts_with("??"))) + .map(|line| line.split_whitespace().last().unwrap_or("").to_string()) + .filter(|f| !f.is_empty()) + .collect() + } else { + Vec::new() + } + } + } + } + + fn migration_involves_data_transformation(migrations: &[String]) -> bool { + // Check if any migration file contains data transformation keywords + for migration_file in migrations { + if let Ok(content) = std::fs::read_to_string(migration_file) { + let lowercase = content.to_lowercase(); + if lowercase.contains("insert into") && lowercase.contains("select") || + lowercase.contains("update") && lowercase.contains("set") || + lowercase.contains("migrate") || + lowercase.contains("transform") || + lowercase.contains("failed_documents") { + return true; + } + } + } + false + } +} \ No newline at end of file diff --git a/tests/simplified_migration_schema_validation.rs b/tests/simplified_migration_schema_validation.rs new file mode 100644 index 0000000..d21989f --- /dev/null +++ b/tests/simplified_migration_schema_validation.rs @@ -0,0 +1,209 @@ +use readur::test_utils::TestContext; +use sqlx::Row; +use std::collections::HashSet; + +#[cfg(test)] +mod simplified_migration_schema_validation_tests { + use super::*; + + #[tokio::test] + async fn test_core_tables_exist() { + let ctx = TestContext::new().await; + let pool = ctx.state.db.get_pool(); + + let core_tables = vec![ + "users", + "documents", + "failed_documents", + "ocr_queue", + "settings", + ]; + + let existing_tables = get_all_tables(pool).await; + + for table in core_tables { + assert!( + existing_tables.contains(table), + "Core table '{}' not found in database schema", + table + ); + } + + println!("βœ… All 
core tables exist");
+        println!("Found {} total tables in database", existing_tables.len());
+    }
+
+    #[tokio::test]
+    async fn test_basic_schema_integrity() {
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Test that we can query key tables without errors
+        let user_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users")
+            .fetch_one(pool)
+            .await
+            .unwrap();
+
+        let doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
+            .fetch_one(pool)
+            .await
+            .unwrap();
+
+        let failed_doc_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM failed_documents")
+            .fetch_one(pool)
+            .await
+            .unwrap();
+
+        println!("βœ… Basic schema integrity verified");
+        println!(" - Users: {}", user_count);
+        println!(" - Documents: {}", doc_count);
+        println!(" - Failed documents: {}", failed_doc_count);
+
+        // All counts should be non-negative (basic sanity check)
+        assert!(user_count >= 0);
+        assert!(doc_count >= 0);
+        assert!(failed_doc_count >= 0);
+    }
+
+    #[tokio::test]
+    async fn test_migration_tables_structure() {
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Test that failed_documents table has the expected columns for migration
+        let columns = sqlx::query(
+            "SELECT column_name FROM information_schema.columns
+             WHERE table_schema = 'public' AND table_name = 'failed_documents'"
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        let column_names: Vec<String> = columns.iter()
+            .map(|row| row.get("column_name"))
+            .collect();
+
+        let migration_critical_columns = vec![
+            "id", "user_id", "filename", "failure_reason", "failure_stage", "ingestion_source"
+        ];
+
+        for col in migration_critical_columns {
+            assert!(
+                column_names.contains(&col.to_string()),
+                "Critical column '{}' not found in failed_documents table",
+                col
+            );
+        }
+
+        println!("βœ… Migration-critical table structure verified");
+        println!(" failed_documents has {} columns", column_names.len());
+    }
+
+    #[tokio::test]
+    async fn test_constraint_sampling() {
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Test a few key constraints exist
+        let constraints = sqlx::query(
+            "SELECT constraint_name, constraint_type
+             FROM information_schema.table_constraints
+             WHERE table_schema = 'public'"
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        let primary_keys: Vec<String> = constraints.iter()
+            .filter(|row| row.get::<String, _>("constraint_type") == "PRIMARY KEY")
+            .map(|row| row.get("constraint_name"))
+            .collect();
+
+        let foreign_keys: Vec<String> = constraints.iter()
+            .filter(|row| row.get::<String, _>("constraint_type") == "FOREIGN KEY")
+            .map(|row| row.get("constraint_name"))
+            .collect();
+
+        let check_constraints: Vec<String> = constraints.iter()
+            .filter(|row| row.get::<String, _>("constraint_type") == "CHECK")
+            .map(|row| row.get("constraint_name"))
+            .collect();
+
+        println!("βœ… Database constraints verified");
+        println!(" - Primary keys: {}", primary_keys.len());
+        println!(" - Foreign keys: {}", foreign_keys.len());
+        println!(" - Check constraints: {}", check_constraints.len());
+
+        // Basic sanity checks
+        assert!(primary_keys.len() > 0, "Should have at least one primary key");
+        assert!(foreign_keys.len() > 0, "Should have at least one foreign key");
+    }
+
+    #[tokio::test]
+    async fn test_migration_workflow_readiness() {
+        let ctx = TestContext::new().await;
+        let pool = ctx.state.db.get_pool();
+
+        // Test that the database is ready for the migration workflow we test
+        // This includes checking that we can insert test data successfully
+
+        // Create a test user
+        let user_id = uuid::Uuid::new_v4();
+        let username = format!("migration_test_{}", user_id.to_string().split('-').next().unwrap());
+
+        let user_result = sqlx::query(
+            "INSERT INTO users (id, username, email, password_hash, role) VALUES ($1, $2, $3, $4, $5)"
+        )
+        .bind(user_id)
+        .bind(&username)
+        .bind(format!("{}@test.com", username))
+        .bind("test_hash")
+        .bind("user")
+        .execute(pool)
+        .await;
+
+        assert!(user_result.is_ok(), "Should be able to create test user");
+
+        // Test that failed_documents accepts valid data
+        let failed_doc_result = sqlx::query(
+            "INSERT INTO failed_documents (user_id, filename, failure_reason, failure_stage, ingestion_source)
+             VALUES ($1, 'test.pdf', 'other', 'ocr', 'test')"
+        )
+        .bind(user_id)
+        .execute(pool)
+        .await;
+
+        assert!(failed_doc_result.is_ok(), "Should be able to insert into failed_documents");
+
+        // Clean up
+        sqlx::query("DELETE FROM failed_documents WHERE user_id = $1")
+            .bind(user_id)
+            .execute(pool)
+            .await
+            .unwrap();
+
+        sqlx::query("DELETE FROM users WHERE id = $1")
+            .bind(user_id)
+            .execute(pool)
+            .await
+            .unwrap();
+
+        println!("βœ… Migration workflow readiness verified");
+    }
+
+    // Helper functions
+
+    async fn get_all_tables(pool: &sqlx::PgPool) -> HashSet<String> {
+        let rows = sqlx::query(
+            "SELECT table_name FROM information_schema.tables
+             WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
+        )
+        .fetch_all(pool)
+        .await
+        .unwrap();
+
+        rows.into_iter()
+            .map(|row| row.get("table_name"))
+            .collect()
+    }
+}
\ No newline at end of file