feat(server): reorganize components into their own modules and fix imports

perf3ct 2025-06-27 18:27:42 +00:00
parent 193327144e
commit 9a8bf72ff7
37 changed files with 112 additions and 105 deletions
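
The same move repeats in every hunk below: modules that previously sat at the crate root are regrouped under four new umbrella modules (ingestion, monitoring, scheduling, services), plus an ocr directory module that absorbs the former ocr_* files, with all import paths updated to match. As a quick orientation, a sketch of the path migration — the paths are taken verbatim from the hunks below, the comments are annotations:

    // Before: modules addressed from the crate root
    use readur::file_service::FileService;
    use readur::ocr_queue::OcrQueueService;
    use readur::watcher::start_folder_watcher;
    use readur::source_scheduler::SourceScheduler;

    // After: grouped by responsibility
    use readur::services::file_service::FileService;
    use readur::ocr::queue::OcrQueueService;
    use readur::scheduling::watcher::start_folder_watcher;
    use readur::scheduling::source_scheduler::SourceScheduler;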

View File

@@ -4,11 +4,11 @@ use std::path::Path;
 use uuid::Uuid;
 use readur::{
-    batch_ingest::BatchIngester,
+    ingestion::batch_ingest::BatchIngester,
     config::Config,
     db::Database,
-    file_service::FileService,
-    ocr_queue::OcrQueueService,
+    services::file_service::FileService,
+    ocr::queue::OcrQueueService,
 };
 #[tokio::main]

View File

@@ -10,9 +10,9 @@ use walkdir::WalkDir;
 use crate::{
     config::Config,
     db::Database,
-    file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr_queue::OcrQueueService,
+    services::file_service::FileService,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    ocr::queue::OcrQueueService,
 };
 pub struct BatchIngester {

View File

@@ -12,7 +12,7 @@ use tracing::{debug, info, warn};
 use crate::models::Document;
 use crate::db::Database;
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[derive(Debug, Clone)]
 pub enum DeduplicationPolicy {

src/ingestion/mod.rs (new file, +2)
View File

@@ -0,0 +1,2 @@
+pub mod batch_ingest;
+pub mod document_ingestion;

View File

@@ -1,32 +1,17 @@
 pub mod auth;
-pub mod batch_ingest;
 pub mod config;
 pub mod db;
 pub mod db_guardrails_simple;
-pub mod document_ingestion;
-pub mod enhanced_ocr;
-pub mod error_management;
-pub mod file_service;
-pub mod local_folder_service;
+pub mod ingestion;
 pub mod models;
+pub mod monitoring;
 pub mod ocr;
 pub mod oidc;
-pub mod ocr_api;
-pub mod ocr_enhanced;
-pub mod ocr_error;
-pub mod ocr_health;
-pub mod ocr_queue;
-pub mod ocr_tests;
-pub mod request_throttler;
 pub mod routes;
-pub mod s3_service;
+pub mod scheduling;
 pub mod seed;
-pub mod source_scheduler;
-pub mod source_sync;
+pub mod services;
 pub mod swagger;
-pub mod watcher;
-pub mod webdav_service;
-pub mod webdav_scheduler;
 pub mod webdav_xml_parser;
 #[cfg(test)]
@@ -45,9 +30,9 @@ use oidc::OidcClient;
 pub struct AppState {
     pub db: Database,
     pub config: Config,
-    pub webdav_scheduler: Option<std::sync::Arc<webdav_scheduler::WebDAVScheduler>>,
-    pub source_scheduler: Option<std::sync::Arc<source_scheduler::SourceScheduler>>,
-    pub queue_service: std::sync::Arc<ocr_queue::OcrQueueService>,
+    pub webdav_scheduler: Option<std::sync::Arc<scheduling::webdav_scheduler::WebDAVScheduler>>,
+    pub source_scheduler: Option<std::sync::Arc<scheduling::source_scheduler::SourceScheduler>>,
+    pub queue_service: std::sync::Arc<ocr::queue::OcrQueueService>,
     pub oidc_client: Option<std::sync::Arc<OidcClient>>,
 }

View File

@@ -119,7 +119,7 @@ async fn main() -> anyhow::Result<()> {
     // Initialize upload directory structure
     info!("Initializing upload directory structure...");
-    let file_service = readur::file_service::FileService::new(config.upload_path.clone());
+    let file_service = readur::services::file_service::FileService::new(config.upload_path.clone());
     if let Err(e) = file_service.initialize_directory_structure().await {
         error!("Failed to initialize directory structure: {}", e);
         return Err(e.into());
@@ -284,7 +284,7 @@ async fn main() -> anyhow::Result<()> {
     // Create shared OCR queue service for both web and background operations
     let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
-    let shared_queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
+    let shared_queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(
         background_db.clone(),
         background_db.get_pool().clone(),
         concurrent_jobs
@@ -333,7 +333,7 @@ async fn main() -> anyhow::Result<()> {
     let watcher_config = config.clone();
     let watcher_db = background_state.db.clone();
     tokio::spawn(async move {
-        if let Err(e) = readur::watcher::start_folder_watcher(watcher_config, watcher_db).await {
+        if let Err(e) = readur::scheduling::watcher::start_folder_watcher(watcher_config, watcher_db).await {
             error!("Folder watcher error: {}", e);
         }
     });
@@ -390,11 +390,11 @@ async fn main() -> anyhow::Result<()> {
     println!("\n📅 SCHEDULER INITIALIZATION:");
     println!("{}", "=".repeat(50));
-    let source_scheduler = Arc::new(readur::source_scheduler::SourceScheduler::new(background_state.clone()));
+    let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(background_state.clone()));
     println!("✅ Universal source scheduler created (handles WebDAV, Local, S3)");
     // Keep WebDAV scheduler for backward compatibility with existing WebDAV endpoints
-    let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
+    let webdav_scheduler = Arc::new(readur::scheduling::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
     println!("✅ Legacy WebDAV scheduler created (backward compatibility)");
     // Update the web state to include scheduler references

View File

@@ -5,7 +5,7 @@
  * and automatic alerting for potential issues.
  */
-use sqlx::PgPool;
+use sqlx::{PgPool, Row};
 use serde::{Deserialize, Serialize};
 use tokio::time::{Duration, interval};
 use tracing::{error, warn, info, debug};
@@ -166,7 +166,7 @@ impl DatabaseMonitor {
     }
     async fn check_ocr_processing_health(&self) -> Result<OcrProcessingHealth> {
-        let stats = sqlx::query!(
+        let stats = sqlx::query(
             r#"
             SELECT
                 COUNT(*) FILTER (WHERE ocr_status = 'pending') as pending,
@@ -182,13 +182,13 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let pending = stats.pending.unwrap_or(0) as i32;
-        let processing = stats.processing.unwrap_or(0) as i32;
-        let stuck = stats.stuck.unwrap_or(0) as i32;
-        let failed_recent = stats.failed_recent.unwrap_or(0) as i32;
-        let avg_confidence = stats.avg_confidence;
-        let avg_time = stats.avg_time;
-        let throughput = stats.completed_last_minute.unwrap_or(0) as f64;
+        let pending = stats.get::<Option<i64>, _>("pending").unwrap_or(0) as i32;
+        let processing = stats.get::<Option<i64>, _>("processing").unwrap_or(0) as i32;
+        let stuck = stats.get::<Option<i64>, _>("stuck").unwrap_or(0) as i32;
+        let failed_recent = stats.get::<Option<i64>, _>("failed_recent").unwrap_or(0) as i32;
+        let avg_confidence = stats.get::<Option<f64>, _>("avg_confidence");
+        let avg_time = stats.get::<Option<f64>, _>("avg_time");
+        let throughput = stats.get::<Option<i64>, _>("completed_last_minute").unwrap_or(0) as f64;
         let status = if stuck > 0 || failed_recent > 10 {
             HealthStatus::Critical
@@ -211,7 +211,7 @@ impl DatabaseMonitor {
     }
     async fn check_queue_health(&self) -> Result<QueueHealth> {
-        let queue_stats = sqlx::query!(
+        let queue_stats = sqlx::query(
             r#"
             SELECT
                 COUNT(*) as total_items,
@@ -224,9 +224,9 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let queue_size = queue_stats.total_items.unwrap_or(0) as i32;
-        let oldest_pending = queue_stats.oldest_pending_minutes.map(|m| m as i32);
-        let worker_count = queue_stats.active_workers.unwrap_or(0) as i32;
+        let queue_size = queue_stats.get::<Option<i64>, _>("total_items").unwrap_or(0) as i32;
+        let oldest_pending = queue_stats.get::<Option<f64>, _>("oldest_pending_minutes").map(|m| m as i32);
+        let worker_count = queue_stats.get::<Option<i64>, _>("active_workers").unwrap_or(0) as i32;
         // Calculate queue growth rate (simplified)
         let growth_rate = 0.0; // Would need historical data for accurate calculation
@@ -252,14 +252,14 @@ impl DatabaseMonitor {
         let start = std::time::Instant::now();
         // Test pool responsiveness
-        sqlx::query!("SELECT 1")
+        sqlx::query("SELECT 1")
             .fetch_one(&self.pool)
             .await?;
         let response_time = start.elapsed().as_millis() as u64;
-        let total_connections = self.pool.size();
-        let idle_connections = self.pool.num_idle();
+        let total_connections = self.pool.size() as u32;
+        let idle_connections = self.pool.num_idle() as u32;
         let active_connections = total_connections - idle_connections;
         let utilization = if total_connections > 0 {
             (active_connections as f64 / total_connections as f64 * 100.0) as u8
@@ -286,7 +286,7 @@ impl DatabaseMonitor {
     }
     async fn check_data_consistency(&self) -> Result<ConsistencyHealth> {
-        let consistency_check = sqlx::query!(
+        let consistency_check = sqlx::query(
             r#"
             SELECT
                 -- Orphaned queue items
@@ -306,9 +306,9 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let orphaned = consistency_check.orphaned_queue.unwrap_or(0) as i32;
-        let missing_files = consistency_check.missing_files.unwrap_or(0) as i32;
-        let inconsistent = consistency_check.inconsistent_states.unwrap_or(0) as i32;
+        let orphaned = consistency_check.get::<Option<i64>, _>("orphaned_queue").unwrap_or(0) as i32;
+        let missing_files = consistency_check.get::<Option<i32>, _>("missing_files").unwrap_or(0) as i32;
+        let inconsistent = consistency_check.get::<Option<i64>, _>("inconsistent_states").unwrap_or(0) as i32;
         let total_issues = orphaned + missing_files + inconsistent;
         let integrity_score = if total_issues == 0 { 100.0 } else { 100.0 - (total_issues as f64 * 10.0).min(100.0) };
@@ -430,18 +430,18 @@ impl DatabaseMonitor {
     }
     async fn reset_stuck_jobs(&self) -> Result<i32> {
-        let result = sqlx::query!(
-            "SELECT reset_stuck_ocr_jobs($1) as reset_count",
-            self.config.stuck_job_threshold_minutes
+        let result = sqlx::query(
+            "SELECT reset_stuck_ocr_jobs($1) as reset_count"
         )
+        .bind(self.config.stuck_job_threshold_minutes)
         .fetch_one(&self.pool)
         .await?;
-        Ok(result.reset_count.unwrap_or(0))
+        Ok(result.get::<Option<i32>, _>("reset_count").unwrap_or(0))
     }
     async fn cleanup_orphaned_items(&self) -> Result<i32> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             DELETE FROM ocr_queue
             WHERE document_id NOT IN (SELECT id FROM documents)
@@ -464,7 +464,7 @@ impl DatabaseMonitor {
         let cleanup_count = self.cleanup_orphaned_items().await?;
         // Refresh OCR stats
-        sqlx::query!("SELECT refresh_ocr_stats()")
+        sqlx::query("SELECT refresh_ocr_stats()")
             .execute(&self.pool)
             .await?;
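
The change in this file is uniform: every compile-time-checked sqlx::query! macro becomes a runtime sqlx::query with typed Row::get lookups, hence the new Row import. That trades sqlx's compile-time schema verification (which needs a live DATABASE_URL or prepared offline query data at build time) for column-name and type checks at runtime. A minimal sketch of the two idioms adopted here — the table, columns, and SQL mirror the hunks above, while the standalone helper functions are illustrative:

    use sqlx::{PgPool, Row};

    // Aggregates come back as nullable BIGINTs, so decode Option<i64> and
    // fall back to 0, matching the unwrap_or(0) as i32 idiom above.
    async fn queue_depth(pool: &PgPool) -> anyhow::Result<i32> {
        let row = sqlx::query("SELECT COUNT(*) as total_items FROM ocr_queue")
            .fetch_one(pool)
            .await?;
        Ok(row.get::<Option<i64>, _>("total_items").unwrap_or(0) as i32)
    }

    // Parameters move from macro arguments to explicit .bind() calls.
    async fn reset_stuck(pool: &PgPool, threshold_minutes: i32) -> anyhow::Result<i32> {
        let row = sqlx::query("SELECT reset_stuck_ocr_jobs($1) as reset_count")
            .bind(threshold_minutes)
            .fetch_one(pool)
            .await?;
        Ok(row.get::<Option<i32>, _>("reset_count").unwrap_or(0))
    }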

src/monitoring/mod.rs (new file, +3)
View File

@@ -0,0 +1,3 @@
+pub mod db_monitoring;
+pub mod error_management;
+pub mod request_throttler;

View File

@@ -1,5 +1,5 @@
-use crate::ocr_enhanced::EnhancedOcrService;
-use crate::ocr_error::OcrError;
+use crate::ocr::enhanced_processing::EnhancedOcrService;
+use crate::ocr::error::OcrError;
 use crate::AppState;
 use axum::{
     extract::State,
@@ -38,7 +38,7 @@ pub async fn health_check(
     let service = EnhancedOcrService::new();
     let diagnostics = service.get_diagnostics().await;
-    let health_checker = crate::ocr_health::OcrHealthChecker::new();
+    let health_checker = crate::ocr::health::OcrHealthChecker::new();
     match health_checker.perform_full_health_check() {
         Ok(diag) => {

View File

@@ -15,7 +15,7 @@ use imageproc::{
 use tesseract::{Tesseract, PageSegMode, OcrEngineMode};
 use crate::models::Settings;
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[derive(Debug, Clone)]
 pub struct ImageQualityStats {

View File

@@ -1,5 +1,5 @@
-use crate::ocr_error::OcrError;
-use crate::ocr_health::OcrHealthChecker;
+use crate::ocr::error::OcrError;
+use crate::ocr::health::OcrHealthChecker;
 use anyhow::{anyhow, Result};
 use image::DynamicImage;
 use std::path::Path;

View File

@@ -1,4 +1,4 @@
-use crate::ocr_error::{CpuFeatures, OcrDiagnostics, OcrError};
+use crate::ocr::error::{CpuFeatures, OcrDiagnostics, OcrError};
 use std::process::Command;
 use std::env;
 use std::path::Path;

View File

@@ -1,8 +1,16 @@
+pub mod api;
+pub mod enhanced;
+pub mod enhanced_processing;
+pub mod error;
+pub mod health;
+pub mod queue;
+pub mod tests;
 use anyhow::{anyhow, Result};
 use std::path::Path;
 use std::panic::{catch_unwind, AssertUnwindSafe};
-use crate::ocr_error::OcrError;
-use crate::ocr_health::OcrHealthChecker;
+use crate::ocr::error::OcrError;
+use crate::ocr::health::OcrHealthChecker;
 #[cfg(feature = "ocr")]
 use tesseract::Tesseract;
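
(The function bodies behind these imports are unchanged and not shown in this hunk. For context, catch_unwind with AssertUnwindSafe is presumably there to guard the FFI-backed Tesseract calls, converting a panic on malformed input into a recoverable error instead of killing the OCR worker. A hedged sketch of that pattern — the helper name and error text are illustrative, not the crate's API:)

    use std::panic::{catch_unwind, AssertUnwindSafe};

    // Run an OCR job, mapping any panic from the native library to an Err.
    fn run_ocr_guarded<F>(job: F) -> anyhow::Result<String>
    where
        F: FnOnce() -> anyhow::Result<String>,
    {
        catch_unwind(AssertUnwindSafe(job))
            .map_err(|_| anyhow::anyhow!("OCR engine panicked"))?
    }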

View File

@@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration};
 use tracing::{error, info, warn};
 use uuid::Uuid;
-use crate::{db::Database, enhanced_ocr::EnhancedOcrService, db_guardrails_simple::DocumentTransactionManager, request_throttler::RequestThrottler};
+use crate::{db::Database, ocr::enhanced::EnhancedOcrService, db_guardrails_simple::DocumentTransactionManager, monitoring::request_throttler::RequestThrottler};
 #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
 pub struct OcrQueueItem {
@@ -557,7 +557,7 @@ impl OcrQueueService {
         use std::path::Path;
         // Use the FileService to get the proper processed images directory
-        use crate::file_service::FileService;
+        use crate::services::file_service::FileService;
         let base_upload_dir = std::env::var("UPLOAD_PATH").unwrap_or_else(|_| "uploads".to_string());
         let file_service = FileService::new(base_upload_dir);
         let processed_images_dir = file_service.get_processed_images_path();

View File

@@ -1,9 +1,9 @@
 #[cfg(test)]
 mod tests {
     use super::super::*;
-    use crate::ocr_error::{OcrError, OcrDiagnostics, CpuFeatures};
-    use crate::ocr_health::OcrHealthChecker;
-    use crate::ocr_enhanced::EnhancedOcrService;
+    use crate::ocr::error::{OcrError, OcrDiagnostics, CpuFeatures};
+    use crate::ocr::health::OcrHealthChecker;
+    use crate::ocr::enhanced_processing::EnhancedOcrService;
     use std::env;
     use tempfile::TempDir;
     use std::fs;

View File

@@ -12,8 +12,8 @@ use sqlx::Row;
 use crate::{
     auth::AuthUser,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
-    file_service::FileService,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    services::file_service::FileService,
     models::DocumentResponse,
     AppState,
 };

View File

@@ -134,7 +134,7 @@ async fn collect_database_metrics(state: &Arc<AppState>) -> Result<DatabaseMetri
 async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
     // Use existing OCR queue statistics
-    use crate::ocr_queue::OcrQueueService;
+    use crate::ocr::queue::OcrQueueService;
     let queue_service = OcrQueueService::new(
         state.db.clone(),

View File

@@ -288,7 +288,7 @@ async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetri
 }
 async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
-    use crate::ocr_queue::OcrQueueService;
+    use crate::ocr::queue::OcrQueueService;
     let queue_service = OcrQueueService::new(
         state.db.clone(),

View File

@@ -7,7 +7,7 @@ use axum::{
 };
 use std::sync::Arc;
-use crate::{auth::AuthUser, ocr_queue::OcrQueueService, AppState, models::UserRole};
+use crate::{auth::AuthUser, ocr::queue::OcrQueueService, AppState, models::UserRole};
 fn require_admin(auth_user: &AuthUser) -> Result<(), StatusCode> {
     if auth_user.user.role != UserRole::Admin {

View File

@@ -442,7 +442,7 @@ async fn test_connection(
     let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::webdav_service::test_webdav_connection(
+    match crate::services::webdav_service::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
@@ -464,7 +464,7 @@ async fn test_connection(
     let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::local_folder_service::LocalFolderService::new(config) {
+    match crate::services::local_folder_service::LocalFolderService::new(config) {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -488,7 +488,7 @@ async fn test_connection(
     let config: crate::models::S3SourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::s3_service::S3Service::new(config).await {
+    match crate::services::s3_service::S3Service::new(config).await {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -610,7 +610,7 @@ async fn estimate_webdav_crawl_internal(
     config: &crate::models::WebDAVSourceConfig,
 ) -> Result<Json<serde_json::Value>, StatusCode> {
     // Create WebDAV service config
-    let webdav_config = crate::webdav_service::WebDAVConfig {
+    let webdav_config = crate::services::webdav_service::WebDAVConfig {
         server_url: config.server_url.clone(),
         username: config.username.clone(),
         password: config.password.clone(),
@@ -621,7 +621,7 @@ async fn estimate_webdav_crawl_internal(
     };
     // Create WebDAV service and estimate crawl
-    match crate::webdav_service::WebDAVService::new(webdav_config) {
+    match crate::services::webdav_service::WebDAVService::new(webdav_config) {
         Ok(webdav_service) => {
             match webdav_service.estimate_crawl(&config.watch_folders).await {
                 Ok(estimate) => Ok(Json(serde_json::to_value(estimate).unwrap())),
@@ -678,7 +678,7 @@ async fn test_connection_with_config(
     let config: crate::models::WebDAVSourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::webdav_service::test_webdav_connection(
+    match crate::services::webdav_service::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
@@ -700,7 +700,7 @@ async fn test_connection_with_config(
     let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::local_folder_service::LocalFolderService::new(config) {
+    match crate::services::local_folder_service::LocalFolderService::new(config) {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -724,7 +724,7 @@ async fn test_connection_with_config(
     let config: crate::models::S3SourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::s3_service::S3Service::new(config).await {
+    match crate::services::s3_service::S3Service::new(config).await {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({

View File

@@ -17,8 +17,8 @@ use crate::{
     },
     AppState,
 };
-use crate::webdav_service::WebDAVConfig;
-use crate::webdav_service::WebDAVService;
+use crate::services::webdav_service::WebDAVConfig;
+use crate::services::webdav_service::WebDAVService;
 pub mod webdav_sync;
 use webdav_sync::perform_webdav_sync_with_tracking;

View File

@@ -8,9 +8,9 @@ use futures::stream::{FuturesUnordered, StreamExt};
 use crate::{
     AppState,
     models::{CreateWebDAVFile, UpdateWebDAVSyncState},
-    file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
-    webdav_service::{WebDAVConfig, WebDAVService},
+    services::file_service::FileService,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    services::webdav_service::{WebDAVConfig, WebDAVService},
 };
 pub async fn perform_webdav_sync_with_tracking(

src/scheduling/mod.rs (new file, +4)
View File

@@ -0,0 +1,4 @@
+pub mod source_scheduler;
+pub mod source_sync;
+pub mod webdav_scheduler;
+pub mod watcher;

View File

@@ -11,8 +11,8 @@ use uuid::Uuid;
 use crate::{
     AppState,
     models::{SourceType, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
-    source_sync::SourceSyncService,
 };
+use super::source_sync::SourceSyncService;
 pub struct SourceScheduler {
     state: Arc<AppState>,

View File

@@ -11,11 +11,11 @@ use uuid::Uuid;
 use crate::{
     AppState,
     models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
-    file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
-    local_folder_service::LocalFolderService,
-    s3_service::S3Service,
-    webdav_service::{WebDAVService, WebDAVConfig},
+    services::file_service::FileService,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    services::local_folder_service::LocalFolderService,
+    services::s3_service::S3Service,
+    services::webdav_service::{WebDAVService, WebDAVConfig},
 };
 #[derive(Clone)]

View File

@@ -11,9 +11,9 @@ use walkdir::WalkDir;
 use crate::{
     config::Config,
     db::Database,
-    file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr_queue::OcrQueueService
+    services::file_service::FileService,
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
+    ocr::queue::OcrQueueService
 };
 pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {

View File

@@ -5,11 +5,11 @@ use tracing::{error, info, warn};
 use crate::{
     db::Database,
-    ocr_queue::OcrQueueService,
-    file_service::FileService,
+    ocr::queue::OcrQueueService,
+    services::file_service::FileService,
     AppState,
 };
-use crate::webdav_service::{WebDAVConfig, WebDAVService};
+use crate::services::webdav_service::{WebDAVConfig, WebDAVService};
 use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking;
 pub struct WebDAVScheduler {

src/services/mod.rs (new file, +5)
View File

@@ -0,0 +1,5 @@
+pub mod file_service;
+pub mod local_folder_service;
+pub mod s3_service;
+pub mod s3_service_stub;
+pub mod webdav_service;

View File

@@ -1,5 +1,5 @@
 #[cfg(test)]
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[cfg(test)]
 use crate::models::Document;
 #[cfg(test)]