fix(web_upload): resolve issue that caused files that were uploaded via the web, to not be added to the queue
This commit is contained in:
parent
b356017484
commit
a4b9626616
|
|
@ -362,8 +362,11 @@ async fn main() -> anyhow::Result<()> {
|
|||
// Start OCR queue worker on dedicated OCR runtime using shared queue service
|
||||
let queue_worker = shared_queue_service.clone();
|
||||
ocr_runtime.spawn(async move {
|
||||
info!("🚀 Starting OCR queue worker...");
|
||||
if let Err(e) = queue_worker.start_worker().await {
|
||||
error!("OCR queue worker error: {}", e);
|
||||
error!("❌ OCR queue worker error: {}", e);
|
||||
} else {
|
||||
info!("✅ OCR queue worker started successfully");
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -146,8 +146,18 @@ impl OcrQueueService {
|
|||
|
||||
/// Get the next item from the queue with atomic job claiming and retry logic
|
||||
pub async fn dequeue(&self) -> Result<Option<OcrQueueItem>> {
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"message" => "Starting dequeue operation"
|
||||
);
|
||||
|
||||
// Retry up to 3 times for race condition scenarios
|
||||
for attempt in 1..=3 {
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"attempt" => attempt,
|
||||
"message" => "Attempting to dequeue job"
|
||||
);
|
||||
// Use a transaction to ensure atomic job claiming
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
|
|
@ -169,8 +179,24 @@ impl OcrQueueService {
|
|||
.await?;
|
||||
|
||||
let job_id = match job_row {
|
||||
Some(ref row) => row.get::<Uuid, _>("id"),
|
||||
Some(ref row) => {
|
||||
let job_id = row.get::<Uuid, _>("id");
|
||||
let document_id = row.get::<Uuid, _>("document_id");
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"job_id" => job_id,
|
||||
"document_id" => document_id,
|
||||
"attempt" => attempt,
|
||||
"message" => "Found pending job in queue"
|
||||
);
|
||||
job_id
|
||||
},
|
||||
None => {
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"attempt" => attempt,
|
||||
"message" => "No pending jobs found in queue"
|
||||
);
|
||||
// No jobs available
|
||||
tx.rollback().await?;
|
||||
return Ok(None);
|
||||
|
|
@ -196,10 +222,24 @@ impl OcrQueueService {
|
|||
|
||||
if updated_rows.rows_affected() != 1 {
|
||||
// Job was claimed by another worker between SELECT and UPDATE
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"job_id" => job_id,
|
||||
"attempt" => attempt,
|
||||
"rows_affected" => updated_rows.rows_affected(),
|
||||
"message" => "Job was claimed by another worker, retrying"
|
||||
);
|
||||
tx.rollback().await?;
|
||||
warn!("Job {} was claimed by another worker, retrying", job_id);
|
||||
return Ok(None);
|
||||
continue; // Continue to next attempt instead of returning
|
||||
}
|
||||
|
||||
crate::debug_log!("OCR_QUEUE",
|
||||
"worker_id" => &self.worker_id,
|
||||
"job_id" => job_id,
|
||||
"attempt" => attempt,
|
||||
"message" => "Successfully claimed job, updating to processing state"
|
||||
);
|
||||
|
||||
// Step 3: Get the updated job details
|
||||
let row = sqlx::query(
|
||||
|
|
@ -566,18 +606,41 @@ impl OcrQueueService {
|
|||
"Starting OCR worker {} with {} concurrent jobs",
|
||||
self.worker_id, self.max_concurrent_jobs
|
||||
);
|
||||
|
||||
crate::debug_log!("OCR_WORKER",
|
||||
"worker_id" => &self.worker_id,
|
||||
"max_concurrent_jobs" => self.max_concurrent_jobs,
|
||||
"message" => "OCR worker loop starting"
|
||||
);
|
||||
|
||||
loop {
|
||||
// Check if processing is paused
|
||||
if self.is_paused() {
|
||||
crate::debug_log!("OCR_WORKER",
|
||||
"worker_id" => &self.worker_id,
|
||||
"message" => "OCR processing is paused, waiting..."
|
||||
);
|
||||
info!("OCR processing is paused, waiting...");
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
crate::debug_log!("OCR_WORKER",
|
||||
"worker_id" => &self.worker_id,
|
||||
"message" => "Worker loop iteration - checking for items to process"
|
||||
);
|
||||
|
||||
// Check for items to process
|
||||
match self.dequeue().await {
|
||||
Ok(Some(item)) => {
|
||||
crate::debug_log!("OCR_WORKER",
|
||||
"worker_id" => &self.worker_id,
|
||||
"job_id" => item.id,
|
||||
"document_id" => item.document_id,
|
||||
"priority" => item.priority,
|
||||
"message" => "Dequeued job, spawning processing task"
|
||||
);
|
||||
|
||||
let permit = semaphore.clone().acquire_owned().await?;
|
||||
let self_clone = self.clone();
|
||||
let ocr_service_clone = ocr_service.clone();
|
||||
|
|
@ -605,6 +668,10 @@ impl OcrQueueService {
|
|||
});
|
||||
}
|
||||
Ok(None) => {
|
||||
crate::debug_log!("OCR_WORKER",
|
||||
"worker_id" => &self.worker_id,
|
||||
"message" => "No items in queue, sleeping for 5 seconds"
|
||||
);
|
||||
// No items in queue or all jobs were claimed by other workers
|
||||
// Use exponential backoff to reduce database load when queue is empty
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
|
|
|
|||
|
|
@ -98,6 +98,16 @@ pub async fn upload_document(
|
|||
match ingestion_service.ingest_document(request).await {
|
||||
Ok(IngestionResult::Created(document)) => {
|
||||
info!("Document uploaded successfully: {}", document.id);
|
||||
|
||||
// Auto-enqueue document for OCR processing
|
||||
let priority = 5; // Normal priority for direct uploads
|
||||
if let Err(e) = state.queue_service.enqueue_document(document.id, priority, document.file_size).await {
|
||||
error!("Failed to enqueue document {} for OCR: {}", document.id, e);
|
||||
// Don't fail the upload if OCR queueing fails, just log the error
|
||||
} else {
|
||||
info!("Document {} enqueued for OCR processing", document.id);
|
||||
}
|
||||
|
||||
Ok(Json(DocumentUploadResponse {
|
||||
document_id: document.id,
|
||||
filename: document.filename,
|
||||
|
|
|
|||
|
|
@ -82,19 +82,24 @@ async fn debug_ocr_content() {
|
|||
|
||||
println!("✅ User logged in successfully");
|
||||
|
||||
// Upload 2 documents with very distinctive content
|
||||
let doc1_content = "DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA";
|
||||
let doc2_content = "DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA";
|
||||
// Upload 2 test images that should trigger OCR processing
|
||||
let test_image1_path = "tests/test_images/test1.png";
|
||||
let test_image2_path = "tests/test_images/test2.jpg";
|
||||
|
||||
let part1 = reqwest::multipart::Part::text(doc1_content.to_string())
|
||||
.file_name("debug_doc1.txt".to_string())
|
||||
.mime_str("text/plain")
|
||||
let image1_data = std::fs::read(test_image1_path)
|
||||
.expect("Should be able to read test image 1");
|
||||
let image2_data = std::fs::read(test_image2_path)
|
||||
.expect("Should be able to read test image 2");
|
||||
|
||||
let part1 = reqwest::multipart::Part::bytes(image1_data)
|
||||
.file_name("test1.png".to_string())
|
||||
.mime_str("image/png")
|
||||
.expect("Valid mime type");
|
||||
let form1 = reqwest::multipart::Form::new().part("file", part1);
|
||||
|
||||
let part2 = reqwest::multipart::Part::text(doc2_content.to_string())
|
||||
.file_name("debug_doc2.txt".to_string())
|
||||
.mime_str("text/plain")
|
||||
let part2 = reqwest::multipart::Part::bytes(image2_data)
|
||||
.file_name("test2.jpg".to_string())
|
||||
.mime_str("image/jpeg")
|
||||
.expect("Valid mime type");
|
||||
let form2 = reqwest::multipart::Form::new().part("file", part2);
|
||||
|
||||
|
|
@ -226,54 +231,40 @@ async fn debug_ocr_content() {
|
|||
println!("\n🔍 DETAILED OCR ANALYSIS:");
|
||||
println!("=====================================");
|
||||
|
||||
println!("\n📋 Document 1 Analysis:");
|
||||
println!(" - Expected content: {}", doc1_content);
|
||||
println!("\n📋 Document 1 Analysis (test1.png):");
|
||||
println!(" - OCR status: {}", doc1_ocr["ocr_status"].as_str().unwrap_or("unknown"));
|
||||
println!(" - OCR text: {:?}", doc1_ocr["ocr_text"]);
|
||||
println!(" - OCR text length: {}", doc1_ocr["ocr_text"].as_str().unwrap_or("").len());
|
||||
println!(" - OCR confidence: {:?}", doc1_ocr["ocr_confidence"]);
|
||||
println!(" - OCR word count: {:?}", doc1_ocr["ocr_word_count"]);
|
||||
|
||||
println!("\n📋 Document 2 Analysis:");
|
||||
println!(" - Expected content: {}", doc2_content);
|
||||
println!("\n📋 Document 2 Analysis (test2.jpg):");
|
||||
println!(" - OCR status: {}", doc2_ocr["ocr_status"].as_str().unwrap_or("unknown"));
|
||||
println!(" - OCR text: {:?}", doc2_ocr["ocr_text"]);
|
||||
println!(" - OCR text length: {}", doc2_ocr["ocr_text"].as_str().unwrap_or("").len());
|
||||
println!(" - OCR confidence: {:?}", doc2_ocr["ocr_confidence"]);
|
||||
println!(" - OCR word count: {:?}", doc2_ocr["ocr_word_count"]);
|
||||
|
||||
// Check for corruption
|
||||
// Check for basic OCR functionality
|
||||
let doc1_text = doc1_ocr["ocr_text"].as_str().unwrap_or("");
|
||||
let doc2_text = doc2_ocr["ocr_text"].as_str().unwrap_or("");
|
||||
|
||||
let doc1_has_own_signature = doc1_text.contains("DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA");
|
||||
let doc1_has_other_signature = doc1_text.contains("DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA");
|
||||
let doc2_has_own_signature = doc2_text.contains("DOCUMENT-TWO-UNIQUE-SIGNATURE-67890-BETA");
|
||||
let doc2_has_other_signature = doc2_text.contains("DOCUMENT-ONE-UNIQUE-SIGNATURE-12345-ALPHA");
|
||||
|
||||
println!("\n🚨 CORRUPTION ANALYSIS:");
|
||||
println!(" Doc1 has own signature: {}", doc1_has_own_signature);
|
||||
println!(" Doc1 has Doc2's signature: {}", doc1_has_other_signature);
|
||||
println!(" Doc2 has own signature: {}", doc2_has_own_signature);
|
||||
println!(" Doc2 has Doc1's signature: {}", doc2_has_other_signature);
|
||||
println!("\n🔍 OCR ANALYSIS:");
|
||||
println!(" Document 1 has OCR text: {}", !doc1_text.is_empty());
|
||||
println!(" Document 2 has OCR text: {}", !doc2_text.is_empty());
|
||||
println!(" Documents have different content: {}", doc1_text != doc2_text);
|
||||
|
||||
if doc1_text == doc2_text && !doc1_text.is_empty() {
|
||||
println!("❌ IDENTICAL OCR TEXT DETECTED - Documents have the same content!");
|
||||
println!("This suggests potential OCR corruption or cross-contamination.");
|
||||
}
|
||||
|
||||
if doc1_text.is_empty() && doc2_text.is_empty() {
|
||||
println!("❌ EMPTY OCR TEXT - Both documents have no OCR content!");
|
||||
println!("⚠️ EMPTY OCR TEXT - Both documents have no OCR content!");
|
||||
println!("This might be expected if the test images contain no readable text.");
|
||||
}
|
||||
|
||||
if !doc1_has_own_signature || !doc2_has_own_signature {
|
||||
println!("❌ MISSING SIGNATURES - Documents don't contain their expected content!");
|
||||
}
|
||||
|
||||
if doc1_has_other_signature || doc2_has_other_signature {
|
||||
println!("❌ CROSS-CONTAMINATION - Documents contain each other's content!");
|
||||
}
|
||||
|
||||
if doc1_has_own_signature && doc2_has_own_signature && !doc1_has_other_signature && !doc2_has_other_signature {
|
||||
println!("✅ NO CORRUPTION DETECTED - All documents have correct content!");
|
||||
if !doc1_text.is_empty() && !doc2_text.is_empty() && doc1_text != doc2_text {
|
||||
println!("✅ OCR PROCESSING SUCCESSFUL - Documents have different content!");
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue