feat(server): improve queue system

perf3ct 2025-06-12 23:15:12 +00:00
parent 90599eed74
commit eb70964254
11 changed files with 800 additions and 84 deletions


@@ -2,5 +2,15 @@ DATABASE_URL=postgresql://readur:readur_password@localhost:5432/readur
JWT_SECRET=your-super-secret-jwt-key-change-this-in-production
SERVER_ADDRESS=0.0.0.0:8000
UPLOAD_PATH=./uploads
# Watch folder configuration
WATCH_FOLDER=./watch
WATCH_INTERVAL_SECONDS=30
FILE_STABILITY_CHECK_MS=500
MAX_FILE_AGE_HOURS=168
# File type restrictions
ALLOWED_FILE_TYPES=pdf,txt,doc,docx,png,jpg,jpeg,tiff,bmp
# Force polling mode for testing network filesystems (optional)
# FORCE_POLLING_WATCH=1


@@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY Cargo.toml ./
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
@@ -44,6 +44,9 @@ COPY --from=backend-builder /app/target/release/readur /app/readur
# Create necessary directories
RUN mkdir -p /app/uploads /app/watch /app/frontend
# Set permissions for watch folder to handle various mount scenarios
RUN chmod 755 /app/watch
# Copy built frontend from frontend-builder
COPY --from=frontend-builder /frontend/dist /app/frontend

WATCH_FOLDER.md (new file, 226 lines)

@@ -0,0 +1,226 @@
# Watch Folder Documentation
The watch folder feature automatically monitors a directory for new OCR-able files and processes them without modifying or deleting the originals. It is designed for directories backed by a variety of filesystem types, including NFS, SMB, S3 mounts, and local storage.
## Features
### 🔄 Cross-Filesystem Compatibility
- **Automatic Detection**: Detects the filesystem type and chooses the optimal watching strategy (see the sketch after this list)
- **Local Filesystems**: Uses efficient inotify-based watching for ext4, NTFS, APFS, etc.
- **Network Filesystems**: Uses polling-based watching for NFS, SMB/CIFS, S3 mounts
- **Hybrid Fallback**: Gracefully falls back to polling if inotify fails
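The detection heuristic is deliberately simple; a condensed sketch of `determine_watch_strategy` from `src/watcher.rs` in this commit:

```rust
use std::path::Path;

/// The three watching modes used in src/watcher.rs.
#[derive(Debug)]
enum WatchStrategy {
    NotifyBased,  // inotify-style events, for local filesystems
    PollingBased, // periodic directory scans, for network mounts
    Hybrid,       // notify with a polling backup
}

/// Paths that cannot be canonicalized or that look like network mounts
/// are polled; everything else gets the hybrid mode.
fn choose_strategy(path: &Path) -> WatchStrategy {
    let canonical = match path.canonicalize() {
        Ok(p) => p,
        Err(_) => return WatchStrategy::PollingBased,
    };
    let s = canonical.to_string_lossy();
    if s.starts_with("//")
        || s.contains("nfs")
        || s.contains("smb")
        || s.contains("cifs")
        || std::env::var("FORCE_POLLING_WATCH").is_ok()
    {
        WatchStrategy::PollingBased
    } else {
        WatchStrategy::Hybrid
    }
}
```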
### 📁 Smart File Processing
- **OCR-able File Detection**: Only processes supported file types (PDF, images, text, Word docs)
- **Duplicate Prevention**: Checks for existing files with the same name and size
- **File Stability**: Waits for files to finish being written before processing (see the sketch after this list)
- **System File Exclusion**: Skips hidden files, temporary files, and system directories
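The stability check amounts to sampling the file size twice, a short interval apart. A minimal sketch of the idea (the `wait_ms` parameter here stands in for `FILE_STABILITY_CHECK_MS`; the actual `is_file_stable` in this commit hard-codes a 500 ms wait):

```rust
use std::path::Path;
use tokio::time::{sleep, Duration};

/// Sample the file size twice, `wait_ms` apart; only treat the file
/// as complete when both samples agree.
async fn looks_stable(path: &Path, wait_ms: u64) -> bool {
    let first = match tokio::fs::metadata(path).await {
        Ok(m) => m.len(),
        Err(_) => return false, // unreadable: assume still being written
    };
    sleep(Duration::from_millis(wait_ms)).await;
    match tokio::fs::metadata(path).await {
        Ok(m) => m.len() == first,
        Err(_) => false,
    }
}
```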
### ⚙️ Configuration Options
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `WATCH_FOLDER` | `./watch` | Path to the folder to monitor |
| `WATCH_INTERVAL_SECONDS` | `30` | Polling interval for network filesystems |
| `FILE_STABILITY_CHECK_MS` | `500` | Time to wait for file stability |
| `MAX_FILE_AGE_HOURS` | `none` | Skip files older than specified hours |
| `ALLOWED_FILE_TYPES` | `pdf,png,jpg,jpeg,tiff,bmp,txt,doc,docx` | Allowed file extensions |
| `FORCE_POLLING_WATCH` | `unset` | Force polling mode even for local filesystems |
## Usage
### Basic Setup
1. **Set the watch folder path:**
```bash
export WATCH_FOLDER=/path/to/your/mounted/folder
```
2. **Start the application:**
```bash
./readur
```
3. **Copy files to the watch folder:**
The application will automatically detect and process new files.
### Docker Usage
```bash
# Mount your folder to the container's watch directory
docker run -d \
-v /path/to/your/files:/app/watch \
-e WATCH_FOLDER=/app/watch \
-e WATCH_INTERVAL_SECONDS=60 \
readur:latest
```
### Docker Compose
```yaml
services:
readur:
image: readur:latest
volumes:
- /mnt/nfs/documents:/app/watch
- readur_uploads:/app/uploads
environment:
WATCH_FOLDER: /app/watch
WATCH_INTERVAL_SECONDS: 30
FILE_STABILITY_CHECK_MS: 1000
MAX_FILE_AGE_HOURS: 168 # 1 week
ports:
- "8000:8000"
```
## Filesystem-Specific Configuration
### NFS Mounts
```bash
# Recommended settings for NFS
export WATCH_INTERVAL_SECONDS=60
export FILE_STABILITY_CHECK_MS=1000
export FORCE_POLLING_WATCH=1
```
### SMB/CIFS Mounts
```bash
# Recommended settings for SMB
export WATCH_INTERVAL_SECONDS=30
export FILE_STABILITY_CHECK_MS=2000
```
### S3 Mounts (s3fs, goofys, etc.)
```bash
# Recommended settings for S3
export WATCH_INTERVAL_SECONDS=120
export FILE_STABILITY_CHECK_MS=5000
export FORCE_POLLING_WATCH=1
```
### Local Filesystems
```bash
# Optimal settings for local storage (default behavior)
# No special configuration needed - uses inotify automatically
```
## Supported File Types
The watch folder processes these file types for OCR:
- **PDF**: `*.pdf`
- **Images**: `*.png`, `*.jpg`, `*.jpeg`, `*.tiff`, `*.bmp`, `*.gif` (`gif` is OCR-able but is not in the default `ALLOWED_FILE_TYPES` list, so add it there to enable it)
- **Text**: `*.txt`
- **Word Documents**: `*.doc`, `*.docx`
## File Processing Priority
Files are prioritized for OCR processing based on:
1. **File Size**: Smaller files get higher priority
2. **File Type**: Images > text files > PDFs and Word documents (images get a boost because they are usually faster to OCR; see the sketch below)
3. **Queue Time**: Older items get higher priority within the same size/type category
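Concretely, this is the scoring implemented by `calculate_priority` in `src/watcher.rs`, sketched here with match guards in place of the const range patterns the real code uses:

```rust
/// Combine a size-based base priority with a small boost for
/// quick-to-OCR types, capped at the maximum of 10.
fn priority(file_size: i64, mime_type: &str) -> i32 {
    const MB: i64 = 1024 * 1024;
    let base = match file_size {
        s if s <= MB => 10,     // <= 1MB: highest priority
        s if s <= 5 * MB => 8,  // 1-5MB: high priority
        s if s <= 10 * MB => 6, // 5-10MB: medium priority
        s if s <= 50 * MB => 4, // 10-50MB: low priority
        _ => 2,                 // > 50MB: lowest priority
    };
    let boost = match mime_type {
        t if t.starts_with("image/") => 2, // images OCR quickly
        "text/plain" => 1,
        _ => 0,
    };
    (base + boost).min(10)
}
```

With these rules a 200 KB PNG scores the maximum 10 while a 20 MB PDF scores 4, so small scans leave the queue first.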
## Monitoring and Logs
The application provides detailed logging for watch folder operations:
```
INFO readur::watcher: Starting hybrid folder watcher on: /app/watch
INFO readur::watcher: Using watch strategy: Hybrid
INFO readur::watcher: Started polling-based watcher on: /app/watch
INFO readur::watcher: Processing new file: "/app/watch/document.pdf"
INFO readur::watcher: Successfully queued file for OCR: document.pdf (size: 2048 bytes)
```
## Troubleshooting
### Files Not Being Detected
1. **Check permissions:**
```bash
ls -la /path/to/watch/folder
chmod 755 /path/to/watch/folder
```
2. **Verify file types:**
```bash
# Only supported file types are processed
echo $ALLOWED_FILE_TYPES
```
3. **Check file stability:**
```bash
# Increase stability check time for slow networks
export FILE_STABILITY_CHECK_MS=2000
```
### High CPU Usage
1. **Increase polling interval:**
```bash
export WATCH_INTERVAL_SECONDS=120
```
2. **Limit file age:**
```bash
export MAX_FILE_AGE_HOURS=24
```
### Network Mount Issues
1. **Force polling mode:**
```bash
export FORCE_POLLING_WATCH=1
```
2. **Increase stability check:**
```bash
export FILE_STABILITY_CHECK_MS=5000
```
## Testing
Use the provided test script to verify functionality:
```bash
./test_watch_folder.sh
```
This creates sample files in the watch folder for testing.
## Security Considerations
- Files are copied to a secure upload directory, not processed in-place
- Original files in the watch folder are never modified or deleted
- System files and hidden files are automatically excluded
- File size limits prevent processing of excessively large files (>500MB)
## Performance
- **Local filesystems**: Near-instant detection via inotify
- **Network filesystems**: Detection within polling interval (default 30s)
- **Concurrent processing**: Multiple files processed simultaneously (see the sketch below)
- **Size cap**: Files larger than 500MB are skipped rather than queued, bounding per-file work
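The concurrent-processing point corresponds to the `Arc<Semaphore>` pattern this commit adopts in the batch ingester; a self-contained sketch, with a plain file read standing in for the real per-file pipeline:

```rust
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Each file runs on its own task, but a shared semaphore caps how
/// many perform I/O at the same time.
async fn ingest_all(paths: Vec<PathBuf>, max_concurrent_io: usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent_io));
    let mut handles = Vec::new();
    for path in paths {
        let semaphore = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // Hold a permit for the duration of the I/O-heavy work.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            tokio::fs::read(&path).await.map(|bytes| bytes.len())
        }));
    }
    for handle in handles {
        if let Ok(Ok(len)) = handle.await {
            println!("ingested {} bytes", len);
        }
    }
}
```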
## Examples
### Basic File Drop
```bash
# Copy a file to the watch folder
cp document.pdf /app/watch/
# File will be automatically detected and processed
```
### Batch Processing
```bash
# Copy multiple files
cp *.pdf /app/watch/
# All supported files will be queued for processing
```
### Real-time Monitoring
```bash
# Watch the logs for processing updates
docker logs -f readur-container | grep watcher
```


@@ -1,5 +1,6 @@
use anyhow::Result;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
@@ -66,18 +67,19 @@ impl BatchIngester {
info!("Found {} files to ingest", file_paths.len());
// Process files in batches
let semaphore = Semaphore::new(self.max_concurrent_io);
let semaphore = Arc::new(Semaphore::new(self.max_concurrent_io));
let mut batch = Vec::new();
let mut queue_items = Vec::new();
for (idx, path) in file_paths.iter().enumerate() {
let permit = semaphore.acquire().await?;
let semaphore_clone = semaphore.clone();
let path_clone = path.clone();
let file_service = self.file_service.clone();
let user_id_clone = user_id;
// Process file asynchronously
let handle = tokio::spawn(async move {
let permit = semaphore_clone.acquire().await.unwrap();
let _permit = permit;
process_single_file(path_clone, file_service, user_id_clone).await
});
@@ -210,11 +212,15 @@ async fn process_single_file(
fn calculate_priority(file_size: i64) -> i32 {
const MB: i64 = 1024 * 1024;
const MB5: i64 = 5 * 1024 * 1024;
const MB10: i64 = 10 * 1024 * 1024;
const MB50: i64 = 50 * 1024 * 1024;
match file_size {
0..=MB => 10, // <= 1MB: highest priority
..=5 * MB => 8, // 1-5MB: high priority
..=10 * MB => 6, // 5-10MB: medium priority
..=50 * MB => 4, // 10-50MB: low priority
..=MB5 => 8, // 1-5MB: high priority
..=MB10 => 6, // 5-10MB: medium priority
..=MB50 => 4, // 10-50MB: low priority
_ => 2, // > 50MB: lowest priority
}
}


@@ -9,6 +9,9 @@ pub struct Config {
pub upload_path: String,
pub watch_folder: String,
pub allowed_file_types: Vec<String>,
pub watch_interval_seconds: Option<u64>,
pub file_stability_check_ms: Option<u64>,
pub max_file_age_hours: Option<u64>,
}
impl Config {
@@ -31,6 +34,15 @@ impl Config {
.split(',')
.map(|s| s.trim().to_lowercase())
.collect(),
watch_interval_seconds: env::var("WATCH_INTERVAL_SECONDS")
.ok()
.and_then(|s| s.parse().ok()),
file_stability_check_ms: env::var("FILE_STABILITY_CHECK_MS")
.ok()
.and_then(|s| s.parse().ok()),
max_file_age_hours: env::var("MAX_FILE_AGE_HOURS")
.ok()
.and_then(|s| s.parse().ok()),
})
}
}

src/db.rs (146 changed lines)

@@ -115,9 +115,115 @@ impl Database {
.execute(&self.pool)
.await?;
// Run OCR queue migration
let migration_sql = include_str!("../migrations/001_add_ocr_queue.sql");
sqlx::query(migration_sql)
// Run OCR queue migration - execute each statement separately
self.run_ocr_queue_migration().await?;
Ok(())
}
async fn run_ocr_queue_migration(&self) -> Result<()> {
// Create OCR queue table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS ocr_queue (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
status VARCHAR(20) DEFAULT 'pending',
priority INT DEFAULT 5,
attempts INT DEFAULT 0,
max_attempts INT DEFAULT 3,
created_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error_message TEXT,
worker_id VARCHAR(100),
processing_time_ms INT,
file_size BIGINT,
CONSTRAINT check_status CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'cancelled'))
)
"#
)
.execute(&self.pool)
.await?;
// Create indexes
sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_status ON ocr_queue(status, priority DESC, created_at)")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_document_id ON ocr_queue(document_id)")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_worker ON ocr_queue(worker_id) WHERE status = 'processing'")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_ocr_queue_created_at ON ocr_queue(created_at) WHERE status = 'pending'")
.execute(&self.pool)
.await?;
// Add columns to documents table
sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_status VARCHAR(20) DEFAULT 'pending'")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_error TEXT")
.execute(&self.pool)
.await?;
sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_completed_at TIMESTAMPTZ")
.execute(&self.pool)
.await?;
// Create metrics table
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS ocr_metrics (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
date DATE DEFAULT CURRENT_DATE,
hour INT DEFAULT EXTRACT(HOUR FROM NOW()),
total_processed INT DEFAULT 0,
total_failed INT DEFAULT 0,
total_retried INT DEFAULT 0,
avg_processing_time_ms INT,
max_processing_time_ms INT,
min_processing_time_ms INT,
queue_depth INT,
active_workers INT,
UNIQUE(date, hour)
)
"#
)
.execute(&self.pool)
.await?;
// Create the statistics function
sqlx::query(
r#"
CREATE OR REPLACE FUNCTION get_ocr_queue_stats()
RETURNS TABLE (
pending_count BIGINT,
processing_count BIGINT,
failed_count BIGINT,
completed_today BIGINT,
avg_wait_time_minutes DOUBLE PRECISION,
oldest_pending_minutes DOUBLE PRECISION
) AS $$
BEGIN
RETURN QUERY
SELECT
COUNT(*) FILTER (WHERE status = 'pending') as pending_count,
COUNT(*) FILTER (WHERE status = 'processing') as processing_count,
COUNT(*) FILTER (WHERE status = 'failed' AND attempts >= max_attempts) as failed_count,
COUNT(*) FILTER (WHERE status = 'completed' AND completed_at >= CURRENT_DATE) as completed_today,
AVG(EXTRACT(EPOCH FROM (COALESCE(started_at, NOW()) - created_at))/60) FILTER (WHERE status IN ('processing', 'completed')) as avg_wait_time_minutes,
MAX(EXTRACT(EPOCH FROM (NOW() - created_at))/60) FILTER (WHERE status = 'pending') as oldest_pending_minutes
FROM ocr_queue;
END;
$$ LANGUAGE plpgsql
"#
)
.execute(&self.pool)
.await?;
@@ -271,6 +377,40 @@ impl Database {
Ok(documents)
}
pub async fn find_documents_by_filename(&self, filename: &str) -> Result<Vec<Document>> {
let rows = sqlx::query(
r#"
SELECT id, filename, original_filename, file_path, file_size, mime_type, content, ocr_text, tags, created_at, updated_at, user_id
FROM documents
WHERE filename = $1 OR original_filename = $1
ORDER BY created_at DESC
"#
)
.bind(filename)
.fetch_all(&self.pool)
.await?;
let documents = rows
.into_iter()
.map(|row| Document {
id: row.get("id"),
filename: row.get("filename"),
original_filename: row.get("original_filename"),
file_path: row.get("file_path"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
content: row.get("content"),
ocr_text: row.get("ocr_text"),
tags: row.get("tags"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
})
.collect();
Ok(documents)
}
pub async fn search_documents(&self, user_id: Uuid, search: SearchRequest) -> Result<(Vec<Document>, i64)> {
let mut query_builder = sqlx::QueryBuilder::new(
r#"


@@ -6,6 +6,7 @@ use uuid::Uuid;
use crate::models::Document;
#[derive(Clone)]
pub struct FileService {
upload_path: String,
}


@@ -9,3 +9,12 @@ pub mod ocr_queue;
pub mod routes;
pub mod seed;
pub mod watcher;
use config::Config;
use db::Database;
#[derive(Clone)]
pub struct AppState {
pub db: Database,
pub config: Config,
}


@@ -1,7 +1,7 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};
use sqlx::{FromRow, PgPool, Row};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{sleep, Duration};
@@ -37,6 +37,7 @@ pub struct QueueStats {
pub oldest_pending_minutes: Option<f64>,
}
#[derive(Clone)]
pub struct OcrQueueService {
db: Database,
pool: PgPool,
@@ -57,19 +58,21 @@ impl OcrQueueService {
/// Add a document to the OCR queue
pub async fn enqueue_document(&self, document_id: Uuid, priority: i32, file_size: i64) -> Result<Uuid> {
let id = sqlx::query_scalar!(
let row = sqlx::query(
r#"
INSERT INTO ocr_queue (document_id, priority, file_size)
VALUES ($1, $2, $3)
RETURNING id
"#,
document_id,
priority,
file_size
"#
)
.bind(document_id)
.bind(priority)
.bind(file_size)
.fetch_one(&self.pool)
.await?;
let id: Uuid = row.get("id");
info!("Enqueued document {} with priority {} for OCR processing", document_id, priority);
Ok(id)
}
@@ -82,19 +85,20 @@ impl OcrQueueService {
let mut tx = self.pool.begin().await?;
for (document_id, priority, file_size) in documents {
let id = sqlx::query_scalar!(
let row = sqlx::query(
r#"
INSERT INTO ocr_queue (document_id, priority, file_size)
VALUES ($1, $2, $3)
RETURNING id
"#,
document_id,
priority,
file_size
"#
)
.bind(document_id)
.bind(priority)
.bind(file_size)
.fetch_one(&mut *tx)
.await?;
let id: Uuid = row.get("id");
ids.push(id);
}
@@ -106,8 +110,7 @@ impl OcrQueueService {
/// Get the next item from the queue
async fn dequeue(&self) -> Result<Option<OcrQueueItem>> {
let item = sqlx::query_as!(
OcrQueueItem,
let row = sqlx::query(
r#"
UPDATE ocr_queue
SET status = 'processing',
@@ -124,28 +127,47 @@ impl OcrQueueService {
LIMIT 1
)
RETURNING *
"#,
&self.worker_id
"#
)
.bind(&self.worker_id)
.fetch_optional(&self.pool)
.await?;
let item = match row {
Some(row) => Some(OcrQueueItem {
id: row.get("id"),
document_id: row.get("document_id"),
status: row.get("status"),
priority: row.get("priority"),
attempts: row.get("attempts"),
max_attempts: row.get("max_attempts"),
created_at: row.get("created_at"),
started_at: row.get("started_at"),
completed_at: row.get("completed_at"),
error_message: row.get("error_message"),
worker_id: row.get("worker_id"),
processing_time_ms: row.get("processing_time_ms"),
file_size: row.get("file_size"),
}),
None => None,
};
Ok(item)
}
/// Mark an item as completed
async fn mark_completed(&self, item_id: Uuid, processing_time_ms: i32) -> Result<()> {
sqlx::query!(
sqlx::query(
r#"
UPDATE ocr_queue
SET status = 'completed',
completed_at = NOW(),
processing_time_ms = $2
WHERE id = $1
"#,
item_id,
processing_time_ms
"#
)
.bind(item_id)
.bind(processing_time_ms)
.execute(&self.pool)
.await?;
@@ -154,7 +176,7 @@ impl OcrQueueService {
/// Mark an item as failed
async fn mark_failed(&self, item_id: Uuid, error: &str) -> Result<()> {
let result = sqlx::query!(
let result = sqlx::query(
r#"
UPDATE ocr_queue
SET status = CASE
@@ -166,14 +188,15 @@ impl OcrQueueService {
worker_id = NULL
WHERE id = $1
RETURNING status
"#,
item_id,
error
"#
)
.bind(item_id)
.bind(error)
.fetch_one(&self.pool)
.await?;
if result.status == Some("failed".to_string()) {
let status: Option<String> = result.get("status");
if status == Some("failed".to_string()) {
error!("OCR job {} permanently failed after max attempts: {}", item_id, error);
}
@@ -187,21 +210,24 @@ impl OcrQueueService {
info!("Processing OCR job {} for document {}", item.id, item.document_id);
// Get document details
let document = sqlx::query!(
let document = sqlx::query(
r#"
SELECT file_path, mime_type, user_id
FROM documents
WHERE id = $1
"#,
item.document_id
"#
)
.bind(item.document_id)
.fetch_optional(&self.pool)
.await?;
match document {
Some(doc) => {
Some(row) => {
let file_path: String = row.get("file_path");
let mime_type: String = row.get("mime_type");
let user_id: Option<Uuid> = row.get("user_id");
// Get user's OCR settings
let settings = if let Some(user_id) = doc.user_id {
let settings = if let Some(user_id) = user_id {
self.db.get_user_settings(user_id).await.ok().flatten()
} else {
None
@@ -213,11 +239,11 @@ impl OcrQueueService {
.unwrap_or_else(|| "eng".to_string());
// Perform OCR
match ocr_service.extract_text_with_lang(&doc.file_path, &doc.mime_type, &ocr_language).await {
match ocr_service.extract_text_with_lang(&file_path, &mime_type, &ocr_language).await {
Ok(text) => {
if !text.is_empty() {
// Update document with OCR text
sqlx::query!(
sqlx::query(
r#"
UPDATE documents
SET ocr_text = $2,
@@ -225,10 +251,10 @@ impl OcrQueueService {
ocr_completed_at = NOW(),
updated_at = NOW()
WHERE id = $1
"#,
item.document_id,
text
"#
)
.bind(item.document_id)
.bind(text)
.execute(&self.pool)
.await?;
}
@@ -246,17 +272,17 @@ impl OcrQueueService {
warn!("{}", error_msg);
// Update document status
sqlx::query!(
sqlx::query(
r#"
UPDATE documents
SET ocr_status = 'failed',
ocr_error = $2,
updated_at = NOW()
WHERE id = $1
"#,
item.document_id,
&error_msg
"#
)
.bind(item.document_id)
.bind(&error_msg)
.execute(&self.pool)
.await?;
@@ -313,7 +339,7 @@ impl OcrQueueService {
/// Get queue statistics
pub async fn get_stats(&self) -> Result<QueueStats> {
let stats = sqlx::query!(
let stats = sqlx::query(
r#"
SELECT * FROM get_ocr_queue_stats()
"#
@@ -322,18 +348,18 @@ impl OcrQueueService {
.await?;
Ok(QueueStats {
pending_count: stats.pending_count.unwrap_or(0),
processing_count: stats.processing_count.unwrap_or(0),
failed_count: stats.failed_count.unwrap_or(0),
completed_today: stats.completed_today.unwrap_or(0),
avg_wait_time_minutes: stats.avg_wait_time_minutes,
oldest_pending_minutes: stats.oldest_pending_minutes,
pending_count: stats.get::<Option<i64>, _>("pending_count").unwrap_or(0),
processing_count: stats.get::<Option<i64>, _>("processing_count").unwrap_or(0),
failed_count: stats.get::<Option<i64>, _>("failed_count").unwrap_or(0),
completed_today: stats.get::<Option<i64>, _>("completed_today").unwrap_or(0),
avg_wait_time_minutes: stats.get("avg_wait_time_minutes"),
oldest_pending_minutes: stats.get("oldest_pending_minutes"),
})
}
/// Requeue failed items
pub async fn requeue_failed_items(&self) -> Result<i64> {
let result = sqlx::query!(
let result = sqlx::query(
r#"
UPDATE ocr_queue
SET status = 'pending',
@@ -353,14 +379,14 @@ impl OcrQueueService {
/// Clean up old completed items
pub async fn cleanup_completed(&self, days_to_keep: i32) -> Result<i64> {
let result = sqlx::query!(
let result = sqlx::query(
r#"
DELETE FROM ocr_queue
WHERE status = 'completed'
AND completed_at < NOW() - INTERVAL '1 day' * $1
"#,
days_to_keep
"#
)
.bind(days_to_keep)
.execute(&self.pool)
.await?;
@@ -369,7 +395,7 @@ impl OcrQueueService {
/// Handle stale processing items (worker crashed)
pub async fn recover_stale_items(&self, stale_minutes: i32) -> Result<i64> {
let result = sqlx::query!(
let result = sqlx::query(
r#"
UPDATE ocr_queue
SET status = 'pending',
@@ -377,9 +403,9 @@ impl OcrQueueService {
worker_id = NULL
WHERE status = 'processing'
AND started_at < NOW() - INTERVAL '1 minute' * $1
"#,
stale_minutes
"#
)
.bind(stale_minutes)
.execute(&self.pool)
.await?;


@@ -1,12 +1,98 @@
use anyhow::Result;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tracing::{error, info};
use tokio::time::{interval, sleep};
use tracing::{debug, error, info, warn};
use walkdir::WalkDir;
use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService};
pub async fn start_folder_watcher(config: Config) -> Result<()> {
info!("Starting hybrid folder watcher on: {}", config.watch_folder);
// Initialize services
let db = Database::new(&config.database_url).await?;
let pool = sqlx::PgPool::connect(&config.database_url).await?;
let file_service = FileService::new(config.upload_path.clone());
let queue_service = OcrQueueService::new(db.clone(), pool, 1);
// Determine watch strategy based on filesystem type
let watch_path = Path::new(&config.watch_folder);
let watch_strategy = determine_watch_strategy(watch_path).await?;
info!("Using watch strategy: {:?}", watch_strategy);
match watch_strategy {
WatchStrategy::NotifyBased => {
start_notify_watcher(config, db, file_service, queue_service).await
}
WatchStrategy::PollingBased => {
start_polling_watcher(config, db, file_service, queue_service).await
}
WatchStrategy::Hybrid => {
// Start both methods concurrently
let config_clone = config.clone();
let db_clone = db.clone();
let file_service_clone = file_service.clone();
let queue_service_clone = queue_service.clone();
let notify_handle = tokio::spawn(async move {
if let Err(e) = start_notify_watcher(config_clone, db_clone, file_service_clone, queue_service_clone).await {
warn!("Notify watcher failed, continuing with polling: {}", e);
}
});
let polling_result = start_polling_watcher(config, db, file_service, queue_service).await;
// Cancel notify watcher if polling completes
notify_handle.abort();
polling_result
}
}
}
#[derive(Debug, Clone)]
enum WatchStrategy {
NotifyBased, // For local filesystems
PollingBased, // For network filesystems (NFS, SMB, S3, etc.)
Hybrid, // Try notify first, fall back to polling
}
async fn determine_watch_strategy(path: &Path) -> Result<WatchStrategy> {
// Try to determine filesystem type
let canonical_path = match path.canonicalize() {
Ok(p) => p,
Err(_) => {
// If canonicalize fails, assume network filesystem
return Ok(WatchStrategy::PollingBased);
}
};
let path_str = canonical_path.to_string_lossy();
// Check for common network filesystem patterns
if path_str.starts_with("//") ||
path_str.contains("nfs") ||
path_str.contains("smb") ||
path_str.contains("cifs") ||
std::env::var("FORCE_POLLING_WATCH").is_ok() {
return Ok(WatchStrategy::PollingBased);
}
// For local filesystems, use hybrid approach (notify with polling backup)
Ok(WatchStrategy::Hybrid)
}
async fn start_notify_watcher(
config: Config,
db: Database,
file_service: FileService,
queue_service: OcrQueueService,
) -> Result<()> {
let (tx, mut rx) = mpsc::channel(100);
let mut watcher = RecommendedWatcher::new(
@@ -20,12 +106,7 @@ pub async fn start_folder_watcher(config: Config) -> Result<()> {
watcher.watch(Path::new(&config.watch_folder), RecursiveMode::Recursive)?;
info!("Starting folder watcher on: {}", config.watch_folder);
let db = Database::new(&config.database_url).await?;
let pool = sqlx::PgPool::connect(&config.database_url).await?;
let file_service = FileService::new(config.upload_path.clone());
let queue_service = OcrQueueService::new(db.clone(), pool, 1); // Single job for enqueuing
info!("Started notify-based watcher on: {}", config.watch_folder);
while let Some(res) = rx.recv().await {
match res {
@@ -43,6 +124,93 @@ pub async fn start_folder_watcher(config: Config) -> Result<()> {
Ok(())
}
async fn start_polling_watcher(
config: Config,
db: Database,
file_service: FileService,
queue_service: OcrQueueService,
) -> Result<()> {
info!("Started polling-based watcher on: {}", config.watch_folder);
let mut known_files: HashSet<(PathBuf, SystemTime)> = HashSet::new();
let mut interval = interval(Duration::from_secs(config.watch_interval_seconds.unwrap_or(30)));
// Initial scan
scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await?;
loop {
interval.tick().await;
if let Err(e) = scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await {
error!("Error during directory scan: {}", e);
// Continue polling even if one scan fails
}
}
}
async fn scan_directory(
watch_folder: &str,
known_files: &mut HashSet<(PathBuf, SystemTime)>,
db: &Database,
file_service: &FileService,
queue_service: &OcrQueueService,
config: &Config,
) -> Result<()> {
let mut current_files: HashSet<(PathBuf, SystemTime)> = HashSet::new();
// Walk directory and collect all files with their modification times
for entry in WalkDir::new(watch_folder)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
{
if entry.file_type().is_file() {
let path = entry.path().to_path_buf();
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
let file_info = (path.clone(), modified);
current_files.insert(file_info.clone());
// Check if this is a new file or modified file
if !known_files.contains(&file_info) {
// Wait a bit to ensure file is fully written
if is_file_stable(&path).await {
debug!("Found new/modified file: {:?}", path);
if let Err(e) = process_file(&path, db, file_service, queue_service, config).await {
error!("Failed to process file {:?}: {}", path, e);
}
}
}
}
}
}
}
// Update known files
*known_files = current_files;
Ok(())
}
async fn is_file_stable(path: &Path) -> bool {
// Check if file size is stable (not currently being written)
if let Ok(metadata1) = tokio::fs::metadata(path).await {
let size1 = metadata1.len();
// Wait a short time
sleep(Duration::from_millis(500)).await;
if let Ok(metadata2) = tokio::fs::metadata(path).await {
let size2 = metadata2.len();
return size1 == size2;
}
}
// If we can't read metadata, assume it's not stable
false
}
async fn process_file(
path: &std::path::Path,
db: &Database,
@@ -60,27 +228,81 @@ async fn process_file(
.unwrap_or("")
.to_string();
if !file_service.is_allowed_file_type(&filename, &config.allowed_file_types) {
// Skip hidden files, temporary files, and system files
if filename.starts_with('.') ||
filename.starts_with('~') ||
filename.ends_with(".tmp") ||
filename.ends_with(".temp") ||
filename.contains("$RECYCLE.BIN") ||
filename.contains("System Volume Information") {
debug!("Skipping system/temporary file: {}", filename);
return Ok(());
}
if !file_service.is_allowed_file_type(&filename, &config.allowed_file_types) {
debug!("Skipping file with disallowed type: {}", filename);
return Ok(());
}
// Check file age if configured
if let Some(max_age_hours) = config.max_file_age_hours {
if let Ok(metadata) = tokio::fs::metadata(path).await {
if let Ok(created) = metadata.created() {
let age = SystemTime::now().duration_since(created).unwrap_or_default();
if age.as_secs() > max_age_hours * 3600 {
debug!("Skipping old file: {} (age: {}h)", filename, age.as_secs() / 3600);
return Ok(());
}
}
}
}
info!("Processing new file: {:?}", path);
let file_data = tokio::fs::read(path).await?;
let file_size = file_data.len() as i64;
// Skip very large files (> 500MB by default)
const MAX_FILE_SIZE: i64 = 500 * 1024 * 1024;
if file_size > MAX_FILE_SIZE {
warn!("Skipping large file: {} ({} MB)", filename, file_size / 1024 / 1024);
return Ok(());
}
// Skip empty files
if file_size == 0 {
debug!("Skipping empty file: {}", filename);
return Ok(());
}
let mime_type = mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string();
let file_path = file_service.save_file(&filename, &file_data).await?;
// Check if file is OCR-able
if !is_ocr_able_file(&mime_type) {
debug!("Skipping non-OCR-able file: {} ({})", filename, mime_type);
return Ok(());
}
// Check for duplicate files (same filename and size)
if let Ok(existing_docs) = db.find_documents_by_filename(&filename).await {
for doc in existing_docs {
if doc.file_size == file_size {
info!("Skipping duplicate file: {} (already exists with same size)", filename);
return Ok(());
}
}
}
let saved_file_path = file_service.save_file(&filename, &file_data).await?;
let system_user_id = uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000000")?;
let document = file_service.create_document(
&filename,
&filename,
&file_path,
&saved_file_path,
file_size,
&mime_type,
system_user_id,
@@ -88,23 +310,47 @@ async fn process_file(
let created_doc = db.create_document(document).await?;
// Enqueue for OCR processing with priority based on file size
let priority = calculate_priority(file_size);
// Enqueue for OCR processing with priority based on file size and type
let priority = calculate_priority(file_size, &mime_type);
queue_service.enqueue_document(created_doc.id, priority, file_size).await?;
info!("Successfully queued file for OCR: {}", filename);
info!("Successfully queued file for OCR: {} (size: {} bytes)", filename, file_size);
Ok(())
}
/// Calculate priority based on file size (smaller files get higher priority)
fn calculate_priority(file_size: i64) -> i32 {
fn is_ocr_able_file(mime_type: &str) -> bool {
matches!(mime_type,
"application/pdf" |
"text/plain" |
"image/png" | "image/jpeg" | "image/jpg" | "image/tiff" | "image/bmp" | "image/gif" |
"application/msword" | "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
}
/// Calculate priority based on file size and type (smaller files and images get higher priority)
fn calculate_priority(file_size: i64, mime_type: &str) -> i32 {
const MB: i64 = 1024 * 1024;
match file_size {
const MB5: i64 = 5 * 1024 * 1024;
const MB10: i64 = 10 * 1024 * 1024;
const MB50: i64 = 50 * 1024 * 1024;
let base_priority = match file_size {
0..=MB => 10, // <= 1MB: highest priority
..=5 * MB => 8, // 1-5MB: high priority
..=10 * MB => 6, // 5-10MB: medium priority
..=50 * MB => 4, // 10-50MB: low priority
..=MB5 => 8, // 1-5MB: high priority
..=MB10 => 6, // 5-10MB: medium priority
..=MB50 => 4, // 10-50MB: low priority
_ => 2, // > 50MB: lowest priority
}
};
// Boost priority for images (usually faster to OCR)
let type_boost = if mime_type.starts_with("image/") {
2
} else if mime_type == "text/plain" {
1
} else {
0
};
(base_priority + type_boost).min(10)
}

test_watch_folder.sh (new executable file, 37 lines)

@@ -0,0 +1,37 @@
#!/bin/bash
# Test script for watch folder functionality
echo "Testing watch folder functionality..."
# Create a test watch folder if it doesn't exist
mkdir -p ./watch
echo "Creating test files in watch folder..."
# Create a test text file
echo "This is a test document for OCR processing." > ./watch/test_document.txt
# Create a test PDF file (mock content)
echo "%PDF-1.4 Mock PDF for testing" > ./watch/test_document.pdf
# Create a test image file (mock content)
echo "Mock PNG image content" > ./watch/test_image.png
echo "Test files created in ./watch/ folder:"
ls -la ./watch/
echo ""
echo "Watch folder setup complete!"
echo "You can now:"
echo "1. Start the readur application"
echo "2. Copy OCR-able files to the ./watch/ folder"
echo "3. Monitor the logs to see files being processed"
echo ""
echo "Supported file types: PDF, PNG, JPG, JPEG, TIFF, BMP, TXT, DOC, DOCX"
echo ""
echo "Environment variables for configuration:"
echo "- WATCH_FOLDER: Path to watch folder (default: ./watch)"
echo "- WATCH_INTERVAL_SECONDS: Polling interval (default: 30)"
echo "- FILE_STABILITY_CHECK_MS: File stability check time (default: 500)"
echo "- MAX_FILE_AGE_HOURS: Skip files older than this (default: none)"
echo "- FORCE_POLLING_WATCH: Force polling mode (default: auto-detect)"