feat(server): decrease logging verbosity for ingestion
parent 0484245886
commit a5ca6e33f2

Cargo.lock
@@ -2739,6 +2739,15 @@ dependencies = [
  "hashbrown 0.15.4",
 ]
 
+[[package]]
+name = "matchers"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+dependencies = [
+ "regex-automata 0.1.10",
+]
+
 [[package]]
 name = "matchit"
 version = "0.8.4"
@@ -3221,7 +3230,7 @@ checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a"
 dependencies = [
  "parse-display-derive",
  "regex",
- "regex-syntax",
+ "regex-syntax 0.8.5",
 ]
 
 [[package]]
@@ -3233,7 +3242,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "regex",
- "regex-syntax",
+ "regex-syntax 0.8.5",
  "structmeta",
  "syn 2.0.103",
 ]
@@ -3725,8 +3734,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.4.9",
+ "regex-syntax 0.8.5",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
 ]
 
 [[package]]
@@ -3737,7 +3755,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax",
+ "regex-syntax 0.8.5",
 ]
 
 [[package]]
@@ -3746,6 +3764,12 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
 
+[[package]]
+name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
 [[package]]
 name = "regex-syntax"
 version = "0.8.5"
@@ -5199,10 +5223,14 @@ version = "0.3.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
 dependencies = [
+ "matchers",
  "nu-ansi-term",
+ "once_cell",
+ "regex",
  "sharded-slab",
  "smallvec",
  "thread_local",
+ "tracing",
  "tracing-core",
  "tracing-log",
 ]

Cargo.toml
@@ -27,7 +27,7 @@ base64ct = "=1.8.0"
 jsonwebtoken = "9"
 anyhow = "1"
 tracing = "0.1"
-tracing-subscriber = "0.3"
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 tokio-util = { version = "0.7", features = ["io"] }
 futures-util = "0.3"
 futures = "0.3"
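
Note: EnvFilter is gated behind the env-filter feature enabled above; without it,
tracing_subscriber::fmt().with_env_filter(...) does not compile. A minimal sketch of the
initialization pattern the diffs below adopt, using only the public tracing-subscriber 0.3 API:

    use tracing_subscriber::EnvFilter;

    fn init_logging() {
        // Prefer an explicit RUST_LOG; otherwise default to "info" plus a
        // per-crate override, mirroring the binaries changed in this commit.
        let filter = EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| {
                EnvFilter::new("info")
                    .add_directive("pdf_extract=error".parse().unwrap())
            });
        tracing_subscriber::fmt().with_env_filter(filter).init();
    }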

@@ -13,7 +13,17 @@ use readur::{
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    tracing_subscriber::fmt::init();
+    // Initialize logging with custom filters to reduce spam from pdf_extract crate
+    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| {
+            tracing_subscriber::EnvFilter::new("info")
+                .add_directive("pdf_extract=error".parse().unwrap()) // Suppress pdf_extract WARN spam
+                .add_directive("readur=info".parse().unwrap()) // Keep our app logs at info
+        });
+
+    tracing_subscriber::fmt()
+        .with_env_filter(env_filter)
+        .init();
 
     let matches = Command::new("batch_ingest")
         .about("Batch ingest files for OCR processing")

@@ -8,7 +8,7 @@
 
 use uuid::Uuid;
 use sha2::{Digest, Sha256};
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
 
 use crate::models::Document;
 use crate::db::Database;
@@ -66,7 +66,7 @@ impl DocumentIngestionService {
         let file_hash = self.calculate_file_hash(&request.file_data);
         let file_size = request.file_data.len() as i64;
 
-        info!(
+        debug!(
             "Ingesting document: {} for user {} (hash: {}, size: {} bytes, policy: {:?})",
             request.filename, request.user_id, &file_hash[..8], file_size, request.deduplication_policy
         );
@@ -101,7 +101,7 @@ impl DocumentIngestionService {
                 }
             }
             Ok(None) => {
-                info!("No duplicate content found, proceeding with new document creation");
+                debug!("No duplicate content found, proceeding with new document creation");
             }
             Err(e) => {
                 warn!("Error checking for duplicate content (hash: {}): {}", &file_hash[..8], e);
@@ -163,7 +163,7 @@ impl DocumentIngestionService {
             }
         };
 
-        info!(
+        debug!(
             "Successfully ingested document: {} (ID: {}) for user {}",
             saved_document.original_filename, saved_document.id, request.user_id
         );
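
Note: with the default filter at "info", the info! to debug! demotions above drop the
per-document chatter unless a user opts back in with RUST_LOG=readur=debug. A small sketch of
the effect (the event text is illustrative, not from the commit):

    use tracing::{debug, info};

    fn logging_levels_demo() {
        // Dropped under the default "info" / "readur=info" directives;
        // visible again when run with RUST_LOG=readur=debug.
        debug!("per-document detail, e.g. hashes and byte counts");
        // Still emitted at the default level.
        info!("high-level progress");
    }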

src/main.rs
@@ -52,7 +52,22 @@ fn determine_static_files_path() -> std::path::PathBuf {
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    tracing_subscriber::fmt::init();
+    // Initialize logging with custom filters to reduce spam from pdf_extract crate
+    // Users can override with RUST_LOG environment variable, e.g.:
+    // RUST_LOG=debug cargo run (enable debug for all)
+    // RUST_LOG=readur=debug,pdf_extract=error (debug for readur, suppress pdf_extract)
+    // RUST_LOG=pdf_extract=off (completely silence pdf_extract)
+    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| {
+            // Default filter when RUST_LOG is not set
+            tracing_subscriber::EnvFilter::new("info")
+                .add_directive("pdf_extract=error".parse().unwrap()) // Suppress pdf_extract WARN spam
+                .add_directive("readur=info".parse().unwrap()) // Keep our app logs at info
+        });
+
+    tracing_subscriber::fmt()
+        .with_env_filter(env_filter)
+        .init();
 
     println!("\n🚀 READUR APPLICATION STARTUP");
     println!("{}", "=".repeat(60));
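
For reference, the coded default above should behave like running with
RUST_LOG=info,pdf_extract=error,readur=info. A quick test sketch (hypothetical, not part of this
commit) checking that the directive strings parse as intended:

    #[cfg(test)]
    mod filter_tests {
        use tracing_subscriber::EnvFilter;

        #[test]
        fn default_directives_parse() {
            // Same directives main() installs when RUST_LOG is unset.
            let filter = EnvFilter::new("info")
                .add_directive("pdf_extract=error".parse().unwrap())
                .add_directive("readur=info".parse().unwrap());
            // EnvFilter's Display output lists its directives.
            assert!(filter.to_string().contains("pdf_extract=error"));
        }
    }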

@@ -5,7 +5,7 @@ use chrono::Utc;
 use tokio::sync::Semaphore;
 use tokio_util::sync::CancellationToken;
 use futures::stream::{FuturesUnordered, StreamExt};
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 use uuid::Uuid;
 
 use crate::{
@@ -126,10 +126,10 @@ impl SourceSyncService {
             |folder_path| {
                 let service = webdav_service.clone();
                 async move {
-                    info!("WebDAV discover_files_in_folder called for: {}", folder_path);
+                    debug!("WebDAV discover_files_in_folder called for: {}", folder_path);
                     let result = service.discover_files_in_folder(&folder_path).await;
                     match &result {
-                        Ok(files) => info!("WebDAV discovered {} files in folder: {}", files.len(), folder_path),
+                        Ok(files) => debug!("WebDAV discovered {} files in folder: {}", files.len(), folder_path),
                         Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e),
                     }
                     result
@@ -138,10 +138,10 @@ impl SourceSyncService {
             |file_path| {
                 let service = webdav_service.clone();
                 async move {
-                    info!("WebDAV download_file called for: {}", file_path);
+                    debug!("WebDAV download_file called for: {}", file_path);
                     let result = service.download_file(&file_path).await;
                     match &result {
-                        Ok(data) => info!("WebDAV downloaded {} bytes for file: {}", data.len(), file_path),
+                        Ok(data) => debug!("WebDAV downloaded {} bytes for file: {}", data.len(), file_path),
                         Err(e) => error!("WebDAV download failed for file {}: {}", file_path, e),
                     }
                     result
@@ -287,7 +287,7 @@ impl SourceSyncService {
                 Ok(processed) => {
                     if processed {
                         folder_files_processed += 1;
-                        info!("Successfully processed file ({} completed in this folder)", folder_files_processed);
+                        debug!("Successfully processed file ({} completed in this folder)", folder_files_processed);
                     }
                 }
                 Err(error) => {
@@ -475,7 +475,7 @@ impl SourceSyncService {
                     }
                 }
 
-                info!("Successfully processed file ({} completed in this folder, {} total)", folder_files_processed, total_files_processed);
+                debug!("Successfully processed file ({} completed in this folder, {} total)", folder_files_processed, total_files_processed);
                 }
             }
             Err(error) => {
@@ -520,13 +520,13 @@ impl SourceSyncService {
         let _permit = semaphore.acquire().await
             .map_err(|e| anyhow!("Semaphore error: {}", e))?;
 
-        info!("Processing file: {}", file_info.path);
+        debug!("Processing file: {}", file_info.path);
 
         // Download the file
         let file_data = download_file(file_info.path.clone()).await
             .map_err(|e| anyhow!("Failed to download {}: {}", file_info.path, e))?;
 
-        info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
+        debug!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
 
         // Use the unified ingestion service for consistent deduplication
         let file_service = FileService::new(state.config.upload_path.clone());
@@ -546,7 +546,7 @@ impl SourceSyncService {
 
         let (document, should_queue_ocr) = match result {
             IngestionResult::Created(doc) => {
-                info!("Created new document for {}: {}", file_info.name, doc.id);
+                debug!("Created new document for {}: {}", file_info.name, doc.id);
                 (doc, true) // New document - queue for OCR
             }
             IngestionResult::Skipped { existing_document_id, reason } => {
@@ -554,7 +554,7 @@ impl SourceSyncService {
                 return Ok(false); // File was skipped due to deduplication
             }
             IngestionResult::ExistingDocument(doc) => {
-                info!("Found existing document for {}: {}", file_info.name, doc.id);
+                debug!("Found existing document for {}: {}", file_info.name, doc.id);
                 (doc, false) // Existing document - don't re-queue OCR
             }
             IngestionResult::TrackedAsDuplicate { existing_document_id } => {
@@ -565,7 +565,7 @@ impl SourceSyncService {
 
         // Queue for OCR if enabled and this is a new document
        if enable_background_ocr && should_queue_ocr {
-            info!("Background OCR enabled, queueing document {} for processing", document.id);
+            debug!("Background OCR enabled, queueing document {} for processing", document.id);
 
             let priority = if file_info.size <= 1024 * 1024 { 10 }
             else if file_info.size <= 5 * 1024 * 1024 { 8 }
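
The size-based OCR priority above is cut off by the hunk boundary; a sketch of the visible
tiers (the fallback for larger files is an assumption, since the diff truncates before it):

    /// Smaller files get higher OCR queue priority so quick jobs finish first.
    fn ocr_priority(size: i64) -> i32 {
        if size <= 1024 * 1024 {
            10 // <= 1 MB: highest priority
        } else if size <= 5 * 1024 * 1024 {
            8 // <= 5 MB
        } else {
            5 // assumed fallback for larger files (not shown in the diff)
        }
    }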
@@ -576,7 +576,7 @@ impl SourceSyncService {
             if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
                 error!("Failed to enqueue document for OCR: {}", e);
             } else {
-                info!("Enqueued document {} for OCR processing", document.id);
+                debug!("Enqueued document {} for OCR processing", document.id);
             }
         }
@@ -606,7 +606,7 @@ impl SourceSyncService {
         let _permit = semaphore.acquire().await
             .map_err(|e| anyhow!("Semaphore error: {}", e))?;
 
-        info!("Processing file: {}", file_info.path);
+        debug!("Processing file: {}", file_info.path);
 
         // Check for cancellation again after acquiring semaphore
         if cancellation_token.is_cancelled() {
@@ -624,7 +624,7 @@ impl SourceSyncService {
             return Err(anyhow!("Processing cancelled"));
         }
 
-        info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
+        debug!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
 
         // Check for cancellation before processing
         if cancellation_token.is_cancelled() {
@@ -650,7 +650,7 @@ impl SourceSyncService {
 
         let (document, should_queue_ocr) = match result {
             IngestionResult::Created(doc) => {
-                info!("Created new document for {}: {}", file_info.name, doc.id);
+                debug!("Created new document for {}: {}", file_info.name, doc.id);
                 (doc, true) // New document - queue for OCR
             }
             IngestionResult::Skipped { existing_document_id, reason } => {
@@ -658,7 +658,7 @@ impl SourceSyncService {
                 return Ok(false); // File was skipped due to deduplication
             }
             IngestionResult::ExistingDocument(doc) => {
-                info!("Found existing document for {}: {}", file_info.name, doc.id);
+                debug!("Found existing document for {}: {}", file_info.name, doc.id);
                 (doc, false) // Existing document - don't re-queue OCR
             }
             IngestionResult::TrackedAsDuplicate { existing_document_id } => {
@@ -669,7 +669,7 @@ impl SourceSyncService {
 
         // Queue for OCR if enabled and this is a new document (OCR continues even if sync is cancelled)
         if enable_background_ocr && should_queue_ocr {
-            info!("Background OCR enabled, queueing document {} for processing", document.id);
+            debug!("Background OCR enabled, queueing document {} for processing", document.id);
 
             let priority = if file_info.size <= 1024 * 1024 { 10 }
             else if file_info.size <= 5 * 1024 * 1024 { 8 }
@@ -680,7 +680,7 @@ impl SourceSyncService {
             if let Err(e) = state.queue_service.enqueue_document(document.id, priority, file_info.size).await {
                 error!("Failed to enqueue document for OCR: {}", e);
             } else {
-                info!("Enqueued document {} for OCR processing", document.id);
+                debug!("Enqueued document {} for OCR processing", document.id);
             }
         }