From 6547130fb17652ebdd87d53dc07e05826f1c9452 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Thu, 3 Jul 2025 19:57:31 +0000 Subject: [PATCH] feat(dev): also break up the large webdav_service.rs file into smaller ones --- src/db/documents/management.rs | 9 +- src/db/documents/search.rs | 2 +- src/routes/sources.rs | 12 +- src/routes/webdav.rs | 4 +- src/routes/webdav/webdav_sync.rs | 2 +- src/scheduling/source_scheduler.rs | 8 +- src/scheduling/source_sync.rs | 2 +- src/scheduling/webdav_scheduler.rs | 2 +- src/services/mod.rs | 2 +- src/services/webdav/config.rs | 141 + src/services/webdav/connection.rs | 307 +++ src/services/webdav/discovery.rs | 349 +++ src/services/webdav/mod.rs | 17 + src/services/webdav/service.rs | 391 +++ src/services/webdav/validation.rs | 352 +++ src/services/webdav_service.rs | 2371 ----------------- src/tests/unit_webdav_smart_scanning_tests.rs | 2 +- 17 files changed, 1582 insertions(+), 2391 deletions(-) create mode 100644 src/services/webdav/config.rs create mode 100644 src/services/webdav/connection.rs create mode 100644 src/services/webdav/discovery.rs create mode 100644 src/services/webdav/mod.rs create mode 100644 src/services/webdav/service.rs create mode 100644 src/services/webdav/validation.rs delete mode 100644 src/services/webdav_service.rs diff --git a/src/db/documents/management.rs b/src/db/documents/management.rs index 3a80137..d256c84 100644 --- a/src/db/documents/management.rs +++ b/src/db/documents/management.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use sqlx::{QueryBuilder, Postgres}; +use sqlx::{QueryBuilder, Postgres, Row}; use uuid::Uuid; use crate::models::{Document, UserRole, FacetItem}; @@ -53,11 +53,16 @@ impl Database { let doc_id: Uuid = row.get("document_id"); let label = Label { id: row.get("label_id"), - user_id: row.get("user_id"), + user_id: Some(row.get("user_id")), name: row.get("name"), + description: None, color: row.get("color"), + background_color: None, + icon: None, + is_system: false, created_at: row.get("created_at"), updated_at: row.get("updated_at"), + document_count: 0, }; if Some(doc_id) != current_doc_id { diff --git a/src/db/documents/search.rs b/src/db/documents/search.rs index 5c8d40c..d367a05 100644 --- a/src/db/documents/search.rs +++ b/src/db/documents/search.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use sqlx::{QueryBuilder, Postgres}; +use sqlx::{QueryBuilder, Postgres, Row}; use uuid::Uuid; use crate::models::{Document, UserRole, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse}; diff --git a/src/routes/sources.rs b/src/routes/sources.rs index cd13bab..2e1d483 100644 --- a/src/routes/sources.rs +++ b/src/routes/sources.rs @@ -443,7 +443,7 @@ async fn trigger_deep_scan( })?; // Create WebDAV service - let webdav_config = crate::services::webdav_service::WebDAVConfig { + let webdav_config = crate::services::webdav::WebDAVConfig { server_url: config.server_url.clone(), username: config.username.clone(), password: config.password.clone(), @@ -453,7 +453,7 @@ async fn trigger_deep_scan( server_type: config.server_type.clone(), }; - let webdav_service = crate::services::webdav_service::WebDAVService::new(webdav_config.clone()) + let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config.clone()) .map_err(|e| { error!("Failed to create WebDAV service for deep scan: {}", e); StatusCode::INTERNAL_SERVER_ERROR @@ -795,7 +795,7 @@ async fn test_connection( let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config) .map_err(|_| 
StatusCode::INTERNAL_SERVER_ERROR)?;
 
-    match crate::services::webdav_service::test_webdav_connection(
+    match crate::services::webdav::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
@@ -963,7 +963,7 @@ async fn estimate_webdav_crawl_internal(
     config: &crate::models::WebDAVSourceConfig,
 ) -> Result<Json<serde_json::Value>, StatusCode> {
     // Create WebDAV service config
-    let webdav_config = crate::services::webdav_service::WebDAVConfig {
+    let webdav_config = crate::services::webdav::WebDAVConfig {
         server_url: config.server_url.clone(),
         username: config.username.clone(),
         password: config.password.clone(),
@@ -974,8 +974,8 @@ async fn estimate_webdav_crawl_internal(
     };
 
     // Create WebDAV service and estimate crawl
-    match crate::services::webdav_service::WebDAVService::new(webdav_config) {
+    match crate::services::webdav::WebDAVService::new(webdav_config) {
         Ok(webdav_service) => {
-            match webdav_service.estimate_crawl(&config.watch_folders).await {
+            match webdav_service.estimate_crawl().await {
                 Ok(estimate) => Ok(Json(serde_json::to_value(estimate).unwrap())),
@@ -1031,7 +1031,7 @@ async fn test_connection_with_config(
     let config: crate::models::WebDAVSourceConfig = serde_json::from_value(request.config)
         .map_err(|_| StatusCode::BAD_REQUEST)?;
 
-    match crate::services::webdav_service::test_webdav_connection(
+    match crate::services::webdav::test_webdav_connection(
         &config.server_url,
         &config.username,
         &config.password,
diff --git a/src/routes/webdav.rs b/src/routes/webdav.rs
index c4502a3..eebcaa7 100644
--- a/src/routes/webdav.rs
+++ b/src/routes/webdav.rs
@@ -17,8 +17,8 @@ use crate::{
     },
     AppState,
 };
-use crate::services::webdav_service::WebDAVConfig;
-use crate::services::webdav_service::WebDAVService;
+use crate::services::webdav::WebDAVConfig;
+use crate::services::webdav::WebDAVService;
 
 pub mod webdav_sync;
 use webdav_sync::perform_webdav_sync_with_tracking;
diff --git a/src/routes/webdav/webdav_sync.rs b/src/routes/webdav/webdav_sync.rs
index b78d840..9dc2656 100644
--- a/src/routes/webdav/webdav_sync.rs
+++ b/src/routes/webdav/webdav_sync.rs
@@ -10,7 +10,7 @@ use crate::{
     models::{CreateWebDAVFile, UpdateWebDAVSyncState},
     services::file_service::FileService,
     ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
-    services::webdav_service::{WebDAVConfig, WebDAVService},
+    services::webdav::{WebDAVConfig, WebDAVService},
 };
 
 pub async fn perform_webdav_sync_with_tracking(
diff --git a/src/scheduling/source_scheduler.rs b/src/scheduling/source_scheduler.rs
index a3b064e..1f17923 100644
--- a/src/scheduling/source_scheduler.rs
+++ b/src/scheduling/source_scheduler.rs
@@ -662,8 +662,8 @@ impl SourceScheduler {
         // Trigger the deep scan via the API endpoint
         // We'll reuse the existing deep scan logic from the sources route
         let webdav_config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?;
-        let webdav_service = crate::services::webdav_service::WebDAVService::new(
-            crate::services::webdav_service::WebDAVConfig {
+        let webdav_service = crate::services::webdav::WebDAVService::new(
+            crate::services::webdav::WebDAVConfig {
                 server_url: webdav_config.server_url.clone(),
                 username: webdav_config.username.clone(),
                 password: webdav_config.password.clone(),
@@ -950,7 +950,7 @@ impl SourceScheduler {
         let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
             .map_err(|e| format!("Config parse error: {}", e))?;
 
-        let webdav_config = crate::services::webdav_service::WebDAVConfig {
+        let webdav_config = crate::services::webdav::WebDAVConfig {
             server_url: config.server_url.clone(),
             username: config.username.clone(),
             password: config.password.clone(),
@@ -960,7 +960,7 @@ impl SourceScheduler {
             server_type: config.server_type.clone(),
         };
 
-        let webdav_service = crate::services::webdav_service::WebDAVService::new(webdav_config)
+        let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config)
             .map_err(|e| format!("Service creation failed: {}", e))?;
 
         let test_config = crate::models::WebDAVTestConnection {
diff --git a/src/scheduling/source_sync.rs b/src/scheduling/source_sync.rs
index 3556e2c..6d6dda4 100644
--- a/src/scheduling/source_sync.rs
+++ b/src/scheduling/source_sync.rs
@@ -15,7 +15,7 @@ use crate::{
     ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
     services::local_folder_service::LocalFolderService,
     services::s3_service::S3Service,
-    services::webdav_service::{WebDAVService, WebDAVConfig},
+    services::webdav::{WebDAVService, WebDAVConfig},
 };
 
 #[derive(Clone)]
diff --git a/src/scheduling/webdav_scheduler.rs b/src/scheduling/webdav_scheduler.rs
index 278dbed..d942339 100644
--- a/src/scheduling/webdav_scheduler.rs
+++ b/src/scheduling/webdav_scheduler.rs
@@ -9,7 +9,7 @@ use crate::{
     services::file_service::FileService,
     AppState,
 };
-use crate::services::webdav_service::{WebDAVConfig, WebDAVService};
+use crate::services::webdav::{WebDAVConfig, WebDAVService};
 use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking;
 
 pub struct WebDAVScheduler {
diff --git a/src/services/mod.rs b/src/services/mod.rs
index bedcea3..3922063 100644
--- a/src/services/mod.rs
+++ b/src/services/mod.rs
@@ -3,4 +3,4 @@ pub mod local_folder_service;
 pub mod ocr_retry_service;
 pub mod s3_service;
 pub mod s3_service_stub;
-pub mod webdav_service;
\ No newline at end of file
+pub mod webdav;
\ No newline at end of file
diff --git a/src/services/webdav/config.rs b/src/services/webdav/config.rs
new file mode 100644
index 0000000..528d725
--- /dev/null
+++ b/src/services/webdav/config.rs
@@ -0,0 +1,141 @@
+use serde::{Deserialize, Serialize};
+
+/// WebDAV server configuration
+#[derive(Debug, Clone)]
+pub struct WebDAVConfig {
+    pub server_url: String,
+    pub username: String,
+    pub password: String,
+    pub watch_folders: Vec<String>,
+    pub file_extensions: Vec<String>,
+    pub timeout_seconds: u64,
+    pub server_type: Option<String>, // "nextcloud", "owncloud", "generic"
+}
+
+/// Retry configuration for WebDAV operations
+#[derive(Debug, Clone)]
+pub struct RetryConfig {
+    pub max_retries: u32,
+    pub initial_delay_ms: u64,
+    pub max_delay_ms: u64,
+    pub backoff_multiplier: f64,
+    pub timeout_seconds: u64,
+    pub rate_limit_backoff_ms: u64, // Additional backoff for 429 responses
+}
+
+/// Concurrency configuration for WebDAV operations
+#[derive(Debug, Clone)]
+pub struct ConcurrencyConfig {
+    pub max_concurrent_scans: usize,
+    pub max_concurrent_downloads: usize,
+    pub adaptive_rate_limiting: bool,
+}
+
+impl Default for RetryConfig {
+    fn default() -> Self {
+        Self {
+            max_retries: 3,
+            initial_delay_ms: 1000,  // 1 second
+            max_delay_ms: 30000,     // 30 seconds
+            backoff_multiplier: 2.0,
+            timeout_seconds: 30,
+            rate_limit_backoff_ms: 5000, // 5 seconds
+        }
+    }
+}
+
+impl Default for ConcurrencyConfig {
+    fn default() -> Self {
+        Self {
+            max_concurrent_scans: 4,
+            max_concurrent_downloads: 8,
+            adaptive_rate_limiting: true,
+        }
+    }
+}
+
+impl WebDAVConfig {
+    /// Creates a new WebDAV configuration
+    pub fn new(
+        server_url: String,
+        username: String,
+        password: String,
+        watch_folders: Vec<String>,
+        file_extensions: Vec<String>,
+    ) -> Self {
+        Self {
+            server_url,
+            username,
+            password,
+            watch_folders,
+            file_extensions,
+            timeout_seconds: 30,
+            server_type: None,
+        }
+    }
+
+    /// Validates the configuration
+    pub fn validate(&self) -> anyhow::Result<()> {
+        if self.server_url.is_empty() {
+            return Err(anyhow::anyhow!("Server URL cannot be empty"));
+        }
+
+        if self.username.is_empty() {
+            return Err(anyhow::anyhow!("Username cannot be empty"));
+        }
+
+        if self.password.is_empty() {
+            return Err(anyhow::anyhow!("Password cannot be empty"));
+        }
+
+        if self.watch_folders.is_empty() {
+            return Err(anyhow::anyhow!("At least one watch folder must be specified"));
+        }
+
+        // Validate URL format
+        if !self.server_url.starts_with("http://") && !self.server_url.starts_with("https://") {
+            return Err(anyhow::anyhow!("Server URL must start with http:// or https://"));
+        }
+
+        Ok(())
+    }
+
+    /// Returns the base URL for WebDAV operations
+    pub fn webdav_url(&self) -> String {
+        let mut url = self.server_url.trim_end_matches('/').to_string();
+
+        // Add WebDAV path based on server type
+        match self.server_type.as_deref() {
+            Some("nextcloud") => {
+                if !url.contains("/remote.php/dav/files/") {
+                    url.push_str(&format!("/remote.php/dav/files/{}", self.username));
+                }
+            }
+            Some("owncloud") => {
+                if !url.contains("/remote.php/webdav") {
+                    url.push_str("/remote.php/webdav");
+                }
+            }
+            _ => {
+                // Generic WebDAV - use the URL as provided
+            }
+        }
+
+        url
+    }
+
+    /// Checks if a file extension is supported
+    pub fn is_supported_extension(&self, filename: &str) -> bool {
+        if self.file_extensions.is_empty() {
+            return true; // If no extensions are specified, support all
+        }
+
+        let extension = filename.split('.').last().unwrap_or("");
+        self.file_extensions.iter().any(|ext| ext.eq_ignore_ascii_case(extension))
+    }
+
+    /// Gets the timeout duration
+    pub fn timeout(&self) -> std::time::Duration {
+        std::time::Duration::from_secs(self.timeout_seconds)
+    }
+}
\ No newline at end of file
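Reviewer note: a minimal sketch of how a caller builds and checks the new config type. It assumes the re-exports from `src/services/webdav/mod.rs` below; the field values are placeholders, and the `?` assumes a surrounding function returning `anyhow::Result<()>`:

```rust
use crate::services::webdav::WebDAVConfig;

// Hypothetical values, for illustration only.
let config = WebDAVConfig::new(
    "https://cloud.example.com".to_string(),    // must be http(s); see validate()
    "alice".to_string(),
    "app-password".to_string(),
    vec!["/Documents".to_string()],             // at least one watch folder required
    vec!["pdf".to_string(), "txt".to_string()], // empty list would mean "all extensions"
);
config.validate()?; // rejects empty fields and non-http(s) URLs
```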
diff --git a/src/services/webdav/connection.rs b/src/services/webdav/connection.rs
new file mode 100644
index 0000000..b8f59c8
--- /dev/null
+++ b/src/services/webdav/connection.rs
@@ -0,0 +1,307 @@
+use anyhow::{anyhow, Result};
+use reqwest::{Client, Method};
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::{debug, error, info, warn};
+
+use crate::models::{WebDAVConnectionResult, WebDAVTestConnection};
+use super::config::{WebDAVConfig, RetryConfig};
+
+// Clone is required because the service hands independent copies to the
+// discovery and validation modules.
+#[derive(Clone)]
+pub struct WebDAVConnection {
+    client: Client,
+    config: WebDAVConfig,
+    retry_config: RetryConfig,
+}
+
+impl WebDAVConnection {
+    pub fn new(config: WebDAVConfig, retry_config: RetryConfig) -> Result<Self> {
+        // Validate configuration first
+        config.validate()?;
+
+        let client = Client::builder()
+            .timeout(config.timeout())
+            .build()?;
+
+        Ok(Self {
+            client,
+            config,
+            retry_config,
+        })
+    }
+
+    /// Tests WebDAV connection with the provided configuration
+    pub async fn test_connection(&self) -> Result<WebDAVConnectionResult> {
+        info!("🔍 Testing WebDAV connection to: {}", self.config.server_url);
+
+        // Validate configuration first
+        if let Err(e) = self.config.validate() {
+            return Ok(WebDAVConnectionResult {
+                success: false,
+                message: format!("Configuration error: {}", e),
+                server_version: None,
+                server_type: None,
+            });
+        }
+
+        // Test basic connectivity with OPTIONS request
+        match self.test_options_request().await {
+            Ok((server_version, server_type)) => {
+                info!("✅ WebDAV connection successful");
+                Ok(WebDAVConnectionResult {
+                    success: true,
+                    message: "Connection successful".to_string(),
+                    server_version,
+                    server_type,
+                })
+            }
+            Err(e) => {
+                error!("❌ WebDAV connection failed: {}", e);
+                Ok(WebDAVConnectionResult {
+                    success: false,
+                    message: format!("Connection failed: {}", e),
+                    server_version: None,
+                    server_type: None,
+                })
+            }
+        }
+    }
+
+    /// Tests connection with provided credentials (for configuration testing)
+    pub async fn test_connection_with_config(test_config: &WebDAVTestConnection) -> Result<WebDAVConnectionResult> {
+        let config = WebDAVConfig {
+            server_url: test_config.server_url.clone(),
+            username: test_config.username.clone(),
+            password: test_config.password.clone(),
+            watch_folders: vec!["/".to_string()],
+            file_extensions: vec![],
+            timeout_seconds: 30,
+            server_type: test_config.server_type.clone(),
+        };
+
+        let connection = Self::new(config, RetryConfig::default())?;
+        connection.test_connection().await
+    }
+
+    /// Performs OPTIONS request to test basic connectivity
+    async fn test_options_request(&self) -> Result<(Option<String>, Option<String>)> {
+        let webdav_url = self.config.webdav_url();
+
+        let response = self.client
+            .request(Method::OPTIONS, &webdav_url)
+            .basic_auth(&self.config.username, Some(&self.config.password))
+            .send()
+            .await?;
+
+        if !response.status().is_success() {
+            return Err(anyhow!(
+                "OPTIONS request failed with status: {} - {}",
+                response.status(),
+                response.text().await.unwrap_or_default()
+            ));
+        }
+
+        // Extract server information from headers
+        let server_version = response
+            .headers()
+            .get("server")
+            .and_then(|v| v.to_str().ok())
+            .map(|s| s.to_string());
+
+        let server_type = self.detect_server_type(&response, &server_version).await;
+
+        Ok((server_version, server_type))
+    }
+
+    /// Detects the WebDAV server type based on response headers and capabilities
+    async fn detect_server_type(
+        &self,
+        response: &reqwest::Response,
+        server_version: &Option<String>,
+    ) -> Option<String> {
+        // Check server header first
+        if let Some(server) = server_version {
+            let server_lower = server.to_lowercase();
+            if server_lower.contains("nextcloud") {
+                return Some("nextcloud".to_string());
+            }
+            if server_lower.contains("owncloud") {
+                return Some("owncloud".to_string());
+            }
+            // "apache"/"nginx" alone could front any generic WebDAV server
+        }
+
+        // Check DAV capabilities
+        if let Some(dav_header) = response.headers().get("dav") {
+            if let Ok(dav_str) = dav_header.to_str() {
+                debug!("DAV capabilities: {}", dav_str);
+                // Different servers expose different DAV levels
+                if dav_str.contains("3") {
+                    return Some("webdav_level_3".to_string());
+                }
+            }
+        }
+
+        // Test for Nextcloud/ownCloud specific endpoints
+        if self.test_nextcloud_capabilities().await.is_ok() {
+            return Some("nextcloud".to_string());
+        }
+
+        Some("generic".to_string())
+    }
+
+    /// Tests for Nextcloud-specific capabilities
+    async fn test_nextcloud_capabilities(&self) -> Result<()> {
+        let capabilities_url = format!("{}/ocs/v1.php/cloud/capabilities",
+            self.config.server_url.trim_end_matches('/'));
+
+        let response = self.client
+            .get(&capabilities_url)
+            .basic_auth(&self.config.username, Some(&self.config.password))
+            .header("OCS-APIRequest", "true")
+            .send()
+            .await?;
+
+        if response.status().is_success() {
+            debug!("Nextcloud capabilities endpoint accessible");
+            Ok(())
+        } else {
+            Err(anyhow!("Nextcloud capabilities not accessible"))
+        }
+    }
+
+    /// Tests PROPFIND request on a directory
+    pub async fn test_propfind(&self, path: &str) -> Result<()> {
+        let url = format!("{}{}", self.config.webdav_url(), path);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:displayname/>
+                    <d:resourcetype/>
+                    <d:getetag/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.client
+            .request(Method::from_bytes(b"PROPFIND")?, &url)
+            .basic_auth(&self.config.username, Some(&self.config.password))
+            .header("Depth", "1")
+            .header("Content-Type", "application/xml")
+            .body(propfind_body)
+            .send()
+            .await?;
+
+        if response.status().as_u16() == 207 {
+            debug!("PROPFIND successful for path: {}", path);
+            Ok(())
+        } else {
+            Err(anyhow!(
+                "PROPFIND failed for path '{}' with status: {} - {}",
+                path,
+                response.status(),
+                response.text().await.unwrap_or_default()
+            ))
+        }
+    }
+
+    /// Performs an authenticated request with retry logic
+    pub async fn authenticated_request(
+        &self,
+        method: Method,
+        url: &str,
+        body: Option<String>,
+        headers: Option<Vec<(&str, &str)>>,
+    ) -> Result<reqwest::Response> {
+        let mut attempt = 0;
+        let mut delay = self.retry_config.initial_delay_ms;
+
+        loop {
+            let mut request = self.client
+                .request(method.clone(), url)
+                .basic_auth(&self.config.username, Some(&self.config.password));
+
+            if let Some(ref body_content) = body {
+                request = request.body(body_content.clone());
+            }
+
+            if let Some(ref headers_list) = headers {
+                for (key, value) in headers_list {
+                    request = request.header(*key, *value);
+                }
+            }
+
+            match request.send().await {
+                Ok(response) => {
+                    let status = response.status();
+
+                    if status.is_success() || status.as_u16() == 207 {
+                        return Ok(response);
+                    }
+
+                    // Handle rate limiting (counts against the retry budget so a
+                    // persistently throttling server cannot spin this loop forever)
+                    if status.as_u16() == 429 {
+                        if attempt >= self.retry_config.max_retries {
+                            return Err(anyhow!("Rate limited after {} attempts", attempt));
+                        }
+                        warn!("Rate limited, backing off for {}ms", self.retry_config.rate_limit_backoff_ms);
+                        sleep(Duration::from_millis(self.retry_config.rate_limit_backoff_ms)).await;
+                        attempt += 1;
+                        continue;
+                    }
+
+                    // Handle client errors (don't retry)
+                    if status.is_client_error() {
+                        return Err(anyhow!("Client error: {} - {}", status,
+                            response.text().await.unwrap_or_default()));
+                    }
+
+                    // Handle server errors (retry)
+                    if status.is_server_error() && attempt < self.retry_config.max_retries {
+                        warn!("Server error {}, retrying in {}ms (attempt {}/{})",
+                            status, delay, attempt + 1, self.retry_config.max_retries);
+
+                        sleep(Duration::from_millis(delay)).await;
+                        delay = std::cmp::min(
+                            (delay as f64 * self.retry_config.backoff_multiplier) as u64,
+                            self.retry_config.max_delay_ms
+                        );
+                        attempt += 1;
+                        continue;
+                    }
+
+                    return Err(anyhow!("Request failed: {} - {}", status,
+                        response.text().await.unwrap_or_default()));
+                }
+                Err(e) => {
+                    if attempt < self.retry_config.max_retries {
+                        warn!("Request error: {}, retrying in {}ms (attempt {}/{})",
+                            e, delay, attempt + 1, self.retry_config.max_retries);
+
+                        sleep(Duration::from_millis(delay)).await;
+                        delay = std::cmp::min(
+                            (delay as f64 * self.retry_config.backoff_multiplier) as u64,
+                            self.retry_config.max_delay_ms
+                        );
+                        attempt += 1;
+                        continue;
+                    }
+
+                    return Err(anyhow!("Request failed after {} attempts: {}",
+                        self.retry_config.max_retries, e));
+                }
+            }
+        }
+    }
+
+    /// Gets the WebDAV URL for a specific path
+    pub fn get_url_for_path(&self, path: &str) -> String {
+        let base_url = self.config.webdav_url();
+        let clean_path = path.trim_start_matches('/');
+
+        if clean_path.is_empty() {
+            base_url
+        } else {
+            format!("{}/{}", base_url.trim_end_matches('/'), clean_path)
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/services/webdav/discovery.rs b/src/services/webdav/discovery.rs
new file mode 100644
index 0000000..e5d3e63
--- /dev/null
+++ b/src/services/webdav/discovery.rs
@@ -0,0 +1,349 @@
+use anyhow::{anyhow, Result};
+use reqwest::Method;
+use std::collections::HashSet;
+use tokio::sync::Semaphore;
+use futures_util::stream::{self, StreamExt};
+use tracing::{debug, error, info, warn};
+
+use crate::models::{FileInfo, WebDAVCrawlEstimate, WebDAVFolderInfo};
+use crate::webdav_xml_parser::{parse_propfind_response, parse_propfind_response_with_directories};
+use super::config::{WebDAVConfig, ConcurrencyConfig};
+use super::connection::WebDAVConnection;
+
+pub struct WebDAVDiscovery {
+    connection: WebDAVConnection,
+    config: WebDAVConfig,
+    concurrency_config: ConcurrencyConfig,
+}
+
+impl WebDAVDiscovery {
+    pub fn new(
+        connection: WebDAVConnection,
+        config: WebDAVConfig,
+        concurrency_config: ConcurrencyConfig,
+    ) -> Self {
+        Self {
+            connection,
+            config,
+            concurrency_config,
+        }
+    }
+
+    /// Discovers files in a directory with support for pagination and filtering
+    pub async fn discover_files(&self, directory_path: &str, recursive: bool) -> Result<Vec<FileInfo>> {
+        info!("🔍 Discovering files in directory: {}", directory_path);
+
+        if recursive {
+            self.discover_files_recursive(directory_path).await
+        } else {
+            self.discover_files_single_directory(directory_path).await
+        }
+    }
+
+    /// Discovers files in a single directory (non-recursive)
+    async fn discover_files_single_directory(&self, directory_path: &str) -> Result<Vec<FileInfo>> {
+        let url = self.connection.get_url_for_path(directory_path);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:displayname/>
+                    <d:getcontentlength/>
+                    <d:getlastmodified/>
+                    <d:getetag/>
+                    <d:getcontenttype/>
+                    <d:resourcetype/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.connection
+            .authenticated_request(
+                Method::from_bytes(b"PROPFIND")?,
+                &url,
+                Some(propfind_body.to_string()),
+                Some(vec![
+                    ("Depth", "1"),
+                    ("Content-Type", "application/xml"),
+                ]),
+            )
+            .await?;
+
+        let body = response.text().await?;
+        let files = parse_propfind_response(&body)?;
+
+        // Filter files based on supported extensions
+        let filtered_files: Vec<FileInfo> = files
+            .into_iter()
+            .filter(|file| {
+                !file.is_directory && self.config.is_supported_extension(&file.name)
+            })
+            .collect();
+
+        debug!("Found {} supported files in directory: {}", filtered_files.len(), directory_path);
+        Ok(filtered_files)
+    }
+
+    /// Discovers files recursively in a directory tree
+    async fn discover_files_recursive(&self, root_directory: &str) -> Result<Vec<FileInfo>> {
+        let mut all_files = Vec::new();
+        let mut directories_to_scan = vec![root_directory.to_string()];
+        let semaphore = Semaphore::new(self.concurrency_config.max_concurrent_scans);
+
+        while !directories_to_scan.is_empty() {
+            // Note: drain(..).take(n) would silently drop the undrained remainder
+            // when the iterator is dropped, losing directories. Drain exactly one
+            // batch instead.
+            let batch_size = directories_to_scan.len().min(self.concurrency_config.max_concurrent_scans);
+            let current_batch: Vec<String> = directories_to_scan.drain(..batch_size).collect();
+
+            let tasks = current_batch.into_iter().map(|dir| {
+                let semaphore = &semaphore;
+                async move {
+                    let _permit = semaphore.acquire().await.unwrap();
+                    self.scan_directory_with_subdirs(&dir).await
+                }
+            });
+
+            let results = stream::iter(tasks)
+                .buffer_unordered(self.concurrency_config.max_concurrent_scans)
+                .collect::<Vec<_>>()
+                .await;
+
+            for result in results {
+                match result {
+                    Ok((files, subdirs)) => {
+                        all_files.extend(files);
+                        directories_to_scan.extend(subdirs);
+                    }
+                    Err(e) => {
+                        warn!("Failed to scan directory: {}", e);
+                    }
+                }
+            }
+        }
+
+        info!("Recursive discovery found {} total files", all_files.len());
+        Ok(all_files)
+    }
+
+    /// Scans a directory and returns both files and subdirectories
+    async fn scan_directory_with_subdirs(&self, directory_path: &str) -> Result<(Vec<FileInfo>, Vec<String>)> {
+        let url = self.connection.get_url_for_path(directory_path);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:displayname/>
+                    <d:getcontentlength/>
+                    <d:getlastmodified/>
+                    <d:getetag/>
+                    <d:getcontenttype/>
+                    <d:resourcetype/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.connection
+            .authenticated_request(
+                Method::from_bytes(b"PROPFIND")?,
+                &url,
+                Some(propfind_body.to_string()),
+                Some(vec![
+                    ("Depth", "1"),
+                    ("Content-Type", "application/xml"),
+                ]),
+            )
+            .await?;
+
+        let body = response.text().await?;
+        let (files, directories) = parse_propfind_response_with_directories(&body)?;
+
+        // Filter files by supported extensions
+        let filtered_files: Vec<FileInfo> = files
+            .into_iter()
+            .filter(|file| self.config.is_supported_extension(&file.name))
+            .collect();
+
+        // Convert directory names to full paths
+        let full_dir_paths: Vec<String> = directories
+            .into_iter()
+            .map(|dir| {
+                if directory_path == "/" {
+                    format!("/{}", dir.trim_start_matches('/'))
+                } else {
+                    format!("{}/{}", directory_path.trim_end_matches('/'), dir.trim_start_matches('/'))
+                }
+            })
+            .collect();
+
+        debug!("Directory '{}': {} files, {} subdirectories",
+            directory_path, filtered_files.len(), full_dir_paths.len());
+
+        Ok((filtered_files, full_dir_paths))
+    }
+
+    /// Estimates crawl time and file counts for the configured watch folders
+    pub async fn estimate_crawl(&self) -> Result<WebDAVCrawlEstimate> {
+        info!("📊 Estimating crawl for WebDAV watch folders");
+
+        let mut folders = Vec::new();
+        let mut total_files = 0;
+        let mut total_supported_files = 0;
+        let mut total_size_mb = 0.0;
+
+        for watch_folder in &self.config.watch_folders {
+            match self.estimate_folder(watch_folder).await {
+                Ok(folder_info) => {
+                    total_files += folder_info.total_files;
+                    total_supported_files += folder_info.supported_files;
+                    total_size_mb += folder_info.total_size_mb;
+                    folders.push(folder_info);
+                }
+                Err(e) => {
+                    warn!("Failed to estimate folder '{}': {}", watch_folder, e);
+                    // Add empty folder info for failed estimates
+                    folders.push(WebDAVFolderInfo {
+                        path: watch_folder.clone(),
+                        total_files: 0,
+                        supported_files: 0,
+                        estimated_time_hours: 0.0,
+                        total_size_mb: 0.0,
+                    });
+                }
+            }
+        }
+
+        // Estimate total time based on file count and average processing time
+        let avg_time_per_file_seconds = 2.0; // Conservative estimate
+        let total_estimated_time_hours = (total_supported_files as f32 * avg_time_per_file_seconds) / 3600.0;
+
+        Ok(WebDAVCrawlEstimate {
+            folders,
+            total_files,
+            total_supported_files,
+            total_estimated_time_hours,
+            total_size_mb,
+        })
+    }
+
+    /// Estimates file count and size for a specific folder
+    async fn estimate_folder(&self, folder_path: &str) -> Result<WebDAVFolderInfo> {
+        debug!("Estimating folder: {}", folder_path);
+
+        // Start with the files directly in this folder
+        let sample_files = self.discover_files_single_directory(folder_path).await?;
+
+        // Get subdirectories for deeper estimation
+        let subdirs = self.get_subdirectories(folder_path).await?;
+
+        let mut total_files = sample_files.len() as i64;
+        let total_size: i64 = sample_files.iter().map(|f| f.size).sum();
+
+        // Sample a few subdirectories to extrapolate
+        let sample_size = std::cmp::min(5, subdirs.len());
+        if sample_size > 0 {
+            let mut sample_total = 0i64;
+
+            for subdir in subdirs.iter().take(sample_size) {
+                if let Ok(subdir_files) = self.discover_files_single_directory(subdir).await {
+                    sample_total += subdir_files.len() as i64;
+                }
+            }
+
+            // Extrapolate based on the sample
+            if sample_total > 0 {
+                let avg_files_per_subdir = sample_total as f64 / sample_size as f64;
+                total_files += (avg_files_per_subdir * subdirs.len() as f64) as i64;
+            }
+        }
+
+        // Scale by the ratio of supported files observed in the sample
+        let supported_files = (total_files as f64 * self.calculate_support_ratio(&sample_files)) as i64;
+
+        let total_size_mb = total_size as f64 / (1024.0 * 1024.0);
+        let estimated_time_hours = (supported_files as f32 * 2.0) / 3600.0; // 2 seconds per file
+
+        Ok(WebDAVFolderInfo {
+            path: folder_path.to_string(),
+            total_files,
+            supported_files,
+            estimated_time_hours,
+            total_size_mb,
+        })
+    }
+
+    /// Gets subdirectories for a given path
+    async fn get_subdirectories(&self, directory_path: &str) -> Result<Vec<String>> {
+        let url = self.connection.get_url_for_path(directory_path);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:resourcetype/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.connection
+            .authenticated_request(
+                Method::from_bytes(b"PROPFIND")?,
+                &url,
+                Some(propfind_body.to_string()),
+                Some(vec![
+                    ("Depth", "1"),
+                    ("Content-Type", "application/xml"),
+                ]),
+            )
+            .await?;
+
+        let body = response.text().await?;
+        let (_, directories) = parse_propfind_response_with_directories(&body)?;
+
+        Ok(directories)
+    }
+
+    /// Calculates the ratio of supported files in a sample
+    fn calculate_support_ratio(&self, sample_files: &[FileInfo]) -> f64 {
+        if sample_files.is_empty() {
+            return 1.0; // Assume all files are supported if no sample
+        }
+
+        let supported_count = sample_files
+            .iter()
+            .filter(|file| self.config.is_supported_extension(&file.name))
+            .count();
+
+        supported_count as f64 / sample_files.len() as f64
+    }
+
+    /// Filters files by last-modified date (for incremental syncs)
+    pub fn filter_files_by_date(&self, files: Vec<FileInfo>, since: chrono::DateTime<chrono::Utc>) -> Vec<FileInfo> {
+        files
+            .into_iter()
+            .filter(|file| {
+                file.last_modified
+                    .map(|modified| modified > since)
+                    .unwrap_or(true) // Include files without a modification date
+            })
+            .collect()
+    }
+
+    /// Deduplicates files by ETag, falling back to path
+    pub fn deduplicate_files(&self, files: Vec<FileInfo>) -> Vec<FileInfo> {
+        let mut seen_etags = HashSet::new();
+        let mut seen_paths = HashSet::new();
+        let mut deduplicated = Vec::new();
+
+        for file in files {
+            let is_duplicate = if !file.etag.is_empty() {
+                !seen_etags.insert(file.etag.clone())
+            } else {
+                !seen_paths.insert(file.path.clone())
+            };
+
+            if !is_duplicate {
+                deduplicated.push(file);
+            }
+        }
+
+        debug!("Deduplicated to {} files", deduplicated.len());
+        deduplicated
+    }
+}
\ No newline at end of file
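Reviewer note: the discovery helpers are meant to compose for incremental syncs. A minimal sketch, assuming an existing `discovery: WebDAVDiscovery` instance inside an async fn returning `anyhow::Result<()>`; the folder path and 24-hour cursor are hypothetical:

```rust
// Hypothetical cursor: only ingest files modified in the last 24 hours.
let last_sync = chrono::Utc::now() - chrono::Duration::hours(24);

let found = discovery.discover_files("/Documents", true).await?; // recursive walk
let fresh = discovery.filter_files_by_date(found, last_sync);    // incremental filter
let unique = discovery.deduplicate_files(fresh);                 // by ETag, then path
println!("{} files queued for ingestion", unique.len());
```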
diff --git a/src/services/webdav/mod.rs b/src/services/webdav/mod.rs
new file mode 100644
index 0000000..67b28ca
--- /dev/null
+++ b/src/services/webdav/mod.rs
@@ -0,0 +1,17 @@
+// WebDAV service modules organized by functionality
+
+pub mod config;
+pub mod connection;
+pub mod discovery;
+pub mod validation;
+pub mod service;
+
+// Re-export main types for convenience
+pub use config::{WebDAVConfig, RetryConfig, ConcurrencyConfig};
+pub use connection::WebDAVConnection;
+pub use discovery::WebDAVDiscovery;
+pub use validation::{
+    WebDAVValidator, ValidationReport, ValidationIssue, ValidationIssueType,
+    ValidationSeverity, ValidationRecommendation, ValidationAction, ValidationSummary
+};
+pub use service::{WebDAVService, ServerCapabilities, HealthStatus, test_webdav_connection};
\ No newline at end of file
diff --git a/src/services/webdav/service.rs b/src/services/webdav/service.rs
new file mode 100644
index 0000000..9b15a0b
--- /dev/null
+++ b/src/services/webdav/service.rs
@@ -0,0 +1,391 @@
+use anyhow::{anyhow, Result};
+use std::sync::Arc;
+use tokio::sync::Semaphore;
+use tracing::{debug, error, info, warn};
+
+use crate::models::{
+    FileInfo, WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVTestConnection,
+};
+
+use super::config::{WebDAVConfig, RetryConfig, ConcurrencyConfig};
+use super::connection::WebDAVConnection;
+use super::discovery::WebDAVDiscovery;
+use super::validation::{WebDAVValidator, ValidationReport};
+
+/// Main WebDAV service that coordinates all WebDAV operations
+pub struct WebDAVService {
+    connection: Arc<WebDAVConnection>,
+    discovery: Arc<WebDAVDiscovery>,
+    validator: Arc<WebDAVValidator>,
+    config: WebDAVConfig,
+    retry_config: RetryConfig,
+    concurrency_config: ConcurrencyConfig,
+    scan_semaphore: Arc<Semaphore>,
+    download_semaphore: Arc<Semaphore>,
+}
+
+impl WebDAVService {
+    /// Creates a new WebDAV service with default configurations
+    pub fn new(config: WebDAVConfig) -> Result<Self> {
+        Self::new_with_configs(config, RetryConfig::default(), ConcurrencyConfig::default())
+    }
+
+    /// Creates a new WebDAV service with a custom retry configuration
+    pub fn new_with_retry(config: WebDAVConfig, retry_config: RetryConfig) -> Result<Self> {
+        Self::new_with_configs(config, retry_config, ConcurrencyConfig::default())
+    }
+
+    /// Creates a new WebDAV service with all custom configurations
+    pub fn new_with_configs(
+        config: WebDAVConfig,
+        retry_config: RetryConfig,
+        concurrency_config: ConcurrencyConfig,
+    ) -> Result<Self> {
+        // Validate configuration
+        config.validate()?;
+
+        // Create connection handler
+        let connection = Arc::new(WebDAVConnection::new(config.clone(), retry_config.clone())?);
+
+        // Create discovery handler
+        let discovery = Arc::new(WebDAVDiscovery::new(
+            connection.as_ref().clone(),
+            config.clone(),
+            concurrency_config.clone(),
+        ));
+
+        // Create validator
+        let validator = Arc::new(WebDAVValidator::new(
+            connection.as_ref().clone(),
+            config.clone(),
+        ));
+
+        // Create semaphores for concurrency control
+        let scan_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_scans));
+        let download_semaphore = Arc::new(Semaphore::new(concurrency_config.max_concurrent_downloads));
+
+        Ok(Self {
+            connection,
+            discovery,
+            validator,
+            config,
+            retry_config,
+            concurrency_config,
+            scan_semaphore,
+            download_semaphore,
+        })
+    }
+
+    /// Tests the WebDAV connection
+    pub async fn test_connection(&self) -> Result<WebDAVConnectionResult> {
+        info!("🔍 Testing WebDAV connection for service");
+        self.connection.test_connection().await
+    }
+
+    /// Tests WebDAV connection with provided configuration (static method)
+    pub async fn test_connection_with_config(test_config: &WebDAVTestConnection) -> Result<WebDAVConnectionResult> {
+        WebDAVConnection::test_connection_with_config(test_config).await
+    }
+}
+
+/// Tests WebDAV connection with provided configuration (standalone function for backward compatibility)
+pub async fn test_webdav_connection(test_config: &WebDAVTestConnection) -> Result<WebDAVConnectionResult> {
+    WebDAVConnection::test_connection_with_config(test_config).await
+}
+
+impl WebDAVService {
+    /// Performs a comprehensive system validation
+    pub async fn validate_system(&self) -> Result<ValidationReport> {
+        info!("🔍 Performing comprehensive WebDAV system validation");
+        self.validator.validate_system().await
+    }
+
+    /// Estimates crawl time and resource requirements
+    pub async fn estimate_crawl(&self) -> Result<WebDAVCrawlEstimate> {
+        info!("📊 Estimating WebDAV crawl requirements");
+        self.discovery.estimate_crawl().await
+    }
+
+    /// Discovers all files in watch folders
+    pub async fn discover_all_files(&self) -> Result<Vec<FileInfo>> {
+        info!("🔍 Discovering all files in watch folders");
+        let mut all_files = Vec::new();
+
+        for watch_folder in &self.config.watch_folders {
+            info!("📁 Scanning watch folder: {}", watch_folder);
+
+            match self.discovery.discover_files(watch_folder, true).await {
+                Ok(files) => {
+                    info!("✅ Found {} files in {}", files.len(), watch_folder);
+                    all_files.extend(files);
+                }
+                Err(e) => {
+                    error!("❌ Failed to scan watch folder '{}': {}", watch_folder, e);
+                    return Err(anyhow!("Failed to scan watch folder '{}': {}", watch_folder, e));
+                }
+            }
+        }
+
+        // Deduplicate files across folders
+        let deduplicated_files = self.discovery.deduplicate_files(all_files);
+
+        info!("🎯 Total unique files discovered: {}", deduplicated_files.len());
+        Ok(deduplicated_files)
+    }
+
+    /// Discovers files changed since a specific date (for incremental syncs)
+    pub async fn discover_changed_files(&self, since: chrono::DateTime<chrono::Utc>) -> Result<Vec<FileInfo>> {
+        info!("🔍 Discovering files changed since: {}", since);
+
+        let all_files = self.discover_all_files().await?;
+        let changed_files = self.discovery.filter_files_by_date(all_files, since);
+
+        info!("📈 Found {} files changed since {}", changed_files.len(), since);
+        Ok(changed_files)
+    }
+
+    /// Discovers files in a specific directory
+    pub async fn discover_files_in_directory(&self, directory_path: &str, recursive: bool) -> Result<Vec<FileInfo>> {
+        info!("🔍 Discovering files in directory: {} (recursive: {})", directory_path, recursive);
+        self.discovery.discover_files(directory_path, recursive).await
+    }
+
+    /// Downloads a file from the WebDAV server
+    pub async fn download_file(&self, file_info: &FileInfo) -> Result<Vec<u8>> {
+        let _permit = self.download_semaphore.acquire().await?;
+
+        debug!("⬇️ Downloading file: {}", file_info.path);
+
+        let url = self.connection.get_url_for_path(&file_info.path);
+
+        let response = self.connection
+            .authenticated_request(
+                reqwest::Method::GET,
+                &url,
+                None,
+                None,
+            )
+            .await?;
+
+        if !response.status().is_success() {
+            return Err(anyhow!(
+                "Failed to download file '{}': HTTP {}",
+                file_info.path,
+                response.status()
+            ));
+        }
+
+        let content = response.bytes().await?;
+        debug!("✅ Downloaded {} bytes for file: {}", content.len(), file_info.path);
+
+        Ok(content.to_vec())
+    }
+
+    /// Downloads multiple files concurrently
+    pub async fn download_files(&self, files: &[FileInfo]) -> Result<Vec<(FileInfo, Result<Vec<u8>>)>> {
+        info!("⬇️ Downloading {} files concurrently", files.len());
+
+        let tasks = files.iter().map(|file| {
+            let file_clone = file.clone();
+            let service_clone = self.clone();
+
+            async move {
+                let result = service_clone.download_file(&file_clone).await;
+                (file_clone, result)
+            }
+        });
+
+        let results = futures_util::future::join_all(tasks).await;
+
+        let success_count = results.iter().filter(|(_, result)| result.is_ok()).count();
+        let failure_count = results.len() - success_count;
+
+        info!("📊 Download completed: {} successful, {} failed", success_count, failure_count);
+
+        Ok(results)
+    }
+
+    /// Gets file metadata without downloading content
+    pub async fn get_file_metadata(&self, file_path: &str) -> Result<FileInfo> {
+        debug!("📋 Getting metadata for file: {}", file_path);
+
+        let url = self.connection.get_url_for_path(file_path);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:displayname/>
+                    <d:getcontentlength/>
+                    <d:getlastmodified/>
+                    <d:getetag/>
+                    <d:getcontenttype/>
+                    <d:resourcetype/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.connection
+            .authenticated_request(
+                reqwest::Method::from_bytes(b"PROPFIND")?,
+                &url,
+                Some(propfind_body.to_string()),
+                Some(vec![
+                    ("Depth", "0"),
+                    ("Content-Type", "application/xml"),
+                ]),
+            )
+            .await?;
+
+        let body = response.text().await?;
+        let files = crate::webdav_xml_parser::parse_propfind_response(&body)?;
+
+        files.into_iter()
+            .find(|f| f.path == file_path)
+            .ok_or_else(|| anyhow!("File metadata not found: {}", file_path))
+    }
+
+    /// Checks if a file exists on the WebDAV server
+    pub async fn file_exists(&self, file_path: &str) -> Result<bool> {
+        match self.get_file_metadata(file_path).await {
+            Ok(_) => Ok(true),
+            Err(_) => Ok(false),
+        }
+    }
+
+    /// Gets the server capabilities and features
+    pub async fn get_server_capabilities(&self) -> Result<ServerCapabilities> {
+        debug!("🔍 Checking server capabilities");
+
+        let options_response = self.connection
+            .authenticated_request(
+                reqwest::Method::OPTIONS,
+                &self.config.webdav_url(),
+                None,
+                None,
+            )
+            .await?;
+
+        let dav_header = options_response
+            .headers()
+            .get("dav")
+            .and_then(|v| v.to_str().ok())
+            .unwrap_or("")
+            .to_string();
+
+        let allow_header = options_response
+            .headers()
+            .get("allow")
+            .and_then(|v| v.to_str().ok())
+            .unwrap_or("")
+            .to_string();
+
+        let server_header = options_response
+            .headers()
+            .get("server")
+            .and_then(|v| v.to_str().ok())
+            .map(|s| s.to_string());
+
+        // Compute capability flags before the header strings are moved into the struct
+        let supports_etag = dav_header.contains('1') || dav_header.contains('2');
+        let supports_depth_infinity = dav_header.contains('1');
+
+        Ok(ServerCapabilities {
+            dav_compliance: dav_header,
+            allowed_methods: allow_header,
+            server_software: server_header,
+            supports_etag,
+            supports_depth_infinity,
+        })
+    }
+
+    /// Performs a health check on the WebDAV service
+    pub async fn health_check(&self) -> Result<HealthStatus> {
+        info!("🏥 Performing WebDAV service health check");
+
+        let start_time = std::time::Instant::now();
+
+        // Test basic connectivity
+        let connection_result = self.test_connection().await?;
+        if !connection_result.success {
+            return Ok(HealthStatus {
+                healthy: false,
+                message: format!("Connection failed: {}", connection_result.message),
+                response_time_ms: start_time.elapsed().as_millis() as u64,
+                details: None,
+            });
+        }
+
+        // Test each watch folder
+        for folder in &self.config.watch_folders {
+            if let Err(e) = self.connection.test_propfind(folder).await {
+                return Ok(HealthStatus {
+                    healthy: false,
+                    message: format!("Watch folder '{}' is inaccessible: {}", folder, e),
+                    response_time_ms: start_time.elapsed().as_millis() as u64,
+                    details: Some(serde_json::json!({
+                        "failed_folder": folder,
+                        "error": e.to_string()
+                    })),
+                });
+            }
+        }
+
+        let response_time = start_time.elapsed().as_millis() as u64;
+
+        Ok(HealthStatus {
+            healthy: true,
+            message: "All systems operational".to_string(),
+            response_time_ms: response_time,
+            details: Some(serde_json::json!({
+                "tested_folders": self.config.watch_folders,
+                "server_type": connection_result.server_type,
+                "server_version": connection_result.server_version
+            })),
+        })
+    }
+
+    /// Gets configuration information
+    pub fn get_config(&self) -> &WebDAVConfig {
+        &self.config
+    }
+
+    /// Gets the retry configuration
+    pub fn get_retry_config(&self) -> &RetryConfig {
+        &self.retry_config
+    }
+
+    /// Gets the concurrency configuration
+    pub fn get_concurrency_config(&self) -> &ConcurrencyConfig {
+        &self.concurrency_config
+    }
+}
+
+// Implement Clone to allow sharing the service
+impl Clone for WebDAVService {
+    fn clone(&self) -> Self {
+        Self {
+            connection: Arc::clone(&self.connection),
+            discovery: Arc::clone(&self.discovery),
+            validator: Arc::clone(&self.validator),
+            config: self.config.clone(),
+            retry_config: self.retry_config.clone(),
+            concurrency_config: self.concurrency_config.clone(),
+            scan_semaphore: Arc::clone(&self.scan_semaphore),
+            download_semaphore: Arc::clone(&self.download_semaphore),
+        }
+    }
+}
+
+/// Server capabilities information
+#[derive(Debug, Clone)]
+pub struct ServerCapabilities {
+    pub dav_compliance: String,
+    pub allowed_methods: String,
+    pub server_software: Option<String>,
+    pub supports_etag: bool,
+    pub supports_depth_infinity: bool,
+}
+
+/// Health status information
+#[derive(Debug, Clone)]
+pub struct HealthStatus {
+    pub healthy: bool,
+    pub message: String,
+    pub response_time_ms: u64,
+    pub details: Option<serde_json::Value>,
+}
\ No newline at end of file
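Reviewer note: a typical call flow against the façade above, as a sketch (a `config` built as in `config.rs`, error handling elided, inside an async fn returning `anyhow::Result<()>`):

```rust
use crate::services::webdav::WebDAVService;

let service = WebDAVService::new(config)?; // wires up connection, discovery, validator

if service.health_check().await?.healthy {
    let files = service.discover_all_files().await?;
    for (file, result) in service.download_files(&files).await? {
        match result {
            Ok(bytes) => println!("{}: {} bytes", file.path, bytes.len()),
            Err(e) => eprintln!("{}: download failed: {}", file.path, e),
        }
    }
}
```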
diff --git a/src/services/webdav/validation.rs b/src/services/webdav/validation.rs
new file mode 100644
index 0000000..a4b9f90
--- /dev/null
+++ b/src/services/webdav/validation.rs
@@ -0,0 +1,352 @@
+use anyhow::{anyhow, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use tracing::{debug, error, info, warn};
+
+use super::config::WebDAVConfig;
+use super::connection::WebDAVConnection;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationReport {
+    pub overall_health_score: i32, // 0-100
+    pub issues: Vec<ValidationIssue>,
+    pub recommendations: Vec<ValidationRecommendation>,
+    pub summary: ValidationSummary,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationIssue {
+    pub issue_type: ValidationIssueType,
+    pub severity: ValidationSeverity,
+    pub directory_path: String,
+    pub description: String,
+    pub details: Option<serde_json::Value>,
+    pub detected_at: chrono::DateTime<chrono::Utc>,
+}
+
+// PartialEq/Eq/Hash are required because issue types are used as HashMap keys
+// when grouping recommendations below.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub enum ValidationIssueType {
+    /// Directory exists on server but not in our tracking
+    Untracked,
+    /// Directory in our tracking but missing on server
+    Missing,
+    /// ETag mismatch between server and our cache
+    ETagMismatch,
+    /// Directory hasn't been scanned in a very long time
+    Stale,
+    /// Server errors when accessing directory
+    Inaccessible,
+    /// ETag support seems unreliable for this directory
+    ETagUnreliable,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ValidationSeverity {
+    Info,     // No action needed, just FYI
+    Warning,  // Should investigate but not urgent
+    Error,    // Needs immediate attention
+    Critical, // System integrity at risk
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationRecommendation {
+    pub action: ValidationAction,
+    pub reason: String,
+    pub affected_directories: Vec<String>,
+    pub priority: ValidationSeverity,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ValidationAction {
+    /// Run a deep scan of specific directories
+    DeepScanRequired,
+    /// Clear and rebuild directory tracking
+    RebuildTracking,
+    /// ETag support is unreliable, switch to periodic scans
+    DisableETagOptimization,
+    /// Clean up orphaned database entries
+    CleanupDatabase,
+    /// Server configuration issue needs attention
+    CheckServerConfiguration,
+    /// No action needed, system is healthy
+    NoActionRequired,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationSummary {
+    pub total_directories_checked: usize,
+    pub healthy_directories: usize,
+    pub directories_with_issues: usize,
+    pub critical_issues: usize,
+    pub warning_issues: usize,
+    pub info_issues: usize,
+    pub validation_duration_ms: u64,
+}
+
+pub struct WebDAVValidator {
+    connection: WebDAVConnection,
+    config: WebDAVConfig,
+}
+
+impl WebDAVValidator {
+    pub fn new(connection: WebDAVConnection, config: WebDAVConfig) -> Self {
+        Self { connection, config }
+    }
+
+    /// Performs comprehensive validation of the WebDAV setup and directory tracking
+    pub async fn validate_system(&self) -> Result<ValidationReport> {
+        let start_time = std::time::Instant::now();
+        info!("🔍 Starting WebDAV system validation");
+
+        let mut issues = Vec::new();
+        let mut total_checked = 0;
+
+        // Test basic connectivity
+        match self.connection.test_connection().await {
+            Ok(result) if !result.success => {
+                issues.push(ValidationIssue {
+                    issue_type: ValidationIssueType::Inaccessible,
+                    severity: ValidationSeverity::Critical,
+                    directory_path: "/".to_string(),
+                    description: format!("WebDAV server connection failed: {}", result.message),
+                    details: None,
+                    detected_at: chrono::Utc::now(),
+                });
+            }
+            Err(e) => {
+                issues.push(ValidationIssue {
+                    issue_type: ValidationIssueType::Inaccessible,
+                    severity: ValidationSeverity::Critical,
+                    directory_path: "/".to_string(),
+                    description: format!("WebDAV server connectivity test failed: {}", e),
+                    details: None,
+                    detected_at: chrono::Utc::now(),
+                });
+            }
+            _ => {
+                debug!("✅ Basic connectivity test passed");
+            }
+        }
+
+        // Validate each watch folder
+        for folder in &self.config.watch_folders {
+            total_checked += 1;
+            if let Err(e) = self.validate_watch_folder(folder, &mut issues).await {
+                warn!("Failed to validate watch folder '{}': {}", folder, e);
+            }
+        }
+
+        // Test ETag reliability
+        self.validate_etag_support(&mut issues).await?;
+
+        // Generate recommendations based on issues
+        let recommendations = self.generate_recommendations(&issues);
+
+        let validation_duration = start_time.elapsed().as_millis() as u64;
+        let health_score = self.calculate_health_score(&issues);
+
+        let summary = ValidationSummary {
+            total_directories_checked: total_checked,
+            // saturating_sub: the connectivity checks above can add issues that
+            // are not tied to a counted directory, so issues.len() may exceed
+            // total_checked
+            healthy_directories: total_checked.saturating_sub(issues.len()),
+            directories_with_issues: issues.len(),
+            critical_issues: issues.iter().filter(|i| matches!(i.severity, ValidationSeverity::Critical)).count(),
+            warning_issues: issues.iter().filter(|i| matches!(i.severity, ValidationSeverity::Warning)).count(),
+            info_issues: issues.iter().filter(|i| matches!(i.severity, ValidationSeverity::Info)).count(),
+            validation_duration_ms: validation_duration,
+        };
+
+        info!("✅ WebDAV validation completed in {}ms. Health score: {}/100",
+            validation_duration, health_score);
+
+        Ok(ValidationReport {
+            overall_health_score: health_score,
+            issues,
+            recommendations,
+            summary,
+        })
+    }
+
+    /// Validates a specific watch folder
+    async fn validate_watch_folder(&self, folder: &str, issues: &mut Vec<ValidationIssue>) -> Result<()> {
+        debug!("Validating watch folder: {}", folder);
+
+        // Test PROPFIND access
+        match self.connection.test_propfind(folder).await {
+            Ok(_) => {
+                debug!("✅ Watch folder '{}' is accessible", folder);
+            }
+            Err(e) => {
+                issues.push(ValidationIssue {
+                    issue_type: ValidationIssueType::Inaccessible,
+                    severity: ValidationSeverity::Error,
+                    directory_path: folder.to_string(),
+                    description: format!("Cannot access watch folder: {}", e),
+                    details: Some(serde_json::json!({
+                        "error": e.to_string(),
+                        "folder": folder
+                    })),
+                    detected_at: chrono::Utc::now(),
+                });
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Tests ETag support reliability
+    async fn validate_etag_support(&self, issues: &mut Vec<ValidationIssue>) -> Result<()> {
+        debug!("Testing ETag support reliability");
+
+        // Test ETag consistency across multiple requests
+        for folder in &self.config.watch_folders {
+            if let Err(e) = self.test_etag_consistency(folder, issues).await {
+                warn!("ETag consistency test failed for '{}': {}", folder, e);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Tests ETag consistency for a specific folder
+    async fn test_etag_consistency(&self, folder: &str, issues: &mut Vec<ValidationIssue>) -> Result<()> {
+        // Make two consecutive PROPFIND requests and compare ETags
+        let etag1 = self.get_folder_etag(folder).await?;
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        let etag2 = self.get_folder_etag(folder).await?;
+
+        if etag1 != etag2 && etag1.is_some() && etag2.is_some() {
+            issues.push(ValidationIssue {
+                issue_type: ValidationIssueType::ETagUnreliable,
+                severity: ValidationSeverity::Warning,
+                directory_path: folder.to_string(),
+                description: "ETag values are inconsistent across requests".to_string(),
+                details: Some(serde_json::json!({
+                    "etag1": etag1,
+                    "etag2": etag2,
+                    "folder": folder
+                })),
+                detected_at: chrono::Utc::now(),
+            });
+        }
+
+        Ok(())
+    }
+
+    /// Gets the ETag for a folder
+    async fn get_folder_etag(&self, folder: &str) -> Result<Option<String>> {
+        let url = self.connection.get_url_for_path(folder);
+
+        let propfind_body = r#"<?xml version="1.0" encoding="utf-8" ?>
+            <d:propfind xmlns:d="DAV:">
+                <d:prop>
+                    <d:getetag/>
+                </d:prop>
+            </d:propfind>"#;
+
+        let response = self.connection
+            .authenticated_request(
+                reqwest::Method::from_bytes(b"PROPFIND")?,
+                &url,
+                Some(propfind_body.to_string()),
+                Some(vec![
+                    ("Depth", "0"),
+                    ("Content-Type", "application/xml"),
+                ]),
+            )
+            .await?;
+
+        let body = response.text().await?;
+
+        // Parse the ETag from the XML response (simplified; "<d:getetag>" is 11 bytes)
+        if let Some(start) = body.find("<d:getetag>") {
+            if let Some(end) = body[start..].find("</d:getetag>") {
+                let etag = &body[start + 11..start + end];
+                return Ok(Some(etag.trim_matches('"').to_string()));
+            }
+        }
+
+        Ok(None)
+    }
+
+    /// Generates recommendations based on detected issues
+    fn generate_recommendations(&self, issues: &[ValidationIssue]) -> Vec<ValidationRecommendation> {
+        let mut recommendations = Vec::new();
+        let mut directories_by_issue: HashMap<ValidationIssueType, Vec<String>> = HashMap::new();
+
+        // Group directories by issue type
+        for issue in issues {
+            directories_by_issue
+                .entry(issue.issue_type.clone())
+                .or_insert_with(Vec::new)
+                .push(issue.directory_path.clone());
+        }
+
+        // Generate recommendations for each issue type
+        for (issue_type, directories) in directories_by_issue {
+            let recommendation = match issue_type {
+                ValidationIssueType::Inaccessible => ValidationRecommendation {
+                    action: ValidationAction::CheckServerConfiguration,
+                    reason: "Some directories are inaccessible. Check server configuration and permissions.".to_string(),
+                    affected_directories: directories,
+                    priority: ValidationSeverity::Critical,
+                },
+                ValidationIssueType::ETagUnreliable => ValidationRecommendation {
+                    action: ValidationAction::DisableETagOptimization,
+                    reason: "ETag support appears unreliable. Consider disabling ETag optimization.".to_string(),
+                    affected_directories: directories,
+                    priority: ValidationSeverity::Warning,
+                },
+                ValidationIssueType::Missing => ValidationRecommendation {
+                    action: ValidationAction::CleanupDatabase,
+                    reason: "Some tracked directories no longer exist on the server.".to_string(),
+                    affected_directories: directories,
+                    priority: ValidationSeverity::Warning,
+                },
+                ValidationIssueType::Stale => ValidationRecommendation {
+                    action: ValidationAction::DeepScanRequired,
+                    reason: "Some directories haven't been scanned recently.".to_string(),
+                    affected_directories: directories,
+                    priority: ValidationSeverity::Info,
+                },
+                _ => ValidationRecommendation {
+                    action: ValidationAction::DeepScanRequired,
+                    reason: "General validation issues detected.".to_string(),
+                    affected_directories: directories,
+                    priority: ValidationSeverity::Warning,
+                },
+            };
+            recommendations.push(recommendation);
+        }
+
+        if recommendations.is_empty() {
+            recommendations.push(ValidationRecommendation {
+                action: ValidationAction::NoActionRequired,
+                reason: "System validation passed successfully.".to_string(),
+                affected_directories: Vec::new(),
+                priority: ValidationSeverity::Info,
+            });
+        }
+
+        recommendations
+    }
+
+    /// Calculates the overall health score based on issues
+    fn calculate_health_score(&self, issues: &[ValidationIssue]) -> i32 {
+        if issues.is_empty() {
+            return 100;
+        }
+
+        let mut penalty = 0;
+        for issue in issues {
+            let issue_penalty = match issue.severity {
+                ValidationSeverity::Critical => 30,
+                ValidationSeverity::Error => 20,
+                ValidationSeverity::Warning => 10,
+                ValidationSeverity::Info => 5,
+            };
+            penalty += issue_penalty;
+        }
+
+        std::cmp::max(0, 100 - penalty)
+    }
+}
\ No newline at end of file
diff --git a/src/services/webdav_service.rs b/src/services/webdav_service.rs
deleted file mode 100644
index 4f3d3dc..0000000
--- a/src/services/webdav_service.rs
+++ /dev/null
@@ -1,2371 +0,0 @@
-use anyhow::{anyhow, Result};
-use reqwest::{Client, Method};
-use serde::{Deserialize, Serialize};
-use std::collections::HashSet;
-use std::time::Duration;
-use tokio::time::sleep;
-use tokio::sync::Semaphore;
-use futures_util::stream::{self, StreamExt};
-use tracing::{debug, error, info, warn};
-
-use crate::models::{
-    FileInfo, WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVFolderInfo,
-    WebDAVTestConnection,
-};
-use crate::webdav_xml_parser::{parse_propfind_response, parse_propfind_response_with_directories};
-
-#[derive(Debug, Clone)]
-pub struct WebDAVConfig {
-    pub server_url: String,
-    pub username: String,
-    pub password: String,
-    pub watch_folders: Vec<String>,
-    pub file_extensions: Vec<String>,
-    pub timeout_seconds: u64,
-    pub server_type: Option<String>, // "nextcloud", "owncloud", "generic"
-}
-
-#[derive(Debug, Clone)]
-pub struct RetryConfig {
-    pub max_retries: u32,
-    pub initial_delay_ms: u64,
-    pub max_delay_ms: u64,
-    pub backoff_multiplier: f64,
-    pub timeout_seconds: u64,
-    pub rate_limit_backoff_ms: u64, // Additional backoff for 429 responses
-}
-
-#[derive(Debug, Clone)]
-pub struct ConcurrencyConfig {
-    pub max_concurrent_scans: usize,
-    pub max_concurrent_downloads: usize,
-    pub adaptive_rate_limiting: bool,
-}
-
-impl Default for RetryConfig {
-    fn default() -> Self {
-        Self {
-            max_retries: 3,
-            initial_delay_ms: 1000,  // 1 second
-            max_delay_ms: 30000,     // 30 seconds
-            backoff_multiplier: 2.0,
-            timeout_seconds: 300,    // 5 minutes total timeout for crawl operations
-            rate_limit_backoff_ms: 5000, // 5 seconds extra for rate limits
-        }
-    }
-}
-
-impl Default for ConcurrencyConfig {
-    fn default() -> Self {
-        Self {
-            max_concurrent_scans: 10,
-            max_concurrent_downloads: 5,
-            adaptive_rate_limiting: true,
-        }
-    }
-}
-
-#[derive(Clone)]
-pub struct WebDAVService {
-    client: Client,
-    config: WebDAVConfig,
-    base_webdav_url: String,
-    retry_config: RetryConfig,
-    concurrency_config: ConcurrencyConfig,
-}
-
-/// Report of ETag validation and directory integrity checks
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ValidationReport {
-    pub validation_id: uuid::Uuid,
-    pub user_id: uuid::Uuid,
-    pub started_at: chrono::DateTime<chrono::Utc>,
-    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
-    pub total_directories_checked: u32,
-    pub issues_found: Vec<ValidationIssue>,
-    pub recommendations: Vec<ValidationRecommendation>,
-    pub etag_support_verified: bool,
-    pub server_health_score: u8, // 0-100
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ValidationIssue {
-    pub issue_type: ValidationIssueType,
-    pub directory_path: String,
-    pub severity: ValidationSeverity,
-    pub description: String,
-    pub discovered_at: chrono::DateTime<chrono::Utc>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ValidationIssueType {
-    /// Directory exists on server but not in our tracking
-    Untracked,
-    /// Directory in our tracking but missing on server
-    Missing,
-    /// ETag mismatch between server and our cache
-    ETagMismatch,
-    /// Directory hasn't been scanned in a very long time
-    Stale,
-    /// Server errors when accessing directory
-    Inaccessible,
-    /// ETag support seems unreliable for this directory
-    ETagUnreliable,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ValidationSeverity {
-    Info,     // No action needed, just FYI
-    Warning,  // Should investigate but not urgent
-    Error,    // Needs immediate attention
-    Critical, // System integrity at risk
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ValidationRecommendation {
-    pub action: ValidationAction,
-    pub reason: String,
-    pub affected_directories: Vec<String>,
-    pub priority: ValidationSeverity,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum ValidationAction {
-    /// Run a deep scan of specific directories
-    DeepScanRequired,
-    /// Clear and rebuild directory tracking
-    RebuildTracking,
-    /// ETag support is unreliable, switch to periodic scans
-    DisableETagOptimization,
-    /// Clean up orphaned database entries
-    CleanupDatabase,
-    /// Server configuration issue needs attention
-    CheckServerConfiguration,
-    /// No action needed, system is healthy
-    NoActionRequired,
-}
-
-impl WebDAVService {
-    pub fn new(config: WebDAVConfig) -> Result<Self> {
-        Self::new_with_configs(config, RetryConfig::default(), ConcurrencyConfig::default())
-    }
-
-    pub fn new_with_retry(config: WebDAVConfig, retry_config: RetryConfig) -> Result<Self> {
-        Self::new_with_configs(config, retry_config, ConcurrencyConfig::default())
-    }
-
-    pub fn new_with_configs(config: WebDAVConfig, retry_config: RetryConfig, concurrency_config: ConcurrencyConfig) -> Result<Self> {
-        let client = Client::builder()
-            .timeout(Duration::from_secs(config.timeout_seconds))
-            .build()?;
-
-        // Validate server URL before constructing WebDAV URLs
-        if config.server_url.trim().is_empty() {
-            return Err(anyhow!("❌ WebDAV Configuration Error: server_url is empty"));
-        }
-
-        if !config.server_url.starts_with("http://") && !config.server_url.starts_with("https://") {
-            return Err(anyhow!(
-                "❌ WebDAV Configuration Error: server_url must start with 'http://' or 'https://'. \
-                Current value: '{}'. \
-                Examples: \
-                - https://cloud.example.com \
-                - http://192.168.1.100:8080 \
-                - https://nextcloud.mydomain.com",
-                config.server_url
-            ));
-        }
-
-        // Validate that server_url can be parsed as a proper URL
-        if let Err(e) = reqwest::Url::parse(&config.server_url) {
-            return Err(anyhow!(
-                "❌ WebDAV Configuration Error: server_url is not a valid URL: {}. \
-                Current value: '{}'. \
-                The URL must be absolute and include the full domain. \
-                Examples: \
-                - https://cloud.example.com \
-                - http://192.168.1.100:8080/webdav \
-                - https://nextcloud.mydomain.com",
-                e, config.server_url
-            ));
-        }
-
-        // Construct WebDAV URL based on server type
-        let base_webdav_url = match config.server_type.as_deref() {
-            Some("nextcloud") | Some("owncloud") => {
-                let url = format!(
-                    "{}/remote.php/dav/files/{}",
-                    config.server_url.trim_end_matches('/'),
-                    config.username
-                );
-                debug!("🔗 Constructed Nextcloud/ownCloud WebDAV URL: {}", url);
-                url
-            },
-            _ => {
-                let url = format!(
-                    "{}/webdav",
-                    config.server_url.trim_end_matches('/')
-                );
-                debug!("🔗 Constructed generic WebDAV URL: {}", url);
-                url
-            },
-        };
-
-        Ok(Self {
-            client,
-            config,
-            base_webdav_url,
-            retry_config,
-            concurrency_config,
-        })
-    }
-
-    async fn retry_with_backoff<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
-    where
-        F: Fn() -> Fut,
-        Fut: std::future::Future<Output = Result<T>>,
-    {
-        let mut attempt = 0;
-        let mut delay = self.retry_config.initial_delay_ms;
-
-        loop {
-            match operation().await {
-                Ok(result) => {
-                    if attempt > 0 {
-                        info!("{} succeeded after {} retries", operation_name, attempt);
-                    }
-                    return Ok(result);
-                }
-                Err(err) => {
-                    attempt += 1;
-
-                    if attempt > self.retry_config.max_retries {
-                        error!("{} failed after {} attempts: {}", operation_name, attempt - 1, err);
-                        return Err(err);
-                    }
-
-                    // Check if error is retryable
-                    if !Self::is_retryable_error(&err) {
-                        error!("{} failed with non-retryable error: {}", operation_name, err);
-                        return Err(err);
-                    }
-
-                    // Apply adaptive backoff for rate limiting
-                    let actual_delay = if Self::is_rate_limit_error(&err) && self.concurrency_config.adaptive_rate_limiting {
-                        let rate_limit_delay = delay + self.retry_config.rate_limit_backoff_ms;
-                        warn!("{} rate limited (attempt {}), retrying in {}ms with extra backoff: {}",
-                            operation_name, attempt, rate_limit_delay, err);
-                        rate_limit_delay
-                    } else {
-                        warn!("{} failed (attempt {}), retrying in {}ms: {}",
-                            operation_name, attempt, delay, err);
-                        delay
-                    };
-
-                    sleep(Duration::from_millis(actual_delay)).await;
-
-                    // Calculate next delay with exponential backoff
-                    delay = ((delay as f64 * self.retry_config.backoff_multiplier) as u64)
-                        .min(self.retry_config.max_delay_ms);
-                }
-            }
-        }
-    }
-
-    fn is_retryable_error(error: &anyhow::Error) -> bool {
-        // Check if error is network-related or temporary
-        if let Some(reqwest_error) = error.downcast_ref::<reqwest::Error>() {
-            // Retry on network errors, timeouts, and server errors (5xx)
-            return reqwest_error.is_timeout()
-                || reqwest_error.is_connect()
-                || reqwest_error.is_request()
-                || reqwest_error.status()
-                    .map(|s| {
-                        s.is_server_error() // 5xx errors (including server restart scenarios)
-                        || s == 429 // Too Many Requests
-                        || s == 502 // Bad Gateway (server restarting)
-                        || s == 503 // Service Unavailable (server restarting/overloaded)
-                        || s == 504 // Gateway Timeout (server slow to respond)
-                    })
-                    .unwrap_or(true);
-        }
-
-        // For other errors, check the error message for common temporary issues
-        let error_str = error.to_string().to_lowercase();
-        error_str.contains("timeout")
-            || error_str.contains("connection")
-            || error_str.contains("network")
-            || error_str.contains("temporary")
-            || error_str.contains("rate limit")
-            || error_str.contains("too many requests")
-            || error_str.contains("connection reset")
-            || error_str.contains("connection aborted")
-            || error_str.contains("server unavailable")
-            || error_str.contains("bad gateway")
-            || error_str.contains("service unavailable")
-    }
-
-    fn is_rate_limit_error(error: &anyhow::Error) -> bool {
-        if let Some(reqwest_error) = error.downcast_ref::<reqwest::Error>() {
-            return reqwest_error.status()
-                .map(|s| s == 429)
-                .unwrap_or(false);
-        }
-
-        let error_str = error.to_string().to_lowercase();
-        error_str.contains("rate limit") || error_str.contains("too many requests")
-    }
-
-    fn is_server_restart_error(&self, error: &anyhow::Error) -> bool {
-        if let Some(reqwest_error) = error.downcast_ref::<reqwest::Error>() {
-            if let Some(status) = reqwest_error.status() {
-                return status == 502 // Bad Gateway
-                    || status == 503 // Service Unavailable
-                    || status == 504; // Gateway Timeout
-            }
-
-            // Network-level connection issues often indicate server restart
-            return reqwest_error.is_connect() || reqwest_error.is_timeout();
-        }
-
-        let error_str = error.to_string().to_lowercase();
-        error_str.contains("connection reset")
-            || error_str.contains("connection aborted")
-            || error_str.contains("bad gateway")
-            || error_str.contains("service unavailable")
-            || error_str.contains("server unreachable")
-    }
-
-    pub async fn test_connection(&self, test_config: WebDAVTestConnection) -> Result<WebDAVConnectionResult> {
-        info!("Testing WebDAV connection to {} ({})",
-            test_config.server_url,
-            test_config.server_type.as_deref().unwrap_or("generic"));
-
-        // Validate server URL before constructing test URL
-        if test_config.server_url.trim().is_empty() {
-            return Ok(WebDAVConnectionResult {
-                success: false,
-                message: "❌ WebDAV server_url is empty".to_string(),
-                server_version: None,
-                server_type: None,
-            });
-        }
-
-        if !test_config.server_url.starts_with("http://") && !test_config.server_url.starts_with("https://") {
-            return Ok(WebDAVConnectionResult {
-                success: false,
-                message: format!(
-                    "❌ WebDAV server_url must start with 'http://' or 'https://'. \
-                    Current value: '{}'. \
-                    Examples: https://cloud.example.com, http://192.168.1.100:8080",
-                    test_config.server_url
-                ),
-                server_version: None,
-                server_type: None,
-            });
-        }
-
-        // Validate URL can be parsed
-        if let Err(e) = reqwest::Url::parse(&test_config.server_url) {
-            return Ok(WebDAVConnectionResult {
-                success: false,
-                message: format!(
-                    "❌ WebDAV server_url is not a valid URL: {}. \
-                    Current value: '{}'. 
\ - Must be absolute URL like: https://cloud.example.com", - e, test_config.server_url - ), - server_version: None, - server_type: None, - }); - } - - let test_url = match test_config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => format!( - "{}/remote.php/dav/files/{}/", - test_config.server_url.trim_end_matches('/'), - test_config.username - ), - _ => format!( - "{}/webdav/", - test_config.server_url.trim_end_matches('/') - ), - }; - - debug!("🔗 Constructed test URL: {}", test_url); - - let resp = self.client - .request(Method::from_bytes(b"PROPFIND").unwrap(), &test_url) - .basic_auth(&test_config.username, Some(&test_config.password)) - .header("Depth", "0") - .body(r#" - - - - - "#) - .send() - .await - .map_err(|e| { - error!("❌ WebDAV HTTP request failed for URL '{}': {}", test_url, e); - anyhow!("WebDAV HTTP request failed for URL '{}': {}. \ - This often indicates a URL configuration issue. \ - Verify the server_url is correct and accessible.", test_url, e) - })?; - - if resp.status().is_success() { - info!("✅ WebDAV connection successful"); - - // Try to get server info - let (version, server_type) = self.get_server_info(&test_config).await; - - Ok(WebDAVConnectionResult { - success: true, - message: format!("Successfully connected to WebDAV server ({})", - server_type.as_deref().unwrap_or("Generic WebDAV")), - server_version: version, - server_type, - }) - } else { - error!("❌ WebDAV connection failed with status: {} for URL: {}", resp.status(), test_url); - Ok(WebDAVConnectionResult { - success: false, - message: format!("Connection failed: HTTP {} for URL: {}", resp.status(), test_url), - server_version: None, - server_type: None, - }) - } - } - - async fn get_server_info(&self, test_config: &WebDAVTestConnection) -> (Option, Option) { - // Try Nextcloud/ownCloud capabilities first - if let Some(server_type) = &test_config.server_type { - if server_type == "nextcloud" || server_type == "owncloud" { - let capabilities_url = format!( - "{}/ocs/v1.php/cloud/capabilities?format=json", - test_config.server_url.trim_end_matches('/') - ); - - if let Ok(response) = self.client - .get(&capabilities_url) - .basic_auth(&test_config.username, Some(&test_config.password)) - .send() - .await - { - if response.status().is_success() { - if let Ok(text) = response.text().await { - // Simple version extraction - if let Some(start) = text.find("\"version\":\"") { - let version_start = start + 11; - if let Some(end) = text[version_start..].find('"') { - let version = text[version_start..version_start + end].to_string(); - return (Some(version), Some(server_type.clone())); - } - } - } - } - } - } - } - - // Fallback: try to detect server type from headers - (Some("Unknown".to_string()), test_config.server_type.clone()) - } - - pub async fn estimate_crawl(&self, folders: &[String]) -> Result { - info!("Estimating crawl for {} folders", folders.len()); - - let mut folder_infos = Vec::new(); - let supported_extensions: HashSet = self.config.file_extensions - .iter() - .map(|ext| ext.to_lowercase()) - .collect(); - - for folder_path in folders { - debug!("Analyzing folder: {}", folder_path); - - match self.analyze_folder(folder_path, &supported_extensions).await { - Ok(folder_info) => { - debug!("Folder {} has {} files ({} supported)", - folder_path, folder_info.total_files, folder_info.supported_files); - folder_infos.push(folder_info); - } - Err(e) => { - warn!("Failed to analyze folder {}: {}", folder_path, e); - // Add empty folder info so UI can show the error - 
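// ---------------------------------------------------------------------------
// Editor's illustrative sketch (not part of the original patch). The
// connection test above reduces to a single PROPFIND request with "Depth: 0",
// which asks the server to describe only the target collection itself. Below
// is a minimal, self-contained version of that probe; `probe_webdav` is a
// name invented for this sketch, and the URL and credentials are assumed
// caller-supplied inputs.
use anyhow::Result;
use reqwest::{Client, Method};

async fn probe_webdav(url: &str, user: &str, pass: &str) -> Result<bool> {
    let resp = Client::new()
        .request(Method::from_bytes(b"PROPFIND")?, url)
        .basic_auth(user, Some(pass))
        // "Depth: 0" = the collection itself; "1" = direct children only;
        // "infinity" = full recursive listing (used by the deep scans below).
        .header("Depth", "0")
        .send()
        .await?;
    // A 207 Multi-Status reply (a 2xx status) means the endpoint speaks WebDAV.
    Ok(resp.status().is_success())
}
// ---------------------------------------------------------------------------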
folder_infos.push(WebDAVFolderInfo { - path: folder_path.clone(), - total_files: 0, - supported_files: 0, - estimated_time_hours: 0.0, - total_size_mb: 0.0, - }); - } - } - } - - let total_files: i64 = folder_infos.iter().map(|f| f.total_files).sum(); - let total_supported_files: i64 = folder_infos.iter().map(|f| f.supported_files).sum(); - let total_estimated_time_hours: f32 = folder_infos.iter().map(|f| f.estimated_time_hours).sum(); - let total_size_mb: f64 = folder_infos.iter().map(|f| f.total_size_mb).sum(); - - info!("Crawl estimate complete: {} total files, {} supported files, {:.2} hours estimated", - total_files, total_supported_files, total_estimated_time_hours); - - Ok(WebDAVCrawlEstimate { - folders: folder_infos, - total_files, - total_supported_files, - total_estimated_time_hours, - total_size_mb, - }) - } - - async fn analyze_folder(&self, folder_path: &str, supported_extensions: &HashSet) -> Result { - let files = self.discover_files_in_folder(folder_path).await?; - - let mut total_files = 0i64; - let mut supported_files = 0i64; - let mut total_size_bytes = 0i64; - - for file in files { - if !file.is_directory { - total_files += 1; - total_size_bytes += file.size; - - // Check if file extension is supported - if let Some(extension) = std::path::Path::new(&file.name) - .extension() - .and_then(|ext| ext.to_str()) - { - if supported_extensions.contains(&extension.to_lowercase()) { - supported_files += 1; - } - } - } - } - - // Estimate processing time: ~2 seconds per file for OCR - // This is a rough estimate - actual time depends on file size and complexity - let estimated_time_hours = (supported_files as f32 * 2.0) / 3600.0; - let total_size_mb = total_size_bytes as f64 / (1024.0 * 1024.0); - - Ok(WebDAVFolderInfo { - path: folder_path.to_string(), - total_files, - supported_files, - estimated_time_hours, - total_size_mb, - }) - } - - pub async fn discover_files_in_folder(&self, folder_path: &str) -> Result> { - self.retry_with_backoff("discover_files_in_folder", || { - self.discover_files_in_folder_impl(folder_path) - }).await - } - - /// Optimized discovery that checks directory ETag first to avoid unnecessary deep scans - pub async fn discover_files_in_folder_optimized(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { - self.discover_files_in_folder_optimized_with_recovery(folder_path, user_id, state, true).await - } - - async fn discover_files_in_folder_optimized_with_recovery(&self, folder_path: &str, user_id: uuid::Uuid, state: &crate::AppState, enable_crash_recovery: bool) -> Result> { - debug!("🔍 Starting optimized discovery for folder: {}", folder_path); - - // Check for incomplete scans that need recovery - if enable_crash_recovery { - if let Ok(incomplete_scans) = self.detect_incomplete_scans(user_id, state).await { - if !incomplete_scans.is_empty() { - info!("🔄 Detected {} incomplete scans from previous session, resuming...", incomplete_scans.len()); - for incomplete_path in incomplete_scans { - if incomplete_path.starts_with(folder_path) { - info!("🔄 Resuming incomplete scan for: {}", incomplete_path); - match self.resume_deep_scan_internal(&incomplete_path, user_id, state).await { - Ok(resumed_files) => { - info!("✅ Successfully resumed scan for {}: {} files found", incomplete_path, resumed_files.len()); - } - Err(e) => { - warn!("⚠️ Failed to resume scan for {}: {}", incomplete_path, e); - } - } - } - } - } - } - } - - // Check if we should use smart scanning - let use_smart_scan = match self.config.server_type.as_deref() { 
- Some("nextcloud") | Some("owncloud") => { - debug!("🚀 Using smart scanning for Nextcloud/ownCloud server"); - true - } - _ => { - debug!("📁 Using traditional scanning for generic WebDAV server"); - false - } - }; - - if use_smart_scan { - // Get stored ETag for this directory - let stored_etag = match state.db.get_webdav_directory(user_id, folder_path).await { - Ok(Some(dir)) => Some(dir.directory_etag), - Ok(None) => None, - Err(e) => { - warn!("Database error checking directory {}: {}", folder_path, e); - None - } - }; - - // Use smart scanning with depth-1 traversal and checkpoint recovery - return self.smart_directory_scan_with_checkpoints(folder_path, stored_etag.as_deref(), user_id, state).await; - } - - // Fall back to traditional optimization for other servers - // Step 1: Check directory ETag first (lightweight PROPFIND with Depth: 0) - let current_dir_etag = match self.check_directory_etag(folder_path).await { - Ok(etag) => etag, - Err(e) => { - warn!("Failed to get directory ETag for {}, falling back to full scan: {}", folder_path, e); - return self.discover_files_in_folder_impl(folder_path).await; - } - }; - - // Step 2: Check if we have this directory cached - match state.db.get_webdav_directory(user_id, folder_path).await { - Ok(Some(stored_dir)) => { - if stored_dir.directory_etag == current_dir_etag { - debug!("✅ Directory {} unchanged (ETag: {}), checking subdirectories individually", folder_path, current_dir_etag); - - // Update last_scanned_at to show we checked - let update = crate::models::UpdateWebDAVDirectory { - directory_etag: current_dir_etag, - last_scanned_at: chrono::Utc::now(), - file_count: stored_dir.file_count, - total_size_bytes: stored_dir.total_size_bytes, - }; - - if let Err(e) = state.db.update_webdav_directory(user_id, folder_path, &update).await { - warn!("Failed to update directory scan time: {}", e); - } - - // Step 2a: Check subdirectories individually for changes - let changed_files = self.check_subdirectories_for_changes(folder_path, user_id, state).await?; - return Ok(changed_files); - } else { - debug!("🔄 Directory {} changed (old ETag: {}, new ETag: {}), performing deep scan", - folder_path, stored_dir.directory_etag, current_dir_etag); - } - } - Ok(None) => { - debug!("🆕 New directory {}, performing initial scan", folder_path); - } - Err(e) => { - warn!("Database error checking directory {}: {}, proceeding with scan", folder_path, e); - } - } - - // Step 3: Directory has changed or is new - perform full discovery - let files = self.discover_files_in_folder_impl(folder_path).await?; - - // Step 4: Update directory tracking info for main directory - let file_count = files.iter().filter(|f| !f.is_directory).count() as i64; - let total_size_bytes = files.iter().filter(|f| !f.is_directory).map(|f| f.size).sum::(); - - let directory_record = crate::models::CreateWebDAVDirectory { - user_id, - directory_path: folder_path.to_string(), - directory_etag: current_dir_etag.clone(), - file_count, - total_size_bytes, - }; - - if let Err(e) = state.db.create_or_update_webdav_directory(&directory_record).await { - error!("Failed to update directory tracking for {}: {}", folder_path, e); - } else { - debug!("📊 Updated directory tracking: {} files, {} bytes, ETag: {}", - file_count, total_size_bytes, current_dir_etag); - } - - // Step 5: Track ALL subdirectories found during the scan (n-depth) - self.track_subdirectories_recursively(&files, user_id, state).await; - - Ok(files) - } - - /// Track all subdirectories recursively with rock-solid n-depth support 
- async fn track_subdirectories_recursively(&self, files: &[FileInfo], user_id: uuid::Uuid, state: &crate::AppState) { - use std::collections::{HashMap, BTreeSet}; - - // Step 1: Extract all unique directory paths from the file list - let mut all_directories = BTreeSet::new(); - - for file in files { - if file.is_directory { - // Add the directory itself - all_directories.insert(file.path.clone()); - } else { - // Extract all parent directories from file paths - let mut path_parts: Vec<&str> = file.path.split('/').collect(); - path_parts.pop(); // Remove the filename - - // Build directory paths from root down to immediate parent - let mut current_path = String::new(); - for part in path_parts { - if !part.is_empty() { - if !current_path.is_empty() { - current_path.push('/'); - } - current_path.push_str(part); - all_directories.insert(current_path.clone()); - } - } - } - } - - debug!("🗂️ Found {} unique directories at all levels", all_directories.len()); - - // Step 2: Create a mapping of directory -> ETag from the files list - let mut directory_etags: HashMap = HashMap::new(); - for file in files { - if file.is_directory { - directory_etags.insert(file.path.clone(), file.etag.clone()); - } - } - - // Step 3: For each directory, calculate its direct content (files and immediate subdirs) - for dir_path in &all_directories { - let dir_etag = match directory_etags.get(dir_path) { - Some(etag) => etag.clone(), - None => { - debug!("⚠️ No ETag found for directory: {}", dir_path); - continue; // Skip directories without ETags - } - }; - - // Count direct files in this directory (not in subdirectories) - let direct_files: Vec<_> = files.iter() - .filter(|f| { - !f.is_directory && - self.is_direct_child(&f.path, dir_path) - }) - .collect(); - - // Count direct subdirectories - let direct_subdirs: Vec<_> = files.iter() - .filter(|f| { - f.is_directory && - self.is_direct_child(&f.path, dir_path) - }) - .collect(); - - let file_count = direct_files.len() as i64; - let total_size_bytes = direct_files.iter().map(|f| f.size).sum::(); - - // Create or update directory tracking record - let directory_record = crate::models::CreateWebDAVDirectory { - user_id, - directory_path: dir_path.clone(), - directory_etag: dir_etag.clone(), - file_count, - total_size_bytes, - }; - - match state.db.create_or_update_webdav_directory(&directory_record).await { - Ok(_) => { - debug!("📁 Tracked directory: {} ({} files, {} subdirs, {} bytes, ETag: {})", - dir_path, file_count, direct_subdirs.len(), total_size_bytes, dir_etag); - } - Err(e) => { - warn!("Failed to update directory tracking for {}: {}", dir_path, e); - } - } - } - - debug!("✅ Completed tracking {} directories at all depth levels", all_directories.len()); - } - - /// Check if a path is a direct child of a directory (not nested deeper) - pub fn is_direct_child(&self, child_path: &str, parent_path: &str) -> bool { - // Normalize paths by removing trailing slashes - let child_normalized = child_path.trim_end_matches('/'); - let parent_normalized = parent_path.trim_end_matches('/'); - - if !child_normalized.starts_with(parent_normalized) { - return false; - } - - // Same path is not a direct child of itself - if child_normalized == parent_normalized { - return false; - } - - // Handle root directory case - if parent_normalized.is_empty() || parent_normalized == "/" { - let child_without_leading_slash = child_normalized.trim_start_matches('/'); - return !child_without_leading_slash.is_empty() && !child_without_leading_slash.contains('/'); - } - - // Remove parent 
path prefix and check if remainder has exactly one more path segment - let remaining = child_normalized.strip_prefix(parent_normalized) - .unwrap_or("") - .trim_start_matches('/'); - - // Direct child means no more slashes in the remaining path - !remaining.contains('/') && !remaining.is_empty() - } - - /// Perform targeted re-scanning of only specific paths that have changed - pub async fn discover_files_targeted_rescan(&self, paths_to_scan: &[String], user_id: uuid::Uuid, state: &crate::AppState) -> Result> { - debug!("🎯 Starting targeted re-scan for {} specific paths", paths_to_scan.len()); - - let mut all_files = Vec::new(); - - for path in paths_to_scan { - debug!("🔍 Targeted scan of: {}", path); - - // Convert to relative path for API calls - let relative_path = self.convert_to_relative_path(path); - - // Check if this specific path has changed - match self.check_directory_etag(&relative_path).await { - Ok(current_etag) => { - // Check cached ETag - let needs_scan = match state.db.get_webdav_directory(user_id, path).await { - Ok(Some(stored_dir)) => { - if stored_dir.directory_etag != current_etag { - debug!("🔄 Path {} changed (old: {}, new: {})", path, stored_dir.directory_etag, current_etag); - true - } else { - debug!("✅ Path {} unchanged (ETag: {})", path, current_etag); - false - } - } - Ok(None) => { - debug!("🆕 New path {} detected", path); - true - } - Err(e) => { - warn!("Database error for path {}: {}", path, e); - true // Scan on error to be safe - } - }; - - if needs_scan { - // Use shallow scan for this specific directory only - match self.discover_files_in_folder_shallow(&relative_path).await { - Ok(mut path_files) => { - debug!("📂 Found {} files in changed path {}", path_files.len(), path); - all_files.append(&mut path_files); - - // Update tracking for this specific path - self.update_single_directory_tracking(path, &path_files, user_id, state).await; - } - Err(e) => { - error!("Failed to scan changed path {}: {}", path, e); - } - } - } - } - Err(e) => { - warn!("Failed to check ETag for path {}: {}, skipping", path, e); - } - } - } - - debug!("🎯 Targeted re-scan completed: {} total files found", all_files.len()); - Ok(all_files) - } - - /// Discover files in a single directory only (shallow scan, no recursion) - async fn discover_files_in_folder_shallow(&self, folder_path: &str) -> Result> { - let folder_url = format!("{}{}", self.base_webdav_url, folder_path); - - debug!("Shallow scan of directory: {}", folder_url); - - let propfind_body = r#" - - - "#; - - let response = self.client - .request(Method::from_bytes(b"PROPFIND").unwrap(), &folder_url) - .basic_auth(&self.config.username, Some(&self.config.password)) - .header("Depth", "1") // Only direct children, not recursive - .header("Content-Type", "application/xml") - .body(propfind_body) - .send() - .await?; - - if !response.status().is_success() { - return Err(anyhow!("PROPFIND request failed: {}", response.status())); - } - - let response_text = response.text().await?; - debug!("Shallow WebDAV response received, parsing..."); - - // Use the parser that includes directories for shallow scans - self.parse_webdav_response_with_directories(&response_text) - } - - /// Update tracking for a single directory without recursive processing - async fn update_single_directory_tracking(&self, directory_path: &str, files: &[FileInfo], user_id: uuid::Uuid, state: &crate::AppState) { - // Get the directory's own ETag - let dir_etag = files.iter() - .find(|f| f.is_directory && f.path == directory_path) - .map(|f| 
f.etag.clone()) - .unwrap_or_else(|| { - warn!("No ETag found for directory {}, using timestamp-based fallback", directory_path); - chrono::Utc::now().timestamp().to_string() - }); - - // Count direct files in this directory only - let direct_files: Vec<_> = files.iter() - .filter(|f| !f.is_directory && self.is_direct_child(&f.path, directory_path)) - .collect(); - - let file_count = direct_files.len() as i64; - let total_size_bytes = direct_files.iter().map(|f| f.size).sum::(); - - let directory_record = crate::models::CreateWebDAVDirectory { - user_id, - directory_path: directory_path.to_string(), - directory_etag: dir_etag.clone(), - file_count, - total_size_bytes, - }; - - match state.db.create_or_update_webdav_directory(&directory_record).await { - Ok(_) => { - debug!("📊 Updated single directory tracking: {} ({} files, {} bytes, ETag: {})", - directory_path, file_count, total_size_bytes, dir_etag); - } - Err(e) => { - error!("Failed to update single directory tracking for {}: {}", directory_path, e); - } - } - } - - /// Check subdirectories individually for changes when parent directory is unchanged - async fn check_subdirectories_for_changes(&self, parent_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { - // First, check if this server supports recursive ETags - let supports_recursive_etags = match self.config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => true, - _ => false - }; - - if supports_recursive_etags { - // With recursive ETags, if parent hasn't changed, nothing inside has changed - debug!("🚀 Server supports recursive ETags - parent {} unchanged means all contents unchanged", parent_path); - return Ok(Vec::new()); - } - - // For servers without recursive ETags, fall back to checking each subdirectory - debug!("📁 Server doesn't support recursive ETags, checking subdirectories individually"); - - // Get all known subdirectories from database - let known_directories = match state.db.list_webdav_directories(user_id).await { - Ok(dirs) => dirs, - Err(e) => { - warn!("Failed to get known directories, falling back to full scan: {}", e); - return self.discover_files_in_folder_impl(parent_path).await; - } - }; - - // Filter to subdirectories of this parent - let subdirectories: Vec<_> = known_directories.iter() - .filter(|dir| dir.directory_path.starts_with(parent_path) && dir.directory_path != parent_path) - .collect(); - - if subdirectories.is_empty() { - debug!("📁 No known subdirectories for {}, performing initial scan to discover structure", parent_path); - return self.discover_files_in_folder_impl(parent_path).await; - } - - debug!("🔍 Checking {} known subdirectories for changes", subdirectories.len()); - - let mut changed_files = Vec::new(); - let subdirectory_count = subdirectories.len(); - - // Check each subdirectory individually - for subdir in subdirectories { - let subdir_path = &subdir.directory_path; - - // Check if this subdirectory has changed - match self.check_directory_etag(subdir_path).await { - Ok(current_etag) => { - if current_etag != subdir.directory_etag { - debug!("🔄 Subdirectory {} changed (old: {}, new: {}), scanning recursively", - subdir_path, subdir.directory_etag, current_etag); - - // This subdirectory changed - get all its files recursively - match self.discover_files_in_folder_impl(subdir_path).await { - Ok(mut subdir_files) => { - debug!("📂 Found {} files in changed subdirectory {}", subdir_files.len(), subdir_path); - changed_files.append(&mut subdir_files); - - // Update tracking for this subdirectory 
and its children - self.track_subdirectories_recursively(&subdir_files, user_id, state).await; - } - Err(e) => { - error!("Failed to scan changed subdirectory {}: {}", subdir_path, e); - } - } - } else { - debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir_path, current_etag); - - // Update last_scanned_at even for unchanged directories - let update = crate::models::UpdateWebDAVDirectory { - directory_etag: current_etag, - last_scanned_at: chrono::Utc::now(), - file_count: subdir.file_count, - total_size_bytes: subdir.total_size_bytes, - }; - - if let Err(e) = state.db.update_webdav_directory(user_id, subdir_path, &update).await { - warn!("Failed to update scan time for {}: {}", subdir_path, e); - } - } - } - Err(e) => { - warn!("Failed to check ETag for subdirectory {}: {}", subdir_path, e); - // Don't fail the entire operation, just log and continue - } - } - } - - debug!("🎯 Found {} changed files across {} subdirectories", changed_files.len(), subdirectory_count); - Ok(changed_files) - } - - /// Check directory ETag without performing deep scan - used for optimization - pub async fn check_directory_etag(&self, folder_path: &str) -> Result { - self.retry_with_backoff("check_directory_etag", || { - self.check_directory_etag_impl(folder_path) - }).await - } - - async fn check_directory_etag_impl(&self, folder_path: &str) -> Result { - let folder_url = format!("{}{}", self.base_webdav_url, folder_path); - - debug!("Checking directory ETag for: {}", folder_url); - - let propfind_body = r#" - - - - - "#; - - let response = self.client - .request(Method::from_bytes(b"PROPFIND").unwrap(), &folder_url) - .basic_auth(&self.config.username, Some(&self.config.password)) - .header("Depth", "0") // Only check the directory itself, not contents - .header("Content-Type", "application/xml") - .body(propfind_body) - .send() - .await?; - - if !response.status().is_success() { - return Err(anyhow!("PROPFIND request failed: {}", response.status())); - } - - let response_text = response.text().await?; - debug!("Directory ETag response received, parsing..."); - - // Parse the response to extract directory ETag - self.parse_directory_etag(&response_text) - } - - pub fn parse_directory_etag(&self, xml_text: &str) -> Result { - use quick_xml::events::Event; - use quick_xml::reader::Reader; - - let mut reader = Reader::from_str(xml_text); - reader.config_mut().trim_text(true); - - let mut current_element = String::new(); - let mut etag = String::new(); - let mut buf = Vec::new(); - - loop { - match reader.read_event_into(&mut buf) { - Ok(Event::Start(e)) | Ok(Event::Empty(e)) => { - let local_name = e.local_name(); - let name = std::str::from_utf8(local_name.as_ref())?; - current_element = name.to_lowercase(); - } - Ok(Event::Text(e)) => { - if current_element == "getetag" { - etag = e.unescape()?.to_string(); - break; - } - } - Ok(Event::End(_)) => { - current_element.clear(); - } - Ok(Event::Eof) => break, - Err(e) => return Err(anyhow!("XML parsing error: {}", e)), - _ => {} - } - } - - if etag.is_empty() { - return Err(anyhow!("No ETag found in directory response")); - } - - // Use existing ETag normalization function from parser module - let normalized_etag = crate::webdav_xml_parser::normalize_etag(&etag); - debug!("Directory ETag: {}", normalized_etag); - - Ok(normalized_etag) - } - - async fn discover_files_in_folder_impl(&self, folder_path: &str) -> Result> { - let folder_url = format!("{}{}", self.base_webdav_url, folder_path); - - debug!("Discovering files in: {}", folder_url); - - let 
propfind_body = r#" - - - "#; - - let response = self.client - .request(Method::from_bytes(b"PROPFIND").unwrap(), &folder_url) - .basic_auth(&self.config.username, Some(&self.config.password)) - .header("Depth", "infinity") // Get all files recursively - .header("Content-Type", "application/xml") - .body(propfind_body) - .send() - .await?; - - if !response.status().is_success() { - return Err(anyhow!("PROPFIND request failed: {}", response.status())); - } - - let response_text = response.text().await?; - debug!("WebDAV response received, parsing..."); - - self.parse_webdav_response(&response_text) - } - - pub fn parse_webdav_response(&self, xml_text: &str) -> Result> { - parse_propfind_response(xml_text) - } - - /// Parse WebDAV response including both files and directories - /// Used for shallow directory scans where we need to track directory structure - pub fn parse_webdav_response_with_directories(&self, xml_text: &str) -> Result> { - parse_propfind_response_with_directories(xml_text) - } - - /// Test if the WebDAV server supports recursive ETag propagation - /// (i.e., parent directory ETags change when child content changes) - /// This test is read-only and checks existing directory structures - pub async fn test_recursive_etag_support(&self) -> Result { - debug!("🔬 Testing recursive ETag support using existing directory structure"); - - // Find a directory with subdirectories from our watch folders - for watch_folder in &self.config.watch_folders { - // Convert to relative path for API calls - let relative_watch_folder = self.convert_to_relative_path(watch_folder); - - // Get the directory structure with depth 1 - match self.discover_files_in_folder_shallow(&relative_watch_folder).await { - Ok(entries) => { - // Find a subdirectory to test with - let subdirs: Vec<_> = entries.iter() - .filter(|e| e.is_directory && &e.path != watch_folder) - .collect(); - - if subdirs.is_empty() { - continue; // Try next watch folder - } - - // Use the first subdirectory for testing - let test_subdir = &subdirs[0]; - debug!("Testing with directory: {} and subdirectory: {}", watch_folder, test_subdir.path); - - // Step 1: Get parent directory ETag - let parent_etag = self.check_directory_etag(&relative_watch_folder).await?; - - // Step 2: Get subdirectory ETag (convert to relative path) - let relative_subdir_path = self.convert_to_relative_path(&test_subdir.path); - let subdir_etag = self.check_directory_etag(&relative_subdir_path).await?; - - // Step 3: Check if parent has a different ETag than child - // In a recursive ETag system, they should be different but related - // The key test is: if we check the parent again after some time, - // and a file deep inside changed, did the parent ETag change? 
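// ---------------------------------------------------------------------------
// Editor's illustrative sketch (not part of the original patch). As the
// comment above notes, without write access the read-only test cannot prove
// recursive ETag propagation, so the service falls back to a server-type
// lookup. That decision table, restated as a standalone function
// (`likely_supports_recursive_etags` is a name local to this sketch):
fn likely_supports_recursive_etags(server_type: Option<&str>) -> bool {
    // Nextcloud and ownCloud propagate a child change up through every
    // ancestor collection's ETag; generic WebDAV servers make no such promise.
    matches!(server_type, Some("nextcloud") | Some("owncloud"))
}

fn main() {
    assert!(likely_supports_recursive_etags(Some("nextcloud")));
    assert!(!likely_supports_recursive_etags(Some("generic")));
    assert!(!likely_supports_recursive_etags(None));
}
// ---------------------------------------------------------------------------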
- - // For now, we'll just check if the server provides ETags at all - if !parent_etag.is_empty() && !subdir_etag.is_empty() { - debug!("✅ Server provides ETags for directories"); - debug!(" Parent ETag: {}", parent_etag); - debug!(" Subdir ETag: {}", subdir_etag); - - // Without write access, we can't definitively test recursive propagation - // But we can make an educated guess based on the server type - let likely_supports_recursive = match self.config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => { - debug!(" Nextcloud/ownCloud servers typically support recursive ETags"); - true - } - _ => { - debug!(" Unknown server type - recursive ETag support uncertain"); - false - } - }; - - return Ok(likely_supports_recursive); - } - } - Err(e) => { - warn!("Failed to scan directory {}: {}", watch_folder, e); - continue; - } - } - } - - debug!("❓ Could not determine recursive ETag support - no suitable directories found"); - Ok(false) - } - - /// Convert full WebDAV path to relative path for use with base_webdav_url - pub fn convert_to_relative_path(&self, full_webdav_path: &str) -> String { - // For Nextcloud/ownCloud paths like "/remote.php/dav/files/username/folder/subfolder/" - // We need to extract just the "folder/subfolder/" part - let webdav_prefix = match self.config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => { - format!("/remote.php/dav/files/{}/", self.config.username) - }, - _ => "/webdav/".to_string() - }; - - if let Some(relative_part) = full_webdav_path.strip_prefix(&webdav_prefix) { - format!("/{}", relative_part) - } else { - // If path doesn't match expected format, return as-is - full_webdav_path.to_string() - } - } - - /// Smart directory scan that uses depth-1 traversal for efficient synchronization - /// Only scans directories whose ETags have changed, avoiding unnecessary deep scans - pub fn smart_directory_scan<'a>( - &'a self, - path: &'a str, - known_etag: Option<&'a str>, - user_id: uuid::Uuid, - state: &'a crate::AppState - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<FileInfo>>> + Send + 'a>> { - Box::pin(async move { - debug!("🧠 Smart scan starting for path: {}", path); - - // Convert full WebDAV path to relative path for existing functions - let relative_path = self.convert_to_relative_path(path); - debug!("🔄 Converted {} to relative path: {}", path, relative_path); - - // Step 1: Check current directory ETag - let current_etag = match self.check_directory_etag(&relative_path).await { - Ok(etag) => etag, - Err(e) => { - warn!("Failed to get directory ETag for {}, falling back to full scan: {}", path, e); - return self.discover_files_in_folder_impl(&relative_path).await; - } - }; - - // Step 2: If unchanged and we support recursive ETags, nothing to do - if known_etag == Some(&current_etag) { - let supports_recursive = match self.config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => true, - _ => false - }; - - if supports_recursive { - debug!("✅ Directory {} unchanged (recursive ETag: {}), skipping scan", path, current_etag); - return Ok(Vec::new()); - } else { - debug!("📁 Directory {} ETag unchanged but server doesn't support recursive ETags, checking subdirectories", path); - } - } else { - debug!("🔄 Directory {} changed (old: {:?}, new: {})", path, known_etag, current_etag); - } - - // Step 3: Directory changed or we need to check subdirectories - do depth-1 scan - let entries = match self.discover_files_in_folder_shallow(&relative_path).await { - Ok(files) => files, - Err(e) => { - error!("Failed shallow scan of {}: {}", path, e); - return 
Err(e); - } - }; - - let mut all_files = Vec::new(); - let mut subdirs_to_scan = Vec::new(); - - // Separate files and directories - for entry in entries { - if entry.is_directory && entry.path != path { - subdirs_to_scan.push(entry.clone()); - } - all_files.push(entry); - } - - // Note: We'll update the directory tracking at the end after processing all subdirectories - // to avoid ETag race conditions during the scan - - // Step 4: Process subdirectories concurrently with controlled parallelism - if !subdirs_to_scan.is_empty() { - let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); - let subdirs_stream = stream::iter(subdirs_to_scan) - .map(|subdir| { - let semaphore = semaphore.clone(); - let service = self.clone(); - async move { - let _permit = semaphore.acquire().await.map_err(|e| anyhow!("Semaphore error: {}", e))?; - - // Get stored ETag for this subdirectory - let stored_etag = match state.db.get_webdav_directory(user_id, &subdir.path).await { - Ok(Some(dir)) => Some(dir.directory_etag), - Ok(None) => { - debug!("🆕 New subdirectory discovered: {}", subdir.path); - None - } - Err(e) => { - warn!("Database error checking subdirectory {}: {}", subdir.path, e); - None - } - }; - - // If ETag changed or new directory, scan it recursively - if stored_etag.as_deref() != Some(&subdir.etag) { - debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})", - subdir.path, stored_etag, subdir.etag); - - match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await { - Ok(subdir_files) => { - debug!("📂 Found {} entries in subdirectory {}", subdir_files.len(), subdir.path); - Result::, anyhow::Error>::Ok(subdir_files) - } - Err(e) => { - error!("Failed to scan subdirectory {}: {}", subdir.path, e); - Result::, anyhow::Error>::Ok(Vec::new()) // Continue with other subdirectories - } - } - } else { - debug!("✅ Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag); - // Don't update database during scan - will be handled by top-level caller - Result::, anyhow::Error>::Ok(Vec::new()) - } - } - }) - .buffer_unordered(self.concurrency_config.max_concurrent_scans); - - // Collect all results concurrently - let mut subdirs_stream = std::pin::pin!(subdirs_stream); - while let Some(result) = subdirs_stream.next().await { - match result { - Ok(mut subdir_files) => { - all_files.append(&mut subdir_files); - } - Err(e) => { - warn!("Concurrent subdirectory scan error: {}", e); - // Continue processing other subdirectories - } - } - } - } - - // Only update database if this is the top-level call (not a recursive subdirectory scan) - let file_count = all_files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64; - let total_size = all_files.iter() - .filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)) - .map(|f| f.size) - .sum::(); - - let dir_record = crate::models::CreateWebDAVDirectory { - user_id, - directory_path: path.to_string(), - directory_etag: current_etag.clone(), - file_count, - total_size_bytes: total_size, - }; - - if let Err(e) = state.db.create_or_update_webdav_directory(&dir_record).await { - warn!("Failed to update directory tracking for {}: {}", path, e); - } - - debug!("🧠 Smart scan completed for {}: {} total entries found", path, all_files.len()); - Ok(all_files) - }) - } - - /// Internal version of smart_directory_scan that doesn't update the database - /// Used for recursive subdirectory scanning to avoid race conditions - fn 
smart_directory_scan_internal<'a>( - &'a self, - path: &'a str, - known_etag: Option<&'a str>, - user_id: uuid::Uuid, - state: &'a crate::AppState - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<FileInfo>>> + Send + 'a>> { - Box::pin(async move { - debug!("🧠 Smart scan (internal) starting for path: {}", path); - - // Convert full WebDAV path to relative path for existing functions - let relative_path = self.convert_to_relative_path(path); - debug!("🔄 Converted {} to relative path: {}", path, relative_path); - - // Step 1: Check current directory ETag - let current_etag = match self.check_directory_etag(&relative_path).await { - Ok(etag) => etag, - Err(e) => { - warn!("Failed to get directory ETag for {}, falling back to full scan: {}", path, e); - return self.discover_files_in_folder_impl(&relative_path).await; - } - }; - - // Step 2: If unchanged and we support recursive ETags, nothing to do - if known_etag == Some(&current_etag) { - let supports_recursive = match self.config.server_type.as_deref() { - Some("nextcloud") | Some("owncloud") => true, - _ => false - }; - - if supports_recursive { - debug!("✅ Directory {} unchanged (recursive ETag: {}), skipping scan", path, current_etag); - return Ok(Vec::new()); - } else { - debug!("📁 Directory {} ETag unchanged but server doesn't support recursive ETags, checking subdirectories", path); - } - } else { - debug!("🔄 Directory {} changed (old: {:?}, new: {})", path, known_etag, current_etag); - } - - // Step 3: Directory changed or we need to check subdirectories - do depth-1 scan - let entries = match self.discover_files_in_folder_shallow(&relative_path).await { - Ok(files) => files, - Err(e) => { - error!("Failed shallow scan of {}: {}", path, e); - return Err(e); - } - }; - - let mut all_files = Vec::new(); - let mut subdirs_to_scan = Vec::new(); - - // Separate files and directories - for entry in entries { - if entry.is_directory && entry.path != path { - subdirs_to_scan.push(entry.clone()); - } - all_files.push(entry); - } - - // Note: No database update in internal function to avoid race conditions - - // Step 4: Process subdirectories concurrently with controlled parallelism - if !subdirs_to_scan.is_empty() { - let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); - let subdirs_stream = stream::iter(subdirs_to_scan) - .map(|subdir| { - let semaphore = semaphore.clone(); - let service = self.clone(); - async move { - let _permit = semaphore.acquire().await.map_err(|e| anyhow!("Semaphore error: {}", e))?; - - // Get stored ETag for this subdirectory - let stored_etag = match state.db.get_webdav_directory(user_id, &subdir.path).await { - Ok(Some(dir)) => Some(dir.directory_etag), - Ok(None) => { - debug!("🆕 New subdirectory discovered: {}", subdir.path); - None - } - Err(e) => { - warn!("Database error checking subdirectory {}: {}", subdir.path, e); - None - } - }; - - // If ETag changed or new directory, scan it recursively - if stored_etag.as_deref() != Some(&subdir.etag) { - debug!("🔄 Subdirectory {} needs scanning (old: {:?}, new: {})", - subdir.path, stored_etag, subdir.etag); - - match service.smart_directory_scan_internal(&subdir.path, stored_etag.as_deref(), user_id, state).await { - Ok(subdir_files) => { - debug!("📂 Found {} entries in subdirectory {}", subdir_files.len(), subdir.path); - Result::<Vec<FileInfo>, anyhow::Error>::Ok(subdir_files) - } - Err(e) => { - error!("Failed to scan subdirectory {}: {}", subdir.path, e); - Result::<Vec<FileInfo>, anyhow::Error>::Ok(Vec::new()) // Continue with other subdirectories - } - } - } else { - debug!("✅ 
Subdirectory {} unchanged (ETag: {})", subdir.path, subdir.etag); - // Don't update database during internal scan - Result::, anyhow::Error>::Ok(Vec::new()) - } - } - }) - .buffer_unordered(self.concurrency_config.max_concurrent_scans); - - // Collect all results concurrently - let mut subdirs_stream = std::pin::pin!(subdirs_stream); - while let Some(result) = subdirs_stream.next().await { - match result { - Ok(mut subdir_files) => { - all_files.append(&mut subdir_files); - } - Err(e) => { - warn!("Concurrent subdirectory scan error: {}", e); - // Continue processing other subdirectories - } - } - } - } - - debug!("🧠 Smart scan (internal) completed for {}: {} total entries found", path, all_files.len()); - Ok(all_files) - }) - } - - /// Smart directory scan with checkpoint-based crash recovery - pub fn smart_directory_scan_with_checkpoints<'a>( - &'a self, - path: &'a str, - known_etag: Option<&'a str>, - user_id: uuid::Uuid, - state: &'a crate::AppState - ) -> std::pin::Pin>> + Send + 'a>> { - Box::pin(async move { - debug!("🧠 Smart scan with checkpoints starting for path: {}", path); - - // Mark scan as in progress (checkpoint) - if let Err(e) = self.mark_scan_in_progress(user_id, path, state).await { - warn!("Failed to mark scan in progress for {}: {}", path, e); - } - - // Perform the actual scan - let result = self.smart_directory_scan_internal(path, known_etag, user_id, state).await; - - match &result { - Ok(files) => { - debug!("✅ Smart scan completed for {}: {} files", path, files.len()); - - // Update directory tracking and mark scan complete - let file_count = files.iter().filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)).count() as i64; - let total_size = files.iter() - .filter(|f| !f.is_directory && self.is_direct_child(&f.path, path)) - .map(|f| f.size) - .sum::(); - - let current_etag = known_etag.unwrap_or("unknown").to_string(); - let dir_record = crate::models::CreateWebDAVDirectory { - user_id, - directory_path: path.to_string(), - directory_etag: current_etag.clone(), - file_count, - total_size_bytes: total_size, - }; - - if let Err(e) = state.db.create_or_update_webdav_directory(&dir_record).await { - warn!("Failed to update directory tracking for {}: {}", path, e); - } - - // Mark scan as complete (remove checkpoint) - if let Err(e) = self.mark_scan_complete(user_id, path, state).await { - warn!("Failed to mark scan complete for {}: {}", path, e); - } - } - Err(e) => { - error!("❌ Smart scan failed for {}: {}", path, e); - // Mark scan as failed for better tracking - if let Err(mark_err) = state.db.mark_webdav_scan_failed(user_id, path, &e.to_string()).await { - warn!("Failed to mark scan as failed for {}: {}", path, mark_err); - } - } - } - - result - }) - } - - /// Detect directories with incomplete scans that need recovery - async fn detect_incomplete_scans(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { - debug!("🔍 Checking for incomplete scans..."); - - // Check for both incomplete scans and stale scans (running too long, likely crashed) - let mut incomplete_scans = state.db.get_incomplete_webdav_scans(user_id).await.unwrap_or_default(); - let stale_scans = state.db.get_stale_webdav_scans(user_id, 30).await.unwrap_or_default(); // 30 minute timeout - - // Combine and deduplicate - incomplete_scans.extend(stale_scans); - incomplete_scans.sort(); - incomplete_scans.dedup(); - - if !incomplete_scans.is_empty() { - info!("Found {} incomplete/stale scans to recover", incomplete_scans.len()); - } - - Ok(incomplete_scans) - } - - /// 
Mark a directory scan as in progress (for crash recovery) - async fn mark_scan_in_progress(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> { - debug!("📝 Marking scan in progress for: {}", path); - state.db.mark_webdav_scan_in_progress(user_id, path).await - } - - /// Mark a directory scan as complete (remove crash recovery checkpoint) - async fn mark_scan_complete(&self, user_id: uuid::Uuid, path: &str, state: &crate::AppState) -> Result<()> { - debug!("✅ Marking scan complete for: {}", path); - state.db.mark_webdav_scan_complete(user_id, path).await - } - - - /// Internal resume function that doesn't trigger crash recovery detection (to avoid recursion) - async fn resume_deep_scan_internal(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result> { - info!("🔄 Resuming deep scan from checkpoint: {}", checkpoint_path); - - // Check if the checkpoint directory is still accessible - let relative_checkpoint_path = self.convert_to_relative_path(checkpoint_path); - match self.check_directory_etag(&relative_checkpoint_path).await { - Ok(current_etag) => { - info!("✅ Checkpoint directory accessible, resuming scan"); - - // Check if directory changed since checkpoint - match state.db.get_webdav_directory(user_id, checkpoint_path).await { - Ok(Some(stored_dir)) => { - if stored_dir.directory_etag != current_etag { - info!("🔄 Directory changed since checkpoint, performing full rescan"); - } else { - info!("✅ Directory unchanged since checkpoint, can skip"); - return Ok(Vec::new()); - } - } - Ok(None) => { - info!("🆕 New checkpoint directory, performing full scan"); - } - Err(e) => { - warn!("Database error checking checkpoint {}: {}, performing full scan", checkpoint_path, e); - } - } - - // Resume with smart scanning from this point - self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await - } - Err(e) => { - warn!("Checkpoint directory {} inaccessible after restart: {}", checkpoint_path, e); - // Server might have restarted, wait a bit and retry - tokio::time::sleep(Duration::from_secs(5)).await; - - match self.check_directory_etag(&relative_checkpoint_path).await { - Ok(_) => { - info!("🔄 Server recovered, resuming scan"); - self.smart_directory_scan_with_checkpoints(checkpoint_path, None, user_id, state).await - } - Err(e2) => { - error!("Failed to resume deep scan after server restart: {}", e2); - Err(anyhow!("Cannot resume deep scan: server unreachable after restart")) - } - } - } - } - } - - - pub async fn download_file(&self, file_path: &str) -> Result> { - self.retry_with_backoff("download_file", || { - self.download_file_impl(file_path) - }).await - } - - async fn download_file_impl(&self, file_path: &str) -> Result> { - // For Nextcloud/ownCloud, the file_path might already be an absolute WebDAV path - // The path comes from href which is already URL-encoded - let file_url = if file_path.starts_with("/remote.php/dav/") { - // Use the server URL + the full WebDAV path - // Don't double-encode - the path from href is already properly encoded - format!("{}{}", self.config.server_url.trim_end_matches('/'), file_path) - } else { - // Traditional approach for other WebDAV servers or relative paths - format!("{}{}", self.base_webdav_url, file_path) - }; - - debug!("Downloading file: {}", file_url); - debug!("Original file_path: {}", file_path); - - let response = self.client - .get(&file_url) - .basic_auth(&self.config.username, Some(&self.config.password)) - .send() - .await?; - - if 
!response.status().is_success() { - return Err(anyhow!("File download failed: {}", response.status())); - } - - let bytes = response.bytes().await?; - debug!("Downloaded {} bytes", bytes.len()); - - Ok(bytes.to_vec()) - } - -} - -pub async fn test_webdav_connection( - server_url: &str, - username: &str, - password: &str, -) -> Result { - let client = Client::new(); - - // Try to list the root directory to test connectivity - let response = client - .request(Method::from_bytes(b"PROPFIND")?, server_url) - .header("Depth", "0") - .basic_auth(username, Some(password)) - .timeout(Duration::from_secs(10)) - .send() - .await?; - - Ok(response.status().is_success()) -} - -impl WebDAVService { - /// Validate ETag tracking integrity and directory consistency - /// This replaces the need for periodic deep scans with intelligent validation - pub async fn validate_etag_tracking(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result { - let validation_id = uuid::Uuid::new_v4(); - let started_at = chrono::Utc::now(); - - info!("🔍 Starting ETag validation for user {} (validation_id: {})", user_id, validation_id); - - let mut report = ValidationReport { - validation_id, - user_id, - started_at, - completed_at: None, - total_directories_checked: 0, - issues_found: Vec::new(), - recommendations: Vec::new(), - etag_support_verified: false, - server_health_score: 100, - }; - - // Step 1: Verify ETag support is still working - match self.test_recursive_etag_support().await { - Ok(supports_etags) => { - report.etag_support_verified = supports_etags; - if !supports_etags { - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::ETagUnreliable, - directory_path: "server".to_string(), - severity: ValidationSeverity::Critical, - description: "Server no longer supports recursive ETags reliably".to_string(), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = 30; - } - } - Err(e) => { - warn!("Failed to test ETag support: {}", e); - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::ETagUnreliable, - directory_path: "server".to_string(), - severity: ValidationSeverity::Error, - description: format!("Cannot verify ETag support: {}", e), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = 50; - } - } - - // Step 2: Check tracked directories for issues - match state.db.list_webdav_directories(user_id).await { - Ok(tracked_dirs) => { - report.total_directories_checked = tracked_dirs.len() as u32; - - for tracked_dir in tracked_dirs { - self.validate_single_directory(&tracked_dir, &mut report, state).await; - } - } - Err(e) => { - error!("Failed to load tracked directories: {}", e); - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::Missing, - directory_path: "database".to_string(), - severity: ValidationSeverity::Critical, - description: format!("Cannot access directory tracking database: {}", e), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = 10; - } - } - - // Step 3: Sample a few watch directories to check for untracked directories - for watch_folder in &self.config.watch_folders { - if let Err(e) = self.check_for_untracked_directories(watch_folder, &mut report, user_id, state).await { - warn!("Failed to check for untracked directories in {}: {}", watch_folder, e); - } - } - - // Step 4: Generate recommendations based on issues found - self.generate_validation_recommendations(&mut report); - - report.completed_at = Some(chrono::Utc::now()); - let duration = 
report.completed_at.unwrap() - report.started_at; - - info!("✅ ETag validation completed in {:.2}s. Health score: {}/100, {} issues found", - duration.num_milliseconds() as f64 / 1000.0, - report.server_health_score, - report.issues_found.len()); - - Ok(report) - } - - /// Validate a single tracked directory - async fn validate_single_directory( - &self, - tracked_dir: &crate::models::WebDAVDirectory, - report: &mut ValidationReport, - state: &crate::AppState - ) { - let relative_path = self.convert_to_relative_path(&tracked_dir.directory_path); - - // Check if directory still exists and get current ETag - match self.check_directory_etag(&relative_path).await { - Ok(current_etag) => { - // Check for ETag mismatch - if current_etag != tracked_dir.directory_etag { - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::ETagMismatch, - directory_path: tracked_dir.directory_path.clone(), - severity: ValidationSeverity::Warning, - description: format!("ETag changed from '{}' to '{}' - directory may need rescanning", - tracked_dir.directory_etag, current_etag), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = report.server_health_score.saturating_sub(5); - } - - // Check for stale directories (not scanned in >7 days) - let last_scanned = tracked_dir.last_scanned_at; - let duration = chrono::Utc::now() - last_scanned; - let days_old = duration.num_days(); - if days_old > 7 { - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::Stale, - directory_path: tracked_dir.directory_path.clone(), - severity: if days_old > 30 { ValidationSeverity::Warning } else { ValidationSeverity::Info }, - description: format!("Directory not scanned for {} days", days_old), - discovered_at: chrono::Utc::now(), - }); - if days_old > 30 { - report.server_health_score = report.server_health_score.saturating_sub(3); - } - } - } - Err(e) => { - // Directory inaccessible or missing - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::Inaccessible, - directory_path: tracked_dir.directory_path.clone(), - severity: ValidationSeverity::Error, - description: format!("Cannot access directory: {}", e), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = report.server_health_score.saturating_sub(10); - } - } - } - - /// Check for directories that exist on server but aren't tracked - async fn check_for_untracked_directories( - &self, - watch_folder: &str, - report: &mut ValidationReport, - user_id: uuid::Uuid, - state: &crate::AppState - ) -> Result<()> { - let relative_watch_folder = self.convert_to_relative_path(watch_folder); - - // Get shallow listing of watch folder - match self.discover_files_in_folder_shallow(&relative_watch_folder).await { - Ok(entries) => { - // Find directories - let server_dirs: Vec<_> = entries.iter() - .filter(|e| e.is_directory) - .collect(); - - // Check if each directory is tracked - for server_dir in server_dirs { - match state.db.get_webdav_directory(user_id, &server_dir.path).await { - Ok(None) => { - // Directory exists on server but not tracked - report.issues_found.push(ValidationIssue { - issue_type: ValidationIssueType::Untracked, - directory_path: server_dir.path.clone(), - severity: ValidationSeverity::Info, - description: "Directory exists on server but not in tracking database".to_string(), - discovered_at: chrono::Utc::now(), - }); - report.server_health_score = report.server_health_score.saturating_sub(2); - } - Ok(Some(_)) => { - // Directory is tracked, all good - 
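// ---------------------------------------------------------------------------
// Editor's illustrative sketch (not part of the original patch). The health
// score used by the validation pass above is a saturating u8 budget: each
// issue class debits a fixed amount (5 per ETag mismatch, 10 per inaccessible
// directory, 2 per untracked directory, and so on), and `saturating_sub`
// guarantees the score floors at 0 instead of wrapping around.
fn main() {
    let mut score: u8 = 100;
    score = score.saturating_sub(5); // one ETag mismatch     -> 95
    score = score.saturating_sub(10); // one inaccessible dir  -> 85
    score = score.saturating_sub(2); // one untracked dir     -> 83
    // Further debits cannot underflow: the score simply stops at zero.
    for _ in 0..50 {
        score = score.saturating_sub(5);
    }
    assert_eq!(score, 0);
}
// ---------------------------------------------------------------------------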
-                        Err(e) => {
-                            warn!("Database error checking directory {}: {}", server_dir.path, e);
-                        }
-                    }
-                }
-            }
-            Err(e) => {
-                return Err(anyhow!("Failed to list watch folder {}: {}", watch_folder, e));
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Generate actionable recommendations based on validation issues
-    fn generate_validation_recommendations(&self, report: &mut ValidationReport) {
-        let mut etag_mismatches = Vec::new();
-        let mut untracked_dirs = Vec::new();
-        let mut inaccessible_dirs = Vec::new();
-        let mut stale_dirs = Vec::new();
-        let mut etag_unreliable = false;
-
-        // Categorize issues
-        for issue in &report.issues_found {
-            match issue.issue_type {
-                ValidationIssueType::ETagMismatch => etag_mismatches.push(issue.directory_path.clone()),
-                ValidationIssueType::Untracked => untracked_dirs.push(issue.directory_path.clone()),
-                ValidationIssueType::Inaccessible => inaccessible_dirs.push(issue.directory_path.clone()),
-                ValidationIssueType::Stale => stale_dirs.push(issue.directory_path.clone()),
-                ValidationIssueType::ETagUnreliable => etag_unreliable = true,
-                _ => {}
-            }
-        }
-
-        // Generate recommendations
-        if etag_unreliable {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::DisableETagOptimization,
-                reason: "ETag support is unreliable, consider switching to periodic deep scans".to_string(),
-                affected_directories: vec!["all".to_string()],
-                priority: ValidationSeverity::Critical,
-            });
-        } else if !etag_mismatches.is_empty() {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::DeepScanRequired,
-                reason: format!("{} directories have ETag mismatches and need rescanning", etag_mismatches.len()),
-                affected_directories: etag_mismatches,
-                priority: ValidationSeverity::Warning,
-            });
-        }
-
-        if !untracked_dirs.is_empty() {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::DeepScanRequired,
-                reason: format!("{} untracked directories found on server", untracked_dirs.len()),
-                affected_directories: untracked_dirs,
-                priority: ValidationSeverity::Info,
-            });
-        }
-
-        if !inaccessible_dirs.is_empty() {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::CheckServerConfiguration,
-                reason: format!("{} directories are inaccessible", inaccessible_dirs.len()),
-                affected_directories: inaccessible_dirs,
-                priority: ValidationSeverity::Error,
-            });
-        }
-
-        if !stale_dirs.is_empty() && stale_dirs.len() > 10 {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::DeepScanRequired,
-                reason: format!("{} directories haven't been scanned recently", stale_dirs.len()),
-                affected_directories: stale_dirs,
-                priority: ValidationSeverity::Info,
-            });
-        }
-
-        // If no major issues, everything is healthy
-        if report.recommendations.is_empty() {
-            report.recommendations.push(ValidationRecommendation {
-                action: ValidationAction::NoActionRequired,
-                reason: "ETag tracking system is healthy and working correctly".to_string(),
-                affected_directories: Vec::new(),
-                priority: ValidationSeverity::Info,
-            });
-        }
-    }
-
-    /// Check if we should trigger a deep scan based on validation results
-    pub fn should_trigger_deep_scan(&self, report: &ValidationReport) -> (bool, String) {
-        // Critical issues always trigger deep scan
-        let critical_issues = report.issues_found.iter()
-            .filter(|issue| matches!(issue.severity, ValidationSeverity::Critical))
-            .count();
-
-        if critical_issues > 0 {
-            return (true, format!("{} critical issues detected", critical_issues));
-        }
-
-        // Multiple ETag mismatches suggest systematic issues
-        let etag_mismatches = report.issues_found.iter()
-            .filter(|issue| matches!(issue.issue_type, ValidationIssueType::ETagMismatch))
-            .count();
-
-        if etag_mismatches > 5 {
-            return (true, format!("{} ETag mismatches suggest synchronization issues", etag_mismatches));
-        }
-
-        // Many untracked directories suggest incomplete initial scan
-        let untracked = report.issues_found.iter()
-            .filter(|issue| matches!(issue.issue_type, ValidationIssueType::Untracked))
-            .count();
-
-        if untracked > 10 {
-            return (true, format!("{} untracked directories found", untracked));
-        }
-
-        // Low health score indicates general problems
-        if report.server_health_score < 70 {
-            return (true, format!("Low server health score: {}/100", report.server_health_score));
-        }
-
-        (false, "System appears healthy, no deep scan needed".to_string())
-    }
-
-    /// Ensure complete directory tree discovery before marking deep scan as complete
-    /// This is the MOST CRITICAL function - guarantees we've found ALL subdirectories
-    pub async fn ensure_complete_directory_discovery(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<DirectoryDiscoveryReport> {
-        info!("🔍 Starting complete directory tree discovery verification");
-
-        let mut report = DirectoryDiscoveryReport {
-            discovery_id: uuid::Uuid::new_v4(),
-            user_id,
-            started_at: chrono::Utc::now(),
-            completed_at: None,
-            watch_folders_processed: Vec::new(),
-            total_directories_discovered: 0,
-            new_directories_found: 0,
-            missing_directories_detected: 0,
-            is_complete: false,
-            issues: Vec::new(),
-        };
-
-        // Process each watch folder to ensure complete discovery
-        for watch_folder in &self.config.watch_folders {
-            info!("📂 Ensuring complete discovery for watch folder: {}", watch_folder);
-
-            match self.ensure_watch_folder_complete_discovery(watch_folder, user_id, state, &mut report).await {
-                Ok(folder_report) => {
-                    report.watch_folders_processed.push(folder_report);
-                }
-                Err(e) => {
-                    error!("❌ Failed to ensure complete discovery for {}: {}", watch_folder, e);
-                    report.issues.push(format!("Failed to process {}: {}", watch_folder, e));
-                }
-            }
-        }
-
-        // Verify completeness by checking for any gaps
-        self.verify_directory_tree_completeness(&mut report, user_id, state).await?;
-
-        report.completed_at = Some(chrono::Utc::now());
-        let duration = report.completed_at.unwrap() - report.started_at;
-
-        if report.is_complete {
-            info!("✅ Complete directory discovery verified in {:.2}s. {} total directories, {} newly discovered",
-                  duration.num_milliseconds() as f64 / 1000.0,
-                  report.total_directories_discovered,
-                  report.new_directories_found);
-        } else {
-            warn!("⚠️ Directory discovery incomplete after {:.2}s. {} issues found",
-                  duration.num_milliseconds() as f64 / 1000.0,
-                  report.issues.len());
-        }
-
-        Ok(report)
-    }
-
-    /// Ensure a single watch folder has complete n-depth directory discovery
-    async fn ensure_watch_folder_complete_discovery(
-        &self,
-        watch_folder: &str,
-        user_id: uuid::Uuid,
-        state: &crate::AppState,
-        main_report: &mut DirectoryDiscoveryReport
-    ) -> Result<WatchFolderDiscoveryReport> {
-        let mut folder_report = WatchFolderDiscoveryReport {
-            watch_folder: watch_folder.to_string(),
-            total_directories: 0,
-            new_directories: 0,
-            depth_levels_scanned: 0,
-            is_complete: false,
-        };
-
-        // Use PROPFIND with Depth: infinity to get COMPLETE directory tree
-        let relative_watch_folder = self.convert_to_relative_path(watch_folder);
-        let all_entries = self.discover_files_in_folder_impl(&relative_watch_folder).await?;
-
-        // Extract ALL directories from the complete scan
-        let all_server_directories: Vec<_> = all_entries.iter()
-            .filter(|entry| entry.is_directory)
-            .collect();
-
-        folder_report.total_directories = all_server_directories.len();
-        main_report.total_directories_discovered += all_server_directories.len();
-
-        // Calculate depth levels
-        let max_depth = all_server_directories.iter()
-            .map(|dir| dir.path.chars().filter(|&c| c == '/').count())
-            .max()
-            .unwrap_or(0);
-        folder_report.depth_levels_scanned = max_depth;
-
-        info!("📊 Found {} directories across {} depth levels in {}",
-              all_server_directories.len(), max_depth, watch_folder);
-
-        // Check each directory against our tracking database
-        for server_dir in &all_server_directories {
-            match state.db.get_webdav_directory(user_id, &server_dir.path).await {
-                Ok(Some(tracked_dir)) => {
-                    // Directory is already tracked - verify ETag is current
-                    if tracked_dir.directory_etag != server_dir.etag {
-                        debug!("🔄 Updating ETag for tracked directory: {}", server_dir.path);
-                        let update = crate::models::UpdateWebDAVDirectory {
-                            directory_etag: server_dir.etag.clone(),
-                            last_scanned_at: chrono::Utc::now(),
-                            file_count: 0, // Will be calculated separately
-                            total_size_bytes: 0,
-                        };
-                        if let Err(e) = state.db.update_webdav_directory(user_id, &server_dir.path, &update).await {
-                            warn!("Failed to update directory {}: {}", server_dir.path, e);
-                        }
-                    }
-                }
-                Ok(None) => {
-                    // NEW DIRECTORY DISCOVERED - this is critical to track
-                    info!("🆕 NEW directory discovered: {}", server_dir.path);
-                    folder_report.new_directories += 1;
-                    main_report.new_directories_found += 1;
-
-                    // Immediately add to tracking database
-                    let new_dir = crate::models::CreateWebDAVDirectory {
-                        user_id,
-                        directory_path: server_dir.path.clone(),
-                        directory_etag: server_dir.etag.clone(),
-                        file_count: 0, // Will be calculated when files are processed
-                        total_size_bytes: 0,
-                    };
-
-                    if let Err(e) = state.db.create_or_update_webdav_directory(&new_dir).await {
-                        error!("❌ CRITICAL: Failed to track new directory {}: {}", server_dir.path, e);
-                        main_report.issues.push(format!("Failed to track new directory {}: {}", server_dir.path, e));
-                    } else {
-                        debug!("✅ Successfully tracking new directory: {}", server_dir.path);
-                    }
-                }
-                Err(e) => {
-                    error!("Database error checking directory {}: {}", server_dir.path, e);
-                    main_report.issues.push(format!("Database error for {}: {}", server_dir.path, e));
-                }
-            }
-        }
-
-        // Check for orphaned tracking entries (directories we track but don't exist on server)
-        match state.db.list_webdav_directories(user_id).await {
-            Ok(tracked_dirs) => {
-                let server_paths: HashSet<String> = all_server_directories.iter()
-                    .map(|d| d.path.clone())
-                    .collect();
-
-                for tracked_dir in tracked_dirs {
-                    if tracked_dir.directory_path.starts_with(watch_folder) && !server_paths.contains(&tracked_dir.directory_path) {
-                        warn!("🗑️ Orphaned directory tracking detected: {} (exists in DB but not on server)", tracked_dir.directory_path);
-                        main_report.missing_directories_detected += 1;
-
-                        // Could optionally clean up orphaned entries here
-                        // For now, just report them
-                    }
-                }
-            }
-            Err(e) => {
-                error!("Failed to check for orphaned directories: {}", e);
-                main_report.issues.push(format!("Failed to check orphaned directories: {}", e));
-            }
-        }
-
-        folder_report.is_complete = folder_report.new_directories == 0 || main_report.issues.is_empty();
-        Ok(folder_report)
-    }
-
-    /// Final verification that directory tree coverage is complete
-    async fn verify_directory_tree_completeness(
-        &self,
-        report: &mut DirectoryDiscoveryReport,
-        user_id: uuid::Uuid,
-        state: &crate::AppState
-    ) -> Result<()> {
-        info!("🔍 Performing final completeness verification");
-
-        // Check that we have no scan_in_progress flags left over
-        match state.db.get_incomplete_webdav_scans(user_id).await {
-            Ok(incomplete) => {
-                if !incomplete.is_empty() {
-                    warn!("⚠️ Found {} incomplete scans still in progress", incomplete.len());
-                    report.issues.push(format!("{} scans still marked as in progress", incomplete.len()));
-                    report.is_complete = false;
-                    return Ok(());
-                }
-            }
-            Err(e) => {
-                error!("Failed to check incomplete scans: {}", e);
-                report.issues.push(format!("Cannot verify scan completeness: {}", e));
-                report.is_complete = false;
-                return Ok(());
-            }
-        }
-
-        // Verify each watch folder has at least some tracked directories
-        for watch_folder in &self.config.watch_folders {
-            match state.db.list_webdav_directories(user_id).await {
-                Ok(dirs) => {
-                    let watch_folder_dirs = dirs.iter()
-                        .filter(|d| d.directory_path.starts_with(watch_folder))
-                        .count();
-
-                    if watch_folder_dirs == 0 {
-                        warn!("⚠️ No directories tracked for watch folder: {}", watch_folder);
-                        report.issues.push(format!("No directories tracked for watch folder: {}", watch_folder));
-                        report.is_complete = false;
-                    } else {
-                        debug!("✅ Watch folder {} has {} tracked directories", watch_folder, watch_folder_dirs);
-                    }
-                }
-                Err(e) => {
-                    error!("Failed to verify watch folder {}: {}", watch_folder, e);
-                    report.issues.push(format!("Cannot verify watch folder {}: {}", watch_folder, e));
-                    report.is_complete = false;
-                }
-            }
-        }
-
-        // If no issues found, mark as complete
-        if report.issues.is_empty() {
-            report.is_complete = true;
-            info!("✅ Directory tree completeness verified - all {} watch folders fully discovered", self.config.watch_folders.len());
-        } else {
-            warn!("❌ Directory tree completeness verification failed: {} issues", report.issues.len());
-        }
-
-        Ok(())
-    }
-
-    /// Modified deep scan that REQUIRES complete directory discovery
-    pub async fn deep_scan_with_guaranteed_completeness(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result> {
-        info!("🚀 Starting deep scan with guaranteed directory completeness");
-
-        let scan_id = uuid::Uuid::new_v4();
-        let started_at = chrono::Utc::now();
-
-        // STEP 1: CRITICAL - Ensure complete directory discovery FIRST
-        let discovery_report = self.ensure_complete_directory_discovery(user_id, state).await?;
-
-        if !discovery_report.is_complete {
-            return Err(anyhow!("Cannot proceed with deep scan: Directory discovery incomplete. {} issues found: {:?}",
-                discovery_report.issues.len(), discovery_report.issues));
-        }
-
-        info!("✅ Directory discovery complete - proceeding with file processing");
-
-        // STEP 2: Only now process files, knowing we have complete directory coverage
-        let mut all_files = Vec::new();
-        for watch_folder in &self.config.watch_folders {
-            match self.smart_directory_scan_with_checkpoints(watch_folder, None, user_id, state).await {
-                Ok(mut files) => {
-                    info!("📁 Processed {} files from {}", files.len(), watch_folder);
-                    all_files.append(&mut files);
-                }
-                Err(e) => {
-                    error!("Failed to process files in {}: {}", watch_folder, e);
-                    return Err(anyhow!("File processing failed for {}: {}", watch_folder, e));
-                }
-            }
-        }
-
-        // STEP 3: Final verification that nothing was missed
-        let final_verification = self.ensure_complete_directory_discovery(user_id, state).await?;
-        let is_complete = final_verification.is_complete && final_verification.new_directories_found == 0;
-
-        if final_verification.new_directories_found > 0 {
-            warn!("⚠️ Found {} additional directories during final verification - scan may need to restart",
-                  final_verification.new_directories_found);
-        }
-
-        let completed_at = chrono::Utc::now();
-        let duration = completed_at - started_at;
-
-        if is_complete {
-            info!("🎉 DEEP SCAN COMPLETE WITH GUARANTEED COMPLETENESS: {} files processed, {} directories tracked in {:.2}s",
-                  all_files.len(),
-                  discovery_report.total_directories_discovered,
-                  duration.num_milliseconds() as f64 / 1000.0);
-        } else {
-            warn!("⚠️ Deep scan completed but completeness not guaranteed: {:.2}s",
-                  duration.num_milliseconds() as f64 / 1000.0);
-        }
-
-        Ok(all_files)
-    }
-}
-
-/// Report of complete directory tree discovery
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct DirectoryDiscoveryReport {
-    pub discovery_id: uuid::Uuid,
-    pub user_id: uuid::Uuid,
-    pub started_at: chrono::DateTime<chrono::Utc>,
-    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
-    pub watch_folders_processed: Vec<WatchFolderDiscoveryReport>,
-    pub total_directories_discovered: usize,
-    pub new_directories_found: usize,
-    pub missing_directories_detected: usize,
-    pub is_complete: bool,
-    pub issues: Vec<String>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct WatchFolderDiscoveryReport {
-    pub watch_folder: String,
-    pub total_directories: usize,
-    pub new_directories: usize,
-    pub depth_levels_scanned: usize,
-    pub is_complete: bool,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct CompleteDeepScanReport {
-    pub scan_id: uuid::Uuid,
-    pub user_id: uuid::Uuid,
-    pub started_at: chrono::DateTime<chrono::Utc>,
-    pub completed_at: chrono::DateTime<chrono::Utc>,
-    pub directory_discovery_report: DirectoryDiscoveryReport,
-    pub final_verification_report: DirectoryDiscoveryReport,
-    pub total_files_processed: usize,
-    pub scan_duration_seconds: i64,
-    pub is_guaranteed_complete: bool,
-}
\ No newline at end of file
diff --git a/src/tests/unit_webdav_smart_scanning_tests.rs b/src/tests/unit_webdav_smart_scanning_tests.rs
index 72975ba..63fb718 100644
--- a/src/tests/unit_webdav_smart_scanning_tests.rs
+++ b/src/tests/unit_webdav_smart_scanning_tests.rs
@@ -1,4 +1,4 @@
-use crate::services::webdav_service::{WebDAVConfig, WebDAVService};
+use crate::services::webdav::{WebDAVConfig, WebDAVService};
 
 fn create_test_config() -> WebDAVConfig {
     WebDAVConfig {
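
The test hunk above switches its import to `crate::services::webdav::{WebDAVConfig, WebDAVService}`, which implies the new module's `mod.rs` re-exports the public types from its submodules so existing call sites keep compiling. A minimal sketch of such a `mod.rs` follows; the submodule names and re-export list are illustrative assumptions, not the verbatim contents of the file added by this patch:

    // src/services/webdav/mod.rs -- hypothetical sketch, not the actual file.
    // Submodule names are assumed for illustration.
    pub mod config;
    pub mod connection;
    pub mod discovery;
    pub mod service;
    pub mod validation;

    // Re-export the public API so callers can write
    // `use crate::services::webdav::{WebDAVConfig, WebDAVService};`
    // without knowing the internal file layout.
    pub use config::WebDAVConfig;
    pub use service::WebDAVService;

With re-exports like these, a split of this size stays mechanical: call sites only change the module path, as the hunks in this patch do, while the facade hides which file each type landed in.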