From eb70964254d83b280e1a3f1bdf0eb1e68aa7c8b1 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Thu, 12 Jun 2025 23:15:12 +0000 Subject: [PATCH] feat(server): improve queue system --- .env.example | 12 +- Dockerfile | 5 +- WATCH_FOLDER.md | 226 +++++++++++++++++++++++++++++++++ src/batch_ingest.rs | 16 ++- src/config.rs | 12 ++ src/db.rs | 146 +++++++++++++++++++++- src/file_service.rs | 1 + src/lib.rs | 11 +- src/ocr_queue.rs | 130 +++++++++++-------- src/watcher.rs | 288 +++++++++++++++++++++++++++++++++++++++---- test_watch_folder.sh | 37 ++++++ 11 files changed, 800 insertions(+), 84 deletions(-) create mode 100644 WATCH_FOLDER.md create mode 100755 test_watch_folder.sh diff --git a/.env.example b/.env.example index b27bbd2..9c0a4ca 100644 --- a/.env.example +++ b/.env.example @@ -2,5 +2,15 @@ DATABASE_URL=postgresql://readur:readur_password@localhost:5432/readur JWT_SECRET=your-super-secret-jwt-key-change-this-in-production SERVER_ADDRESS=0.0.0.0:8000 UPLOAD_PATH=./uploads + +# Watch folder configuration WATCH_FOLDER=./watch -ALLOWED_FILE_TYPES=pdf,txt,doc,docx,png,jpg,jpeg,tiff,bmp \ No newline at end of file +WATCH_INTERVAL_SECONDS=30 +FILE_STABILITY_CHECK_MS=500 +MAX_FILE_AGE_HOURS=168 + +# File type restrictions +ALLOWED_FILE_TYPES=pdf,txt,doc,docx,png,jpg,jpeg,tiff,bmp + +# Force polling mode for testing network filesystems (optional) +# FORCE_POLLING_WATCH=1 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 215d1c6..956d287 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y \ && rm -rf /var/lib/apt/lists/* WORKDIR /app -COPY Cargo.toml ./ +COPY Cargo.toml Cargo.lock ./ COPY src ./src RUN cargo build --release @@ -44,6 +44,9 @@ COPY --from=backend-builder /app/target/release/readur /app/readur # Create necessary directories RUN mkdir -p /app/uploads /app/watch /app/frontend +# Set permissions for watch folder to handle various mount scenarios +RUN chmod 755 /app/watch + # Copy built frontend from frontend-builder COPY --from=frontend-builder /frontend/dist /app/frontend diff --git a/WATCH_FOLDER.md b/WATCH_FOLDER.md new file mode 100644 index 0000000..4365387 --- /dev/null +++ b/WATCH_FOLDER.md @@ -0,0 +1,226 @@ +# Watch Folder Documentation + +The watch folder feature automatically monitors a directory for new OCR-able files and processes them without deleting the original files. This is perfect for scenarios where files are mounted from various filesystem types including NFS, SMB, S3, and local storage. + +## Features + +### 🔄 Cross-Filesystem Compatibility +- **Automatic Detection**: Detects filesystem type and chooses optimal watching strategy +- **Local Filesystems**: Uses efficient inotify-based watching for ext4, NTFS, APFS, etc. 
+- **Network Filesystems**: Uses polling-based watching for NFS, SMB/CIFS, and S3 mounts
+- **Hybrid Fallback**: Gracefully falls back to polling if inotify fails
+
+### 📁 Smart File Processing
+- **OCR-able File Detection**: Only processes supported file types (PDF, images, text, Word docs)
+- **Duplicate Prevention**: Checks for existing files with the same name and size
+- **File Stability**: Waits for files to finish being written before processing
+- **System File Exclusion**: Skips hidden files, temporary files, and system directories
+
+### ⚙️ Configuration Options
+
+| Environment Variable | Default | Description |
+|---------------------|---------|-------------|
+| `WATCH_FOLDER` | `./watch` | Path to the folder to monitor |
+| `WATCH_INTERVAL_SECONDS` | `30` | Polling interval for network filesystems (seconds) |
+| `FILE_STABILITY_CHECK_MS` | `500` | Time to wait for file stability (milliseconds) |
+| `MAX_FILE_AGE_HOURS` | `none` | Skip files older than the specified number of hours |
+| `ALLOWED_FILE_TYPES` | `pdf,png,jpg,jpeg,tiff,bmp,txt,doc,docx` | Allowed file extensions |
+| `FORCE_POLLING_WATCH` | `unset` | Force polling mode even for local filesystems |
+
+## Usage
+
+### Basic Setup
+
+1. **Set the watch folder path:**
+   ```bash
+   export WATCH_FOLDER=/path/to/your/mounted/folder
+   ```
+
+2. **Start the application:**
+   ```bash
+   ./readur
+   ```
+
+3. **Copy files to the watch folder:**
+   The application will automatically detect and process new files.
+
+### Docker Usage
+
+```bash
+# Mount your folder to the container's watch directory
+docker run -d \
+  -v /path/to/your/files:/app/watch \
+  -e WATCH_FOLDER=/app/watch \
+  -e WATCH_INTERVAL_SECONDS=60 \
+  readur:latest
+```
+
+### Docker Compose
+
+```yaml
+services:
+  readur:
+    image: readur:latest
+    volumes:
+      - /mnt/nfs/documents:/app/watch
+      - readur_uploads:/app/uploads
+    environment:
+      WATCH_FOLDER: /app/watch
+      WATCH_INTERVAL_SECONDS: 30
+      FILE_STABILITY_CHECK_MS: 1000
+      MAX_FILE_AGE_HOURS: 168  # 1 week
+    ports:
+      - "8000:8000"
+```
+
+## Filesystem-Specific Configuration
+
+### NFS Mounts
+```bash
+# Recommended settings for NFS
+export WATCH_INTERVAL_SECONDS=60
+export FILE_STABILITY_CHECK_MS=1000
+export FORCE_POLLING_WATCH=1
+```
+
+### SMB/CIFS Mounts
+```bash
+# Recommended settings for SMB
+export WATCH_INTERVAL_SECONDS=30
+export FILE_STABILITY_CHECK_MS=2000
+```
+
+### S3 Mounts (s3fs, goofys, etc.)
+```bash
+# Recommended settings for S3
+export WATCH_INTERVAL_SECONDS=120
+export FILE_STABILITY_CHECK_MS=5000
+export FORCE_POLLING_WATCH=1
+```
+
+### Local Filesystems
+```bash
+# Optimal settings for local storage (default behavior)
+# No special configuration needed - uses inotify automatically
+```
+
+## Supported File Types
+
+The watch folder processes these file types for OCR:
+
+- **PDF**: `*.pdf`
+- **Images**: `*.png`, `*.jpg`, `*.jpeg`, `*.tiff`, `*.bmp`, `*.gif`
+- **Text**: `*.txt`
+- **Word Documents**: `*.doc`, `*.docx`
+
+## File Processing Priority
+
+Files are prioritized for OCR processing based on the following factors (a code sketch follows the list):
+
+1. **File Size**: Smaller files get higher priority
+2. **File Type**: Images > text files > PDFs and Word documents (PDFs and Word docs share the base priority)
+3. **Queue Time**: Older items are processed first within the same priority level
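+
+The following is a simplified mirror of the scoring implemented in `calculate_priority` in `src/watcher.rs`; names are shortened for illustration:
+
+```rust
+/// Sketch of the watcher's priority scoring (see src/watcher.rs).
+fn priority(file_size: i64, mime_type: &str) -> i32 {
+    const MB: i64 = 1024 * 1024;
+    let base = match file_size {
+        0..=MB => 10,           // <= 1MB: highest priority
+        s if s <= 5 * MB => 8,  // 1-5MB
+        s if s <= 10 * MB => 6, // 5-10MB
+        s if s <= 50 * MB => 4, // 10-50MB
+        _ => 2,                 // > 50MB: lowest priority
+    };
+    // Images OCR quickly and plain text needs no heavy OCR pass,
+    // so both get a small boost over PDFs and Word documents.
+    let boost = match mime_type {
+        t if t.starts_with("image/") => 2,
+        "text/plain" => 1,
+        _ => 0,
+    };
+    (base + boost).min(10)
+}
+```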
+
+## Monitoring and Logs
+
+The application provides detailed logging for watch folder operations:
+
+```
+INFO readur::watcher: Starting hybrid folder watcher on: /app/watch
+INFO readur::watcher: Using watch strategy: Hybrid
+INFO readur::watcher: Started polling-based watcher on: /app/watch
+INFO readur::watcher: Processing new file: "/app/watch/document.pdf"
+INFO readur::watcher: Successfully queued file for OCR: document.pdf (size: 2048 bytes)
+```
+
+## Troubleshooting
+
+### Files Not Being Detected
+
+1. **Check permissions:**
+   ```bash
+   ls -la /path/to/watch/folder
+   chmod 755 /path/to/watch/folder
+   ```
+
+2. **Verify file types:**
+   ```bash
+   # Only supported file types are processed
+   echo $ALLOWED_FILE_TYPES
+   ```
+
+3. **Check file stability:**
+   ```bash
+   # Increase stability check time for slow networks
+   export FILE_STABILITY_CHECK_MS=2000
+   ```
+
+### High CPU Usage
+
+1. **Increase polling interval:**
+   ```bash
+   export WATCH_INTERVAL_SECONDS=120
+   ```
+
+2. **Limit file age:**
+   ```bash
+   export MAX_FILE_AGE_HOURS=24
+   ```
+
+### Network Mount Issues
+
+1. **Force polling mode:**
+   ```bash
+   export FORCE_POLLING_WATCH=1
+   ```
+
+2. **Increase stability check:**
+   ```bash
+   export FILE_STABILITY_CHECK_MS=5000
+   ```
+
+## Testing
+
+Use the provided test script to verify functionality:
+
+```bash
+./test_watch_folder.sh
+```
+
+This creates sample files in the watch folder for testing.
+
+## Security Considerations
+
+- Files are copied to a secure upload directory, not processed in place
+- Original files in the watch folder are never modified or deleted
+- System files and hidden files are automatically excluded
+- A file size limit prevents processing of excessively large files (>500MB)
+
+## Performance
+
+- **Local filesystems**: Near-instant detection via inotify
+- **Network filesystems**: Detection within the polling interval (default 30s)
+- **Concurrent processing**: Multiple files are processed simultaneously
+- **Bounded memory use**: A 500MB per-file size cap keeps ingestion memory in check
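+
+## File Stability Detection
+
+Before a file is queued, the watcher reads its size twice across a short delay and only proceeds when the two sizes match, so half-written files are not ingested. A simplified sketch of the check in `src/watcher.rs` (the delay is a parameter here for illustration; the shipped code waits 500ms):
+
+```rust
+use std::path::Path;
+use tokio::time::{sleep, Duration};
+
+/// True when the file size is unchanged across the delay.
+async fn is_stable(path: &Path, delay_ms: u64) -> bool {
+    let Ok(before) = tokio::fs::metadata(path).await else {
+        return false; // unreadable metadata: treat as unstable
+    };
+    sleep(Duration::from_millis(delay_ms)).await;
+    match tokio::fs::metadata(path).await {
+        Ok(after) => before.len() == after.len(),
+        Err(_) => false,
+    }
+}
+```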
+
+## Examples
+
+### Basic File Drop
+```bash
+# Copy a file to the watch folder
+cp document.pdf /app/watch/
+# File will be automatically detected and processed
+```
+
+### Batch Processing
+```bash
+# Copy multiple files
+cp *.pdf /app/watch/
+# All supported files will be queued for processing
+```
+
+### Real-time Monitoring
+```bash
+# Watch the logs for processing updates
+docker logs -f readur-container | grep watcher
+```
\ No newline at end of file
diff --git a/src/batch_ingest.rs b/src/batch_ingest.rs
index 7c35781..5aac905 100644
--- a/src/batch_ingest.rs
+++ b/src/batch_ingest.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tokio::fs;
 use tokio::sync::Semaphore;
 use tracing::{error, info, warn};
@@ -66,18 +67,19 @@ impl BatchIngester {
         info!("Found {} files to ingest", file_paths.len());
 
         // Process files in batches
-        let semaphore = Semaphore::new(self.max_concurrent_io);
+        let semaphore = Arc::new(Semaphore::new(self.max_concurrent_io));
         let mut batch = Vec::new();
         let mut queue_items = Vec::new();
 
         for (idx, path) in file_paths.iter().enumerate() {
-            let permit = semaphore.acquire().await?;
+            let semaphore_clone = semaphore.clone();
             let path_clone = path.clone();
             let file_service = self.file_service.clone();
             let user_id_clone = user_id;
 
             // Process file asynchronously
             let handle = tokio::spawn(async move {
+                let permit = semaphore_clone.acquire().await.unwrap();
+                let _permit = permit;
                 process_single_file(path_clone, file_service, user_id_clone).await
             });
@@ -210,11 +212,15 @@ async fn process_single_file(
 
 fn calculate_priority(file_size: i64) -> i32 {
     const MB: i64 = 1024 * 1024;
+    const MB5: i64 = 5 * 1024 * 1024;
+    const MB10: i64 = 10 * 1024 * 1024;
+    const MB50: i64 = 50 * 1024 * 1024;
+
     match file_size {
         0..=MB => 10,      // <= 1MB: highest priority
-        ..=5 * MB => 8,    // 1-5MB: high priority
-        ..=10 * MB => 6,   // 5-10MB: medium priority
-        ..=50 * MB => 4,   // 10-50MB: low priority
+        ..=MB5 => 8,       // 1-5MB: high priority
+        ..=MB10 => 6,      // 5-10MB: medium priority
+        ..=MB50 => 4,      // 10-50MB: low priority
         _ => 2,            // > 50MB: lowest priority
     }
 }
\ No newline at end of file
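The hunk above wraps the semaphore in `Arc` and moves permit acquisition inside the spawned task: each `tokio::spawn` future must own its handle, so the old borrowed `acquire()` outside the task had to go. A minimal, standalone sketch of the same pattern (the names and the limit of 4 are illustrative, not from this codebase):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Cap concurrent work at 4 tasks, analogous to max_concurrent_io.
    let semaphore = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();

    for i in 0..16 {
        let sem = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // Acquired inside the task; released when _permit is dropped.
            let _permit = sem.acquire().await.expect("semaphore closed");
            println!("processing item {i}");
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```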
diff --git a/src/config.rs b/src/config.rs
index 6391a4a..2c3015d 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -9,6 +9,9 @@ pub struct Config {
     pub upload_path: String,
     pub watch_folder: String,
     pub allowed_file_types: Vec<String>,
+    pub watch_interval_seconds: Option<u64>,
+    pub file_stability_check_ms: Option<u64>,
+    pub max_file_age_hours: Option<u64>,
 }
 
 impl Config {
@@ -31,6 +34,15 @@ impl Config {
                 .split(',')
                 .map(|s| s.trim().to_lowercase())
                 .collect(),
+            watch_interval_seconds: env::var("WATCH_INTERVAL_SECONDS")
+                .ok()
+                .and_then(|s| s.parse().ok()),
+            file_stability_check_ms: env::var("FILE_STABILITY_CHECK_MS")
+                .ok()
+                .and_then(|s| s.parse().ok()),
+            max_file_age_hours: env::var("MAX_FILE_AGE_HOURS")
+                .ok()
+                .and_then(|s| s.parse().ok()),
         })
     }
 }
\ No newline at end of file
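The three new settings share the `env::var(..).ok().and_then(|s| s.parse().ok())` idiom: a missing or unparsable value quietly becomes `None`, and the caller's `unwrap_or(..)` default applies. A standalone sketch of that idiom (the helper name is hypothetical):

```rust
use std::env;
use std::str::FromStr;

/// A missing variable and a failed parse both yield None.
fn optional_env<T: FromStr>(key: &str) -> Option<T> {
    env::var(key).ok().and_then(|s| s.parse().ok())
}

fn main() {
    // e.g. WATCH_INTERVAL_SECONDS=30
    let interval: u64 = optional_env("WATCH_INTERVAL_SECONDS").unwrap_or(30);
    println!("polling every {interval}s");
}
```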
diff --git a/src/db.rs b/src/db.rs
index 44db832..419e729 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -115,12 +115,118 @@ impl Database {
         .execute(&self.pool)
         .await?;
 
-        // Run OCR queue migration
-        let migration_sql = include_str!("../migrations/001_add_ocr_queue.sql");
-        sqlx::query(migration_sql)
+        // Run OCR queue migration - execute each statement separately
+        self.run_ocr_queue_migration().await?;
+
+        Ok(())
+    }
+
+    async fn run_ocr_queue_migration(&self) -> Result<()> {
+        // Create OCR queue table
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS ocr_queue (
+                id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+                document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
+                status VARCHAR(20) DEFAULT 'pending',
+                priority INT DEFAULT 5,
+                attempts INT DEFAULT 0,
+                max_attempts INT DEFAULT 3,
+                created_at TIMESTAMPTZ DEFAULT NOW(),
+                started_at TIMESTAMPTZ,
+                completed_at TIMESTAMPTZ,
+                error_message TEXT,
+                worker_id VARCHAR(100),
+                processing_time_ms INT,
+                file_size BIGINT,
+                CONSTRAINT check_status CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'cancelled'))
+            )
+            "#
+        )
+        .execute(&self.pool)
+        .await?;
+
+        // Create indexes
+        sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_status ON ocr_queue(status, priority DESC, created_at)")
             .execute(&self.pool)
             .await?;
 
+        sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_document_id ON ocr_queue(document_id)")
+            .execute(&self.pool)
+            .await?;
+
+        sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_worker ON ocr_queue(worker_id) WHERE status = 'processing'")
+            .execute(&self.pool)
+            .await?;
+
+        sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_created_at ON ocr_queue(created_at) WHERE status = 'pending'")
+            .execute(&self.pool)
+            .await?;
+
+        // Add columns to documents table
+        sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_status VARCHAR(20) DEFAULT 'pending'")
+            .execute(&self.pool)
+            .await?;
+
+        sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_error TEXT")
+            .execute(&self.pool)
+            .await?;
+
+        sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_completed_at TIMESTAMPTZ")
+            .execute(&self.pool)
+            .await?;
+
+        // Create metrics table
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS ocr_metrics (
+                id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+                date DATE DEFAULT CURRENT_DATE,
+                hour INT DEFAULT EXTRACT(HOUR FROM NOW()),
+                total_processed INT DEFAULT 0,
+                total_failed INT DEFAULT 0,
+                total_retried INT DEFAULT 0,
+                avg_processing_time_ms INT,
+                max_processing_time_ms INT,
+                min_processing_time_ms INT,
+                queue_depth INT,
+                active_workers INT,
+                UNIQUE(date, hour)
+            )
+            "#
+        )
+        .execute(&self.pool)
+        .await?;
+
+        // Create the statistics function
+        sqlx::query(
+            r#"
+            CREATE OR REPLACE FUNCTION get_ocr_queue_stats()
+            RETURNS TABLE (
+                pending_count BIGINT,
+                processing_count BIGINT,
+                failed_count BIGINT,
+                completed_today BIGINT,
+                avg_wait_time_minutes DOUBLE PRECISION,
+                oldest_pending_minutes DOUBLE PRECISION
+            ) AS $$
+            BEGIN
+                RETURN QUERY
+                SELECT
+                    COUNT(*) FILTER (WHERE status = 'pending') as pending_count,
+                    COUNT(*) FILTER (WHERE status = 'processing') as processing_count,
+                    COUNT(*) FILTER (WHERE status = 'failed' AND attempts >= max_attempts) as failed_count,
+                    COUNT(*) FILTER (WHERE status = 'completed' AND completed_at >= CURRENT_DATE) as completed_today,
+                    AVG(EXTRACT(EPOCH FROM (COALESCE(started_at, NOW()) - created_at))/60) FILTER (WHERE status IN ('processing', 'completed')) as avg_wait_time_minutes,
+                    MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) FILTER (WHERE status = 'pending') as oldest_pending_minutes
+                FROM ocr_queue;
+            END;
+            $$ LANGUAGE plpgsql
+            "#
+        )
+        .execute(&self.pool)
+        .await?;
+
         Ok(())
     }
 
@@ -271,6 +377,40 @@ impl Database {
         Ok(documents)
     }
 
+    pub async fn find_documents_by_filename(&self, filename: &str) -> Result<Vec<Document>> {
+        let rows = sqlx::query(
+            r#"
+            SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, tags, created_at, updated_at, user_id
+            FROM documents
+            WHERE filename = $1 OR original_filename = $1
+            ORDER BY created_at DESC
+            "#
+        )
+        .bind(filename)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let documents = rows
+            .into_iter()
+            .map(|row| Document {
+                id: row.get("id"),
+                filename: row.get("filename"),
+                original_filename: row.get("original_filename"),
+                file_path: row.get("file_path"),
+                file_size: row.get("file_size"),
+                mime_type: row.get("mime_type"),
+                content: row.get("content"),
+                ocr_text: row.get("ocr_text"),
+                tags: row.get("tags"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+                user_id: row.get("user_id"),
+            })
+            .collect();
+
+        Ok(documents)
+    }
+
     pub async fn search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec<Document>, i64)> {
         let mut query_builder = sqlx::QueryBuilder::new(
             r#"
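`find_documents_by_filename` backs the duplicate check the watcher performs before queueing a file (same name and same byte size is treated as already ingested). A usage sketch against the types this patch defines in `src/db.rs`:

```rust
// Sketch only: Database and Document are the types from src/db.rs.
async fn is_duplicate(db: &Database, filename: &str, file_size: i64) -> anyhow::Result<bool> {
    let existing = db.find_documents_by_filename(filename).await?;
    Ok(existing.iter().any(|doc| doc.file_size == file_size))
}
```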
diff --git a/src/file_service.rs b/src/file_service.rs
index 0caafb8..a65c6f0 100644
--- a/src/file_service.rs
+++ b/src/file_service.rs
@@ -6,6 +6,7 @@ use uuid::Uuid;
 
 use crate::models::Document;
 
+#[derive(Clone)]
 pub struct FileService {
     upload_path: String,
 }
diff --git a/src/lib.rs b/src/lib.rs
index 1cecce1..ded2c4e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,4 +8,13 @@ pub mod ocr;
 pub mod ocr_queue;
 pub mod routes;
 pub mod seed;
-pub mod watcher;
\ No newline at end of file
+pub mod watcher;
+
+use config::Config;
+use db::Database;
+
+#[derive(Clone)]
+pub struct AppState {
+    pub db: Database,
+    pub config: Config,
+}
\ No newline at end of file
diff --git a/src/ocr_queue.rs b/src/ocr_queue.rs
index f5d0853..4cac703 100644
--- a/src/ocr_queue.rs
+++ b/src/ocr_queue.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use sqlx::{FromRow, PgPool};
+use sqlx::{FromRow, PgPool, Row};
 use std::sync::Arc;
 use tokio::sync::Semaphore;
 use tokio::time::{sleep, Duration};
@@ -37,6 +37,7 @@ pub struct QueueStats {
     pub oldest_pending_minutes: Option<f64>,
 }
 
+#[derive(Clone)]
 pub struct OcrQueueService {
     db: Database,
     pool: PgPool,
@@ -57,18 +58,20 @@ impl OcrQueueService {
 
     /// Add a document to the OCR queue
     pub async fn enqueue_document(&self, document_id: Uuid, priority: i32, file_size: i64) -> Result<Uuid> {
-        let id = sqlx::query_scalar!(
+        let row = sqlx::query(
             r#"
             INSERT INTO ocr_queue (document_id, priority, file_size)
             VALUES ($1, $2, $3)
             RETURNING id
-            "#,
-            document_id,
-            priority,
-            file_size
+            "#
         )
+        .bind(document_id)
+        .bind(priority)
+        .bind(file_size)
         .fetch_one(&self.pool)
         .await?;
+
+        let id: Uuid = row.get("id");
 
         info!("Enqueued document {} with priority {} for OCR processing", document_id, priority);
         Ok(id)
@@ -82,19 +85,20 @@ impl OcrQueueService {
         let mut tx = self.pool.begin().await?;
 
         for (document_id, priority, file_size) in documents {
-            let id = sqlx::query_scalar!(
+            let row = sqlx::query(
                 r#"
                 INSERT INTO ocr_queue (document_id, priority, file_size)
                 VALUES ($1, $2, $3)
                 RETURNING id
-                "#,
-                document_id,
-                priority,
-                file_size
+                "#
             )
+            .bind(document_id)
+            .bind(priority)
+            .bind(file_size)
             .fetch_one(&mut *tx)
             .await?;
 
+            let id: Uuid = row.get("id");
             ids.push(id);
         }
@@ -106,8 +110,7 @@ impl OcrQueueService {
 
     /// Get the next item from the queue
     async fn dequeue(&self) -> Result<Option<OcrQueueItem>> {
-        let item = sqlx::query_as!(
-            OcrQueueItem,
+        let row = sqlx::query(
             r#"
             UPDATE ocr_queue
             SET status = 'processing',
@@ -124,28 +127,47 @@ impl OcrQueueService {
                 LIMIT 1
             )
             RETURNING *
-            "#,
-            &self.worker_id
+            "#
         )
+        .bind(&self.worker_id)
         .fetch_optional(&self.pool)
         .await?;
 
+        let item = match row {
+            Some(row) => Some(OcrQueueItem {
+                id: row.get("id"),
+                document_id: row.get("document_id"),
+                status: row.get("status"),
+                priority: row.get("priority"),
+                attempts: row.get("attempts"),
+                max_attempts: row.get("max_attempts"),
+                created_at: row.get("created_at"),
+                started_at: row.get("started_at"),
+                completed_at: row.get("completed_at"),
+                error_message: row.get("error_message"),
+                worker_id: row.get("worker_id"),
+                processing_time_ms: row.get("processing_time_ms"),
+                file_size: row.get("file_size"),
+            }),
+            None => None,
+        };
+
         Ok(item)
     }
 
     /// Mark an item as completed
     async fn mark_completed(&self, item_id: Uuid, processing_time_ms: i32) -> Result<()> {
-        sqlx::query!(
+        sqlx::query(
             r#"
             UPDATE ocr_queue
             SET status = 'completed',
                 completed_at = NOW(),
                 processing_time_ms = $2
             WHERE id = $1
-            "#,
-            item_id,
-            processing_time_ms
+            "#
         )
+        .bind(item_id)
+        .bind(processing_time_ms)
         .execute(&self.pool)
         .await?;
@@ -154,7 +176,7 @@ impl OcrQueueService {
 
     /// Mark an item as failed
     async fn mark_failed(&self, item_id: Uuid, error: &str) -> Result<()> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             UPDATE ocr_queue
             SET status = CASE
@@ -166,14 +188,15 @@ impl OcrQueueService {
                 worker_id = NULL
             WHERE id = $1
             RETURNING status
-            "#,
-            item_id,
-            error
+            "#
         )
+        .bind(item_id)
+        .bind(error)
         .fetch_one(&self.pool)
         .await?;
 
-        if result.status == Some("failed".to_string()) {
+        let status: Option<String> = result.get("status");
+        if status == Some("failed".to_string()) {
            error!("OCR job {} permanently failed after max attempts: {}", item_id, error);
         }
@@ -187,21 +210,24 @@ impl OcrQueueService {
         info!("Processing OCR job {} for document {}", item.id, item.document_id);
 
         // Get document details
-        let document = sqlx::query!(
+        let document = sqlx::query(
             r#"
             SELECT file_path, mime_type, user_id
             FROM documents
             WHERE id = $1
-            "#,
-            item.document_id
+            "#
         )
+        .bind(item.document_id)
         .fetch_optional(&self.pool)
         .await?;
 
         match document {
-            Some(doc) => {
+            Some(row) => {
+                let file_path: String = row.get("file_path");
+                let mime_type: String = row.get("mime_type");
+                let user_id: Option<Uuid> = row.get("user_id");
                 // Get user's OCR settings
-                let settings = if let Some(user_id) = doc.user_id {
+                let settings = if let Some(user_id) = user_id {
                     self.db.get_user_settings(user_id).await.ok().flatten()
                 } else {
                     None
@@ -213,11 +239,11 @@ impl OcrQueueService {
                     .unwrap_or_else(|| "eng".to_string());
 
                 // Perform OCR
-                match ocr_service.extract_text_with_lang(&doc.file_path, &doc.mime_type, &ocr_language).await {
+                match ocr_service.extract_text_with_lang(&file_path, &mime_type, &ocr_language).await {
                     Ok(text) => {
                         if !text.is_empty() {
                             // Update document with OCR text
-                            sqlx::query!(
+                            sqlx::query(
                                 r#"
                                 UPDATE documents
                                 SET ocr_text = $2,
@@ -225,10 +251,10 @@ impl OcrQueueService {
                                     ocr_completed_at = NOW(),
                                     updated_at = NOW()
                                 WHERE id = $1
-                                "#,
-                                item.document_id,
-                                text
+                                "#
                             )
+                            .bind(item.document_id)
+                            .bind(text)
                             .execute(&self.pool)
                             .await?;
                         }
@@ -246,17 +272,17 @@ impl OcrQueueService {
                         warn!("{}", error_msg);
 
                         // Update document status
-                        sqlx::query!(
+                        sqlx::query(
                             r#"
                             UPDATE documents
                             SET ocr_status = 'failed',
                                 ocr_error = $2,
                                 updated_at = NOW()
                             WHERE id = $1
-                            "#,
-                            item.document_id,
-                            &error_msg
+                            "#
                         )
+                        .bind(item.document_id)
+                        .bind(&error_msg)
                         .execute(&self.pool)
                         .await?;
@@ -313,7 +339,7 @@ impl OcrQueueService {
 
     /// Get queue statistics
     pub async fn get_stats(&self) -> Result<QueueStats> {
-        let stats = sqlx::query!(
+        let stats = sqlx::query(
             r#"
             SELECT * FROM get_ocr_queue_stats()
             "#
         )
@@ -322,18 +348,18 @@ impl OcrQueueService {
         .await?;
 
         Ok(QueueStats {
-            pending_count: stats.pending_count.unwrap_or(0),
-            processing_count: stats.processing_count.unwrap_or(0),
-            failed_count: stats.failed_count.unwrap_or(0),
-            completed_today: stats.completed_today.unwrap_or(0),
-            avg_wait_time_minutes: stats.avg_wait_time_minutes,
-            oldest_pending_minutes: stats.oldest_pending_minutes,
+            pending_count: stats.get::<Option<i64>, _>("pending_count").unwrap_or(0),
+            processing_count: stats.get::<Option<i64>, _>("processing_count").unwrap_or(0),
+            failed_count: stats.get::<Option<i64>, _>("failed_count").unwrap_or(0),
+            completed_today: stats.get::<Option<i64>, _>("completed_today").unwrap_or(0),
+            avg_wait_time_minutes: stats.get("avg_wait_time_minutes"),
+            oldest_pending_minutes: stats.get("oldest_pending_minutes"),
         })
     }
 
     /// Requeue failed items
     pub async fn requeue_failed_items(&self) -> Result<u64> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             UPDATE ocr_queue
             SET status = 'pending',
@@ -353,14 +379,14 @@ impl OcrQueueService {
 
     /// Clean up old completed items
     pub async fn cleanup_completed(&self, days_to_keep: i32) -> Result<u64> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             DELETE FROM ocr_queue
             WHERE status = 'completed'
             AND completed_at < NOW() - INTERVAL '1 day' * $1
-            "#,
-            days_to_keep
+            "#
         )
+        .bind(days_to_keep)
         .execute(&self.pool)
         .await?;
@@ -369,7 +395,7 @@ impl OcrQueueService {
 
     /// Handle stale processing items (worker crashed)
    pub async fn recover_stale_items(&self, stale_minutes: i32) -> Result<u64> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             UPDATE ocr_queue
             SET 
status = 'pending', @@ -377,9 +403,9 @@ impl OcrQueueService { worker_id = NULL WHERE status = 'processing' AND started_at < NOW() - INTERVAL '1 minute' * $1 - "#, - stale_minutes + "# ) + .bind(stale_minutes) .execute(&self.pool) .await?; diff --git a/src/watcher.rs b/src/watcher.rs index 0ef7cae..3ec392f 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -1,12 +1,98 @@ use anyhow::Result; use notify::{RecommendedWatcher, RecursiveMode, Watcher}; -use std::path::Path; +use std::collections::HashSet; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime}; use tokio::sync::mpsc; -use tracing::{error, info}; +use tokio::time::{interval, sleep}; +use tracing::{debug, error, info, warn}; +use walkdir::WalkDir; use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService}; pub async fn start_folder_watcher(config: Config) -> Result<()> { + info!("Starting hybrid folder watcher on: {}", config.watch_folder); + + // Initialize services + let db = Database::new(&config.database_url).await?; + let pool = sqlx::PgPool::connect(&config.database_url).await?; + let file_service = FileService::new(config.upload_path.clone()); + let queue_service = OcrQueueService::new(db.clone(), pool, 1); + + // Determine watch strategy based on filesystem type + let watch_path = Path::new(&config.watch_folder); + let watch_strategy = determine_watch_strategy(watch_path).await?; + + info!("Using watch strategy: {:?}", watch_strategy); + + match watch_strategy { + WatchStrategy::NotifyBased => { + start_notify_watcher(config, db, file_service, queue_service).await + } + WatchStrategy::PollingBased => { + start_polling_watcher(config, db, file_service, queue_service).await + } + WatchStrategy::Hybrid => { + // Start both methods concurrently + let config_clone = config.clone(); + let db_clone = db.clone(); + let file_service_clone = file_service.clone(); + let queue_service_clone = queue_service.clone(); + + let notify_handle = tokio::spawn(async move { + if let Err(e) = start_notify_watcher(config_clone, db_clone, file_service_clone, queue_service_clone).await { + warn!("Notify watcher failed, continuing with polling: {}", e); + } + }); + + let polling_result = start_polling_watcher(config, db, file_service, queue_service).await; + + // Cancel notify watcher if polling completes + notify_handle.abort(); + + polling_result + } + } +} + +#[derive(Debug, Clone)] +enum WatchStrategy { + NotifyBased, // For local filesystems + PollingBased, // For network filesystems (NFS, SMB, S3, etc.) 
+    Hybrid,        // Try notify first, fall back to polling
+}
+
+async fn determine_watch_strategy(path: &Path) -> Result<WatchStrategy> {
+    // Try to determine filesystem type
+    let canonical_path = match path.canonicalize() {
+        Ok(p) => p,
+        Err(_) => {
+            // If canonicalize fails, assume network filesystem
+            return Ok(WatchStrategy::PollingBased);
+        }
+    };
+
+    let path_str = canonical_path.to_string_lossy();
+
+    // Check for common network filesystem patterns
+    if path_str.starts_with("//") ||
+       path_str.contains("nfs") ||
+       path_str.contains("smb") ||
+       path_str.contains("cifs") ||
+       std::env::var("FORCE_POLLING_WATCH").is_ok() {
+        return Ok(WatchStrategy::PollingBased);
+    }
+
+    // For local filesystems, use hybrid approach (notify with polling backup)
+    Ok(WatchStrategy::Hybrid)
+}
+
+async fn start_notify_watcher(
+    config: Config,
+    db: Database,
+    file_service: FileService,
+    queue_service: OcrQueueService,
+) -> Result<()> {
     let (tx, mut rx) = mpsc::channel(100);
 
     let mut watcher = RecommendedWatcher::new(
@@ -20,12 +106,7 @@ pub async fn start_folder_watcher(config: Config) -> Result<()> {
 
     watcher.watch(Path::new(&config.watch_folder), RecursiveMode::Recursive)?;
 
-    info!("Starting folder watcher on: {}", config.watch_folder);
-
-    let db = Database::new(&config.database_url).await?;
-    let pool = sqlx::PgPool::connect(&config.database_url).await?;
-    let file_service = FileService::new(config.upload_path.clone());
-    let queue_service = OcrQueueService::new(db.clone(), pool, 1); // Single job for enqueuing
+    info!("Started notify-based watcher on: {}", config.watch_folder);
 
     while let Some(res) = rx.recv().await {
         match res {
@@ -43,6 +124,93 @@ pub async fn start_folder_watcher(config: Config) -> Result<()> {
     Ok(())
 }
 
+async fn start_polling_watcher(
+    config: Config,
+    db: Database,
+    file_service: FileService,
+    queue_service: OcrQueueService,
+) -> Result<()> {
+    info!("Started polling-based watcher on: {}", config.watch_folder);
+
+    let mut known_files: HashSet<(PathBuf, SystemTime)> = HashSet::new();
+    let mut interval = interval(Duration::from_secs(config.watch_interval_seconds.unwrap_or(30)));
+
+    // Initial scan
+    scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await?;
+
+    loop {
+        interval.tick().await;
+
+        if let Err(e) = scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await {
+            error!("Error during directory scan: {}", e);
+            // Continue polling even if one scan fails
+        }
+    }
+}
+
+async fn scan_directory(
+    watch_folder: &str,
+    known_files: &mut HashSet<(PathBuf, SystemTime)>,
+    db: &Database,
+    file_service: &FileService,
+    queue_service: &OcrQueueService,
+    config: &Config,
+) -> Result<()> {
+    let mut current_files: HashSet<(PathBuf, SystemTime)> = HashSet::new();
+
+    // Walk directory and collect all files with their modification times
+    for entry in WalkDir::new(watch_folder)
+        .follow_links(true)
+        .into_iter()
+        .filter_map(|e| e.ok())
+    {
+        if entry.file_type().is_file() {
+            let path = entry.path().to_path_buf();
+
+            if let Ok(metadata) = entry.metadata() {
+                if let Ok(modified) = metadata.modified() {
+                    let file_info = (path.clone(), modified);
+                    current_files.insert(file_info.clone());
+
+                    // Check if this is a new file or modified file
+                    if !known_files.contains(&file_info) {
+                        // Wait a bit to ensure file is fully written
+                        if is_file_stable(&path).await {
+                            debug!("Found new/modified file: {:?}", path);
+                            if let Err(e) = process_file(&path, db, file_service, queue_service, 
config).await { + error!("Failed to process file {:?}: {}", path, e); + } + } + } + } + } + } + } + + // Update known files + *known_files = current_files; + + Ok(()) +} + +async fn is_file_stable(path: &Path) -> bool { + // Check if file size is stable (not currently being written) + if let Ok(metadata1) = tokio::fs::metadata(path).await { + let size1 = metadata1.len(); + + // Wait a short time + sleep(Duration::from_millis(500)).await; + + if let Ok(metadata2) = tokio::fs::metadata(path).await { + let size2 = metadata2.len(); + return size1 == size2; + } + } + + // If we can't read metadata, assume it's not stable + false +} + async fn process_file( path: &std::path::Path, db: &Database, @@ -60,27 +228,81 @@ async fn process_file( .unwrap_or("") .to_string(); - if !file_service.is_allowed_file_type(&filename, &config.allowed_file_types) { + // Skip hidden files, temporary files, and system files + if filename.starts_with('.') || + filename.starts_with('~') || + filename.ends_with(".tmp") || + filename.ends_with(".temp") || + filename.contains("$RECYCLE.BIN") || + filename.contains("System Volume Information") { + debug!("Skipping system/temporary file: {}", filename); return Ok(()); } + if !file_service.is_allowed_file_type(&filename, &config.allowed_file_types) { + debug!("Skipping file with disallowed type: {}", filename); + return Ok(()); + } + + // Check file age if configured + if let Some(max_age_hours) = config.max_file_age_hours { + if let Ok(metadata) = tokio::fs::metadata(path).await { + if let Ok(created) = metadata.created() { + let age = SystemTime::now().duration_since(created).unwrap_or_default(); + if age.as_secs() > max_age_hours * 3600 { + debug!("Skipping old file: {} (age: {}h)", filename, age.as_secs() / 3600); + return Ok(()); + } + } + } + } + info!("Processing new file: {:?}", path); let file_data = tokio::fs::read(path).await?; let file_size = file_data.len() as i64; + // Skip very large files (> 500MB by default) + const MAX_FILE_SIZE: i64 = 500 * 1024 * 1024; + if file_size > MAX_FILE_SIZE { + warn!("Skipping large file: {} ({} MB)", filename, file_size / 1024 / 1024); + return Ok(()); + } + + // Skip empty files + if file_size == 0 { + debug!("Skipping empty file: {}", filename); + return Ok(()); + } + let mime_type = mime_guess::from_path(&filename) .first_or_octet_stream() .to_string(); - let file_path = file_service.save_file(&filename, &file_data).await?; + // Check if file is OCR-able + if !is_ocr_able_file(&mime_type) { + debug!("Skipping non-OCR-able file: {} ({})", filename, mime_type); + return Ok(()); + } + + // Check for duplicate files (same filename and size) + if let Ok(existing_docs) = db.find_documents_by_filename(&filename).await { + for doc in existing_docs { + if doc.file_size == file_size { + info!("Skipping duplicate file: {} (already exists with same size)", filename); + return Ok(()); + } + } + } + + let saved_file_path = file_service.save_file(&filename, &file_data).await?; let system_user_id = uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000000")?; let document = file_service.create_document( &filename, &filename, - &file_path, + &saved_file_path, file_size, &mime_type, system_user_id, @@ -88,23 +310,47 @@ async fn process_file( let created_doc = db.create_document(document).await?; - // Enqueue for OCR processing with priority based on file size - let priority = calculate_priority(file_size); + // Enqueue for OCR processing with priority based on file size and type + let priority = calculate_priority(file_size, &mime_type); 
queue_service.enqueue_document(created_doc.id, priority, file_size).await?; - info!("Successfully queued file for OCR: {}", filename); + info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size); Ok(()) } -/// Calculate priority based on file size (smaller files get higher priority) -fn calculate_priority(file_size: i64) -> i32 { +fn is_ocr_able_file(mime_type: &str) -> bool { + matches!(mime_type, + "application/pdf" | + "text/plain" | + "image/png" | "image/jpeg" | "image/jpg" | "image/tiff" | "image/bmp" | "image/gif" | + "application/msword" | "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + ) +} + +/// Calculate priority based on file size and type (smaller files and images get higher priority) +fn calculate_priority(file_size: i64, mime_type: &str) -> i32 { const MB: i64 = 1024 * 1024; - match file_size { + const MB5: i64 = 5 * 1024 * 1024; + const MB10: i64 = 10 * 1024 * 1024; + const MB50: i64 = 50 * 1024 * 1024; + + let base_priority = match file_size { 0..=MB => 10, // <= 1MB: highest priority - ..=5 * MB => 8, // 1-5MB: high priority - ..=10 * MB => 6, // 5-10MB: medium priority - ..=50 * MB => 4, // 10-50MB: low priority + ..=MB5 => 8, // 1-5MB: high priority + ..=MB10 => 6, // 5-10MB: medium priority + ..=MB50 => 4, // 10-50MB: low priority _ => 2, // > 50MB: lowest priority - } + }; + + // Boost priority for images (usually faster to OCR) + let type_boost = if mime_type.starts_with("image/") { + 2 + } else if mime_type == "text/plain" { + 1 + } else { + 0 + }; + + (base_priority + type_boost).min(10) } \ No newline at end of file diff --git a/test_watch_folder.sh b/test_watch_folder.sh new file mode 100755 index 0000000..880c1ec --- /dev/null +++ b/test_watch_folder.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Test script for watch folder functionality +echo "Testing watch folder functionality..." + +# Create a test watch folder if it doesn't exist +mkdir -p ./watch + +echo "Creating test files in watch folder..." + +# Create a test text file +echo "This is a test document for OCR processing." > ./watch/test_document.txt + +# Create a test PDF file (mock content) +echo "%PDF-1.4 Mock PDF for testing" > ./watch/test_document.pdf + +# Create a test image file (mock content) +echo "Mock PNG image content" > ./watch/test_image.png + +echo "Test files created in ./watch/ folder:" +ls -la ./watch/ + +echo "" +echo "Watch folder setup complete!" +echo "You can now:" +echo "1. Start the readur application" +echo "2. Copy OCR-able files to the ./watch/ folder" +echo "3. Monitor the logs to see files being processed" +echo "" +echo "Supported file types: PDF, PNG, JPG, JPEG, TIFF, BMP, TXT, DOC, DOCX" +echo "" +echo "Environment variables for configuration:" +echo "- WATCH_FOLDER: Path to watch folder (default: ./watch)" +echo "- WATCH_INTERVAL_SECONDS: Polling interval (default: 30)" +echo "- FILE_STABILITY_CHECK_MS: File stability check time (default: 500)" +echo "- MAX_FILE_AGE_HOURS: Skip files older than this (default: none)" +echo "- FORCE_POLLING_WATCH: Force polling mode (default: auto-detect)" \ No newline at end of file