feat(server): reorganize components into their own modules and fix imports

perf3ct 2025-06-27 18:27:42 +00:00
parent eb95b1ca70
commit cdad6477ed
37 changed files with 112 additions and 105 deletions
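
At a glance, the commit collapses the crate's flat top-level modules into a handful of parent modules. The layout below is a summary reconstructed from the mod.rs files added in this diff, not part of the commit itself:

src/
├── ingestion/    (batch_ingest, document_ingestion)
├── monitoring/   (db_monitoring, error_management, request_throttler)
├── ocr/          (api, enhanced, enhanced_processing, error, health, queue, tests)
├── scheduling/   (source_scheduler, source_sync, watcher, webdav_scheduler)
└── services/     (file_service, local_folder_service, s3_service, s3_service_stub, webdav_service)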

View File

@@ -4,11 +4,11 @@ use std::path::Path;
 use uuid::Uuid;
 use readur::{
-    batch_ingest::BatchIngester,
+    ingestion::batch_ingest::BatchIngester,
     config::Config,
     db::Database,
-    file_service::FileService,
+    services::file_service::FileService,
-    ocr_queue::OcrQueueService,
+    ocr::queue::OcrQueueService,
 };
 #[tokio::main]

View File

@@ -10,9 +10,9 @@ use walkdir::WalkDir;
 use crate::{
     config::Config,
     db::Database,
-    file_service::FileService,
+    services::file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr_queue::OcrQueueService,
+    ocr::queue::OcrQueueService,
 };
 pub struct BatchIngester {

View File

@@ -12,7 +12,7 @@ use tracing::{debug, info, warn};
 use crate::models::Document;
 use crate::db::Database;
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[derive(Debug, Clone)]
 pub enum DeduplicationPolicy {

src/ingestion/mod.rs (new file)

@@ -0,0 +1,2 @@
+pub mod batch_ingest;
+pub mod document_ingestion;

View File

@@ -1,32 +1,17 @@
 pub mod auth;
-pub mod batch_ingest;
 pub mod config;
 pub mod db;
 pub mod db_guardrails_simple;
-pub mod document_ingestion;
+pub mod ingestion;
-pub mod enhanced_ocr;
-pub mod error_management;
-pub mod file_service;
-pub mod local_folder_service;
 pub mod models;
+pub mod monitoring;
 pub mod ocr;
 pub mod oidc;
-pub mod ocr_api;
-pub mod ocr_enhanced;
-pub mod ocr_error;
-pub mod ocr_health;
-pub mod ocr_queue;
-pub mod ocr_tests;
-pub mod request_throttler;
 pub mod routes;
-pub mod s3_service;
+pub mod scheduling;
 pub mod seed;
-pub mod source_scheduler;
+pub mod services;
-pub mod source_sync;
 pub mod swagger;
-pub mod watcher;
-pub mod webdav_service;
-pub mod webdav_scheduler;
 pub mod webdav_xml_parser;
 #[cfg(test)]
@@ -45,9 +30,9 @@ use oidc::OidcClient;
 pub struct AppState {
     pub db: Database,
     pub config: Config,
-    pub webdav_scheduler: Option<std::sync::Arc<webdav_scheduler::WebDAVScheduler>>,
+    pub webdav_scheduler: Option<std::sync::Arc<scheduling::webdav_scheduler::WebDAVScheduler>>,
-    pub source_scheduler: Option<std::sync::Arc<source_scheduler::SourceScheduler>>,
+    pub source_scheduler: Option<std::sync::Arc<scheduling::source_scheduler::SourceScheduler>>,
-    pub queue_service: std::sync::Arc<ocr_queue::OcrQueueService>,
+    pub queue_service: std::sync::Arc<ocr::queue::OcrQueueService>,
     pub oidc_client: Option<std::sync::Arc<OidcClient>>,
 }
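
The effect on downstream imports is mechanical: each relocated module keeps its name, or takes a shorter one inside the new parent, as with ocr_queue becoming ocr::queue. A before/after sketch built from paths that appear in this diff:

// Before: flat top-level modules
use readur::file_service::FileService;
use readur::ocr_queue::OcrQueueService;
use readur::source_scheduler::SourceScheduler;

// After: grouped under services, ocr, and scheduling
use readur::services::file_service::FileService;
use readur::ocr::queue::OcrQueueService;
use readur::scheduling::source_scheduler::SourceScheduler;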

View File

@@ -119,7 +119,7 @@ async fn main() -> anyhow::Result<()> {
     // Initialize upload directory structure
     info!("Initializing upload directory structure...");
-    let file_service = readur::file_service::FileService::new(config.upload_path.clone());
+    let file_service = readur::services::file_service::FileService::new(config.upload_path.clone());
     if let Err(e) = file_service.initialize_directory_structure().await {
         error!("Failed to initialize directory structure: {}", e);
         return Err(e.into());
@@ -284,7 +284,7 @@ async fn main() -> anyhow::Result<()> {
     // Create shared OCR queue service for both web and background operations
     let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
-    let shared_queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
+    let shared_queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new(
         background_db.clone(),
         background_db.get_pool().clone(),
         concurrent_jobs
@@ -333,7 +333,7 @@ async fn main() -> anyhow::Result<()> {
     let watcher_config = config.clone();
     let watcher_db = background_state.db.clone();
     tokio::spawn(async move {
-        if let Err(e) = readur::watcher::start_folder_watcher(watcher_config, watcher_db).await {
+        if let Err(e) = readur::scheduling::watcher::start_folder_watcher(watcher_config, watcher_db).await {
             error!("Folder watcher error: {}", e);
         }
     });
@@ -390,11 +390,11 @@ async fn main() -> anyhow::Result<()> {
     println!("\n📅 SCHEDULER INITIALIZATION:");
     println!("{}", "=".repeat(50));
-    let source_scheduler = Arc::new(readur::source_scheduler::SourceScheduler::new(background_state.clone()));
+    let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(background_state.clone()));
     println!("✅ Universal source scheduler created (handles WebDAV, Local, S3)");
     // Keep WebDAV scheduler for backward compatibility with existing WebDAV endpoints
-    let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
+    let webdav_scheduler = Arc::new(readur::scheduling::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
     println!("✅ Legacy WebDAV scheduler created (backward compatibility)");
     // Update the web state to include scheduler references

View File

@@ -5,7 +5,7 @@
  * and automatic alerting for potential issues.
  */
-use sqlx::PgPool;
+use sqlx::{PgPool, Row};
 use serde::{Deserialize, Serialize};
 use tokio::time::{Duration, interval};
 use tracing::{error, warn, info, debug};
@@ -166,7 +166,7 @@ impl DatabaseMonitor {
     }
     async fn check_ocr_processing_health(&self) -> Result<OcrProcessingHealth> {
-        let stats = sqlx::query!(
+        let stats = sqlx::query(
             r#"
             SELECT
                 COUNT(*) FILTER (WHERE ocr_status = 'pending') as pending,
@@ -182,13 +182,13 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let pending = stats.pending.unwrap_or(0) as i32;
+        let pending = stats.get::<Option<i64>, _>("pending").unwrap_or(0) as i32;
-        let processing = stats.processing.unwrap_or(0) as i32;
+        let processing = stats.get::<Option<i64>, _>("processing").unwrap_or(0) as i32;
-        let stuck = stats.stuck.unwrap_or(0) as i32;
+        let stuck = stats.get::<Option<i64>, _>("stuck").unwrap_or(0) as i32;
-        let failed_recent = stats.failed_recent.unwrap_or(0) as i32;
+        let failed_recent = stats.get::<Option<i64>, _>("failed_recent").unwrap_or(0) as i32;
-        let avg_confidence = stats.avg_confidence;
+        let avg_confidence = stats.get::<Option<f64>, _>("avg_confidence");
-        let avg_time = stats.avg_time;
+        let avg_time = stats.get::<Option<f64>, _>("avg_time");
-        let throughput = stats.completed_last_minute.unwrap_or(0) as f64;
+        let throughput = stats.get::<Option<i64>, _>("completed_last_minute").unwrap_or(0) as f64;
         let status = if stuck > 0 || failed_recent > 10 {
             HealthStatus::Critical
@@ -211,7 +211,7 @@ impl DatabaseMonitor {
     }
     async fn check_queue_health(&self) -> Result<QueueHealth> {
-        let queue_stats = sqlx::query!(
+        let queue_stats = sqlx::query(
             r#"
             SELECT
                 COUNT(*) as total_items,
@@ -224,9 +224,9 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let queue_size = queue_stats.total_items.unwrap_or(0) as i32;
+        let queue_size = queue_stats.get::<Option<i64>, _>("total_items").unwrap_or(0) as i32;
-        let oldest_pending = queue_stats.oldest_pending_minutes.map(|m| m as i32);
+        let oldest_pending = queue_stats.get::<Option<f64>, _>("oldest_pending_minutes").map(|m| m as i32);
-        let worker_count = queue_stats.active_workers.unwrap_or(0) as i32;
+        let worker_count = queue_stats.get::<Option<i64>, _>("active_workers").unwrap_or(0) as i32;
         // Calculate queue growth rate (simplified)
         let growth_rate = 0.0; // Would need historical data for accurate calculation
@@ -252,14 +252,14 @@ impl DatabaseMonitor {
         let start = std::time::Instant::now();
         // Test pool responsiveness
-        sqlx::query!("SELECT 1")
+        sqlx::query("SELECT 1")
             .fetch_one(&self.pool)
             .await?;
         let response_time = start.elapsed().as_millis() as u64;
-        let total_connections = self.pool.size();
+        let total_connections = self.pool.size() as u32;
-        let idle_connections = self.pool.num_idle();
+        let idle_connections = self.pool.num_idle() as u32;
         let active_connections = total_connections - idle_connections;
         let utilization = if total_connections > 0 {
             (active_connections as f64 / total_connections as f64 * 100.0) as u8
@@ -286,7 +286,7 @@ impl DatabaseMonitor {
     }
     async fn check_data_consistency(&self) -> Result<ConsistencyHealth> {
-        let consistency_check = sqlx::query!(
+        let consistency_check = sqlx::query(
             r#"
             SELECT
                 -- Orphaned queue items
@@ -306,9 +306,9 @@ impl DatabaseMonitor {
         .fetch_one(&self.pool)
         .await?;
-        let orphaned = consistency_check.orphaned_queue.unwrap_or(0) as i32;
+        let orphaned = consistency_check.get::<Option<i64>, _>("orphaned_queue").unwrap_or(0) as i32;
-        let missing_files = consistency_check.missing_files.unwrap_or(0) as i32;
+        let missing_files = consistency_check.get::<Option<i32>, _>("missing_files").unwrap_or(0) as i32;
-        let inconsistent = consistency_check.inconsistent_states.unwrap_or(0) as i32;
+        let inconsistent = consistency_check.get::<Option<i64>, _>("inconsistent_states").unwrap_or(0) as i32;
         let total_issues = orphaned + missing_files + inconsistent;
         let integrity_score = if total_issues == 0 { 100.0 } else { 100.0 - (total_issues as f64 * 10.0).min(100.0) };
@@ -430,18 +430,18 @@ impl DatabaseMonitor {
     }
     async fn reset_stuck_jobs(&self) -> Result<i32> {
-        let result = sqlx::query!(
-            "SELECT reset_stuck_ocr_jobs($1) as reset_count",
-            self.config.stuck_job_threshold_minutes
-        )
+        let result = sqlx::query(
+            "SELECT reset_stuck_ocr_jobs($1) as reset_count"
+        )
+        .bind(self.config.stuck_job_threshold_minutes)
         .fetch_one(&self.pool)
         .await?;
-        Ok(result.reset_count.unwrap_or(0))
+        Ok(result.get::<Option<i32>, _>("reset_count").unwrap_or(0))
     }
     async fn cleanup_orphaned_items(&self) -> Result<i32> {
-        let result = sqlx::query!(
+        let result = sqlx::query(
             r#"
             DELETE FROM ocr_queue
             WHERE document_id NOT IN (SELECT id FROM documents)
@@ -464,7 +464,7 @@ impl DatabaseMonitor {
         let cleanup_count = self.cleanup_orphaned_items().await?;
         // Refresh OCR stats
-        sqlx::query!("SELECT refresh_ocr_stats()")
+        sqlx::query("SELECT refresh_ocr_stats()")
             .execute(&self.pool)
             .await?;
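
Beyond the module moves, this file switches from the compile-time-checked sqlx::query! macro to the runtime sqlx::query API: parameters now go through .bind(), the sqlx::Row trait must be in scope for .get(), and column types are spelled out at each access. A minimal sketch of the pattern, assuming a PgPool and the documents table queried above (pending_count is a hypothetical helper, not code from this commit):

use sqlx::{PgPool, Row}; // Row provides .get() on the returned row

async fn pending_count(pool: &PgPool) -> anyhow::Result<i32> {
    // Runtime query: no compile-time schema verification, so the
    // parameter is attached with .bind() and the column is read by name.
    let row = sqlx::query(
        "SELECT COUNT(*) FILTER (WHERE ocr_status = $1) AS pending FROM documents"
    )
    .bind("pending")
    .fetch_one(pool)
    .await?;
    // sqlx cannot prove the aggregate is non-null, so decode as Option<i64>.
    Ok(row.get::<Option<i64>, _>("pending").unwrap_or(0) as i32)
}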

src/monitoring/mod.rs (new file)

@@ -0,0 +1,3 @@
+pub mod db_monitoring;
+pub mod error_management;
+pub mod request_throttler;

View File

@@ -1,5 +1,5 @@
-use crate::ocr_enhanced::EnhancedOcrService;
+use crate::ocr::enhanced_processing::EnhancedOcrService;
-use crate::ocr_error::OcrError;
+use crate::ocr::error::OcrError;
 use crate::AppState;
 use axum::{
     extract::State,
@@ -38,7 +38,7 @@ pub async fn health_check(
     let service = EnhancedOcrService::new();
     let diagnostics = service.get_diagnostics().await;
-    let health_checker = crate::ocr_health::OcrHealthChecker::new();
+    let health_checker = crate::ocr::health::OcrHealthChecker::new();
     match health_checker.perform_full_health_check() {
         Ok(diag) => {

View File

@@ -15,7 +15,7 @@ use imageproc::{
 use tesseract::{Tesseract, PageSegMode, OcrEngineMode};
 use crate::models::Settings;
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[derive(Debug, Clone)]
 pub struct ImageQualityStats {

View File

@@ -1,5 +1,5 @@
-use crate::ocr_error::OcrError;
+use crate::ocr::error::OcrError;
-use crate::ocr_health::OcrHealthChecker;
+use crate::ocr::health::OcrHealthChecker;
 use anyhow::{anyhow, Result};
 use image::DynamicImage;
 use std::path::Path;

View File

@@ -1,4 +1,4 @@
-use crate::ocr_error::{CpuFeatures, OcrDiagnostics, OcrError};
+use crate::ocr::error::{CpuFeatures, OcrDiagnostics, OcrError};
 use std::process::Command;
 use std::env;
 use std::path::Path;

View File

@@ -1,8 +1,16 @@
+pub mod api;
+pub mod enhanced;
+pub mod enhanced_processing;
+pub mod error;
+pub mod health;
+pub mod queue;
+pub mod tests;
 use anyhow::{anyhow, Result};
 use std::path::Path;
 use std::panic::{catch_unwind, AssertUnwindSafe};
-use crate::ocr_error::OcrError;
+use crate::ocr::error::OcrError;
-use crate::ocr_health::OcrHealthChecker;
+use crate::ocr::health::OcrHealthChecker;
 #[cfg(feature = "ocr")]
 use tesseract::Tesseract;

View File

@@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration};
 use tracing::{error, info, warn};
 use uuid::Uuid;
-use crate::{db::Database, enhanced_ocr::EnhancedOcrService, db_guardrails_simple::DocumentTransactionManager, request_throttler::RequestThrottler};
+use crate::{db::Database, ocr::enhanced::EnhancedOcrService, db_guardrails_simple::DocumentTransactionManager, monitoring::request_throttler::RequestThrottler};
 #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
 pub struct OcrQueueItem {
@@ -557,7 +557,7 @@ impl OcrQueueService {
         use std::path::Path;
         // Use the FileService to get the proper processed images directory
-        use crate::file_service::FileService;
+        use crate::services::file_service::FileService;
         let base_upload_dir = std::env::var("UPLOAD_PATH").unwrap_or_else(|_| "uploads".to_string());
         let file_service = FileService::new(base_upload_dir);
         let processed_images_dir = file_service.get_processed_images_path();

View File

@@ -1,9 +1,9 @@
 #[cfg(test)]
 mod tests {
     use super::super::*;
-    use crate::ocr_error::{OcrError, OcrDiagnostics, CpuFeatures};
+    use crate::ocr::error::{OcrError, OcrDiagnostics, CpuFeatures};
-    use crate::ocr_health::OcrHealthChecker;
+    use crate::ocr::health::OcrHealthChecker;
-    use crate::ocr_enhanced::EnhancedOcrService;
+    use crate::ocr::enhanced_processing::EnhancedOcrService;
     use std::env;
     use tempfile::TempDir;
     use std::fs;

View File

@@ -12,8 +12,8 @@ use sqlx::Row;
 use crate::{
     auth::AuthUser,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    file_service::FileService,
+    services::file_service::FileService,
     models::DocumentResponse,
     AppState,
 };

View File

@@ -134,7 +134,7 @@ async fn collect_database_metrics(state: &Arc<AppState>) -> Result<DatabaseMetri
 async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
     // Use existing OCR queue statistics
-    use crate::ocr_queue::OcrQueueService;
+    use crate::ocr::queue::OcrQueueService;
     let queue_service = OcrQueueService::new(
         state.db.clone(),

View File

@@ -288,7 +288,7 @@ async fn collect_document_metrics(state: &Arc<AppState>) -> Result<DocumentMetri
 }
 async fn collect_ocr_metrics(state: &Arc<AppState>) -> Result<OcrMetrics, StatusCode> {
-    use crate::ocr_queue::OcrQueueService;
+    use crate::ocr::queue::OcrQueueService;
     let queue_service = OcrQueueService::new(
         state.db.clone(),

View File

@@ -7,7 +7,7 @@ use axum::{
 };
 use std::sync::Arc;
-use crate::{auth::AuthUser, ocr_queue::OcrQueueService, AppState, models::UserRole};
+use crate::{auth::AuthUser, ocr::queue::OcrQueueService, AppState, models::UserRole};
 fn require_admin(auth_user: &AuthUser) -> Result<(), StatusCode> {
     if auth_user.user.role != UserRole::Admin {

View File

@@ -442,7 +442,7 @@ async fn test_connection(
     let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::webdav_service::test_webdav_connection(
+    match crate::services::webdav_service::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
@@ -464,7 +464,7 @@ async fn test_connection(
     let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::local_folder_service::LocalFolderService::new(config) {
+    match crate::services::local_folder_service::LocalFolderService::new(config) {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -488,7 +488,7 @@ async fn test_connection(
     let config: crate::models::S3SourceConfig = serde_json::from_value(source.config)
         .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    match crate::s3_service::S3Service::new(config).await {
+    match crate::services::s3_service::S3Service::new(config).await {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -610,7 +610,7 @@ async fn estimate_webdav_crawl_internal(
     config: &crate::models::WebDAVSourceConfig,
 ) -> Result<Json<serde_json::Value>, StatusCode> {
     // Create WebDAV service config
-    let webdav_config = crate::webdav_service::WebDAVConfig {
+    let webdav_config = crate::services::webdav_service::WebDAVConfig {
         server_url: config.server_url.clone(),
         username: config.username.clone(),
         password: config.password.clone(),
@@ -621,7 +621,7 @@ async fn estimate_webdav_crawl_internal(
     };
     // Create WebDAV service and estimate crawl
-    match crate::webdav_service::WebDAVService::new(webdav_config) {
+    match crate::services::webdav_service::WebDAVService::new(webdav_config) {
         Ok(webdav_service) => {
             match webdav_service.estimate_crawl(&config.watch_folders).await {
                 Ok(estimate) => Ok(Json(serde_json::to_value(estimate).unwrap())),
@@ -678,7 +678,7 @@ async fn test_connection_with_config(
     let config: crate::models::WebDAVSourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::webdav_service::test_webdav_connection(
+    match crate::services::webdav_service::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
@@ -700,7 +700,7 @@ async fn test_connection_with_config(
     let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::local_folder_service::LocalFolderService::new(config) {
+    match crate::services::local_folder_service::LocalFolderService::new(config) {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({
@@ -724,7 +724,7 @@ async fn test_connection_with_config(
     let config: crate::models::S3SourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
-    match crate::s3_service::S3Service::new(config).await {
+    match crate::services::s3_service::S3Service::new(config).await {
         Ok(service) => {
             match service.test_connection().await {
                 Ok(message) => Ok(Json(serde_json::json!({

View File

@@ -17,8 +17,8 @@ use crate::{
     },
     AppState,
 };
-use crate::webdav_service::WebDAVConfig;
+use crate::services::webdav_service::WebDAVConfig;
-use crate::webdav_service::WebDAVService;
+use crate::services::webdav_service::WebDAVService;
 pub mod webdav_sync;
 use webdav_sync::perform_webdav_sync_with_tracking;

View File

@@ -8,9 +8,9 @@ use futures::stream::{FuturesUnordered, StreamExt};
 use crate::{
     AppState,
     models::{CreateWebDAVFile, UpdateWebDAVSyncState},
-    file_service::FileService,
+    services::file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    webdav_service::{WebDAVConfig, WebDAVService},
+    services::webdav_service::{WebDAVConfig, WebDAVService},
 };
 pub async fn perform_webdav_sync_with_tracking(

src/scheduling/mod.rs (new file)

@@ -0,0 +1,4 @@
+pub mod source_scheduler;
+pub mod source_sync;
+pub mod webdav_scheduler;
+pub mod watcher;

View File

@@ -11,8 +11,8 @@ use uuid::Uuid;
 use crate::{
     AppState,
     models::{SourceType, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
-    source_sync::SourceSyncService,
 };
+use super::source_sync::SourceSyncService;
 pub struct SourceScheduler {
     state: Arc<AppState>,

View File

@@ -11,11 +11,11 @@ use uuid::Uuid;
 use crate::{
     AppState,
     models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
-    file_service::FileService,
+    services::file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    local_folder_service::LocalFolderService,
+    services::local_folder_service::LocalFolderService,
-    s3_service::S3Service,
+    services::s3_service::S3Service,
-    webdav_service::{WebDAVService, WebDAVConfig},
+    services::webdav_service::{WebDAVService, WebDAVConfig},
 };
 #[derive(Clone)]

View File

@@ -11,9 +11,9 @@ use walkdir::WalkDir;
 use crate::{
     config::Config,
     db::Database,
-    file_service::FileService,
+    services::file_service::FileService,
-    document_ingestion::{DocumentIngestionService, IngestionResult},
+    ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    ocr_queue::OcrQueueService
+    ocr::queue::OcrQueueService
 };
 pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {

View File

@@ -5,11 +5,11 @@ use tracing::{error, info, warn};
 use crate::{
     db::Database,
-    ocr_queue::OcrQueueService,
+    ocr::queue::OcrQueueService,
-    file_service::FileService,
+    services::file_service::FileService,
     AppState,
 };
-use crate::webdav_service::{WebDAVConfig, WebDAVService};
+use crate::services::webdav_service::{WebDAVConfig, WebDAVService};
 use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking;
 pub struct WebDAVScheduler {

src/services/mod.rs (new file)

@@ -0,0 +1,5 @@
+pub mod file_service;
+pub mod local_folder_service;
+pub mod s3_service;
+pub mod s3_service_stub;
+pub mod webdav_service;

View File

@@ -1,5 +1,5 @@
 #[cfg(test)]
-use crate::file_service::FileService;
+use crate::services::file_service::FileService;
 #[cfg(test)]
 use crate::models::Document;
 #[cfg(test)]