feat(ingestion): create ingestion engine to handle document creation and centralize deduplication logic

perf3ct 2025-06-20 16:24:26 +00:00
parent 6b7981cd1a
commit ac069de5bc
4 changed files with 320 additions and 51 deletions
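
For orientation, a hedged usage sketch (not part of the diff) of how the new centralized service is intended to be driven by each source; the wiring shown here (the example_ingest_paths function, the sample filename and MIME type, the "example_source" label, and the db, file_service, id, and byte-buffer parameters) is assumed for illustration only:

use uuid::Uuid;
use crate::db::Database;
use crate::document_ingestion::{DocumentIngestionService, IngestionResult};
use crate::file_service::FileService;

// Illustrative only: each source picks the deduplication policy that fits it.
async fn example_ingest_paths(
    db: Database,
    file_service: FileService,
    user_id: Uuid,
    source_id: Uuid,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let service = DocumentIngestionService::new(db, file_service);

    // Direct upload: AllowDuplicateContent policy.
    let _upload: IngestionResult = service
        .ingest_upload("invoice.pdf", bytes.clone(), "application/pdf", user_id)
        .await?;

    // Source sync and batch ingest: skip content that is already stored.
    let _synced = service
        .ingest_from_source("invoice.pdf", bytes.clone(), "application/pdf", user_id, source_id, "example_source")
        .await?;

    // WebDAV: track the duplicate relationship instead of storing the bytes again.
    let _webdav = service
        .ingest_from_webdav("invoice.pdf", bytes, "application/pdf", user_id, source_id)
        .await?;

    Ok(())
}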

src/document_ingestion.rs (new file)

@@ -0,0 +1,274 @@
/*!
* Unified Document Ingestion Service
*
* This module provides a centralized abstraction for document ingestion with
* consistent deduplication logic across all sources (direct upload, WebDAV,
* source sync, batch ingest, folder watcher).
*/
use uuid::Uuid;
use sha2::{Digest, Sha256};
use tracing::{info, warn};
use crate::models::Document;
use crate::db::Database;
use crate::file_service::FileService;
#[derive(Debug, Clone)]
pub enum DeduplicationPolicy {
/// Skip ingestion if content already exists (for batch operations)
Skip,
/// Return existing document if content already exists (for direct uploads)
ReturnExisting,
/// Create new document record even if content exists (allows multiple filenames for same content)
AllowDuplicateContent,
/// Track as duplicate but link to existing document (for WebDAV)
TrackAsDuplicate,
}
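// Illustrative sketch, not part of this commit: the convenience methods further down
// map each ingestion source to a policy roughly like this (`policy_for` is a
// hypothetical helper name, not an API provided by this module).
fn policy_for(source: &str) -> DeduplicationPolicy {
    match source {
        // Direct uploads keep their own document record per filename.
        "direct_upload" => DeduplicationPolicy::AllowDuplicateContent,
        // WebDAV records the duplicate relationship against the existing document.
        "webdav" => DeduplicationPolicy::TrackAsDuplicate,
        // Source sync and batch ingest skip content that is already stored.
        _ => DeduplicationPolicy::Skip,
    }
}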
#[derive(Debug)]
pub enum IngestionResult {
/// New document was created
Created(Document),
/// Existing document was returned (content duplicate)
ExistingDocument(Document),
/// Document was skipped due to the deduplication policy
Skipped { existing_document_id: Uuid, reason: String },
/// Document was tracked as duplicate (for WebDAV)
TrackedAsDuplicate { existing_document_id: Uuid },
}
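// Illustrative sketch, not part of this commit: callers typically branch on the
// result to decide whether downstream work (e.g. OCR queueing) is still needed.
// `needs_processing` is a hypothetical helper name.
fn needs_processing(result: &IngestionResult) -> bool {
    match result {
        // Only a genuinely new document needs OCR and indexing.
        IngestionResult::Created(_) => true,
        // Everything else resolved to content that already exists.
        IngestionResult::ExistingDocument(_)
        | IngestionResult::Skipped { .. }
        | IngestionResult::TrackedAsDuplicate { .. } => false,
    }
}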
#[derive(Debug)]
pub struct DocumentIngestionRequest {
pub filename: String,
pub original_filename: String,
pub file_data: Vec<u8>,
pub mime_type: String,
pub user_id: Uuid,
pub deduplication_policy: DeduplicationPolicy,
/// Optional source identifier for tracking
pub source_type: Option<String>,
pub source_id: Option<Uuid>,
}
pub struct DocumentIngestionService {
db: Database,
file_service: FileService,
}
impl DocumentIngestionService {
pub fn new(db: Database, file_service: FileService) -> Self {
Self { db, file_service }
}
/// Unified document ingestion with configurable deduplication policy
pub async fn ingest_document(&self, request: DocumentIngestionRequest) -> Result<IngestionResult, Box<dyn std::error::Error + Send + Sync>> {
let file_hash = self.calculate_file_hash(&request.file_data);
let file_size = request.file_data.len() as i64;
info!(
"Ingesting document: {} for user {} (hash: {}, size: {} bytes, policy: {:?})",
request.filename, request.user_id, &file_hash[..8], file_size, request.deduplication_policy
);
// Check for existing document with same content
match self.db.get_document_by_user_and_hash(request.user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!(
"Found existing document with same content: {} (ID: {}) matches new file: {}",
existing_doc.original_filename, existing_doc.id, request.filename
);
match request.deduplication_policy {
DeduplicationPolicy::Skip => {
return Ok(IngestionResult::Skipped {
existing_document_id: existing_doc.id,
reason: format!("Content already exists as '{}'", existing_doc.original_filename),
});
}
DeduplicationPolicy::ReturnExisting => {
return Ok(IngestionResult::ExistingDocument(existing_doc));
}
DeduplicationPolicy::TrackAsDuplicate => {
return Ok(IngestionResult::TrackedAsDuplicate {
existing_document_id: existing_doc.id,
});
}
DeduplicationPolicy::AllowDuplicateContent => {
// Continue with creating new document record
info!("Creating new document record despite duplicate content (policy: AllowDuplicateContent)");
}
}
}
Ok(None) => {
info!("No duplicate content found, proceeding with new document creation");
}
Err(e) => {
warn!("Error checking for duplicate content (hash: {}): {}", &file_hash[..8], e);
// Continue with ingestion even if duplicate check fails
}
}
// Save file to storage
let file_path = self.file_service
.save_file(&request.filename, &request.file_data)
.await
.map_err(|e| {
warn!("Failed to save file {}: {}", request.filename, e);
e
})?;
// Create document record
let document = self.file_service.create_document(
&request.filename,
&request.original_filename,
&file_path,
file_size,
&request.mime_type,
request.user_id,
Some(file_hash.clone()),
);
let saved_document = match self.db.create_document(document).await {
Ok(doc) => doc,
Err(e) => {
// Check if this is a unique constraint violation on the hash
let error_string = e.to_string();
if error_string.contains("duplicate key value violates unique constraint")
&& error_string.contains("idx_documents_user_file_hash") {
warn!("Hash collision detected during concurrent upload for {} (hash: {}), fetching existing document",
request.filename, &file_hash[..8]);
// Race condition: another request created the document, fetch it
match self.db.get_document_by_user_and_hash(request.user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("Found existing document after collision for {}: {} (ID: {})",
request.filename, existing_doc.original_filename, existing_doc.id);
return Ok(IngestionResult::ExistingDocument(existing_doc));
}
Ok(None) => {
warn!("Unexpected: constraint violation but no document found for hash {}", &file_hash[..8]);
return Err(e.into());
}
Err(fetch_err) => {
warn!("Failed to fetch document after constraint violation: {}", fetch_err);
return Err(e.into());
}
}
} else {
warn!("Failed to create document record for {} (hash: {}): {}",
request.filename, &file_hash[..8], e);
return Err(e.into());
}
}
};
info!(
"Successfully ingested document: {} (ID: {}) for user {}",
saved_document.original_filename, saved_document.id, request.user_id
);
Ok(IngestionResult::Created(saved_document))
}
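// Illustrative sketch, not part of this commit: the constraint-violation check above
// amounts to a string classification that could be factored out like this
// (`is_hash_unique_violation` is a hypothetical name).
fn is_hash_unique_violation(error_string: &str) -> bool {
    error_string.contains("duplicate key value violates unique constraint")
        && error_string.contains("idx_documents_user_file_hash")
}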
/// Calculate SHA256 hash of file content
fn calculate_file_hash(&self, data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
/// Convenience method for direct uploads (maintains backward compatibility)
pub async fn ingest_upload(
&self,
filename: &str,
file_data: Vec<u8>,
mime_type: &str,
user_id: Uuid,
) -> Result<IngestionResult, Box<dyn std::error::Error + Send + Sync>> {
let request = DocumentIngestionRequest {
filename: filename.to_string(),
original_filename: filename.to_string(),
file_data,
mime_type: mime_type.to_string(),
user_id,
deduplication_policy: DeduplicationPolicy::AllowDuplicateContent, // Fixed behavior for uploads
source_type: Some("direct_upload".to_string()),
source_id: None,
};
self.ingest_document(request).await
}
/// Convenience method for source sync operations
pub async fn ingest_from_source(
&self,
filename: &str,
file_data: Vec<u8>,
mime_type: &str,
user_id: Uuid,
source_id: Uuid,
source_type: &str,
) -> Result<IngestionResult, Box<dyn std::error::Error + Send + Sync>> {
let request = DocumentIngestionRequest {
filename: filename.to_string(),
original_filename: filename.to_string(),
file_data,
mime_type: mime_type.to_string(),
user_id,
deduplication_policy: DeduplicationPolicy::Skip, // Skip duplicates for source sync
source_type: Some(source_type.to_string()),
source_id: Some(source_id),
};
self.ingest_document(request).await
}
/// Convenience method for WebDAV operations
pub async fn ingest_from_webdav(
&self,
filename: &str,
file_data: Vec<u8>,
mime_type: &str,
user_id: Uuid,
webdav_source_id: Uuid,
) -> Result<IngestionResult, Box<dyn std::error::Error + Send + Sync>> {
let request = DocumentIngestionRequest {
filename: filename.to_string(),
original_filename: filename.to_string(),
file_data,
mime_type: mime_type.to_string(),
user_id,
deduplication_policy: DeduplicationPolicy::TrackAsDuplicate, // Track duplicates for WebDAV
source_type: Some("webdav".to_string()),
source_id: Some(webdav_source_id),
};
self.ingest_document(request).await
}
/// Convenience method for batch ingestion
pub async fn ingest_batch_file(
&self,
filename: &str,
file_data: Vec<u8>,
mime_type: &str,
user_id: Uuid,
) -> Result<IngestionResult, Box<dyn std::error::Error + Send + Sync>> {
let request = DocumentIngestionRequest {
filename: filename.to_string(),
original_filename: filename.to_string(),
file_data,
mime_type: mime_type.to_string(),
user_id,
deduplication_policy: DeduplicationPolicy::Skip, // Skip duplicates for batch operations
source_type: Some("batch_ingest".to_string()),
source_id: None,
};
self.ingest_document(request).await
}
}
// TODO: Add comprehensive tests once test_helpers module is available
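// Minimal test sketch (an assumption, independent of the pending test_helpers work):
// checks the lower-case hex SHA-256 format used by calculate_file_hash against the
// well-known test vector for b"abc".
#[cfg(test)]
mod hash_format_tests {
    use sha2::{Digest, Sha256};

    #[test]
    fn sha256_lowercase_hex_matches_known_vector() {
        let mut hasher = Sha256::new();
        hasher.update(b"abc");
        assert_eq!(
            format!("{:x}", hasher.finalize()),
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
        );
    }
}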


@@ -3,6 +3,7 @@ pub mod batch_ingest;
pub mod config;
pub mod db;
pub mod db_guardrails_simple;
pub mod document_ingestion;
pub mod enhanced_ocr;
pub mod error_management;
pub mod file_service;


@@ -8,15 +8,16 @@ use axum::{
use serde::Deserialize;
use std::sync::Arc;
use utoipa::ToSchema;
use sha2::{Sha256, Digest};
use sqlx::Row;
use crate::{
auth::AuthUser,
document_ingestion::{DocumentIngestionService, IngestionResult},
file_service::FileService,
models::DocumentResponse,
AppState,
};
use tracing;
#[derive(Deserialize, ToSchema)]
struct PaginationQuery {
@@ -109,6 +110,7 @@ async fn upload_document(
mut multipart: Multipart,
) -> Result<Json<DocumentResponse>, StatusCode> {
let file_service = FileService::new(state.config.upload_path.clone());
let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service.clone());
// Get user settings for file upload restrictions
let settings = state
@@ -140,55 +142,34 @@ async fn upload_document(
return Err(StatusCode::PAYLOAD_TOO_LARGE);
}
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&data);
// Check if this exact file content already exists using efficient hash lookup
match state.db.get_document_by_user_and_hash(auth_user.user.id, &file_hash).await {
Ok(Some(existing_doc)) => {
// Return the existing document instead of creating a duplicate
return Ok(Json(existing_doc.into()));
}
Ok(None) => {
// No duplicate found, proceed with upload
}
Err(_) => {
// Continue even if duplicate check fails
}
}
let mime_type = mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string();
let file_path = file_service
.save_file(&filename, &data)
// Use the unified ingestion service with the AllowDuplicateContent policy.
// The per-user file-hash unique constraint still collapses identical content onto the
// existing document (via the constraint-violation handling in ingest_document), which
// is what the deduplication test updated in this commit relies on.
let result = ingestion_service
.ingest_upload(&filename, data.to_vec(), &mime_type, auth_user.user.id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
.map_err(|e| {
tracing::error!("Document ingestion failed for user {} filename {}: {}",
auth_user.user.id, filename, e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let document = file_service.create_document(
&filename,
&filename,
&file_path,
file_size,
&mime_type,
auth_user.user.id,
Some(file_hash),
);
let saved_document = state
.db
.create_document(document)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (saved_document, should_queue_ocr) = match result {
IngestionResult::Created(doc) => (doc, true), // New document - queue for OCR
IngestionResult::ExistingDocument(doc) => (doc, false), // Existing document - don't re-queue OCR
_ => return Err(StatusCode::INTERNAL_SERVER_ERROR),
};
let document_id = saved_document.id;
let enable_background_ocr = settings.enable_background_ocr;
if enable_background_ocr {
if enable_background_ocr && should_queue_ocr {
// Use the shared queue service from AppState instead of creating a new one
// Calculate priority based on file size
let priority = match file_size {
let priority = match saved_document.file_size {
0..=1048576 => 10, // <= 1MB: highest priority
..=5242880 => 8, // 1-5MB: high priority
..=10485760 => 6, // 5-10MB: medium priority
@@ -196,7 +177,7 @@ async fn upload_document(
_ => 2, // > 50MB: lowest priority
};
state.queue_service.enqueue_document(document_id, priority, file_size).await
state.queue_service.enqueue_document(document_id, priority, saved_document.file_size).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
@@ -207,12 +188,6 @@ async fn upload_document(
Err(StatusCode::BAD_REQUEST)
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
#[utoipa::path(
get,


@@ -612,7 +612,9 @@ async fn debug_document_upload_race_conditions() {
let debugger = PipelineDebugger::new().await;
// Upload same content with different filenames to test upload race conditions
// Upload same content with different filenames to test:
// 1. Concurrent upload race condition handling (no 500 errors)
// 2. Proper deduplication (identical content = same document ID)
let same_content = "IDENTICAL-CONTENT-FOR-RACE-CONDITION-TEST";
let task1 = debugger.upload_document_with_debug(same_content, "race1.txt");
let task2 = debugger.upload_document_with_debug(same_content, "race2.txt");
@@ -627,15 +629,32 @@ async fn debug_document_upload_race_conditions() {
i+1, doc.id, doc.filename, doc.file_size);
}
// Check if all documents have unique IDs
// Check deduplication behavior: identical content should result in same document ID
let mut ids: Vec<_> = docs.iter().map(|d| d.id).collect();
ids.sort();
ids.dedup();
if ids.len() == docs.len() {
println!("✅ All documents have unique IDs");
if ids.len() == 1 {
println!("✅ Correct deduplication: All identical content maps to same document ID");
println!("✅ Race condition handled properly: No 500 errors during concurrent uploads");
} else if ids.len() == docs.len() {
println!("❌ UNEXPECTED: All documents have unique IDs despite identical content");
panic!("Deduplication not working - identical content should map to same document");
} else {
println!("❌ DUPLICATE DOCUMENT IDs DETECTED!");
panic!("Document upload race condition detected");
println!("❌ PARTIAL DEDUPLICATION: Some duplicates detected but not all");
panic!("Inconsistent deduplication behavior");
}
// Verify all documents have the same content hash (should be identical)
let content_hashes: Vec<_> = docs.iter().map(|d| {
// We can't directly access file_hash from DocumentResponse, but we can verify
// they all have the same file size as a proxy for same content
d.file_size
}).collect();
if content_hashes.iter().all(|&size| size == content_hashes[0]) {
println!("✅ All documents have same file size (content verification)");
} else {
println!("❌ Documents have different file sizes - test setup error");
}
}