feat(db): try to improve db queue and number of connections

This commit is contained in:
perf3ct 2025-06-15 05:04:48 +00:00
parent a8baa671ec
commit 8ebffe4aa3
3 changed files with 148 additions and 95 deletions

174
src/db.rs
View File

@ -3,6 +3,7 @@ use chrono::Utc;
use sqlx::{PgPool, Row, postgres::PgPoolOptions}; use sqlx::{PgPool, Row, postgres::PgPoolOptions};
use std::time::Duration; use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
use tokio::time::{sleep, timeout};
use crate::models::{CreateUser, Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse, User}; use crate::models::{CreateUser, Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse, User};
@ -14,10 +15,11 @@ pub struct Database {
impl Database { impl Database {
pub async fn new(database_url: &str) -> Result<Self> { pub async fn new(database_url: &str) -> Result<Self> {
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.max_connections(20) // Increase from default 10 .max_connections(50) // Increased from 20 to handle more concurrent requests
.acquire_timeout(Duration::from_secs(3)) // 3 second timeout .acquire_timeout(Duration::from_secs(10)) // Increased from 3 to 10 seconds
.idle_timeout(Duration::from_secs(600)) // 10 minute idle timeout .idle_timeout(Duration::from_secs(600)) // 10 minute idle timeout
.max_lifetime(Duration::from_secs(1800)) // 30 minute max lifetime .max_lifetime(Duration::from_secs(1800)) // 30 minute max lifetime
.min_connections(5) // Maintain minimum connections
.connect(database_url) .connect(database_url)
.await?; .await?;
Ok(Self { pool }) Ok(Self { pool })
@ -27,6 +29,38 @@ impl Database {
&self.pool &self.pool
} }
/// Runs `operation` with a per-attempt timeout and retries transient failures.
///
/// Each attempt is capped at 15 seconds. On failure or timeout the call is
/// retried up to `MAX_RETRIES` times with exponential backoff (100ms, 200ms,
/// 400ms) plus jitter, so concurrent callers hitting the same contention do
/// not retry in lockstep. The last attempt's error (or a timeout error) is
/// returned to the caller unchanged.
///
/// `operation` must be a `Fn` closure so it can be invoked once per attempt.
pub async fn with_retry<T, F, Fut>(&self, operation: F) -> Result<T>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    const MAX_RETRIES: usize = 3;
    const BASE_DELAY_MS: u64 = 100;
    // Upper bound for a single attempt; generous enough for slow queries
    // while still bounding total latency to roughly MAX_RETRIES * 15s.
    const ATTEMPT_TIMEOUT: Duration = Duration::from_secs(15);

    for attempt in 0..MAX_RETRIES {
        match timeout(ATTEMPT_TIMEOUT, operation()).await {
            Ok(Ok(result)) => return Ok(result),
            // Out of retries: surface the operation's own error.
            Ok(Err(e)) if attempt == MAX_RETRIES - 1 => return Err(e),
            Ok(Err(e)) => {
                tracing::warn!("Database operation failed, attempt {} of {}: {}", attempt + 1, MAX_RETRIES, e);
            }
            Err(_) if attempt == MAX_RETRIES - 1 => {
                return Err(anyhow::anyhow!("Database operation timed out after {} retries", MAX_RETRIES));
            }
            Err(_) => {
                tracing::warn!("Database operation timed out, attempt {} of {}", attempt + 1, MAX_RETRIES);
            }
        }

        // Exponential backoff: 100ms, 200ms, 400ms (left shift == 2^attempt).
        let delay_ms = BASE_DELAY_MS << attempt;
        // Jitter in [0, delay_ms/2]. The previous implementation derived this
        // from the stack address of `attempt`, which is effectively constant
        // within a run and identical across concurrent retriers — i.e. no
        // jitter at all. Sub-second wall-clock nanos give cheap, per-call
        // variation without pulling in an RNG crate.
        let jitter = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| u64::from(d.subsec_nanos()))
            .unwrap_or(0)
            % (delay_ms / 2 + 1);
        sleep(Duration::from_millis(delay_ms + jitter)).await;
    }

    // The final loop iteration returns in every match arm above, so this
    // point is provably not reachable.
    unreachable!("retry loop always returns on the last attempt")
}
pub async fn migrate(&self) -> Result<()> { pub async fn migrate(&self) -> Result<()> {
// Create extensions // Create extensions
sqlx::query(r#"CREATE EXTENSION IF NOT EXISTS "uuid-ossp""#) sqlx::query(r#"CREATE EXTENSION IF NOT EXISTS "uuid-ossp""#)
@ -1217,22 +1251,24 @@ impl Database {
} }
pub async fn get_user_settings(&self, user_id: Uuid) -> Result<Option<crate::models::Settings>> { pub async fn get_user_settings(&self, user_id: Uuid) -> Result<Option<crate::models::Settings>> {
let row = sqlx::query( self.with_retry(|| async {
r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds, let row = sqlx::query(
max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing, r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds,
search_results_per_page, search_snippet_length, fuzzy_search_threshold, max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing,
retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb, search_results_per_page, search_snippet_length, fuzzy_search_threshold,
cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode, retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb,
ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise, cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode,
ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars, ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise,
webdav_enabled, webdav_server_url, webdav_username, webdav_password, ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars,
webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes, webdav_enabled, webdav_server_url, webdav_username, webdav_password,
created_at, updated_at webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes,
FROM settings WHERE user_id = $1"# created_at, updated_at
) FROM settings WHERE user_id = $1"#
.bind(user_id) )
.fetch_optional(&self.pool) .bind(user_id)
.await?; .fetch_optional(&self.pool)
.await
.map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?;
match row { match row {
Some(row) => Ok(Some(crate::models::Settings { Some(row) => Ok(Some(crate::models::Settings {
@ -1276,6 +1312,7 @@ impl Database {
})), })),
None => Ok(None), None => Ok(None),
} }
}).await
} }
pub async fn get_all_user_settings(&self) -> Result<Vec<crate::models::Settings>> { pub async fn get_all_user_settings(&self) -> Result<Vec<crate::models::Settings>> {
@ -1495,19 +1532,21 @@ impl Database {
// Notification methods // Notification methods
pub async fn create_notification(&self, user_id: Uuid, notification: &crate::models::CreateNotification) -> Result<crate::models::Notification> { pub async fn create_notification(&self, user_id: Uuid, notification: &crate::models::CreateNotification) -> Result<crate::models::Notification> {
let row = sqlx::query( self.with_retry(|| async {
r#"INSERT INTO notifications (user_id, notification_type, title, message, action_url, metadata) let row = sqlx::query(
VALUES ($1, $2, $3, $4, $5, $6) r#"INSERT INTO notifications (user_id, notification_type, title, message, action_url, metadata)
RETURNING id, user_id, notification_type, title, message, read, action_url, metadata, created_at"# VALUES ($1, $2, $3, $4, $5, $6)
) RETURNING id, user_id, notification_type, title, message, read, action_url, metadata, created_at"#
.bind(user_id) )
.bind(&notification.notification_type) .bind(user_id)
.bind(&notification.title) .bind(&notification.notification_type)
.bind(&notification.message) .bind(&notification.title)
.bind(&notification.action_url) .bind(&notification.message)
.bind(&notification.metadata) .bind(&notification.action_url)
.fetch_one(&self.pool) .bind(&notification.metadata)
.await?; .fetch_one(&self.pool)
.await
.map_err(|e| anyhow::anyhow!("Database insert failed: {}", e))?;
Ok(crate::models::Notification { Ok(crate::models::Notification {
id: row.get("id"), id: row.get("id"),
@ -1520,6 +1559,7 @@ impl Database {
metadata: row.get("metadata"), metadata: row.get("metadata"),
created_at: row.get("created_at"), created_at: row.get("created_at"),
}) })
}).await
} }
pub async fn get_user_notifications(&self, user_id: Uuid, limit: i64, offset: i64) -> Result<Vec<crate::models::Notification>> { pub async fn get_user_notifications(&self, user_id: Uuid, limit: i64, offset: i64) -> Result<Vec<crate::models::Notification>> {
@ -1604,14 +1644,16 @@ impl Database {
// WebDAV sync state operations // WebDAV sync state operations
pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result<Option<crate::models::WebDAVSyncState>> { pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result<Option<crate::models::WebDAVSyncState>> {
let row = sqlx::query( self.with_retry(|| async {
r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed, let row = sqlx::query(
files_remaining, current_folder, errors, created_at, updated_at r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed,
FROM webdav_sync_state WHERE user_id = $1"# files_remaining, current_folder, errors, created_at, updated_at
) FROM webdav_sync_state WHERE user_id = $1"#
.bind(user_id) )
.fetch_optional(&self.pool) .bind(user_id)
.await?; .fetch_optional(&self.pool)
.await
.map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?;
match row { match row {
Some(row) => Ok(Some(crate::models::WebDAVSyncState { Some(row) => Ok(Some(crate::models::WebDAVSyncState {
@ -1629,35 +1671,39 @@ impl Database {
})), })),
None => Ok(None), None => Ok(None),
} }
}).await
} }
pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> { pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> {
sqlx::query( self.with_retry(|| async {
r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running, sqlx::query(
files_processed, files_remaining, current_folder, errors, updated_at) r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running,
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) files_processed, files_remaining, current_folder, errors, updated_at)
ON CONFLICT (user_id) DO UPDATE SET VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
last_sync_at = EXCLUDED.last_sync_at, ON CONFLICT (user_id) DO UPDATE SET
sync_cursor = EXCLUDED.sync_cursor, last_sync_at = EXCLUDED.last_sync_at,
is_running = EXCLUDED.is_running, sync_cursor = EXCLUDED.sync_cursor,
files_processed = EXCLUDED.files_processed, is_running = EXCLUDED.is_running,
files_remaining = EXCLUDED.files_remaining, files_processed = EXCLUDED.files_processed,
current_folder = EXCLUDED.current_folder, files_remaining = EXCLUDED.files_remaining,
errors = EXCLUDED.errors, current_folder = EXCLUDED.current_folder,
updated_at = NOW()"# errors = EXCLUDED.errors,
) updated_at = NOW()"#
.bind(user_id) )
.bind(state.last_sync_at) .bind(user_id)
.bind(&state.sync_cursor) .bind(state.last_sync_at)
.bind(state.is_running) .bind(&state.sync_cursor)
.bind(state.files_processed) .bind(state.is_running)
.bind(state.files_remaining) .bind(state.files_processed)
.bind(&state.current_folder) .bind(state.files_remaining)
.bind(&state.errors) .bind(&state.current_folder)
.execute(&self.pool) .bind(&state.errors)
.await?; .execute(&self.pool)
.await
.map_err(|e| anyhow::anyhow!("Database update failed: {}", e))?;
Ok(()) Ok(())
}).await
} }
// Reset any running WebDAV syncs on startup (handles server restart during sync) // Reset any running WebDAV syncs on startup (handles server restart during sync)

View File

@ -152,43 +152,50 @@ async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, Status
} }
async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetrics, StatusCode> { async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetrics, StatusCode> {
// Get total document count // Get total document count using retry mechanism
let total_docs = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM documents") let total_docs = state.db.with_retry(|| async {
.fetch_one(&state.db.pool) sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM documents")
.await .fetch_one(&state.db.pool)
.map_err(|e| { .await
tracing::error!("Failed to get total document count: {}", e); .map_err(|e| anyhow::anyhow!("Failed to get total document count: {}", e))
StatusCode::INTERNAL_SERVER_ERROR }).await.map_err(|e| {
})?; tracing::error!("Failed to get total document count: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Get documents uploaded today // Get documents uploaded today
let docs_today = sqlx::query_scalar::<_, i64>( let docs_today = state.db.with_retry(|| async {
"SELECT COUNT(*) FROM documents WHERE DATE(created_at) = CURRENT_DATE" sqlx::query_scalar::<_, i64>(
) "SELECT COUNT(*) FROM documents WHERE DATE(created_at) = CURRENT_DATE"
.fetch_one(&state.db.pool) )
.await .fetch_one(&state.db.pool)
.map_err(|e| { .await
.map_err(|e| anyhow::anyhow!("Failed to get today's document count: {}", e))
}).await.map_err(|e| {
tracing::error!("Failed to get today's document count: {}", e); tracing::error!("Failed to get today's document count: {}", e);
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
})?; })?;
// Get total storage size // Get total storage size
let total_size = sqlx::query_scalar::<_, Option<i64>>("SELECT SUM(file_size) FROM documents") let total_size = state.db.with_retry(|| async {
.fetch_one(&state.db.pool) sqlx::query_scalar::<_, Option<f64>>("SELECT CAST(COALESCE(SUM(file_size), 0) AS DOUBLE PRECISION) FROM documents")
.await .fetch_one(&state.db.pool)
.map_err(|e| { .await
tracing::error!("Failed to get total storage size: {}", e); .map_err(|e| anyhow::anyhow!("Failed to get total storage size: {}", e))
StatusCode::INTERNAL_SERVER_ERROR }).await.map_err(|e| {
})? tracing::error!("Failed to get total storage size: {}", e);
.unwrap_or(0); StatusCode::INTERNAL_SERVER_ERROR
})?.unwrap_or(0.0) as i64;
// Get documents with and without OCR // Get documents with and without OCR
let docs_with_ocr = sqlx::query_scalar::<_, i64>( let docs_with_ocr = state.db.with_retry(|| async {
"SELECT COUNT(*) FROM documents WHERE has_ocr_text = true" sqlx::query_scalar::<_, i64>(
) "SELECT COUNT(*) FROM documents WHERE ocr_text IS NOT NULL AND ocr_text != ''"
.fetch_one(&state.db.pool) )
.await .fetch_one(&state.db.pool)
.map_err(|e| { .await
.map_err(|e| anyhow::anyhow!("Failed to get OCR document count: {}", e))
}).await.map_err(|e| {
tracing::error!("Failed to get OCR document count: {}", e); tracing::error!("Failed to get OCR document count: {}", e);
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
})?; })?;

View File

@ -142,18 +142,18 @@ async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetri
})?; })?;
// Get total storage size // Get total storage size
let total_size = sqlx::query_scalar::<_, Option<i64>>("SELECT SUM(file_size) FROM documents") let total_size = sqlx::query_scalar::<_, Option<f64>>("SELECT CAST(COALESCE(SUM(file_size), 0) AS DOUBLE PRECISION) FROM documents")
.fetch_one(&state.db.pool) .fetch_one(&state.db.pool)
.await .await
.map_err(|e| { .map_err(|e| {
tracing::error!("Failed to get total storage size: {}", e); tracing::error!("Failed to get total storage size: {}", e);
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
})? })?
.unwrap_or(0); .unwrap_or(0.0) as i64;
// Get documents with and without OCR // Get documents with and without OCR
let docs_with_ocr = sqlx::query_scalar::<_, i64>( let docs_with_ocr = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM documents WHERE has_ocr_text = true" "SELECT COUNT(*) FROM documents WHERE ocr_text IS NOT NULL AND ocr_text != ''"
) )
.fetch_one(&state.db.pool) .fetch_one(&state.db.pool)
.await .await