fix(web_upload): resolve issue that prevented files uploaded via the web from being added to the OCR queue

This commit is contained in:
perf3ct 2025-07-07 19:28:08 +00:00
parent 2a68c0a066
commit bf2162ad89
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
4 changed files with 109 additions and 38 deletions

View File

@ -362,8 +362,11 @@ async fn main() -> anyhow::Result<()> {
// Start OCR queue worker on dedicated OCR runtime using shared queue service
let queue_worker = shared_queue_service.clone();
ocr_runtime.spawn(async move {
info!("🚀 Starting OCR queue worker...");
if let Err(e) = queue_worker.start_worker().await {
error!("OCR queue worker error: {}", e);
error!("❌ OCR queue worker error: {}", e);
} else {
info!("✅ OCR queue worker started successfully");
}
});

View File

@ -146,8 +146,18 @@ impl OcrQueueService {
/// Get the next item from the queue with atomic job claiming and retry logic
pub async fn dequeue(&self) -> Result<Option<OcrQueueItem>> {
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"message" => "Starting dequeue operation"
);
// Retry up to 3 times for race condition scenarios
for attempt in 1..=3 {
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"attempt" => attempt,
"message" => "Attempting to dequeue job"
);
// Use a transaction to ensure atomic job claiming
let mut tx = self.pool.begin().await?;
@ -169,8 +179,24 @@ impl OcrQueueService {
.await?;
let job_id = match job_row {
Some(ref row) => row.get::<Uuid, _>("id"),
Some(ref row) => {
let job_id = row.get::<Uuid, _>("id");
let document_id = row.get::<Uuid, _>("document_id");
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"job_id" => job_id,
"document_id" => document_id,
"attempt" => attempt,
"message" => "Found pending job in queue"
);
job_id
},
None => {
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"attempt" => attempt,
"message" => "No pending jobs found in queue"
);
// No jobs available
tx.rollback().await?;
return Ok(None);
@ -196,10 +222,24 @@ impl OcrQueueService {
if updated_rows.rows_affected() != 1 {
// Job was claimed by another worker between SELECT and UPDATE
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"job_id" => job_id,
"attempt" => attempt,
"rows_affected" => updated_rows.rows_affected(),
"message" => "Job was claimed by another worker, retrying"
);
tx.rollback().await?;
warn!("Job {} was claimed by another worker, retrying", job_id);
return Ok(None);
continue; // Continue to next attempt instead of returning
}
crate::debug_log!("OCR_QUEUE",
"worker_id" => &self.worker_id,
"job_id" => job_id,
"attempt" => attempt,
"message" => "Successfully claimed job, updating to processing state"
);
// Step 3: Get the updated job details
let row = sqlx::query(
@ -566,18 +606,41 @@ impl OcrQueueService {
"Starting OCR worker {} with {} concurrent jobs",
self.worker_id, self.max_concurrent_jobs
);
crate::debug_log!("OCR_WORKER",
"worker_id" => &self.worker_id,
"max_concurrent_jobs" => self.max_concurrent_jobs,
"message" => "OCR worker loop starting"
);
loop {
// Check if processing is paused
if self.is_paused() {
crate::debug_log!("OCR_WORKER",
"worker_id" => &self.worker_id,
"message" => "OCR processing is paused, waiting..."
);
info!("OCR processing is paused, waiting...");
sleep(Duration::from_secs(5)).await;
continue;
}
crate::debug_log!("OCR_WORKER",
"worker_id" => &self.worker_id,
"message" => "Worker loop iteration - checking for items to process"
);
// Check for items to process
match self.dequeue().await {
Ok(Some(item)) => {
crate::debug_log!("OCR_WORKER",
"worker_id" => &self.worker_id,
"job_id" => item.id,
"document_id" => item.document_id,
"priority" => item.priority,
"message" => "Dequeued job, spawning processing task"
);
let permit = semaphore.clone().acquire_owned().await?;
let self_clone = self.clone();
let ocr_service_clone = ocr_service.clone();
@ -605,6 +668,10 @@ impl OcrQueueService {
});
}
Ok(None) => {
crate::debug_log!("OCR_WORKER",
"worker_id" => &self.worker_id,
"message" => "No items in queue, sleeping for 5 seconds"
);
// No items in queue or all jobs were claimed by other workers
// Sleep for a fixed 5 seconds before polling again to reduce database load when the queue is empty
sleep(Duration::from_secs(5)).await;

View File

@ -98,6 +98,16 @@ pub async fn upload_document(
match ingestion_service.ingest_document(request).await {
Ok(IngestionResult::Created(document)) => {
info!("Document uploaded successfully: {}", document.id);
// Auto-enqueue document for OCR processing
let priority = 5; // Normal priority for direct uploads
if let Err(e) = state.queue_service.enqueue_document(document.id, priority, document.file_size).await {
error!("Failed to enqueue document {} for OCR: {}", document.id, e);
// Don't fail the upload if OCR queueing fails, just log the error
} else {
info!("Document {} enqueued for OCR processing", document.id);
}
Ok(Json(DocumentUploadResponse {
document_id: document.id,
filename: document.filename,

View File

@ -82,19 +82,24 @@ async fn debug_ocr_content() {
println!("✅ User logged in successfully");
// Upload 2 documents with very distinctive content
let doc1_content = "DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA";
let doc2_content = "DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA";
// Upload 2 test images that should trigger OCR processing
let test_image1_path = "tests/test_images/test1.png";
let test_image2_path = "tests/test_images/test2.jpg";
let part1 = reqwest::multipart::Part::text(doc1_content.to_string())
.file_name("debug_doc1.txt".to_string())
.mime_str("text/plain")
let image1_data = std::fs::read(test_image1_path)
.expect("Should be able to read test image 1");
let image2_data = std::fs::read(test_image2_path)
.expect("Should be able to read test image 2");
let part1 = reqwest::multipart::Part::bytes(image1_data)
.file_name("test1.png".to_string())
.mime_str("image/png")
.expect("Valid mime type");
let form1 = reqwest::multipart::Form::new().part("file", part1);
let part2 = reqwest::multipart::Part::text(doc2_content.to_string())
.file_name("debug_doc2.txt".to_string())
.mime_str("text/plain")
let part2 = reqwest::multipart::Part::bytes(image2_data)
.file_name("test2.jpg".to_string())
.mime_str("image/jpeg")
.expect("Valid mime type");
let form2 = reqwest::multipart::Form::new().part("file", part2);
@ -226,54 +231,40 @@ async fn debug_ocr_content() {
println!("\n🔍 DETAILED OCR ANALYSIS:");
println!("=====================================");
println!("\n📋 Document 1 Analysis:");
println!(" - Expected content: {}", doc1_content);
println!("\n📋 Document 1 Analysis (test1.png):");
println!(" - OCR status: {}", doc1_ocr["ocr_status"].as_str().unwrap_or("unknown"));
println!(" - OCR text: {:?}", doc1_ocr["ocr_text"]);
println!(" - OCR text length: {}", doc1_ocr["ocr_text"].as_str().unwrap_or("").len());
println!(" - OCR confidence: {:?}", doc1_ocr["ocr_confidence"]);
println!(" - OCR word count: {:?}", doc1_ocr["ocr_word_count"]);
println!("\n📋 Document 2 Analysis:");
println!(" - Expected content: {}", doc2_content);
println!("\n📋 Document 2 Analysis (test2.jpg):");
println!(" - OCR status: {}", doc2_ocr["ocr_status"].as_str().unwrap_or("unknown"));
println!(" - OCR text: {:?}", doc2_ocr["ocr_text"]);
println!(" - OCR text length: {}", doc2_ocr["ocr_text"].as_str().unwrap_or("").len());
println!(" - OCR confidence: {:?}", doc2_ocr["ocr_confidence"]);
println!(" - OCR word count: {:?}", doc2_ocr["ocr_word_count"]);
// Check for corruption
// Check for basic OCR functionality
let doc1_text = doc1_ocr["ocr_text"].as_str().unwrap_or("");
let doc2_text = doc2_ocr["ocr_text"].as_str().unwrap_or("");
let doc1_has_own_signature = doc1_text.contains("DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA");
let doc1_has_other_signature = doc1_text.contains("DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA");
let doc2_has_own_signature = doc2_text.contains("DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA");
let doc2_has_other_signature = doc2_text.contains("DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA");
println!("\n🚨 CORRUPTION ANALYSIS:");
println!(" Doc1 has own signature: {}", doc1_has_own_signature);
println!(" Doc1 has Doc2's signature: {}", doc1_has_other_signature);
println!(" Doc2 has own signature: {}", doc2_has_own_signature);
println!(" Doc2 has Doc1's signature: {}", doc2_has_other_signature);
println!("\n🔍 OCR ANALYSIS:");
println!(" Document 1 has OCR text: {}", !doc1_text.is_empty());
println!(" Document 2 has OCR text: {}", !doc2_text.is_empty());
println!(" Documents have different content: {}", doc1_text != doc2_text);
if doc1_text == doc2_text && !doc1_text.is_empty() {
println!("❌ IDENTICAL OCR TEXT DETECTED - Documents have the same content!");
println!("This suggests potential OCR corruption or cross-contamination.");
}
if doc1_text.is_empty() && doc2_text.is_empty() {
println!("❌ EMPTY OCR TEXT - Both documents have no OCR content!");
println!("⚠️ EMPTY OCR TEXT - Both documents have no OCR content!");
println!("This might be expected if the test images contain no readable text.");
}
if !doc1_has_own_signature || !doc2_has_own_signature {
println!("❌ MISSING SIGNATURES - Documents don't contain their expected content!");
}
if doc1_has_other_signature || doc2_has_other_signature {
println!("❌ CROSS-CONTAMINATION - Documents contain each other's content!");
}
if doc1_has_own_signature && doc2_has_own_signature && !doc1_has_other_signature && !doc2_has_other_signature {
println!("✅ NO CORRUPTION DETECTED - All documents have correct content!");
if !doc1_text.is_empty() && !doc2_text.is_empty() && doc1_text != doc2_text {
println!("✅ OCR PROCESSING SUCCESSFUL - Documents have different content!");
}
}