diff --git a/src/db/documents.rs b/src/db/documents.rs deleted file mode 100644 index ad33af3..0000000 --- a/src/db/documents.rs +++ /dev/null @@ -1,1999 +0,0 @@ -use anyhow::Result; -use sqlx::{Row, QueryBuilder}; -use uuid::Uuid; - -use crate::models::{Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse}; -use crate::routes::labels::Label; -use super::Database; - -impl Database { - pub async fn create_document(&self, document: Document) -> Result { - let row = sqlx::query( - r#" - INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) - RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - "# - ) - .bind(document.id) - .bind(&document.filename) - .bind(&document.original_filename) - .bind(&document.file_path) - .bind(document.file_size) - .bind(&document.mime_type) - .bind(&document.content) - .bind(&document.ocr_text) - .bind(document.ocr_confidence) - .bind(document.ocr_word_count) - .bind(document.ocr_processing_time_ms) - .bind(&document.ocr_status) - .bind(&document.ocr_error) - .bind(document.ocr_completed_at) - .bind(document.ocr_retry_count) - .bind(&document.ocr_failure_reason) - .bind(&document.tags) - .bind(document.created_at) - .bind(document.updated_at) - .bind(document.user_id) - .bind(&document.file_hash) - .bind(document.original_created_at) - .bind(document.original_modified_at) - .bind(&document.source_metadata) - .fetch_one(&self.pool) - .await?; - - Ok(Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - } - - pub async fn get_documents_by_user_with_role(&self, user_id: Uuid, user_role: crate::models::UserRole, limit: i64, offset: i64) -> Result> { - let query = if user_role == crate::models::UserRole::Admin { - // Admins can see all documents - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - } else { - // Regular users can only see their own documents - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE user_id = $3 - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - }; - - let rows = if user_role == crate::models::UserRole::Admin { - sqlx::query(query) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await? - } else { - sqlx::query(query) - .bind(limit) - .bind(offset) - .bind(user_id) - .fetch_all(&self.pool) - .await? - }; - - let documents = rows - .into_iter() - .map(|row| Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - .collect(); - - Ok(documents) - } - - pub async fn get_documents_by_user_with_role_and_filter(&self, user_id: Uuid, user_role: crate::models::UserRole, limit: i64, offset: i64, ocr_status: Option<&str>) -> Result> { - let rows = match (user_role == crate::models::UserRole::Admin, ocr_status) { - (true, Some(status)) => { - // Admin with OCR filter - sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE ocr_status = $3 - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - ) - .bind(limit) - .bind(offset) - .bind(status) - .fetch_all(&self.pool) - .await? - } - (true, None) => { - // Admin without OCR filter - sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - ) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await? - } - (false, Some(status)) => { - // Regular user with OCR filter - sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE user_id = $3 AND ocr_status = $4 - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - ) - .bind(limit) - .bind(offset) - .bind(user_id) - .bind(status) - .fetch_all(&self.pool) - .await? - } - (false, None) => { - // Regular user without OCR filter - sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE user_id = $3 - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - "# - ) - .bind(limit) - .bind(offset) - .bind(user_id) - .fetch_all(&self.pool) - .await? - } - }; - - let documents = rows - .into_iter() - .map(|row| Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - .collect(); - - Ok(documents) - } - - pub async fn get_documents_count_with_role_and_filter(&self, user_id: Uuid, user_role: crate::models::UserRole, ocr_status: Option<&str>) -> Result { - let count = match (user_role == crate::models::UserRole::Admin, ocr_status) { - (true, Some(status)) => { - // Admin with OCR filter - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM documents WHERE ocr_status = $1" - ) - .bind(status) - .fetch_one(&self.pool) - .await? - } - (true, None) => { - // Admin without OCR filter - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM documents" - ) - .fetch_one(&self.pool) - .await? - } - (false, Some(status)) => { - // Regular user with OCR filter - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM documents WHERE user_id = $1 AND ocr_status = $2" - ) - .bind(user_id) - .bind(status) - .fetch_one(&self.pool) - .await? - } - (false, None) => { - // Regular user without OCR filter - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM documents WHERE user_id = $1" - ) - .bind(user_id) - .fetch_one(&self.pool) - .await? - } - }; - - Ok(count) - } - - pub async fn get_documents_by_user(&self, user_id: Uuid, limit: i64, offset: i64) -> Result> { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE user_id = $1 - ORDER BY created_at DESC - LIMIT $2 OFFSET $3 - "# - ) - .bind(user_id) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await?; - - let documents = rows - .into_iter() - .map(|row| Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - .collect(); - - Ok(documents) - } - - pub async fn find_documents_by_filename(&self, filename: &str) -> Result> { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE filename = $1 OR original_filename = $1 - ORDER BY created_at DESC - "# - ) - .bind(filename) - .fetch_all(&self.pool) - .await?; - - let documents = rows - .into_iter() - .map(|row| Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - .collect(); - - Ok(documents) - } - - pub async fn search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec, i64)> { - let mut query_builder = QueryBuilder::new( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata, - ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "# - ); - - query_builder.push_bind(&search.query); - query_builder.push(")) as rank FROM documents WHERE user_id = "); - query_builder.push_bind(user_id); - query_builder.push(" AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "); - query_builder.push_bind(&search.query); - query_builder.push(")"); - - if let Some(tags) = &search.tags { - if !tags.is_empty() { - query_builder.push(" AND tags && "); - query_builder.push_bind(tags); - } - } - - if let Some(mime_types) = &search.mime_types { - if !mime_types.is_empty() { - query_builder.push(" AND mime_type = ANY("); - query_builder.push_bind(mime_types); - query_builder.push(")"); - } - } - - query_builder.push(" ORDER BY rank DESC, created_at DESC"); - - if let Some(limit) = search.limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - - if let Some(offset) = search.offset { - query_builder.push(" OFFSET "); - query_builder.push_bind(offset); - } - - let rows = query_builder.build().fetch_all(&self.pool).await?; - - let documents = rows - .into_iter() - .map(|row| Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }) - .collect(); - - let total_row = sqlx::query( - r#" - SELECT COUNT(*) as total FROM documents - WHERE user_id = $1 - AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', $2) - "# - ) - .bind(user_id) - .bind(&search.query) - .fetch_one(&self.pool) - .await?; - - let total: i64 = total_row.get("total"); - - Ok((documents, total)) - } - - pub async fn enhanced_search_documents_with_role(&self, user_id: Uuid, user_role: crate::models::UserRole, search: SearchRequest) -> Result<(Vec, i64, u64)> { - let start_time = std::time::Instant::now(); - - // Build search query based on search mode with enhanced substring matching - let search_mode = search.search_mode.as_ref().unwrap_or(&SearchMode::Simple); - - // For fuzzy mode, we'll use similarity matching which is better for substrings - let use_similarity = matches!(search_mode, SearchMode::Fuzzy); - - let user_filter = if user_role == crate::models::UserRole::Admin { - // Admins can search all documents - "" - } else { - // Regular users can only search their own documents - " AND user_id = " - }; - - let mut query_builder = if use_similarity { - // Use trigram similarity for substring matching - let mut builder = QueryBuilder::new( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata, - GREATEST( - similarity(filename, "# - ); - builder.push_bind(&search.query); - builder.push(r#"), - similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "#); - builder.push_bind(&search.query); - builder.push(r#"), - ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "#); - builder.push_bind(&search.query); - builder.push(r#")) - ) as rank - FROM documents - WHERE ( - filename % "#); - builder.push_bind(&search.query); - builder.push(r#" OR - (COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) % "#); - builder.push_bind(&search.query); - builder.push(r#" OR - to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "#); - builder.push_bind(&search.query); - builder.push(r#") - )"#); - - if !user_filter.is_empty() { - builder.push(user_filter); - builder.push_bind(user_id); - } - - builder - } else { - // Use traditional full-text search with enhanced ranking - let query_function = match search_mode { - SearchMode::Simple => "plainto_tsquery", - SearchMode::Phrase => "phraseto_tsquery", - SearchMode::Boolean => "to_tsquery", - SearchMode::Fuzzy => "plainto_tsquery", // fallback - }; - - let mut builder = QueryBuilder::new(&format!( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata, - GREATEST( - CASE WHEN filename ILIKE '%' || "# - )); - builder.push_bind(&search.query); - builder.push(&format!(r#" || '%' THEN 0.8 ELSE 0 END, - ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), {}('english', "#, query_function)); - builder.push_bind(&search.query); - builder.push(&format!(r#")) - ) as rank - FROM documents - WHERE ( - filename ILIKE '%' || "#)); - builder.push_bind(&search.query); - builder.push(&format!(r#" || '%' OR - to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', "#, query_function)); - builder.push_bind(&search.query); - builder.push(r#") - )"#); - - if !user_filter.is_empty() { - builder.push(user_filter); - builder.push_bind(user_id); - } - - builder - }; - - if let Some(tags) = &search.tags { - if !tags.is_empty() { - query_builder.push(" AND tags && "); - query_builder.push_bind(tags); - } - } - - if let Some(mime_types) = &search.mime_types { - if !mime_types.is_empty() { - query_builder.push(" AND mime_type = ANY("); - query_builder.push_bind(mime_types); - query_builder.push(")"); - } - } - - query_builder.push(" ORDER BY rank DESC, created_at DESC"); - - if let Some(limit) = search.limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - - if let Some(offset) = search.offset { - query_builder.push(" OFFSET "); - query_builder.push_bind(offset); - } - - let rows = query_builder.build().fetch_all(&self.pool).await?; - - let include_snippets = search.include_snippets.unwrap_or(true); - let snippet_length = search.snippet_length.unwrap_or(200); - - let mut documents = Vec::new(); - for row in rows { - let doc_id: Uuid = row.get("id"); - let content: Option = row.get("content"); - let ocr_text: Option = row.get("ocr_text"); - let rank: f32 = row.get("rank"); - - let snippets = if include_snippets { - self.generate_snippets(&search.query, content.as_deref(), ocr_text.as_deref(), snippet_length) - } else { - Vec::new() - }; - - documents.push(EnhancedDocumentResponse { - id: doc_id, - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - tags: row.get("tags"), - created_at: row.get("created_at"), - has_ocr_text: ocr_text.is_some(), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - search_rank: Some(rank), - snippets, - }); - } - - // Get the query function for total count - let query_function = if use_similarity { - "plainto_tsquery" - } else { - match search_mode { - SearchMode::Simple => "plainto_tsquery", - SearchMode::Phrase => "phraseto_tsquery", - SearchMode::Boolean => "to_tsquery", - SearchMode::Fuzzy => "plainto_tsquery", - } - }; - - let total_row = if user_role == crate::models::UserRole::Admin { - sqlx::query(&format!( - r#" - SELECT COUNT(*) as total FROM documents - WHERE to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $1) - "#, query_function - )) - .bind(&search.query) - .fetch_one(&self.pool) - .await? - } else { - sqlx::query(&format!( - r#" - SELECT COUNT(*) as total FROM documents - WHERE user_id = $1 - AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $2) - "#, query_function - )) - .bind(user_id) - .bind(&search.query) - .fetch_one(&self.pool) - .await? - }; - - let total: i64 = total_row.get("total"); - let query_time = start_time.elapsed().as_millis() as u64; - - Ok((documents, total, query_time)) - } - - pub async fn enhanced_search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec, i64, u64)> { - let start_time = std::time::Instant::now(); - - // Build search query based on search mode with enhanced substring matching - let search_mode = search.search_mode.as_ref().unwrap_or(&SearchMode::Simple); - - // For fuzzy mode, we'll use similarity matching which is better for substrings - let use_similarity = matches!(search_mode, SearchMode::Fuzzy); - - let mut query_builder = if use_similarity { - // Use trigram similarity for substring matching - let mut builder = QueryBuilder::new( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata, - GREATEST( - similarity(filename, "# - ); - builder.push_bind(&search.query); - builder.push(r#"), - similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "#); - builder.push_bind(&search.query); - builder.push(r#"), - ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "#); - builder.push_bind(&search.query); - builder.push(r#")) - ) as rank - FROM documents - WHERE user_id = "#); - builder.push_bind(user_id); - builder.push(r#" AND ( - filename % "#); - builder.push_bind(&search.query); - builder.push(r#" OR - (COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) % "#); - builder.push_bind(&search.query); - builder.push(r#" OR - to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "#); - builder.push_bind(&search.query); - builder.push(r#") - )"#); - builder - } else { - // Use traditional full-text search with enhanced ranking - let query_function = match search_mode { - SearchMode::Simple => "plainto_tsquery", - SearchMode::Phrase => "phraseto_tsquery", - SearchMode::Boolean => "to_tsquery", - SearchMode::Fuzzy => "plainto_tsquery", // fallback - }; - - let mut builder = QueryBuilder::new(&format!( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata, - GREATEST( - CASE WHEN filename ILIKE '%' || "# - )); - builder.push_bind(&search.query); - builder.push(&format!(r#" || '%' THEN 0.8 ELSE 0 END, - ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), {}('english', "#, query_function)); - builder.push_bind(&search.query); - builder.push(&format!(r#")) - ) as rank - FROM documents - WHERE user_id = "#)); - builder.push_bind(user_id); - builder.push(&format!(r#" AND ( - filename ILIKE '%' || "#)); - builder.push_bind(&search.query); - builder.push(&format!(r#" || '%' OR - to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', "#, query_function)); - builder.push_bind(&search.query); - builder.push(r#") - )"#); - builder - }; - - if let Some(tags) = &search.tags { - if !tags.is_empty() { - query_builder.push(" AND tags && "); - query_builder.push_bind(tags); - } - } - - if let Some(mime_types) = &search.mime_types { - if !mime_types.is_empty() { - query_builder.push(" AND mime_type = ANY("); - query_builder.push_bind(mime_types); - query_builder.push(")"); - } - } - - query_builder.push(" ORDER BY rank DESC, created_at DESC"); - - if let Some(limit) = search.limit { - query_builder.push(" LIMIT "); - query_builder.push_bind(limit); - } - - if let Some(offset) = search.offset { - query_builder.push(" OFFSET "); - query_builder.push_bind(offset); - } - - let rows = query_builder.build().fetch_all(&self.pool).await?; - - let include_snippets = search.include_snippets.unwrap_or(true); - let snippet_length = search.snippet_length.unwrap_or(200); - - let mut documents = Vec::new(); - for row in rows { - let doc_id: Uuid = row.get("id"); - let content: Option = row.get("content"); - let ocr_text: Option = row.get("ocr_text"); - let rank: f32 = row.get("rank"); - - let snippets = if include_snippets { - self.generate_snippets(&search.query, content.as_deref(), ocr_text.as_deref(), snippet_length) - } else { - Vec::new() - }; - - documents.push(EnhancedDocumentResponse { - id: doc_id, - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - tags: row.get("tags"), - created_at: row.get("created_at"), - has_ocr_text: ocr_text.is_some(), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - search_rank: Some(rank), - snippets, - }); - } - - // Get the query function for total count - let query_function = if use_similarity { - "plainto_tsquery" - } else { - match search_mode { - SearchMode::Simple => "plainto_tsquery", - SearchMode::Phrase => "phraseto_tsquery", - SearchMode::Boolean => "to_tsquery", - SearchMode::Fuzzy => "plainto_tsquery", - } - }; - - let total_row = sqlx::query(&format!( - r#" - SELECT COUNT(*) as total FROM documents - WHERE user_id = $1 - AND to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')) @@ {}('english', $2) - "#, query_function - )) - .bind(user_id) - .bind(&search.query) - .fetch_one(&self.pool) - .await?; - - let total: i64 = total_row.get("total"); - let query_time = start_time.elapsed().as_millis() as u64; - - Ok((documents, total, query_time)) - } - - fn generate_snippets(&self, query: &str, content: Option<&str>, ocr_text: Option<&str>, snippet_length: i32) -> Vec { - let mut snippets = Vec::new(); - - // Combine content and OCR text - let full_text = match (content, ocr_text) { - (Some(c), Some(o)) => format!("{} {}", c, o), - (Some(c), None) => c.to_string(), - (None, Some(o)) => o.to_string(), - (None, None) => return snippets, - }; - - // Enhanced substring matching for better context - let query_terms: Vec<&str> = query.split_whitespace().collect(); - let text_lower = full_text.to_lowercase(); - let query_lower = query.to_lowercase(); - - // Find exact matches first - let mut match_positions = Vec::new(); - - // 1. Look for exact query matches - for (i, _) in text_lower.match_indices(&query_lower) { - match_positions.push((i, query.len(), "exact")); - } - - // 2. Look for individual term matches (substring matching) - for term in &query_terms { - if term.len() >= 3 { // Only match terms of reasonable length - let term_lower = term.to_lowercase(); - for (i, _) in text_lower.match_indices(&term_lower) { - // Check if this isn't already part of an exact match - let is_duplicate = match_positions.iter().any(|(pos, len, _)| { - i >= *pos && i < *pos + *len - }); - if !is_duplicate { - match_positions.push((i, term.len(), "term")); - } - } - } - } - - // 3. Look for partial word matches (for "docu" -> "document" cases) - for term in &query_terms { - if term.len() >= 3 { - let term_lower = term.to_lowercase(); - // Find words that start with our search term - let words_regex = regex::Regex::new(&format!(r"\b{}[a-zA-Z]*\b", regex::escape(&term_lower))).unwrap(); - for mat in words_regex.find_iter(&text_lower) { - let is_duplicate = match_positions.iter().any(|(pos, len, _)| { - mat.start() >= *pos && mat.start() < *pos + *len - }); - if !is_duplicate { - match_positions.push((mat.start(), mat.end() - mat.start(), "partial")); - } - } - } - } - - // Sort matches by position and remove overlaps - match_positions.sort_by_key(|&(pos, _, _)| pos); - - // Generate snippets around matches - for (match_pos, match_len, _match_type) in match_positions.iter().take(5) { - let context_size = (snippet_length as usize).saturating_sub(*match_len) / 2; - - let snippet_start = match_pos.saturating_sub(context_size); - let snippet_end = std::cmp::min( - match_pos + match_len + context_size, - full_text.len() - ); - - // Find word boundaries to avoid cutting words - let snippet_start = self.find_word_boundary(&full_text, snippet_start, true); - let snippet_end = self.find_word_boundary(&full_text, snippet_end, false); - - if snippet_start < snippet_end && snippet_start < full_text.len() { - let snippet_text = &full_text[snippet_start..snippet_end]; - - // Find all highlight ranges within this snippet - let mut highlight_ranges = Vec::new(); - let snippet_lower = snippet_text.to_lowercase(); - - // Highlight exact query match - for (match_start, _) in snippet_lower.match_indices(&query_lower) { - highlight_ranges.push(HighlightRange { - start: match_start as i32, - end: (match_start + query.len()) as i32, - }); - } - - // Highlight individual terms if no exact match - if highlight_ranges.is_empty() { - for term in &query_terms { - if term.len() >= 3 { - let term_lower = term.to_lowercase(); - for (match_start, _) in snippet_lower.match_indices(&term_lower) { - highlight_ranges.push(HighlightRange { - start: match_start as i32, - end: (match_start + term.len()) as i32, - }); - } - } - } - } - - // Remove duplicate highlights and sort - highlight_ranges.sort_by_key(|r| r.start); - highlight_ranges.dedup_by_key(|r| r.start); - - snippets.push(SearchSnippet { - text: snippet_text.to_string(), - start_offset: snippet_start as i32, - end_offset: snippet_end as i32, - highlight_ranges, - }); - - // Limit to avoid too many snippets - if snippets.len() >= 3 { - break; - } - } - } - - snippets - } - - fn find_word_boundary(&self, text: &str, mut pos: usize, search_backward: bool) -> usize { - if pos >= text.len() { - return text.len(); - } - - let chars: Vec = text.chars().collect(); - - if search_backward { - // Search backward for word boundary - while pos > 0 && chars.get(pos.saturating_sub(1)).map_or(false, |c| c.is_alphanumeric()) { - pos = pos.saturating_sub(1); - } - } else { - // Search forward for word boundary - while pos < chars.len() && chars.get(pos).map_or(false, |c| c.is_alphanumeric()) { - pos += 1; - } - } - - // Convert back to byte position - chars.iter().take(pos).map(|c| c.len_utf8()).sum() - } - - pub async fn update_document_ocr(&self, id: Uuid, ocr_text: &str) -> Result<()> { - sqlx::query("UPDATE documents SET ocr_text = $1, updated_at = NOW() WHERE id = $2") - .bind(ocr_text) - .bind(id) - .execute(&self.pool) - .await?; - - Ok(()) - } - - pub async fn get_recent_documents_for_source(&self, source_id: Uuid, limit: i64) -> Result> { - let rows = sqlx::query( - r#"SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata FROM documents - WHERE source_id = $1 - ORDER BY created_at DESC - LIMIT $2"# - ) - .bind(source_id) - .bind(limit) - .fetch_all(&self.pool) - .await?; - - let mut documents = Vec::new(); - for row in rows { - documents.push(Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - }); - } - - Ok(documents) - } - - pub async fn get_mime_type_facets(&self, user_id: Uuid, user_role: crate::models::UserRole) -> Result> { - let query = if user_role == crate::models::UserRole::Admin { - // Admins see facets for all documents - r#" - SELECT mime_type, COUNT(*) as count - FROM documents - GROUP BY mime_type - ORDER BY count DESC - "# - } else { - // Regular users see facets for their own documents - r#" - SELECT mime_type, COUNT(*) as count - FROM documents - WHERE user_id = $1 - GROUP BY mime_type - ORDER BY count DESC - "# - }; - - let rows = if user_role == crate::models::UserRole::Admin { - sqlx::query(query) - .fetch_all(&self.pool) - .await? - } else { - sqlx::query(query) - .bind(user_id) - .fetch_all(&self.pool) - .await? - }; - - let facets = rows - .into_iter() - .map(|row| (row.get("mime_type"), row.get("count"))) - .collect(); - - Ok(facets) - } - - pub async fn get_tag_facets(&self, user_id: Uuid, user_role: crate::models::UserRole) -> Result> { - let query = if user_role == crate::models::UserRole::Admin { - // Admins see facets for all documents - r#" - SELECT UNNEST(tags) as tag, COUNT(*) as count - FROM documents - GROUP BY tag - ORDER BY count DESC - "# - } else { - // Regular users see facets for their own documents - r#" - SELECT UNNEST(tags) as tag, COUNT(*) as count - FROM documents - WHERE user_id = $1 - GROUP BY tag - ORDER BY count DESC - "# - }; - - let rows = if user_role == crate::models::UserRole::Admin { - sqlx::query(query) - .fetch_all(&self.pool) - .await? - } else { - sqlx::query(query) - .bind(user_id) - .fetch_all(&self.pool) - .await? - }; - - let facets = rows - .into_iter() - .map(|row| (row.get("tag"), row.get("count"))) - .collect(); - - Ok(facets) - } - - pub async fn get_document_by_id(&self, document_id: Uuid, user_id: Uuid, user_role: crate::models::UserRole) -> Result> { - let query = if user_role == crate::models::UserRole::Admin { - // Admins can see any document - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE id = $1 - "# - } else { - // Regular users can only see their own documents - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE id = $1 AND user_id = $2 - "# - }; - - let row = if user_role == crate::models::UserRole::Admin { - sqlx::query(query) - .bind(document_id) - .fetch_optional(&self.pool) - .await? - } else { - sqlx::query(query) - .bind(document_id) - .bind(user_id) - .fetch_optional(&self.pool) - .await? - }; - - match row { - Some(row) => Ok(Some(Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - })), - None => Ok(None), - } - } - - /// Check if a document with the given file hash already exists for the user - pub async fn get_document_by_user_and_hash(&self, user_id: Uuid, file_hash: &str) -> Result> { - let row = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE user_id = $1 AND file_hash = $2 - LIMIT 1 - "# - ) - .bind(user_id) - .bind(file_hash) - .fetch_optional(&self.pool) - .await?; - - match row { - Some(row) => Ok(Some(Document { - id: row.get("id"), - filename: row.get("filename"), - original_filename: row.get("original_filename"), - file_path: row.get("file_path"), - file_size: row.get("file_size"), - mime_type: row.get("mime_type"), - content: row.get("content"), - ocr_text: row.get("ocr_text"), - ocr_confidence: row.get("ocr_confidence"), - ocr_word_count: row.get("ocr_word_count"), - ocr_processing_time_ms: row.get("ocr_processing_time_ms"), - ocr_status: row.get("ocr_status"), - ocr_error: row.get("ocr_error"), - ocr_completed_at: row.get("ocr_completed_at"), - ocr_retry_count: row.get("ocr_retry_count"), - ocr_failure_reason: row.get("ocr_failure_reason"), - tags: row.get("tags"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - user_id: row.get("user_id"), - file_hash: row.get("file_hash"), - original_created_at: row.get("original_created_at"), - original_modified_at: row.get("original_modified_at"), - source_metadata: row.get("source_metadata"), - })), - None => Ok(None), - } - } - - /// Get documents grouped by duplicate hashes for a user - pub async fn get_user_duplicates(&self, user_id: Uuid, user_role: crate::models::UserRole, limit: i64, offset: i64) -> Result<(Vec, i64)> { - let (docs_query, count_query) = if user_role == crate::models::UserRole::Admin { - // Admins can see all duplicates - ( - r#" - SELECT - file_hash, - COUNT(*) as duplicate_count, - MIN(created_at) as first_uploaded, - MAX(created_at) as last_uploaded, - json_agg( - json_build_object( - 'id', id, - 'filename', filename, - 'original_filename', original_filename, - 'file_size', file_size, - 'mime_type', mime_type, - 'created_at', created_at, - 'user_id', user_id - ) ORDER BY created_at - ) as documents - FROM documents - WHERE file_hash IS NOT NULL - GROUP BY file_hash - HAVING COUNT(*) > 1 - ORDER BY duplicate_count DESC, first_uploaded DESC - LIMIT $1 OFFSET $2 - "#, - r#" - SELECT COUNT(*) as total FROM ( - SELECT file_hash - FROM documents - WHERE file_hash IS NOT NULL - GROUP BY file_hash - HAVING COUNT(*) > 1 - ) as duplicate_groups - "# - ) - } else { - // Regular users see only their own duplicates - ( - r#" - SELECT - file_hash, - COUNT(*) as duplicate_count, - MIN(created_at) as first_uploaded, - MAX(created_at) as last_uploaded, - json_agg( - json_build_object( - 'id', id, - 'filename', filename, - 'original_filename', original_filename, - 'file_size', file_size, - 'mime_type', mime_type, - 'created_at', created_at, - 'user_id', user_id - ) ORDER BY created_at - ) as documents - FROM documents - WHERE user_id = $3 AND file_hash IS NOT NULL - GROUP BY file_hash - HAVING COUNT(*) > 1 - ORDER BY duplicate_count DESC, first_uploaded DESC - LIMIT $1 OFFSET $2 - "#, - r#" - SELECT COUNT(*) as total FROM ( - SELECT file_hash - FROM documents - WHERE user_id = $1 AND file_hash IS NOT NULL - GROUP BY file_hash - HAVING COUNT(*) > 1 - ) as duplicate_groups - "# - ) - }; - - let rows = if user_role == crate::models::UserRole::Admin { - sqlx::query(docs_query) - .bind(limit) - .bind(offset) - .fetch_all(&self.pool) - .await? - } else { - sqlx::query(docs_query) - .bind(limit) - .bind(offset) - .bind(user_id) - .fetch_all(&self.pool) - .await? - }; - - let duplicates: Vec = rows - .into_iter() - .map(|row| { - serde_json::json!({ - "file_hash": row.get::("file_hash"), - "duplicate_count": row.get::("duplicate_count"), - "first_uploaded": row.get::, _>("first_uploaded"), - "last_uploaded": row.get::, _>("last_uploaded"), - "documents": row.get::("documents") - }) - }) - .collect(); - - let total = if user_role == crate::models::UserRole::Admin { - sqlx::query_scalar::<_, i64>(count_query) - .fetch_one(&self.pool) - .await? - } else { - sqlx::query_scalar::<_, i64>(count_query) - .bind(user_id) - .fetch_one(&self.pool) - .await? - }; - - Ok((duplicates, total)) - } - - pub async fn get_document_labels(&self, document_id: Uuid) -> Result> { - let labels = sqlx::query_as::<_, Label>( - r#" - SELECT - l.id, l.user_id, l.name, l.description, l.color, - l.background_color, l.icon, l.is_system, l.created_at, l.updated_at, - 0::bigint as document_count, 0::bigint as source_count - FROM labels l - INNER JOIN document_labels dl ON l.id = dl.label_id - WHERE dl.document_id = $1 - ORDER BY l.name - "# - ) - .bind(document_id) - .fetch_all(&self.pool) - .await?; - - Ok(labels) - } - - pub async fn get_labels_for_documents(&self, document_ids: &[Uuid]) -> Result>> { - if document_ids.is_empty() { - return Ok(std::collections::HashMap::new()); - } - - let rows = sqlx::query( - r#" - SELECT - dl.document_id, - l.id, l.user_id, l.name, l.description, l.color, - l.background_color, l.icon, l.is_system, l.created_at, l.updated_at - FROM labels l - INNER JOIN document_labels dl ON l.id = dl.label_id - WHERE dl.document_id = ANY($1) - ORDER BY dl.document_id, l.name - "# - ) - .bind(document_ids) - .fetch_all(&self.pool) - .await?; - - let mut labels_map: std::collections::HashMap> = std::collections::HashMap::new(); - - for row in rows { - let document_id: Uuid = row.get("document_id"); - let label = Label { - id: row.get("id"), - user_id: row.get("user_id"), - name: row.get("name"), - description: row.get("description"), - color: row.get("color"), - background_color: row.get("background_color"), - icon: row.get("icon"), - is_system: row.get("is_system"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - document_count: 0, - source_count: 0, - }; - - labels_map.entry(document_id).or_insert_with(Vec::new).push(label); - } - - Ok(labels_map) - } - - pub async fn delete_document(&self, document_id: Uuid, user_id: Uuid, user_role: crate::models::UserRole) -> Result> { - let document = if user_role == crate::models::UserRole::Admin { - let row = sqlx::query( - r#" - DELETE FROM documents - WHERE id = $1 - RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - "#, - ) - .bind(document_id) - .fetch_optional(&self.pool) - .await?; - - row.map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }) - } else { - let row = sqlx::query( - r#" - DELETE FROM documents - WHERE id = $1 AND user_id = $2 - RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - "#, - ) - .bind(document_id) - .bind(user_id) - .fetch_optional(&self.pool) - .await?; - - row.map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }) - }; - - Ok(document) - } - - pub async fn bulk_delete_documents(&self, document_ids: &[uuid::Uuid], user_id: uuid::Uuid, user_role: crate::models::UserRole) -> Result> { - if document_ids.is_empty() { - return Ok(Vec::new()); - } - - let deleted_documents = if user_role == crate::models::UserRole::Admin { - let rows = sqlx::query( - r#" - DELETE FROM documents - WHERE id = ANY($1) - RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - "#, - ) - .bind(document_ids) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - } else { - let rows = sqlx::query( - r#" - DELETE FROM documents - WHERE id = ANY($1) AND user_id = $2 - RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - "#, - ) - .bind(document_ids) - .bind(user_id) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - }; - - Ok(deleted_documents) - } - - - pub async fn find_documents_by_confidence_threshold(&self, max_confidence: f32, user_id: uuid::Uuid, user_role: crate::models::UserRole) -> Result> { - let documents = if user_role == crate::models::UserRole::Admin { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE ocr_confidence IS NOT NULL AND ocr_confidence < $1 - ORDER BY ocr_confidence ASC, created_at DESC - "#, - ) - .bind(max_confidence) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - } else { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE ocr_confidence IS NOT NULL AND ocr_confidence < $1 AND user_id = $2 - ORDER BY ocr_confidence ASC, created_at DESC - "#, - ) - .bind(max_confidence) - .bind(user_id) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - }; - - Ok(documents) - } - - /// Find documents with failed OCR processing - pub async fn find_failed_ocr_documents(&self, user_id: uuid::Uuid, user_role: crate::models::UserRole) -> Result> { - let documents = if user_role == crate::models::UserRole::Admin { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE ocr_status = 'failed' OR (ocr_confidence IS NULL AND ocr_status != 'pending' AND ocr_status != 'processing') - ORDER BY created_at DESC - "#, - ) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - } else { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE (ocr_status = 'failed' OR (ocr_confidence IS NULL AND ocr_status != 'pending' AND ocr_status != 'processing')) AND user_id = $1 - ORDER BY created_at DESC - "#, - ) - .bind(user_id) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - }; - - Ok(documents) - } - - /// Find documents with low confidence or failed OCR (combined) - pub async fn find_low_confidence_and_failed_documents(&self, max_confidence: f32, user_id: uuid::Uuid, user_role: crate::models::UserRole) -> Result> { - let documents = if user_role == crate::models::UserRole::Admin { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE (ocr_confidence IS NOT NULL AND ocr_confidence < $1) - OR ocr_status = 'failed' - ORDER BY - CASE WHEN ocr_confidence IS NOT NULL THEN ocr_confidence ELSE -1 END ASC, - created_at DESC - "#, - ) - .bind(max_confidence) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - } else { - let rows = sqlx::query( - r#" - SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata - FROM documents - WHERE ((ocr_confidence IS NOT NULL AND ocr_confidence < $1) - OR ocr_status = 'failed') - AND user_id = $2 - ORDER BY - CASE WHEN ocr_confidence IS NOT NULL THEN ocr_confidence ELSE -1 END ASC, - created_at DESC - "#, - ) - .bind(max_confidence) - .bind(user_id) - .fetch_all(&self.pool) - .await?; - - rows.into_iter().map(|r| Document { - id: r.get("id"), - filename: r.get("filename"), - original_filename: r.get("original_filename"), - file_path: r.get("file_path"), - file_size: r.get("file_size"), - mime_type: r.get("mime_type"), - content: r.get("content"), - ocr_text: r.get("ocr_text"), - ocr_confidence: r.get("ocr_confidence"), - ocr_word_count: r.get("ocr_word_count"), - ocr_processing_time_ms: r.get("ocr_processing_time_ms"), - ocr_status: r.get("ocr_status"), - ocr_error: r.get("ocr_error"), - ocr_completed_at: r.get("ocr_completed_at"), - ocr_retry_count: r.get("ocr_retry_count"), - ocr_failure_reason: r.get("ocr_failure_reason"), - tags: r.get("tags"), - created_at: r.get("created_at"), - updated_at: r.get("updated_at"), - user_id: r.get("user_id"), - file_hash: r.get("file_hash"), - original_created_at: r.get("original_created_at"), - original_modified_at: r.get("original_modified_at"), - source_metadata: r.get("source_metadata"), - }).collect() - }; - - Ok(documents) - } - - pub async fn count_documents_for_source(&self, source_id: Uuid) -> Result<(i64, i64)> { - let row = sqlx::query( - r#" - SELECT - COUNT(*) as total_documents, - COUNT(CASE WHEN ocr_status = 'completed' AND ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr - FROM documents - WHERE source_id = $1 - "# - ) - .bind(source_id) - .fetch_one(&self.pool) - .await?; - - let total_documents: i64 = row.get("total_documents"); - let total_documents_ocr: i64 = row.get("total_documents_ocr"); - - Ok((total_documents, total_documents_ocr)) - } - - pub async fn count_documents_for_sources(&self, source_ids: &[Uuid]) -> Result> { - if source_ids.is_empty() { - return Ok(vec![]); - } - - let query = format!( - r#" - SELECT - source_id, - COUNT(*) as total_documents, - COUNT(CASE WHEN ocr_status = 'completed' AND ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr - FROM documents - WHERE source_id = ANY($1) - GROUP BY source_id - "# - ); - - let rows = sqlx::query(&query) - .bind(source_ids) - .fetch_all(&self.pool) - .await?; - - let results = rows - .into_iter() - .map(|row| { - let source_id: Uuid = row.get("source_id"); - let total_documents: i64 = row.get("total_documents"); - let total_documents_ocr: i64 = row.get("total_documents_ocr"); - (source_id, total_documents, total_documents_ocr) - }) - .collect(); - - Ok(results) - } - - /// Create a failed document record - pub async fn create_failed_document( - &self, - user_id: Uuid, - filename: String, - original_filename: Option, - original_path: Option, - file_path: Option, - file_size: Option, - file_hash: Option, - mime_type: Option, - content: Option, - tags: Vec, - ocr_text: Option, - ocr_confidence: Option, - ocr_word_count: Option, - ocr_processing_time_ms: Option, - failure_reason: String, - failure_stage: String, - existing_document_id: Option, - ingestion_source: String, - error_message: Option, - retry_count: Option, - ) -> Result { - let id = Uuid::new_v4(); - - sqlx::query( - r#" - INSERT INTO failed_documents ( - id, user_id, filename, original_filename, original_path, file_path, - file_size, file_hash, mime_type, content, tags, ocr_text, - ocr_confidence, ocr_word_count, ocr_processing_time_ms, - failure_reason, failure_stage, existing_document_id, - ingestion_source, error_message, retry_count, created_at, updated_at - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, - $16, $17, $18, $19, $20, $21, NOW(), NOW() - ) - "# - ) - .bind(id) - .bind(user_id) - .bind(&filename) - .bind(&original_filename) - .bind(&original_path) - .bind(&file_path) - .bind(file_size) - .bind(&file_hash) - .bind(&mime_type) - .bind(&content) - .bind(&tags) - .bind(&ocr_text) - .bind(ocr_confidence) - .bind(ocr_word_count) - .bind(ocr_processing_time_ms) - .bind(&failure_reason) - .bind(&failure_stage) - .bind(existing_document_id) - .bind(&ingestion_source) - .bind(&error_message) - .bind(retry_count) - .execute(&self.pool) - .await?; - - Ok(id) - } - - /// Create a failed document from an existing document that failed OCR - pub async fn create_failed_document_from_document( - &self, - document: &Document, - failure_reason: String, - error_message: Option, - retry_count: Option, - ) -> Result { - self.create_failed_document( - document.user_id, // user_id is required in Document struct - document.filename.clone(), - Some(document.original_filename.clone()), - None, // original_path - not available in Document model - Some(document.file_path.clone()), - Some(document.file_size), - document.file_hash.clone(), - Some(document.mime_type.clone()), - document.content.clone(), - document.tags.clone(), - document.ocr_text.clone(), - document.ocr_confidence, - document.ocr_word_count, - document.ocr_processing_time_ms, - failure_reason, - "ocr".to_string(), // OCR failure stage - None, // existing_document_id - "unknown".to_string(), // Default ingestion source - would need to be passed in for better tracking - error_message, - retry_count, - ).await - } -} \ No newline at end of file diff --git a/src/db/documents/crud.rs b/src/db/documents/crud.rs new file mode 100644 index 0000000..e48b8a8 --- /dev/null +++ b/src/db/documents/crud.rs @@ -0,0 +1,198 @@ +use anyhow::Result; +use sqlx::{QueryBuilder, Postgres}; +use uuid::Uuid; + +use crate::models::{Document, UserRole}; +use super::helpers::{map_row_to_document, apply_role_based_filter, apply_pagination, DOCUMENT_FIELDS}; +use crate::db::Database; + +impl Database { + /// Creates a new document in the database + pub async fn create_document(&self, document: Document) -> Result { + let query_str = format!( + r#" + INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) + RETURNING {} + "#, + DOCUMENT_FIELDS + ); + + let row = sqlx::query(&query_str) + .bind(document.id) + .bind(&document.filename) + .bind(&document.original_filename) + .bind(&document.file_path) + .bind(document.file_size) + .bind(&document.mime_type) + .bind(&document.content) + .bind(&document.ocr_text) + .bind(document.ocr_confidence) + .bind(document.ocr_word_count) + .bind(document.ocr_processing_time_ms) + .bind(&document.ocr_status) + .bind(&document.ocr_error) + .bind(document.ocr_completed_at) + .bind(document.ocr_retry_count) + .bind(&document.ocr_failure_reason) + .bind(&document.tags) + .bind(document.created_at) + .bind(document.updated_at) + .bind(document.user_id) + .bind(&document.file_hash) + .bind(document.original_created_at) + .bind(document.original_modified_at) + .bind(&document.source_metadata) + .fetch_one(&self.pool) + .await?; + + Ok(map_row_to_document(&row)) + } + + /// Retrieves a document by ID with role-based access control + pub async fn get_document_by_id(&self, document_id: Uuid, user_id: Uuid, user_role: UserRole) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE id = "); + query.push_bind(document_id); + + apply_role_based_filter(&mut query, user_id, user_role); + + let row = query + .build() + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|r| map_row_to_document(&r))) + } + + /// Gets documents for a user with role-based access and pagination + pub async fn get_documents_by_user(&self, user_id: Uuid, limit: i64, offset: i64) -> Result> { + let query_str = format!( + r#" + SELECT {} + FROM documents + WHERE user_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + DOCUMENT_FIELDS + ); + + let rows = sqlx::query(&query_str) + .bind(user_id) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Gets documents with role-based access control + pub async fn get_documents_by_user_with_role(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE 1=1"); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" ORDER BY created_at DESC"); + apply_pagination(&mut query, limit, offset); + + let rows = query + .build() + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Finds a document by user and file hash (for duplicate detection) + pub async fn get_document_by_user_and_hash(&self, user_id: Uuid, file_hash: &str) -> Result> { + let query_str = format!( + r#" + SELECT {} + FROM documents + WHERE user_id = $1 AND file_hash = $2 + "#, + DOCUMENT_FIELDS + ); + + let row = sqlx::query(&query_str) + .bind(user_id) + .bind(file_hash) + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|r| map_row_to_document(&r))) + } + + /// Finds documents by filename or original filename + pub async fn find_documents_by_filename(&self, user_id: Uuid, filename: &str, limit: i64, offset: i64) -> Result> { + let query_str = format!( + r#" + SELECT {} + FROM documents + WHERE user_id = $1 AND (filename ILIKE $2 OR original_filename ILIKE $2) + ORDER BY created_at DESC + LIMIT $3 OFFSET $4 + "#, + DOCUMENT_FIELDS + ); + + let search_pattern = format!("%{}%", filename); + let rows = sqlx::query(&query_str) + .bind(user_id) + .bind(search_pattern) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Updates the OCR text for a document + pub async fn update_document_ocr(&self, document_id: Uuid, ocr_text: Option, ocr_confidence: Option, ocr_word_count: Option, ocr_processing_time_ms: Option, ocr_status: Option) -> Result<()> { + sqlx::query( + r#" + UPDATE documents + SET ocr_text = $2, ocr_confidence = $3, ocr_word_count = $4, ocr_processing_time_ms = $5, ocr_status = $6, updated_at = NOW() + WHERE id = $1 + "# + ) + .bind(document_id) + .bind(ocr_text) + .bind(ocr_confidence) + .bind(ocr_word_count) + .bind(ocr_processing_time_ms) + .bind(ocr_status) + .execute(&self.pool) + .await?; + + Ok(()) + } + + /// Gets recent documents for a specific source + pub async fn get_recent_documents_for_source(&self, user_id: Uuid, source_id: Uuid, limit: i64) -> Result> { + let query_str = format!( + r#" + SELECT {} + FROM documents + WHERE user_id = $1 AND source_metadata->>'source_id' = $2 + ORDER BY created_at DESC + LIMIT $3 + "#, + DOCUMENT_FIELDS + ); + + let rows = sqlx::query(&query_str) + .bind(user_id) + .bind(source_id.to_string()) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + Ok(rows.iter().map(map_row_to_document).collect()) + } +} \ No newline at end of file diff --git a/src/db/documents/helpers.rs b/src/db/documents/helpers.rs new file mode 100644 index 0000000..63e9eee --- /dev/null +++ b/src/db/documents/helpers.rs @@ -0,0 +1,98 @@ +use anyhow::Result; +use sqlx::{Row, QueryBuilder, Postgres}; +use uuid::Uuid; + +use crate::models::{Document, UserRole}; + +/// Standard document fields for SELECT queries +pub const DOCUMENT_FIELDS: &str = r#" + id, filename, original_filename, file_path, file_size, mime_type, + content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, + ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, + tags, created_at, updated_at, user_id, file_hash, original_created_at, + original_modified_at, source_metadata +"#; + +/// Maps a database row to a Document struct +/// This eliminates the ~15+ instances of duplicate row mapping code +pub fn map_row_to_document(row: &sqlx::postgres::PgRow) -> Document { + Document { + id: row.get("id"), + filename: row.get("filename"), + original_filename: row.get("original_filename"), + file_path: row.get("file_path"), + file_size: row.get("file_size"), + mime_type: row.get("mime_type"), + content: row.get("content"), + ocr_text: row.get("ocr_text"), + ocr_confidence: row.get("ocr_confidence"), + ocr_word_count: row.get("ocr_word_count"), + ocr_processing_time_ms: row.get("ocr_processing_time_ms"), + ocr_status: row.get("ocr_status"), + ocr_error: row.get("ocr_error"), + ocr_completed_at: row.get("ocr_completed_at"), + ocr_retry_count: row.get("ocr_retry_count"), + ocr_failure_reason: row.get("ocr_failure_reason"), + tags: row.get("tags"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + user_id: row.get("user_id"), + file_hash: row.get("file_hash"), + original_created_at: row.get("original_created_at"), + original_modified_at: row.get("original_modified_at"), + source_metadata: row.get("source_metadata"), + } +} + +/// Applies role-based filtering to a query builder +/// Admins can see all documents, regular users only see their own +pub fn apply_role_based_filter( + query: &mut QueryBuilder, + user_id: Uuid, + role: UserRole +) { + match role { + UserRole::Admin => { + // Admins can see all documents - no additional filter needed + } + UserRole::User => { + query.push(" AND user_id = "); + query.push_bind(user_id); + } + } +} + +/// Applies pagination to a query builder +pub fn apply_pagination(query: &mut QueryBuilder, limit: i64, offset: i64) { + query.push(" LIMIT "); + query.push_bind(limit); + query.push(" OFFSET "); + query.push_bind(offset); +} + +/// Helper to determine if a character is a word boundary for snippet generation +pub fn is_word_boundary(c: char) -> bool { + c.is_whitespace() || c.is_ascii_punctuation() +} + +/// Finds word boundary for snippet generation +pub fn find_word_boundary(text: &str, position: usize, search_forward: bool) -> usize { + let chars: Vec = text.chars().collect(); + let start_pos = if position >= chars.len() { chars.len() - 1 } else { position }; + + if search_forward { + for i in start_pos..chars.len() { + if is_word_boundary(chars[i]) { + return text.char_indices().nth(i).map(|(idx, _)| idx).unwrap_or(text.len()); + } + } + text.len() + } else { + for i in (0..=start_pos).rev() { + if is_word_boundary(chars[i]) { + return text.char_indices().nth(i).map(|(idx, _)| idx).unwrap_or(0); + } + } + 0 + } +} \ No newline at end of file diff --git a/src/db/documents/management.rs b/src/db/documents/management.rs new file mode 100644 index 0000000..3a80137 --- /dev/null +++ b/src/db/documents/management.rs @@ -0,0 +1,304 @@ +use anyhow::Result; +use sqlx::{QueryBuilder, Postgres}; +use uuid::Uuid; + +use crate::models::{Document, UserRole, FacetItem}; +use crate::routes::labels::Label; +use super::helpers::{map_row_to_document, apply_role_based_filter, DOCUMENT_FIELDS}; +use crate::db::Database; + +impl Database { + /// Gets labels for a specific document + pub async fn get_document_labels(&self, document_id: Uuid) -> Result> { + let rows = sqlx::query_as::<_, Label>( + r#" + SELECT l.id, l.user_id, l.name, l.color, l.created_at, l.updated_at + FROM labels l + JOIN document_labels dl ON l.id = dl.label_id + WHERE dl.document_id = $1 + ORDER BY l.name + "# + ) + .bind(document_id) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + + /// Gets labels for multiple documents in batch + pub async fn get_labels_for_documents(&self, document_ids: &[Uuid]) -> Result)>> { + if document_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query( + r#" + SELECT dl.document_id, l.id as label_id, l.user_id, l.name, l.color, l.created_at, l.updated_at + FROM labels l + JOIN document_labels dl ON l.id = dl.label_id + WHERE dl.document_id = ANY($1) + ORDER BY dl.document_id, l.name + "# + ) + .bind(document_ids) + .fetch_all(&self.pool) + .await?; + + let mut result = Vec::new(); + let mut current_doc_id: Option = None; + let mut current_labels = Vec::new(); + + for row in rows { + let doc_id: Uuid = row.get("document_id"); + let label = Label { + id: row.get("label_id"), + user_id: row.get("user_id"), + name: row.get("name"), + color: row.get("color"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }; + + if Some(doc_id) != current_doc_id { + if let Some(prev_doc_id) = current_doc_id { + result.push((prev_doc_id, std::mem::take(&mut current_labels))); + } + current_doc_id = Some(doc_id); + } + + current_labels.push(label); + } + + if let Some(doc_id) = current_doc_id { + result.push((doc_id, current_labels)); + } + + Ok(result) + } + + /// Finds duplicate documents by file hash for a user + pub async fn get_user_duplicates(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result>> { + let mut query = QueryBuilder::::new( + r#" + WITH duplicate_hashes AS ( + SELECT file_hash, COUNT(*) as count + FROM documents + WHERE file_hash IS NOT NULL + "# + ); + + if user_role != UserRole::Admin { + query.push(" AND user_id = "); + query.push_bind(user_id); + } + + query.push( + r#" + GROUP BY file_hash + HAVING COUNT(*) > 1 + ) + SELECT d.* + FROM documents d + JOIN duplicate_hashes dh ON d.file_hash = dh.file_hash + WHERE d.file_hash IS NOT NULL + "# + ); + + if user_role != UserRole::Admin { + query.push(" AND d.user_id = "); + query.push_bind(user_id); + } + + query.push(" ORDER BY d.file_hash, d.created_at"); + + let rows = query.build().fetch_all(&self.pool).await?; + let documents: Vec = rows.iter().map(map_row_to_document).collect(); + + // Group documents by file hash + let mut duplicate_groups = Vec::new(); + let mut current_group = Vec::new(); + let mut current_hash: Option = None; + + for document in documents { + if document.file_hash != current_hash { + if !current_group.is_empty() { + duplicate_groups.push(std::mem::take(&mut current_group)); + } + current_hash = document.file_hash.clone(); + } + current_group.push(document); + } + + if !current_group.is_empty() { + duplicate_groups.push(current_group); + } + + // Apply pagination to groups + let start = offset as usize; + let end = (offset + limit) as usize; + Ok(duplicate_groups.into_iter().skip(start).take(end - start).collect()) + } + + /// Gets MIME type facets (aggregated counts by MIME type) + pub async fn get_mime_type_facets(&self, user_id: Uuid, user_role: UserRole) -> Result> { + let mut query = QueryBuilder::::new( + "SELECT mime_type as value, COUNT(*) as count FROM documents WHERE 1=1" + ); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" GROUP BY mime_type ORDER BY count DESC, mime_type"); + + let rows = query.build().fetch_all(&self.pool).await?; + + Ok(rows.into_iter().map(|row| FacetItem { + value: row.get("value"), + count: row.get("count"), + }).collect()) + } + + /// Gets tag facets (aggregated counts by tag) + pub async fn get_tag_facets(&self, user_id: Uuid, user_role: UserRole) -> Result> { + let mut query = QueryBuilder::::new( + "SELECT unnest(tags) as value, COUNT(*) as count FROM documents WHERE 1=1" + ); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" GROUP BY unnest(tags) ORDER BY count DESC, value"); + + let rows = query.build().fetch_all(&self.pool).await?; + + Ok(rows.into_iter().map(|row| FacetItem { + value: row.get("value"), + count: row.get("count"), + }).collect()) + } + + /// Counts documents for a specific source + pub async fn count_documents_for_source(&self, user_id: Uuid, source_id: Uuid) -> Result<(i64, i64)> { + let row = sqlx::query( + r#" + SELECT + COUNT(*) as total_documents, + COUNT(CASE WHEN ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr + FROM documents + WHERE user_id = $1 AND source_metadata->>'source_id' = $2 + "# + ) + .bind(user_id) + .bind(source_id.to_string()) + .fetch_one(&self.pool) + .await?; + + Ok((row.get("total_documents"), row.get("total_documents_ocr"))) + } + + /// Counts documents for multiple sources in batch + pub async fn count_documents_for_sources(&self, user_id: Uuid, source_ids: &[Uuid]) -> Result> { + if source_ids.is_empty() { + return Ok(Vec::new()); + } + + let source_id_strings: Vec = source_ids.iter().map(|id| id.to_string()).collect(); + + let rows = sqlx::query( + r#" + SELECT + source_metadata->>'source_id' as source_id_str, + COUNT(*) as total_documents, + COUNT(CASE WHEN ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr + FROM documents + WHERE user_id = $1 AND source_metadata->>'source_id' = ANY($2) + GROUP BY source_metadata->>'source_id' + "# + ) + .bind(user_id) + .bind(&source_id_strings) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| { + let source_id_str: String = row.get("source_id_str"); + let source_id = Uuid::parse_str(&source_id_str).unwrap_or_default(); + let total_documents: i64 = row.get("total_documents"); + let total_documents_ocr: i64 = row.get("total_documents_ocr"); + (source_id, total_documents, total_documents_ocr) + }).collect()) + } + + /// Gets documents by user with role-based access and OCR status filtering + pub async fn get_documents_by_user_with_role_and_filter( + &self, + user_id: Uuid, + user_role: UserRole, + ocr_status: Option<&str>, + limit: i64, + offset: i64 + ) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE 1=1"); + + apply_role_based_filter(&mut query, user_id, user_role); + + if let Some(status) = ocr_status { + match status { + "pending" => { + query.push(" AND (ocr_status IS NULL OR ocr_status = 'pending')"); + } + "completed" => { + query.push(" AND ocr_status = 'completed'"); + } + "failed" => { + query.push(" AND ocr_status = 'failed'"); + } + _ => { + query.push(" AND ocr_status = "); + query.push_bind(status); + } + } + } + + query.push(" ORDER BY created_at DESC"); + query.push(" LIMIT "); + query.push_bind(limit); + query.push(" OFFSET "); + query.push_bind(offset); + + let rows = query.build().fetch_all(&self.pool).await?; + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Counts documents with role-based access and OCR status filtering + pub async fn get_documents_count_with_role_and_filter( + &self, + user_id: Uuid, + user_role: UserRole, + ocr_status: Option<&str> + ) -> Result { + let mut query = QueryBuilder::::new("SELECT COUNT(*) FROM documents WHERE 1=1"); + + apply_role_based_filter(&mut query, user_id, user_role); + + if let Some(status) = ocr_status { + match status { + "pending" => { + query.push(" AND (ocr_status IS NULL OR ocr_status = 'pending')"); + } + "completed" => { + query.push(" AND ocr_status = 'completed'"); + } + "failed" => { + query.push(" AND ocr_status = 'failed'"); + } + _ => { + query.push(" AND ocr_status = "); + query.push_bind(status); + } + } + } + + let row = query.build().fetch_one(&self.pool).await?; + Ok(row.get(0)) + } +} \ No newline at end of file diff --git a/src/db/documents/mod.rs b/src/db/documents/mod.rs new file mode 100644 index 0000000..863099c --- /dev/null +++ b/src/db/documents/mod.rs @@ -0,0 +1,10 @@ +// Documents database operations organized into focused modules + +mod helpers; +mod crud; +mod search; +mod management; +mod operations; + +// Re-export helper functions for use by other modules if needed +pub use helpers::*; \ No newline at end of file diff --git a/src/db/documents/operations.rs b/src/db/documents/operations.rs new file mode 100644 index 0000000..ab029c8 --- /dev/null +++ b/src/db/documents/operations.rs @@ -0,0 +1,274 @@ +use anyhow::Result; +use sqlx::{QueryBuilder, Postgres, Transaction}; +use uuid::Uuid; + +use crate::models::{Document, UserRole, FailedDocument}; +use super::helpers::{map_row_to_document, apply_role_based_filter, DOCUMENT_FIELDS}; +use crate::db::Database; + +impl Database { + /// Deletes a single document with role-based access control + pub async fn delete_document(&self, document_id: Uuid, user_id: Uuid, user_role: UserRole) -> Result { + let mut query = QueryBuilder::::new("DELETE FROM documents WHERE id = "); + query.push_bind(document_id); + + apply_role_based_filter(&mut query, user_id, user_role); + + let result = query.build().execute(&self.pool).await?; + Ok(result.rows_affected() > 0) + } + + /// Bulk deletes multiple documents with role-based access control + pub async fn bulk_delete_documents(&self, document_ids: &[Uuid], user_id: Uuid, user_role: UserRole) -> Result<(Vec, Vec)> { + if document_ids.is_empty() { + return Ok((Vec::new(), Vec::new())); + } + + let mut tx = self.pool.begin().await?; + let mut deleted_ids = Vec::new(); + let mut failed_ids = Vec::new(); + + for &doc_id in document_ids { + let mut query = QueryBuilder::::new("DELETE FROM documents WHERE id = "); + query.push_bind(doc_id); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" RETURNING id"); + + match query.build().fetch_optional(&mut *tx).await { + Ok(Some(row)) => { + let deleted_id: Uuid = row.get("id"); + deleted_ids.push(deleted_id); + } + Ok(None) => { + failed_ids.push(doc_id); + } + Err(_) => { + failed_ids.push(doc_id); + } + } + } + + tx.commit().await?; + Ok((deleted_ids, failed_ids)) + } + + /// Finds documents with OCR confidence below threshold + pub async fn find_documents_by_confidence_threshold(&self, user_id: Uuid, user_role: UserRole, max_confidence: f32, limit: i64, offset: i64) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE ocr_confidence IS NOT NULL AND ocr_confidence <= "); + query.push_bind(max_confidence); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" ORDER BY ocr_confidence ASC, created_at DESC"); + query.push(" LIMIT "); + query.push_bind(limit); + query.push(" OFFSET "); + query.push_bind(offset); + + let rows = query.build().fetch_all(&self.pool).await?; + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Finds documents with failed OCR processing + pub async fn find_failed_ocr_documents(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE ocr_status = 'failed'"); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" ORDER BY created_at DESC"); + query.push(" LIMIT "); + query.push_bind(limit); + query.push(" OFFSET "); + query.push_bind(offset); + + let rows = query.build().fetch_all(&self.pool).await?; + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Finds both low confidence and failed OCR documents + pub async fn find_low_confidence_and_failed_documents(&self, user_id: Uuid, user_role: UserRole, max_confidence: f32, limit: i64, offset: i64) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE (ocr_status = 'failed' OR (ocr_confidence IS NOT NULL AND ocr_confidence <= "); + query.push_bind(max_confidence); + query.push("))"); + + apply_role_based_filter(&mut query, user_id, user_role); + query.push(" ORDER BY CASE WHEN ocr_status = 'failed' THEN 0 ELSE 1 END, ocr_confidence ASC, created_at DESC"); + query.push(" LIMIT "); + query.push_bind(limit); + query.push(" OFFSET "); + query.push_bind(offset); + + let rows = query.build().fetch_all(&self.pool).await?; + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Creates a failed document record + pub async fn create_failed_document(&self, failed_document: FailedDocument) -> Result { + let row = sqlx::query( + r#" + INSERT INTO failed_documents ( + id, user_id, filename, original_filename, original_path, file_path, + file_size, file_hash, mime_type, content, tags, ocr_text, ocr_confidence, + ocr_word_count, ocr_processing_time_ms, failure_reason, failure_stage, + existing_document_id, ingestion_source, error_message, retry_count, + last_retry_at, created_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) + RETURNING * + "# + ) + .bind(failed_document.id) + .bind(failed_document.user_id) + .bind(&failed_document.filename) + .bind(&failed_document.original_filename) + .bind(&failed_document.original_path) + .bind(&failed_document.file_path) + .bind(failed_document.file_size) + .bind(&failed_document.file_hash) + .bind(&failed_document.mime_type) + .bind(&failed_document.content) + .bind(&failed_document.tags) + .bind(&failed_document.ocr_text) + .bind(failed_document.ocr_confidence) + .bind(failed_document.ocr_word_count) + .bind(failed_document.ocr_processing_time_ms) + .bind(&failed_document.failure_reason) + .bind(&failed_document.failure_stage) + .bind(failed_document.existing_document_id) + .bind(&failed_document.ingestion_source) + .bind(&failed_document.error_message) + .bind(failed_document.retry_count) + .bind(failed_document.last_retry_at) + .bind(failed_document.created_at) + .bind(failed_document.updated_at) + .fetch_one(&self.pool) + .await?; + + Ok(FailedDocument { + id: row.get("id"), + user_id: row.get("user_id"), + filename: row.get("filename"), + original_filename: row.get("original_filename"), + original_path: row.get("original_path"), + file_path: row.get("file_path"), + file_size: row.get("file_size"), + file_hash: row.get("file_hash"), + mime_type: row.get("mime_type"), + content: row.get("content"), + tags: row.get("tags"), + ocr_text: row.get("ocr_text"), + ocr_confidence: row.get("ocr_confidence"), + ocr_word_count: row.get("ocr_word_count"), + ocr_processing_time_ms: row.get("ocr_processing_time_ms"), + failure_reason: row.get("failure_reason"), + failure_stage: row.get("failure_stage"), + existing_document_id: row.get("existing_document_id"), + ingestion_source: row.get("ingestion_source"), + error_message: row.get("error_message"), + retry_count: row.get("retry_count"), + last_retry_at: row.get("last_retry_at"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }) + } + + /// Creates a failed document record from an existing document + pub async fn create_failed_document_from_document(&self, document: &Document, failure_reason: &str, failure_stage: &str, error_message: Option<&str>) -> Result { + let failed_doc = FailedDocument { + id: Uuid::new_v4(), + user_id: document.user_id, + filename: document.filename.clone(), + original_filename: Some(document.original_filename.clone()), + original_path: Some(document.file_path.clone()), + file_path: Some(document.file_path.clone()), + file_size: Some(document.file_size), + file_hash: document.file_hash.clone(), + mime_type: Some(document.mime_type.clone()), + content: document.content.clone(), + tags: document.tags.clone(), + ocr_text: document.ocr_text.clone(), + ocr_confidence: document.ocr_confidence, + ocr_word_count: document.ocr_word_count, + ocr_processing_time_ms: document.ocr_processing_time_ms, + failure_reason: failure_reason.to_string(), + failure_stage: failure_stage.to_string(), + existing_document_id: Some(document.id), + ingestion_source: "document_processing".to_string(), + error_message: error_message.map(|s| s.to_string()), + retry_count: Some(0), + last_retry_at: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + self.create_failed_document(failed_doc).await + } + + /// Updates OCR retry information for a document + pub async fn update_document_ocr_retry(&self, document_id: Uuid, retry_count: i32, failure_reason: Option<&str>) -> Result<()> { + sqlx::query( + r#" + UPDATE documents + SET ocr_retry_count = $2, ocr_failure_reason = $3, updated_at = NOW() + WHERE id = $1 + "# + ) + .bind(document_id) + .bind(retry_count) + .bind(failure_reason) + .execute(&self.pool) + .await?; + + Ok(()) + } + + /// Marks documents as completed OCR processing + pub async fn mark_documents_ocr_completed(&self, document_ids: &[Uuid]) -> Result { + if document_ids.is_empty() { + return Ok(0); + } + + let result = sqlx::query( + r#" + UPDATE documents + SET ocr_status = 'completed', ocr_completed_at = NOW(), updated_at = NOW() + WHERE id = ANY($1) + "# + ) + .bind(document_ids) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected()) + } + + /// Counts documents by OCR status + pub async fn count_documents_by_ocr_status(&self, user_id: Uuid, user_role: UserRole) -> Result<(i64, i64, i64, i64)> { + let mut query = QueryBuilder::::new( + r#" + SELECT + COUNT(*) as total, + COUNT(CASE WHEN ocr_status IS NULL OR ocr_status = 'pending' THEN 1 END) as pending, + COUNT(CASE WHEN ocr_status = 'completed' THEN 1 END) as completed, + COUNT(CASE WHEN ocr_status = 'failed' THEN 1 END) as failed + FROM documents WHERE 1=1 + "# + ); + + apply_role_based_filter(&mut query, user_id, user_role); + + let row = query.build().fetch_one(&self.pool).await?; + + Ok(( + row.get("total"), + row.get("pending"), + row.get("completed"), + row.get("failed"), + )) + } +} \ No newline at end of file diff --git a/src/db/documents/search.rs b/src/db/documents/search.rs new file mode 100644 index 0000000..5c8d40c --- /dev/null +++ b/src/db/documents/search.rs @@ -0,0 +1,259 @@ +use anyhow::Result; +use sqlx::{QueryBuilder, Postgres}; +use uuid::Uuid; + +use crate::models::{Document, UserRole, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse}; +use super::helpers::{map_row_to_document, apply_role_based_filter, apply_pagination, find_word_boundary, DOCUMENT_FIELDS}; +use crate::db::Database; + +impl Database { + /// Performs basic document search with PostgreSQL full-text search + pub async fn search_documents(&self, user_id: Uuid, search_request: &SearchRequest) -> Result> { + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + query.push(" FROM documents WHERE user_id = "); + query.push_bind(user_id); + + // Add search conditions + if !search_request.query.trim().is_empty() { + query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ plainto_tsquery('english', "); + query.push_bind(&search_request.query); + query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "); + query.push_bind(&search_request.query); + query.push("))"); + } + + // Add tag filtering + if let Some(ref tags) = search_request.tags { + if !tags.is_empty() { + query.push(" AND tags && "); + query.push_bind(tags); + } + } + + // Add MIME type filtering + if let Some(ref mime_types) = search_request.mime_types { + if !mime_types.is_empty() { + query.push(" AND mime_type = ANY("); + query.push_bind(mime_types); + query.push(")"); + } + } + + query.push(" ORDER BY created_at DESC"); + + let limit = search_request.limit.unwrap_or(25); + let offset = search_request.offset.unwrap_or(0); + apply_pagination(&mut query, limit, offset); + + let rows = query.build().fetch_all(&self.pool).await?; + Ok(rows.iter().map(map_row_to_document).collect()) + } + + /// Enhanced search with snippets and ranking + pub async fn enhanced_search_documents(&self, user_id: Uuid, search_request: &SearchRequest) -> Result> { + self.enhanced_search_documents_with_role(user_id, UserRole::User, search_request).await + } + + /// Enhanced search with role-based access control + pub async fn enhanced_search_documents_with_role(&self, user_id: Uuid, user_role: UserRole, search_request: &SearchRequest) -> Result> { + let search_query = search_request.query.trim(); + let include_snippets = search_request.include_snippets.unwrap_or(true); + let snippet_length = search_request.snippet_length.unwrap_or(200) as usize; + + let mut query = QueryBuilder::::new("SELECT "); + query.push(DOCUMENT_FIELDS); + + // Add search ranking if there's a query + if !search_query.is_empty() { + match search_request.search_mode.as_ref().unwrap_or(&SearchMode::Simple) { + SearchMode::Simple => { + query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', "); + query.push_bind(search_query); + query.push(")) as search_rank"); + } + SearchMode::Phrase => { + query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), phraseto_tsquery('english', "); + query.push_bind(search_query); + query.push(")) as search_rank"); + } + SearchMode::Boolean => { + query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), to_tsquery('english', "); + query.push_bind(search_query); + query.push(")) as search_rank"); + } + SearchMode::Fuzzy => { + query.push(", similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "); + query.push_bind(search_query); + query.push(") as search_rank"); + } + } + } else { + query.push(", 0.0 as search_rank"); + } + + query.push(" FROM documents WHERE 1=1"); + + apply_role_based_filter(&mut query, user_id, user_role); + + // Add search conditions + if !search_query.is_empty() { + match search_request.search_mode.as_ref().unwrap_or(&SearchMode::Simple) { + SearchMode::Simple => { + query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ plainto_tsquery('english', "); + query.push_bind(search_query); + query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ plainto_tsquery('english', "); + query.push_bind(search_query); + query.push("))"); + } + SearchMode::Phrase => { + query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ phraseto_tsquery('english', "); + query.push_bind(search_query); + query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ phraseto_tsquery('english', "); + query.push_bind(search_query); + query.push("))"); + } + SearchMode::Boolean => { + query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ to_tsquery('english', "); + query.push_bind(search_query); + query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ to_tsquery('english', "); + query.push_bind(search_query); + query.push("))"); + } + SearchMode::Fuzzy => { + query.push(" AND similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), "); + query.push_bind(search_query); + query.push(") > 0.3"); + } + } + } + + // Add filtering + if let Some(ref tags) = search_request.tags { + if !tags.is_empty() { + query.push(" AND tags && "); + query.push_bind(tags); + } + } + + if let Some(ref mime_types) = search_request.mime_types { + if !mime_types.is_empty() { + query.push(" AND mime_type = ANY("); + query.push_bind(mime_types); + query.push(")"); + } + } + + query.push(" ORDER BY search_rank DESC, created_at DESC"); + + let limit = search_request.limit.unwrap_or(25); + let offset = search_request.offset.unwrap_or(0); + apply_pagination(&mut query, limit, offset); + + let rows = query.build().fetch_all(&self.pool).await?; + + let mut results = Vec::new(); + for row in rows { + let document = map_row_to_document(&row); + let search_rank: f32 = row.try_get("search_rank").unwrap_or(0.0); + + let snippets = if include_snippets && !search_query.is_empty() { + self.generate_snippets(&document, search_query, snippet_length).await + } else { + Vec::new() + }; + + results.push(EnhancedDocumentResponse { + id: document.id, + filename: document.filename, + original_filename: document.original_filename, + file_size: document.file_size, + mime_type: document.mime_type, + tags: document.tags, + created_at: document.created_at, + has_ocr_text: document.ocr_text.is_some(), + ocr_confidence: document.ocr_confidence, + ocr_word_count: document.ocr_word_count, + ocr_processing_time_ms: document.ocr_processing_time_ms, + ocr_status: document.ocr_status, + search_rank: Some(search_rank), + snippets, + }); + } + + Ok(results) + } + + /// Generates search snippets with highlighted matches + pub async fn generate_snippets(&self, document: &Document, search_query: &str, snippet_length: usize) -> Vec { + let mut snippets = Vec::new(); + let search_terms: Vec<&str> = search_query.split_whitespace().collect(); + + // Search in content and OCR text + let texts = vec![ + ("content", document.content.as_deref().unwrap_or("")), + ("ocr_text", document.ocr_text.as_deref().unwrap_or("")) + ]; + + for (source, text) in texts { + if text.is_empty() { + continue; + } + + let text_lower = text.to_lowercase(); + for term in &search_terms { + let term_lower = term.to_lowercase(); + let mut start_pos = 0; + + while let Some(match_pos) = text_lower[start_pos..].find(&term_lower) { + let absolute_match_pos = start_pos + match_pos; + + // Calculate snippet boundaries + let snippet_start = if absolute_match_pos >= snippet_length / 2 { + find_word_boundary(text, absolute_match_pos - snippet_length / 2, false) + } else { + 0 + }; + + let snippet_end = { + let desired_end = snippet_start + snippet_length; + if desired_end < text.len() { + find_word_boundary(text, desired_end, true) + } else { + text.len() + } + }; + + let snippet_text = &text[snippet_start..snippet_end]; + + // Calculate highlight range relative to snippet + let highlight_start = absolute_match_pos - snippet_start; + let highlight_end = highlight_start + term.len(); + + let highlight_ranges = vec![HighlightRange { + start: highlight_start as i32, + end: highlight_end as i32, + }]; + + snippets.push(SearchSnippet { + text: snippet_text.to_string(), + start_offset: snippet_start as i32, + end_offset: snippet_end as i32, + highlight_ranges, + }); + + start_pos = absolute_match_pos + term.len(); + + // Limit snippets per term + if snippets.len() >= 3 { + break; + } + } + } + } + + // Remove duplicates and limit total snippets + snippets.truncate(5); + snippets + } +} \ No newline at end of file