From 92b21350db26cd863695878d8584baa10fc8d426 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Tue, 1 Jul 2025 21:22:16 +0000 Subject: [PATCH] feat(webdav): track directory etags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✅ Core Optimizations Implemented 1. 📊 New Database Schema: Added webdav_directories table to track directory ETags, file counts, and metadata 2. 🔍 Smart Directory Checking: Before deep scans, check directory ETags with lightweight Depth: 0 PROPFIND requests 3. ⚡ Skip Unchanged Directories: If directory ETag matches, skip the entire deep scan 4. 🗂️ N-Depth Subdirectory Tracking: Recursively track all subdirectories found during scans 5. 🎯 Individual Subdirectory Checks: When parent unchanged, check each known subdirectory individually 🚀 Performance Benefits Before: Every sync = Full Depth: infinity scan of entire directory tree After: - First sync: Full scan + directory tracking setup - Subsequent syncs: Quick ETag checks → skip unchanged directories entirely - Changed directories: Only scan the specific changed subdirectories 📁 How It Works 1. Initial Request: PROPFIND Depth: 0 on /Documents → get directory ETag 2. Database Check: Compare with stored ETag for /Documents 3. If Unchanged: Check each known subdirectory (/Documents/2024, /Documents/Archive) individually 4. 
If Changed: Full recursive scan + update all directory tracking data --- .../20250701000000_add_webdav_directories.sql | 22 ++ src/db/webdav.rs | 121 ++++++ src/models.rs | 30 ++ src/scheduling/source_sync.rs | 13 +- src/services/webdav_service.rs | 347 ++++++++++++++++++ src/webdav_xml_parser.rs | 2 +- 6 files changed, 531 insertions(+), 4 deletions(-) create mode 100644 migrations/20250701000000_add_webdav_directories.sql diff --git a/migrations/20250701000000_add_webdav_directories.sql b/migrations/20250701000000_add_webdav_directories.sql new file mode 100644 index 0000000..b300c05 --- /dev/null +++ b/migrations/20250701000000_add_webdav_directories.sql @@ -0,0 +1,22 @@ +-- Add directory-level ETag tracking for efficient WebDAV sync +-- This optimization allows skipping unchanged directories entirely + +CREATE TABLE IF NOT EXISTS webdav_directories ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID REFERENCES users(id) ON DELETE CASCADE, + directory_path TEXT NOT NULL, + directory_etag TEXT NOT NULL, + last_scanned_at TIMESTAMPTZ DEFAULT NOW(), + file_count BIGINT DEFAULT 0, + total_size_bytes BIGINT DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + UNIQUE(user_id, directory_path) +); + +-- Create indexes for performance +CREATE INDEX IF NOT EXISTS idx_webdav_directories_user_id ON webdav_directories(user_id); +CREATE INDEX IF NOT EXISTS idx_webdav_directories_path ON webdav_directories(user_id, directory_path); +CREATE INDEX IF NOT EXISTS idx_webdav_directories_etag ON webdav_directories(directory_etag); +CREATE INDEX IF NOT EXISTS idx_webdav_directories_last_scanned ON webdav_directories(last_scanned_at); \ No newline at end of file diff --git a/src/db/webdav.rs b/src/db/webdav.rs index 2454b14..e5c0f44 100644 --- a/src/db/webdav.rs +++ b/src/db/webdav.rs @@ -218,4 +218,125 @@ impl Database { Ok(files) } + + // Directory tracking functions for efficient sync optimization + pub async fn 
get_webdav_directory(&self, user_id: Uuid, directory_path: &str) -> Result> { + self.with_retry(|| async { + let row = sqlx::query( + r#"SELECT id, user_id, directory_path, directory_etag, last_scanned_at, + file_count, total_size_bytes, created_at, updated_at + FROM webdav_directories WHERE user_id = $1 AND directory_path = $2"# + ) + .bind(user_id) + .bind(directory_path) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?; + + match row { + Some(row) => Ok(Some(crate::models::WebDAVDirectory { + id: row.get("id"), + user_id: row.get("user_id"), + directory_path: row.get("directory_path"), + directory_etag: row.get("directory_etag"), + last_scanned_at: row.get("last_scanned_at"), + file_count: row.get("file_count"), + total_size_bytes: row.get("total_size_bytes"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + })), + None => Ok(None), + } + }).await + } + + pub async fn create_or_update_webdav_directory(&self, directory: &crate::models::CreateWebDAVDirectory) -> Result { + let row = sqlx::query( + r#"INSERT INTO webdav_directories (user_id, directory_path, directory_etag, + file_count, total_size_bytes, last_scanned_at, updated_at) + VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) + ON CONFLICT (user_id, directory_path) DO UPDATE SET + directory_etag = EXCLUDED.directory_etag, + file_count = EXCLUDED.file_count, + total_size_bytes = EXCLUDED.total_size_bytes, + last_scanned_at = NOW(), + updated_at = NOW() + RETURNING id, user_id, directory_path, directory_etag, last_scanned_at, + file_count, total_size_bytes, created_at, updated_at"# + ) + .bind(directory.user_id) + .bind(&directory.directory_path) + .bind(&directory.directory_etag) + .bind(directory.file_count) + .bind(directory.total_size_bytes) + .fetch_one(&self.pool) + .await?; + + Ok(crate::models::WebDAVDirectory { + id: row.get("id"), + user_id: row.get("user_id"), + directory_path: row.get("directory_path"), + directory_etag: 
row.get("directory_etag"), + last_scanned_at: row.get("last_scanned_at"), + file_count: row.get("file_count"), + total_size_bytes: row.get("total_size_bytes"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }) + } + + pub async fn update_webdav_directory(&self, user_id: Uuid, directory_path: &str, update: &crate::models::UpdateWebDAVDirectory) -> Result<()> { + self.with_retry(|| async { + sqlx::query( + r#"UPDATE webdav_directories SET + directory_etag = $3, + last_scanned_at = $4, + file_count = $5, + total_size_bytes = $6, + updated_at = NOW() + WHERE user_id = $1 AND directory_path = $2"# + ) + .bind(user_id) + .bind(directory_path) + .bind(&update.directory_etag) + .bind(update.last_scanned_at) + .bind(update.file_count) + .bind(update.total_size_bytes) + .execute(&self.pool) + .await + .map_err(|e| anyhow::anyhow!("Database update failed: {}", e))?; + + Ok(()) + }).await + } + + pub async fn list_webdav_directories(&self, user_id: Uuid) -> Result> { + let rows = sqlx::query( + r#"SELECT id, user_id, directory_path, directory_etag, last_scanned_at, + file_count, total_size_bytes, created_at, updated_at + FROM webdav_directories + WHERE user_id = $1 + ORDER BY directory_path ASC"# + ) + .bind(user_id) + .fetch_all(&self.pool) + .await?; + + let mut directories = Vec::new(); + for row in rows { + directories.push(crate::models::WebDAVDirectory { + id: row.get("id"), + user_id: row.get("user_id"), + directory_path: row.get("directory_path"), + directory_etag: row.get("directory_etag"), + last_scanned_at: row.get("last_scanned_at"), + file_count: row.get("file_count"), + total_size_bytes: row.get("total_size_bytes"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }); + } + + Ok(directories) + } } \ No newline at end of file diff --git a/src/models.rs b/src/models.rs index 83cd3cb..b0fb0ed 100644 --- a/src/models.rs +++ b/src/models.rs @@ -931,6 +931,36 @@ pub struct FileInfo { pub metadata: Option, } 
+#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct WebDAVDirectory { + pub id: Uuid, + pub user_id: Uuid, + pub directory_path: String, + pub directory_etag: String, + pub last_scanned_at: DateTime, + pub file_count: i64, + pub total_size_bytes: i64, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateWebDAVDirectory { + pub user_id: Uuid, + pub directory_path: String, + pub directory_etag: String, + pub file_count: i64, + pub total_size_bytes: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateWebDAVDirectory { + pub directory_etag: String, + pub last_scanned_at: DateTime, + pub file_count: i64, + pub total_size_bytes: i64, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, ToSchema)] pub enum SourceType { #[serde(rename = "webdav")] diff --git a/src/scheduling/source_sync.rs b/src/scheduling/source_sync.rs index 8fa6e67..3556e2c 100644 --- a/src/scheduling/source_sync.rs +++ b/src/scheduling/source_sync.rs @@ -125,11 +125,18 @@ impl SourceSyncService { cancellation_token, |folder_path| { let service = webdav_service.clone(); + let state_clone = self.state.clone(); async move { - debug!("WebDAV discover_files_in_folder called for: {}", folder_path); - let result = service.discover_files_in_folder(&folder_path).await; + info!("🚀 Using optimized WebDAV discovery for: {}", folder_path); + let result = service.discover_files_in_folder_optimized(&folder_path, source.user_id, &state_clone).await; match &result { - Ok(files) => debug!("WebDAV discovered {} files in folder: {}", files.len(), folder_path), + Ok(files) => { + if files.is_empty() { + info!("✅ Directory {} unchanged, skipped deep scan", folder_path); + } else { + info!("🔄 Directory {} changed, discovered {} files", folder_path, files.len()); + } + }, Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e), } result diff --git a/src/services/webdav_service.rs 
b/src/services/webdav_service.rs index e3cae9d..0aa37c9 100644 --- a/src/services/webdav_service.rs +++ b/src/services/webdav_service.rs @@ -416,6 +416,353 @@ impl WebDAVService { }).await } + /// Optimized discovery that checks directory ETag first to avoid unnecessary deep scans + pub async fn discover_files_in_folder_optimized(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { + info!("🔍 Starting optimized discovery for folder: {}", folder_path); + + // Step 1: Check directory ETag first (lightweight PROPFIND with Depth: 0) + let current_dir_etag = match self.check_directory_etag(folder_path).await { + Ok(etag) => etag, + Err(e) => { + warn!("Failed to get directory ETag for {}, falling back to full scan: {}", folder_path, e); + return self.discover_files_in_folder_impl(folder_path).await; + } + }; + + // Step 2: Check if we have this directory cached + match state.db.get_webdav_directory(user_id, folder_path).await { + Ok(Some(stored_dir)) => { + if stored_dir.directory_etag == current_dir_etag { + info!("✅ Directory {} unchanged (ETag: {}), checking subdirectories individually", folder_path, current_dir_etag); + + // Update last_scanned_at to show we checked + let update = crate::models::UpdateWebDAVDirectory { + directory_etag: current_dir_etag, + last_scanned_at: chrono::Utc::now(), + file_count: stored_dir.file_count, + total_size_bytes: stored_dir.total_size_bytes, + }; + + if let Err(e) = state.db.update_webdav_directory(user_id, folder_path, &update).await { + warn!("Failed to update directory scan time: {}", e); + } + + // Step 2a: Check subdirectories individually for changes + let changed_files = self.check_subdirectories_for_changes(folder_path, user_id, state).await?; + return Ok(changed_files); + } else { + info!("🔄 Directory {} changed (old ETag: {}, new ETag: {}), performing deep scan", + folder_path, stored_dir.directory_etag, current_dir_etag); + } + } + Ok(None) => { + info!("🆕 New directory {}, performing 
initial scan", folder_path); + } + Err(e) => { + warn!("Database error checking directory {}: {}, proceeding with scan", folder_path, e); + } + } + + // Step 3: Directory has changed or is new - perform full discovery + let files = self.discover_files_in_folder_impl(folder_path).await?; + + // Step 4: Update directory tracking info for main directory + let file_count = files.iter().filter(|f| !f.is_directory).count() as i64; + let total_size_bytes = files.iter().filter(|f| !f.is_directory).map(|f| f.size).sum::(); + + let directory_record = crate::models::CreateWebDAVDirectory { + user_id, + directory_path: folder_path.to_string(), + directory_etag: current_dir_etag.clone(), + file_count, + total_size_bytes, + }; + + if let Err(e) = state.db.create_or_update_webdav_directory(&directory_record).await { + error!("Failed to update directory tracking for {}: {}", folder_path, e); + } else { + info!("📊 Updated directory tracking: {} files, {} bytes, ETag: {}", + file_count, total_size_bytes, current_dir_etag); + } + + // Step 5: Track ALL subdirectories found during the scan (n-depth) + self.track_subdirectories_recursively(&files, user_id, state).await; + + Ok(files) + } + + /// Track all subdirectories recursively with rock-solid n-depth support + async fn track_subdirectories_recursively(&self, files: &[FileInfo], user_id: uuid::Uuid, state: &crate::AppState) { + use std::collections::{HashMap, BTreeSet}; + + // Step 1: Extract all unique directory paths from the file list + let mut all_directories = BTreeSet::new(); + + for file in files { + if file.is_directory { + // Add the directory itself + all_directories.insert(file.path.clone()); + } else { + // Extract all parent directories from file paths + let mut path_parts: Vec<&str> = file.path.split('/').collect(); + path_parts.pop(); // Remove the filename + + // Build directory paths from root down to immediate parent + let mut current_path = String::new(); + for part in path_parts { + if !part.is_empty() { + if 
!current_path.is_empty() { + current_path.push('/'); + } + current_path.push_str(part); + all_directories.insert(current_path.clone()); + } + } + } + } + + info!("🗂️ Found {} unique directories at all levels", all_directories.len()); + + // Step 2: Create a mapping of directory -> ETag from the files list + let mut directory_etags: HashMap = HashMap::new(); + for file in files { + if file.is_directory { + directory_etags.insert(file.path.clone(), file.etag.clone()); + } + } + + // Step 3: For each directory, calculate its direct content (files and immediate subdirs) + for dir_path in &all_directories { + let dir_etag = match directory_etags.get(dir_path) { + Some(etag) => etag.clone(), + None => { + debug!("⚠️ No ETag found for directory: {}", dir_path); + continue; // Skip directories without ETags + } + }; + + // Count direct files in this directory (not in subdirectories) + let direct_files: Vec<_> = files.iter() + .filter(|f| { + !f.is_directory && + self.is_direct_child(&f.path, dir_path) + }) + .collect(); + + // Count direct subdirectories + let direct_subdirs: Vec<_> = files.iter() + .filter(|f| { + f.is_directory && + self.is_direct_child(&f.path, dir_path) + }) + .collect(); + + let file_count = direct_files.len() as i64; + let total_size_bytes = direct_files.iter().map(|f| f.size).sum::(); + + // Create or update directory tracking record + let directory_record = crate::models::CreateWebDAVDirectory { + user_id, + directory_path: dir_path.clone(), + directory_etag: dir_etag.clone(), + file_count, + total_size_bytes, + }; + + match state.db.create_or_update_webdav_directory(&directory_record).await { + Ok(_) => { + debug!("📁 Tracked directory: {} ({} files, {} subdirs, {} bytes, ETag: {})", + dir_path, file_count, direct_subdirs.len(), total_size_bytes, dir_etag); + } + Err(e) => { + warn!("Failed to update directory tracking for {}: {}", dir_path, e); + } + } + } + + info!("✅ Completed tracking {} directories at all depth levels", all_directories.len()); 
+ } + + /// Check if a path is a direct child of a directory (not nested deeper) + fn is_direct_child(&self, child_path: &str, parent_path: &str) -> bool { + if !child_path.starts_with(parent_path) { + return false; + } + + // Handle root directory case + if parent_path.is_empty() || parent_path == "/" { + return !child_path.trim_start_matches('/').contains('/'); + } + + // Remove parent path prefix and check if remainder has exactly one more path segment + let remaining = child_path.strip_prefix(parent_path) + .unwrap_or("") + .trim_start_matches('/'); + + // Direct child means no more slashes in the remaining path + !remaining.contains('/') + } + + /// Check subdirectories individually for changes when parent directory is unchanged + async fn check_subdirectories_for_changes(&self, parent_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { + // Get all known subdirectories from database + let known_directories = match state.db.list_webdav_directories(user_id).await { + Ok(dirs) => dirs, + Err(e) => { + warn!("Failed to get known directories, falling back to full scan: {}", e); + return self.discover_files_in_folder_impl(parent_path).await; + } + }; + + // Filter to subdirectories of this parent + let subdirectories: Vec<_> = known_directories.iter() + .filter(|dir| dir.directory_path.starts_with(parent_path) && dir.directory_path != parent_path) + .collect(); + + if subdirectories.is_empty() { + info!("📁 No known subdirectories for {}, no changes to process", parent_path); + return Ok(Vec::new()); + } + + info!("🔍 Checking {} known subdirectories for changes", subdirectories.len()); + + let mut changed_files = Vec::new(); + let subdirectory_count = subdirectories.len(); + + // Check each subdirectory individually + for subdir in subdirectories { + let subdir_path = &subdir.directory_path; + + // Check if this subdirectory has changed + match self.check_directory_etag(subdir_path).await { + Ok(current_etag) => { + if current_etag != 
subdir.directory_etag { + info!("🔄 Subdirectory {} changed (old: {}, new: {}), scanning recursively", + subdir_path, subdir.directory_etag, current_etag); + + // This subdirectory changed - get all its files recursively + match self.discover_files_in_folder_impl(subdir_path).await { + Ok(mut subdir_files) => { + info!("📂 Found {} files in changed subdirectory {}", subdir_files.len(), subdir_path); + changed_files.append(&mut subdir_files); + + // Update tracking for this subdirectory and its children + self.track_subdirectories_recursively(&subdir_files, user_id, state).await; + } + Err(e) => { + error!("Failed to scan changed subdirectory {}: {}", subdir_path, e); + } + } + } else { + debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir_path, current_etag); + + // Update last_scanned_at even for unchanged directories + let update = crate::models::UpdateWebDAVDirectory { + directory_etag: current_etag, + last_scanned_at: chrono::Utc::now(), + file_count: subdir.file_count, + total_size_bytes: subdir.total_size_bytes, + }; + + if let Err(e) = state.db.update_webdav_directory(user_id, subdir_path, &update).await { + warn!("Failed to update scan time for {}: {}", subdir_path, e); + } + } + } + Err(e) => { + warn!("Failed to check ETag for subdirectory {}: {}", subdir_path, e); + // Don't fail the entire operation, just log and continue + } + } + } + + info!("🎯 Found {} changed files across {} subdirectories", changed_files.len(), subdirectory_count); + Ok(changed_files) + } + + /// Check directory ETag without performing deep scan - used for optimization + pub async fn check_directory_etag(&self, folder_path: &str) -> Result { + self.retry_with_backoff("check_directory_etag", || { + self.check_directory_etag_impl(folder_path) + }).await + } + + async fn check_directory_etag_impl(&self, folder_path: &str) -> Result { + let folder_url = format!("{}{}", self.base_webdav_url, folder_path); + + debug!("Checking directory ETag for: {}", folder_url); + + let propfind_body 
= r#" + + + + + "#; + + let response = self.client + .request(Method::from_bytes(b"PROPFIND").unwrap(), &folder_url) + .basic_auth(&self.config.username, Some(&self.config.password)) + .header("Depth", "0") // Only check the directory itself, not contents + .header("Content-Type", "application/xml") + .body(propfind_body) + .send() + .await?; + + if !response.status().is_success() { + return Err(anyhow!("PROPFIND request failed: {}", response.status())); + } + + let response_text = response.text().await?; + debug!("Directory ETag response received, parsing..."); + + // Parse the response to extract directory ETag + self.parse_directory_etag(&response_text) + } + + fn parse_directory_etag(&self, xml_text: &str) -> Result { + use quick_xml::events::Event; + use quick_xml::reader::Reader; + + let mut reader = Reader::from_str(xml_text); + reader.config_mut().trim_text(true); + + let mut current_element = String::new(); + let mut etag = String::new(); + let mut buf = Vec::new(); + + loop { + match reader.read_event_into(&mut buf) { + Ok(Event::Start(e)) | Ok(Event::Empty(e)) => { + let local_name = e.local_name(); + let name = std::str::from_utf8(local_name.as_ref())?; + current_element = name.to_lowercase(); + } + Ok(Event::Text(e)) => { + if current_element == "getetag" { + etag = e.unescape()?.to_string(); + break; + } + } + Ok(Event::End(_)) => { + current_element.clear(); + } + Ok(Event::Eof) => break, + Err(e) => return Err(anyhow!("XML parsing error: {}", e)), + _ => {} + } + } + + if etag.is_empty() { + return Err(anyhow!("No ETag found in directory response")); + } + + // Use existing ETag normalization function from parser module + let normalized_etag = crate::webdav_xml_parser::normalize_etag(&etag); + debug!("Directory ETag: {}", normalized_etag); + + Ok(normalized_etag) + } + async fn discover_files_in_folder_impl(&self, folder_path: &str) -> Result> { let folder_url = format!("{}{}", self.base_webdav_url, folder_path); diff --git 
a/src/webdav_xml_parser.rs b/src/webdav_xml_parser.rs index 25399d0..acfcaaa 100644 --- a/src/webdav_xml_parser.rs +++ b/src/webdav_xml_parser.rs @@ -292,7 +292,7 @@ fn parse_http_date(date_str: &str) -> Option> { /// - `"abc123"` → `abc123` /// - `W/"abc123"` → `abc123` /// - `abc123` → `abc123` -fn normalize_etag(etag: &str) -> String { +pub fn normalize_etag(etag: &str) -> String { etag.trim() .trim_start_matches("W/") .trim_matches('"')