use axum::{ http::StatusCode, response::{Json, Html}, routing::get, Router, }; use sqlx::Row; use std::sync::Arc; use tower_http::{cors::CorsLayer, services::{ServeDir, ServeFile}}; use tracing::{info, error}; mod auth; mod batch_ingest; mod config; mod db; mod enhanced_ocr; mod file_service; mod models; mod ocr; mod ocr_queue; mod routes; mod seed; mod swagger; mod watcher; mod webdav_service; mod webdav_scheduler; mod webdav_xml_parser; #[cfg(test)] mod tests; use config::Config; use db::Database; #[derive(Clone)] pub struct AppState { pub db: Database, pub config: Config, } #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let config = Config::from_env()?; let db = Database::new(&config.database_url).await?; // Don't run the old migration system - let SQLx handle everything // db.migrate().await?; // Run SQLx migrations info!("Running SQLx migrations..."); let migrations = sqlx::migrate!("./migrations"); info!("Found {} migrations", migrations.migrations.len()); for migration in migrations.migrations.iter() { info!("Migration available: {} - {}", migration.version, migration.description); } // Check current migration status let applied_result = sqlx::query("SELECT version, description FROM _sqlx_migrations ORDER BY version") .fetch_all(&db.pool) .await; match applied_result { Ok(rows) => { info!("Currently applied migrations:"); for row in rows { let version: i64 = row.get("version"); let description: String = row.get("description"); info!(" - {} {}", version, description); } } Err(e) => { info!("No existing migrations found (this is normal for first run): {}", e); } } // Check if ocr_error column exists let check_column = sqlx::query("SELECT column_name FROM information_schema.columns WHERE table_name = 'documents' AND column_name = 'ocr_error'") .fetch_optional(&db.pool) .await; match check_column { Ok(Some(_)) => info!("✅ ocr_error column exists"), Ok(None) => { error!("❌ ocr_error column is missing! Migration 006 may not have been applied."); // Try to add the column manually as a fallback info!("Attempting to add missing columns..."); if let Err(e) = sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_error TEXT") .execute(&db.pool) .await { error!("Failed to add ocr_error column: {}", e); } if let Err(e) = sqlx::query("ALTER TABLE documents ADD COLUMN IF NOT EXISTS ocr_completed_at TIMESTAMPTZ") .execute(&db.pool) .await { error!("Failed to add ocr_completed_at column: {}", e); } info!("Fallback column addition completed"); } Err(e) => error!("Failed to check for ocr_error column: {}", e), } let result = migrations.run(&db.pool).await; match result { Ok(_) => info!("SQLx migrations completed successfully"), Err(e) => { error!("Failed to run SQLx migrations: {}", e); return Err(e.into()); } } // Debug: Check what columns exist in documents table let columns_result = sqlx::query( "SELECT column_name FROM information_schema.columns WHERE table_name = 'documents' AND table_schema = 'public' ORDER BY ordinal_position" ) .fetch_all(&db.pool) .await; match columns_result { Ok(rows) => { info!("Columns in documents table:"); for row in rows { let column_name: String = row.get("column_name"); info!(" - {}", column_name); } } Err(e) => { error!("Failed to check columns: {}", e); } } // Seed admin user seed::seed_admin_user(&db).await?; // Seed system user for watcher seed::seed_system_user(&db).await?; let state = AppState { db, config: config.clone() }; let state = Arc::new(state); let app = Router::new() .route("/api/health", get(readur::health_check)) .nest("/api/auth", routes::auth::router()) .nest("/api/documents", routes::documents::router()) .nest("/api/metrics", routes::metrics::router()) .nest("/api/notifications", routes::notifications::router()) .nest("/api/queue", routes::queue::router()) .nest("/api/search", routes::search::router()) .nest("/api/settings", routes::settings::router()) .nest("/api/users", routes::users::router()) .nest("/api/webdav", routes::webdav::router()) .merge(swagger::create_swagger_router()) .nest_service("/", ServeDir::new("/app/frontend").fallback(ServeFile::new("/app/frontend/index.html"))) .fallback(serve_spa) .layer(CorsLayer::permissive()) .with_state(state.clone()); let watcher_config = config.clone(); tokio::spawn(async move { if let Err(e) = watcher::start_folder_watcher(watcher_config).await { error!("Folder watcher error: {}", e); } }); // Start OCR queue worker let queue_db = Database::new(&config.database_url).await?; let queue_pool = sqlx::PgPool::connect(&config.database_url).await?; let concurrent_jobs = 4; // TODO: Get from config/settings let queue_service = Arc::new(ocr_queue::OcrQueueService::new(queue_db, queue_pool, concurrent_jobs)); let queue_worker = queue_service.clone(); tokio::spawn(async move { if let Err(e) = queue_worker.start_worker().await { error!("OCR queue worker error: {}", e); } }); // Start maintenance tasks let queue_maintenance = queue_service.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); // Every 5 minutes loop { interval.tick().await; // Recover stale items (older than 10 minutes) if let Err(e) = queue_maintenance.recover_stale_items(10).await { error!("Error recovering stale items: {}", e); } // Clean up old completed items (older than 7 days) if let Err(e) = queue_maintenance.cleanup_completed(7).await { error!("Error cleaning up completed items: {}", e); } } }); // Start WebDAV background sync scheduler let webdav_scheduler = webdav_scheduler::WebDAVScheduler::new(state.clone()); tokio::spawn(async move { info!("Starting WebDAV background sync scheduler"); webdav_scheduler.start().await; }); let listener = tokio::net::TcpListener::bind(&config.server_address).await?; info!("Server starting on {}", config.server_address); axum::serve(listener, app).await?; Ok(()) } async fn serve_spa() -> Result, StatusCode> { match tokio::fs::read_to_string("/app/frontend/index.html").await { Ok(html) => Ok(Html(html)), Err(_) => Err(StatusCode::NOT_FOUND), } }