feat(ingestion): have everything use the document ingestion engine

perf3ct 2025-06-20 16:53:06 +00:00
parent ac069de5bc
commit c4a9c51b98
6 changed files with 235 additions and 343 deletions
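
The new call sites throughout this commit all match on the same result type from the document_ingestion module. A minimal sketch of that type, inferred from the match arms in this diff; the Document stand-in and its fields are only what the diff itself touches, and the real definitions may carry more:

use uuid::Uuid;

// Stand-in for the crate's document model; only fields referenced in this diff are shown.
pub struct Document {
    pub id: Uuid,
    pub original_filename: String,
}

// The four ingestion outcomes every updated call site handles.
pub enum IngestionResult {
    // New content: a document row was created and should be queued for OCR.
    Created(Document),
    // Identical content already exists for this user; the existing row is returned.
    ExistingDocument(Document),
    // The file was skipped outright because of deduplication.
    Skipped { existing_document_id: Uuid, reason: String },
    // The file was recorded as a duplicate pointing at an existing document.
    TrackedAsDuplicate { existing_document_id: Uuid },
}

The service itself is constructed per call site as DocumentIngestionService::new(db, file_service) and exposes the three entry points used below: ingest_batch_file for the batch scanner and watch folder, ingest_from_webdav for source-aware WebDAV syncs, and ingest_from_source for everything else.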

View File

@@ -6,12 +6,12 @@ use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use uuid::Uuid;
use walkdir::WalkDir;
use sha2::{Sha256, Digest};
use crate::{
config::Config,
db::Database,
file_service::FileService,
document_ingestion::{DocumentIngestionService, IngestionResult},
ocr_queue::OcrQueueService,
};
@@ -189,47 +189,36 @@ async fn process_single_file(
// Read file data
let file_data = fs::read(&path).await?;
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
// Check for duplicate content using efficient hash lookup
match db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("Skipping duplicate file: {} matches existing document {} (hash: {})",
filename, existing_doc.original_filename, &file_hash[..8]);
return Ok(None); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
let mime_type = mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string();
// Save file
let file_path = file_service.save_file(&filename, &file_data).await?;
// Use the unified ingestion service for consistent deduplication
let ingestion_service = DocumentIngestionService::new(db, file_service);
// Create document with hash
let document = file_service.create_document(
&filename,
&filename,
&file_path,
file_size,
&mime_type,
user_id,
Some(file_hash),
);
// Save to database (without OCR)
let created_doc = db.create_document(document).await?;
Ok(Some((created_doc.id, file_size)))
let result = ingestion_service
.ingest_batch_file(&filename, file_data, &mime_type, user_id)
.await
.map_err(|e| anyhow::anyhow!(e))?;
match result {
IngestionResult::Created(doc) => {
info!("Created new document for batch file {}: {}", filename, doc.id);
Ok(Some((doc.id, file_size)))
}
IngestionResult::Skipped { existing_document_id, reason } => {
info!("Skipped duplicate batch file {}: {} (existing: {})", filename, reason, existing_document_id);
Ok(None) // File was skipped due to deduplication
}
IngestionResult::ExistingDocument(doc) => {
info!("Found existing document for batch file {}: {}", filename, doc.id);
Ok(None) // Don't re-queue for OCR
}
IngestionResult::TrackedAsDuplicate { existing_document_id } => {
info!("Tracked batch file {} as duplicate of existing document: {}", filename, existing_document_id);
Ok(None) // File was tracked as duplicate
}
}
}
fn calculate_priority(file_size: i64) -> i32 {
@@ -247,9 +236,3 @@ fn calculate_priority(file_size: i64) -> i32 {
}
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
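
This file's calculate_priority body is elided by the hunk above, but the same size tiers appear inline at the WebDAV and source-sync call sites later in this commit. A standalone restatement of those tiers, assuming they match; the helper name is illustrative:

// Size-based OCR priority: smaller files are queued ahead of larger ones.
fn priority_for_size(file_size: i64) -> i32 {
    const MB: i64 = 1024 * 1024;
    if file_size <= MB {
        10 // <= 1MB: high priority
    } else if file_size <= 5 * MB {
        8 // <= 5MB: medium priority
    } else if file_size <= 10 * MB {
        6 // <= 10MB: normal priority
    } else if file_size <= 50 * MB {
        4 // <= 50MB: low priority
    } else {
        2 // > 50MB: lowest priority
    }
}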

View File

@@ -346,7 +346,7 @@ async fn start_webdav_sync(
let enable_background_ocr = user_settings.enable_background_ocr;
tokio::spawn(async move {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
Ok(files_processed) => {
info!("WebDAV sync completed successfully for user {}: {} files processed", user_id, files_processed);

View File

@@ -4,12 +4,12 @@ use tracing::{error, info, warn};
use chrono::Utc;
use tokio::sync::Semaphore;
use futures::stream::{FuturesUnordered, StreamExt};
use sha2::{Sha256, Digest};
use crate::{
AppState,
models::{CreateWebDAVFile, UpdateWebDAVSyncState},
file_service::FileService,
document_ingestion::{DocumentIngestionService, IngestionResult},
webdav_service::{WebDAVConfig, WebDAVService},
};
@@ -19,6 +19,7 @@ pub async fn perform_webdav_sync_with_tracking(
webdav_service: WebDAVService,
config: WebDAVConfig,
enable_background_ocr: bool,
webdav_source_id: Option<uuid::Uuid>,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len());
@@ -59,7 +60,7 @@ pub async fn perform_webdav_sync_with_tracking(
};
// Perform sync with proper cleanup
let sync_result = perform_sync_internal(state.clone(), user_id, webdav_service, config, enable_background_ocr).await;
let sync_result = perform_sync_internal(state.clone(), user_id, webdav_service, config, enable_background_ocr, webdav_source_id).await;
match &sync_result {
Ok(files_processed) => {
@@ -80,6 +81,7 @@ async fn perform_sync_internal(
webdav_service: WebDAVService,
config: WebDAVConfig,
enable_background_ocr: bool,
webdav_source_id: Option<uuid::Uuid>,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
let mut total_files_processed = 0;
@@ -161,6 +163,7 @@ async fn perform_sync_internal(
&file_info_clone,
enable_background_ocr,
semaphore_clone,
webdav_source_id,
).await
};
@@ -230,6 +233,7 @@ async fn process_single_file(
file_info: &crate::models::FileInfo,
enable_background_ocr: bool,
semaphore: Arc<Semaphore>,
webdav_source_id: Option<uuid::Uuid>,
) -> Result<bool, String> {
// Acquire semaphore permit to limit concurrent downloads
let _permit = semaphore.acquire().await.map_err(|e| format!("Semaphore error: {}", e))?;
@@ -273,74 +277,68 @@ async fn process_single_file(
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
// Use the unified ingestion service for consistent deduplication
let file_service = FileService::new(state.config.upload_path.clone());
let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
// Check if this exact file content already exists for this user using efficient hash lookup
info!("Checking for duplicate content for user {}: {} (hash: {}, size: {} bytes)",
user_id, file_info.name, &file_hash[..8], file_data.len());
// Use efficient database hash lookup instead of reading all documents
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("Found duplicate content for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
// Record this WebDAV file as a duplicate but link to existing document
let webdav_file = CreateWebDAVFile {
let result = if let Some(source_id) = webdav_source_id {
ingestion_service
.ingest_from_webdav(
&file_info.name,
file_data,
&file_info.mime_type,
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(existing_doc.id), // Link to existing document
sync_status: "duplicate_content".to_string(),
sync_error: None,
};
source_id,
)
.await
} else {
// Fallback for backward compatibility - treat as generic WebDAV sync
ingestion_service
.ingest_from_source(
&file_info.name,
file_data,
&file_info.mime_type,
user_id,
uuid::Uuid::new_v4(), // Generate a temporary ID for tracking
"webdav_sync",
)
.await
};
let result = result.map_err(|e| format!("Document ingestion failed for {}: {}", file_info.name, e))?;
let (document, should_queue_ocr, webdav_sync_status) = match result {
IngestionResult::Created(doc) => {
info!("Created new document for {}: {}", file_info.name, doc.id);
(doc, true, "synced") // New document - queue for OCR
}
IngestionResult::ExistingDocument(doc) => {
info!("Found existing document for {}: {}", file_info.name, doc.id);
(doc, false, "duplicate_content") // Existing document - don't re-queue OCR
}
IngestionResult::TrackedAsDuplicate { existing_document_id } => {
info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
error!("Failed to record duplicate WebDAV file: {}", e);
}
// For duplicates, we still need to get the document info for WebDAV tracking
let existing_doc = state.db.get_document_by_id(existing_document_id, user_id, crate::models::UserRole::User).await
.map_err(|e| format!("Failed to get existing document: {}", e))?
.ok_or_else(|| "Document not found".to_string())?;
info!("WebDAV file marked as duplicate_content, skipping processing");
return Ok(false); // Not processed (duplicate)
(existing_doc, false, "duplicate_content") // Track as duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
IngestionResult::Skipped { existing_document_id, reason: _ } => {
info!("Skipped duplicate file {}: existing document {}", file_info.name, existing_document_id);
// For skipped files, we still need to get the document info for WebDAV tracking
let existing_doc = state.db.get_document_by_id(existing_document_id, user_id, crate::models::UserRole::User).await
.map_err(|e| format!("Failed to get existing document: {}", e))?
.ok_or_else(|| "Document not found".to_string())?;
(existing_doc, false, "duplicate_content") // Track as duplicate
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
// Create file service and save file to disk
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| format!("Failed to save {}: {}", file_info.name, e))?;
// Create document record with hash
let file_service = FileService::new(state.config.upload_path.clone());
let document = file_service.create_document(
&file_info.name,
&file_info.name, // original filename same as name
&saved_file_path,
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
// Save document to database
let created_document = state.db.create_document(document)
.await
.map_err(|e| format!("Failed to create document {}: {}", file_info.name, e))?;
info!("Created document record for {}: {}", file_info.name, created_document.id);
// Record successful file in WebDAV files table
};
// Record WebDAV file in tracking table
let webdav_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
@@ -348,8 +346,8 @@ async fn process_single_file(
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(created_document.id),
sync_status: "synced".to_string(),
document_id: Some(document.id),
sync_status: webdav_sync_status.to_string(),
sync_error: None,
};
@@ -357,45 +355,26 @@ async fn process_single_file(
error!("Failed to record WebDAV file: {}", e);
}
// Queue for OCR processing if enabled
if enable_background_ocr {
info!("Background OCR is enabled, queueing document {} for processing", created_document.id);
// Queue for OCR processing if enabled and this is a new document
if enable_background_ocr && should_queue_ocr {
info!("Background OCR is enabled, queueing document {} for processing", document.id);
match state.db.pool.acquire().await {
Ok(_conn) => {
let queue_service = crate::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.pool.clone(),
4
);
// Determine priority based on file size
let priority = if file_info.size <= 1024 * 1024 { 10 } // ≤ 1MB: High priority
else if file_info.size <= 5 * 1024 * 1024 { 8 } // ≤ 5MB: Medium priority
else if file_info.size <= 10 * 1024 * 1024 { 6 } // ≤ 10MB: Normal priority
else if file_info.size <= 50 * 1024 * 1024 { 4 } // ≤ 50MB: Low priority
else { 2 }; // > 50MB: Lowest priority
if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", created_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
// Determine priority based on file size
let priority = if file_info.size <= 1024 * 1024 { 10 } // ≤ 1MB: High priority
else if file_info.size <= 5 * 1024 * 1024 { 8 } // ≤ 5MB: Medium priority
else if file_info.size <= 10 * 1024 * 1024 { 6 } // ≤ 10MB: Normal priority
else if file_info.size <= 50 * 1024 * 1024 { 4 } // ≤ 50MB: Low priority
else { 2 }; // > 50MB: Lowest priority
if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", document.id);
}
} else {
info!("Background OCR is disabled, skipping OCR queue for document {}", created_document.id);
info!("Background OCR is disabled or document already processed, skipping OCR queue for document {}", document.id);
}
Ok(true) // Successfully processed
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
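
The WebDAV match above folds the four ingestion outcomes into a (document, should_queue_ocr, webdav_sync_status) tuple. The rule it encodes, restated as a pure helper against the IngestionResult sketch near the top of this commit (the helper is illustrative, not part of the diff):

// Only newly created documents get queued for OCR; every other outcome is
// recorded as duplicate_content and left alone.
fn webdav_outcome(result: &IngestionResult) -> (bool, &'static str) {
    match result {
        IngestionResult::Created(_) => (true, "synced"),
        IngestionResult::ExistingDocument(_)
        | IngestionResult::Skipped { .. }
        | IngestionResult::TrackedAsDuplicate { .. } => (false, "duplicate_content"),
    }
}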

View File

@@ -5,7 +5,6 @@ use chrono::Utc;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use futures::stream::{FuturesUnordered, StreamExt};
use sha2::{Sha256, Digest};
use tracing::{error, info, warn};
use uuid::Uuid;
@@ -13,6 +12,7 @@ use crate::{
AppState,
models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
file_service::FileService,
document_ingestion::{DocumentIngestionService, IngestionResult},
local_folder_service::LocalFolderService,
s3_service::S3Service,
webdav_service::{WebDAVService, WebDAVConfig},
@@ -507,7 +507,7 @@ impl SourceSyncService {
async fn process_single_file<D, Fut>(
state: Arc<AppState>,
user_id: Uuid,
_source_id: Uuid,
source_id: Uuid,
file_info: &FileInfo,
enable_background_ocr: bool,
semaphore: Arc<Semaphore>,
@@ -521,9 +521,6 @@ impl SourceSyncService {
.map_err(|e| anyhow!("Semaphore error: {}", e))?;
info!("Processing file: {}", file_info.path);
// Check if we've already processed this file by looking for documents with same source
// This is a simplified version - you might want to implement source-specific tracking tables
// Download the file
let file_data = download_file(file_info.path.clone()).await
@@ -531,73 +528,55 @@ impl SourceSyncService {
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Calculate file hash for deduplication
let file_hash = Self::calculate_file_hash(&file_data);
// Check for duplicate content using efficient hash lookup
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
return Ok(false); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
// Save file to disk
// Use the unified ingestion service for consistent deduplication
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
let result = ingestion_service
.ingest_from_source(
&file_info.name,
file_data,
&file_info.mime_type,
user_id,
source_id,
"source_sync",
)
.await
.map_err(|e| anyhow!("Document ingestion failed for {}: {}", file_info.name, e))?;
// Create document record with hash
let document = file_service.create_document(
&file_info.name,
&file_info.name,
&saved_file_path,
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
let (document, should_queue_ocr) = match result {
IngestionResult::Created(doc) => {
info!("Created new document for {}: {}", file_info.name, doc.id);
(doc, true) // New document - queue for OCR
}
IngestionResult::Skipped { existing_document_id, reason } => {
info!("Skipped duplicate file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
return Ok(false); // File was skipped due to deduplication
}
IngestionResult::ExistingDocument(doc) => {
info!("Found existing document for {}: {}", file_info.name, doc.id);
(doc, false) // Existing document - don't re-queue OCR
}
IngestionResult::TrackedAsDuplicate { existing_document_id } => {
info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
return Ok(false); // File was tracked as duplicate
}
};
let created_document = state.db.create_document(document).await
.map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
// Queue for OCR if enabled and this is a new document
if enable_background_ocr && should_queue_ocr {
info!("Background OCR enabled, queueing document {} for processing", document.id);
info!("Created document record for {}: {}", file_info.name, created_document.id);
let priority = if file_info.size <= 1024 * 1024 { 10 }
else if file_info.size <= 5 * 1024 * 1024 { 8 }
else if file_info.size <= 10 * 1024 * 1024 { 6 }
else if file_info.size <= 50 * 1024 * 1024 { 4 }
else { 2 };
// Queue for OCR if enabled
if enable_background_ocr {
info!("Background OCR enabled, queueing document {} for processing", created_document.id);
match state.db.pool.acquire().await {
Ok(_conn) => {
let queue_service = crate::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.pool.clone(),
4
);
let priority = if file_info.size <= 1024 * 1024 { 10 }
else if file_info.size <= 5 * 1024 * 1024 { 8 }
else if file_info.size <= 10 * 1024 * 1024 { 6 }
else if file_info.size <= 50 * 1024 * 1024 { 4 }
else { 2 };
if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", created_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", document.id);
}
}
@@ -607,7 +586,7 @@ impl SourceSyncService {
async fn process_single_file_with_cancellation<D, Fut>(
state: Arc<AppState>,
user_id: Uuid,
_source_id: Uuid,
source_id: Uuid,
file_info: &FileInfo,
enable_background_ocr: bool,
semaphore: Arc<Semaphore>,
@@ -647,79 +626,61 @@ impl SourceSyncService {
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Calculate file hash for deduplication
let file_hash = Self::calculate_file_hash(&file_data);
// Check for duplicate content using efficient hash lookup
match state.db.get_document_by_user_and_hash(user_id, &file_hash).await {
Ok(Some(existing_doc)) => {
info!("File content already exists for user {}: {} matches existing document {} (hash: {})",
user_id, file_info.name, existing_doc.original_filename, &file_hash[..8]);
return Ok(false); // Skip processing duplicate
}
Ok(None) => {
info!("No duplicate content found for hash {}, proceeding with file processing", &file_hash[..8]);
}
Err(e) => {
warn!("Error checking for duplicate hash {}: {}", &file_hash[..8], e);
// Continue processing even if duplicate check fails
}
}
// Check for cancellation before saving
// Check for cancellation before processing
if cancellation_token.is_cancelled() {
info!("File processing cancelled before saving: {}", file_info.path);
info!("File processing cancelled before ingestion: {}", file_info.path);
return Err(anyhow!("Processing cancelled"));
}
// Save file to disk
// Use the unified ingestion service for consistent deduplication
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
let ingestion_service = DocumentIngestionService::new(state.db.clone(), file_service);
let result = ingestion_service
.ingest_from_source(
&file_info.name,
file_data,
&file_info.mime_type,
user_id,
source_id,
"source_sync",
)
.await
.map_err(|e| anyhow!("Document ingestion failed for {}: {}", file_info.name, e))?;
// Create document record with hash
let document = file_service.create_document(
&file_info.name,
&file_info.name,
&saved_file_path,
file_data.len() as i64,
&file_info.mime_type,
user_id,
Some(file_hash.clone()), // Store the calculated hash
);
let (document, should_queue_ocr) = match result {
IngestionResult::Created(doc) => {
info!("Created new document for {}: {}", file_info.name, doc.id);
(doc, true) // New document - queue for OCR
}
IngestionResult::Skipped { existing_document_id, reason } => {
info!("Skipped duplicate file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
return Ok(false); // File was skipped due to deduplication
}
IngestionResult::ExistingDocument(doc) => {
info!("Found existing document for {}: {}", file_info.name, doc.id);
(doc, false) // Existing document - don't re-queue OCR
}
IngestionResult::TrackedAsDuplicate { existing_document_id } => {
info!("Tracked {} as duplicate of existing document: {}", file_info.name, existing_document_id);
return Ok(false); // File was tracked as duplicate
}
};
let created_document = state.db.create_document(document).await
.map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
// Queue for OCR if enabled and this is a new document (OCR continues even if sync is cancelled)
if enable_background_ocr && should_queue_ocr {
info!("Background OCR enabled, queueing document {} for processing", document.id);
info!("Created document record for {}: {}", file_info.name, created_document.id);
let priority = if file_info.size <= 1024 * 1024 { 10 }
else if file_info.size <= 5 * 1024 * 1024 { 8 }
else if file_info.size <= 10 * 1024 * 1024 { 6 }
else if file_info.size <= 50 * 1024 * 1024 { 4 }
else { 2 };
// Queue for OCR if enabled (OCR continues even if sync is cancelled)
if enable_background_ocr {
info!("Background OCR enabled, queueing document {} for processing", created_document.id);
match state.db.pool.acquire().await {
Ok(_conn) => {
let queue_service = crate::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.pool.clone(),
4
);
let priority = if file_info.size <= 1024 * 1024 { 10 }
else if file_info.size <= 5 * 1024 * 1024 { 8 }
else if file_info.size <= 10 * 1024 * 1024 { 6 }
else if file_info.size <= 50 * 1024 * 1024 { 4 }
else { 2 };
if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", created_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", document.id);
}
}
@@ -752,10 +713,4 @@ impl SourceSyncService {
Ok(())
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
}
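
process_single_file in this file is generic over its downloader (the <D, Fut> parameters in the hunks above), which is how WebDAV, S3 and local-folder sources share one sync path. A minimal standalone sketch of that shape; the trait bounds and function name are assumptions, not taken from the diff:

use std::future::Future;

// The caller supplies a closure that fetches bytes for a path, so the sync
// logic stays source-agnostic.
async fn fetch_with<D, Fut>(path: String, download_file: D) -> anyhow::Result<Vec<u8>>
where
    D: Fn(String) -> Fut,
    Fut: Future<Output = anyhow::Result<Vec<u8>>>,
{
    download_file(path).await
}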

View File

@@ -7,9 +7,14 @@ use tokio::sync::mpsc;
use tokio::time::{interval, sleep};
use tracing::{debug, error, info, warn};
use walkdir::WalkDir;
use sha2::{Sha256, Digest};
use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService};
use crate::{
config::Config,
db::Database,
file_service::FileService,
document_ingestion::{DocumentIngestionService, IngestionResult},
ocr_queue::OcrQueueService
};
pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {
info!("Starting hybrid folder watcher on: {}", config.watch_folder);
@@ -315,36 +320,6 @@ async fn process_file(
.ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?;
let admin_user_id = admin_user.id;
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
// Check if this exact file content already exists for the admin user
debug!("Checking for duplicate content for admin user: {} (hash: {}, size: {} bytes)",
filename, &file_hash[..8], file_size);
// Query documents with the same file size for the admin user only
if let Ok(existing_docs) = db.get_documents_by_user_with_role(admin_user_id, crate::models::UserRole::Admin, 1000, 0).await {
let matching_docs: Vec<_> = existing_docs.into_iter()
.filter(|doc| doc.file_size == file_size)
.collect();
debug!("Found {} documents with same size for admin user", matching_docs.len());
for existing_doc in matching_docs {
// Read the existing file and compare hashes
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
info!("Skipping duplicate file content: {} (hash: {}, already exists as: {})",
filename, &file_hash[..8], existing_doc.original_filename);
return Ok(());
}
}
}
}
debug!("File content is unique: {} (hash: {})", filename, &file_hash[..8]);
// Validate PDF files before processing
if mime_type == "application/pdf" {
if !is_valid_pdf(&file_data) {
@@ -360,28 +335,34 @@ async fn process_file(
}
}
let saved_file_path = file_service.save_file(&filename, &file_data).await?;
// Use the unified ingestion service for consistent deduplication
let ingestion_service = DocumentIngestionService::new(db.clone(), file_service.clone());
// Calculate file hash for deduplication
let file_hash = calculate_file_hash(&file_data);
let document = file_service.create_document(
&filename,
&filename,
&saved_file_path,
file_size,
&mime_type,
admin_user_id,
Some(file_hash),
);
let created_doc = db.create_document(document).await?;
// Enqueue for OCR processing with priority based on file size and type
let priority = calculate_priority(file_size, &mime_type);
queue_service.enqueue_document(created_doc.id, priority, file_size).await?;
info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
let result = ingestion_service
.ingest_batch_file(&filename, file_data, &mime_type, admin_user_id)
.await
.map_err(|e| anyhow::anyhow!(e))?;
match result {
IngestionResult::Created(doc) => {
info!("Created new document for watch folder file {}: {}", filename, doc.id);
// Enqueue for OCR processing with priority based on file size and type
let priority = calculate_priority(file_size, &mime_type);
queue_service.enqueue_document(doc.id, priority, file_size).await?;
info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
}
IngestionResult::Skipped { existing_document_id, reason } => {
info!("Skipped duplicate watch folder file {}: {} (existing: {})", filename, reason, existing_document_id);
}
IngestionResult::ExistingDocument(doc) => {
info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", filename, doc.id);
}
IngestionResult::TrackedAsDuplicate { existing_document_id } => {
info!("Tracked watch folder file {} as duplicate of existing document: {}", filename, existing_document_id);
}
}
Ok(())
}
@@ -463,9 +444,3 @@ fn clean_pdf_data(data: &[u8]) -> &[u8] {
data
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
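
The watcher keeps its pre-existing PDF validation (is_valid_pdf and clean_pdf_data, only partially visible in this diff) in front of ingestion. Purely as an illustration of what such a header check typically amounts to, not the project's actual implementation:

// Illustrative only: a well-formed PDF begins with the %PDF- magic bytes.
fn looks_like_pdf(data: &[u8]) -> bool {
    data.starts_with(b"%PDF-")
}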

View File

@@ -96,7 +96,7 @@ impl WebDAVScheduler {
info!("Resuming interrupted WebDAV sync for user {}", user_id);
tokio::spawn(async move {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
Ok(files_processed) => {
info!("Resumed WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
@@ -156,7 +156,7 @@ impl WebDAVScheduler {
let enable_background_ocr = user_settings.enable_background_ocr;
tokio::spawn(async move {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr, None).await {
Ok(files_processed) => {
info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed);