fix(stats): try to fix the stats extraction, again
This commit is contained in:
parent
e628b0d4d5
commit
a6f2b6df09
23
src/main.rs
23
src/main.rs
|
|
@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||||
use tower_http::{cors::CorsLayer, services::{ServeDir, ServeFile}};
|
use tower_http::{cors::CorsLayer, services::{ServeDir, ServeFile}};
|
||||||
use tracing::{info, error, warn};
|
use tracing::{info, error, warn};
|
||||||
use anyhow;
|
use anyhow;
|
||||||
|
use sqlx::{Row, Column};
|
||||||
|
|
||||||
use readur::{config::Config, db::Database, AppState, *};
|
use readur::{config::Config, db::Database, AppState, *};
|
||||||
|
|
||||||
|
|
@ -224,6 +225,12 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
match function_check {
|
match function_check {
|
||||||
Ok(Some(def)) => {
|
Ok(Some(def)) => {
|
||||||
|
info!("📋 get_ocr_queue_stats function definition retrieved");
|
||||||
|
|
||||||
|
// Debug: print the actual function definition
|
||||||
|
info!("🔍 Function definition (first 500 chars): {}",
|
||||||
|
def.chars().take(500).collect::<String>());
|
||||||
|
|
||||||
// Check if it contains the correct logic from our latest migration
|
// Check if it contains the correct logic from our latest migration
|
||||||
if def.contains("document_stats") && def.contains("ocr_status = 'completed'") {
|
if def.contains("document_stats") && def.contains("ocr_status = 'completed'") {
|
||||||
info!("✅ get_ocr_queue_stats function has correct logic (uses documents table for completed_today)");
|
info!("✅ get_ocr_queue_stats function has correct logic (uses documents table for completed_today)");
|
||||||
|
|
@ -231,6 +238,22 @@ async fn main() -> anyhow::Result<()> {
|
||||||
error!("❌ get_ocr_queue_stats function still uses old logic - migration may have failed");
|
error!("❌ get_ocr_queue_stats function still uses old logic - migration may have failed");
|
||||||
info!("Function uses ocr_queue table instead of documents table for completed_today count");
|
info!("Function uses ocr_queue table instead of documents table for completed_today count");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test the function execution at startup
|
||||||
|
info!("🧪 Testing function execution at startup...");
|
||||||
|
match sqlx::query("SELECT * FROM get_ocr_queue_stats()").fetch_one(web_db.get_pool()).await {
|
||||||
|
Ok(test_result) => {
|
||||||
|
info!("✅ Function executes successfully at startup");
|
||||||
|
let columns = test_result.columns();
|
||||||
|
info!("🔍 Function returns {} columns at startup:", columns.len());
|
||||||
|
for (i, column) in columns.iter().enumerate() {
|
||||||
|
info!(" Column {}: name='{}', type='{:?}'", i, column.name(), column.type_info());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("❌ Function fails to execute at startup: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => error!("❌ get_ocr_queue_stats function does not exist after migration"),
|
Ok(None) => error!("❌ get_ocr_queue_stats function does not exist after migration"),
|
||||||
Err(e) => error!("❌ Failed to verify get_ocr_queue_stats function: {}", e),
|
Err(e) => error!("❌ Failed to verify get_ocr_queue_stats function: {}", e),
|
||||||
|
|
|
||||||
164
src/ocr/queue.rs
164
src/ocr/queue.rs
|
|
@ -1,7 +1,7 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::{FromRow, PgPool, Row};
|
use sqlx::{FromRow, PgPool, Row, Column};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
@ -793,6 +793,39 @@ impl OcrQueueService {
|
||||||
|
|
||||||
/// Get queue statistics
|
/// Get queue statistics
|
||||||
pub async fn get_stats(&self) -> Result<QueueStats> {
|
pub async fn get_stats(&self) -> Result<QueueStats> {
|
||||||
|
tracing::debug!("OCR Queue: Starting get_stats() call");
|
||||||
|
|
||||||
|
// First, let's check the function signature/return type
|
||||||
|
let function_info = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT
|
||||||
|
p.proname as function_name,
|
||||||
|
pg_get_function_result(p.oid) as return_type,
|
||||||
|
pg_get_function_arguments(p.oid) as arguments
|
||||||
|
FROM pg_proc p
|
||||||
|
JOIN pg_namespace n ON p.pronamespace = n.oid
|
||||||
|
WHERE n.nspname = 'public' AND p.proname = 'get_ocr_queue_stats'
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
tracing::error!("Failed to get function info: {}", e);
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if let Some(info) = function_info {
|
||||||
|
let function_name: String = info.get("function_name");
|
||||||
|
let return_type: String = info.get("return_type");
|
||||||
|
let arguments: String = info.get("arguments");
|
||||||
|
tracing::debug!("Function info - name: {}, return_type: {}, arguments: {}", function_name, return_type, arguments);
|
||||||
|
} else {
|
||||||
|
tracing::error!("get_ocr_queue_stats function not found!");
|
||||||
|
return Err(anyhow::anyhow!("get_ocr_queue_stats function not found"));
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::debug!("OCR Queue: Calling get_ocr_queue_stats() function");
|
||||||
|
|
||||||
let stats = sqlx::query(
|
let stats = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT * FROM get_ocr_queue_stats()
|
SELECT * FROM get_ocr_queue_stats()
|
||||||
|
|
@ -802,16 +835,133 @@ impl OcrQueueService {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
tracing::error!("Failed to get OCR queue stats: {}", e);
|
tracing::error!("Failed to get OCR queue stats: {}", e);
|
||||||
|
tracing::debug!("This indicates a function structure mismatch error");
|
||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
tracing::debug!("OCR Queue: Successfully got result from function, analyzing structure...");
|
||||||
|
|
||||||
|
// Debug the actual columns returned
|
||||||
|
let columns = stats.columns();
|
||||||
|
tracing::debug!("Function returned {} columns:", columns.len());
|
||||||
|
for (i, column) in columns.iter().enumerate() {
|
||||||
|
let column_name = column.name();
|
||||||
|
let column_type = column.type_info();
|
||||||
|
tracing::debug!(" Column {}: name='{}', type='{:?}'", i, column_name, column_type);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to extract values with detailed logging
|
||||||
|
tracing::debug!("Attempting to extract pending_count...");
|
||||||
|
let pending_count = match stats.try_get::<i64, _>("pending_count") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got pending_count: {}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get pending_count: {}", e);
|
||||||
|
tracing::debug!("Trying different type for pending_count...");
|
||||||
|
stats.try_get::<Option<i64>, _>("pending_count")
|
||||||
|
.map_err(|e2| {
|
||||||
|
tracing::error!("Also failed with Option<i64>: {}", e2);
|
||||||
|
e
|
||||||
|
})?
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Attempting to extract processing_count...");
|
||||||
|
let processing_count = match stats.try_get::<i64, _>("processing_count") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got processing_count: {}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get processing_count: {}", e);
|
||||||
|
stats.try_get::<Option<i64>, _>("processing_count")?.unwrap_or(0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Attempting to extract failed_count...");
|
||||||
|
let failed_count = match stats.try_get::<i64, _>("failed_count") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got failed_count: {}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get failed_count: {}", e);
|
||||||
|
stats.try_get::<Option<i64>, _>("failed_count")?.unwrap_or(0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Attempting to extract completed_today...");
|
||||||
|
let completed_today = match stats.try_get::<i64, _>("completed_today") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got completed_today: {}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get completed_today: {}", e);
|
||||||
|
stats.try_get::<Option<i64>, _>("completed_today")?.unwrap_or(0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Attempting to extract avg_wait_time_minutes...");
|
||||||
|
let avg_wait_time_minutes = match stats.try_get::<Option<f64>, _>("avg_wait_time_minutes") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got avg_wait_time_minutes: {:?}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get avg_wait_time_minutes: {}", e);
|
||||||
|
// Try as string and convert
|
||||||
|
match stats.try_get::<Option<String>, _>("avg_wait_time_minutes") {
|
||||||
|
Ok(Some(str_val)) => {
|
||||||
|
let float_val = str_val.parse::<f64>().ok();
|
||||||
|
tracing::debug!("Converted string '{}' to f64: {:?}", str_val, float_val);
|
||||||
|
float_val
|
||||||
|
}
|
||||||
|
Ok(None) => None,
|
||||||
|
Err(e2) => {
|
||||||
|
tracing::error!("Also failed with String: {}", e2);
|
||||||
|
return Err(anyhow::anyhow!("Failed to get avg_wait_time_minutes: {}", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("Attempting to extract oldest_pending_minutes...");
|
||||||
|
let oldest_pending_minutes = match stats.try_get::<Option<f64>, _>("oldest_pending_minutes") {
|
||||||
|
Ok(val) => {
|
||||||
|
tracing::debug!("Successfully got oldest_pending_minutes: {:?}", val);
|
||||||
|
val
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to get oldest_pending_minutes: {}", e);
|
||||||
|
// Try as string and convert
|
||||||
|
match stats.try_get::<Option<String>, _>("oldest_pending_minutes") {
|
||||||
|
Ok(Some(str_val)) => {
|
||||||
|
let float_val = str_val.parse::<f64>().ok();
|
||||||
|
tracing::debug!("Converted string '{}' to f64: {:?}", str_val, float_val);
|
||||||
|
float_val
|
||||||
|
}
|
||||||
|
Ok(None) => None,
|
||||||
|
Err(e2) => {
|
||||||
|
tracing::error!("Also failed with String: {}", e2);
|
||||||
|
return Err(anyhow::anyhow!("Failed to get oldest_pending_minutes: {}", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tracing::debug!("OCR Queue: Successfully extracted all values, creating QueueStats");
|
||||||
|
|
||||||
Ok(QueueStats {
|
Ok(QueueStats {
|
||||||
pending_count: stats.get("pending_count"),
|
pending_count,
|
||||||
processing_count: stats.get("processing_count"),
|
processing_count,
|
||||||
failed_count: stats.get("failed_count"),
|
failed_count,
|
||||||
completed_today: stats.get("completed_today"),
|
completed_today,
|
||||||
avg_wait_time_minutes: stats.get("avg_wait_time_minutes"),
|
avg_wait_time_minutes,
|
||||||
oldest_pending_minutes: stats.get("oldest_pending_minutes"),
|
oldest_pending_minutes,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue