diff --git a/src/main.rs b/src/main.rs index 552e330..98d0e02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -173,29 +173,36 @@ async fn main() -> anyhow::Result<()> { // Run SQLx migrations info!("Running SQLx migrations..."); let migrations = sqlx::migrate!("./migrations"); - info!("Found {} migrations", migrations.migrations.len()); + let total_migrations = migrations.migrations.len(); - for migration in migrations.migrations.iter() { - info!("Migration available: {} - {}", migration.version, migration.description); - } - - // Check current migration status - let applied_result = sqlx::query("SELECT version, description FROM _sqlx_migrations ORDER BY version") - .fetch_all(web_db.get_pool()) - .await; - - match applied_result { - Ok(rows) => { - info!("Currently applied migrations:"); - for row in rows { - let version: i64 = row.get("version"); - let description: String = row.get("description"); - info!(" - {} {}", version, description); + if total_migrations > 0 { + // Verify migrations are in correct chronological order + let mut is_ordered = true; + let mut prev_version = 0i64; + + for migration in migrations.migrations.iter() { + if migration.version <= prev_version { + error!("❌ Migration {} is out of order (previous: {})", migration.version, prev_version); + is_ordered = false; } + prev_version = migration.version; } - Err(e) => { - info!("No existing migrations found (this is normal for first run): {}", e); + + if is_ordered { + info!("✅ {} migrations found in correct chronological order", total_migrations); + } else { + error!("❌ Migrations are not in chronological order - this may cause issues"); } + + // Log first and last migration for reference + let first_migration = &migrations.migrations[0]; + let last_migration = &migrations.migrations[total_migrations - 1]; + + info!("Migration range: {} ({}) → {} ({})", + first_migration.version, first_migration.description, + last_migration.version, last_migration.description); + } else { + info!("No migrations found"); } // Check if ocr_error column exists @@ -226,7 +233,35 @@ async fn main() -> anyhow::Result<()> { let result = migrations.run(web_db.get_pool()).await; match result { - Ok(_) => info!("SQLx migrations completed successfully"), + Ok(_) => { + info!("✅ SQLx migrations completed successfully"); + + // Verify the get_ocr_queue_stats function has the correct implementation + let function_check = sqlx::query_scalar::<_, Option>( + r#" + SELECT pg_get_functiondef(p.oid) + FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats' + "# + ) + .fetch_one(web_db.get_pool()) + .await; + + match function_check { + Ok(Some(def)) => { + // Check if it contains the correct logic from our latest migration + if def.contains("document_stats") && def.contains("documents.ocr_status = 'completed'") { + info!("✅ get_ocr_queue_stats function has correct logic (uses documents table for completed_today)"); + } else { + error!("❌ get_ocr_queue_stats function still uses old logic - migration may have failed"); + info!("Function uses ocr_queue table instead of documents table for completed_today count"); + } + } + Ok(None) => error!("❌ get_ocr_queue_stats function does not exist after migration"), + Err(e) => error!("❌ Failed to verify get_ocr_queue_stats function: {}", e), + } + } Err(e) => { error!("Failed to run SQLx migrations: {}", e); return Err(e.into()); diff --git a/src/ocr/queue.rs b/src/ocr/queue.rs index 1386fe7..079b20c 100644 --- a/src/ocr/queue.rs +++ b/src/ocr/queue.rs @@ -1,7 +1,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use sqlx::{FromRow, PgPool, Row, Column}; +use sqlx::{FromRow, PgPool, Row}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::Semaphore; @@ -793,33 +793,6 @@ impl OcrQueueService { /// Get queue statistics pub async fn get_stats(&self) -> Result { - tracing::debug!("OCR Queue: Starting get_stats() call"); - - // First, let's check if the function exists and what it returns - let function_exists = sqlx::query_scalar::<_, bool>( - r#" - SELECT EXISTS ( - SELECT 1 FROM pg_proc p - JOIN pg_namespace n ON p.pronamespace = n.oid - WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats' - ) - "# - ) - .fetch_one(&self.pool) - .await - .map_err(|e| { - tracing::error!("OCR Queue: Failed to check if function exists: {}", e); - e - })?; - - if !function_exists { - tracing::error!("OCR Queue: Function get_ocr_queue_stats() does not exist"); - return Err(anyhow::anyhow!("Function get_ocr_queue_stats() does not exist")); - } - - tracing::debug!("OCR Queue: Function get_ocr_queue_stats() exists, attempting to call it"); - - // Call the function let stats = sqlx::query( r#" SELECT * FROM get_ocr_queue_stats() @@ -828,97 +801,17 @@ impl OcrQueueService { .fetch_one(&self.pool) .await .map_err(|e| { - tracing::error!("OCR Queue: Failed to execute get_ocr_queue_stats(): {}", e); + tracing::error!("Failed to get OCR queue stats: {}", e); e })?; - tracing::debug!("OCR Queue: Successfully fetched stats row"); - - // Debug: Print all column names, their types, and their values - let columns = stats.columns(); - for (i, column) in columns.iter().enumerate() { - let column_name = column.name(); - let column_type = column.type_info(); - - tracing::debug!("OCR Queue: Column {}: name='{}', type='{:?}'", i, column_name, column_type); - } - - // Try to extract each field with detailed error handling - let pending_count = match stats.try_get::("pending_count") { - Ok(val) => { - tracing::debug!("OCR Queue: pending_count = {}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get pending_count: {}", e); - return Err(anyhow::anyhow!("Failed to get pending_count: {}", e)); - } - }; - - let processing_count = match stats.try_get::("processing_count") { - Ok(val) => { - tracing::debug!("OCR Queue: processing_count = {}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get processing_count: {}", e); - return Err(anyhow::anyhow!("Failed to get processing_count: {}", e)); - } - }; - - let failed_count = match stats.try_get::("failed_count") { - Ok(val) => { - tracing::debug!("OCR Queue: failed_count = {}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get failed_count: {}", e); - return Err(anyhow::anyhow!("Failed to get failed_count: {}", e)); - } - }; - - let completed_today = match stats.try_get::("completed_today") { - Ok(val) => { - tracing::debug!("OCR Queue: completed_today = {}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get completed_today: {}", e); - return Err(anyhow::anyhow!("Failed to get completed_today: {}", e)); - } - }; - - let avg_wait_time_minutes = match stats.try_get::, _>("avg_wait_time_minutes") { - Ok(val) => { - tracing::debug!("OCR Queue: avg_wait_time_minutes = {:?}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get avg_wait_time_minutes: {}", e); - return Err(anyhow::anyhow!("Failed to get avg_wait_time_minutes: {}", e)); - } - }; - - let oldest_pending_minutes = match stats.try_get::, _>("oldest_pending_minutes") { - Ok(val) => { - tracing::debug!("OCR Queue: oldest_pending_minutes = {:?}", val); - val - } - Err(e) => { - tracing::error!("OCR Queue: Failed to get oldest_pending_minutes: {}", e); - return Err(anyhow::anyhow!("Failed to get oldest_pending_minutes: {}", e)); - } - }; - - tracing::debug!("OCR Queue: Successfully extracted all stats fields"); - Ok(QueueStats { - pending_count, - processing_count, - failed_count, - completed_today, - avg_wait_time_minutes, - oldest_pending_minutes, + pending_count: stats.get("pending_count"), + processing_count: stats.get("processing_count"), + failed_count: stats.get("failed_count"), + completed_today: stats.get("completed_today"), + avg_wait_time_minutes: stats.get("avg_wait_time_minutes"), + oldest_pending_minutes: stats.get("oldest_pending_minutes"), }) }