use anyhow::Result; use chrono::Utc; use sqlx::{PgPool, Row, postgres::PgPoolOptions}; use std::time::Duration; use uuid::Uuid; use tokio::time::{sleep, timeout}; use crate::models::{CreateUser, Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse, User}; #[derive(Clone)] pub struct Database { pub pool: PgPool, } impl Database { pub async fn new(database_url: &str) -> Result { let pool = PgPoolOptions::new() .max_connections(50) // Increased from 20 to handle more concurrent requests .acquire_timeout(Duration::from_secs(10)) // Increased from 3 to 10 seconds .idle_timeout(Duration::from_secs(600)) // 10 minute idle timeout .max_lifetime(Duration::from_secs(1800)) // 30 minute max lifetime .min_connections(5) // Maintain minimum connections .connect(database_url) .await?; Ok(Self { pool }) } pub async fn new_with_pool_config(database_url: &str, max_connections: u32, min_connections: u32) -> Result { let pool = PgPoolOptions::new() .max_connections(max_connections) .acquire_timeout(Duration::from_secs(10)) .idle_timeout(Duration::from_secs(600)) .max_lifetime(Duration::from_secs(1800)) .min_connections(min_connections) .connect(database_url) .await?; Ok(Self { pool }) } pub fn get_pool(&self) -> &PgPool { &self.pool } pub async fn with_retry(&self, operation: F) -> Result where F: Fn() -> Fut, Fut: std::future::Future>, { const MAX_RETRIES: usize = 3; const BASE_DELAY_MS: u64 = 100; for attempt in 0..MAX_RETRIES { match timeout(Duration::from_secs(15), operation()).await { Ok(Ok(result)) => return Ok(result), Ok(Err(e)) if attempt == MAX_RETRIES - 1 => return Err(e), Ok(Err(e)) => { tracing::warn!("Database operation failed, attempt {} of {}: {}", attempt + 1, MAX_RETRIES, e); } Err(_) if attempt == MAX_RETRIES - 1 => { return Err(anyhow::anyhow!("Database operation timed out after {} retries", MAX_RETRIES)); } Err(_) => { tracing::warn!("Database operation timed out, attempt {} of {}", attempt + 1, MAX_RETRIES); } } // Exponential backoff with jitter let delay_ms = BASE_DELAY_MS * (2_u64.pow(attempt as u32)); let jitter = (std::ptr::addr_of!(attempt) as usize) % (delay_ms as usize / 2 + 1); sleep(Duration::from_millis(delay_ms + jitter as u64)).await; } unreachable!() } pub async fn migrate(&self) -> Result<()> { // Create extensions sqlx::query(r#"CREATE EXTENSION IF NOT EXISTS "uuid-ossp""#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE EXTENSION IF NOT EXISTS "pg_trgm""#) .execute(&self.pool) .await?; // Create users table sqlx::query( r#" CREATE TABLE IF NOT EXISTS users ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), username VARCHAR(255) UNIQUE NOT NULL, email VARCHAR(255) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, role VARCHAR(10) DEFAULT 'user', created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) "#, ) .execute(&self.pool) .await?; // Create documents table sqlx::query( r#" CREATE TABLE IF NOT EXISTS documents ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), filename VARCHAR(255) NOT NULL, original_filename VARCHAR(255) NOT NULL, file_path VARCHAR(500) NOT NULL, file_size BIGINT NOT NULL, mime_type VARCHAR(100) NOT NULL, content TEXT, ocr_text TEXT, tags TEXT[] DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), user_id UUID REFERENCES users(id) ON DELETE CASCADE ) "#, ) .execute(&self.pool) .await?; // Create indexes sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_user_id ON documents(user_id)"#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_filename ON documents(filename)"#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_mime_type ON documents(mime_type)"#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_tags ON documents USING GIN(tags)"#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_content_search ON documents USING GIN(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')))"#) .execute(&self.pool) .await?; // Enhanced indexes for substring matching and similarity sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_filename_trgm ON documents USING GIN(filename gin_trgm_ops)"#) .execute(&self.pool) .await?; sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_documents_content_trgm ON documents USING GIN((COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) gin_trgm_ops)"#) .execute(&self.pool) .await?; // Create settings table sqlx::query( r#" CREATE TABLE IF NOT EXISTS settings ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), user_id UUID REFERENCES users(id) ON DELETE CASCADE UNIQUE, ocr_language VARCHAR(10) DEFAULT 'eng', concurrent_ocr_jobs INT DEFAULT 4, ocr_timeout_seconds INT DEFAULT 300, max_file_size_mb INT DEFAULT 50, allowed_file_types TEXT[] DEFAULT ARRAY['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'], auto_rotate_images BOOLEAN DEFAULT TRUE, enable_image_preprocessing BOOLEAN DEFAULT TRUE, search_results_per_page INT DEFAULT 25, search_snippet_length INT DEFAULT 200, fuzzy_search_threshold REAL DEFAULT 0.8, retention_days INT, enable_auto_cleanup BOOLEAN DEFAULT FALSE, enable_compression BOOLEAN DEFAULT FALSE, memory_limit_mb INT DEFAULT 512, cpu_priority VARCHAR(10) DEFAULT 'normal', enable_background_ocr BOOLEAN DEFAULT TRUE, ocr_page_segmentation_mode INT DEFAULT 3, ocr_engine_mode INT DEFAULT 3, ocr_min_confidence REAL DEFAULT 30.0, ocr_dpi INT DEFAULT 300, ocr_enhance_contrast BOOLEAN DEFAULT TRUE, ocr_remove_noise BOOLEAN DEFAULT TRUE, ocr_detect_orientation BOOLEAN DEFAULT TRUE, ocr_whitelist_chars TEXT, ocr_blacklist_chars TEXT, webdav_enabled BOOLEAN DEFAULT FALSE, webdav_server_url TEXT, webdav_username TEXT, webdav_password TEXT, webdav_watch_folders TEXT[] DEFAULT ARRAY['/Documents']::TEXT[], webdav_file_extensions TEXT[] DEFAULT ARRAY['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt']::TEXT[], webdav_auto_sync BOOLEAN DEFAULT FALSE, webdav_sync_interval_minutes INTEGER DEFAULT 60, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) "#, ) .execute(&self.pool) .await?; // Run OCR queue migration - execute each statement separately self.run_ocr_queue_migration().await?; Ok(()) } async fn run_ocr_queue_migration(&self) -> Result<()> { // Create OCR queue table sqlx::query( r#" CREATE TABLE IF NOT EXISTS ocr_queue ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), document_id UUID REFERENCES documents(id) ON DELETE CASCADE, status VARCHAR(20) DEFAULT 'pending', priority INT DEFAULT 5, attempts INT DEFAULT 0, max_attempts INT DEFAULT 3, created_at TIMESTAMPTZ DEFAULT NOW(), started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, error_message TEXT, worker_id VARCHAR(100), processing_time_ms INT, file_size BIGINT, CONSTRAINT check_status CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'cancelled')) ) "# ) .execute(&self.pool) .await?; // Create indexes sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_status ON ocr_queue(status, priority DESC, created_at)") .execute(&self.pool) .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_document_id ON ocr_queue(document_id)") .execute(&self.pool) .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_worker ON ocr_queue(worker_id) WHERE status = 'processing'") .execute(&self.pool) .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_created_at ON ocr_queue(created_at) WHERE status = 'pending'") .execute(&self.pool) .await?; // Add columns to documents table sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_status VARCHAR(20) DEFAULT 'pending'") .execute(&self.pool) .await?; sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_error TEXT") .execute(&self.pool) .await?; sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_completed_at TIMESTAMPTZ") .execute(&self.pool) .await?; // Create metrics table sqlx::query( r#" CREATE TABLE IF NOT EXISTS ocr_metrics ( id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), date DATE DEFAULT CURRENT_DATE, hour INT DEFAULT EXTRACT(HOUR FROM NOW()), total_processed INT DEFAULT 0, total_failed INT DEFAULT 0, total_retried INT DEFAULT 0, avg_processing_time_ms INT, max_processing_time_ms INT, min_processing_time_ms INT, queue_depth INT, active_workers INT, UNIQUE(date, hour) ) "# ) .execute(&self.pool) .await?; // Create the statistics function sqlx::query( r#" CREATE OR REPLACE FUNCTION get_ocr_queue_stats() RETURNS TABLE ( pending_count BIGINT, processing_count BIGINT, failed_count BIGINT, completed_today BIGINT, avg_wait_time_minutes DOUBLE PRECISION, oldest_pending_minutes DOUBLE PRECISION ) AS $$ BEGIN RETURN QUERY SELECT COUNT(*) FILTER (WHERE status = 'pending') as pending_count, COUNT(*) FILTER (WHERE status = 'processing') as processing_count, COUNT(*) FILTER (WHERE status = 'failed' AND attempts >= max_attempts) as failed_count, COUNT(*) FILTER (WHERE status = 'completed' AND completed_at >= CURRENT_DATE) as completed_today, AVG(EXTRACT(EPOCH FROM (COALESCE(started_at, NOW()) - created_at))/60) FILTER (WHERE status IN ('processing', 'completed')) as avg_wait_time_minutes, MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) FILTER (WHERE status = 'pending') as oldest_pending_minutes FROM ocr_queue; END; $$ LANGUAGE plpgsql "# ) .execute(&self.pool) .await?; Ok(()) } pub async fn create_user(&self, user: CreateUser) -> Result { let password_hash = bcrypt::hash(&user.password, 12)?; let now = Utc::now(); let row = sqlx::query( r#" INSERT INTO users (username, email, password_hash, role, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, username, email, password_hash, role, created_at, updated_at "# ) .bind(&user.username) .bind(&user.email) .bind(&password_hash) .bind(user.role.as_ref().unwrap_or(&crate::models::UserRole::User).to_string()) .bind(now) .bind(now) .fetch_one(&self.pool) .await?; Ok(User { id: row.get("id"), username: row.get("username"), email: row.get("email"), password_hash: row.get("password_hash"), role: row.get::("role").try_into().unwrap_or(crate::models::UserRole::User), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } pub async fn get_user_by_username(&self, username: &str) -> Result> { let row = sqlx::query( "SELECT id, username, email, password_hash, role, created_at, updated_at FROM users WHERE username = $1" ) .bind(username) .fetch_optional(&self.pool) .await?; match row { Some(row) => Ok(Some(User { id: row.get("id"), username: row.get("username"), email: row.get("email"), password_hash: row.get("password_hash"), role: row.get::("role").try_into().unwrap_or(crate::models::UserRole::User), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } } pub async fn get_user_by_id(&self, id: Uuid) -> Result> { let row = sqlx::query( "SELECT id, username, email, password_hash, role, created_at, updated_at FROM users WHERE id = $1" ) .bind(id) .fetch_optional(&self.pool) .await?; match row { Some(row) => Ok(Some(User { id: row.get("id"), username: row.get("username"), email: row.get("email"), password_hash: row.get("password_hash"), role: row.get::("role").try_into().unwrap_or(crate::models::UserRole::User), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } } pub async fn create_document(&self, document: Document) -> Result { let row = sqlx::query( r#" INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id "# ) .bind(document.id) .bind(&document.filename) .bind(&document.original_filename) .bind(&document.file_path) .bind(document.file_size) .bind(&document.mime_type) .bind(&document.content) .bind(&document.ocr_text) .bind(document.ocr_confidence) .bind(document.ocr_word_count) .bind(document.ocr_processing_time_ms) .bind(&document.ocr_status) .bind(&document.ocr_error) .bind(document.ocr_completed_at) .bind(&document.tags) .bind(document.created_at) .bind(document.updated_at) .bind(document.user_id) .fetch_one(&self.pool) .await?; Ok(Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) } pub async fn get_documents_by_user_with_role(&self, user_id: Uuid, user_role: crate::models::UserRole, limit: i64, offset: i64) -> Result> { let query = if user_role == crate::models::UserRole::Admin { // Admins can see all documents r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# } else { // Regular users can only see their own documents r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE user_id = $3 ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# }; let rows = if user_role == crate::models::UserRole::Admin { sqlx::query(query) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await? } else { sqlx::query(query) .bind(limit) .bind(offset) .bind(user_id) .fetch_all(&self.pool) .await? }; let documents = rows .into_iter() .map(|row| Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) .collect(); Ok(documents) } pub async fn get_documents_by_user_with_role_and_filter(&self, user_id: Uuid, user_role: crate::models::UserRole, limit: i64, offset: i64, ocr_status: Option<&str>) -> Result> { let rows = match (user_role == crate::models::UserRole::Admin, ocr_status) { (true, Some(status)) => { // Admin with OCR filter sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE ocr_status = $3 ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# ) .bind(limit) .bind(offset) .bind(status) .fetch_all(&self.pool) .await? } (true, None) => { // Admin without OCR filter sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# ) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await? } (false, Some(status)) => { // Regular user with OCR filter sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE user_id = $3 AND ocr_status = $4 ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# ) .bind(limit) .bind(offset) .bind(user_id) .bind(status) .fetch_all(&self.pool) .await? } (false, None) => { // Regular user without OCR filter sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE user_id = $3 ORDER BY created_at DESC LIMIT $1 OFFSET $2 "# ) .bind(limit) .bind(offset) .bind(user_id) .fetch_all(&self.pool) .await? } }; let documents = rows .into_iter() .map(|row| Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) .collect(); Ok(documents) } pub async fn get_documents_count_with_role_and_filter(&self, user_id: Uuid, user_role: crate::models::UserRole, ocr_status: Option<&str>) -> Result { let count = match (user_role == crate::models::UserRole::Admin, ocr_status) { (true, Some(status)) => { // Admin with OCR filter sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM documents WHERE ocr_status = $1" ) .bind(status) .fetch_one(&self.pool) .await? } (true, None) => { // Admin without OCR filter sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM documents" ) .fetch_one(&self.pool) .await? } (false, Some(status)) => { // Regular user with OCR filter sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM documents WHERE user_id = $1 AND ocr_status = $2" ) .bind(user_id) .bind(status) .fetch_one(&self.pool) .await? } (false, None) => { // Regular user without OCR filter sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM documents WHERE user_id = $1" ) .bind(user_id) .fetch_one(&self.pool) .await? } }; Ok(count) } pub async fn get_documents_by_user(&self, user_id: Uuid, limit: i64, offset: i64) -> Result> { let rows = sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE user_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 "# ) .bind(user_id) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await?; let documents = rows .into_iter() .map(|row| Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) .collect(); Ok(documents) } pub async fn find_documents_by_filename(&self, filename: &str) -> Result> { let rows = sqlx::query( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id FROM documents WHERE filename = $1 OR original_filename = $1 ORDER BY created_at DESC "# ) .bind(filename) .fetch_all(&self.pool) .await?; let documents = rows .into_iter() .map(|row| Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) .collect(); Ok(documents) } pub async fn search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec, i64)> { let mut query_builder = sqlx::QueryBuilder::new( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "# ); query_builder.push_bind(&search.query); query_builder.push(")) as rank FROM documents WHERE user_id = "); query_builder.push_bind(user_id); query_builder.push(" AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "); query_builder.push_bind(&search.query); query_builder.push(")"); if let Some(tags) = &search.tags { if !tags.is_empty() { query_builder.push(" AND tags && "); query_builder.push_bind(tags); } } if let Some(mime_types) = &search.mime_types { if !mime_types.is_empty() { query_builder.push(" AND mime_type = ANY("); query_builder.push_bind(mime_types); query_builder.push(")"); } } query_builder.push(" ORDER BY rank DESC, created_at DESC"); if let Some(limit) = search.limit { query_builder.push(" LIMIT "); query_builder.push_bind(limit); } if let Some(offset) = search.offset { query_builder.push(" OFFSET "); query_builder.push_bind(offset); } let rows = query_builder.build().fetch_all(&self.pool).await?; let documents = rows .into_iter() .map(|row| Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }) .collect(); let total_row = sqlx::query( r#" SELECT COUNT(*) as total FROM documents WHERE user_id = $1 AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', $2) "# ) .bind(user_id) .bind(&search.query) .fetch_one(&self.pool) .await?; let total: i64 = total_row.get("total"); Ok((documents, total)) } pub async fn enhanced_search_documents_with_role(&self, user_id: Uuid, user_role: crate::models::UserRole, search: SearchRequest) -> Result<(Vec, i64, u64)> { let start_time = std::time::Instant::now(); // Build search query based on search mode with enhanced substring matching let search_mode = search.search_mode.as_ref().unwrap_or(&SearchMode::Simple); // For fuzzy mode, we'll use similarity matching which is better for substrings let use_similarity = matches!(search_mode, SearchMode::Fuzzy); let user_filter = if user_role == crate::models::UserRole::Admin { // Admins can search all documents "" } else { // Regular users can only search their own documents " AND user_id = " }; let mut query_builder = if use_similarity { // Use trigram similarity for substring matching let mut builder = sqlx::QueryBuilder::new( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, GREATEST( similarity(filename, "# ); builder.push_bind(&search.query); builder.push(r#"), similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "#); builder.push_bind(&search.query); builder.push(r#"), ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "#); builder.push_bind(&search.query); builder.push(r#")) ) as rank FROM documents WHERE ( filename % "#); builder.push_bind(&search.query); builder.push(r#" OR (COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) % "#); builder.push_bind(&search.query); builder.push(r#" OR to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "#); builder.push_bind(&search.query); builder.push(r#") )"#); if !user_filter.is_empty() { builder.push(user_filter); builder.push_bind(user_id); } builder } else { // Use traditional full-text search with enhanced ranking let query_function = match search_mode { SearchMode::Simple => "plainto_tsquery", SearchMode::Phrase => "phraseto_tsquery", SearchMode::Boolean => "to_tsquery", SearchMode::Fuzzy => "plainto_tsquery", // fallback }; let mut builder = sqlx::QueryBuilder::new(&format!( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, GREATEST( CASE WHEN filename ILIKE '%' || "# )); builder.push_bind(&search.query); builder.push(&format!(r#" || '%' THEN 0.8 ELSE 0 END, ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), {}('english', "#, query_function)); builder.push_bind(&search.query); builder.push(&format!(r#")) ) as rank FROM documents WHERE ( filename ILIKE '%' || "#)); builder.push_bind(&search.query); builder.push(&format!(r#" || '%' OR to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', "#, query_function)); builder.push_bind(&search.query); builder.push(r#") )"#); if !user_filter.is_empty() { builder.push(user_filter); builder.push_bind(user_id); } builder }; if let Some(tags) = &search.tags { if !tags.is_empty() { query_builder.push(" AND tags && "); query_builder.push_bind(tags); } } if let Some(mime_types) = &search.mime_types { if !mime_types.is_empty() { query_builder.push(" AND mime_type = ANY("); query_builder.push_bind(mime_types); query_builder.push(")"); } } query_builder.push(" ORDER BY rank DESC, created_at DESC"); if let Some(limit) = search.limit { query_builder.push(" LIMIT "); query_builder.push_bind(limit); } if let Some(offset) = search.offset { query_builder.push(" OFFSET "); query_builder.push_bind(offset); } let rows = query_builder.build().fetch_all(&self.pool).await?; let include_snippets = search.include_snippets.unwrap_or(true); let snippet_length = search.snippet_length.unwrap_or(200); let mut documents = Vec::new(); for row in rows { let doc_id: Uuid = row.get("id"); let content: Option = row.get("content"); let ocr_text: Option = row.get("ocr_text"); let rank: f32 = row.get("rank"); let snippets = if include_snippets { self.generate_snippets(&search.query, content.as_deref(), ocr_text.as_deref(), snippet_length) } else { Vec::new() }; documents.push(EnhancedDocumentResponse { id: doc_id, filename: row.get("filename"), original_filename: row.get("original_filename"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), tags: row.get("tags"), created_at: row.get("created_at"), has_ocr_text: ocr_text.is_some(), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), search_rank: Some(rank), snippets, }); } // Get the query function for total count let query_function = if use_similarity { "plainto_tsquery" } else { match search_mode { SearchMode::Simple => "plainto_tsquery", SearchMode::Phrase => "phraseto_tsquery", SearchMode::Boolean => "to_tsquery", SearchMode::Fuzzy => "plainto_tsquery", } }; let total_row = if user_role == crate::models::UserRole::Admin { sqlx::query(&format!( r#" SELECT COUNT(*) as total FROM documents WHERE to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $1) "#, query_function )) .bind(&search.query) .fetch_one(&self.pool) .await? } else { sqlx::query(&format!( r#" SELECT COUNT(*) as total FROM documents WHERE user_id = $1 AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $2) "#, query_function )) .bind(user_id) .bind(&search.query) .fetch_one(&self.pool) .await? }; let total: i64 = total_row.get("total"); let query_time = start_time.elapsed().as_millis() as u64; Ok((documents, total, query_time)) } pub async fn enhanced_search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec, i64, u64)> { let start_time = std::time::Instant::now(); // Build search query based on search mode with enhanced substring matching let search_mode = search.search_mode.as_ref().unwrap_or(&SearchMode::Simple); // For fuzzy mode, we'll use similarity matching which is better for substrings let use_similarity = matches!(search_mode, SearchMode::Fuzzy); let mut query_builder = if use_similarity { // Use trigram similarity for substring matching let mut builder = sqlx::QueryBuilder::new( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, GREATEST( similarity(filename, "# ); builder.push_bind(&search.query); builder.push(r#"), similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "#); builder.push_bind(&search.query); builder.push(r#"), ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "#); builder.push_bind(&search.query); builder.push(r#")) ) as rank FROM documents WHERE user_id = "#); builder.push_bind(user_id); builder.push(r#" AND ( filename % "#); builder.push_bind(&search.query); builder.push(r#" OR (COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) % "#); builder.push_bind(&search.query); builder.push(r#" OR to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "#); builder.push_bind(&search.query); builder.push(r#") )"#); builder } else { // Use traditional full-text search with enhanced ranking let query_function = match search_mode { SearchMode::Simple => "plainto_tsquery", SearchMode::Phrase => "phraseto_tsquery", SearchMode::Boolean => "to_tsquery", SearchMode::Fuzzy => "plainto_tsquery", // fallback }; let mut builder = sqlx::QueryBuilder::new(&format!( r#" SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, GREATEST( CASE WHEN filename ILIKE '%' || "# )); builder.push_bind(&search.query); builder.push(&format!(r#" || '%' THEN 0.8 ELSE 0 END, ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), {}('english', "#, query_function)); builder.push_bind(&search.query); builder.push(&format!(r#")) ) as rank FROM documents WHERE user_id = "#)); builder.push_bind(user_id); builder.push(&format!(r#" AND ( filename ILIKE '%' || "#)); builder.push_bind(&search.query); builder.push(&format!(r#" || '%' OR to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', "#, query_function)); builder.push_bind(&search.query); builder.push(r#") )"#); builder }; if let Some(tags) = &search.tags { if !tags.is_empty() { query_builder.push(" AND tags && "); query_builder.push_bind(tags); } } if let Some(mime_types) = &search.mime_types { if !mime_types.is_empty() { query_builder.push(" AND mime_type = ANY("); query_builder.push_bind(mime_types); query_builder.push(")"); } } query_builder.push(" ORDER BY rank DESC, created_at DESC"); if let Some(limit) = search.limit { query_builder.push(" LIMIT "); query_builder.push_bind(limit); } if let Some(offset) = search.offset { query_builder.push(" OFFSET "); query_builder.push_bind(offset); } let rows = query_builder.build().fetch_all(&self.pool).await?; let include_snippets = search.include_snippets.unwrap_or(true); let snippet_length = search.snippet_length.unwrap_or(200); let mut documents = Vec::new(); for row in rows { let doc_id: Uuid = row.get("id"); let content: Option = row.get("content"); let ocr_text: Option = row.get("ocr_text"); let rank: f32 = row.get("rank"); let snippets = if include_snippets { self.generate_snippets(&search.query, content.as_deref(), ocr_text.as_deref(), snippet_length) } else { Vec::new() }; documents.push(EnhancedDocumentResponse { id: doc_id, filename: row.get("filename"), original_filename: row.get("original_filename"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), tags: row.get("tags"), created_at: row.get("created_at"), has_ocr_text: ocr_text.is_some(), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), search_rank: Some(rank), snippets, }); } // Get the query function for total count let query_function = if use_similarity { "plainto_tsquery" } else { match search_mode { SearchMode::Simple => "plainto_tsquery", SearchMode::Phrase => "phraseto_tsquery", SearchMode::Boolean => "to_tsquery", SearchMode::Fuzzy => "plainto_tsquery", } }; let total_row = sqlx::query(&format!( r#" SELECT COUNT(*) as total FROM documents WHERE user_id = $1 AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $2) "#, query_function )) .bind(user_id) .bind(&search.query) .fetch_one(&self.pool) .await?; let total: i64 = total_row.get("total"); let query_time = start_time.elapsed().as_millis() as u64; Ok((documents, total, query_time)) } fn generate_snippets(&self, query: &str, content: Option<&str>, ocr_text: Option<&str>, snippet_length: i32) -> Vec { let mut snippets = Vec::new(); // Combine content and OCR text let full_text = match (content, ocr_text) { (Some(c), Some(o)) => format!("{} {}", c, o), (Some(c), None) => c.to_string(), (None, Some(o)) => o.to_string(), (None, None) => return snippets, }; // Enhanced substring matching for better context let query_terms: Vec<&str> = query.split_whitespace().collect(); let text_lower = full_text.to_lowercase(); let query_lower = query.to_lowercase(); // Find exact matches first let mut match_positions = Vec::new(); // 1. Look for exact query matches for (i, _) in text_lower.match_indices(&query_lower) { match_positions.push((i, query.len(), "exact")); } // 2. Look for individual term matches (substring matching) for term in &query_terms { if term.len() >= 3 { // Only match terms of reasonable length let term_lower = term.to_lowercase(); for (i, _) in text_lower.match_indices(&term_lower) { // Check if this isn't already part of an exact match let is_duplicate = match_positions.iter().any(|(pos, len, _)| { i >= *pos && i < *pos + *len }); if !is_duplicate { match_positions.push((i, term.len(), "term")); } } } } // 3. Look for partial word matches (for "docu" -> "document" cases) for term in &query_terms { if term.len() >= 3 { let term_lower = term.to_lowercase(); // Find words that start with our search term let words_regex = regex::Regex::new(&format!(r"\b{}[a-zA-Z]*\b", regex::escape(&term_lower))).unwrap(); for mat in words_regex.find_iter(&text_lower) { let is_duplicate = match_positions.iter().any(|(pos, len, _)| { mat.start() >= *pos && mat.start() < *pos + *len }); if !is_duplicate { match_positions.push((mat.start(), mat.end() - mat.start(), "partial")); } } } } // Sort matches by position and remove overlaps match_positions.sort_by_key(|&(pos, _, _)| pos); // Generate snippets around matches for (match_pos, match_len, _match_type) in match_positions.iter().take(5) { let context_size = (snippet_length as usize).saturating_sub(*match_len) / 2; let snippet_start = match_pos.saturating_sub(context_size); let snippet_end = std::cmp::min( match_pos + match_len + context_size, full_text.len() ); // Find word boundaries to avoid cutting words let snippet_start = self.find_word_boundary(&full_text, snippet_start, true); let snippet_end = self.find_word_boundary(&full_text, snippet_end, false); if snippet_start < snippet_end && snippet_start < full_text.len() { let snippet_text = &full_text[snippet_start..snippet_end]; // Find all highlight ranges within this snippet let mut highlight_ranges = Vec::new(); let snippet_lower = snippet_text.to_lowercase(); // Highlight exact query match for (match_start, _) in snippet_lower.match_indices(&query_lower) { highlight_ranges.push(HighlightRange { start: match_start as i32, end: (match_start + query.len()) as i32, }); } // Highlight individual terms if no exact match if highlight_ranges.is_empty() { for term in &query_terms { if term.len() >= 3 { let term_lower = term.to_lowercase(); for (match_start, _) in snippet_lower.match_indices(&term_lower) { highlight_ranges.push(HighlightRange { start: match_start as i32, end: (match_start + term.len()) as i32, }); } } } } // Remove duplicate highlights and sort highlight_ranges.sort_by_key(|r| r.start); highlight_ranges.dedup_by_key(|r| r.start); snippets.push(SearchSnippet { text: snippet_text.to_string(), start_offset: snippet_start as i32, end_offset: snippet_end as i32, highlight_ranges, }); // Limit to avoid too many snippets if snippets.len() >= 3 { break; } } } snippets } fn find_word_boundary(&self, text: &str, mut pos: usize, search_backward: bool) -> usize { if pos >= text.len() { return text.len(); } let chars: Vec = text.chars().collect(); if search_backward { // Search backward for word boundary while pos > 0 && chars.get(pos.saturating_sub(1)).map_or(false, |c| c.is_alphanumeric()) { pos = pos.saturating_sub(1); } } else { // Search forward for word boundary while pos < chars.len() && chars.get(pos).map_or(false, |c| c.is_alphanumeric()) { pos += 1; } } // Convert back to byte position chars.iter().take(pos).map(|c| c.len_utf8()).sum() } pub async fn update_document_ocr(&self, id: Uuid, ocr_text: &str) -> Result<()> { sqlx::query("UPDATE documents SET ocr_text = $1, updated_at = NOW() WHERE id = $2") .bind(ocr_text) .bind(id) .execute(&self.pool) .await?; Ok(()) } pub async fn get_all_users(&self) -> Result> { let rows = sqlx::query( "SELECT id, username, email, password_hash, role, created_at, updated_at FROM users ORDER BY created_at DESC" ) .fetch_all(&self.pool) .await?; let users = rows .into_iter() .map(|row| User { id: row.get("id"), username: row.get("username"), email: row.get("email"), password_hash: row.get("password_hash"), role: row.get::("role").try_into().unwrap_or(crate::models::UserRole::User), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) .collect(); Ok(users) } pub async fn update_user(&self, id: Uuid, username: Option, email: Option, password: Option) -> Result { let user = self.get_user_by_id(id).await?.ok_or_else(|| anyhow::anyhow!("User not found"))?; let username = username.unwrap_or(user.username); let email = email.unwrap_or(user.email); let password_hash = if let Some(pwd) = password { bcrypt::hash(&pwd, 12)? } else { user.password_hash }; let row = sqlx::query( r#" UPDATE users SET username = $1, email = $2, password_hash = $3, updated_at = NOW() WHERE id = $4 RETURNING id, username, email, password_hash, role, created_at, updated_at "# ) .bind(&username) .bind(&email) .bind(&password_hash) .bind(id) .fetch_one(&self.pool) .await?; Ok(User { id: row.get("id"), username: row.get("username"), email: row.get("email"), password_hash: row.get("password_hash"), role: row.get::("role").try_into().unwrap_or(crate::models::UserRole::User), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } pub async fn delete_user(&self, id: Uuid) -> Result<()> { sqlx::query("DELETE FROM users WHERE id = $1") .bind(id) .execute(&self.pool) .await?; Ok(()) } pub async fn get_user_settings(&self, user_id: Uuid) -> Result> { self.with_retry(|| async { let row = sqlx::query( r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds, max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing, search_results_per_page, search_snippet_length, fuzzy_search_threshold, retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb, cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode, ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise, ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars, ocr_brightness_boost, ocr_contrast_multiplier, ocr_noise_reduction_level, ocr_sharpening_strength, ocr_morphological_operations, ocr_adaptive_threshold_window_size, ocr_histogram_equalization, ocr_upscale_factor, ocr_max_image_width, ocr_max_image_height, save_processed_images, ocr_quality_threshold_brightness, ocr_quality_threshold_contrast, ocr_quality_threshold_noise, ocr_quality_threshold_sharpness, ocr_skip_enhancement, webdav_enabled, webdav_server_url, webdav_username, webdav_password, webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes, created_at, updated_at FROM settings WHERE user_id = $1"# ) .bind(user_id) .fetch_optional(&self.pool) .await .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?; match row { Some(row) => Ok(Some(crate::models::Settings { id: row.get("id"), user_id: row.get("user_id"), ocr_language: row.get("ocr_language"), concurrent_ocr_jobs: row.get("concurrent_ocr_jobs"), ocr_timeout_seconds: row.get("ocr_timeout_seconds"), max_file_size_mb: row.get("max_file_size_mb"), allowed_file_types: row.get("allowed_file_types"), auto_rotate_images: row.get("auto_rotate_images"), enable_image_preprocessing: row.get("enable_image_preprocessing"), search_results_per_page: row.get("search_results_per_page"), search_snippet_length: row.get("search_snippet_length"), fuzzy_search_threshold: row.get("fuzzy_search_threshold"), retention_days: row.get("retention_days"), enable_auto_cleanup: row.get("enable_auto_cleanup"), enable_compression: row.get("enable_compression"), memory_limit_mb: row.get("memory_limit_mb"), cpu_priority: row.get("cpu_priority"), enable_background_ocr: row.get("enable_background_ocr"), ocr_page_segmentation_mode: row.get("ocr_page_segmentation_mode"), ocr_engine_mode: row.get("ocr_engine_mode"), ocr_min_confidence: row.get("ocr_min_confidence"), ocr_dpi: row.get("ocr_dpi"), ocr_enhance_contrast: row.get("ocr_enhance_contrast"), ocr_remove_noise: row.get("ocr_remove_noise"), ocr_detect_orientation: row.get("ocr_detect_orientation"), ocr_whitelist_chars: row.get("ocr_whitelist_chars"), ocr_blacklist_chars: row.get("ocr_blacklist_chars"), ocr_brightness_boost: row.get("ocr_brightness_boost"), ocr_contrast_multiplier: row.get("ocr_contrast_multiplier"), ocr_noise_reduction_level: row.get("ocr_noise_reduction_level"), ocr_sharpening_strength: row.get("ocr_sharpening_strength"), ocr_morphological_operations: row.get("ocr_morphological_operations"), ocr_adaptive_threshold_window_size: row.get("ocr_adaptive_threshold_window_size"), ocr_histogram_equalization: row.get("ocr_histogram_equalization"), ocr_upscale_factor: row.get("ocr_upscale_factor"), ocr_max_image_width: row.get("ocr_max_image_width"), ocr_max_image_height: row.get("ocr_max_image_height"), save_processed_images: row.get("save_processed_images"), ocr_quality_threshold_brightness: row.get("ocr_quality_threshold_brightness"), ocr_quality_threshold_contrast: row.get("ocr_quality_threshold_contrast"), ocr_quality_threshold_noise: row.get("ocr_quality_threshold_noise"), ocr_quality_threshold_sharpness: row.get("ocr_quality_threshold_sharpness"), ocr_skip_enhancement: row.get("ocr_skip_enhancement"), webdav_enabled: row.get("webdav_enabled"), webdav_server_url: row.get("webdav_server_url"), webdav_username: row.get("webdav_username"), webdav_password: row.get("webdav_password"), webdav_watch_folders: row.get("webdav_watch_folders"), webdav_file_extensions: row.get("webdav_file_extensions"), webdav_auto_sync: row.get("webdav_auto_sync"), webdav_sync_interval_minutes: row.get("webdav_sync_interval_minutes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } }).await } pub async fn get_all_user_settings(&self) -> Result> { let rows = sqlx::query( r#"SELECT id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds, max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing, search_results_per_page, search_snippet_length, fuzzy_search_threshold, retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb, cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode, ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise, ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars, ocr_brightness_boost, ocr_contrast_multiplier, ocr_noise_reduction_level, ocr_sharpening_strength, ocr_morphological_operations, ocr_adaptive_threshold_window_size, ocr_histogram_equalization, ocr_upscale_factor, ocr_max_image_width, ocr_max_image_height, save_processed_images, ocr_quality_threshold_brightness, ocr_quality_threshold_contrast, ocr_quality_threshold_noise, ocr_quality_threshold_sharpness, ocr_skip_enhancement, webdav_enabled, webdav_server_url, webdav_username, webdav_password, webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes, created_at, updated_at FROM settings WHERE webdav_enabled = true AND webdav_auto_sync = true"# ) .fetch_all(&self.pool) .await?; let mut settings_list = Vec::new(); for row in rows { settings_list.push(crate::models::Settings { id: row.get("id"), user_id: row.get("user_id"), ocr_language: row.get("ocr_language"), concurrent_ocr_jobs: row.get("concurrent_ocr_jobs"), ocr_timeout_seconds: row.get("ocr_timeout_seconds"), max_file_size_mb: row.get("max_file_size_mb"), allowed_file_types: row.get("allowed_file_types"), auto_rotate_images: row.get("auto_rotate_images"), enable_image_preprocessing: row.get("enable_image_preprocessing"), search_results_per_page: row.get("search_results_per_page"), search_snippet_length: row.get("search_snippet_length"), fuzzy_search_threshold: row.get("fuzzy_search_threshold"), retention_days: row.get("retention_days"), enable_auto_cleanup: row.get("enable_auto_cleanup"), enable_compression: row.get("enable_compression"), memory_limit_mb: row.get("memory_limit_mb"), cpu_priority: row.get("cpu_priority"), enable_background_ocr: row.get("enable_background_ocr"), ocr_page_segmentation_mode: row.get("ocr_page_segmentation_mode"), ocr_engine_mode: row.get("ocr_engine_mode"), ocr_min_confidence: row.get("ocr_min_confidence"), ocr_dpi: row.get("ocr_dpi"), ocr_enhance_contrast: row.get("ocr_enhance_contrast"), ocr_remove_noise: row.get("ocr_remove_noise"), ocr_detect_orientation: row.get("ocr_detect_orientation"), ocr_whitelist_chars: row.get("ocr_whitelist_chars"), ocr_blacklist_chars: row.get("ocr_blacklist_chars"), ocr_brightness_boost: row.get("ocr_brightness_boost"), ocr_contrast_multiplier: row.get("ocr_contrast_multiplier"), ocr_noise_reduction_level: row.get("ocr_noise_reduction_level"), ocr_sharpening_strength: row.get("ocr_sharpening_strength"), ocr_morphological_operations: row.get("ocr_morphological_operations"), ocr_adaptive_threshold_window_size: row.get("ocr_adaptive_threshold_window_size"), ocr_histogram_equalization: row.get("ocr_histogram_equalization"), ocr_upscale_factor: row.get("ocr_upscale_factor"), ocr_max_image_width: row.get("ocr_max_image_width"), ocr_max_image_height: row.get("ocr_max_image_height"), save_processed_images: row.get("save_processed_images"), ocr_quality_threshold_brightness: row.get("ocr_quality_threshold_brightness"), ocr_quality_threshold_contrast: row.get("ocr_quality_threshold_contrast"), ocr_quality_threshold_noise: row.get("ocr_quality_threshold_noise"), ocr_quality_threshold_sharpness: row.get("ocr_quality_threshold_sharpness"), ocr_skip_enhancement: row.get("ocr_skip_enhancement"), webdav_enabled: row.get("webdav_enabled"), webdav_server_url: row.get("webdav_server_url"), webdav_username: row.get("webdav_username"), webdav_password: row.get("webdav_password"), webdav_watch_folders: row.get("webdav_watch_folders"), webdav_file_extensions: row.get("webdav_file_extensions"), webdav_auto_sync: row.get("webdav_auto_sync"), webdav_sync_interval_minutes: row.get("webdav_sync_interval_minutes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }); } Ok(settings_list) } pub async fn create_or_update_settings(&self, user_id: Uuid, settings: &crate::models::UpdateSettings) -> Result { // Get existing settings to merge with updates let existing = self.get_user_settings(user_id).await?; let defaults = crate::models::Settings::default(); // Merge existing/defaults with updates let current = existing.unwrap_or_else(|| { let mut s = defaults; s.user_id = user_id; s }); let row = sqlx::query( r#" INSERT INTO settings ( user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds, max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing, search_results_per_page, search_snippet_length, fuzzy_search_threshold, retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb, cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode, ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise, ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars, ocr_brightness_boost, ocr_contrast_multiplier, ocr_noise_reduction_level, ocr_sharpening_strength, ocr_morphological_operations, ocr_adaptive_threshold_window_size, ocr_histogram_equalization, ocr_upscale_factor, ocr_max_image_width, ocr_max_image_height, save_processed_images, ocr_quality_threshold_brightness, ocr_quality_threshold_contrast, ocr_quality_threshold_noise, ocr_quality_threshold_sharpness, ocr_skip_enhancement, webdav_enabled, webdav_server_url, webdav_username, webdav_password, webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42, $43, $44, $45, $46, $47, $48, $49, $50) ON CONFLICT (user_id) DO UPDATE SET ocr_language = $2, concurrent_ocr_jobs = $3, ocr_timeout_seconds = $4, max_file_size_mb = $5, allowed_file_types = $6, auto_rotate_images = $7, enable_image_preprocessing = $8, search_results_per_page = $9, search_snippet_length = $10, fuzzy_search_threshold = $11, retention_days = $12, enable_auto_cleanup = $13, enable_compression = $14, memory_limit_mb = $15, cpu_priority = $16, enable_background_ocr = $17, ocr_page_segmentation_mode = $18, ocr_engine_mode = $19, ocr_min_confidence = $20, ocr_dpi = $21, ocr_enhance_contrast = $22, ocr_remove_noise = $23, ocr_detect_orientation = $24, ocr_whitelist_chars = $25, ocr_blacklist_chars = $26, ocr_brightness_boost = $27, ocr_contrast_multiplier = $28, ocr_noise_reduction_level = $29, ocr_sharpening_strength = $30, ocr_morphological_operations = $31, ocr_adaptive_threshold_window_size = $32, ocr_histogram_equalization = $33, ocr_upscale_factor = $34, ocr_max_image_width = $35, ocr_max_image_height = $36, save_processed_images = $37, ocr_quality_threshold_brightness = $38, ocr_quality_threshold_contrast = $39, ocr_quality_threshold_noise = $40, ocr_quality_threshold_sharpness = $41, ocr_skip_enhancement = $42, webdav_enabled = $43, webdav_server_url = $44, webdav_username = $45, webdav_password = $46, webdav_watch_folders = $47, webdav_file_extensions = $48, webdav_auto_sync = $49, webdav_sync_interval_minutes = $50, updated_at = NOW() RETURNING id, user_id, ocr_language, concurrent_ocr_jobs, ocr_timeout_seconds, max_file_size_mb, allowed_file_types, auto_rotate_images, enable_image_preprocessing, search_results_per_page, search_snippet_length, fuzzy_search_threshold, retention_days, enable_auto_cleanup, enable_compression, memory_limit_mb, cpu_priority, enable_background_ocr, ocr_page_segmentation_mode, ocr_engine_mode, ocr_min_confidence, ocr_dpi, ocr_enhance_contrast, ocr_remove_noise, ocr_detect_orientation, ocr_whitelist_chars, ocr_blacklist_chars, ocr_brightness_boost, ocr_contrast_multiplier, ocr_noise_reduction_level, ocr_sharpening_strength, ocr_morphological_operations, ocr_adaptive_threshold_window_size, ocr_histogram_equalization, ocr_upscale_factor, ocr_max_image_width, ocr_max_image_height, save_processed_images, ocr_quality_threshold_brightness, ocr_quality_threshold_contrast, ocr_quality_threshold_noise, ocr_quality_threshold_sharpness, ocr_skip_enhancement, webdav_enabled, webdav_server_url, webdav_username, webdav_password, webdav_watch_folders, webdav_file_extensions, webdav_auto_sync, webdav_sync_interval_minutes, created_at, updated_at "# ) .bind(user_id) .bind(settings.ocr_language.as_ref().unwrap_or(¤t.ocr_language)) .bind(settings.concurrent_ocr_jobs.unwrap_or(current.concurrent_ocr_jobs)) .bind(settings.ocr_timeout_seconds.unwrap_or(current.ocr_timeout_seconds)) .bind(settings.max_file_size_mb.unwrap_or(current.max_file_size_mb)) .bind(settings.allowed_file_types.as_ref().unwrap_or(¤t.allowed_file_types)) .bind(settings.auto_rotate_images.unwrap_or(current.auto_rotate_images)) .bind(settings.enable_image_preprocessing.unwrap_or(current.enable_image_preprocessing)) .bind(settings.search_results_per_page.unwrap_or(current.search_results_per_page)) .bind(settings.search_snippet_length.unwrap_or(current.search_snippet_length)) .bind(settings.fuzzy_search_threshold.unwrap_or(current.fuzzy_search_threshold)) .bind(settings.retention_days.unwrap_or(current.retention_days)) .bind(settings.enable_auto_cleanup.unwrap_or(current.enable_auto_cleanup)) .bind(settings.enable_compression.unwrap_or(current.enable_compression)) .bind(settings.memory_limit_mb.unwrap_or(current.memory_limit_mb)) .bind(settings.cpu_priority.as_ref().unwrap_or(¤t.cpu_priority)) .bind(settings.enable_background_ocr.unwrap_or(current.enable_background_ocr)) .bind(settings.ocr_page_segmentation_mode.unwrap_or(current.ocr_page_segmentation_mode)) .bind(settings.ocr_engine_mode.unwrap_or(current.ocr_engine_mode)) .bind(settings.ocr_min_confidence.unwrap_or(current.ocr_min_confidence)) .bind(settings.ocr_dpi.unwrap_or(current.ocr_dpi)) .bind(settings.ocr_enhance_contrast.unwrap_or(current.ocr_enhance_contrast)) .bind(settings.ocr_remove_noise.unwrap_or(current.ocr_remove_noise)) .bind(settings.ocr_detect_orientation.unwrap_or(current.ocr_detect_orientation)) .bind(settings.ocr_whitelist_chars.as_ref().unwrap_or(¤t.ocr_whitelist_chars)) .bind(settings.ocr_blacklist_chars.as_ref().unwrap_or(¤t.ocr_blacklist_chars)) .bind(settings.ocr_brightness_boost.unwrap_or(current.ocr_brightness_boost)) .bind(settings.ocr_contrast_multiplier.unwrap_or(current.ocr_contrast_multiplier)) .bind(settings.ocr_noise_reduction_level.unwrap_or(current.ocr_noise_reduction_level)) .bind(settings.ocr_sharpening_strength.unwrap_or(current.ocr_sharpening_strength)) .bind(settings.ocr_morphological_operations.unwrap_or(current.ocr_morphological_operations)) .bind(settings.ocr_adaptive_threshold_window_size.unwrap_or(current.ocr_adaptive_threshold_window_size)) .bind(settings.ocr_histogram_equalization.unwrap_or(current.ocr_histogram_equalization)) .bind(settings.ocr_upscale_factor.unwrap_or(current.ocr_upscale_factor)) .bind(settings.ocr_max_image_width.unwrap_or(current.ocr_max_image_width)) .bind(settings.ocr_max_image_height.unwrap_or(current.ocr_max_image_height)) .bind(settings.save_processed_images.unwrap_or(current.save_processed_images)) .bind(settings.ocr_quality_threshold_brightness.unwrap_or(current.ocr_quality_threshold_brightness)) .bind(settings.ocr_quality_threshold_contrast.unwrap_or(current.ocr_quality_threshold_contrast)) .bind(settings.ocr_quality_threshold_noise.unwrap_or(current.ocr_quality_threshold_noise)) .bind(settings.ocr_quality_threshold_sharpness.unwrap_or(current.ocr_quality_threshold_sharpness)) .bind(settings.ocr_skip_enhancement.unwrap_or(current.ocr_skip_enhancement)) .bind(settings.webdav_enabled.unwrap_or(current.webdav_enabled)) .bind(settings.webdav_server_url.as_ref().unwrap_or(¤t.webdav_server_url)) .bind(settings.webdav_username.as_ref().unwrap_or(¤t.webdav_username)) .bind(settings.webdav_password.as_ref().unwrap_or(¤t.webdav_password)) .bind(settings.webdav_watch_folders.as_ref().unwrap_or(¤t.webdav_watch_folders)) .bind(settings.webdav_file_extensions.as_ref().unwrap_or(¤t.webdav_file_extensions)) .bind(settings.webdav_auto_sync.unwrap_or(current.webdav_auto_sync)) .bind(settings.webdav_sync_interval_minutes.unwrap_or(current.webdav_sync_interval_minutes)) .fetch_one(&self.pool) .await?; Ok(crate::models::Settings { id: row.get("id"), user_id: row.get("user_id"), ocr_language: row.get("ocr_language"), concurrent_ocr_jobs: row.get("concurrent_ocr_jobs"), ocr_timeout_seconds: row.get("ocr_timeout_seconds"), max_file_size_mb: row.get("max_file_size_mb"), allowed_file_types: row.get("allowed_file_types"), auto_rotate_images: row.get("auto_rotate_images"), enable_image_preprocessing: row.get("enable_image_preprocessing"), search_results_per_page: row.get("search_results_per_page"), search_snippet_length: row.get("search_snippet_length"), fuzzy_search_threshold: row.get("fuzzy_search_threshold"), retention_days: row.get("retention_days"), enable_auto_cleanup: row.get("enable_auto_cleanup"), enable_compression: row.get("enable_compression"), memory_limit_mb: row.get("memory_limit_mb"), cpu_priority: row.get("cpu_priority"), enable_background_ocr: row.get("enable_background_ocr"), ocr_page_segmentation_mode: row.get("ocr_page_segmentation_mode"), ocr_engine_mode: row.get("ocr_engine_mode"), ocr_min_confidence: row.get("ocr_min_confidence"), ocr_dpi: row.get("ocr_dpi"), ocr_enhance_contrast: row.get("ocr_enhance_contrast"), ocr_remove_noise: row.get("ocr_remove_noise"), ocr_detect_orientation: row.get("ocr_detect_orientation"), ocr_whitelist_chars: row.get("ocr_whitelist_chars"), ocr_blacklist_chars: row.get("ocr_blacklist_chars"), ocr_brightness_boost: row.get("ocr_brightness_boost"), ocr_contrast_multiplier: row.get("ocr_contrast_multiplier"), ocr_noise_reduction_level: row.get("ocr_noise_reduction_level"), ocr_sharpening_strength: row.get("ocr_sharpening_strength"), ocr_morphological_operations: row.get("ocr_morphological_operations"), ocr_adaptive_threshold_window_size: row.get("ocr_adaptive_threshold_window_size"), ocr_histogram_equalization: row.get("ocr_histogram_equalization"), ocr_upscale_factor: row.get("ocr_upscale_factor"), ocr_max_image_width: row.get("ocr_max_image_width"), ocr_max_image_height: row.get("ocr_max_image_height"), save_processed_images: row.get("save_processed_images"), ocr_quality_threshold_brightness: row.get("ocr_quality_threshold_brightness"), ocr_quality_threshold_contrast: row.get("ocr_quality_threshold_contrast"), ocr_quality_threshold_noise: row.get("ocr_quality_threshold_noise"), ocr_quality_threshold_sharpness: row.get("ocr_quality_threshold_sharpness"), ocr_skip_enhancement: row.get("ocr_skip_enhancement"), webdav_enabled: row.get("webdav_enabled"), webdav_server_url: row.get("webdav_server_url"), webdav_username: row.get("webdav_username"), webdav_password: row.get("webdav_password"), webdav_watch_folders: row.get("webdav_watch_folders"), webdav_file_extensions: row.get("webdav_file_extensions"), webdav_auto_sync: row.get("webdav_auto_sync"), webdav_sync_interval_minutes: row.get("webdav_sync_interval_minutes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } // Notification methods pub async fn create_notification(&self, user_id: Uuid, notification: &crate::models::CreateNotification) -> Result { self.with_retry(|| async { let row = sqlx::query( r#"INSERT INTO notifications (user_id, notification_type, title, message, action_url, metadata) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, user_id, notification_type, title, message, read, action_url, metadata, created_at"# ) .bind(user_id) .bind(¬ification.notification_type) .bind(¬ification.title) .bind(¬ification.message) .bind(¬ification.action_url) .bind(¬ification.metadata) .fetch_one(&self.pool) .await .map_err(|e| anyhow::anyhow!("Database insert failed: {}", e))?; Ok(crate::models::Notification { id: row.get("id"), user_id: row.get("user_id"), notification_type: row.get("notification_type"), title: row.get("title"), message: row.get("message"), read: row.get("read"), action_url: row.get("action_url"), metadata: row.get("metadata"), created_at: row.get("created_at"), }) }).await } pub async fn get_user_notifications(&self, user_id: Uuid, limit: i64, offset: i64) -> Result> { let rows = sqlx::query( r#"SELECT id, user_id, notification_type, title, message, read, action_url, metadata, created_at FROM notifications WHERE user_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3"# ) .bind(user_id) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await?; let mut notifications = Vec::new(); for row in rows { notifications.push(crate::models::Notification { id: row.get("id"), user_id: row.get("user_id"), notification_type: row.get("notification_type"), title: row.get("title"), message: row.get("message"), read: row.get("read"), action_url: row.get("action_url"), metadata: row.get("metadata"), created_at: row.get("created_at"), }); } Ok(notifications) } pub async fn get_unread_notification_count(&self, user_id: Uuid) -> Result { let row = sqlx::query("SELECT COUNT(*) as count FROM notifications WHERE user_id = $1 AND read = false") .bind(user_id) .fetch_one(&self.pool) .await?; Ok(row.get("count")) } pub async fn mark_notification_read(&self, user_id: Uuid, notification_id: Uuid) -> Result<()> { sqlx::query("UPDATE notifications SET read = true WHERE id = $1 AND user_id = $2") .bind(notification_id) .bind(user_id) .execute(&self.pool) .await?; Ok(()) } pub async fn mark_all_notifications_read(&self, user_id: Uuid) -> Result<()> { sqlx::query("UPDATE notifications SET read = true WHERE user_id = $1 AND read = false") .bind(user_id) .execute(&self.pool) .await?; Ok(()) } pub async fn delete_notification(&self, user_id: Uuid, notification_id: Uuid) -> Result<()> { sqlx::query("DELETE FROM notifications WHERE id = $1 AND user_id = $2") .bind(notification_id) .bind(user_id) .execute(&self.pool) .await?; Ok(()) } pub async fn get_notification_summary(&self, user_id: Uuid) -> Result { let unread_count = self.get_unread_notification_count(user_id).await?; let recent_notifications = self.get_user_notifications(user_id, 5, 0).await?; Ok(crate::models::NotificationSummary { unread_count, recent_notifications, }) } // WebDAV sync state operations pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result> { self.with_retry(|| async { let row = sqlx::query( r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed, files_remaining, current_folder, errors, created_at, updated_at FROM webdav_sync_state WHERE user_id = $1"# ) .bind(user_id) .fetch_optional(&self.pool) .await .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?; match row { Some(row) => Ok(Some(crate::models::WebDAVSyncState { id: row.get("id"), user_id: row.get("user_id"), last_sync_at: row.get("last_sync_at"), sync_cursor: row.get("sync_cursor"), is_running: row.get("is_running"), files_processed: row.get("files_processed"), files_remaining: row.get("files_remaining"), current_folder: row.get("current_folder"), errors: row.get("errors"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } }).await } pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> { self.with_retry(|| async { sqlx::query( r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running, files_processed, files_remaining, current_folder, errors, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) ON CONFLICT (user_id) DO UPDATE SET last_sync_at = EXCLUDED.last_sync_at, sync_cursor = EXCLUDED.sync_cursor, is_running = EXCLUDED.is_running, files_processed = EXCLUDED.files_processed, files_remaining = EXCLUDED.files_remaining, current_folder = EXCLUDED.current_folder, errors = EXCLUDED.errors, updated_at = NOW()"# ) .bind(user_id) .bind(state.last_sync_at) .bind(&state.sync_cursor) .bind(state.is_running) .bind(state.files_processed) .bind(state.files_remaining) .bind(&state.current_folder) .bind(&state.errors) .execute(&self.pool) .await .map_err(|e| anyhow::anyhow!("Database update failed: {}", e))?; Ok(()) }).await } // Reset any running WebDAV syncs on startup (handles server restart during sync) pub async fn reset_running_webdav_syncs(&self) -> Result { let result = sqlx::query( r#"UPDATE webdav_sync_state SET is_running = false, current_folder = NULL, errors = CASE WHEN array_length(errors, 1) IS NULL OR array_length(errors, 1) = 0 THEN ARRAY['Sync interrupted by server restart'] ELSE array_append(errors, 'Sync interrupted by server restart') END, updated_at = NOW() WHERE is_running = true"# ) .execute(&self.pool) .await?; Ok(result.rows_affected() as i64) } // WebDAV file tracking operations pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result> { let row = sqlx::query( r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size, mime_type, document_id, sync_status, sync_error, created_at, updated_at FROM webdav_files WHERE user_id = $1 AND webdav_path = $2"# ) .bind(user_id) .bind(webdav_path) .fetch_optional(&self.pool) .await?; match row { Some(row) => Ok(Some(crate::models::WebDAVFile { id: row.get("id"), user_id: row.get("user_id"), webdav_path: row.get("webdav_path"), etag: row.get("etag"), last_modified: row.get("last_modified"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), document_id: row.get("document_id"), sync_status: row.get("sync_status"), sync_error: row.get("sync_error"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } } pub async fn create_or_update_webdav_file(&self, file: &crate::models::CreateWebDAVFile) -> Result { let row = sqlx::query( r#"INSERT INTO webdav_files (user_id, webdav_path, etag, last_modified, file_size, mime_type, document_id, sync_status, sync_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (user_id, webdav_path) DO UPDATE SET etag = EXCLUDED.etag, last_modified = EXCLUDED.last_modified, file_size = EXCLUDED.file_size, mime_type = EXCLUDED.mime_type, document_id = EXCLUDED.document_id, sync_status = EXCLUDED.sync_status, sync_error = EXCLUDED.sync_error, updated_at = NOW() RETURNING id, user_id, webdav_path, etag, last_modified, file_size, mime_type, document_id, sync_status, sync_error, created_at, updated_at"# ) .bind(file.user_id) .bind(&file.webdav_path) .bind(&file.etag) .bind(file.last_modified) .bind(file.file_size) .bind(&file.mime_type) .bind(file.document_id) .bind(&file.sync_status) .bind(&file.sync_error) .fetch_one(&self.pool) .await?; Ok(crate::models::WebDAVFile { id: row.get("id"), user_id: row.get("user_id"), webdav_path: row.get("webdav_path"), etag: row.get("etag"), last_modified: row.get("last_modified"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), document_id: row.get("document_id"), sync_status: row.get("sync_status"), sync_error: row.get("sync_error"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } pub async fn get_pending_webdav_files(&self, user_id: Uuid, limit: i64) -> Result> { let rows = sqlx::query( r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size, mime_type, document_id, sync_status, sync_error, created_at, updated_at FROM webdav_files WHERE user_id = $1 AND sync_status = 'pending' ORDER BY created_at ASC LIMIT $2"# ) .bind(user_id) .bind(limit) .fetch_all(&self.pool) .await?; let mut files = Vec::new(); for row in rows { files.push(crate::models::WebDAVFile { id: row.get("id"), user_id: row.get("user_id"), webdav_path: row.get("webdav_path"), etag: row.get("etag"), last_modified: row.get("last_modified"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), document_id: row.get("document_id"), sync_status: row.get("sync_status"), sync_error: row.get("sync_error"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }); } Ok(files) } // Sources methods pub async fn create_source(&self, user_id: Uuid, source: &crate::models::CreateSource) -> Result { let id = Uuid::new_v4(); let now = Utc::now(); let row = sqlx::query( r#"INSERT INTO sources (id, user_id, name, source_type, enabled, config, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, 'idle', $7, $8) RETURNING *"# ) .bind(id) .bind(user_id) .bind(&source.name) .bind(source.source_type.to_string()) .bind(source.enabled.unwrap_or(true)) .bind(&source.config) .bind(now) .bind(now) .fetch_one(&self.pool) .await?; Ok(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } pub async fn get_source(&self, user_id: Uuid, source_id: Uuid) -> Result> { let row = sqlx::query( r#"SELECT * FROM sources WHERE id = $1 AND user_id = $2"# ) .bind(source_id) .bind(user_id) .fetch_optional(&self.pool) .await?; match row { Some(row) => Ok(Some(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })), None => Ok(None), } } pub async fn get_sources(&self, user_id: Uuid) -> Result> { let rows = sqlx::query( r#"SELECT * FROM sources WHERE user_id = $1 ORDER BY created_at DESC"# ) .bind(user_id) .fetch_all(&self.pool) .await?; let mut sources = Vec::new(); for row in rows { sources.push(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }); } Ok(sources) } pub async fn update_source(&self, user_id: Uuid, source_id: Uuid, update: &crate::models::UpdateSource) -> Result { let mut query = String::from("UPDATE sources SET updated_at = NOW()"); let mut bind_count = 1; if update.name.is_some() { bind_count += 1; query.push_str(&format!(", name = ${}", bind_count)); } if update.enabled.is_some() { bind_count += 1; query.push_str(&format!(", enabled = ${}", bind_count)); } if update.config.is_some() { bind_count += 1; query.push_str(&format!(", config = ${}", bind_count)); } bind_count += 1; query.push_str(&format!(" WHERE id = ${}", bind_count)); bind_count += 1; query.push_str(&format!(" AND user_id = ${} RETURNING *", bind_count)); let mut query_builder = sqlx::query(&query); // Bind values in order if let Some(name) = &update.name { query_builder = query_builder.bind(name); } if let Some(enabled) = &update.enabled { query_builder = query_builder.bind(enabled); } if let Some(config) = &update.config { query_builder = query_builder.bind(config); } query_builder = query_builder.bind(source_id); query_builder = query_builder.bind(user_id); let row = query_builder.fetch_one(&self.pool).await?; Ok(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }) } pub async fn delete_source(&self, user_id: Uuid, source_id: Uuid) -> Result { let result = sqlx::query( r#"DELETE FROM sources WHERE id = $1 AND user_id = $2"# ) .bind(source_id) .bind(user_id) .execute(&self.pool) .await?; Ok(result.rows_affected() > 0) } pub async fn update_source_status(&self, source_id: Uuid, status: crate::models::SourceStatus, error: Option) -> Result<()> { if let Some(error_msg) = error { sqlx::query( r#"UPDATE sources SET status = $1, last_error = $2, last_error_at = NOW(), updated_at = NOW() WHERE id = $3"# ) .bind(status.to_string()) .bind(error_msg) .bind(source_id) .execute(&self.pool) .await?; } else { sqlx::query( r#"UPDATE sources SET status = $1, updated_at = NOW() WHERE id = $2"# ) .bind(status.to_string()) .bind(source_id) .execute(&self.pool) .await?; } Ok(()) } pub async fn update_source_sync_stats(&self, source_id: Uuid, files_synced: i64, files_pending: i64, size_bytes: i64) -> Result<()> { sqlx::query( r#"UPDATE sources SET total_files_synced = $1, total_files_pending = $2, total_size_bytes = $3, last_sync_at = NOW(), updated_at = NOW() WHERE id = $4"# ) .bind(files_synced) .bind(files_pending) .bind(size_bytes) .bind(source_id) .execute(&self.pool) .await?; Ok(()) } pub async fn get_recent_documents_for_source(&self, source_id: Uuid, limit: i64) -> Result> { let rows = sqlx::query( r#"SELECT * FROM documents WHERE source_id = $1 ORDER BY created_at DESC LIMIT $2"# ) .bind(source_id) .bind(limit) .fetch_all(&self.pool) .await?; let mut documents = Vec::new(); for row in rows { documents.push(Document { id: row.get("id"), filename: row.get("filename"), original_filename: row.get("original_filename"), file_path: row.get("file_path"), file_size: row.get("file_size"), mime_type: row.get("mime_type"), content: row.get("content"), ocr_text: row.get("ocr_text"), ocr_confidence: row.get("ocr_confidence"), ocr_word_count: row.get("ocr_word_count"), ocr_processing_time_ms: row.get("ocr_processing_time_ms"), ocr_status: row.get("ocr_status"), ocr_error: row.get("ocr_error"), ocr_completed_at: row.get("ocr_completed_at"), tags: row.get("tags"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), user_id: row.get("user_id"), }); } Ok(documents) } // Source management operations pub async fn get_all_sources(&self) -> Result> { let rows = sqlx::query( r#"SELECT id, user_id, name, source_type, enabled, config, status, last_sync_at, last_error, last_error_at, total_files_synced, total_files_pending, total_size_bytes, created_at, updated_at FROM sources ORDER BY created_at DESC"# ) .fetch_all(&self.pool) .await?; let mut sources = Vec::new(); for row in rows { sources.push(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into() .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into() .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }); } Ok(sources) } pub async fn get_sources_for_sync(&self) -> Result> { let rows = sqlx::query( r#"SELECT id, user_id, name, source_type, enabled, config, status, last_sync_at, last_error, last_error_at, total_files_synced, total_files_pending, total_size_bytes, created_at, updated_at FROM sources WHERE enabled = true AND status != 'syncing' ORDER BY last_sync_at ASC NULLS FIRST"# ) .fetch_all(&self.pool) .await?; let mut sources = Vec::new(); for row in rows { sources.push(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into() .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into() .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), }); } Ok(sources) } pub async fn get_source_by_id(&self, source_id: Uuid) -> Result> { let row = sqlx::query( r#"SELECT id, user_id, name, source_type, enabled, config, status, last_sync_at, last_error, last_error_at, total_files_synced, total_files_pending, total_size_bytes, created_at, updated_at FROM sources WHERE id = $1"# ) .bind(source_id) .fetch_optional(&self.pool) .await?; if let Some(row) = row { Ok(Some(crate::models::Source { id: row.get("id"), user_id: row.get("user_id"), name: row.get("name"), source_type: row.get::("source_type").try_into() .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?, enabled: row.get("enabled"), config: row.get("config"), status: row.get::("status").try_into() .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?, last_sync_at: row.get("last_sync_at"), last_error: row.get("last_error"), last_error_at: row.get("last_error_at"), total_files_synced: row.get("total_files_synced"), total_files_pending: row.get("total_files_pending"), total_size_bytes: row.get("total_size_bytes"), created_at: row.get("created_at"), updated_at: row.get("updated_at"), })) } else { Ok(None) } } // Processed images operations pub async fn create_processed_image(&self, processed_image: &crate::models::CreateProcessedImage) -> Result { let row = sqlx::query( r#"INSERT INTO processed_images (document_id, user_id, original_image_path, processed_image_path, processing_parameters, processing_steps, image_width, image_height, file_size) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id, document_id, user_id, original_image_path, processed_image_path, processing_parameters, processing_steps, image_width, image_height, file_size, created_at"# ) .bind(processed_image.document_id) .bind(processed_image.user_id) .bind(&processed_image.original_image_path) .bind(&processed_image.processed_image_path) .bind(&processed_image.processing_parameters) .bind(&processed_image.processing_steps) .bind(processed_image.image_width) .bind(processed_image.image_height) .bind(processed_image.file_size) .fetch_one(&self.pool) .await?; Ok(crate::models::ProcessedImage { id: row.get("id"), document_id: row.get("document_id"), user_id: row.get("user_id"), original_image_path: row.get("original_image_path"), processed_image_path: row.get("processed_image_path"), processing_parameters: row.get("processing_parameters"), processing_steps: row.get("processing_steps"), image_width: row.get("image_width"), image_height: row.get("image_height"), file_size: row.get("file_size"), created_at: row.get("created_at"), }) } pub async fn get_processed_image_by_document_id(&self, document_id: Uuid, user_id: Uuid) -> Result> { let row = sqlx::query( r#"SELECT id, document_id, user_id, original_image_path, processed_image_path, processing_parameters, processing_steps, image_width, image_height, file_size, created_at FROM processed_images WHERE document_id = $1 AND user_id = $2 ORDER BY created_at DESC LIMIT 1"# ) .bind(document_id) .bind(user_id) .fetch_optional(&self.pool) .await?; if let Some(row) = row { Ok(Some(crate::models::ProcessedImage { id: row.get("id"), document_id: row.get("document_id"), user_id: row.get("user_id"), original_image_path: row.get("original_image_path"), processed_image_path: row.get("processed_image_path"), processing_parameters: row.get("processing_parameters"), processing_steps: row.get("processing_steps"), image_width: row.get("image_width"), image_height: row.get("image_height"), file_size: row.get("file_size"), created_at: row.get("created_at"), })) } else { Ok(None) } } pub async fn delete_processed_images_by_document_id(&self, document_id: Uuid) -> Result<()> { sqlx::query("DELETE FROM processed_images WHERE document_id = $1") .bind(document_id) .execute(&self.pool) .await?; Ok(()) } }