feat(async): create dedicated pools + runtime isolation for OCR

This commit is contained in:
perf3ct 2025-06-15 16:47:55 +00:00
parent 853c9b7c2e
commit 41774056c7
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
3 changed files with 52 additions and 22 deletions

View File

@ -61,7 +61,7 @@ import {
Folder as FolderIcon,
Assessment as AssessmentIcon,
Extension as ExtensionIcon,
Server as ServerIcon,
Storage as ServerIcon,
} from '@mui/icons-material';
import { useNavigate } from 'react-router-dom';
import api from '../services/api';

View File

@ -24,6 +24,18 @@ impl Database {
.await?;
Ok(Self { pool })
}
pub async fn new_with_pool_config(database_url: &str, max_connections: u32, min_connections: u32) -> Result<Self> {
let pool = PgPoolOptions::new()
.max_connections(max_connections)
.acquire_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(600))
.max_lifetime(Duration::from_secs(1800))
.min_connections(min_connections)
.connect(database_url)
.await?;
Ok(Self { pool })
}
pub fn get_pool(&self) -> &PgPool {
&self.pool

View File

@ -20,7 +20,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let config = Config::from_env()?;
let db = Database::new(&config.database_url).await?;
// Create separate database pools for different workloads
let web_db = Database::new_with_pool_config(&config.database_url, 20, 2).await?; // Web UI pool
let background_db = Database::new_with_pool_config(&config.database_url, 30, 3).await?; // Background operations pool
// Don't run the old migration system - let SQLx handle everything
// db.migrate().await?;
@ -110,14 +112,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
// Seed admin user
seed::seed_admin_user(&db).await?;
// Seed admin user
seed::seed_admin_user(&background_db).await?;
// Seed system user for watcher
seed::seed_system_user(&db).await?;
seed::seed_system_user(&background_db).await?;
// Reset any running WebDAV syncs from previous server instance
match db.reset_running_webdav_syncs().await {
// Reset any running WebDAV syncs from previous server instance using background DB
match background_db.reset_running_webdav_syncs().await {
Ok(count) => {
if count > 0 {
info!("Reset {} orphaned WebDAV sync states from server restart", count);
@ -128,15 +130,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
let state = AppState {
db,
// Create web-facing state with dedicated web DB pool
let web_state = AppState {
db: web_db,
config: config.clone(),
webdav_scheduler: None, // Will be set after creating scheduler
};
let state = Arc::new(state);
let web_state = Arc::new(web_state);
// Create background state with dedicated background DB pool
let background_state = AppState {
db: background_db,
config: config.clone(),
webdav_scheduler: None,
};
let background_state = Arc::new(background_state);
let watcher_config = config.clone();
let watcher_db = state.db.clone();
let watcher_db = background_state.db.clone();
tokio::spawn(async move {
if let Err(e) = readur::watcher::start_folder_watcher(watcher_config, watcher_db).await {
error!("Folder watcher error: {}", e);
@ -156,12 +167,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.thread_name("readur-background")
.enable_all()
.build()?;
// Create dedicated runtime for database-heavy operations
let db_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) // Dedicated threads for intensive DB operations
.thread_name("readur-db")
.enable_all()
.build()?;
// Start OCR queue worker on dedicated OCR runtime
// Start OCR queue worker on dedicated OCR runtime using background DB pool
let concurrent_jobs = 4; // TODO: Get from config/settings
let queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.get_pool().clone(),
background_state.db.clone(),
background_state.db.get_pool().clone(),
concurrent_jobs
));
@ -191,16 +209,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Create WebDAV scheduler and update AppState
let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(state.clone()));
// Create WebDAV scheduler with background state
let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
// Update the state to include the scheduler
let updated_state = AppState {
db: state.db.clone(),
config: state.config.clone(),
// Update the web state to include the scheduler reference
let updated_web_state = AppState {
db: web_state.db.clone(),
config: web_state.config.clone(),
webdav_scheduler: Some(webdav_scheduler.clone()),
};
let state = Arc::new(updated_state);
let web_state = Arc::new(updated_web_state);
// Start WebDAV background sync scheduler on background runtime
let scheduler_for_background = webdav_scheduler.clone();
@ -229,7 +247,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.merge(readur::swagger::create_swagger_router())
.fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
.layer(CorsLayer::permissive())
.with_state(state.clone());
.with_state(web_state.clone());
let listener = tokio::net::TcpListener::bind(&config.server_address).await?;
info!("Server starting on {}", config.server_address);