feat(webdav): track directory etags
✅ Core Optimizations Implemented: 1. 📊 New database schema: added a `webdav_directories` table to track directory ETags, file counts, and metadata. 2. 🔍 Smart directory checking: before deep scans, check directory ETags with lightweight `Depth: 0` PROPFIND requests. 3. ⚡ Skip unchanged directories: if the directory ETag matches, skip the entire deep scan. 4. 🗂️ N-depth subdirectory tracking: recursively track all subdirectories found during scans. 5. 🎯 Individual subdirectory checks: when the parent is unchanged, check each known subdirectory individually. 🚀 Performance benefits — Before: every sync performed a full `Depth: infinity` scan of the entire directory tree. After: the first sync performs a full scan and sets up directory tracking; subsequent syncs use quick ETag checks to skip unchanged directories entirely, and only the specific changed subdirectories are rescanned. 📁 How it works: 1. Initial request: PROPFIND `Depth: 0` on /Documents → get the directory ETag. 2. Database check: compare with the stored ETag for /Documents. 3. If unchanged: check each known subdirectory (/Documents/2024, /Documents/Archive) individually. 4. If changed: full recursive scan + update all directory tracking data.
This commit is contained in:
parent
1e655be542
commit
fdc240fa5b
|
|
@ -0,0 +1,22 @@
|
||||||
|
-- Add directory-level ETag tracking for efficient WebDAV sync
-- This optimization allows skipping unchanged directories entirely

CREATE TABLE IF NOT EXISTS webdav_directories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- NOT NULL is required: in Postgres, NULLs are never equal under a
    -- UNIQUE constraint, so a nullable user_id would allow duplicate
    -- (NULL, directory_path) rows and break the upsert's ON CONFLICT target.
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    directory_path TEXT NOT NULL,
    directory_etag TEXT NOT NULL,
    last_scanned_at TIMESTAMPTZ DEFAULT NOW(),
    file_count BIGINT DEFAULT 0,
    total_size_bytes BIGINT DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    -- One tracking row per directory per user; also serves as the
    -- conflict target for create_or_update upserts.
    UNIQUE(user_id, directory_path)
);

-- Create indexes for performance
CREATE INDEX IF NOT EXISTS idx_webdav_directories_user_id ON webdav_directories(user_id);
-- NOTE: the UNIQUE(user_id, directory_path) constraint above already
-- creates an index covering this lookup; this explicit index is redundant
-- and kept only for naming clarity.
CREATE INDEX IF NOT EXISTS idx_webdav_directories_path ON webdav_directories(user_id, directory_path);
CREATE INDEX IF NOT EXISTS idx_webdav_directories_etag ON webdav_directories(directory_etag);
CREATE INDEX IF NOT EXISTS idx_webdav_directories_last_scanned ON webdav_directories(last_scanned_at);
|
||||||
121
src/db/webdav.rs
121
src/db/webdav.rs
|
|
@ -218,4 +218,125 @@ impl Database {
|
||||||
|
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Directory tracking functions for efficient sync optimization
|
||||||
|
pub async fn get_webdav_directory(&self, user_id: Uuid, directory_path: &str) -> Result<Option<crate::models::WebDAVDirectory>> {
|
||||||
|
self.with_retry(|| async {
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"SELECT id, user_id, directory_path, directory_etag, last_scanned_at,
|
||||||
|
file_count, total_size_bytes, created_at, updated_at
|
||||||
|
FROM webdav_directories WHERE user_id = $1 AND directory_path = $2"#
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.bind(directory_path)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Database query failed: {}", e))?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(row) => Ok(Some(crate::models::WebDAVDirectory {
|
||||||
|
id: row.get("id"),
|
||||||
|
user_id: row.get("user_id"),
|
||||||
|
directory_path: row.get("directory_path"),
|
||||||
|
directory_etag: row.get("directory_etag"),
|
||||||
|
last_scanned_at: row.get("last_scanned_at"),
|
||||||
|
file_count: row.get("file_count"),
|
||||||
|
total_size_bytes: row.get("total_size_bytes"),
|
||||||
|
created_at: row.get("created_at"),
|
||||||
|
updated_at: row.get("updated_at"),
|
||||||
|
})),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_or_update_webdav_directory(&self, directory: &crate::models::CreateWebDAVDirectory) -> Result<crate::models::WebDAVDirectory> {
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"INSERT INTO webdav_directories (user_id, directory_path, directory_etag,
|
||||||
|
file_count, total_size_bytes, last_scanned_at, updated_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
|
||||||
|
ON CONFLICT (user_id, directory_path) DO UPDATE SET
|
||||||
|
directory_etag = EXCLUDED.directory_etag,
|
||||||
|
file_count = EXCLUDED.file_count,
|
||||||
|
total_size_bytes = EXCLUDED.total_size_bytes,
|
||||||
|
last_scanned_at = NOW(),
|
||||||
|
updated_at = NOW()
|
||||||
|
RETURNING id, user_id, directory_path, directory_etag, last_scanned_at,
|
||||||
|
file_count, total_size_bytes, created_at, updated_at"#
|
||||||
|
)
|
||||||
|
.bind(directory.user_id)
|
||||||
|
.bind(&directory.directory_path)
|
||||||
|
.bind(&directory.directory_etag)
|
||||||
|
.bind(directory.file_count)
|
||||||
|
.bind(directory.total_size_bytes)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(crate::models::WebDAVDirectory {
|
||||||
|
id: row.get("id"),
|
||||||
|
user_id: row.get("user_id"),
|
||||||
|
directory_path: row.get("directory_path"),
|
||||||
|
directory_etag: row.get("directory_etag"),
|
||||||
|
last_scanned_at: row.get("last_scanned_at"),
|
||||||
|
file_count: row.get("file_count"),
|
||||||
|
total_size_bytes: row.get("total_size_bytes"),
|
||||||
|
created_at: row.get("created_at"),
|
||||||
|
updated_at: row.get("updated_at"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_webdav_directory(&self, user_id: Uuid, directory_path: &str, update: &crate::models::UpdateWebDAVDirectory) -> Result<()> {
|
||||||
|
self.with_retry(|| async {
|
||||||
|
sqlx::query(
|
||||||
|
r#"UPDATE webdav_directories SET
|
||||||
|
directory_etag = $3,
|
||||||
|
last_scanned_at = $4,
|
||||||
|
file_count = $5,
|
||||||
|
total_size_bytes = $6,
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE user_id = $1 AND directory_path = $2"#
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.bind(directory_path)
|
||||||
|
.bind(&update.directory_etag)
|
||||||
|
.bind(update.last_scanned_at)
|
||||||
|
.bind(update.file_count)
|
||||||
|
.bind(update.total_size_bytes)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Database update failed: {}", e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_webdav_directories(&self, user_id: Uuid) -> Result<Vec<crate::models::WebDAVDirectory>> {
|
||||||
|
let rows = sqlx::query(
|
||||||
|
r#"SELECT id, user_id, directory_path, directory_etag, last_scanned_at,
|
||||||
|
file_count, total_size_bytes, created_at, updated_at
|
||||||
|
FROM webdav_directories
|
||||||
|
WHERE user_id = $1
|
||||||
|
ORDER BY directory_path ASC"#
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut directories = Vec::new();
|
||||||
|
for row in rows {
|
||||||
|
directories.push(crate::models::WebDAVDirectory {
|
||||||
|
id: row.get("id"),
|
||||||
|
user_id: row.get("user_id"),
|
||||||
|
directory_path: row.get("directory_path"),
|
||||||
|
directory_etag: row.get("directory_etag"),
|
||||||
|
last_scanned_at: row.get("last_scanned_at"),
|
||||||
|
file_count: row.get("file_count"),
|
||||||
|
total_size_bytes: row.get("total_size_bytes"),
|
||||||
|
created_at: row.get("created_at"),
|
||||||
|
updated_at: row.get("updated_at"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(directories)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -931,6 +931,36 @@ pub struct FileInfo {
|
||||||
pub metadata: Option<serde_json::Value>,
|
pub metadata: Option<serde_json::Value>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A tracked WebDAV directory (one row of the `webdav_directories` table).
///
/// Stores the last-seen directory ETag plus per-directory statistics so
/// unchanged directories can be skipped during sync.
#[derive(Debug, Serialize, Deserialize, FromRow)]
pub struct WebDAVDirectory {
    pub id: Uuid,
    pub user_id: Uuid,
    // Path of the directory on the WebDAV server
    pub directory_path: String,
    // Normalized ETag the server last reported for this directory
    pub directory_etag: String,
    // Updated on every check, even when the ETag is unchanged
    pub last_scanned_at: DateTime<Utc>,
    // File count recorded at the last scan
    // NOTE(review): some writers store recursive totals, others direct-only
    // counts — confirm which semantics consumers rely on
    pub file_count: i64,
    // Byte total recorded at the last scan (same caveat as file_count)
    pub total_size_bytes: i64,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
|
||||||
|
|
||||||
|
/// Payload for inserting (or upserting) a `webdav_directories` row.
///
/// Timestamps are not supplied here; the database sets them to NOW() on
/// insert/update.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateWebDAVDirectory {
    pub user_id: Uuid,
    // Path of the directory on the WebDAV server
    pub directory_path: String,
    // Normalized ETag observed for this directory
    pub directory_etag: String,
    pub file_count: i64,
    pub total_size_bytes: i64,
}
|
||||||
|
|
||||||
|
/// Fields overwritten by `update_webdav_directory` on an existing
/// `webdav_directories` row (the row's identity — user and path — is
/// passed separately).
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateWebDAVDirectory {
    // New normalized ETag for the directory
    pub directory_etag: String,
    // When this check/scan happened (caller-supplied, typically Utc::now())
    pub last_scanned_at: DateTime<Utc>,
    pub file_count: i64,
    pub total_size_bytes: i64,
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, ToSchema)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, ToSchema)]
|
||||||
pub enum SourceType {
|
pub enum SourceType {
|
||||||
#[serde(rename = "webdav")]
|
#[serde(rename = "webdav")]
|
||||||
|
|
|
||||||
|
|
@ -125,11 +125,18 @@ impl SourceSyncService {
|
||||||
cancellation_token,
|
cancellation_token,
|
||||||
|folder_path| {
|
|folder_path| {
|
||||||
let service = webdav_service.clone();
|
let service = webdav_service.clone();
|
||||||
|
let state_clone = self.state.clone();
|
||||||
async move {
|
async move {
|
||||||
debug!("WebDAV discover_files_in_folder called for: {}", folder_path);
|
info!("🚀 Using optimized WebDAV discovery for: {}", folder_path);
|
||||||
let result = service.discover_files_in_folder(&folder_path).await;
|
let result = service.discover_files_in_folder_optimized(&folder_path, source.user_id, &state_clone).await;
|
||||||
match &result {
|
match &result {
|
||||||
Ok(files) => debug!("WebDAV discovered {} files in folder: {}", files.len(), folder_path),
|
Ok(files) => {
|
||||||
|
if files.is_empty() {
|
||||||
|
info!("✅ Directory {} unchanged, skipped deep scan", folder_path);
|
||||||
|
} else {
|
||||||
|
info!("🔄 Directory {} changed, discovered {} files", folder_path, files.len());
|
||||||
|
}
|
||||||
|
},
|
||||||
Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e),
|
Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e),
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
|
|
|
||||||
|
|
@ -416,6 +416,353 @@ impl WebDAVService {
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Optimized discovery that checks directory ETag first to avoid unnecessary deep scans
///
/// Flow:
///   1. PROPFIND `Depth: 0` to read the directory's current ETag (cheap).
///   2. Compare against the ETag cached in `webdav_directories`.
///   3. If unchanged: only re-check known subdirectories individually.
///      If changed, new, or unknown: run the full recursive scan and
///      refresh the tracking rows for every directory encountered.
///
/// ETag or database failures degrade gracefully to a full scan rather than
/// failing the sync.
pub async fn discover_files_in_folder_optimized(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
    info!("🔍 Starting optimized discovery for folder: {}", folder_path);

    // Step 1: Check directory ETag first (lightweight PROPFIND with Depth: 0)
    let current_dir_etag = match self.check_directory_etag(folder_path).await {
        Ok(etag) => etag,
        Err(e) => {
            // Server may not expose directory ETags — fall back to the
            // unoptimized path instead of failing the sync.
            warn!("Failed to get directory ETag for {}, falling back to full scan: {}", folder_path, e);
            return self.discover_files_in_folder_impl(folder_path).await;
        }
    };

    // Step 2: Check if we have this directory cached
    match state.db.get_webdav_directory(user_id, folder_path).await {
        Ok(Some(stored_dir)) => {
            if stored_dir.directory_etag == current_dir_etag {
                info!("✅ Directory {} unchanged (ETag: {}), checking subdirectories individually", folder_path, current_dir_etag);

                // Update last_scanned_at to show we checked
                let update = crate::models::UpdateWebDAVDirectory {
                    directory_etag: current_dir_etag,
                    last_scanned_at: chrono::Utc::now(),
                    file_count: stored_dir.file_count,
                    total_size_bytes: stored_dir.total_size_bytes,
                };

                // Non-fatal: this is tracking metadata only; the discovery
                // result is unaffected.
                if let Err(e) = state.db.update_webdav_directory(user_id, folder_path, &update).await {
                    warn!("Failed to update directory scan time: {}", e);
                }

                // Step 2a: Check subdirectories individually for changes
                let changed_files = self.check_subdirectories_for_changes(folder_path, user_id, state).await?;
                return Ok(changed_files);
            } else {
                info!("🔄 Directory {} changed (old ETag: {}, new ETag: {}), performing deep scan",
                      folder_path, stored_dir.directory_etag, current_dir_etag);
            }
        }
        Ok(None) => {
            info!("🆕 New directory {}, performing initial scan", folder_path);
        }
        Err(e) => {
            warn!("Database error checking directory {}: {}, proceeding with scan", folder_path, e);
        }
    }

    // Step 3: Directory has changed or is new - perform full discovery
    let files = self.discover_files_in_folder_impl(folder_path).await?;

    // Step 4: Update directory tracking info for main directory
    // NOTE(review): these totals are recursive (all non-directory entries
    // returned by the deep scan), whereas track_subdirectories_recursively
    // stores per-directory *direct* counts — confirm which semantics
    // consumers of file_count expect.
    let file_count = files.iter().filter(|f| !f.is_directory).count() as i64;
    let total_size_bytes = files.iter().filter(|f| !f.is_directory).map(|f| f.size).sum::<i64>();

    let directory_record = crate::models::CreateWebDAVDirectory {
        user_id,
        directory_path: folder_path.to_string(),
        directory_etag: current_dir_etag.clone(),
        file_count,
        total_size_bytes,
    };

    // Tracking failures are logged but never abort discovery.
    if let Err(e) = state.db.create_or_update_webdav_directory(&directory_record).await {
        error!("Failed to update directory tracking for {}: {}", folder_path, e);
    } else {
        info!("📊 Updated directory tracking: {} files, {} bytes, ETag: {}",
              file_count, total_size_bytes, current_dir_etag);
    }

    // Step 5: Track ALL subdirectories found during the scan (n-depth)
    self.track_subdirectories_recursively(&files, user_id, state).await;

    Ok(files)
}
|
||||||
|
|
||||||
|
/// Track all subdirectories recursively with rock-solid n-depth support
///
/// Persists one `webdav_directories` row per directory observed in `files`:
///   1. Collect every directory path — entries flagged `is_directory` plus
///      every ancestor directory implied by file paths.
///   2. Map directory path -> ETag; only directories that appear in `files`
///      carry an ETag, implied ancestors without one are skipped.
///   3. For each directory, store its *direct* (non-recursive) file count
///      and byte total.
///
/// All errors are logged and swallowed: tracking is an optimization and
/// must never fail the scan that produced `files`.
async fn track_subdirectories_recursively(&self, files: &[FileInfo], user_id: uuid::Uuid, state: &crate::AppState) {
    use std::collections::{HashMap, BTreeSet};

    // Step 1: Extract all unique directory paths from the file list
    let mut all_directories = BTreeSet::new();

    for file in files {
        if file.is_directory {
            // Add the directory itself
            all_directories.insert(file.path.clone());
        } else {
            // Extract all parent directories from file paths
            let mut path_parts: Vec<&str> = file.path.split('/').collect();
            path_parts.pop(); // Remove the filename

            // Build directory paths from root down to immediate parent
            // NOTE(review): paths built here drop any leading '/', while
            // is_directory entries keep file.path verbatim — if server paths
            // are absolute the two spellings won't match in the set; confirm
            // against the FileInfo.path format produced by discovery.
            let mut current_path = String::new();
            for part in path_parts {
                if !part.is_empty() {
                    if !current_path.is_empty() {
                        current_path.push('/');
                    }
                    current_path.push_str(part);
                    all_directories.insert(current_path.clone());
                }
            }
        }
    }

    info!("🗂️ Found {} unique directories at all levels", all_directories.len());

    // Step 2: Create a mapping of directory -> ETag from the files list
    let mut directory_etags: HashMap<String, String> = HashMap::new();
    for file in files {
        if file.is_directory {
            directory_etags.insert(file.path.clone(), file.etag.clone());
        }
    }

    // Step 3: For each directory, calculate its direct content (files and immediate subdirs)
    for dir_path in &all_directories {
        let dir_etag = match directory_etags.get(dir_path) {
            Some(etag) => etag.clone(),
            None => {
                debug!("⚠️ No ETag found for directory: {}", dir_path);
                continue; // Skip directories without ETags
            }
        };

        // Count direct files in this directory (not in subdirectories)
        let direct_files: Vec<_> = files.iter()
            .filter(|f| {
                !f.is_directory &&
                self.is_direct_child(&f.path, dir_path)
            })
            .collect();

        // Count direct subdirectories (used only for the debug log below)
        let direct_subdirs: Vec<_> = files.iter()
            .filter(|f| {
                f.is_directory &&
                self.is_direct_child(&f.path, dir_path)
            })
            .collect();

        let file_count = direct_files.len() as i64;
        let total_size_bytes = direct_files.iter().map(|f| f.size).sum::<i64>();

        // Create or update directory tracking record
        let directory_record = crate::models::CreateWebDAVDirectory {
            user_id,
            directory_path: dir_path.clone(),
            directory_etag: dir_etag.clone(),
            file_count,
            total_size_bytes,
        };

        match state.db.create_or_update_webdav_directory(&directory_record).await {
            Ok(_) => {
                debug!("📁 Tracked directory: {} ({} files, {} subdirs, {} bytes, ETag: {})",
                       dir_path, file_count, direct_subdirs.len(), total_size_bytes, dir_etag);
            }
            Err(e) => {
                warn!("Failed to update directory tracking for {}: {}", dir_path, e);
            }
        }
    }

    info!("✅ Completed tracking {} directories at all depth levels", all_directories.len());
}
|
||||||
|
|
||||||
|
/// Check if a path is a direct child of a directory (not nested deeper)
|
||||||
|
fn is_direct_child(&self, child_path: &str, parent_path: &str) -> bool {
|
||||||
|
if !child_path.starts_with(parent_path) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle root directory case
|
||||||
|
if parent_path.is_empty() || parent_path == "/" {
|
||||||
|
return !child_path.trim_start_matches('/').contains('/');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove parent path prefix and check if remainder has exactly one more path segment
|
||||||
|
let remaining = child_path.strip_prefix(parent_path)
|
||||||
|
.unwrap_or("")
|
||||||
|
.trim_start_matches('/');
|
||||||
|
|
||||||
|
// Direct child means no more slashes in the remaining path
|
||||||
|
!remaining.contains('/')
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check subdirectories individually for changes when parent directory is unchanged
|
||||||
|
async fn check_subdirectories_for_changes(&self, parent_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
|
||||||
|
// Get all known subdirectories from database
|
||||||
|
let known_directories = match state.db.list_webdav_directories(user_id).await {
|
||||||
|
Ok(dirs) => dirs,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to get known directories, falling back to full scan: {}", e);
|
||||||
|
return self.discover_files_in_folder_impl(parent_path).await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Filter to subdirectories of this parent
|
||||||
|
let subdirectories: Vec<_> = known_directories.iter()
|
||||||
|
.filter(|dir| dir.directory_path.starts_with(parent_path) && dir.directory_path != parent_path)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if subdirectories.is_empty() {
|
||||||
|
info!("📁 No known subdirectories for {}, no changes to process", parent_path);
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("🔍 Checking {} known subdirectories for changes", subdirectories.len());
|
||||||
|
|
||||||
|
let mut changed_files = Vec::new();
|
||||||
|
let subdirectory_count = subdirectories.len();
|
||||||
|
|
||||||
|
// Check each subdirectory individually
|
||||||
|
for subdir in subdirectories {
|
||||||
|
let subdir_path = &subdir.directory_path;
|
||||||
|
|
||||||
|
// Check if this subdirectory has changed
|
||||||
|
match self.check_directory_etag(subdir_path).await {
|
||||||
|
Ok(current_etag) => {
|
||||||
|
if current_etag != subdir.directory_etag {
|
||||||
|
info!("🔄 Subdirectory {} changed (old: {}, new: {}), scanning recursively",
|
||||||
|
subdir_path, subdir.directory_etag, current_etag);
|
||||||
|
|
||||||
|
// This subdirectory changed - get all its files recursively
|
||||||
|
match self.discover_files_in_folder_impl(subdir_path).await {
|
||||||
|
Ok(mut subdir_files) => {
|
||||||
|
info!("📂 Found {} files in changed subdirectory {}", subdir_files.len(), subdir_path);
|
||||||
|
changed_files.append(&mut subdir_files);
|
||||||
|
|
||||||
|
// Update tracking for this subdirectory and its children
|
||||||
|
self.track_subdirectories_recursively(&subdir_files, user_id, state).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to scan changed subdirectory {}: {}", subdir_path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir_path, current_etag);
|
||||||
|
|
||||||
|
// Update last_scanned_at even for unchanged directories
|
||||||
|
let update = crate::models::UpdateWebDAVDirectory {
|
||||||
|
directory_etag: current_etag,
|
||||||
|
last_scanned_at: chrono::Utc::now(),
|
||||||
|
file_count: subdir.file_count,
|
||||||
|
total_size_bytes: subdir.total_size_bytes,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = state.db.update_webdav_directory(user_id, subdir_path, &update).await {
|
||||||
|
warn!("Failed to update scan time for {}: {}", subdir_path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to check ETag for subdirectory {}: {}", subdir_path, e);
|
||||||
|
// Don't fail the entire operation, just log and continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("🎯 Found {} changed files across {} subdirectories", changed_files.len(), subdirectory_count);
|
||||||
|
Ok(changed_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check directory ETag without performing deep scan - used for optimization
|
||||||
|
pub async fn check_directory_etag(&self, folder_path: &str) -> Result<String> {
|
||||||
|
self.retry_with_backoff("check_directory_etag", || {
|
||||||
|
self.check_directory_etag_impl(folder_path)
|
||||||
|
}).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Issue a PROPFIND with `Depth: 0` and extract the directory's own ETag.
///
/// Only the `<d:getetag>` property is requested and Depth is 0, so the
/// server does not enumerate the directory's contents — this is the cheap
/// probe the sync optimizer relies on. Errors on a non-2xx status or when
/// the response contains no ETag.
async fn check_directory_etag_impl(&self, folder_path: &str) -> Result<String> {
    let folder_url = format!("{}{}", self.base_webdav_url, folder_path);

    debug!("Checking directory ETag for: {}", folder_url);

    // Minimal PROPFIND body: request just the getetag property.
    let propfind_body = r#"<?xml version="1.0"?>
        <d:propfind xmlns:d="DAV:">
            <d:prop>
                <d:getetag/>
            </d:prop>
        </d:propfind>"#;

    // PROPFIND is not among reqwest's predefined methods, hence from_bytes;
    // the literal is valid ASCII so unwrap cannot fail.
    let response = self.client
        .request(Method::from_bytes(b"PROPFIND").unwrap(), &folder_url)
        .basic_auth(&self.config.username, Some(&self.config.password))
        .header("Depth", "0") // Only check the directory itself, not contents
        .header("Content-Type", "application/xml")
        .body(propfind_body)
        .send()
        .await?;

    if !response.status().is_success() {
        return Err(anyhow!("PROPFIND request failed: {}", response.status()));
    }

    let response_text = response.text().await?;
    debug!("Directory ETag response received, parsing...");

    // Parse the response to extract directory ETag
    self.parse_directory_etag(&response_text)
}
|
||||||
|
|
||||||
|
/// Pull the first `getetag` value out of a PROPFIND response body.
///
/// Matches on the element's *local* name, so any namespace prefix
/// (`<d:getetag>`, `<D:getetag>`, ...) is accepted. The ETag is normalized
/// (quotes and weak `W/` prefix stripped) before returning. Errors when the
/// XML is malformed or contains no ETag text.
fn parse_directory_etag(&self, xml_text: &str) -> Result<String> {
    use quick_xml::events::Event;
    use quick_xml::reader::Reader;

    let mut reader = Reader::from_str(xml_text);
    reader.config_mut().trim_text(true);

    // Name of the most recently opened element (lowercased local name),
    // so the next text event can be attributed to it.
    let mut current_element = String::new();
    let mut etag = String::new();
    let mut buf = Vec::new();

    loop {
        match reader.read_event_into(&mut buf) {
            Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
                let local_name = e.local_name();
                let name = std::str::from_utf8(local_name.as_ref())?;
                current_element = name.to_lowercase();
            }
            Ok(Event::Text(e)) => {
                if current_element == "getetag" {
                    etag = e.unescape()?.to_string();
                    // First ETag wins — a Depth: 0 response carries only the
                    // directory's own properties.
                    break;
                }
            }
            Ok(Event::End(_)) => {
                current_element.clear();
            }
            Ok(Event::Eof) => break,
            Err(e) => return Err(anyhow!("XML parsing error: {}", e)),
            _ => {}
        }
    }

    if etag.is_empty() {
        return Err(anyhow!("No ETag found in directory response"));
    }

    // Use existing ETag normalization function from parser module
    let normalized_etag = crate::webdav_xml_parser::normalize_etag(&etag);
    debug!("Directory ETag: {}", normalized_etag);

    Ok(normalized_etag)
}
|
||||||
|
|
||||||
async fn discover_files_in_folder_impl(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
|
async fn discover_files_in_folder_impl(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
|
||||||
let folder_url = format!("{}{}", self.base_webdav_url, folder_path);
|
let folder_url = format!("{}{}", self.base_webdav_url, folder_path);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -292,7 +292,7 @@ fn parse_http_date(date_str: &str) -> Option<DateTime<Utc>> {
|
||||||
/// - `"abc123"` → `abc123`
|
/// - `"abc123"` → `abc123`
|
||||||
/// - `W/"abc123"` → `abc123`
|
/// - `W/"abc123"` → `abc123`
|
||||||
/// - `abc123` → `abc123`
|
/// - `abc123` → `abc123`
|
||||||
fn normalize_etag(etag: &str) -> String {
|
pub fn normalize_etag(etag: &str) -> String {
|
||||||
etag.trim()
|
etag.trim()
|
||||||
.trim_start_matches("W/")
|
.trim_start_matches("W/")
|
||||||
.trim_matches('"')
|
.trim_matches('"')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue