feat(server): updating the watcher.rs file to preserve source metadata
parent 42bdda9476
commit c4c0bc3295
@@ -6,13 +6,15 @@ use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
 use uuid::Uuid;
 use walkdir::WalkDir;
+use chrono::{DateTime, Utc};
 
 use crate::{
     config::Config,
     db::Database,
     services::file_service::FileService,
-    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult, DeduplicationPolicy},
     ocr::queue::OcrQueueService,
+    models::FileInfo,
 };
 
 pub struct BatchIngester {
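The hunk above imports `FileInfo` from `models`; its definition is not part of this commit. As a reading aid, here is a minimal sketch of the struct, with every field name and type inferred from the construction sites later in this diff rather than from the real `models` module (the `metadata` field's type in particular is a guess):

// Hypothetical sketch of models::FileInfo; the real definition may differ.
use chrono::{DateTime, Utc};

pub struct FileInfo {
    pub path: String,                         // full path, lossily converted to UTF-8
    pub name: String,                         // file name component
    pub size: i64,                            // size in bytes
    pub mime_type: String,                    // guessed from the file extension
    pub last_modified: Option<DateTime<Utc>>, // mtime, when the platform reports one
    pub etag: String,                         // "<size>-<mtime seconds>" change marker
    pub is_directory: bool,
    pub created_at: Option<DateTime<Utc>>,    // creation time, when available
    pub permissions: Option<u32>,             // Unix mode bits; None on non-Unix
    pub owner: Option<String>,                // uid as a string; None on non-Unix
    pub group: Option<String>,                // gid as a string; None on non-Unix
    pub metadata: Option<serde_json::Value>,  // assumed type; reserved for EXIF etc.
}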
@@ -164,58 +166,105 @@ impl BatchIngester {
     }
 }
 
-async fn process_single_file(
-    path: PathBuf,
-    file_service: FileService,
-    user_id: Uuid,
-    db: Database,
-) -> Result<Option<(Uuid, i64)>> {
+/// Extract FileInfo from filesystem path and metadata
+async fn extract_file_info_from_path(path: &Path) -> Result<FileInfo> {
+    let metadata = fs::metadata(path).await?;
     let filename = path
         .file_name()
         .and_then(|n| n.to_str())
         .unwrap_or("")
         .to_string();
 
-    // Read file metadata
-    let metadata = fs::metadata(&path).await?;
     let file_size = metadata.len() as i64;
+    let mime_type = mime_guess::from_path(&filename)
+        .first_or_octet_stream()
+        .to_string();
+
+    // Extract timestamps
+    let last_modified = metadata.modified()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    let created_at = metadata.created()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    // Extract Unix permissions (if on Unix-like system)
+    #[cfg(unix)]
+    let (permissions, owner, group) = {
+        use std::os::unix::fs::MetadataExt;
+        let permissions = Some(metadata.mode());
+
+        // For now, just use uid/gid as strings (could be enhanced to resolve names later)
+        let owner = Some(metadata.uid().to_string());
+        let group = Some(metadata.gid().to_string());
+
+        (permissions, owner, group)
+    };
+
+    // On non-Unix systems, permissions/owner/group are not available
+    #[cfg(not(unix))]
+    let (permissions, owner, group) = (None, None, None);
+
+    Ok(FileInfo {
+        path: path.to_string_lossy().to_string(),
+        name: filename,
+        size: file_size,
+        mime_type,
+        last_modified,
+        etag: format!("{}-{}", file_size, last_modified.map_or(0, |t| t.timestamp())),
+        is_directory: metadata.is_dir(),
+        created_at,
+        permissions,
+        owner,
+        group,
+        metadata: None, // Could extract EXIF/other metadata in the future
+    })
+}
+
+async fn process_single_file(
+    path: PathBuf,
+    file_service: FileService,
+    user_id: Uuid,
+    db: Database,
+) -> Result<Option<(Uuid, i64)>> {
+    // Extract file info with metadata
+    let file_info = extract_file_info_from_path(&path).await?;
 
     // Skip very large files (> 100MB)
-    if file_size > 100 * 1024 * 1024 {
-        warn!("Skipping large file: {} ({} MB)", filename, file_size / 1024 / 1024);
+    if file_info.size > 100 * 1024 * 1024 {
+        warn!("Skipping large file: {} ({} MB)", file_info.name, file_info.size / 1024 / 1024);
         return Ok(None);
     }
 
     // Read file data
     let file_data = fs::read(&path).await?;
 
-    let mime_type = mime_guess::from_path(&filename)
-        .first_or_octet_stream()
-        .to_string();
-
-    // Use the unified ingestion service for consistent deduplication
+    // Use the unified ingestion service with full metadata support
     let ingestion_service = DocumentIngestionService::new(db, file_service);
 
     let result = ingestion_service
-        .ingest_batch_file(&filename, file_data, &mime_type, user_id)
+        .ingest_from_file_info(&file_info, file_data, user_id, DeduplicationPolicy::Skip, "batch_ingest", None)
         .await
         .map_err(|e| anyhow::anyhow!(e))?;
 
     match result {
         IngestionResult::Created(doc) => {
-            info!("Created new document for batch file {}: {}", filename, doc.id);
-            Ok(Some((doc.id, file_size)))
+            info!("Created new document for batch file {}: {}", file_info.name, doc.id);
+            Ok(Some((doc.id, file_info.size)))
         }
         IngestionResult::Skipped { existing_document_id, reason } => {
-            info!("Skipped duplicate batch file {}: {} (existing: {})", filename, reason, existing_document_id);
+            info!("Skipped duplicate batch file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
             Ok(None) // File was skipped due to deduplication
         }
         IngestionResult::ExistingDocument(doc) => {
-            info!("Found existing document for batch file {}: {}", filename, doc.id);
+            info!("Found existing document for batch file {}: {}", file_info.name, doc.id);
             Ok(None) // Don't re-queue for OCR
         }
         IngestionResult::TrackedAsDuplicate { existing_document_id } => {
-            info!("Tracked batch file {} as duplicate of existing document: {}", filename, existing_document_id);
+            info!("Tracked batch file {} as duplicate of existing document: {}", file_info.name, existing_document_id);
             Ok(None) // File was tracked as duplicate
         }
     }
 }
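The remaining hunks apply the same change to the folder watcher (watcher.rs, per the commit title). Before them, a standalone sketch of the timestamp and etag scheme that both new `extract_file_info_from_path` helpers share, rewritten with blocking `std::fs` so it runs as a plain binary; the probe path and a chrono 0.4 dependency are assumptions:

use chrono::{DateTime, Utc};
use std::time::UNIX_EPOCH;

fn main() -> std::io::Result<()> {
    let metadata = std::fs::metadata("Cargo.toml")?; // any existing file works
    let size = metadata.len() as i64;

    // Same chain as the diff: SystemTime -> seconds since the epoch ->
    // DateTime<Utc>, falling back to "now" if the value is unrepresentable.
    let last_modified: Option<DateTime<Utc>> = metadata
        .modified()
        .ok()
        .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
        .map(|d| DateTime::from_timestamp(d.as_secs() as i64, 0).unwrap_or_else(Utc::now));

    // Etag format from the diff: "<size>-<mtime seconds>", with 0 for unknown mtime.
    let etag = format!("{}-{}", size, last_modified.map_or(0, |t| t.timestamp()));
    println!("etag = {}", etag); // e.g. "1024-1718000000"
    Ok(())
}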
@@ -7,13 +7,15 @@ use tokio::sync::mpsc;
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, warn};
 use walkdir::WalkDir;
+use chrono::{DateTime, Utc};
 
 use crate::{
     config::Config,
     db::Database,
     services::file_service::FileService,
-    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr::queue::OcrQueueService
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult, DeduplicationPolicy},
+    ocr::queue::OcrQueueService,
+    models::FileInfo,
 };
 
 pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {
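This watcher import hunk, like the batch one, pulls in `DeduplicationPolicy`, which both call sites use as `DeduplicationPolicy::Skip`. Only that variant is visible in this commit; a compilable stub limited to what the diff confirms:

// Hypothetical stub: the diff shows only that DeduplicationPolicy lives in
// ingestion::document_ingestion and has a Skip variant; others presumably
// exist but are not shown here.
#[allow(dead_code)]
pub enum DeduplicationPolicy {
    /// Skip ingesting content that already exists; callers then receive
    /// IngestionResult::Skipped with the existing document's id.
    Skip,
}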
@@ -335,38 +337,96 @@ async fn process_file(
         }
     }
 
+    // Extract file info with metadata
+    let file_info = extract_file_info_from_path(path).await?;
+
     // Use the unified ingestion service for consistent deduplication
     let ingestion_service = DocumentIngestionService::new(db.clone(), file_service.clone());
 
     let result = ingestion_service
-        .ingest_batch_file(&filename, file_data, &mime_type, admin_user_id)
+        .ingest_from_file_info(&file_info, file_data, admin_user_id, DeduplicationPolicy::Skip, "watch_folder", None)
         .await
         .map_err(|e| anyhow::anyhow!(e))?;
 
     match result {
         IngestionResult::Created(doc) => {
-            info!("Created new document for watch folder file {}: {}", filename, doc.id);
+            info!("Created new document for watch folder file {}: {}", file_info.name, doc.id);
 
             // Enqueue for OCR processing with priority based on file size and type
-            let priority = calculate_priority(file_size, &mime_type);
-            queue_service.enqueue_document(doc.id, priority, file_size).await?;
+            let priority = calculate_priority(file_info.size, &file_info.mime_type);
+            queue_service.enqueue_document(doc.id, priority, file_info.size).await?;
 
-            info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
+            info!("Successfully queued file for OCR: {} (size: {} bytes)", file_info.name, file_info.size);
         }
         IngestionResult::Skipped { existing_document_id, reason } => {
-            info!("Skipped duplicate watch folder file {}: {} (existing: {})", filename, reason, existing_document_id);
+            info!("Skipped duplicate watch folder file {}: {} (existing: {})", file_info.name, reason, existing_document_id);
         }
         IngestionResult::ExistingDocument(doc) => {
-            info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", filename, doc.id);
+            info!("Found existing document for watch folder file {}: {} (not re-queuing for OCR)", file_info.name, doc.id);
         }
         IngestionResult::TrackedAsDuplicate { existing_document_id } => {
-            info!("Tracked watch folder file {} as duplicate of existing document: {}", filename, existing_document_id);
+            info!("Tracked watch folder file {} as duplicate of existing document: {}", file_info.name, existing_document_id);
         }
     }
 
     Ok(())
 }
 
+/// Extract FileInfo from filesystem path and metadata (for watcher)
+async fn extract_file_info_from_path(path: &Path) -> Result<FileInfo> {
+    let metadata = tokio::fs::metadata(path).await?;
+    let filename = path
+        .file_name()
+        .and_then(|n| n.to_str())
+        .unwrap_or("")
+        .to_string();
+
+    let file_size = metadata.len() as i64;
+    let mime_type = mime_guess::from_path(&filename)
+        .first_or_octet_stream()
+        .to_string();
+
+    // Extract timestamps
+    let last_modified = metadata.modified()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    let created_at = metadata.created()
+        .ok()
+        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
+        .map(|duration| DateTime::from_timestamp(duration.as_secs() as i64, 0).unwrap_or_else(Utc::now));
+
+    // Extract Unix permissions (if on Unix-like system)
+    #[cfg(unix)]
+    let (permissions, owner, group) = {
+        use std::os::unix::fs::MetadataExt;
+        let permissions = Some(metadata.mode() as u32);
+        let owner = Some(metadata.uid().to_string());
+        let group = Some(metadata.gid().to_string());
+        (permissions, owner, group)
+    };
+
+    // On non-Unix systems, permissions/owner/group are not available
+    #[cfg(not(unix))]
+    let (permissions, owner, group) = (None, None, None);
+
+    Ok(FileInfo {
+        path: path.to_string_lossy().to_string(),
+        name: filename,
+        size: file_size,
+        mime_type,
+        last_modified,
+        etag: format!("{}-{}", file_size, last_modified.map_or(0, |t| t.timestamp())),
+        is_directory: metadata.is_dir(),
+        created_at,
+        permissions,
+        owner,
+        group,
+        metadata: None, // Could extract EXIF/other metadata in the future
+    })
+}
+
 fn is_ocr_able_file(mime_type: &str) -> bool {
     matches!(mime_type,
         "application/pdf" |
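The `#[cfg(unix)]` blocks in both helpers read raw mode/uid/gid values through `std::os::unix::fs::MetadataExt`. A standalone sketch of that call pattern (the probe path is an arbitrary assumption):

// Illustrates the MetadataExt accessors used in the diff's #[cfg(unix)] branch.
#[cfg(unix)]
fn main() -> std::io::Result<()> {
    use std::os::unix::fs::MetadataExt;
    let md = std::fs::metadata("/etc/hosts")?;
    // mode() returns the full st_mode (file type + permission bits);
    // mask with 0o7777 to display only the permission portion.
    println!("mode = {:o}, uid = {}, gid = {}", md.mode() & 0o7777, md.uid(), md.gid());
    Ok(())
}

#[cfg(not(unix))]
fn main() {
    // Mirrors the diff's non-Unix fallback: no permissions/owner/group.
    println!("permissions/owner/group are not available on this platform");
}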