feat(server): try to resume syncs after server restart
This commit is contained in:
parent
6d462df990
commit
54868cdc57
20
src/db.rs
20
src/db.rs
|
|
@ -1979,6 +1979,26 @@ impl Database {
|
||||||
Ok(result.rows_affected() as i64)
|
Ok(result.rows_affected() as i64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset any running source syncs on startup (handles server restart during sync)
|
||||||
|
pub async fn reset_running_source_syncs(&self) -> Result<i64> {
|
||||||
|
let result = sqlx::query(
|
||||||
|
r#"UPDATE sources
|
||||||
|
SET status = 'idle',
|
||||||
|
last_error = CASE
|
||||||
|
WHEN last_error IS NULL OR last_error = ''
|
||||||
|
THEN 'Sync interrupted by server restart'
|
||||||
|
ELSE last_error || '; Sync interrupted by server restart'
|
||||||
|
END,
|
||||||
|
last_error_at = NOW(),
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE status = 'syncing'"#
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(result.rows_affected() as i64)
|
||||||
|
}
|
||||||
|
|
||||||
// WebDAV file tracking operations
|
// WebDAV file tracking operations
|
||||||
pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result<Option<crate::models::WebDAVFile>> {
|
pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result<Option<crate::models::WebDAVFile>> {
|
||||||
let row = sqlx::query(
|
let row = sqlx::query(
|
||||||
|
|
|
||||||
12
src/main.rs
12
src/main.rs
|
|
@ -147,6 +147,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset any running universal source syncs from previous server instance
|
||||||
|
match background_db.reset_running_source_syncs().await {
|
||||||
|
Ok(count) => {
|
||||||
|
if count > 0 {
|
||||||
|
info!("Reset {} orphaned source sync states from server restart", count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to reset running source syncs: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create shared OCR queue service for both web and background operations
|
// Create shared OCR queue service for both web and background operations
|
||||||
let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
|
let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
|
||||||
let shared_queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
|
let shared_queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
|
||||||
|
|
|
||||||
|
|
@ -76,24 +76,10 @@ impl SourceScheduler {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if auto-sync is enabled for this source
|
// Always resume interrupted syncs regardless of auto_sync setting
|
||||||
let should_resume = match source.source_type {
|
// This ensures that manually triggered syncs that were interrupted by server restart
|
||||||
SourceType::WebDAV => {
|
// will continue downloading files instead of just starting OCR on existing files
|
||||||
if let Ok(config) = serde_json::from_value::<WebDAVSourceConfig>(source.config.clone()) {
|
let should_resume = true;
|
||||||
config.auto_sync
|
|
||||||
} else { false }
|
|
||||||
}
|
|
||||||
SourceType::LocalFolder => {
|
|
||||||
if let Ok(config) = serde_json::from_value::<LocalFolderSourceConfig>(source.config.clone()) {
|
|
||||||
config.auto_sync
|
|
||||||
} else { false }
|
|
||||||
}
|
|
||||||
SourceType::S3 => {
|
|
||||||
if let Ok(config) = serde_json::from_value::<S3SourceConfig>(source.config.clone()) {
|
|
||||||
config.auto_sync
|
|
||||||
} else { false }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if should_resume {
|
if should_resume {
|
||||||
info!("Resuming interrupted sync for source {}", source.name);
|
info!("Resuming interrupted sync for source {}", source.name);
|
||||||
|
|
|
||||||
|
|
@ -327,7 +327,8 @@ fn extract_extension(filename: &str) -> String {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sync_cancellation_handling() {
|
fn test_sync_cancellation_handling() {
|
||||||
// Test sync cancellation logic
|
// Test sync cancellation logic
|
||||||
use std::sync::{Arc, AtomicBool};
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
let cancellation_token = Arc::new(AtomicBool::new(false));
|
let cancellation_token = Arc::new(AtomicBool::new(false));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue