fix(stats): try to fix stats export, again
This commit is contained in:
parent
e17b158140
commit
a7e9f75eab
75
src/main.rs
75
src/main.rs
|
|
@ -173,29 +173,36 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// Run SQLx migrations
|
// Run SQLx migrations
|
||||||
info!("Running SQLx migrations...");
|
info!("Running SQLx migrations...");
|
||||||
let migrations = sqlx::migrate!("./migrations");
|
let migrations = sqlx::migrate!("./migrations");
|
||||||
info!("Found {} migrations", migrations.migrations.len());
|
let total_migrations = migrations.migrations.len();
|
||||||
|
|
||||||
for migration in migrations.migrations.iter() {
|
if total_migrations > 0 {
|
||||||
info!("Migration available: {} - {}", migration.version, migration.description);
|
// Verify migrations are in correct chronological order
|
||||||
}
|
let mut is_ordered = true;
|
||||||
|
let mut prev_version = 0i64;
|
||||||
// Check current migration status
|
|
||||||
let applied_result = sqlx::query("SELECT version, description FROM _sqlx_migrations ORDER BY version")
|
for migration in migrations.migrations.iter() {
|
||||||
.fetch_all(web_db.get_pool())
|
if migration.version <= prev_version {
|
||||||
.await;
|
error!("❌ Migration {} is out of order (previous: {})", migration.version, prev_version);
|
||||||
|
is_ordered = false;
|
||||||
match applied_result {
|
|
||||||
Ok(rows) => {
|
|
||||||
info!("Currently applied migrations:");
|
|
||||||
for row in rows {
|
|
||||||
let version: i64 = row.get("version");
|
|
||||||
let description: String = row.get("description");
|
|
||||||
info!(" - {} {}", version, description);
|
|
||||||
}
|
}
|
||||||
|
prev_version = migration.version;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
info!("No existing migrations found (this is normal for first run): {}", e);
|
if is_ordered {
|
||||||
|
info!("✅ {} migrations found in correct chronological order", total_migrations);
|
||||||
|
} else {
|
||||||
|
error!("❌ Migrations are not in chronological order - this may cause issues");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Log first and last migration for reference
|
||||||
|
let first_migration = &migrations.migrations[0];
|
||||||
|
let last_migration = &migrations.migrations[total_migrations - 1];
|
||||||
|
|
||||||
|
info!("Migration range: {} ({}) → {} ({})",
|
||||||
|
first_migration.version, first_migration.description,
|
||||||
|
last_migration.version, last_migration.description);
|
||||||
|
} else {
|
||||||
|
info!("No migrations found");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if ocr_error column exists
|
// Check if ocr_error column exists
|
||||||
|
|
@ -226,7 +233,35 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let result = migrations.run(web_db.get_pool()).await;
|
let result = migrations.run(web_db.get_pool()).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => info!("SQLx migrations completed successfully"),
|
Ok(_) => {
|
||||||
|
info!("✅ SQLx migrations completed successfully");
|
||||||
|
|
||||||
|
// Verify the get_ocr_queue_stats function has the correct implementation
|
||||||
|
let function_check = sqlx::query_scalar::<_, Option<String>>(
|
||||||
|
r#"
|
||||||
|
SELECT pg_get_functiondef(p.oid)
|
||||||
|
FROM pg_proc p
|
||||||
|
JOIN pg_namespace n ON p.pronamespace = n.oid
|
||||||
|
WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats'
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_one(web_db.get_pool())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match function_check {
|
||||||
|
Ok(Some(def)) => {
|
||||||
|
// Check if it contains the correct logic from our latest migration
|
||||||
|
if def.contains("document_stats") && def.contains("documents.ocr_status = 'completed'") {
|
||||||
|
info!("✅ get_ocr_queue_stats function has correct logic (uses documents table for completed_today)");
|
||||||
|
} else {
|
||||||
|
error!("❌ get_ocr_queue_stats function still uses old logic - migration may have failed");
|
||||||
|
info!("Function uses ocr_queue table instead of documents table for completed_today count");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => error!("❌ get_ocr_queue_stats function does not exist after migration"),
|
||||||
|
Err(e) => error!("❌ Failed to verify get_ocr_queue_stats function: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to run SQLx migrations: {}", e);
|
error!("Failed to run SQLx migrations: {}", e);
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
|
|
|
||||||
123
src/ocr/queue.rs
123
src/ocr/queue.rs
|
|
@ -1,7 +1,7 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::{FromRow, PgPool, Row, Column};
|
use sqlx::{FromRow, PgPool, Row};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
@ -793,33 +793,6 @@ impl OcrQueueService {
|
||||||
|
|
||||||
/// Get queue statistics
|
/// Get queue statistics
|
||||||
pub async fn get_stats(&self) -> Result<QueueStats> {
|
pub async fn get_stats(&self) -> Result<QueueStats> {
|
||||||
tracing::debug!("OCR Queue: Starting get_stats() call");
|
|
||||||
|
|
||||||
// First, let's check if the function exists and what it returns
|
|
||||||
let function_exists = sqlx::query_scalar::<_, bool>(
|
|
||||||
r#"
|
|
||||||
SELECT EXISTS (
|
|
||||||
SELECT 1 FROM pg_proc p
|
|
||||||
JOIN pg_namespace n ON p.pronamespace = n.oid
|
|
||||||
WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats'
|
|
||||||
)
|
|
||||||
"#
|
|
||||||
)
|
|
||||||
.fetch_one(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
tracing::error!("OCR Queue: Failed to check if function exists: {}", e);
|
|
||||||
e
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if !function_exists {
|
|
||||||
tracing::error!("OCR Queue: Function get_ocr_queue_stats() does not exist");
|
|
||||||
return Err(anyhow::anyhow!("Function get_ocr_queue_stats() does not exist"));
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::debug!("OCR Queue: Function get_ocr_queue_stats() exists, attempting to call it");
|
|
||||||
|
|
||||||
// Call the function
|
|
||||||
let stats = sqlx::query(
|
let stats = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT * FROM get_ocr_queue_stats()
|
SELECT * FROM get_ocr_queue_stats()
|
||||||
|
|
@ -828,97 +801,17 @@ impl OcrQueueService {
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
tracing::error!("OCR Queue: Failed to execute get_ocr_queue_stats(): {}", e);
|
tracing::error!("Failed to get OCR queue stats: {}", e);
|
||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
tracing::debug!("OCR Queue: Successfully fetched stats row");
|
|
||||||
|
|
||||||
// Debug: Print all column names, their types, and their values
|
|
||||||
let columns = stats.columns();
|
|
||||||
for (i, column) in columns.iter().enumerate() {
|
|
||||||
let column_name = column.name();
|
|
||||||
let column_type = column.type_info();
|
|
||||||
|
|
||||||
tracing::debug!("OCR Queue: Column {}: name='{}', type='{:?}'", i, column_name, column_type);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to extract each field with detailed error handling
|
|
||||||
let pending_count = match stats.try_get::<i64, _>("pending_count") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: pending_count = {}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get pending_count: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get pending_count: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let processing_count = match stats.try_get::<i64, _>("processing_count") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: processing_count = {}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get processing_count: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get processing_count: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let failed_count = match stats.try_get::<i64, _>("failed_count") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: failed_count = {}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get failed_count: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get failed_count: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let completed_today = match stats.try_get::<i64, _>("completed_today") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: completed_today = {}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get completed_today: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get completed_today: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let avg_wait_time_minutes = match stats.try_get::<Option<f64>, _>("avg_wait_time_minutes") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: avg_wait_time_minutes = {:?}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get avg_wait_time_minutes: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get avg_wait_time_minutes: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let oldest_pending_minutes = match stats.try_get::<Option<f64>, _>("oldest_pending_minutes") {
|
|
||||||
Ok(val) => {
|
|
||||||
tracing::debug!("OCR Queue: oldest_pending_minutes = {:?}", val);
|
|
||||||
val
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("OCR Queue: Failed to get oldest_pending_minutes: {}", e);
|
|
||||||
return Err(anyhow::anyhow!("Failed to get oldest_pending_minutes: {}", e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
tracing::debug!("OCR Queue: Successfully extracted all stats fields");
|
|
||||||
|
|
||||||
Ok(QueueStats {
|
Ok(QueueStats {
|
||||||
pending_count,
|
pending_count: stats.get("pending_count"),
|
||||||
processing_count,
|
processing_count: stats.get("processing_count"),
|
||||||
failed_count,
|
failed_count: stats.get("failed_count"),
|
||||||
completed_today,
|
completed_today: stats.get("completed_today"),
|
||||||
avg_wait_time_minutes,
|
avg_wait_time_minutes: stats.get("avg_wait_time_minutes"),
|
||||||
oldest_pending_minutes,
|
oldest_pending_minutes: stats.get("oldest_pending_minutes"),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue