// Readur/src/main.rs
use axum::{
    routing::get,
    Router,
};
use sqlx::Row;
use std::sync::Arc;
use tower_http::{cors::CorsLayer, services::{ServeDir, ServeFile}};
use tracing::{info, error, warn};
use anyhow;
use readur::{config::Config, db::Database, AppState, *};

#[cfg(test)]
mod tests;

/// Determines the correct path for static files based on the environment.
/// Checks multiple possible locations in order of preference.
fn determine_static_files_path() -> std::path::PathBuf {
    use std::path::PathBuf;

    let current_dir = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));

    // List of possible static file locations in order of preference
    let possible_paths = vec![
        // Docker/production environment - frontend build copied to /app/frontend/dist
        current_dir.join("frontend/dist"),
        // Development environment - frontend build in local frontend/dist
        PathBuf::from("frontend/dist"),
        // Alternative development setup
        current_dir.join("../frontend/dist"),
        // Fallback to current directory if somehow the build is there
        current_dir.join("dist"),
        // Last resort fallback
        PathBuf::from("dist"),
    ];

    for path in possible_paths {
        let index_path = path.join("index.html");
        if index_path.exists() && index_path.is_file() {
            info!("Found static files at: {}", path.display());
            return path;
        } else {
            info!(
                "Static files not found at: {} (index.html exists: {})",
                path.display(),
                index_path.exists()
            );
        }
    }

    // If no valid path is found, default to frontend/dist and let it fail gracefully
    warn!("No valid static files directory found, defaulting to 'frontend/dist'");
    PathBuf::from("frontend/dist")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize logging with custom filters to reduce spam from the pdf_extract crate.
    // Users can override with the RUST_LOG environment variable, e.g.:
    //   RUST_LOG=debug cargo run                 (enable debug for all)
    //   RUST_LOG=readur=debug,pdf_extract=error  (debug for readur, suppress pdf_extract)
    //   RUST_LOG=pdf_extract=off                 (completely silence pdf_extract)
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| {
            // Default filter when RUST_LOG is not set
            tracing_subscriber::EnvFilter::new("info")
                .add_directive("pdf_extract=error".parse().unwrap()) // Suppress pdf_extract WARN spam
                .add_directive("readur=info".parse().unwrap()) // Keep our app logs at info
        });
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .init();

    println!("\n🚀 READUR APPLICATION STARTUP");
    println!("{}", "=".repeat(60));

    // Load and validate configuration with comprehensive logging
    let config = match Config::from_env() {
        Ok(cfg) => {
            println!("✅ Configuration loaded and validated successfully");
            cfg
        }
        Err(e) => {
            println!("❌ CRITICAL: Configuration loading failed!");
            println!("Error: {}", e);
            println!("\n🔧 Please check your environment variables and fix the configuration issues above.");
            return Err(e);
        }
    };

    // Log critical configuration values that affect startup
    println!("\n🔗 STARTUP CONFIGURATION:");
    println!("{}", "=".repeat(50));
    println!("🌐 Server will start on: {}", config.server_address);

    // Parse the database URL safely without exposing credentials
    let db_info = if let Some(at_pos) = config.database_url.find('@') {
        let host_part = &config.database_url[at_pos + 1..];
        let protocol = if config.database_url.starts_with("postgresql://") { "postgresql" } else { "postgres" };
        // Extract just the username from the credentials part (before the '@')
        let creds_part = &config.database_url[..at_pos];
        let username = if let Some(proto_end) = creds_part.find("://") {
            let after_proto = &creds_part[proto_end + 3..];
            if let Some(colon_pos) = after_proto.find(':') {
                &after_proto[..colon_pos]
            } else {
                after_proto
            }
        } else {
            "unknown"
        };
        format!("{}://{}:***@{}", protocol, username, host_part)
    } else {
        "Invalid database URL format".to_string()
    };
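
    // For example (hypothetical values), the masking above turns
    // "postgresql://readur:secret@db:5432/readur" into "postgresql://readur:***@db:5432/readur".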
println!("🗄️ Database connection: {}", db_info);
println!("📁 Upload directory: {}", config.upload_path);
println!("👁️ Watch directory: {}", config.watch_folder);
// Initialize upload directory structure
info!("Initializing upload directory structure...");
let file_service = readur::services::file_service::FileService::new(config.upload_path.clone());
if let Err(e) = file_service.initialize_directory_structure().await {
error!("Failed to initialize directory structure: {}", e);
return Err(e.into());
}
info!("✅ Upload directory structure initialized");
// Migrate existing files to new structure (one-time operation)
info!("Migrating existing files to structured directories...");
if let Err(e) = file_service.migrate_existing_files().await {
warn!("Failed to migrate some existing files: {}", e);
// Don't fail startup for migration issues
}
// Create separate database pools for different workloads
println!("\n🗄️ DATABASE CONNECTION:");
println!("{}", "=".repeat(50));
let web_db = match Database::new_with_pool_config(&config.database_url, 20, 2).await {
Ok(db) => {
println!("✅ Web database pool created (max: 20 connections, min idle: 2)");
db
}
Err(e) => {
println!("❌ CRITICAL: Failed to connect to database for web operations!");
println!("Database URL: {}", db_info); // Use the already-masked URL
println!("Error: {}", e);
println!("\n🔧 Please verify:");
println!(" - Database server is running");
println!(" - DATABASE_URL is correct");
println!(" - Database credentials are valid");
println!(" - Network connectivity to database");
return Err(e.into());
}
};
let background_db = match Database::new_with_pool_config(&config.database_url, 30, 3).await {
Ok(db) => {
println!("✅ Background database pool created (max: 30 connections, min idle: 3)");
db
}
Err(e) => {
println!("❌ CRITICAL: Failed to connect to database for background operations!");
println!("Error: {}", e);
return Err(e.into());
}
};
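
    // Both pools point at the same database; separating web and background traffic
    // keeps long-running background jobs from starving interactive requests of connections.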

    // Don't run the old migration system - let SQLx handle everything
    // db.migrate().await?;

    // Run SQLx migrations
    info!("Running SQLx migrations...");
    let migrations = sqlx::migrate!("./migrations");
    info!("Found {} migrations", migrations.migrations.len());
    for migration in migrations.migrations.iter() {
        info!("Migration available: {} - {}", migration.version, migration.description);
    }

    // Check current migration status
    let applied_result = sqlx::query("SELECT version, description FROM _sqlx_migrations ORDER BY version")
        .fetch_all(web_db.get_pool())
        .await;
    match applied_result {
        Ok(rows) => {
            info!("Currently applied migrations:");
            for row in rows {
                let version: i64 = row.get("version");
                let description: String = row.get("description");
                info!(" - {} {}", version, description);
            }
        }
        Err(e) => {
            info!("No existing migrations found (this is normal for first run): {}", e);
        }
    }

    // Check if the ocr_error column exists
    let check_column = sqlx::query("SELECT column_name FROM information_schema.columns WHERE table_name = 'documents' AND column_name = 'ocr_error'")
        .fetch_optional(web_db.get_pool())
        .await;
    match check_column {
        Ok(Some(_)) => info!("✅ ocr_error column exists"),
        Ok(None) => {
            error!("❌ ocr_error column is missing! Migration 006 may not have been applied.");
            // Try to add the columns manually as a fallback
            info!("Attempting to add missing columns...");
            if let Err(e) = sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_error TEXT")
                .execute(web_db.get_pool())
                .await
            {
                error!("Failed to add ocr_error column: {}", e);
            }
            if let Err(e) = sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_completed_at TIMESTAMPTZ")
                .execute(web_db.get_pool())
                .await
            {
                error!("Failed to add ocr_completed_at column: {}", e);
            }
            info!("Fallback column addition completed");
        }
        Err(e) => error!("Failed to check for ocr_error column: {}", e),
    }

    let result = migrations.run(web_db.get_pool()).await;
    match result {
        Ok(_) => info!("SQLx migrations completed successfully"),
        Err(e) => {
            error!("Failed to run SQLx migrations: {}", e);
            return Err(e.into());
        }
    }

    // Debug: check which columns exist in the documents table
    let columns_result = sqlx::query(
        "SELECT column_name FROM information_schema.columns
         WHERE table_name = 'documents' AND table_schema = 'public'
         ORDER BY ordinal_position"
    )
    .fetch_all(web_db.get_pool())
    .await;
    match columns_result {
        Ok(rows) => {
            info!("Columns in documents table:");
            for row in rows {
                let column_name: String = row.get("column_name");
                info!(" - {}", column_name);
            }
        }
        Err(e) => {
            error!("Failed to check columns: {}", e);
        }
    }

    // Seed the admin user
    seed::seed_admin_user(&background_db).await?;

    // Reset any running WebDAV syncs from a previous server instance using the background DB
    match background_db.reset_running_webdav_syncs().await {
        Ok(count) => {
            if count > 0 {
                info!("Reset {} orphaned WebDAV sync states from server restart", count);
            }
        }
        Err(e) => {
            warn!("Failed to reset running WebDAV syncs: {}", e);
        }
    }

    // Reset any running universal source syncs from a previous server instance
    match background_db.reset_running_source_syncs().await {
        Ok(count) => {
            if count > 0 {
                info!("Reset {} orphaned source sync states from server restart", count);
            }
        }
        Err(e) => {
            warn!("Failed to reset running source syncs: {}", e);
        }
    }

    // Create a shared OCR queue service for both web and background operations
    let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
    let shared_queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(
        background_db.clone(),
        background_db.get_pool().clone(),
        concurrent_jobs,
    ));
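
    // The single shared queue service above is cloned into both app states and into the
    // OCR worker below, so web handlers and background processing share one queue.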

    // Initialize OIDC client if enabled
    let oidc_client = if config.oidc_enabled {
        match readur::oidc::OidcClient::new(&config).await {
            Ok(client) => {
                println!("✅ OIDC client initialized successfully");
                Some(Arc::new(client))
            }
            Err(e) => {
                error!("❌ Failed to initialize OIDC client: {}", e);
                println!("❌ OIDC authentication will be disabled");
                None
            }
        }
    } else {
        println!("OIDC authentication is disabled");
        None
    };
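
    // Note: OIDC initialization failure is non-fatal; the server still starts,
    // just with OIDC authentication disabled.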

    // Create web-facing state with shared queue service
    let web_state = AppState {
        db: web_db,
        config: config.clone(),
        webdav_scheduler: None, // Will be set after creating scheduler
        source_scheduler: None, // Will be set after creating scheduler
        queue_service: shared_queue_service.clone(),
        oidc_client: oidc_client.clone(),
    };
    let web_state = Arc::new(web_state);

    // Create background state with shared queue service
    let background_state = AppState {
        db: background_db,
        config: config.clone(),
        webdav_scheduler: None,
        source_scheduler: None,
        queue_service: shared_queue_service.clone(),
        oidc_client: oidc_client.clone(),
    };
    let background_state = Arc::new(background_state);

    let watcher_config = config.clone();
    let watcher_db = background_state.db.clone();
    tokio::spawn(async move {
        if let Err(e) = readur::scheduling::watcher::start_folder_watcher(watcher_config, watcher_db).await {
            error!("Folder watcher error: {}", e);
        }
    });

    // Create dedicated runtime for OCR processing to prevent interference with WebDAV
    let ocr_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(3) // Dedicated threads for OCR work
        .thread_name("readur-ocr")
        .enable_all()
        .build()?;

    // Create separate runtime for other background tasks (WebDAV, maintenance)
    let background_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2) // Dedicated threads for WebDAV and maintenance
        .thread_name("readur-background")
        .enable_all()
        .build()?;

    // Create dedicated runtime for database-heavy operations
    let db_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2) // Dedicated threads for intensive DB operations
        .thread_name("readur-db")
        .enable_all()
        .build()?;
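
    // Note: db_runtime is not referenced again in this function; it appears to be
    // reserved for future database-heavy work.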

    // Start OCR queue worker on dedicated OCR runtime using shared queue service
    let queue_worker = shared_queue_service.clone();
    ocr_runtime.spawn(async move {
        if let Err(e) = queue_worker.start_worker().await {
            error!("OCR queue worker error: {}", e);
        }
    });

    // Start OCR maintenance tasks on dedicated OCR runtime
    let queue_maintenance = shared_queue_service.clone();
    ocr_runtime.spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); // Every 5 minutes
        loop {
            interval.tick().await;

            // Recover stale items (older than 10 minutes)
            if let Err(e) = queue_maintenance.recover_stale_items(10).await {
                error!("Error recovering stale items: {}", e);
            }

            // Clean up old completed items (older than 7 days)
            if let Err(e) = queue_maintenance.cleanup_completed(7).await {
                error!("Error cleaning up completed items: {}", e);
            }
        }
    });

    // Create universal source scheduler with background state (handles WebDAV, Local, S3)
    println!("\n📅 SCHEDULER INITIALIZATION:");
    println!("{}", "=".repeat(50));
    let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(background_state.clone()));
    println!("✅ Universal source scheduler created (handles WebDAV, Local, S3)");

    // Keep WebDAV scheduler for backward compatibility with existing WebDAV endpoints
    let webdav_scheduler = Arc::new(readur::scheduling::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
    println!("✅ Legacy WebDAV scheduler created (backward compatibility)");

    // Update the web state to include scheduler references
    let updated_web_state = AppState {
        db: web_state.db.clone(),
        config: web_state.config.clone(),
        webdav_scheduler: Some(webdav_scheduler.clone()),
        source_scheduler: Some(source_scheduler.clone()),
        queue_service: shared_queue_service.clone(),
        oidc_client: oidc_client.clone(),
    };
    let web_state = Arc::new(updated_web_state);

    // Start universal source scheduler on background runtime
    println!("⏰ Scheduling background source sync to start in 30 seconds");
    let scheduler_for_background = source_scheduler.clone();
    background_runtime.spawn(async move {
        info!("Starting universal source sync scheduler with 30-second startup delay");
        // Wait 30 seconds before starting the scheduler to allow the server to fully initialize
        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        info!("🔄 Universal source sync scheduler starting after startup delay - this will check for WebDAV sources!");
        scheduler_for_background.start().await;
    });

    // Determine the correct static files path for SPA serving
    let static_dir = determine_static_files_path();
    let index_file = static_dir.join("index.html");
    info!("Using static files directory: {}", static_dir.display());
    info!("Using index.html file: {}", index_file.display());

    // Create the router with the updated state
    let app = Router::new()
        .route("/api/health", get(readur::health_check))
        .nest("/api/auth", readur::routes::auth::router())
        .nest("/api/documents", readur::routes::documents::router())
        .nest("/api/ignored-files", readur::routes::ignored_files::ignored_files_routes())
        .nest("/api/labels", readur::routes::labels::router())
        .nest("/api/metrics", readur::routes::metrics::router())
        .nest("/metrics", readur::routes::prometheus_metrics::router())
        .nest("/api/notifications", readur::routes::notifications::router())
        .nest("/api/queue", readur::routes::queue::router())
        .nest("/api/search", readur::routes::search::router())
        .nest("/api/settings", readur::routes::settings::router())
        .nest("/api/sources", readur::routes::sources::router())
        .nest("/api/users", readur::routes::users::router())
        .nest("/api/webdav", readur::routes::webdav::router())
        .merge(readur::swagger::create_swagger_router())
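        // Anything that doesn't match an API route falls through to the SPA handler:
        // static assets are served from static_dir, and unknown paths receive index.html
        // so client-side routing survives a hard refresh.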
        .fallback_service(
            ServeDir::new(&static_dir)
                .precompressed_gzip()
                .precompressed_br()
                .fallback(ServeFile::new(&index_file))
        )
        .layer(CorsLayer::permissive())
        .with_state(web_state.clone());

    println!("\n🌐 STARTING HTTP SERVER:");
    println!("{}", "=".repeat(50));
    let listener = match tokio::net::TcpListener::bind(&config.server_address).await {
        Ok(listener) => {
            println!("✅ HTTP server bound to: {}", config.server_address);
            listener
        }
        Err(e) => {
            println!("❌ CRITICAL: Failed to bind to address: {}", config.server_address);
            println!("Error: {}", e);
            println!("\n🔧 Please check:");
            println!(" - Address {} is not already in use", config.server_address);
            println!(" - SERVER_HOST and SERVER_PORT environment variables are correct");
            println!(" - You have permission to bind to this address");
            return Err(e.into());
        }
    };

    println!("\n🎉 READUR APPLICATION READY!");
    println!("{}", "=".repeat(60));
    println!("🌐 Server: http://{}", config.server_address);
    println!("📁 Upload Directory: {}", config.upload_path);
    println!("👁️ Watch Directory: {}", config.watch_folder);
    println!("🔄 Source Scheduler: Will start in 30 seconds");
    println!("📋 Check logs above for any configuration warnings");
    println!("{}", "=".repeat(60));
    info!("🚀 Readur server is now running and accepting connections");

    axum::serve(listener, app).await?;

    Ok(())
}