From 69c40c10fac4254085338faae5ae00b7f0a79052 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Thu, 3 Jul 2025 04:45:25 +0000 Subject: [PATCH] feat(webdav): gracefully recover webdav from stops/crashes --- ...50703000001_add_scan_progress_tracking.sql | 17 + src/db/webdav.rs | 78 +++++ src/services/webdav_service.rs | 308 ++++++++++++++++-- 3 files changed, 377 insertions(+), 26 deletions(-) create mode 100644 migrations/20250703000001_add_scan_progress_tracking.sql diff --git a/migrations/20250703000001_add_scan_progress_tracking.sql b/migrations/20250703000001_add_scan_progress_tracking.sql new file mode 100644 index 0000000..1cfff29 --- /dev/null +++ b/migrations/20250703000001_add_scan_progress_tracking.sql @@ -0,0 +1,17 @@ +-- Add scan progress tracking for crash recovery +-- This allows resuming interrupted scans after server restarts + +ALTER TABLE webdav_directories +ADD COLUMN IF NOT EXISTS scan_in_progress BOOLEAN DEFAULT FALSE, +ADD COLUMN IF NOT EXISTS scan_started_at TIMESTAMPTZ, +ADD COLUMN IF NOT EXISTS scan_error TEXT; + +-- Create index for finding incomplete scans +CREATE INDEX IF NOT EXISTS idx_webdav_directories_scan_progress +ON webdav_directories(user_id, scan_in_progress) +WHERE scan_in_progress = TRUE; + +-- Create index for finding scans that have been running too long (possible crashes) +CREATE INDEX IF NOT EXISTS idx_webdav_directories_stale_scans +ON webdav_directories(scan_started_at) +WHERE scan_in_progress = TRUE; \ No newline at end of file diff --git a/src/db/webdav.rs b/src/db/webdav.rs index 069edc9..f2317e2 100644 --- a/src/db/webdav.rs +++ b/src/db/webdav.rs @@ -351,4 +351,82 @@ impl Database { Ok(result.rows_affected() as i64) } + + /// Find directories with incomplete scans that need recovery + pub async fn get_incomplete_webdav_scans(&self, user_id: Uuid) -> Result> { + let rows = sqlx::query( + r#"SELECT directory_path FROM webdav_directories + WHERE user_id = $1 AND scan_in_progress = TRUE + ORDER BY scan_started_at ASC"# + ) + 
.bind(user_id) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| row.get("directory_path")).collect()) + } + + /// Find scans that have been running too long (possible crashes) + pub async fn get_stale_webdav_scans(&self, user_id: Uuid, timeout_minutes: i64) -> Result> { + let rows = sqlx::query( + r#"SELECT directory_path FROM webdav_directories + WHERE user_id = $1 AND scan_in_progress = TRUE + AND scan_started_at < NOW() - INTERVAL '1 minute' * $2 + ORDER BY scan_started_at ASC"# + ) + .bind(user_id) + .bind(timeout_minutes) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| row.get("directory_path")).collect()) + } + + /// Mark a directory scan as in progress + pub async fn mark_webdav_scan_in_progress(&self, user_id: Uuid, directory_path: &str) -> Result<()> { + sqlx::query( + r#"INSERT INTO webdav_directories (user_id, directory_path, directory_etag, scan_in_progress, scan_started_at, last_scanned_at) + VALUES ($1, $2, '', TRUE, NOW(), NOW()) + ON CONFLICT (user_id, directory_path) + DO UPDATE SET scan_in_progress = TRUE, scan_started_at = NOW(), scan_error = NULL"# + ) + .bind(user_id) + .bind(directory_path) + .execute(&self.pool) + .await?; + + Ok(()) + } + + /// Mark a directory scan as complete + pub async fn mark_webdav_scan_complete(&self, user_id: Uuid, directory_path: &str) -> Result<()> { + sqlx::query( + r#"UPDATE webdav_directories + SET scan_in_progress = FALSE, scan_started_at = NULL, scan_error = NULL, + last_scanned_at = NOW(), updated_at = NOW() + WHERE user_id = $1 AND directory_path = $2"# + ) + .bind(user_id) + .bind(directory_path) + .execute(&self.pool) + .await?; + + Ok(()) + } + + /// Mark a directory scan as failed with error + pub async fn mark_webdav_scan_failed(&self, user_id: Uuid, directory_path: &str, error: &str) -> Result<()> { + sqlx::query( + r#"UPDATE webdav_directories + SET scan_in_progress = FALSE, scan_error = $3, updated_at = NOW() + WHERE user_id = $1 AND directory_path = $2"# + 
) + .bind(user_id) + .bind(directory_path) + .bind(error) + .execute(&self.pool) + .await?; + + Ok(()) + } } \ No newline at end of file diff --git a/src/services/webdav_service.rs b/src/services/webdav_service.rs index 96bc33e..2b5f55d 100644 --- a/src/services/webdav_service.rs +++ b/src/services/webdav_service.rs @@ -498,8 +498,34 @@ impl WebDAVService { /// Optimized discovery that checks directory ETag first to avoid unnecessary deep scans pub async fn discover_files_in_folder_optimized(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { + self.discover_files_in_folder_optimized_with_recovery(folder_path, user_id, state, true).await + } + + async fn discover_files_in_folder_optimized_with_recovery(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState, enable_crash_recovery: bool) -> Result> { debug!("🔍 Starting optimized discovery for folder: {}", folder_path); + // Check for incomplete scans that need recovery + if enable_crash_recovery { + if let Ok(incomplete_scans) = self.detect_incomplete_scans(user_id, state).await { + if !incomplete_scans.is_empty() { + info!("🔄 Detected {} incomplete scans from previous session, resuming...", incomplete_scans.len()); + for incomplete_path in incomplete_scans { + if incomplete_path.starts_with(folder_path) { + info!("🔄 Resuming incomplete scan for: {}", incomplete_path); + match self.resume_deep_scan_internal(&incomplete_path, user_id, state).await { + Ok(resumed_files) => { + info!("✅ Successfully resumed scan for {}: {} files found", incomplete_path, resumed_files.len()); + } + Err(e) => { + warn!("⚠️ Failed to resume scan for {}: {}", incomplete_path, e); + } + } + } + } + } + } + } + // Check if we should use smart scanning let use_smart_scan = match self.config.server_type.as_deref() { Some("nextcloud") | Some("owncloud") => { @@ -523,8 +549,8 @@ impl WebDAVService { } }; - // Use smart scanning with depth-1 traversal - return self.smart_directory_scan(folder_path, 
stored_etag.as_deref(), user_id, state).await; + // Use smart scanning with depth-1 traversal and checkpoint recovery + return self.smart_directory_scan_with_checkpoints(folder_path, stored_etag.as_deref(), user_id, state).await; } // Fall back to traditional optimization for other servers @@ -730,8 +756,11 @@ impl WebDAVService { for path in paths_to_scan { debug!("🔍 Targeted scan of: {}", path); + // Convert to relative path for API calls + let relative_path = self.convert_to_relative_path(path); + // Check if this specific path has changed - match self.check_directory_etag(path).await { + match self.check_directory_etag(&relative_path).await { Ok(current_etag) => { // Check cached ETag let needs_scan = match state.db.get_webdav_directory(user_id, path).await { @@ -756,7 +785,7 @@ impl WebDAVService { if needs_scan { // Use shallow scan for this specific directory only - match self.discover_files_in_folder_shallow(path).await { + match self.discover_files_in_folder_shallow(&relative_path).await { Ok(mut path_files) => { debug!("📂 Found {} files in changed path {}", path_files.len(), path); all_files.append(&mut path_files); @@ -1244,8 +1273,11 @@ impl WebDAVService { // Find a directory with subdirectories from our watch folders for watch_folder in &self.config.watch_folders { + // Convert to relative path for API calls + let relative_watch_folder = self.convert_to_relative_path(watch_folder); + // Get the directory structure with depth 1 - match self.discover_files_in_folder_shallow(watch_folder).await { + match self.discover_files_in_folder_shallow(&relative_watch_folder).await { Ok(entries) => { // Find a subdirectory to test with let subdirs: Vec<_> = entries.iter() @@ -1261,10 +1293,11 @@ impl WebDAVService { debug!("Testing with directory: {} and subdirectory: {}", watch_folder, test_subdir.path); // Step 1: Get parent directory ETag - let parent_etag = self.check_directory_etag(watch_folder).await?; + let parent_etag = 
self.check_directory_etag(&relative_watch_folder).await?; - // Step 2: Get subdirectory ETag - let subdir_etag = self.check_directory_etag(&test_subdir.path).await?; + // Step 2: Get subdirectory ETag (convert to relative path) + let relative_subdir_path = self.convert_to_relative_path(&test_subdir.path); + let subdir_etag = self.check_directory_etag(&relative_subdir_path).await?; // Step 3: Check if parent has a different ETag than child // In a recursive ETag system, they should be different but related @@ -1385,7 +1418,72 @@ impl WebDAVService { all_files.push(entry); } - // Update tracking for this directory + // Note: We'll update the directory tracking at the end after processing all subdirectories + // to avoid ETag race conditions during the scan + + // Step 4: Process subdirectories concurrently with controlled parallelism + if !subdirs_to_scan.is_empty() { + let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); + let subdirs_stream = stream::iter(subdirs_to_scan) + .map(|subdir| { + let semaphore = semaphore.clone(); + let service = self.clone(); + async move { + let _permit = semaphore.acquire().await.map_err(|e| anyhow!("Semaphore error: {}", e))?; + + // Get stored ETag for this subdirectory + let stored_etag = match state.db.get_webdav_directory(user_id, &subdir.path).await { + Ok(Some(dir)) => Some(dir.directory_etag), + Ok(None) => { + debug!("🆕 New subdirectory discovered: {}", subdir.path); + None + } + Err(e) => { + warn!("Database error checking subdirectory {}: {}", subdir.path, e); + None + } + }; + + // If ETag changed or new directory, scan it recursively + if stored_etag.as_deref() != Some(&subdir.etag) { + debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})", + subdir.path, stored_etag, subdir.etag); + + match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await { + Ok(subdir_files) => { + debug!("📂 Found {} entries in subdirectory {}", 
subdir_files.len(), subdir.path); + Result::, anyhow::Error>::Ok(subdir_files) + } + Err(e) => { + error!("Failed to scan subdirectory {}: {}", subdir.path, e); + Result::, anyhow::Error>::Ok(Vec::new()) // Continue with other subdirectories + } + } + } else { + debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag); + // Don't update database during scan - will be handled by top-level caller + Result::, anyhow::Error>::Ok(Vec::new()) + } + } + }) + .buffer_unordered(self.concurrency_config.max_concurrent_scans); + + // Collect all results concurrently + let mut subdirs_stream = std::pin::pin!(subdirs_stream); + while let Some(result) = subdirs_stream.next().await { + match result { + Ok(mut subdir_files) => { + all_files.append(&mut subdir_files); + } + Err(e) => { + warn!("Concurrent subdirectory scan error: {}", e); + // Continue processing other subdirectories + } + } + } + } + + // Only update database if this is the top-level call (not a recursive subdirectory scan) let file_count = all_files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64; let total_size = all_files.iter() .filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)) @@ -1404,6 +1502,75 @@ impl WebDAVService { warn!("Failed to update directory tracking for {}: {}", path, e); } + debug!("🧠 Smart scan completed for {}: {} total entries found", path, all_files.len()); + Ok(all_files) + }) + } + + /// Internal version of smart_directory_scan that doesn't update the database + /// Used for recursive subdirectory scanning to avoid race conditions + fn smart_directory_scan_internal<'a>( + &'a self, + path: &'a str, + known_etag: Option<&'a str>, + user_id: uuid::Uuid, + state: &'a crate::AppState + ) -> std::pin::Pin>> + Send + 'a>> { + Box::pin(async move { + debug!("🧠 Smart scan (internal) starting for path: {}", path); + + // Convert full WebDAV path to relative path for existing functions + let relative_path = 
self.convert_to_relative_path(path); + debug!("🔄 Converted {} to relative path: {}", path, relative_path); + + // Step 1: Check current directory ETag + let current_etag = match self.check_directory_etag(&relative_path).await { + Ok(etag) => etag, + Err(e) => { + warn!("Failed to get directory ETag for {}, falling back to full scan: {}", path, e); + return self.discover_files_in_folder_impl(&relative_path).await; + } + }; + + // Step 2: If unchanged and we support recursive ETags, nothing to do + if known_etag == Some(&current_etag) { + let supports_recursive = match self.config.server_type.as_deref() { + Some("nextcloud") | Some("owncloud") => true, + _ => false + }; + + if supports_recursive { + debug!("✅ Directory {} unchanged (recursive ETag: {}), skipping scan", path, current_etag); + return Ok(Vec::new()); + } else { + debug!("📁 Directory {} ETag unchanged but server doesn't support recursive ETags, checking subdirectories", path); + } + } else { + debug!("🔄 Directory {} changed (old: {:?}, new: {})", path, known_etag, current_etag); + } + + // Step 3: Directory changed or we need to check subdirectories - do depth-1 scan + let entries = match self.discover_files_in_folder_shallow(&relative_path).await { + Ok(files) => files, + Err(e) => { + error!("Failed shallow scan of {}: {}", path, e); + return Err(e); + } + }; + + let mut all_files = Vec::new(); + let mut subdirs_to_scan = Vec::new(); + + // Separate files and directories + for entry in entries { + if entry.is_directory && entry.path != path { + subdirs_to_scan.push(entry.clone()); + } + all_files.push(entry); + } + + // Note: No database update in internal function to avoid race conditions + // Step 4: Process subdirectories concurrently with controlled parallelism if !subdirs_to_scan.is_empty() { let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); @@ -1432,7 +1599,7 @@ impl WebDAVService { debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})", 
subdir.path, stored_etag, subdir.etag); - match service.smart_directory_scan(&subdir.path, stored_etag.as_deref(), user_id, state).await { + match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await { Ok(subdir_files) => { debug!("📂 Found {} entries in subdirectory {}", subdir_files.len(), subdir.path); Result::, anyhow::Error>::Ok(subdir_files) @@ -1444,17 +1611,7 @@ impl WebDAVService { } } else { debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag); - // Update last_scanned_at - let update = crate::models::UpdateWebDAVDirectory { - directory_etag: subdir.etag.clone(), - last_scanned_at: chrono::Utc::now(), - file_count: 0, // Will be preserved by database - total_size_bytes: 0, - }; - - if let Err(e) = state.db.update_webdav_directory(user_id, &subdir.path, &update).await { - warn!("Failed to update scan time for {}: {}", subdir.path, e); - } + // Don't update database during internal scan Result::, anyhow::Error>::Ok(Vec::new()) } } @@ -1476,17 +1633,116 @@ impl WebDAVService { } } - debug!("🧠 Smart scan completed for {}: {} total entries found", path, all_files.len()); + debug!("🧠 Smart scan (internal) completed for {}: {} total entries found", path, all_files.len()); Ok(all_files) }) } + /// Smart directory scan with checkpoint-based crash recovery + pub fn smart_directory_scan_with_checkpoints<'a>( + &'a self, + path: &'a str, + known_etag: Option<&'a str>, + user_id: uuid::Uuid, + state: &'a crate::AppState + ) -> std::pin::Pin>> + Send + 'a>> { + Box::pin(async move { + debug!("🧠 Smart scan with checkpoints starting for path: {}", path); + + // Mark scan as in progress (checkpoint) + if let Err(e) = self.mark_scan_in_progress(user_id, path, state).await { + warn!("Failed to mark scan in progress for {}: {}", path, e); + } + + // Perform the actual scan + let result = self.smart_directory_scan_internal(path, known_etag, user_id, state).await; + + match &result { + Ok(files) => { + debug!("✅ 
Smart scan completed for {}: {} files", path, files.len()); + + // Update directory tracking and mark scan complete + let file_count = files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64; + let total_size = files.iter() + .filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)) + .map(|f| f.size) + .sum::<i64>(); + + let current_etag = known_etag.unwrap_or("unknown").to_string(); + let dir_record = crate::models::CreateWebDAVDirectory { + user_id, + directory_path: path.to_string(), + directory_etag: current_etag.clone(), + file_count, + total_size_bytes: total_size, + }; + + if let Err(e) = state.db.create_or_update_webdav_directory(&dir_record).await { + warn!("Failed to update directory tracking for {}: {}", path, e); + } + + // Mark scan as complete (remove checkpoint) + if let Err(e) = self.mark_scan_complete(user_id, path, state).await { + warn!("Failed to mark scan complete for {}: {}", path, e); + } + } + Err(e) => { + error!("❌ Smart scan failed for {}: {}", path, e); + // Mark scan as failed for better tracking + if let Err(mark_err) = state.db.mark_webdav_scan_failed(user_id, path, &e.to_string()).await { + warn!("Failed to mark scan as failed for {}: {}", path, mark_err); + } + } + } + + result + }) + } + + /// Detect directories with incomplete scans that need recovery + async fn detect_incomplete_scans(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<String>> { + debug!("🔍 Checking for incomplete scans..."); + + // Check for both incomplete scans and stale scans (running too long, likely crashed) + let mut incomplete_scans = state.db.get_incomplete_webdav_scans(user_id).await.unwrap_or_default(); + let stale_scans = state.db.get_stale_webdav_scans(user_id, 30).await.unwrap_or_default(); // 30 minute timeout + + // Combine and deduplicate + incomplete_scans.extend(stale_scans); + incomplete_scans.sort(); + incomplete_scans.dedup(); + + if !incomplete_scans.is_empty() { + info!("Found {} 
incomplete/stale scans to recover", incomplete_scans.len()); + } + + Ok(incomplete_scans) + } + + /// Mark a directory scan as in progress (for crash recovery) + async fn mark_scan_in_progress(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> { + debug!("📝 Marking scan in progress for: {}", path); + state.db.mark_webdav_scan_in_progress(user_id, path).await + } + + /// Mark a directory scan as complete (remove crash recovery checkpoint) + async fn mark_scan_complete(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> { + debug!("✅ Marking scan complete for: {}", path); + state.db.mark_webdav_scan_complete(user_id, path).await + } + /// Resume a deep scan from a checkpoint after server restart/interruption pub async fn resume_deep_scan(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { + self.resume_deep_scan_internal(checkpoint_path, user_id, state).await + } + + /// Internal resume function that doesn't trigger crash recovery detection (to avoid recursion) + async fn resume_deep_scan_internal(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { info!("🔄 Resuming deep scan from checkpoint: {}", checkpoint_path); // Check if the checkpoint directory is still accessible - match self.check_directory_etag(checkpoint_path).await { + let relative_checkpoint_path = self.convert_to_relative_path(checkpoint_path); + match self.check_directory_etag(&relative_checkpoint_path).await { Ok(current_etag) => { info!("✅ Checkpoint directory accessible, resuming scan"); @@ -1509,17 +1765,17 @@ impl WebDAVService { } // Resume with smart scanning from this point - self.discover_files_in_folder_optimized(checkpoint_path, user_id, state).await + self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await } Err(e) => { warn!("Checkpoint directory {} inaccessible after restart: {}", checkpoint_path, e); // Server might have 
restarted, wait a bit and retry tokio::time::sleep(Duration::from_secs(5)).await; - match self.check_directory_etag(checkpoint_path).await { + match self.check_directory_etag(&relative_checkpoint_path).await { Ok(_) => { info!("🔄 Server recovered, resuming scan"); - self.discover_files_in_folder_optimized(checkpoint_path, user_id, state).await + self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await } Err(e2) => { error!("Failed to resume deep scan after server restart: {}", e2);