feat(server): add hash for documents

perf3ct 2025-06-17 15:41:42 +00:00
parent 4d32163ccc
commit 58aaedf4a6
10 changed files with 202 additions and 102 deletions

View File

@@ -0,0 +1,36 @@
-- Add file_hash field to documents table for efficient duplicate detection
-- This will store SHA256 hash of file content to prevent duplicates
-- Add the file_hash column to documents table
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS file_hash VARCHAR(64);
-- Create unique index to prevent hash duplicates per user
-- This enforces that each user cannot have duplicate file content
CREATE UNIQUE INDEX IF NOT EXISTS idx_documents_user_file_hash
ON documents(user_id, file_hash)
WHERE file_hash IS NOT NULL;
-- Create additional index for efficient hash lookups
CREATE INDEX IF NOT EXISTS idx_documents_file_hash
ON documents(file_hash)
WHERE file_hash IS NOT NULL;
-- Add helpful comments
COMMENT ON COLUMN documents.file_hash IS 'SHA256 hash of file content for duplicate detection - prevents same content from being stored multiple times per user';
-- Create a view for duplicate analysis
CREATE OR REPLACE VIEW document_duplicates_analysis AS
SELECT
file_hash,
COUNT(*) as duplicate_count,
array_agg(DISTINCT user_id ORDER BY user_id) as users_with_duplicates,
array_agg(filename ORDER BY created_at) as filenames,
MIN(created_at) as first_upload,
MAX(created_at) as last_upload,
SUM(file_size) as total_storage_used
FROM documents
WHERE file_hash IS NOT NULL
GROUP BY file_hash
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC, total_storage_used DESC;
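Once the migration has run, the new view can be queried directly to see which content is duplicated and how much storage it consumes. A usage sketch (not part of the migration itself):

-- Example query against the new view; the view already orders by
-- duplicate_count and total_storage_used, so this lists the worst offenders first.
SELECT file_hash, duplicate_count, total_storage_used, filenames
FROM document_duplicates_analysis
LIMIT 10;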

View File

@@ -6,6 +6,7 @@ use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use uuid::Uuid;
use walkdir::WalkDir;
use sha2::{Sha256, Digest};
use crate::{
config::Config,
@@ -188,6 +189,25 @@ async fn process_single_file(
// Read file data
let file_data = fs::read(&path).await?;
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
// Check for duplicate content using efficient hash lookup
match db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("Skipping duplicate file: {} matches existing document {} (hash: {})",
filename, existing_doc.original_filename, &file_hash[..8]);
return Ok(None); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
let mime_type = mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string();
@@ -195,7 +215,7 @@ async fn process_single_file(
// Save file
let file_path = file_service.save_file(&filename, &file_data).await?;
// Create document
// Create document with hash
let document = file_service.create_document(
&filename,
&filename,
@@ -203,6 +223,7 @@ async fn process_single_file(
file_size,
&mime_type,
user_id,
Some(file_hash),
);
// Save to database (without OCR)
@@ -225,3 +246,10 @@ fn calculate_priority(file_size: i64) -> i32 {
_ => 2, // > 50MB: lowest priority
}
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
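The helper relies on the sha2 crate imported at the top of the file. As a minimal standalone sketch mirroring the same logic (the sha2 = "0.10" dependency line is an assumption, not taken from this commit), it is easy to confirm the output fits the VARCHAR(64) column added by the migration:

// Standalone sketch of the hashing logic above; assumes sha2 = "0.10" in Cargo.toml.
use sha2::{Digest, Sha256};

fn sha256_hex(data: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(data);
    format!("{:x}", hasher.finalize())
}

fn main() {
    let hash = sha256_hex(b"hello world");
    // A SHA-256 digest is 32 bytes, i.e. 64 lowercase hex characters, so it fills VARCHAR(64) exactly.
    assert_eq!(hash.len(), 64);
    println!("{}", &hash[..8]); // the same 8-character prefix used in the log messages
}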

View File

@@ -9,9 +9,9 @@ impl Database {
pub async fn create_document(&self, document: Document) -> Result<Document> {
let row = sqlx::query(
r#"
INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
INSERT INTO documents (id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
RETURNING id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
"#
)
.bind(document.id)
@@ -32,6 +32,7 @@ impl Database {
.bind(document.created_at)
.bind(document.updated_at)
.bind(document.user_id)
.bind(&document.file_hash)
.fetch_one(&self.pool)
.await?;
@@ -54,6 +55,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
}
@@ -61,7 +63,7 @@ impl Database {
let query = if user_role == crate::models::UserRole::Admin {
// Admins can see all documents
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
FROM documents
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
@@ -69,7 +71,7 @@ impl Database {
} else {
// Regular users can only see their own documents
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
FROM documents
WHERE user_id = $3
ORDER BY created_at DESC
@@ -113,6 +115,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
.collect();
@@ -211,6 +214,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
.collect();
@@ -297,6 +301,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
.collect();
@@ -337,6 +342,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
.collect();
@@ -407,6 +413,7 @@ impl Database {
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})
.collect();
@@ -1122,4 +1129,45 @@ impl Database {
None => Ok(None),
}
}
/// Check if a document with the given file hash already exists for the user
pub async fn get_document_by_user_and_hash(&self, user_id: Uuid, file_hash: &str) -> Result<Option<Document>> {
let row = sqlx::query(
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, ocr_confidence, ocr_word_count, ocr_processing_time_ms, ocr_status, ocr_error, ocr_completed_at, tags, created_at, updated_at, user_id, file_hash
FROM documents
WHERE user_id = $1 AND file_hash = $2
LIMIT 1
"#
)
.bind(user_id)
.bind(file_hash)
.fetch_optional(&self.pool)
.await?;
match row {
Some(row) => Ok(Some(Document {
id: row.get("id"),
filename: row.get("filename"),
original_filename: row.get("original_filename"),
file_path: row.get("file_path"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
content: row.get("content"),
ocr_text: row.get("ocr_text"),
ocr_confidence: row.get("ocr_confidence"),
ocr_word_count: row.get("ocr_word_count"),
ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
ocr_status: row.get("ocr_status"),
ocr_error: row.get("ocr_error"),
ocr_completed_at: row.get("ocr_completed_at"),
tags: row.get("tags"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
file_hash: row.get("file_hash"),
})),
None => Ok(None),
}
}
}
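The new lookup is called with the same match-and-continue pattern from the batch ingest, upload, WebDAV, and source sync paths. A hypothetical wrapper (illustrative only, not part of this commit) could centralize the policy of treating a failed lookup as "no duplicate found":

// Hypothetical helper, not in this commit: shared duplicate check for the ingestion paths.
// Assumes the module's existing imports (uuid::Uuid, tracing::warn, models::Document).
async fn find_existing_by_hash(db: &Database, user_id: Uuid, file_hash: &str) -> Option<Document> {
    match db.get_document_by_user_and_hash(user_id, file_hash).await {
        // A hit means this user already stores identical content.
        Ok(existing) => existing,
        // On error, log and fall through so ingestion continues, matching the callers' behavior.
        Err(e) => {
            warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
            None
        }
    }
}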

View File

@@ -156,6 +156,7 @@ impl FileService {
file_size: i64,
mime_type: &str,
user_id: Uuid,
file_hash: Option<String>,
) -> Document {
Document {
id: Uuid::new_v4(),
@@ -176,6 +177,7 @@ impl FileService {
created_at: Utc::now(),
updated_at: Utc::now(),
user_id,
file_hash,
}
}

View File

@@ -98,6 +98,7 @@ pub struct Document {
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub user_id: Uuid,
pub file_hash: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]

View File

@@ -142,21 +142,17 @@ async fn upload_document(
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&data);
// Check if this exact file content already exists in the system
// This prevents uploading and processing duplicate files
if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 1000, 0).await {
for existing_doc in existing_docs {
// Quick size check first (much faster than hash comparison)
if existing_doc.file_size == file_size {
// Read the existing file and compare hashes
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
// Return the existing document instead of creating a duplicate
return Ok(Json(existing_doc.into()));
}
}
}
// Check if this exact file content already exists using efficient hash lookup
match state.db.get_document_by_user_and_hash(auth_user.user.id, &file_hash).await {
Ok(Some(existing_doc)) => {
// Return the existing document instead of creating a duplicate
return Ok(Json(existing_doc.into()));
}
Ok(None) => {
// No duplicate found, proceed with upload
}
Err(_) => {
// Continue even if duplicate check fails
}
}
@@ -176,6 +172,7 @@ async fn upload_document(
file_size,
&mime_type,
auth_user.user.id,
Some(file_hash),
);
let saved_document = state

View File

@@ -276,51 +276,42 @@ async fn process_single_file(
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
// Check if this exact file content already exists for this user
// This prevents downloading and processing duplicate files from WebDAV
// Check if this exact file content already exists for this user using efficient hash lookup
info!("Checking for duplicate content for user {}: {} (hash: {}, size: {} bytes)",
user_id, file_info.name, &file_hash[..8], file_data.len());
// Query documents with the same file size for this user only
let size_filter = file_data.len() as i64;
if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(user_id, crate::models::UserRole::User, 1000, 0).await {
let matching_docs: Vec<_> = existing_docs.into_iter()
.filter(|doc| doc.file_size == size_filter)
.collect();
// Use efficient database hash lookup instead of reading all documents
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("Found duplicate content for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
info!("Found {} documents with same size for user {}", matching_docs.len(), user_id);
// Record this WebDAV file as a duplicate but link to existing document
let webdav_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(existing_doc.id), // Link to existing document
sync_status: "duplicate_content".to_string(),
sync_error: None,
};
for existing_doc in matching_docs {
// Read the existing file and compare hashes
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
info!("Found duplicate content for user {}: {} matches existing document {}",
user_id, file_info.name, existing_doc.original_filename);
// Record this WebDAV file as a duplicate but link to existing document
let webdav_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(existing_doc.id), // Link to existing document
sync_status: "duplicate_content".to_string(),
sync_error: None,
};
if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
error!("Failed to record duplicate WebDAV file: {}", e);
}
info!("WebDAV file marked as duplicate_content, skipping processing");
return Ok(false); // Not processed (duplicate)
}
} else {
warn!("Could not read existing file for hash comparison: {}", existing_doc.file_path);
if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
error!("Failed to record duplicate WebDAV file: {}", e);
}
info!("WebDAV file marked as duplicate_content, skipping processing");
return Ok(false); // Not processed (duplicate)
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
@@ -330,7 +321,7 @@ async fn process_single_file(
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| format!("Failed to save {}: {}", file_info.name, e))?;
// Create document record
// Create document record with hash
let file_service = FileService::new(state.config.upload_path.clone());
let document = file_service.create_document(
&file_info.name,
@@ -339,6 +330,7 @@ async fn process_single_file(
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
// Save document to database

View File

@@ -534,25 +534,19 @@ impl SourceSyncService {
// Calculate file hash for deduplication
let file_hash = Self::calculate_file_hash(&file_data);
// Check for duplicate content
if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
user_id,
crate::models::UserRole::User,
1000,
0
).await {
let matching_docs: Vec<_> = existing_docs.into_iter()
.filter(|doc| doc.file_size == file_data.len() as i64)
.collect();
for existing_doc in matching_docs {
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = Self::calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
info!("File content already exists, skipping: {}", file_info.path);
return Ok(false);
}
}
// Check for duplicate content using efficient hash lookup
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
return Ok(false); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
@@ -561,7 +555,7 @@ impl SourceSyncService {
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
// Create document record
// Create document record with hash
let document = file_service.create_document(
&file_info.name,
&file_info.name,
@@ -569,6 +563,7 @@ impl SourceSyncService {
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
let created_document = state.db.create_document(document).await
@@ -655,25 +650,19 @@ impl SourceSyncService {
// Calculate file hash for deduplication
let file_hash = Self::calculate_file_hash(&file_data);
// Check for duplicate content
if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
user_id,
crate::models::UserRole::User,
1000,
0
).await {
let matching_docs: Vec<_> = existing_docs.into_iter()
.filter(|doc| doc.file_size == file_data.len() as i64)
.collect();
for existing_doc in matching_docs {
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = Self::calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
info!("File content already exists, skipping: {}", file_info.path);
return Ok(false);
}
}
// Check for duplicate content using efficient hash lookup
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
return Ok(false); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
@@ -688,7 +677,7 @@ impl SourceSyncService {
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
// Create document record
// Create document record with hash
let document = file_service.create_document(
&file_info.name,
&file_info.name,
@@ -696,6 +685,7 @@ impl SourceSyncService {
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
let created_document = state.db.create_document(document).await

View File

@@ -72,6 +72,7 @@ mod tests {
1024,
"application/pdf",
user_id,
Some("abcd1234hash".to_string()),
);
assert_eq!(document.filename, "saved_file.pdf");
@@ -80,6 +81,7 @@ mod tests {
assert_eq!(document.file_size, 1024);
assert_eq!(document.mime_type, "application/pdf");
assert_eq!(document.user_id, user_id);
assert_eq!(document.file_hash, Some("abcd1234hash".to_string()));
assert!(document.content.is_none());
assert!(document.ocr_text.is_none());
assert!(document.tags.is_empty());
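A determinism check for the hashing helper would also be cheap to add here; a sketch (assuming calculate_file_hash is reachable from this test module) might look like:

#[test]
fn file_hash_is_deterministic() {
    // Identical bytes must hash identically, or duplicate detection silently breaks.
    let data = b"same bytes";
    assert_eq!(calculate_file_hash(data), calculate_file_hash(data));
    // Different content should produce a different digest.
    assert_ne!(calculate_file_hash(data), calculate_file_hash(b"different bytes"));
}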

View File

@@ -362,6 +362,9 @@ async fn process_file(
let saved_file_path = file_service.save_file(&filename, &file_data).await?;
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
let document = file_service.create_document(
&filename,
&filename,
@@ -369,6 +372,7 @@ async fn process_file(
file_size,
&mime_type,
admin_user_id,
Some(file_hash),
);
let created_doc = db.create_document(document).await?;