feat(dev): break up the large documents.rs file

Author: perf3ct · 2025-07-03 19:47:31 +00:00
Parent: 2a6f20b0d3 · Commit: 2b7d901b9d
7 changed files with 1143 additions and 1999 deletions

File diff suppressed because it is too large (the old monolithic src/db/documents.rs, 1,999 deletions).

src/db/documents/crud.rs (new file, 198 lines)

@@ -0,0 +1,198 @@
use anyhow::Result;
use sqlx::{QueryBuilder, Postgres};
use uuid::Uuid;
use crate::models::{Document, UserRole};
use super::helpers::{map_row_to_document, apply_role_based_filter, apply_pagination, DOCUMENT_FIELDS};
use crate::db::Database;
impl Database {
/// Creates a new document in the database
pub async fn create_document(&self, document: Document) -> Result<Document> {
let query_str = format!(
r#"
INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason, tags, created_at, updated_at, user_id, file_hash, original_created_at, original_modified_at, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
RETURNING {}
"#,
DOCUMENT_FIELDS
);
let row = sqlx::query(&query_str)
.bind(document.id)
.bind(&document.filename)
.bind(&document.original_filename)
.bind(&document.file_path)
.bind(document.file_size)
.bind(&document.mime_type)
.bind(&document.content)
.bind(&document.ocr_text)
.bind(document.ocr_confidence)
.bind(document.ocr_word_count)
.bind(document.ocr_processing_time_ms)
.bind(&document.ocr_status)
.bind(&document.ocr_error)
.bind(document.ocr_completed_at)
.bind(document.ocr_retry_count)
.bind(&document.ocr_failure_reason)
.bind(&document.tags)
.bind(document.created_at)
.bind(document.updated_at)
.bind(document.user_id)
.bind(&document.file_hash)
.bind(document.original_created_at)
.bind(document.original_modified_at)
.bind(&document.source_metadata)
.fetch_one(&self.pool)
.await?;
Ok(map_row_to_document(&row))
}
/// Retrieves a document by ID with role-based access control
pub async fn get_document_by_id(&self, document_id: Uuid, user_id: Uuid, user_role: UserRole) -> Result<Option<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE id = ");
query.push_bind(document_id);
apply_role_based_filter(&mut query, user_id, user_role);
let row = query
.build()
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| map_row_to_document(&r)))
}
/// Gets documents for a user with pagination (no role-based filtering; see get_documents_by_user_with_role)
pub async fn get_documents_by_user(&self, user_id: Uuid, limit: i64, offset: i64) -> Result<Vec<Document>> {
let query_str = format!(
r#"
SELECT {}
FROM documents
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#,
DOCUMENT_FIELDS
);
let rows = sqlx::query(&query_str)
.bind(user_id)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Gets documents with role-based access control
pub async fn get_documents_by_user_with_role(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE 1=1");
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" ORDER BY created_at DESC");
apply_pagination(&mut query, limit, offset);
let rows = query
.build()
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Finds a document by user and file hash (for duplicate detection)
pub async fn get_document_by_user_and_hash(&self, user_id: Uuid, file_hash: &str) -> Result<Option<Document>> {
let query_str = format!(
r#"
SELECT {}
FROM documents
WHERE user_id = $1 AND file_hash = $2
"#,
DOCUMENT_FIELDS
);
let row = sqlx::query(&query_str)
.bind(user_id)
.bind(file_hash)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| map_row_to_document(&r)))
}
/// Finds documents by filename or original filename
pub async fn find_documents_by_filename(&self, user_id: Uuid, filename: &str, limit: i64, offset: i64) -> Result<Vec<Document>> {
let query_str = format!(
r#"
SELECT {}
FROM documents
WHERE user_id = $1 AND (filename ILIKE $2 OR original_filename ILIKE $2)
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
"#,
DOCUMENT_FIELDS
);
let search_pattern = format!("%{}%", filename);
let rows = sqlx::query(&query_str)
.bind(user_id)
.bind(search_pattern)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Updates OCR results (text, confidence, word count, processing time, status) for a document
pub async fn update_document_ocr(&self, document_id: Uuid, ocr_text: Option<String>, ocr_confidence: Option<f32>, ocr_word_count: Option<i32>, ocr_processing_time_ms: Option<i32>, ocr_status: Option<String>) -> Result<()> {
sqlx::query(
r#"
UPDATE documents
SET ocr_text = $2, ocr_confidence = $3, ocr_word_count = $4, ocr_processing_time_ms = $5, ocr_status = $6, updated_at = NOW()
WHERE id = $1
"#
)
.bind(document_id)
.bind(ocr_text)
.bind(ocr_confidence)
.bind(ocr_word_count)
.bind(ocr_processing_time_ms)
.bind(ocr_status)
.execute(&self.pool)
.await?;
Ok(())
}
/// Gets recent documents for a specific source
pub async fn get_recent_documents_for_source(&self, user_id: Uuid, source_id: Uuid, limit: i64) -> Result<Vec<Document>> {
let query_str = format!(
r#"
SELECT {}
FROM documents
WHERE user_id = $1 AND source_metadata->>'source_id' = $2
ORDER BY created_at DESC
LIMIT $3
"#,
DOCUMENT_FIELDS
);
let rows = sqlx::query(&query_str)
.bind(user_id)
.bind(source_id.to_string())
.bind(limit)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
}
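A usage sketch for the duplicate-detection pair above (the ingest_if_new wrapper is hypothetical; Database and Document are the crate's own types):

use anyhow::Result;
use uuid::Uuid;
use crate::db::Database;
use crate::models::Document;

/// Illustrative only: skip the INSERT when an identical file already exists
/// for this user, reusing get_document_by_user_and_hash from crud.rs.
async fn ingest_if_new(db: &Database, user_id: Uuid, doc: Document) -> Result<Document> {
    if let Some(hash) = doc.file_hash.as_deref() {
        if let Some(existing) = db.get_document_by_user_and_hash(user_id, hash).await? {
            return Ok(existing); // duplicate: hand back the stored document
        }
    }
    db.create_document(doc).await
}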

src/db/documents/helpers.rs (new file, 98 lines)

@@ -0,0 +1,98 @@
use sqlx::{Row, QueryBuilder, Postgres};
use uuid::Uuid;
use crate::models::{Document, UserRole};
/// Standard document fields for SELECT queries
pub const DOCUMENT_FIELDS: &str = r#"
id, filename, original_filename, file_path, file_size, mime_type,
content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms,
ocr_status, ocr_error, ocr_completed_at, ocr_retry_count, ocr_failure_reason,
tags, created_at, updated_at, user_id, file_hash, original_created_at,
original_modified_at, source_metadata
"#;
/// Maps a database row to a Document struct.
/// Centralizes what were previously 15+ duplicated row-mapping blocks.
pub fn map_row_to_document(row: &sqlx::postgres::PgRow) -> Document {
Document {
id: row.get("id"),
filename: row.get("filename"),
original_filename: row.get("original_filename"),
file_path: row.get("file_path"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
content: row.get("content"),
ocr_text: row.get("ocr_text"),
ocr_confidence: row.get("ocr_confidence"),
ocr_word_count: row.get("ocr_word_count"),
ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
ocr_status: row.get("ocr_status"),
ocr_error: row.get("ocr_error"),
ocr_completed_at: row.get("ocr_completed_at"),
ocr_retry_count: row.get("ocr_retry_count"),
ocr_failure_reason: row.get("ocr_failure_reason"),
tags: row.get("tags"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
original_created_at: row.get("original_created_at"),
original_modified_at: row.get("original_modified_at"),
source_metadata: row.get("source_metadata"),
}
}
/// Applies role-based filtering to a query builder
/// Admins can see all documents, regular users only see their own
pub fn apply_role_based_filter(
query: &mut QueryBuilder<Postgres>,
user_id: Uuid,
role: UserRole
) {
match role {
UserRole::Admin => {
// Admins can see all documents - no additional filter needed
}
UserRole::User => {
query.push(" AND user_id = ");
query.push_bind(user_id);
}
}
}
/// Applies pagination to a query builder
pub fn apply_pagination(query: &mut QueryBuilder<Postgres>, limit: i64, offset: i64) {
query.push(" LIMIT ");
query.push_bind(limit);
query.push(" OFFSET ");
query.push_bind(offset);
}
/// Helper to determine if a character is a word boundary for snippet generation
pub fn is_word_boundary(c: char) -> bool {
c.is_whitespace() || c.is_ascii_punctuation()
}
/// Finds word boundary for snippet generation
pub fn find_word_boundary(text: &str, position: usize, search_forward: bool) -> usize {
    let chars: Vec<char> = text.chars().collect();
    if chars.is_empty() {
        return 0;
    }
    // Clamp out-of-range positions to the last character to avoid underflow.
    let start_pos = position.min(chars.len() - 1);
if search_forward {
for i in start_pos..chars.len() {
if is_word_boundary(chars[i]) {
return text.char_indices().nth(i).map(|(idx, _)| idx).unwrap_or(text.len());
}
}
text.len()
} else {
for i in (0..=start_pos).rev() {
if is_word_boundary(chars[i]) {
return text.char_indices().nth(i).map(|(idx, _)| idx).unwrap_or(0);
}
}
0
}
}
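A minimal sketch of how these helpers compose, in the same pattern crud.rs and search.rs use (the build_listing_query function and the test values are illustrative; this would live in a #[cfg(test)] module inside helpers.rs):

use sqlx::{Postgres, QueryBuilder};
use uuid::Uuid;
use crate::models::UserRole;
use super::{apply_pagination, apply_role_based_filter, find_word_boundary, DOCUMENT_FIELDS};

fn build_listing_query(user_id: Uuid, role: UserRole) -> QueryBuilder<'static, Postgres> {
    // Start from a static prefix, then let the helpers append the role
    // filter and the LIMIT/OFFSET clause with bound parameters.
    let mut query = QueryBuilder::<Postgres>::new("SELECT ");
    query.push(DOCUMENT_FIELDS);
    query.push(" FROM documents WHERE 1=1");
    apply_role_based_filter(&mut query, user_id, role);
    query.push(" ORDER BY created_at DESC");
    apply_pagination(&mut query, 25, 0);
    query
}

#[test]
fn word_boundary_scans_both_directions() {
    let text = "hello brave world";
    // Backward from inside "brave" stops at the preceding space (byte 5)...
    assert_eq!(find_word_boundary(text, 8, false), 5);
    // ...and forward reaches the next space (byte 11).
    assert_eq!(find_word_boundary(text, 8, true), 11);
}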

(new file, 304 lines)

@@ -0,0 +1,304 @@
use anyhow::Result;
use sqlx::{Row, QueryBuilder, Postgres};
use uuid::Uuid;
use crate::models::{Document, UserRole, FacetItem};
use crate::routes::labels::Label;
use super::helpers::{map_row_to_document, apply_role_based_filter, DOCUMENT_FIELDS};
use crate::db::Database;
impl Database {
/// Gets labels for a specific document
pub async fn get_document_labels(&self, document_id: Uuid) -> Result<Vec<Label>> {
let rows = sqlx::query_as::<_, Label>(
r#"
SELECT l.id, l.user_id, l.name, l.color, l.created_at, l.updated_at
FROM labels l
JOIN document_labels dl ON l.id = dl.label_id
WHERE dl.document_id = $1
ORDER BY l.name
"#
)
.bind(document_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
/// Gets labels for multiple documents in batch
pub async fn get_labels_for_documents(&self, document_ids: &[Uuid]) -> Result<Vec<(Uuid, Vec<Label>)>> {
if document_ids.is_empty() {
return Ok(Vec::new());
}
let rows = sqlx::query(
r#"
SELECT dl.document_id, l.id as label_id, l.user_id, l.name, l.color, l.created_at, l.updated_at
FROM labels l
JOIN document_labels dl ON l.id = dl.label_id
WHERE dl.document_id = ANY($1)
ORDER BY dl.document_id, l.name
"#
)
.bind(document_ids)
.fetch_all(&self.pool)
.await?;
let mut result = Vec::new();
let mut current_doc_id: Option<Uuid> = None;
let mut current_labels = Vec::new();
for row in rows {
let doc_id: Uuid = row.get("document_id");
let label = Label {
id: row.get("label_id"),
user_id: row.get("user_id"),
name: row.get("name"),
color: row.get("color"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
};
if Some(doc_id) != current_doc_id {
if let Some(prev_doc_id) = current_doc_id {
result.push((prev_doc_id, std::mem::take(&mut current_labels)));
}
current_doc_id = Some(doc_id);
}
current_labels.push(label);
}
if let Some(doc_id) = current_doc_id {
result.push((doc_id, current_labels));
}
Ok(result)
}
/// Finds duplicate documents by file hash for a user
pub async fn get_user_duplicates(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result<Vec<Vec<Document>>> {
let mut query = QueryBuilder::<Postgres>::new(
r#"
WITH duplicate_hashes AS (
SELECT file_hash, COUNT(*) as count
FROM documents
WHERE file_hash IS NOT NULL
"#
);
if user_role != UserRole::Admin {
query.push(" AND user_id = ");
query.push_bind(user_id);
}
query.push(
r#"
GROUP BY file_hash
HAVING COUNT(*) > 1
)
SELECT d.*
FROM documents d
JOIN duplicate_hashes dh ON d.file_hash = dh.file_hash
WHERE d.file_hash IS NOT NULL
"#
);
if user_role != UserRole::Admin {
query.push(" AND d.user_id = ");
query.push_bind(user_id);
}
query.push(" ORDER BY d.file_hash, d.created_at");
let rows = query.build().fetch_all(&self.pool).await?;
let documents: Vec<Document> = rows.iter().map(map_row_to_document).collect();
// Group documents by file hash
let mut duplicate_groups = Vec::new();
let mut current_group = Vec::new();
let mut current_hash: Option<String> = None;
for document in documents {
if document.file_hash != current_hash {
if !current_group.is_empty() {
duplicate_groups.push(std::mem::take(&mut current_group));
}
current_hash = document.file_hash.clone();
}
current_group.push(document);
}
if !current_group.is_empty() {
duplicate_groups.push(current_group);
}
// Apply pagination to groups
let start = offset as usize;
let end = (offset + limit) as usize;
Ok(duplicate_groups.into_iter().skip(start).take(end - start).collect())
}
/// Gets MIME type facets (aggregated counts by MIME type)
pub async fn get_mime_type_facets(&self, user_id: Uuid, user_role: UserRole) -> Result<Vec<FacetItem>> {
let mut query = QueryBuilder::<Postgres>::new(
"SELECT mime_type as value, COUNT(*) as count FROM documents WHERE 1=1"
);
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" GROUP BY mime_type ORDER BY count DESC, mime_type");
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.into_iter().map(|row| FacetItem {
value: row.get("value"),
count: row.get("count"),
}).collect())
}
/// Gets tag facets (aggregated counts by tag)
pub async fn get_tag_facets(&self, user_id: Uuid, user_role: UserRole) -> Result<Vec<FacetItem>> {
let mut query = QueryBuilder::<Postgres>::new(
"SELECT unnest(tags) as value, COUNT(*) as count FROM documents WHERE 1=1"
);
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" GROUP BY unnest(tags) ORDER BY count DESC, value");
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.into_iter().map(|row| FacetItem {
value: row.get("value"),
count: row.get("count"),
}).collect())
}
/// Counts documents for a specific source, returning (total, with OCR text)
pub async fn count_documents_for_source(&self, user_id: Uuid, source_id: Uuid) -> Result<(i64, i64)> {
let row = sqlx::query(
r#"
SELECT
COUNT(*) as total_documents,
COUNT(CASE WHEN ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr
FROM documents
WHERE user_id = $1 AND source_metadata->>'source_id' = $2
"#
)
.bind(user_id)
.bind(source_id.to_string())
.fetch_one(&self.pool)
.await?;
Ok((row.get("total_documents"), row.get("total_documents_ocr")))
}
/// Counts documents for multiple sources in batch, returning (source_id, total, with OCR text) per source
pub async fn count_documents_for_sources(&self, user_id: Uuid, source_ids: &[Uuid]) -> Result<Vec<(Uuid, i64, i64)>> {
if source_ids.is_empty() {
return Ok(Vec::new());
}
let source_id_strings: Vec<String> = source_ids.iter().map(|id| id.to_string()).collect();
let rows = sqlx::query(
r#"
SELECT
source_metadata->>'source_id' as source_id_str,
COUNT(*) as total_documents,
COUNT(CASE WHEN ocr_text IS NOT NULL THEN 1 END) as total_documents_ocr
FROM documents
WHERE user_id = $1 AND source_metadata->>'source_id' = ANY($2)
GROUP BY source_metadata->>'source_id'
"#
)
.bind(user_id)
.bind(&source_id_strings)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|row| {
let source_id_str: String = row.get("source_id_str");
let source_id = Uuid::parse_str(&source_id_str).unwrap_or_default();
let total_documents: i64 = row.get("total_documents");
let total_documents_ocr: i64 = row.get("total_documents_ocr");
(source_id, total_documents, total_documents_ocr)
}).collect())
}
/// Gets documents by user with role-based access and OCR status filtering
pub async fn get_documents_by_user_with_role_and_filter(
&self,
user_id: Uuid,
user_role: UserRole,
ocr_status: Option<&str>,
limit: i64,
offset: i64
) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE 1=1");
apply_role_based_filter(&mut query, user_id, user_role);
if let Some(status) = ocr_status {
match status {
"pending" => {
query.push(" AND (ocr_status IS NULL OR ocr_status = 'pending')");
}
"completed" => {
query.push(" AND ocr_status = 'completed'");
}
"failed" => {
query.push(" AND ocr_status = 'failed'");
}
_ => {
query.push(" AND ocr_status = ");
query.push_bind(status);
}
}
}
query.push(" ORDER BY created_at DESC");
query.push(" LIMIT ");
query.push_bind(limit);
query.push(" OFFSET ");
query.push_bind(offset);
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Counts documents with role-based access and OCR status filtering
pub async fn get_documents_count_with_role_and_filter(
&self,
user_id: Uuid,
user_role: UserRole,
ocr_status: Option<&str>
) -> Result<i64> {
let mut query = QueryBuilder::<Postgres>::new("SELECT COUNT(*) FROM documents WHERE 1=1");
apply_role_based_filter(&mut query, user_id, user_role);
if let Some(status) = ocr_status {
match status {
"pending" => {
query.push(" AND (ocr_status IS NULL OR ocr_status = 'pending')");
}
"completed" => {
query.push(" AND ocr_status = 'completed'");
}
"failed" => {
query.push(" AND ocr_status = 'failed'");
}
_ => {
query.push(" AND ocr_status = ");
query.push_bind(status);
}
}
}
let row = query.build().fetch_one(&self.pool).await?;
Ok(row.get(0))
}
}
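Both get_labels_for_documents and get_user_duplicates lean on the same group-adjacent-rows idiom. Distilled to a standalone sketch (toy types, not crate code), the invariant is that rows must already arrive sorted by the grouping key, which the ORDER BY clauses above guarantee:

/// Groups consecutive items that share a key. Assumes the input is sorted
/// by key; unsorted input would silently produce split groups.
fn group_adjacent<K: PartialEq, V>(items: Vec<(K, V)>) -> Vec<(K, Vec<V>)> {
    let mut groups: Vec<(K, Vec<V>)> = Vec::new();
    for (key, value) in items {
        let same_key = groups.last().map_or(false, |(k, _)| *k == key);
        if same_key {
            groups.last_mut().unwrap().1.push(value);
        } else {
            groups.push((key, vec![value]));
        }
    }
    groups
}

fn main() {
    let rows = vec![("a", 1), ("a", 2), ("b", 3)];
    assert_eq!(group_adjacent(rows), vec![("a", vec![1, 2]), ("b", vec![3])]);
}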

src/db/documents/mod.rs (new file, 10 lines)

@@ -0,0 +1,10 @@
// Documents database operations organized into focused modules
mod helpers;
mod crud;
mod search;
mod management;
mod operations;
// Re-export helper functions for use by other modules if needed
pub use helpers::*;
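Because every submodule adds its methods to the same inherent impl Database, the split is invisible at call sites. A caller sketch (dashboard_counts is hypothetical; count_documents_by_ocr_status is defined in the 274-line file below):

use anyhow::Result;
use uuid::Uuid;
use crate::db::Database;
use crate::models::UserRole;

async fn dashboard_counts(db: &Database, user_id: Uuid) -> Result<()> {
    // From crud.rs: newest documents for this user.
    let recent = db.get_documents_by_user(user_id, 10, 0).await?;
    // From the OCR-management file below: per-status counts.
    let (total, pending, completed, failed) =
        db.count_documents_by_ocr_status(user_id, UserRole::User).await?;
    println!(
        "{total} docs ({pending} pending / {completed} completed / {failed} failed); newest: {:?}",
        recent.first().map(|d| &d.filename)
    );
    Ok(())
}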

(new file, 274 lines)

@@ -0,0 +1,274 @@
use anyhow::Result;
use sqlx::{Row, QueryBuilder, Postgres};
use uuid::Uuid;
use crate::models::{Document, UserRole, FailedDocument};
use super::helpers::{map_row_to_document, apply_role_based_filter, DOCUMENT_FIELDS};
use crate::db::Database;
impl Database {
/// Deletes a single document with role-based access control
pub async fn delete_document(&self, document_id: Uuid, user_id: Uuid, user_role: UserRole) -> Result<bool> {
let mut query = QueryBuilder::<Postgres>::new("DELETE FROM documents WHERE id = ");
query.push_bind(document_id);
apply_role_based_filter(&mut query, user_id, user_role);
let result = query.build().execute(&self.pool).await?;
Ok(result.rows_affected() > 0)
}
/// Bulk deletes multiple documents with role-based access control
pub async fn bulk_delete_documents(&self, document_ids: &[Uuid], user_id: Uuid, user_role: UserRole) -> Result<(Vec<Uuid>, Vec<Uuid>)> {
if document_ids.is_empty() {
return Ok((Vec::new(), Vec::new()));
}
let mut tx = self.pool.begin().await?;
let mut deleted_ids = Vec::new();
let mut failed_ids = Vec::new();
for &doc_id in document_ids {
let mut query = QueryBuilder::<Postgres>::new("DELETE FROM documents WHERE id = ");
query.push_bind(doc_id);
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" RETURNING id");
match query.build().fetch_optional(&mut *tx).await {
Ok(Some(row)) => {
let deleted_id: Uuid = row.get("id");
deleted_ids.push(deleted_id);
}
Ok(None) => {
failed_ids.push(doc_id);
}
Err(_) => {
failed_ids.push(doc_id);
}
}
}
tx.commit().await?;
Ok((deleted_ids, failed_ids))
}
/// Finds documents with OCR confidence at or below a given threshold
pub async fn find_documents_by_confidence_threshold(&self, user_id: Uuid, user_role: UserRole, max_confidence: f32, limit: i64, offset: i64) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE ocr_confidence IS NOT NULL AND ocr_confidence <= ");
query.push_bind(max_confidence);
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" ORDER BY ocr_confidence ASC, created_at DESC");
query.push(" LIMIT ");
query.push_bind(limit);
query.push(" OFFSET ");
query.push_bind(offset);
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Finds documents with failed OCR processing
pub async fn find_failed_ocr_documents(&self, user_id: Uuid, user_role: UserRole, limit: i64, offset: i64) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE ocr_status = 'failed'");
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" ORDER BY created_at DESC");
query.push(" LIMIT ");
query.push_bind(limit);
query.push(" OFFSET ");
query.push_bind(offset);
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Finds both low confidence and failed OCR documents
pub async fn find_low_confidence_and_failed_documents(&self, user_id: Uuid, user_role: UserRole, max_confidence: f32, limit: i64, offset: i64) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE (ocr_status = 'failed' OR (ocr_confidence IS NOT NULL AND ocr_confidence <= ");
query.push_bind(max_confidence);
query.push("))");
apply_role_based_filter(&mut query, user_id, user_role);
query.push(" ORDER BY CASE WHEN ocr_status = 'failed' THEN 0 ELSE 1 END, ocr_confidence ASC, created_at DESC");
query.push(" LIMIT ");
query.push_bind(limit);
query.push(" OFFSET ");
query.push_bind(offset);
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Creates a failed document record
pub async fn create_failed_document(&self, failed_document: FailedDocument) -> Result<FailedDocument> {
let row = sqlx::query(
r#"
INSERT INTO failed_documents (
id, user_id, filename, original_filename, original_path, file_path,
file_size, file_hash, mime_type, content, tags, ocr_text, ocr_confidence,
ocr_word_count, ocr_processing_time_ms, failure_reason, failure_stage,
existing_document_id, ingestion_source, error_message, retry_count,
last_retry_at, created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
RETURNING *
"#
)
.bind(failed_document.id)
.bind(failed_document.user_id)
.bind(&failed_document.filename)
.bind(&failed_document.original_filename)
.bind(&failed_document.original_path)
.bind(&failed_document.file_path)
.bind(failed_document.file_size)
.bind(&failed_document.file_hash)
.bind(&failed_document.mime_type)
.bind(&failed_document.content)
.bind(&failed_document.tags)
.bind(&failed_document.ocr_text)
.bind(failed_document.ocr_confidence)
.bind(failed_document.ocr_word_count)
.bind(failed_document.ocr_processing_time_ms)
.bind(&failed_document.failure_reason)
.bind(&failed_document.failure_stage)
.bind(failed_document.existing_document_id)
.bind(&failed_document.ingestion_source)
.bind(&failed_document.error_message)
.bind(failed_document.retry_count)
.bind(failed_document.last_retry_at)
.bind(failed_document.created_at)
.bind(failed_document.updated_at)
.fetch_one(&self.pool)
.await?;
Ok(FailedDocument {
id: row.get("id"),
user_id: row.get("user_id"),
filename: row.get("filename"),
original_filename: row.get("original_filename"),
original_path: row.get("original_path"),
file_path: row.get("file_path"),
file_size: row.get("file_size"),
file_hash: row.get("file_hash"),
mime_type: row.get("mime_type"),
content: row.get("content"),
tags: row.get("tags"),
ocr_text: row.get("ocr_text"),
ocr_confidence: row.get("ocr_confidence"),
ocr_word_count: row.get("ocr_word_count"),
ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
failure_reason: row.get("failure_reason"),
failure_stage: row.get("failure_stage"),
existing_document_id: row.get("existing_document_id"),
ingestion_source: row.get("ingestion_source"),
error_message: row.get("error_message"),
retry_count: row.get("retry_count"),
last_retry_at: row.get("last_retry_at"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})
}
/// Creates a failed document record from an existing document
pub async fn create_failed_document_from_document(&self, document: &Document, failure_reason: &str, failure_stage: &str, error_message: Option<&str>) -> Result<FailedDocument> {
let failed_doc = FailedDocument {
id: Uuid::new_v4(),
user_id: document.user_id,
filename: document.filename.clone(),
original_filename: Some(document.original_filename.clone()),
original_path: Some(document.file_path.clone()),
file_path: Some(document.file_path.clone()),
file_size: Some(document.file_size),
file_hash: document.file_hash.clone(),
mime_type: Some(document.mime_type.clone()),
content: document.content.clone(),
tags: document.tags.clone(),
ocr_text: document.ocr_text.clone(),
ocr_confidence: document.ocr_confidence,
ocr_word_count: document.ocr_word_count,
ocr_processing_time_ms: document.ocr_processing_time_ms,
failure_reason: failure_reason.to_string(),
failure_stage: failure_stage.to_string(),
existing_document_id: Some(document.id),
ingestion_source: "document_processing".to_string(),
error_message: error_message.map(|s| s.to_string()),
retry_count: Some(0),
last_retry_at: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
self.create_failed_document(failed_doc).await
}
/// Updates OCR retry information for a document
pub async fn update_document_ocr_retry(&self, document_id: Uuid, retry_count: i32, failure_reason: Option<&str>) -> Result<()> {
sqlx::query(
r#"
UPDATE documents
SET ocr_retry_count = $2, ocr_failure_reason = $3, updated_at = NOW()
WHERE id = $1
"#
)
.bind(document_id)
.bind(retry_count)
.bind(failure_reason)
.execute(&self.pool)
.await?;
Ok(())
}
/// Marks documents' OCR processing as completed
pub async fn mark_documents_ocr_completed(&self, document_ids: &[Uuid]) -> Result<u64> {
if document_ids.is_empty() {
return Ok(0);
}
let result = sqlx::query(
r#"
UPDATE documents
SET ocr_status = 'completed', ocr_completed_at = NOW(), updated_at = NOW()
WHERE id = ANY($1)
"#
)
.bind(document_ids)
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
/// Counts documents by OCR status, returning (total, pending, completed, failed)
pub async fn count_documents_by_ocr_status(&self, user_id: Uuid, user_role: UserRole) -> Result<(i64, i64, i64, i64)> {
let mut query = QueryBuilder::<Postgres>::new(
r#"
SELECT
COUNT(*) as total,
COUNT(CASE WHEN ocr_status IS NULL OR ocr_status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN ocr_status = 'completed' THEN 1 END) as completed,
COUNT(CASE WHEN ocr_status = 'failed' THEN 1 END) as failed
FROM documents WHERE 1=1
"#
);
apply_role_based_filter(&mut query, user_id, user_role);
let row = query.build().fetch_one(&self.pool).await?;
Ok((
row.get("total"),
row.get("pending"),
row.get("completed"),
row.get("failed"),
))
}
}
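A hedged sketch of the bulk-delete contract (the delete_selected wrapper is illustrative): the method commits one transaction but reports per-id outcomes, so callers can surface partial success instead of failing the whole batch:

use anyhow::Result;
use uuid::Uuid;
use crate::db::Database;
use crate::models::UserRole;

async fn delete_selected(db: &Database, user_id: Uuid, ids: &[Uuid]) -> Result<()> {
    let (deleted, failed) = db.bulk_delete_documents(ids, user_id, UserRole::User).await?;
    // `failed` holds ids that did not exist or were filtered out by the
    // role-based WHERE clause (i.e. another user's documents).
    println!("deleted {}, skipped {}", deleted.len(), failed.len());
    Ok(())
}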

src/db/documents/search.rs (new file, 259 lines)

@@ -0,0 +1,259 @@
use anyhow::Result;
use sqlx::{Row, QueryBuilder, Postgres};
use uuid::Uuid;
use crate::models::{Document, UserRole, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse};
use super::helpers::{map_row_to_document, apply_role_based_filter, apply_pagination, find_word_boundary, DOCUMENT_FIELDS};
use crate::db::Database;
impl Database {
/// Performs basic document search with PostgreSQL full-text search
pub async fn search_documents(&self, user_id: Uuid, search_request: &SearchRequest) -> Result<Vec<Document>> {
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
query.push(" FROM documents WHERE user_id = ");
query.push_bind(user_id);
// Add search conditions
if !search_request.query.trim().is_empty() {
query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ plainto_tsquery('english', ");
query.push_bind(&search_request.query);
query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ plainto_tsquery('english', ");
query.push_bind(&search_request.query);
query.push("))");
}
// Add tag filtering
if let Some(ref tags) = search_request.tags {
if !tags.is_empty() {
query.push(" AND tags && ");
query.push_bind(tags);
}
}
// Add MIME type filtering
if let Some(ref mime_types) = search_request.mime_types {
if !mime_types.is_empty() {
query.push(" AND mime_type = ANY(");
query.push_bind(mime_types);
query.push(")");
}
}
query.push(" ORDER BY created_at DESC");
let limit = search_request.limit.unwrap_or(25);
let offset = search_request.offset.unwrap_or(0);
apply_pagination(&mut query, limit, offset);
let rows = query.build().fetch_all(&self.pool).await?;
Ok(rows.iter().map(map_row_to_document).collect())
}
/// Enhanced search with snippets and ranking
pub async fn enhanced_search_documents(&self, user_id: Uuid, search_request: &SearchRequest) -> Result<Vec<EnhancedDocumentResponse>> {
self.enhanced_search_documents_with_role(user_id, UserRole::User, search_request).await
}
/// Enhanced search with role-based access control
pub async fn enhanced_search_documents_with_role(&self, user_id: Uuid, user_role: UserRole, search_request: &SearchRequest) -> Result<Vec<EnhancedDocumentResponse>> {
let search_query = search_request.query.trim();
let include_snippets = search_request.include_snippets.unwrap_or(true);
let snippet_length = search_request.snippet_length.unwrap_or(200) as usize;
let mut query = QueryBuilder::<Postgres>::new("SELECT ");
query.push(DOCUMENT_FIELDS);
// Add search ranking if there's a query
if !search_query.is_empty() {
match search_request.search_mode.as_ref().unwrap_or(&SearchMode::Simple) {
SearchMode::Simple => {
query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), plainto_tsquery('english', ");
query.push_bind(search_query);
query.push(")) as search_rank");
}
SearchMode::Phrase => {
query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), phraseto_tsquery('english', ");
query.push_bind(search_query);
query.push(")) as search_rank");
}
SearchMode::Boolean => {
query.push(", ts_rank(to_tsvector('english', COALESCE(content, '') || ' ' || COALESCE(ocr_text, '')), to_tsquery('english', ");
query.push_bind(search_query);
query.push(")) as search_rank");
}
SearchMode::Fuzzy => {
query.push(", similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), ");
query.push_bind(search_query);
query.push(") as search_rank");
}
}
} else {
query.push(", 0.0 as search_rank");
}
query.push(" FROM documents WHERE 1=1");
apply_role_based_filter(&mut query, user_id, user_role);
// Add search conditions
if !search_query.is_empty() {
match search_request.search_mode.as_ref().unwrap_or(&SearchMode::Simple) {
SearchMode::Simple => {
query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ plainto_tsquery('english', ");
query.push_bind(search_query);
query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ plainto_tsquery('english', ");
query.push_bind(search_query);
query.push("))");
}
SearchMode::Phrase => {
query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ phraseto_tsquery('english', ");
query.push_bind(search_query);
query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ phraseto_tsquery('english', ");
query.push_bind(search_query);
query.push("))");
}
SearchMode::Boolean => {
query.push(" AND (to_tsvector('english', COALESCE(content, '')) @@ to_tsquery('english', ");
query.push_bind(search_query);
query.push(") OR to_tsvector('english', COALESCE(ocr_text, '')) @@ to_tsquery('english', ");
query.push_bind(search_query);
query.push("))");
}
SearchMode::Fuzzy => {
query.push(" AND similarity(COALESCE(content, '') || ' ' || COALESCE(ocr_text, ''), ");
query.push_bind(search_query);
query.push(") > 0.3");
}
}
}
// Add filtering
if let Some(ref tags) = search_request.tags {
if !tags.is_empty() {
query.push(" AND tags && ");
query.push_bind(tags);
}
}
if let Some(ref mime_types) = search_request.mime_types {
if !mime_types.is_empty() {
query.push(" AND mime_type = ANY(");
query.push_bind(mime_types);
query.push(")");
}
}
query.push(" ORDER BY search_rank DESC, created_at DESC");
let limit = search_request.limit.unwrap_or(25);
let offset = search_request.offset.unwrap_or(0);
apply_pagination(&mut query, limit, offset);
let rows = query.build().fetch_all(&self.pool).await?;
let mut results = Vec::new();
for row in rows {
let document = map_row_to_document(&row);
let search_rank: f32 = row.try_get("search_rank").unwrap_or(0.0);
let snippets = if include_snippets && !search_query.is_empty() {
self.generate_snippets(&document, search_query, snippet_length).await
} else {
Vec::new()
};
results.push(EnhancedDocumentResponse {
id: document.id,
filename: document.filename,
original_filename: document.original_filename,
file_size: document.file_size,
mime_type: document.mime_type,
tags: document.tags,
created_at: document.created_at,
has_ocr_text: document.ocr_text.is_some(),
ocr_confidence: document.ocr_confidence,
ocr_word_count: document.ocr_word_count,
ocr_processing_time_ms: document.ocr_processing_time_ms,
ocr_status: document.ocr_status,
search_rank: Some(search_rank),
snippets,
});
}
Ok(results)
}
/// Generates search snippets with highlighted matches
pub async fn generate_snippets(&self, document: &Document, search_query: &str, snippet_length: usize) -> Vec<SearchSnippet> {
let mut snippets = Vec::new();
let search_terms: Vec<&str> = search_query.split_whitespace().collect();
// Search in content and OCR text
let texts = vec![
("content", document.content.as_deref().unwrap_or("")),
("ocr_text", document.ocr_text.as_deref().unwrap_or(""))
];
for (source, text) in texts {
if text.is_empty() {
continue;
}
let text_lower = text.to_lowercase();
for term in &search_terms {
let term_lower = term.to_lowercase();
let mut start_pos = 0;
while let Some(match_pos) = text_lower[start_pos..].find(&term_lower) {
let absolute_match_pos = start_pos + match_pos;
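                // Note: these offsets are byte positions from `find`, while
                // find_word_boundary treats its `position` argument as a char
                // index; the two coincide only for ASCII text, which this
                // snippet logic implicitly assumes.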
// Calculate snippet boundaries
let snippet_start = if absolute_match_pos >= snippet_length / 2 {
find_word_boundary(text, absolute_match_pos - snippet_length / 2, false)
} else {
0
};
let snippet_end = {
let desired_end = snippet_start + snippet_length;
if desired_end < text.len() {
find_word_boundary(text, desired_end, true)
} else {
text.len()
}
};
let snippet_text = &text[snippet_start..snippet_end];
// Calculate highlight range relative to snippet
let highlight_start = absolute_match_pos - snippet_start;
let highlight_end = highlight_start + term.len();
let highlight_ranges = vec![HighlightRange {
start: highlight_start as i32,
end: highlight_end as i32,
}];
snippets.push(SearchSnippet {
text: snippet_text.to_string(),
start_offset: snippet_start as i32,
end_offset: snippet_end as i32,
highlight_ranges,
});
start_pos = absolute_match_pos + term.len();
                // Stop scanning this term once 3 snippets have been collected in total
if snippets.len() >= 3 {
break;
}
}
}
}
        // Cap the total number of snippets (duplicates are not removed)
snippets.truncate(5);
snippets
}
}
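Finally, a caller-side sketch of the enhanced search path (assuming SearchRequest derives Default and exposes exactly the fields read above; find_invoices is hypothetical):

use anyhow::Result;
use uuid::Uuid;
use crate::db::Database;
use crate::models::{SearchMode, SearchRequest, UserRole};

async fn find_invoices(db: &Database, user_id: Uuid) -> Result<()> {
    let request = SearchRequest {
        query: "invoice 2024".to_string(),
        search_mode: Some(SearchMode::Phrase),                  // phraseto_tsquery path
        mime_types: Some(vec!["application/pdf".to_string()]),
        include_snippets: Some(true),
        snippet_length: Some(160),
        limit: Some(10),
        ..Default::default()
    };
    for hit in db.enhanced_search_documents_with_role(user_id, UserRole::User, &request).await? {
        println!(
            "{:.3}  {}  ({} snippet(s))",
            hit.search_rank.unwrap_or(0.0),
            hit.filename,
            hit.snippets.len()
        );
    }
    Ok(())
}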