From 8ebffe4aa3a638b9c25e2a70166bbae958c9fc51 Mon Sep 17 00:00:00 2001
From: perf3ct
Date: Sun, 15 Jun 2025 05:04:48 +0000
Subject: [PATCH] feat(db): try to improve db queue and number of connections

---
 src/db.rs                        | 174 +++++++++++++++++++------------
 src/routes/metrics.rs            |  63 ++++++-----
 src/routes/prometheus_metrics.rs |   6 +-
 3 files changed, 148 insertions(+), 95 deletions(-)

diff --git a/src/db.rs b/src/db.rs
index f3a538e..b3283f7 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -3,6 +3,7 @@ use chrono::Utc;
 use sqlx::{PgPool, Row, postgres::PgPoolOptions};
 use std::time::Duration;
 use uuid::Uuid;
+use tokio::time::{sleep, timeout};
 
 use crate::models::{CreateUser, Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse, User};
 
@@ -14,10 +15,11 @@ pub struct Database {
 impl Database {
     pub async fn new(database_url: &str) -> Result<Self> {
         let pool = PgPoolOptions::new()
-            .max_connections(20) // Increase from default 10
-            .acquire_timeout(Duration::from_secs(3)) // 3 second timeout
+            .max_connections(50) // Increased from 20 to handle more concurrent requests
+            .acquire_timeout(Duration::from_secs(10)) // Increased from 3 to 10 seconds
             .idle_timeout(Duration::from_secs(600)) // 10 minute idle timeout
             .max_lifetime(Duration::from_secs(1800)) // 30 minute max lifetime
+            .min_connections(5) // Maintain minimum connections
             .connect(database_url)
             .await?;
         Ok(Self { pool })
@@ -27,6 +29,38 @@ impl Database {
         &self.pool
     }
 
+    pub async fn with_retry<T, F, Fut>(&self, operation: F) -> Result<T>
+    where
+        F: Fn() -> Fut,
+        Fut: std::future::Future<Output = Result<T>>,
+    {
+        const MAX_RETRIES: usize = 3;
+        const BASE_DELAY_MS: u64 = 100;
+
+        for attempt in 0..MAX_RETRIES {
+            match timeout(Duration::from_secs(15), operation()).await {
+                Ok(Ok(result)) => return Ok(result),
+                Ok(Err(e)) if attempt == MAX_RETRIES - 1 => return Err(e),
+                Ok(Err(e)) => {
+                    tracing::warn!("Database operation failed, attempt {} of {}: {}", attempt + 1, MAX_RETRIES, e);
+                }
+                Err(_) if attempt == MAX_RETRIES - 1 => {
+                    return Err(anyhow::anyhow!("Database operation timed out after {} retries", MAX_RETRIES));
+                }
+                Err(_) => {
+                    tracing::warn!("Database operation timed out, attempt {} of {}", attempt + 1, MAX_RETRIES);
+                }
+            }
+
+            // Exponential backoff with jitter
+            let delay_ms = BASE_DELAY_MS * (2_u64.pow(attempt as u32));
+            let jitter = (std::ptr::addr_of!(attempt) as usize) % (delay_ms as usize / 2 + 1);
+            sleep(Duration::from_millis(delay_ms + jitter as u64)).await;
+        }
+
+        unreachable!()
+    }
+
     pub async fn migrate(&self) -> Result<()> {
         // Create extensions
         sqlx::query(r#"CREATE EXTENSION IF NOT EXISTS "uuid-ossp""#)
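For context, a minimal sketch of how a call site is expected to use `with_retry` (illustrative only, not part of the patch; the `db` binding and the error message are assumptions, mirroring the route handlers changed below):

    // Each attempt re-invokes the closure and builds a fresh future, so the
    // closure must be cheap to rebuild; the `Fn() -> Fut` bound (rather than
    // `FnOnce`) is what allows the operation to be called more than once.
    let total: i64 = db.with_retry(|| async {
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM documents")
            .fetch_one(&db.pool)
            .await
            .map_err(|e| anyhow::anyhow!("count query failed: {}", e))
    }).await?;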
@@ -1217,22 +1251,24 @@
     }
 
     pub async fn get_user_settings(&self, user_id: Uuid) -> Result<Option<crate::models::Settings>> {
-        let row = sqlx::query(
-            r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds,
-               max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing,
-               search_results_per_page, search_snippet_length, fuzzy_search_threshold,
-               retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb,
-               cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode,
-               ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise,
-               ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars,
-               webdav_enabled, webdav_server_url, webdav_username, webdav_password,
-               webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes,
-               created_at, updated_at
-               FROM settings WHERE user_id = $1"#
-        )
-        .bind(user_id)
-        .fetch_optional(&self.pool)
-        .await?;
+        self.with_retry(|| async {
+            let row = sqlx::query(
+                r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds,
+                   max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing,
+                   search_results_per_page, search_snippet_length, fuzzy_search_threshold,
+                   retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb,
+                   cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode,
+                   ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise,
+                   ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars,
+                   webdav_enabled, webdav_server_url, webdav_username, webdav_password,
+                   webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes,
+                   created_at, updated_at
+                   FROM settings WHERE user_id = $1"#
+            )
+            .bind(user_id)
+            .fetch_optional(&self.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?;
 
         match row {
             Some(row) => Ok(Some(crate::models::Settings {
             })),
             None => Ok(None),
         }
+        }).await
     }
 
     pub async fn get_all_user_settings(&self) -> Result<Vec<crate::models::Settings>> {
@@ -1495,19 +1532,21 @@
     // Notification methods
     pub async fn create_notification(&self, user_id: Uuid, notification: &crate::models::CreateNotification) -> Result<crate::models::Notification> {
-        let row = sqlx::query(
-            r#"INSERT INTO notifications (user_id, notification_type, title, message, action_url, metadata)
-               VALUES ($1, $2, $3, $4, $5, $6)
-               RETURNING id, user_id, notification_type, title, message, read, action_url, metadata, created_at"#
-        )
-        .bind(user_id)
-        .bind(&notification.notification_type)
-        .bind(&notification.title)
-        .bind(&notification.message)
-        .bind(&notification.action_url)
-        .bind(&notification.metadata)
-        .fetch_one(&self.pool)
-        .await?;
+        self.with_retry(|| async {
+            let row = sqlx::query(
+                r#"INSERT INTO notifications (user_id, notification_type, title, message, action_url, metadata)
+                   VALUES ($1, $2, $3, $4, $5, $6)
+                   RETURNING id, user_id, notification_type, title, message, read, action_url, metadata, created_at"#
+            )
+            .bind(user_id)
+            .bind(&notification.notification_type)
+            .bind(&notification.title)
+            .bind(&notification.message)
+            .bind(&notification.action_url)
+            .bind(&notification.metadata)
+            .fetch_one(&self.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Database insert failed: {}", e))?;
 
         Ok(crate::models::Notification {
             id: row.get("id"),
             metadata: row.get("metadata"),
             created_at: row.get("created_at"),
         })
+        }).await
     }
 
     pub async fn get_user_notifications(&self, user_id: Uuid, limit: i64, offset: i64) -> Result<Vec<crate::models::Notification>> {
@@ -1604,14 +1644,16 @@
     // WebDAV sync state operations
     pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result<Option<crate::models::WebDAVSyncState>> {
-        let row = sqlx::query(
-            r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed,
-               files_remaining, current_folder, errors, created_at, updated_at
-               FROM webdav_sync_state WHERE user_id = $1"#
-        )
-        .bind(user_id)
-        .fetch_optional(&self.pool)
-        .await?;
+        self.with_retry(|| async {
+            let row = sqlx::query(
+                r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed,
+                   files_remaining, current_folder, errors, created_at, updated_at
+                   FROM webdav_sync_state WHERE user_id = $1"#
+            )
+            .bind(user_id)
+            .fetch_optional(&self.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?;
 
         match row {
             Some(row) => Ok(Some(crate::models::WebDAVSyncState {
@@ -1629,35 +1671,39 @@ impl Database {
             })),
             None => Ok(None),
         }
+        }).await
     }
 
     pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> {
-        sqlx::query(
-            r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running,
-               files_processed, files_remaining, current_folder, errors, updated_at)
-               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
-               ON CONFLICT (user_id) DO UPDATE SET
-                   last_sync_at = EXCLUDED.last_sync_at,
-                   sync_cursor = EXCLUDED.sync_cursor,
-                   is_running = EXCLUDED.is_running,
-                   files_processed = EXCLUDED.files_processed,
-                   files_remaining = EXCLUDED.files_remaining,
-                   current_folder = EXCLUDED.current_folder,
-                   errors = EXCLUDED.errors,
-                   updated_at = NOW()"#
-        )
-        .bind(user_id)
-        .bind(state.last_sync_at)
-        .bind(&state.sync_cursor)
-        .bind(state.is_running)
-        .bind(state.files_processed)
-        .bind(state.files_remaining)
-        .bind(&state.current_folder)
-        .bind(&state.errors)
-        .execute(&self.pool)
-        .await?;
+        self.with_retry(|| async {
+            sqlx::query(
+                r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running,
+                   files_processed, files_remaining, current_folder, errors, updated_at)
+                   VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
+                   ON CONFLICT (user_id) DO UPDATE SET
+                       last_sync_at = EXCLUDED.last_sync_at,
+                       sync_cursor = EXCLUDED.sync_cursor,
+                       is_running = EXCLUDED.is_running,
+                       files_processed = EXCLUDED.files_processed,
+                       files_remaining = EXCLUDED.files_remaining,
+                       current_folder = EXCLUDED.current_folder,
+                       errors = EXCLUDED.errors,
+                       updated_at = NOW()"#
+            )
+            .bind(user_id)
+            .bind(state.last_sync_at)
+            .bind(&state.sync_cursor)
+            .bind(state.is_running)
+            .bind(state.files_processed)
+            .bind(state.files_remaining)
+            .bind(&state.current_folder)
+            .bind(&state.errors)
+            .execute(&self.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Database update failed: {}", e))?;
 
-        Ok(())
+            Ok(())
+        }).await
     }
 
     // Reset any running WebDAV syncs on startup (handles server restart during sync)
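One assumption worth making explicit in the upsert above: `ON CONFLICT (user_id)` can only resolve against a unique index or constraint on `webdav_sync_state.user_id`. A sketch of a migration guard that would ensure it (the index name is hypothetical and not part of this patch):

    // ON CONFLICT (user_id) needs a unique index on user_id to infer the
    // conflict target; IF NOT EXISTS keeps the statement idempotent, matching
    // the style of the other statements in migrate().
    sqlx::query(
        "CREATE UNIQUE INDEX IF NOT EXISTS webdav_sync_state_user_id_idx ON webdav_sync_state (user_id)"
    )
    .execute(&self.pool)
    .await?;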
diff --git a/src/routes/metrics.rs b/src/routes/metrics.rs
index 1ba6ede..a0cd291 100644
--- a/src/routes/metrics.rs
+++ b/src/routes/metrics.rs
@@ -152,43 +152,50 @@ async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
 
 async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetrics, StatusCode> {
-    // Get total document count
-    let total_docs = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM documents")
-        .fetch_one(&state.db.pool)
-        .await
-        .map_err(|e| {
-            tracing::error!("Failed to get total document count: {}", e);
-            StatusCode::INTERNAL_SERVER_ERROR
-        })?;
+    // Get total document count using retry mechanism
+    let total_docs = state.db.with_retry(|| async {
+        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM documents")
+            .fetch_one(&state.db.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to get total document count: {}", e))
+    }).await.map_err(|e| {
+        tracing::error!("Failed to get total document count: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
 
     // Get documents uploaded today
-    let docs_today = sqlx::query_scalar::<_, i64>(
-        "SELECT COUNT(*) FROM documents WHERE DATE(created_at) = CURRENT_DATE"
-    )
-    .fetch_one(&state.db.pool)
-    .await
-    .map_err(|e| {
+    let docs_today = state.db.with_retry(|| async {
+        sqlx::query_scalar::<_, i64>(
+            "SELECT COUNT(*) FROM documents WHERE DATE(created_at) = CURRENT_DATE"
+        )
+        .fetch_one(&state.db.pool)
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to get today's document count: {}", e))
+    }).await.map_err(|e| {
         tracing::error!("Failed to get today's document count: {}", e);
         StatusCode::INTERNAL_SERVER_ERROR
     })?;
 
     // Get total storage size
-    let total_size = sqlx::query_scalar::<_, Option<i64>>("SELECT SUM(file_size) FROM documents")
-        .fetch_one(&state.db.pool)
-        .await
-        .map_err(|e| {
-            tracing::error!("Failed to get total storage size: {}", e);
-            StatusCode::INTERNAL_SERVER_ERROR
-        })?
-        .unwrap_or(0);
+    let total_size = state.db.with_retry(|| async {
+        sqlx::query_scalar::<_, Option<f64>>("SELECT CAST(COALESCE(SUM(file_size), 0) AS DOUBLE PRECISION) FROM documents")
+            .fetch_one(&state.db.pool)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to get total storage size: {}", e))
+    }).await.map_err(|e| {
+        tracing::error!("Failed to get total storage size: {}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?.unwrap_or(0.0) as i64;
 
     // Get documents with and without OCR
-    let docs_with_ocr = sqlx::query_scalar::<_, i64>(
-        "SELECT COUNT(*) FROM documents WHERE has_ocr_text = true"
-    )
-    .fetch_one(&state.db.pool)
-    .await
-    .map_err(|e| {
+    let docs_with_ocr = state.db.with_retry(|| async {
+        sqlx::query_scalar::<_, i64>(
+            "SELECT COUNT(*) FROM documents WHERE ocr_text IS NOT NULL AND ocr_text != ''"
+        )
+        .fetch_one(&state.db.pool)
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to get OCR document count: {}", e))
+    }).await.map_err(|e| {
         tracing::error!("Failed to get OCR document count: {}", e);
         StatusCode::INTERNAL_SERVER_ERROR
     })?;
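The `CAST(COALESCE(SUM(file_size), 0) AS DOUBLE PRECISION)` change is likely working around the fact that Postgres widens `SUM` over a `BIGINT` column to `NUMERIC`, which sqlx cannot decode into a plain integer without the bigdecimal feature. Assuming `file_size` is `BIGINT`, an alternative sketch (not in this patch) that keeps the value integral end to end:

    // Cast the aggregate back to BIGINT so it decodes directly as i64,
    // avoiding the f64 round-trip and the `as i64` conversion.
    let total_size: i64 = sqlx::query_scalar(
        "SELECT CAST(COALESCE(SUM(file_size), 0) AS BIGINT) FROM documents"
    )
    .fetch_one(&state.db.pool)
    .await
    .map_err(|e| anyhow::anyhow!("Failed to get total storage size: {}", e))?;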
diff --git a/src/routes/prometheus_metrics.rs b/src/routes/prometheus_metrics.rs
index 39bf9c8..cd6e780 100644
--- a/src/routes/prometheus_metrics.rs
+++ b/src/routes/prometheus_metrics.rs
@@ -142,18 +142,18 @@ async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetrics, StatusCode> {
-    let total_size = sqlx::query_scalar::<_, Option<i64>>("SELECT SUM(file_size) FROM documents")
+    let total_size = sqlx::query_scalar::<_, Option<f64>>("SELECT CAST(COALESCE(SUM(file_size), 0) AS DOUBLE PRECISION) FROM documents")
         .fetch_one(&state.db.pool)
         .await
         .map_err(|e| {
             tracing::error!("Failed to get total storage size: {}", e);
             StatusCode::INTERNAL_SERVER_ERROR
         })?
-        .unwrap_or(0);
+        .unwrap_or(0.0) as i64;
 
     // Get documents with and without OCR
     let docs_with_ocr = sqlx::query_scalar::<_, i64>(
-        "SELECT COUNT(*) FROM documents WHERE has_ocr_text = true"
+        "SELECT COUNT(*) FROM documents WHERE ocr_text IS NOT NULL AND ocr_text != ''"
     )
     .fetch_one(&state.db.pool)
     .await