From 58aaedf4a6b2412ff0ce20c490b0277dbe90a3f7 Mon Sep 17 00:00:00 2001
From: perf3ct
Date: Tue, 17 Jun 2025 15:41:42 +0000
Subject: [PATCH] feat(server): add hash for documents

---
 .../20240618000001_add_file_hash_field.sql |  36 +++++++++
 src/batch_ingest.rs                        |  30 +++++++-
 src/db/documents.rs                        |  58 +++++++++++++--
 src/file_service.rs                        |   2 +
 src/models.rs                              |   1 +
 src/routes/documents.rs                    |  27 +++----
 src/routes/webdav/webdav_sync.rs           |  74 +++++++++----------
 src/source_sync.rs                         |  70 ++++++++----------
 src/tests/file_service_tests.rs            |   2 +
 src/watcher.rs                             |   4 +
 10 files changed, 202 insertions(+), 102 deletions(-)
 create mode 100644 migrations/20240618000001_add_file_hash_field.sql

diff --git a/migrations/20240618000001_add_file_hash_field.sql b/migrations/20240618000001_add_file_hash_field.sql
new file mode 100644
index 0000000..d8cd8a8
--- /dev/null
+++ b/migrations/20240618000001_add_file_hash_field.sql
@@ -0,0 +1,36 @@
+-- Add file_hash field to documents table for efficient duplicate detection
+-- This will store SHA256 hash of file content to prevent duplicates
+
+-- Add the file_hash column to documents table
+ALTER TABLE documents
+ADD COLUMN IF NOT EXISTS file_hash VARCHAR(64);
+
+-- Create unique index to prevent hash duplicates per user
+-- This enforces that each user cannot have duplicate file content
+CREATE UNIQUE INDEX IF NOT EXISTS idx_documents_user_file_hash
+ON documents(user_id, file_hash)
+WHERE file_hash IS NOT NULL;
+
+-- Create additional index for efficient hash lookups
+CREATE INDEX IF NOT EXISTS idx_documents_file_hash
+ON documents(file_hash)
+WHERE file_hash IS NOT NULL;
+
+-- Add helpful comments
+COMMENT ON COLUMN documents.file_hash IS 'SHA256 hash of file content for duplicate detection - prevents same content from being stored multiple times per user';
+
+-- Create a view for duplicate analysis
+CREATE OR REPLACE VIEW document_duplicates_analysis AS
+SELECT
+    file_hash,
+    COUNT(*) as duplicate_count,
+    array_agg(DISTINCT user_id ORDER BY user_id) as users_with_duplicates,
+    array_agg(filename ORDER BY created_at) as filenames,
+    MIN(created_at) as first_upload,
+    MAX(created_at) as last_upload,
+    SUM(file_size) as total_storage_used
+FROM documents
+WHERE file_hash IS NOT NULL
+GROUP BY file_hash
+HAVING COUNT(*) > 1
+ORDER BY duplicate_count DESC, total_storage_used DESC;
\ No newline at end of file
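
Note on the migration above: the partial unique index on (user_id, file_hash) is what actually enforces per-user deduplication at the database level, while document_duplicates_analysis is purely a reporting view. A minimal sketch of how that view might be consumed from the Rust side, assuming the same sqlx Postgres pool the rest of the code uses (the function name, the BIGINT cast, and the column choice are illustrative, not part of the patch; SUM over a BIGINT column yields NUMERIC in Postgres, hence the cast):

use sqlx::{PgPool, Row};

/// Print one line per content hash that more than one document row shares.
pub async fn print_duplicate_summary(pool: &PgPool) -> Result<(), sqlx::Error> {
    let rows = sqlx::query(
        "SELECT file_hash, duplicate_count, total_storage_used::BIGINT AS total_storage_used \
         FROM document_duplicates_analysis",
    )
    .fetch_all(pool)
    .await?;

    for row in rows {
        let hash: String = row.get("file_hash");        // never NULL inside the view
        let copies: i64 = row.get("duplicate_count");   // COUNT(*) maps to i64
        let bytes: i64 = row.get("total_storage_used"); // cast to BIGINT above
        println!("{hash}: {copies} copies, {bytes} bytes stored");
    }
    Ok(())
}
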
diff --git a/src/batch_ingest.rs b/src/batch_ingest.rs
index 8908144..d930ef7 100644
--- a/src/batch_ingest.rs
+++ b/src/batch_ingest.rs
@@ -6,6 +6,7 @@ use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
 use uuid::Uuid;
 use walkdir::WalkDir;
+use sha2::{Sha256, Digest};
 
 use crate::{
     config::Config,
@@ -188,6 +189,25 @@ async fn process_single_file(
     // Read file data
     let file_data = fs::read(&path).await?;
 
+    // Calculate file hash for deduplication
+    let file_hash = calculate_file_hash(&file_data);
+
+    // Check for duplicate content using efficient hash lookup
+    match db.get_document_by_user_and_hash(user_id, &file_hash).await {
+        Ok(Some(existing_doc)) => {
+            info!("Skipping duplicate file: {} matches existing document {} (hash: {})", 
+                  filename, existing_doc.original_filename, &file_hash[..8]);
+            return Ok(None); // Skip processing duplicate
+        }
+        Ok(None) => {
+            info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
+        }
+        Err(e) => {
+            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
+            // Continue processing even if duplicate check fails
+        }
+    }
+
     let mime_type = mime_guess::from_path(&filename)
         .first_or_octet_stream()
         .to_string();
@@ -195,7 +215,7 @@
     // Save file
     let file_path = file_service.save_file(&filename, &file_data).await?;
 
-    // Create document
+    // Create document with hash
     let document = file_service.create_document(
         &filename,
         &filename,
@@ -203,6 +223,7 @@
         file_size,
         &mime_type,
         user_id,
+        Some(file_hash),
     );
 
     // Save to database (without OCR)
@@ -224,4 +245,11 @@ fn calculate_priority(file_size: i64) -> i32 {
         ..=MB50 => 4,      // 10-50MB: low priority
         _ => 2,            // > 50MB: lowest priority
     }
+}
+
+fn calculate_file_hash(data: &[u8]) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(data);
+    let result = hasher.finalize();
+    format!("{:x}", result)
 }
\ No newline at end of file
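
calculate_file_hash above hashes a buffer that process_single_file has already read into memory, which is fine for this code path. If hashing ever needs to happen without holding the whole file in memory, sha2's incremental API supports feeding the hasher in chunks; a sketch of such a streaming variant, assuming tokio for file I/O as elsewhere in the patch (the helper name and buffer size are illustrative):

use sha2::{Digest, Sha256};
use std::path::Path;
use tokio::io::AsyncReadExt;

/// Hash a file in 64 KiB chunks instead of loading it fully into memory.
async fn calculate_file_hash_streaming(path: &Path) -> std::io::Result<String> {
    let mut file = tokio::fs::File::open(path).await?;
    let mut hasher = Sha256::new();
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let n = file.read(&mut buf).await?;
        if n == 0 {
            break; // end of file
        }
        hasher.update(&buf[..n]);
    }
    Ok(format!("{:x}", hasher.finalize()))
}
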
diff --git a/src/db/documents.rs b/src/db/documents.rs
index de6cd74..a497741 100644
--- a/src/db/documents.rs
+++ b/src/db/documents.rs
@@ -9,9 +9,9 @@ impl Database {
     pub async fn create_document(&self, document: Document) -> Result<Document> {
         let row = sqlx::query(
             r#"
-            INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
-            RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
+            INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
+            RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
             "#
         )
         .bind(document.id)
@@ -32,6 +32,7 @@ impl Database {
         .bind(document.created_at)
         .bind(document.updated_at)
         .bind(document.user_id)
+        .bind(&document.file_hash)
         .fetch_one(&self.pool)
         .await?;
 
@@ -54,6 +55,7 @@ impl Database {
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
            user_id: row.get("user_id"),
+           file_hash: row.get("file_hash"),
         })
     }
 
@@ -61,7 +63,7 @@ impl Database {
         let query = if user_role == crate::models::UserRole::Admin {
             // Admins can see all documents
             r#"
-            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
+            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
             FROM documents
             ORDER BY created_at DESC
             LIMIT $1 OFFSET $2
@@ -69,7 +71,7 @@ impl Database {
         } else {
             // Regular users can only see their own documents
             r#"
-            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
+            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
             FROM documents
             WHERE user_id = $3
             ORDER BY created_at DESC
@@ -113,6 +115,7 @@ impl Database {
                 created_at: row.get("created_at"),
                 updated_at: row.get("updated_at"),
                 user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
             })
             .collect();
 
@@ -211,6 +214,7 @@ impl Database {
                 created_at: row.get("created_at"),
                 updated_at: row.get("updated_at"),
                 user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
             })
             .collect();
 
@@ -297,6 +301,7 @@ impl Database {
                 created_at: row.get("created_at"),
                 updated_at: row.get("updated_at"),
                 user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
             })
             .collect();
 
@@ -337,6 +342,7 @@ impl Database {
                 created_at: row.get("created_at"),
                 updated_at: row.get("updated_at"),
                 user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
             })
             .collect();
 
@@ -407,6 +413,7 @@ impl Database {
                 created_at: row.get("created_at"),
                 updated_at: row.get("updated_at"),
                 user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
             })
             .collect();
 
@@ -1122,4 +1129,45 @@ impl Database {
             None => Ok(None),
         }
     }
+
+    /// Check if a document with the given file hash already exists for the user
+    pub async fn get_document_by_user_and_hash(&self, user_id: Uuid, file_hash: &str) -> Result<Option<Document>> {
+        let row = sqlx::query(
+            r#"
+            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
+            FROM documents
+            WHERE user_id = $1 AND file_hash = $2
+            LIMIT 1
+            "#
+        )
+        .bind(user_id)
+        .bind(file_hash)
+        .fetch_optional(&self.pool)
+        .await?;
+
+        match row {
+            Some(row) => Ok(Some(Document {
+                id: row.get("id"),
+                filename: row.get("filename"),
+                original_filename: row.get("original_filename"),
+                file_path: row.get("file_path"),
+                file_size: row.get("file_size"),
+                mime_type: row.get("mime_type"),
+                content: row.get("content"),
+                ocr_text: row.get("ocr_text"),
+                ocr_confidence: row.get("ocr_confidence"),
+                ocr_word_count: row.get("ocr_word_count"),
+                ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
+                ocr_status: row.get("ocr_status"),
+                ocr_error: row.get("ocr_error"),
+                ocr_completed_at: row.get("ocr_completed_at"),
+                tags: row.get("tags"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+                user_id: row.get("user_id"),
+                file_hash: row.get("file_hash"),
+            })),
+            None => Ok(None),
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/file_service.rs b/src/file_service.rs
index 7c03bcc..2b4ef15 100644
--- a/src/file_service.rs
+++ b/src/file_service.rs
@@ -156,6 +156,7 @@ impl FileService {
         file_size: i64,
         mime_type: &str,
         user_id: Uuid,
+        file_hash: Option<String>,
     ) -> Document {
         Document {
             id: Uuid::new_v4(),
@@ -176,6 +177,7 @@ impl FileService {
             created_at: Utc::now(),
             updated_at: Utc::now(),
             user_id,
+            file_hash,
         }
     }
 
diff --git a/src/models.rs b/src/models.rs
index cd4b7f7..d5cc1ad 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -98,6 +98,7 @@ pub struct Document {
     pub created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
     pub user_id: Uuid,
+    pub file_hash: Option<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize, ToSchema)]
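
The new column also has to be threaded through every hand-written row-to-Document mapping in src/db/documents.rs, which is why the same `file_hash: row.get("file_hash")` line shows up in so many hunks above. One way to keep that mapping in a single place would be a shared helper along these lines (a sketch only, not something the patch introduces; it assumes the crate's existing Document model and sqlx's Postgres row type):

use sqlx::{postgres::PgRow, Row};

use crate::models::Document;

/// Build a Document from a row that selected the full documents column list.
fn document_from_row(row: &PgRow) -> Document {
    Document {
        id: row.get("id"),
        filename: row.get("filename"),
        original_filename: row.get("original_filename"),
        file_path: row.get("file_path"),
        file_size: row.get("file_size"),
        mime_type: row.get("mime_type"),
        content: row.get("content"),
        ocr_text: row.get("ocr_text"),
        ocr_confidence: row.get("ocr_confidence"),
        ocr_word_count: row.get("ocr_word_count"),
        ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
        ocr_status: row.get("ocr_status"),
        ocr_error: row.get("ocr_error"),
        ocr_completed_at: row.get("ocr_completed_at"),
        tags: row.get("tags"),
        created_at: row.get("created_at"),
        updated_at: row.get("updated_at"),
        user_id: row.get("user_id"),
        file_hash: row.get("file_hash"),
    }
}

Each query could then call document_from_row(&row) instead of repeating the field list, and a future column would only need to be added once.
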
diff --git a/src/routes/documents.rs b/src/routes/documents.rs
index 4e26d96..52a4491 100644
--- a/src/routes/documents.rs
+++ b/src/routes/documents.rs
@@ -142,21 +142,17 @@ async fn upload_document(
     // Calculate file hash for deduplication
     let file_hash = calculate_file_hash(&data);
 
-    // Check if this exact file content already exists in the system
-    // This prevents uploading and processing duplicate files
-    if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 1000, 0).await {
-        for existing_doc in existing_docs {
-            // Quick size check first (much faster than hash comparison)
-            if existing_doc.file_size == file_size {
-                // Read the existing file and compare hashes
-                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
-                    let existing_hash = calculate_file_hash(&existing_file_data);
-                    if file_hash == existing_hash {
-                        // Return the existing document instead of creating a duplicate
-                        return Ok(Json(existing_doc.into()));
-                    }
-                }
-            }
+    // Check if this exact file content already exists using efficient hash lookup
+    match state.db.get_document_by_user_and_hash(auth_user.user.id, &file_hash).await {
+        Ok(Some(existing_doc)) => {
+            // Return the existing document instead of creating a duplicate
+            return Ok(Json(existing_doc.into()));
+        }
+        Ok(None) => {
+            // No duplicate found, proceed with upload
+        }
+        Err(_) => {
+            // Continue even if duplicate check fails
         }
     }
 
@@ -176,6 +172,7 @@ async fn upload_document(
         file_size,
         &mime_type,
         auth_user.user.id,
+        Some(file_hash),
     );
 
     let saved_document = state
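
One edge case worth keeping in mind for the upload path: two concurrent uploads of identical content by the same user can both pass the hash lookup and then race on the INSERT, at which point the unique index idx_documents_user_file_hash rejects the second insert. The patch itself does not handle that case; a sketch of one way to absorb it, by re-running the lookup when the insert fails (the Database module path and the anyhow-based error handling here are assumptions about the surrounding code):

use anyhow::anyhow;
use uuid::Uuid;

use crate::db::Database;   // assumed module path
use crate::models::Document;

/// Insert a document, falling back to the already-stored copy if a
/// concurrent request with the same content won the race on the unique
/// (user_id, file_hash) index.
async fn create_or_reuse_document(
    db: &Database,
    user_id: Uuid,
    file_hash: &str,
    document: Document,
) -> anyhow::Result<Document> {
    match db.create_document(document).await {
        Ok(created) => Ok(created),
        Err(insert_err) => {
            // If a row with this hash exists now, the unique index is the
            // likely cause of the failure and the existing row is the one
            // we want; otherwise surface the original error.
            if let Ok(Some(existing)) = db.get_document_by_user_and_hash(user_id, file_hash).await {
                Ok(existing)
            } else {
                Err(anyhow!("failed to create document: {}", insert_err))
            }
        }
    }
}
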
diff --git a/src/routes/webdav/webdav_sync.rs b/src/routes/webdav/webdav_sync.rs
index bd644ca..a558837 100644
--- a/src/routes/webdav/webdav_sync.rs
+++ b/src/routes/webdav/webdav_sync.rs
@@ -276,51 +276,42 @@ async fn process_single_file(
     // Calculate file hash for deduplication
     let file_hash = calculate_file_hash(&file_data);
 
-    // Check if this exact file content already exists for this user
-    // This prevents downloading and processing duplicate files from WebDAV
+    // Check if this exact file content already exists for this user using efficient hash lookup
     info!("Checking for duplicate content for user {}: {} (hash: {}, size: {} bytes)", 
           user_id, file_info.name, &file_hash[..8], file_data.len());
 
-    // Query documents with the same file size for this user only
-    let size_filter = file_data.len() as i64;
-    if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(user_id, crate::models::UserRole::User, 1000, 0).await {
-        let matching_docs: Vec<_> = existing_docs.into_iter()
-            .filter(|doc| doc.file_size == size_filter)
-            .collect();
-
+    // Use efficient database hash lookup instead of reading all documents
+    match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
+        Ok(Some(existing_doc)) => {
+            info!("Found duplicate content for user {}: {} matches existing document {} (hash: {})", 
+                  user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
+
-        info!("Found {} documents with same size for user {}", matching_docs.len(), user_id);
-
-        for existing_doc in matching_docs {
-            // Read the existing file and compare hashes
-            if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
-                let existing_hash = calculate_file_hash(&existing_file_data);
-                if file_hash == existing_hash {
-                    info!("Found duplicate content for user {}: {} matches existing document {}", 
-                          user_id, file_info.name, existing_doc.original_filename);
-
-                    // Record this WebDAV file as a duplicate but link to existing document
-                    let webdav_file = CreateWebDAVFile {
-                        user_id,
-                        webdav_path: file_info.path.clone(),
-                        etag: file_info.etag.clone(),
-                        last_modified: file_info.last_modified,
-                        file_size: file_info.size,
-                        mime_type: file_info.mime_type.clone(),
-                        document_id: Some(existing_doc.id), // Link to existing document
-                        sync_status: "duplicate_content".to_string(),
-                        sync_error: None,
-                    };
-
-                    if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
-                        error!("Failed to record duplicate WebDAV file: {}", e);
-                    }
-
-                    info!("WebDAV file marked as duplicate_content, skipping processing");
-                    return Ok(false); // Not processed (duplicate)
-                }
-            } else {
-                warn!("Could not read existing file for hash comparison: {}", existing_doc.file_path);
+            // Record this WebDAV file as a duplicate but link to existing document
+            let webdav_file = CreateWebDAVFile {
+                user_id,
+                webdav_path: file_info.path.clone(),
+                etag: file_info.etag.clone(),
+                last_modified: file_info.last_modified,
+                file_size: file_info.size,
+                mime_type: file_info.mime_type.clone(),
+                document_id: Some(existing_doc.id), // Link to existing document
+                sync_status: "duplicate_content".to_string(),
+                sync_error: None,
+            };
+
+            if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
+                error!("Failed to record duplicate WebDAV file: {}", e);
             }
+
+            info!("WebDAV file marked as duplicate_content, skipping processing");
+            return Ok(false); // Not processed (duplicate)
+        }
+        Ok(None) => {
+            info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
+        }
+        Err(e) => {
+            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
+            // Continue processing even if duplicate check fails
         }
     }
 
@@ -330,7 +321,7 @@ async fn process_single_file(
     let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
         .map_err(|e| format!("Failed to save {}: {}", file_info.name, e))?;
 
-    // Create document record
+    // Create document record with hash
     let file_service = FileService::new(state.config.upload_path.clone());
     let document = file_service.create_document(
         &file_info.name,
@@ -339,6 +330,7 @@ async fn process_single_file(
         file_data.len() as i64,
        &file_info.mime_type,
         user_id,
+        Some(file_hash.clone()), // Store the calculated hash
     );
 
     // Save document to database
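
A small detail in the logging above and in the other sync paths: the hash is truncated with `&file_hash[..8]`, which is safe only because a SHA-256 hex digest is always 64 ASCII characters; on a shorter string that slice would panic. A defensive helper, as a sketch rather than a required change:

/// Shorten a hex digest for log output. Hex digests are ASCII, so byte
/// slicing is safe, and strings shorter than the prefix are returned whole.
fn short_hash(hash: &str) -> &str {
    &hash[..hash.len().min(8)]
}

A call site would then read, for example, `info!("No duplicate content found for hash {}", short_hash(&file_hash))`.
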
diff --git a/src/source_sync.rs b/src/source_sync.rs
index 6ccda79..cffe4c3 100644
--- a/src/source_sync.rs
+++ b/src/source_sync.rs
@@ -534,25 +534,19 @@ impl SourceSyncService {
         // Calculate file hash for deduplication
         let file_hash = Self::calculate_file_hash(&file_data);
 
-        // Check for duplicate content
-        if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
-            user_id,
-            crate::models::UserRole::User,
-            1000,
-            0
-        ).await {
-            let matching_docs: Vec<_> = existing_docs.into_iter()
-                .filter(|doc| doc.file_size == file_data.len() as i64)
-                .collect();
-
-            for existing_doc in matching_docs {
-                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
-                    let existing_hash = Self::calculate_file_hash(&existing_file_data);
-                    if file_hash == existing_hash {
-                        info!("File content already exists, skipping: {}", file_info.path);
-                        return Ok(false);
-                    }
-                }
+        // Check for duplicate content using efficient hash lookup
+        match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
+            Ok(Some(existing_doc)) => {
+                info!("File content already exists for user {}: {} matches existing document {} (hash: {})", 
+                      user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
+                return Ok(false); // Skip processing duplicate
+            }
+            Ok(None) => {
+                info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
+            }
+            Err(e) => {
+                warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
+                // Continue processing even if duplicate check fails
             }
         }
 
@@ -561,7 +555,7 @@ impl SourceSyncService {
         let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
             .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
 
-        // Create document record
+        // Create document record with hash
         let document = file_service.create_document(
             &file_info.name,
             &file_info.name,
@@ -569,6 +563,7 @@ impl SourceSyncService {
             file_data.len() as i64,
             &file_info.mime_type,
             user_id,
+            Some(file_hash.clone()), // Store the calculated hash
         );
 
         let created_document = state.db.create_document(document).await
@@ -655,25 +650,19 @@ impl SourceSyncService {
         // Calculate file hash for deduplication
         let file_hash = Self::calculate_file_hash(&file_data);
 
-        // Check for duplicate content
-        if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
-            user_id,
-            crate::models::UserRole::User,
-            1000,
-            0
-        ).await {
-            let matching_docs: Vec<_> = existing_docs.into_iter()
-                .filter(|doc| doc.file_size == file_data.len() as i64)
-                .collect();
-
-            for existing_doc in matching_docs {
-                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
-                    let existing_hash = Self::calculate_file_hash(&existing_file_data);
-                    if file_hash == existing_hash {
-                        info!("File content already exists, skipping: {}", file_info.path);
-                        return Ok(false);
-                    }
-                }
+        // Check for duplicate content using efficient hash lookup
+        match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
+            Ok(Some(existing_doc)) => {
+                info!("File content already exists for user {}: {} matches existing document {} (hash: {})", 
+                      user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
+                return Ok(false); // Skip processing duplicate
+            }
+            Ok(None) => {
+                info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
+            }
+            Err(e) => {
+                warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
+                // Continue processing even if duplicate check fails
             }
         }
 
@@ -688,7 +677,7 @@ impl SourceSyncService {
         let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
             .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
 
-        // Create document record
+        // Create document record with hash
        let document = file_service.create_document(
             &file_info.name,
             &file_info.name,
@@ -696,6 +685,7 @@ impl SourceSyncService {
             file_data.len() as i64,
             &file_info.mime_type,
             user_id,
+            Some(file_hash.clone()), // Store the calculated hash
         );
 
         let created_document = state.db.create_document(document).await
diff --git a/src/tests/file_service_tests.rs b/src/tests/file_service_tests.rs
index a44efcc..cd292df 100644
--- a/src/tests/file_service_tests.rs
+++ b/src/tests/file_service_tests.rs
@@ -72,6 +72,7 @@ mod tests {
             1024,
             "application/pdf",
             user_id,
+            Some("abcd1234hash".to_string()),
         );
 
         assert_eq!(document.filename, "saved_file.pdf");
@@ -80,6 +81,7 @@ mod tests {
         assert_eq!(document.file_size, 1024);
         assert_eq!(document.mime_type, "application/pdf");
         assert_eq!(document.user_id, user_id);
+        assert_eq!(document.file_hash, Some("abcd1234hash".to_string()));
         assert!(document.content.is_none());
         assert!(document.ocr_text.is_none());
         assert!(document.tags.is_empty());
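
The updated file_service test only checks that the hash value round-trips through create_document. A complementary test for the hashing helper itself could look like the sketch below (not part of the patch; the module name is illustrative, and the expected constant is the well-known SHA-256 digest of empty input):

#[cfg(test)]
mod file_hash_tests {
    use sha2::{Digest, Sha256};

    fn calculate_file_hash(data: &[u8]) -> String {
        format!("{:x}", Sha256::digest(data))
    }

    #[test]
    fn empty_input_has_known_digest() {
        assert_eq!(
            calculate_file_hash(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn identical_content_hashes_identically() {
        assert_eq!(calculate_file_hash(b"same bytes"), calculate_file_hash(b"same bytes"));
        assert_ne!(calculate_file_hash(b"same bytes"), calculate_file_hash(b"other bytes"));
    }
}
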
diff --git a/src/watcher.rs b/src/watcher.rs
index 16c632c..626ef16 100644
--- a/src/watcher.rs
+++ b/src/watcher.rs
@@ -362,6 +362,9 @@ async fn process_file(
 
     let saved_file_path = file_service.save_file(&filename, &file_data).await?;
 
+    // Calculate file hash for deduplication
+    let file_hash = calculate_file_hash(&file_data);
+
     let document = file_service.create_document(
         &filename,
         &filename,
@@ -369,6 +372,7 @@ async fn process_file(
         file_size,
         &mime_type,
         admin_user_id,
+        Some(file_hash),
     );
 
     let created_doc = db.create_document(document).await?;
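
Taken together, the same lookup-then-skip block now appears in batch_ingest.rs, webdav_sync.rs, source_sync.rs (twice) and, in simplified form, the upload route. A sketch of one shared helper that mirrors the patch's behaviour of logging a failed lookup and then treating it as "no duplicate" (the Database module path and the helper name are assumptions, not part of the patch):

use tracing::warn;
use uuid::Uuid;

use crate::db::Database;   // assumed module path
use crate::models::Document;

/// Return the user's existing document with the same content hash, if any.
/// A lookup error is logged and treated as "no duplicate", matching the
/// duplicate checks added throughout this patch.
pub async fn find_existing_by_hash(db: &Database, user_id: Uuid, file_hash: &str) -> Option<Document> {
    match db.get_document_by_user_and_hash(user_id, file_hash).await {
        Ok(found) => found,
        Err(e) => {
            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
            None
        }
    }
}
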