/*!
 * Database Monitoring and Alerting System
 *
 * Provides real-time monitoring of database health, OCR processing,
 * and automatic alerting for potential issues.
 */

use sqlx::PgPool;
use serde::{Deserialize, Serialize};
use tokio::time::{Duration, interval};
use tracing::{error, warn, info, debug};
use anyhow::Result;
use std::sync::Arc;

/// Database monitoring service that runs in the background
pub struct DatabaseMonitor {
    pool: PgPool,
    config: MonitoringConfig,
}

#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    pub check_interval_secs: u64,
    pub stuck_job_threshold_minutes: i32,
    pub high_queue_size_threshold: i32,
    pub low_confidence_threshold: f64,
    pub pool_utilization_threshold: u8,
    pub slow_query_threshold_ms: u64,
    pub enable_auto_recovery: bool,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            check_interval_secs: 60,
            stuck_job_threshold_minutes: 30,
            high_queue_size_threshold: 100,
            low_confidence_threshold: 70.0,
            pool_utilization_threshold: 80,
            slow_query_threshold_ms: 5000,
            enable_auto_recovery: true,
        }
    }
}
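
// Example: override selected thresholds while keeping the remaining defaults.
//
//     let config = MonitoringConfig { check_interval_secs: 30, ..MonitoringConfig::default() };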

#[derive(Debug, Serialize, Deserialize)]
pub struct DatabaseHealth {
    pub overall_status: HealthStatus,
    pub ocr_processing: OcrProcessingHealth,
    pub queue_health: QueueHealth,
    pub connection_pool: PoolHealth,
    pub data_consistency: ConsistencyHealth,
    pub performance_metrics: PerformanceMetrics,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum HealthStatus {
    Healthy,
    Warning,
    Critical,
    Unknown,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OcrProcessingHealth {
    pub status: HealthStatus,
    pub pending_jobs: i32,
    pub processing_jobs: i32,
    pub stuck_jobs: i32,
    pub failed_jobs_last_hour: i32,
    pub average_confidence: Option<f64>,
    pub average_processing_time_ms: Option<f64>,
    pub throughput_per_minute: f64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct QueueHealth {
    pub status: HealthStatus,
    pub queue_size: i32,
    pub oldest_pending_age_minutes: Option<i32>,
    pub worker_count: i32,
    pub queue_growth_rate: f64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PoolHealth {
    pub status: HealthStatus,
    pub total_connections: u32,
    pub active_connections: u32,
    pub idle_connections: u32,
    pub utilization_percent: u8,
    pub average_response_time_ms: u64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ConsistencyHealth {
    pub status: HealthStatus,
    pub orphaned_queue_items: i32,
    pub documents_without_files: i32,
    pub inconsistent_ocr_states: i32,
    pub data_integrity_score: f64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    pub queries_per_second: f64,
    pub slow_queries_count: i32,
    pub cache_hit_ratio: Option<f64>,
    pub index_usage_efficiency: f64,
    pub deadlock_count: i32,
}

impl DatabaseMonitor {
    pub fn new(pool: PgPool, config: MonitoringConfig) -> Self {
        Self { pool, config }
    }

    /// Start the monitoring service
    pub async fn start(self: Arc<Self>) {
        let mut interval = interval(Duration::from_secs(self.config.check_interval_secs));

        info!("Database monitoring started with {}s intervals", self.config.check_interval_secs);

        loop {
            interval.tick().await;

            match self.perform_health_check().await {
                Ok(health) => {
                    self.process_health_report(health).await;
                }
                Err(e) => {
                    error!("Database health check failed: {}", e);
                }
            }
        }
    }
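
    // Usage sketch: `start` loops forever, so callers typically wrap the monitor in
    // an `Arc` and run it on a background task (assumes an already-connected
    // `PgPool` named `pool`):
    //
    //     let monitor = Arc::new(DatabaseMonitor::new(pool.clone(), MonitoringConfig::default()));
    //     tokio::spawn(monitor.start());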

    /// Perform comprehensive database health check
    async fn perform_health_check(&self) -> Result<DatabaseHealth> {
        let start_time = std::time::Instant::now();

        // Run all health checks concurrently
        let (ocr_health, queue_health, pool_health, consistency_health, perf_metrics) = tokio::try_join!(
            self.check_ocr_processing_health(),
            self.check_queue_health(),
            self.check_pool_health(),
            self.check_data_consistency(),
            self.check_performance_metrics()
        )?;

        let overall_status = self.determine_overall_status(&ocr_health, &queue_health, &pool_health, &consistency_health);

        let health_check_duration = start_time.elapsed();
        debug!("Health check completed in {:?}", health_check_duration);

        Ok(DatabaseHealth {
            overall_status,
            ocr_processing: ocr_health,
            queue_health,
            connection_pool: pool_health,
            data_consistency: consistency_health,
            performance_metrics: perf_metrics,
            timestamp: chrono::Utc::now(),
        })
    }

    async fn check_ocr_processing_health(&self) -> Result<OcrProcessingHealth> {
        let stats = sqlx::query!(
            r#"
            SELECT
                COUNT(*) FILTER (WHERE ocr_status = 'pending') as pending,
                COUNT(*) FILTER (WHERE ocr_status = 'processing') as processing,
                COUNT(*) FILTER (WHERE ocr_status = 'processing' AND updated_at < NOW() - INTERVAL '30 minutes') as stuck,
                COUNT(*) FILTER (WHERE ocr_status = 'failed' AND updated_at > NOW() - INTERVAL '1 hour') as failed_recent,
                AVG(ocr_confidence) FILTER (WHERE ocr_status = 'completed' AND ocr_completed_at > NOW() - INTERVAL '1 hour') as avg_confidence,
                AVG(ocr_processing_time_ms) FILTER (WHERE ocr_status = 'completed' AND ocr_completed_at > NOW() - INTERVAL '1 hour') as avg_time,
                COUNT(*) FILTER (WHERE ocr_status = 'completed' AND ocr_completed_at > NOW() - INTERVAL '1 minute') as completed_last_minute
            FROM documents
            "#
        )
        .fetch_one(&self.pool)
        .await?;

        let pending = stats.pending.unwrap_or(0) as i32;
        let processing = stats.processing.unwrap_or(0) as i32;
        let stuck = stats.stuck.unwrap_or(0) as i32;
        let failed_recent = stats.failed_recent.unwrap_or(0) as i32;
        let avg_confidence = stats.avg_confidence;
        let avg_time = stats.avg_time;
        let throughput = stats.completed_last_minute.unwrap_or(0) as f64;

        let status = if stuck > 0 || failed_recent > 10 {
            HealthStatus::Critical
        } else if pending > self.config.high_queue_size_threshold || avg_confidence.unwrap_or(100.0) < self.config.low_confidence_threshold {
            HealthStatus::Warning
        } else {
            HealthStatus::Healthy
        };

        Ok(OcrProcessingHealth {
            status,
            pending_jobs: pending,
            processing_jobs: processing,
            stuck_jobs: stuck,
            failed_jobs_last_hour: failed_recent,
            average_confidence: avg_confidence,
            average_processing_time_ms: avg_time,
            throughput_per_minute: throughput,
        })
    }

    async fn check_queue_health(&self) -> Result<QueueHealth> {
        let queue_stats = sqlx::query!(
            r#"
            SELECT
                COUNT(*) as total_items,
                MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) as oldest_pending_minutes, -- MAX of the age gives the oldest queued item
                COUNT(DISTINCT worker_id) FILTER (WHERE status = 'processing') as active_workers
            FROM ocr_queue
            WHERE status IN ('pending', 'processing')
            "#
        )
        .fetch_one(&self.pool)
        .await?;

        let queue_size = queue_stats.total_items.unwrap_or(0) as i32;
        let oldest_pending = queue_stats.oldest_pending_minutes.map(|m| m as i32);
        let worker_count = queue_stats.active_workers.unwrap_or(0) as i32;

        // Calculate queue growth rate (simplified)
        let growth_rate = 0.0; // Would need historical data for accurate calculation

        let status = if queue_size > self.config.high_queue_size_threshold {
            HealthStatus::Critical
        } else if queue_size > self.config.high_queue_size_threshold / 2 {
            HealthStatus::Warning
        } else {
            HealthStatus::Healthy
        };

        Ok(QueueHealth {
            status,
            queue_size,
            oldest_pending_age_minutes: oldest_pending,
            worker_count,
            queue_growth_rate: growth_rate,
        })
    }

    async fn check_pool_health(&self) -> Result<PoolHealth> {
        let start = std::time::Instant::now();

        // Test pool responsiveness
        sqlx::query!("SELECT 1")
            .fetch_one(&self.pool)
            .await?;

        let response_time = start.elapsed().as_millis() as u64;

        let total_connections = self.pool.size();
        // num_idle() returns usize; cast so the arithmetic and PoolHealth fields stay u32
        let idle_connections = self.pool.num_idle() as u32;
        // saturating_sub guards against the idle count momentarily exceeding the pool size
        let active_connections = total_connections.saturating_sub(idle_connections);
        let utilization = if total_connections > 0 {
            (active_connections as f64 / total_connections as f64 * 100.0) as u8
        } else {
            0
        };

        let status = if utilization > self.config.pool_utilization_threshold {
            HealthStatus::Critical
        } else if utilization > self.config.pool_utilization_threshold / 2 || response_time > self.config.slow_query_threshold_ms {
            HealthStatus::Warning
        } else {
            HealthStatus::Healthy
        };

        Ok(PoolHealth {
            status,
            total_connections,
            active_connections,
            idle_connections,
            utilization_percent: utilization,
            average_response_time_ms: response_time,
        })
    }

    async fn check_data_consistency(&self) -> Result<ConsistencyHealth> {
        let consistency_check = sqlx::query!(
            r#"
            SELECT
                -- Orphaned queue items
                (SELECT COUNT(*) FROM ocr_queue q
                 LEFT JOIN documents d ON q.document_id = d.id
                 WHERE d.id IS NULL) as orphaned_queue,

                -- Documents without files (would need file system check)
                0 as missing_files,

                -- Inconsistent OCR states
                (SELECT COUNT(*) FROM documents d
                 JOIN ocr_queue q ON d.id = q.document_id
                 WHERE d.ocr_status = 'completed' AND q.status != 'completed') as inconsistent_states
            "#
        )
        .fetch_one(&self.pool)
        .await?;

        let orphaned = consistency_check.orphaned_queue.unwrap_or(0) as i32;
        let missing_files = consistency_check.missing_files.unwrap_or(0) as i32;
        let inconsistent = consistency_check.inconsistent_states.unwrap_or(0) as i32;

        let total_issues = orphaned + missing_files + inconsistent;
        let integrity_score = if total_issues == 0 { 100.0 } else { 100.0 - (total_issues as f64 * 10.0).min(100.0) };

        let status = if total_issues > 10 {
            HealthStatus::Critical
        } else if total_issues > 0 {
            HealthStatus::Warning
        } else {
            HealthStatus::Healthy
        };

        Ok(ConsistencyHealth {
            status,
            orphaned_queue_items: orphaned,
            documents_without_files: missing_files,
            inconsistent_ocr_states: inconsistent,
            data_integrity_score: integrity_score,
        })
    }

    async fn check_performance_metrics(&self) -> Result<PerformanceMetrics> {
        // These would need more sophisticated monitoring in production;
        // for now, return placeholder values.

        Ok(PerformanceMetrics {
            queries_per_second: 0.0,
            slow_queries_count: 0,
            cache_hit_ratio: None,
            index_usage_efficiency: 95.0,
            deadlock_count: 0,
        })
    }

    fn determine_overall_status(
        &self,
        ocr: &OcrProcessingHealth,
        queue: &QueueHealth,
        pool: &PoolHealth,
        consistency: &ConsistencyHealth,
    ) -> HealthStatus {
        let statuses = [&ocr.status, &queue.status, &pool.status, &consistency.status];

        if statuses.iter().any(|s| matches!(s, HealthStatus::Critical)) {
            HealthStatus::Critical
        } else if statuses.iter().any(|s| matches!(s, HealthStatus::Warning)) {
            HealthStatus::Warning
        } else {
            HealthStatus::Healthy
        }
    }

    /// Process health report and take actions
    async fn process_health_report(&self, health: DatabaseHealth) {
        match health.overall_status {
            HealthStatus::Critical => {
                error!("🚨 CRITICAL: Database health issues detected!");
                self.handle_critical_issues(&health).await;
            }
            HealthStatus::Warning => {
                warn!("⚠️ WARNING: Database health degraded");
                self.handle_warnings(&health).await;
            }
            HealthStatus::Healthy => {
                debug!("✅ Database health is good");
            }
            HealthStatus::Unknown => {
                warn!("❓ Database health status unknown");
            }
        }

        // Log key metrics
        info!(
            "DB Health: OCR pending={}, processing={}, stuck={}, pool={}%",
            health.ocr_processing.pending_jobs,
            health.ocr_processing.processing_jobs,
            health.ocr_processing.stuck_jobs,
            health.connection_pool.utilization_percent
        );
    }

    async fn handle_critical_issues(&self, health: &DatabaseHealth) {
        if self.config.enable_auto_recovery {
            // Reset stuck OCR jobs
            if health.ocr_processing.stuck_jobs > 0 {
                match self.reset_stuck_jobs().await {
                    Ok(reset_count) => {
                        warn!("Auto-recovery: Reset {} stuck OCR jobs", reset_count);
                    }
                    Err(e) => {
                        error!("Failed to reset stuck OCR jobs: {}", e);
                    }
                }
            }

            // Clean up orphaned queue items
            if health.data_consistency.orphaned_queue_items > 0 {
                match self.cleanup_orphaned_items().await {
                    Ok(cleanup_count) => {
                        warn!("Auto-recovery: Cleaned up {} orphaned queue items", cleanup_count);
                    }
                    Err(e) => {
                        error!("Failed to clean up orphaned items: {}", e);
                    }
                }
            }
        }
    }

    async fn handle_warnings(&self, health: &DatabaseHealth) {
        // Log detailed warning information
        if health.ocr_processing.pending_jobs > self.config.high_queue_size_threshold / 2 {
            warn!("High OCR queue size: {} pending jobs", health.ocr_processing.pending_jobs);
        }

        if health.connection_pool.utilization_percent > self.config.pool_utilization_threshold / 2 {
            warn!("High connection pool utilization: {}%", health.connection_pool.utilization_percent);
        }
    }

    async fn reset_stuck_jobs(&self) -> Result<i32> {
        let result = sqlx::query!(
            "SELECT reset_stuck_ocr_jobs($1) as reset_count",
            self.config.stuck_job_threshold_minutes
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(result.reset_count.unwrap_or(0))
    }

    async fn cleanup_orphaned_items(&self) -> Result<i32> {
        let result = sqlx::query!(
            r#"
            DELETE FROM ocr_queue
            WHERE document_id NOT IN (SELECT id FROM documents)
            "#
        )
        .execute(&self.pool)
        .await?;

        Ok(result.rows_affected() as i32)
    }

    /// Get current health status (for API endpoints)
    pub async fn get_current_health(&self) -> Result<DatabaseHealth> {
        self.perform_health_check().await
    }
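
    // Illustrative only: an HTTP health endpoint could call this and serialize the
    // report, since `DatabaseHealth` derives `Serialize` (assumes the `serde_json`
    // crate is available in the consuming code):
    //
    //     let health = monitor.get_current_health().await?;
    //     let body = serde_json::to_string_pretty(&health)?;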

    /// Force a consistency check and cleanup
    pub async fn force_cleanup(&self) -> Result<String> {
        let reset_count = self.reset_stuck_jobs().await?;
        let cleanup_count = self.cleanup_orphaned_items().await?;

        // Refresh OCR stats
        sqlx::query!("SELECT refresh_ocr_stats()")
            .execute(&self.pool)
            .await?;

        Ok(format!(
            "Cleanup completed: {} stuck jobs reset, {} orphaned items removed",
            reset_count, cleanup_count
        ))
    }
}

/// Alert configuration for different severity levels
#[derive(Debug, Clone)]
pub struct AlertConfig {
    pub email_notifications: bool,
    pub slack_webhook: Option<String>,
    pub critical_alert_cooldown_minutes: u64,
    pub warning_alert_cooldown_minutes: u64,
}

/// Alert manager for sending notifications
pub struct AlertManager {
    config: AlertConfig,
    last_critical_alert: std::sync::Mutex<Option<chrono::DateTime<chrono::Utc>>>,
    last_warning_alert: std::sync::Mutex<Option<chrono::DateTime<chrono::Utc>>>,
}

impl AlertManager {
    pub fn new(config: AlertConfig) -> Self {
        Self {
            config,
            last_critical_alert: std::sync::Mutex::new(None),
            last_warning_alert: std::sync::Mutex::new(None),
        }
    }

    pub async fn send_alert(&self, health: &DatabaseHealth) -> Result<()> {
        match health.overall_status {
            HealthStatus::Critical => {
                if self.should_send_critical_alert() {
                    self.send_critical_alert(health).await?;
                    self.update_last_critical_alert();
                }
            }
            HealthStatus::Warning => {
                if self.should_send_warning_alert() {
                    self.send_warning_alert(health).await?;
                    self.update_last_warning_alert();
                }
            }
            _ => {}
        }
        Ok(())
    }

    fn should_send_critical_alert(&self) -> bool {
        let last_alert = self.last_critical_alert.lock().unwrap();
        match *last_alert {
            Some(last) => {
                let cooldown = chrono::Duration::minutes(self.config.critical_alert_cooldown_minutes as i64);
                chrono::Utc::now() - last > cooldown
            }
            None => true,
        }
    }

    fn should_send_warning_alert(&self) -> bool {
        let last_alert = self.last_warning_alert.lock().unwrap();
        match *last_alert {
            Some(last) => {
                let cooldown = chrono::Duration::minutes(self.config.warning_alert_cooldown_minutes as i64);
                chrono::Utc::now() - last > cooldown
            }
            None => true,
        }
    }

    async fn send_critical_alert(&self, health: &DatabaseHealth) -> Result<()> {
        let message = format!(
            "🚨 CRITICAL DATABASE ALERT 🚨\n\
             Stuck OCR jobs: {}\n\
             Pool utilization: {}%\n\
             Orphaned queue items: {}\n\
             Timestamp: {}",
            health.ocr_processing.stuck_jobs,
            health.connection_pool.utilization_percent,
            health.data_consistency.orphaned_queue_items,
            health.timestamp
        );

        error!("{}", message);
        // Add actual notification sending logic here (email, Slack, etc.)
        Ok(())
    }

    async fn send_warning_alert(&self, health: &DatabaseHealth) -> Result<()> {
        let message = format!(
            "⚠️ Database Warning\n\
             Pending OCR jobs: {}\n\
             Pool utilization: {}%\n\
             Average confidence: {:.1}%\n\
             Timestamp: {}",
            health.ocr_processing.pending_jobs,
            health.connection_pool.utilization_percent,
            health.ocr_processing.average_confidence.unwrap_or(0.0),
            health.timestamp
        );

        warn!("{}", message);
        Ok(())
    }

    fn update_last_critical_alert(&self) {
        let mut last_alert = self.last_critical_alert.lock().unwrap();
        *last_alert = Some(chrono::Utc::now());
    }

    fn update_last_warning_alert(&self) {
        let mut last_alert = self.last_warning_alert.lock().unwrap();
        *last_alert = Some(chrono::Utc::now());
    }
}
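
// Usage sketch (illustrative; the notification transports above are still stubs
// that only log): build an `AlertManager` with explicit cooldowns and feed it
// each health report.
//
//     let alerts = AlertManager::new(AlertConfig {
//         email_notifications: false,
//         slack_webhook: None,
//         critical_alert_cooldown_minutes: 15,
//         warning_alert_cooldown_minutes: 60,
//     });
//     alerts.send_alert(&health).await?;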