feat(server): try to resume syncs after server restart

This commit is contained in:
perf3ct 2025-06-16 23:21:43 +00:00
parent 3ef3af6ca8
commit 27c38bf0fe
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
4 changed files with 38 additions and 19 deletions

View File

@ -1979,6 +1979,26 @@ impl Database {
Ok(result.rows_affected() as i64)
}
// Reset any running source syncs on startup (handles server restart during sync)
pub async fn reset_running_source_syncs(&self) -> Result<i64> {
let result = sqlx::query(
r#"UPDATE sources
SET status = 'idle',
last_error = CASE
WHEN last_error IS NULL OR last_error = ''
THEN 'Sync interrupted by server restart'
ELSE last_error || '; Sync interrupted by server restart'
END,
last_error_at = NOW(),
updated_at = NOW()
WHERE status = 'syncing'"#
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() as i64)
}
// WebDAV file tracking operations
pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result<Option<crate::models::WebDAVFile>> {
let row = sqlx::query(

View File

@ -147,6 +147,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
// Reset any running universal source syncs from previous server instance
match background_db.reset_running_source_syncs().await {
Ok(count) => {
if count > 0 {
info!("Reset {} orphaned source sync states from server restart", count);
}
}
Err(e) => {
warn!("Failed to reset running source syncs: {}", e);
}
}
// Create shared OCR queue service for both web and background operations
let concurrent_jobs = 15; // Limit concurrent OCR jobs to prevent DB pool exhaustion
let shared_queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(

View File

@ -76,24 +76,10 @@ impl SourceScheduler {
continue;
}
// Check if auto-sync is enabled for this source
let should_resume = match source.source_type {
SourceType::WebDAV => {
if let Ok(config) = serde_json::from_value::<WebDAVSourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
SourceType::LocalFolder => {
if let Ok(config) = serde_json::from_value::<LocalFolderSourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
SourceType::S3 => {
if let Ok(config) = serde_json::from_value::<S3SourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
};
// Always resume interrupted syncs regardless of auto_sync setting
// This ensures that manually triggered syncs that were interrupted by server restart
// will continue downloading files instead of just starting OCR on existing files
let should_resume = true;
if should_resume {
info!("Resuming interrupted sync for source {}", source.name);

View File

@ -327,7 +327,8 @@ fn extract_extension(filename: &str) -> String {
#[test]
fn test_sync_cancellation_handling() {
// Test sync cancellation logic
use std::sync::{Arc, AtomicBool};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
let cancellation_token = Arc::new(AtomicBool::new(false));