diff --git a/.gitignore b/.gitignore index b38890d..c940295 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,7 @@ node_modules/ .env assets/ frontend/dist/ -.claude/ +.claude/settings.local.json readur_uploads/ readur_watch/ test-results/ diff --git a/Cargo.lock b/Cargo.lock index 375a9f6..a2de4ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1009,6 +1009,17 @@ dependencies = [ "nom", ] +[[package]] +name = "cfb" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f" +dependencies = [ + "byteorder", + "fnv", + "uuid", +] + [[package]] name = "cfg-expr" version = "0.15.8" @@ -2410,6 +2421,15 @@ dependencies = [ "serde", ] +[[package]] +name = "infer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb33622da908807a06f9513c19b3c1ad50fab3e4137d82a78107d502075aa199" +dependencies = [ + "cfb", +] + [[package]] name = "inotify" version = "0.11.0" @@ -2625,7 +2645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.53.2", ] [[package]] @@ -3610,6 +3630,7 @@ dependencies = [ "hostname", "image", "imageproc", + "infer", "jsonwebtoken", "mime_guess", "notify", @@ -5674,7 +5695,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 724d66e..bf20b31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ path = "src/main.rs" name = "test_runner" path = "src/bin/test_runner.rs" + [dependencies] tokio = { version = "1", features = ["full"] } axum = { version = "0.8", features = ["multipart"] } @@ -33,6 +34,7 @@ futures-util = "0.3" futures = "0.3" notify = "8" mime_guess = "2" +infer = "0.15" tesseract = { version = "0.15", optional = true } image = { version = "0.25", features = ["png", "jpeg", "tiff", "bmp"], optional = true } imageproc = { version = "0.25", optional = true } diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 18fa643..9657331 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "readur-frontend", - "version": "2.4.2", + "version": "2.5.3", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "readur-frontend", - "version": "2.4.2", + "version": "2.5.3", "dependencies": { "@emotion/react": "^11.14.0", "@emotion/styled": "^11.14.0", diff --git a/src/lib.rs b/src/lib.rs index a0c5e9a..3859422 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ pub mod db_guardrails_simple; pub mod errors; pub mod ingestion; pub mod metadata_extraction; +pub mod mime_detection; pub mod models; pub mod monitoring; pub mod ocr; diff --git a/src/mime_detection.rs b/src/mime_detection.rs new file mode 100644 index 0000000..e49bf62 --- /dev/null +++ b/src/mime_detection.rs @@ -0,0 +1,431 @@ +/// MIME type detection module for improved file type identification +/// +/// This module provides functions for detecting file MIME types using multiple methods: +/// 1. Content-based detection using magic bytes (most reliable) +/// 2. Server-provided MIME type (when available and trusted) +/// 3. 
Extension-based fallback (least reliable, but covers edge cases) +/// +/// The goal is to provide accurate MIME type detection that's particularly important +/// for OCR processing where incorrectly classified image files can cause issues. + +use std::path::Path; +use tracing::{debug, warn}; + +/// Strategy for MIME type detection +#[derive(Debug, Clone, PartialEq)] +pub enum DetectionStrategy { + /// Use content-based detection (magic bytes) - most reliable + ContentBased, + /// Trust server-provided MIME type if available, fallback to content + TrustServer, + /// Use extension-based detection - least reliable but fastest + ExtensionOnly, + /// Comprehensive strategy: server -> content -> extension -> fallback + Comprehensive, +} + +/// Result of MIME type detection with metadata about the detection method used +#[derive(Debug, Clone)] +pub struct MimeDetectionResult { + pub mime_type: String, + pub confidence: MimeConfidence, + pub detection_method: DetectionMethod, + pub original_server_type: Option<String>, + pub detected_extension: Option<String>, +} + +/// Confidence level of the MIME type detection +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum MimeConfidence { + /// Low confidence - extension-based or fallback detection + Low, + /// Medium confidence - mime_guess library detection + Medium, + /// High confidence - magic byte detection or trusted server + High, + /// Very high confidence - content analysis confirms server type + VeryHigh, +} + +/// Method used for MIME type detection +#[derive(Debug, Clone, PartialEq)] +pub enum DetectionMethod { + /// Detected using magic bytes/file signature + MagicBytes, + /// Provided by the server and trusted + ServerProvided, + /// Detected using file extension + Extension, + /// Fallback to default type + Fallback, + /// Hybrid approach using multiple methods + Hybrid, +} + +impl MimeDetectionResult { + /// Create a result for server-provided MIME type + pub fn from_server(mime_type: String) -> Self { + Self { + mime_type, + confidence: MimeConfidence::High, + detection_method: DetectionMethod::ServerProvided, + original_server_type: None, + detected_extension: None, + } + } + + /// Create a result for content-based detection + pub fn from_content(mime_type: String, server_type: Option<String>) -> Self { + Self { + mime_type, + confidence: MimeConfidence::High, + detection_method: DetectionMethod::MagicBytes, + original_server_type: server_type, + detected_extension: None, + } + } + + /// Create a result for extension-based detection + pub fn from_extension(mime_type: String, extension: String) -> Self { + Self { + mime_type, + confidence: MimeConfidence::Medium, + detection_method: DetectionMethod::Extension, + original_server_type: None, + detected_extension: Some(extension), + } + } + + /// Create a fallback result + pub fn fallback() -> Self { + Self { + mime_type: "application/octet-stream".to_string(), + confidence: MimeConfidence::Low, + detection_method: DetectionMethod::Fallback, + original_server_type: None, + detected_extension: None, + } + } + + /// Check if the detected MIME type indicates an image file + pub fn is_image(&self) -> bool { + self.mime_type.starts_with("image/") + } + + /// Check if the detected MIME type indicates a document file + pub fn is_document(&self) -> bool { + matches!(self.mime_type.as_str(), + "application/pdf" | + "application/msword" | + "application/vnd.openxmlformats-officedocument.wordprocessingml.document" | + "application/vnd.ms-excel" | + 
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" | + "application/vnd.ms-powerpoint" | + "application/vnd.openxmlformats-officedocument.presentationml.presentation" | + "text/plain" | + "text/rtf" | + "application/rtf" + ) + } + + /// Check if this MIME type is suitable for OCR processing + pub fn is_ocr_suitable(&self) -> bool { + self.is_image() || self.mime_type == "application/pdf" + } +} + +/// Detect MIME type for WebDAV discovery phase (when we only have file metadata) +/// +/// This function is called during the initial WebDAV XML parsing when we don't +/// have access to the actual file content yet. +/// +/// # Arguments +/// * `filename` - The filename/path of the file +/// * `server_mime_type` - MIME type provided by the WebDAV server, if any +/// * `strategy` - Detection strategy to use +/// +/// # Returns +/// A `MimeDetectionResult` with the best available MIME type determination +pub fn detect_mime_for_discovery( + filename: &str, + server_mime_type: Option<&str>, + strategy: DetectionStrategy, +) -> MimeDetectionResult { + debug!("Detecting MIME type for discovery: filename={}, server_type={:?}, strategy={:?}", + filename, server_mime_type, strategy); + + match strategy { + DetectionStrategy::ContentBased => { + // During discovery, we can't analyze content, so fall back to extension + detect_from_extension(filename, server_mime_type) + } + DetectionStrategy::TrustServer => { + if let Some(server_type) = server_mime_type { + if is_trusted_server_mime_type(server_type) { + return MimeDetectionResult::from_server(server_type.to_string()); + } + } + // Fallback to extension-based detection + detect_from_extension(filename, server_mime_type) + } + DetectionStrategy::ExtensionOnly => { + detect_from_extension(filename, server_mime_type) + } + DetectionStrategy::Comprehensive => { + // Use server type if trusted, otherwise extension-based + if let Some(server_type) = server_mime_type { + if is_trusted_server_mime_type(server_type) { + return MimeDetectionResult::from_server(server_type.to_string()); + } + } + detect_from_extension(filename, server_mime_type) + } + } +} + +/// Detect MIME type when file content is available (during file download/processing) +/// +/// This provides the most accurate detection using magic bytes from the actual file content. 
+/// +/// # Arguments +/// * `content` - The first few bytes of the file content (at least 512 bytes recommended) +/// * `filename` - The filename for fallback detection +/// * `server_mime_type` - MIME type provided by the server, if any +/// +/// # Returns +/// A `MimeDetectionResult` with high-confidence MIME type detection +pub fn detect_mime_from_content( + content: &[u8], + filename: &str, + server_mime_type: Option<&str>, +) -> MimeDetectionResult { + debug!("Detecting MIME type from content: filename={}, server_type={:?}, content_len={}", + filename, server_mime_type, content.len()); + + // First, try magic byte detection + if let Some(detected_type) = infer::get(content) { + let mime_type = detected_type.mime_type().to_string(); + debug!("Magic bytes detected MIME type: {}", mime_type); + + // If server provided a type, check for consistency + if let Some(server_type) = server_mime_type { + if are_mime_types_compatible(&mime_type, server_type) { + // Both agree - very high confidence + let mut result = MimeDetectionResult::from_content(mime_type, Some(server_type.to_string())); + result.confidence = MimeConfidence::VeryHigh; + result.detection_method = DetectionMethod::Hybrid; + return result; + } else { + // Content detection overrides server type - trust the bytes + warn!("MIME type mismatch: server={}, content={} for file {}", + server_type, mime_type, filename); + return MimeDetectionResult::from_content(mime_type, Some(server_type.to_string())); + } + } else { + // Only content detection available + return MimeDetectionResult::from_content(mime_type, None); + } + } + + // Magic bytes detection failed, fall back to server type if trusted + if let Some(server_type) = server_mime_type { + if is_trusted_server_mime_type(server_type) { + debug!("Using trusted server MIME type: {}", server_type); + return MimeDetectionResult::from_server(server_type.to_string()); + } + } + + // Fall back to extension-based detection + debug!("Content detection failed, falling back to extension detection"); + detect_from_extension(filename, server_mime_type) +} + +/// Update an existing MIME type with content-based detection if available +/// +/// This function is useful for re-detecting MIME types when file content becomes +/// available after initial discovery. 
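+/// +/// # Example +/// +/// A minimal sketch, assuming a file that was discovered with the generic `application/octet-stream` type and turns out to be a PDF once its bytes arrive (crate name `readur` assumed): +/// +/// ``` +/// use readur::mime_detection::update_mime_type_with_content; +/// +/// // Magic-byte detection upgrades the generic type with higher confidence +/// let update = update_mime_type_with_content("application/octet-stream", b"%PDF-1.4", "scan.pdf"); +/// assert_eq!(update.map(|r| r.mime_type), Some("application/pdf".to_string())); +/// ```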
+/// +/// # Arguments +/// * `current_mime_type` - The currently assigned MIME type +/// * `content` - File content for analysis +/// * `filename` - Filename for context +/// +/// # Returns +/// A new `MimeDetectionResult` if detection improves confidence, or None if no change needed +pub fn update_mime_type_with_content( + current_mime_type: &str, + content: &[u8], + filename: &str, +) -> Option<MimeDetectionResult> { + let new_result = detect_mime_from_content(content, filename, Some(current_mime_type)); + + // Only update if we have higher confidence or detected a different type + if new_result.confidence > MimeConfidence::Medium || + new_result.mime_type != current_mime_type { + Some(new_result) + } else { + None + } +} + +/// Detect MIME type from file extension using mime_guess library +fn detect_from_extension(filename: &str, server_mime_type: Option<&str>) -> MimeDetectionResult { + let path = Path::new(filename); + + if let Some(mime_type) = mime_guess::from_path(path).first() { + let mime_str = mime_type.to_string(); + debug!("Extension-based detection: {} -> {}", filename, mime_str); + + let mut result = MimeDetectionResult::from_extension( + mime_str, + path.extension() + .and_then(|ext| ext.to_str()) + .unwrap_or("") + .to_string() + ); + result.original_server_type = server_mime_type.map(|s| s.to_string()); + result + } else { + debug!("Extension-based detection failed for: {}", filename); + let mut result = MimeDetectionResult::fallback(); + result.original_server_type = server_mime_type.map(|s| s.to_string()); + result + } +} + +/// Check if a server-provided MIME type should be trusted +/// +/// Some servers return generic types like "application/octet-stream" which +/// aren't useful, while others provide accurate information. +fn is_trusted_server_mime_type(mime_type: &str) -> bool { + !matches!(mime_type, + "application/octet-stream" | + "application/binary" | + "binary/octet-stream" | + "" | + "unknown" + ) +} + +/// Check if two MIME types are compatible/equivalent +/// +/// Some servers might return slightly different but equivalent MIME types +/// (e.g., "image/jpg" vs "image/jpeg") +fn are_mime_types_compatible(type1: &str, type2: &str) -> bool { + if type1 == type2 { + return true; + } + + // Handle common variations + match (type1, type2) { + ("image/jpeg", "image/jpg") | ("image/jpg", "image/jpeg") => true, + ("image/tiff", "image/tif") | ("image/tif", "image/tiff") => true, + ("text/plain", "text/txt") | ("text/txt", "text/plain") => true, + _ => { + // Check if they have the same primary type (e.g., both are "image/*") + let parts1: Vec<&str> = type1.split('/').collect(); + let parts2: Vec<&str> = type2.split('/').collect(); + + parts1.len() == 2 && parts2.len() == 2 && parts1[0] == parts2[0] + } + } +} + +/// Legacy function for backward compatibility +/// +/// This maintains the same interface as the original `get_mime_type_from_extension` +/// function but uses the new detection system. 
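+/// +/// # Example +/// +/// A short sketch of the legacy call (mirrors `test_legacy_compatibility` below; unknown extensions fall back to `application/octet-stream`): +/// +/// ``` +/// use readur::mime_detection::get_mime_type_from_extension; +/// +/// assert_eq!(get_mime_type_from_extension("pdf"), "application/pdf"); +/// assert_eq!(get_mime_type_from_extension("some-unknown-ext"), "application/octet-stream"); +/// ```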
+pub fn get_mime_type_from_extension(extension: &str) -> String { + let fake_filename = format!("file.{}", extension); + let result = detect_from_extension(&fake_filename, None); + result.mime_type +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mime_detection_from_extension() { + let result = detect_mime_for_discovery( + "test.pdf", + None, + DetectionStrategy::ExtensionOnly + ); + assert_eq!(result.mime_type, "application/pdf"); + assert_eq!(result.detection_method, DetectionMethod::Extension); + } + + #[test] + fn test_server_type_trust() { + // Trusted server type + let result = detect_mime_for_discovery( + "test.pdf", + Some("application/pdf"), + DetectionStrategy::TrustServer + ); + assert_eq!(result.mime_type, "application/pdf"); + assert_eq!(result.detection_method, DetectionMethod::ServerProvided); + + // Untrusted server type should fall back + let result = detect_mime_for_discovery( + "test.pdf", + Some("application/octet-stream"), + DetectionStrategy::TrustServer + ); + assert_eq!(result.mime_type, "application/pdf"); + assert_eq!(result.detection_method, DetectionMethod::Extension); + } + + #[test] + fn test_mime_type_compatibility() { + assert!(are_mime_types_compatible("image/jpeg", "image/jpg")); + assert!(are_mime_types_compatible("image/jpg", "image/jpeg")); + assert!(are_mime_types_compatible("text/plain", "text/plain")); + assert!(!are_mime_types_compatible("image/jpeg", "text/plain")); + } + + #[test] + fn test_content_based_detection() { + // PDF magic bytes + let pdf_header = b"%PDF-1.4"; + let result = detect_mime_from_content(pdf_header, "test.pdf", None); + assert_eq!(result.mime_type, "application/pdf"); + assert_eq!(result.detection_method, DetectionMethod::MagicBytes); + assert_eq!(result.confidence, MimeConfidence::High); + + // JPEG magic bytes + let jpeg_header = [0xFF, 0xD8, 0xFF]; + let result = detect_mime_from_content(&jpeg_header, "test.jpg", None); + assert_eq!(result.mime_type, "image/jpeg"); + } + + #[test] + fn test_hybrid_detection() { + // Content and server agree + let pdf_header = b"%PDF-1.4"; + let result = detect_mime_from_content(pdf_header, "test.pdf", Some("application/pdf")); + assert_eq!(result.mime_type, "application/pdf"); + assert_eq!(result.detection_method, DetectionMethod::Hybrid); + assert_eq!(result.confidence, MimeConfidence::VeryHigh); + } + + #[test] + fn test_legacy_compatibility() { + assert_eq!(get_mime_type_from_extension("pdf"), "application/pdf"); + assert_eq!(get_mime_type_from_extension("jpg"), "image/jpeg"); + assert_eq!(get_mime_type_from_extension("png"), "image/png"); + } + + #[test] + fn test_ocr_suitability() { + let pdf_result = MimeDetectionResult::from_content("application/pdf".to_string(), None); + assert!(pdf_result.is_ocr_suitable()); + + let image_result = MimeDetectionResult::from_content("image/jpeg".to_string(), None); + assert!(image_result.is_ocr_suitable()); + + let text_result = MimeDetectionResult::from_content("text/plain".to_string(), None); + assert!(!text_result.is_ocr_suitable()); + } +} \ No newline at end of file diff --git a/src/scheduling/source_scheduler.rs b/src/scheduling/source_scheduler.rs index c9682f2..1ec2f53 100644 --- a/src/scheduling/source_scheduler.rs +++ b/src/scheduling/source_scheduler.rs @@ -570,27 +570,17 @@ impl SourceScheduler { return Err(format!("WebDAV server_url is empty")); } - // Check if URL starts with a valid scheme - if !server_url.starts_with("http://") && !server_url.starts_with("https://") { - return Err(format!( - "WebDAV server_url 
must start with 'http://' or 'https://'. \ Current value: '{}'. \ Examples of valid URLs: \ - https://cloud.example.com \ - http://192.168.1.100:8080 \ - https://nextcloud.mydomain.com:443", server_url )); - } + // Normalize URL by adding protocol if missing (consistent with WebDAVConfig) + let normalized_url = crate::services::webdav::config::WebDAVConfig::normalize_server_url(server_url); - // Try to parse as URL to catch other issues - match reqwest::Url::parse(server_url) { + // Try to parse the normalized URL to catch other issues + match reqwest::Url::parse(&normalized_url) { Ok(url) => { if url.scheme() != "http" && url.scheme() != "https" { return Err(format!( "WebDAV server_url has invalid scheme '{}'. Only 'http' and 'https' are supported. \ Current URL: '{}'", - url.scheme(), server_url + url.scheme(), normalized_url )); } @@ -599,23 +589,23 @@ impl SourceScheduler { "WebDAV server_url is missing hostname. \ Current URL: '{}'. \ Example: https://cloud.example.com", - server_url + normalized_url )); } - crate::debug_log!("SOURCE_SCHEDULER", "✅ WebDAV URL validation passed for source '{}': {}", source_name, server_url); + crate::debug_log!("SOURCE_SCHEDULER", "✅ WebDAV URL validation passed for source '{}': {} (normalized to: {})", source_name, server_url, normalized_url); Ok(()) } Err(e) => { Err(format!( "WebDAV server_url is not a valid URL: {}. \ - Current value: '{}'. \ + Current value: '{}' (normalized to: '{}'). \ The URL must be absolute and include the full domain. \ Examples: \ - https://cloud.example.com \ - http://192.168.1.100:8080/webdav \ - https://nextcloud.mydomain.com", - e, server_url + e, server_url, normalized_url )) } } diff --git a/src/services/webdav/config.rs b/src/services/webdav/config.rs index ab415fa..446eb29 100644 --- a/src/services/webdav/config.rs +++ b/src/services/webdav/config.rs @@ -103,6 +103,32 @@ impl WebDAVConfig { } } + /// Normalizes a server URL by adding protocol if missing + /// Prefers HTTPS over HTTP for security reasons + pub fn normalize_server_url(url: &str) -> String { + let trimmed = url.trim(); + + // If protocol is already specified, return as-is + if trimmed.starts_with("http://") || trimmed.starts_with("https://") { + return trimmed.to_string(); + } + + // If no protocol specified, default to HTTPS for security + format!("https://{}", trimmed) + } + + /// Generates alternative protocol URL for fallback attempts + /// If input has HTTPS, returns HTTP version and vice versa + pub fn get_alternative_protocol_url(url: &str) -> Option<String> { + if url.starts_with("https://") { + Some(url.replacen("https://", "http://", 1)) + } else if url.starts_with("http://") { + Some(url.replacen("http://", "https://", 1)) + } else { + None + } + } + /// Validates the configuration pub fn validate(&self) -> anyhow::Result<()> { if self.server_url.is_empty() { @@ -121,9 +147,22 @@ impl WebDAVConfig { return Err(anyhow::anyhow!("At least one watch folder must be specified")); } - // Validate URL format - if !self.server_url.starts_with("http://") && !self.server_url.starts_with("https://") { - return Err(anyhow::anyhow!("Server URL must start with http:// or https://")); + // Validate URL format - now accepts URLs without protocol + // Protocol detection and fallback will be handled during connection testing + let normalized_url = Self::normalize_server_url(&self.server_url); + + // Basic URL validation - check if it looks like a valid domain/IP + let url_without_protocol = normalized_url + .trim_start_matches("https://") + .trim_start_matches("http://"); + + if url_without_protocol.is_empty() { + return Err(anyhow::anyhow!("Server URL must contain a valid domain or IP address")); + } + + // Check for obviously invalid URLs + if url_without_protocol.contains("://") { + return Err(anyhow::anyhow!("Invalid URL format: contains multiple protocols")); } Ok(()) @@ -131,8 +170,8 @@ /// Returns the base URL for WebDAV operations pub fn webdav_url(&self) -> String { - // Normalize the server URL by removing trailing slashes - let normalized_url = self.server_url.trim_end_matches('/').to_string(); + // Normalize the server URL by adding protocol if missing and removing trailing slashes + let normalized_url = Self::normalize_server_url(&self.server_url).trim_end_matches('/').to_string(); // Add WebDAV path based on server type match self.server_type.as_deref() { @@ -160,7 +199,7 @@ /// Returns alternative WebDAV URLs to try if the primary one fails /// This is used for fallback mechanisms when encountering 405 errors pub fn webdav_fallback_urls(&self) -> Vec<String> { - let normalized_url = self.server_url.trim_end_matches('/').to_string(); + let normalized_url = Self::normalize_server_url(&self.server_url).trim_end_matches('/').to_string(); let mut fallback_urls = Vec::new(); match self.server_type.as_deref() { diff --git a/src/services/webdav/mod.rs b/src/services/webdav/mod.rs index 6826cb1..0eb427e 100644 --- a/src/services/webdav/mod.rs +++ b/src/services/webdav/mod.rs @@ -23,4 +23,6 @@ mod url_construction_tests; #[cfg(test)] mod subdirectory_edge_cases_tests; #[cfg(test)] +mod protocol_detection_tests; +#[cfg(test)] mod tests; \ No newline at end of file diff --git a/src/services/webdav/protocol_detection_tests.rs b/src/services/webdav/protocol_detection_tests.rs new file mode 100644 index 0000000..b02f24d --- /dev/null +++ b/src/services/webdav/protocol_detection_tests.rs @@ -0,0 +1,233 @@ +#[cfg(test)] +mod tests { + use super::super::{WebDAVService, WebDAVConfig}; + + /// Helper function to create test WebDAV config without protocol + fn create_test_config_without_protocol() -> WebDAVConfig { + WebDAVConfig { + server_url: "nas.example.com".to_string(), // No protocol + username: "testuser".to_string(), + password: "testpass".to_string(), + watch_folders: vec!["/Documents".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + timeout_seconds: 30, + server_type: Some("nextcloud".to_string()), + } + } + + /// Helper function to create test WebDAV config with HTTPS protocol + fn create_test_config_with_https() -> WebDAVConfig { + WebDAVConfig { + server_url: "https://nas.example.com".to_string(), + username: "testuser".to_string(), + password: "testpass".to_string(), + watch_folders: vec!["/Documents".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + timeout_seconds: 30, + server_type: Some("nextcloud".to_string()), + } + } + + /// Helper function to create test WebDAV config with HTTP protocol + fn create_test_config_with_http() -> WebDAVConfig { + WebDAVConfig { + server_url: "http://nas.example.com".to_string(), + username: "testuser".to_string(), + password: "testpass".to_string(), + watch_folders: vec!["/Documents".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + timeout_seconds: 30, + server_type: Some("nextcloud".to_string()), + } + } + + #[tokio::test] + async fn test_config_validation_accepts_url_without_protocol() { + let config = create_test_config_without_protocol(); + + // Should not 
fail validation + assert!(config.validate().is_ok()); + } + + #[tokio::test] + async fn test_config_validation_accepts_url_with_https() { + let config = create_test_config_with_https(); + + // Should not fail validation + assert!(config.validate().is_ok()); + } + + #[tokio::test] + async fn test_config_validation_accepts_url_with_http() { + let config = create_test_config_with_http(); + + // Should not fail validation + assert!(config.validate().is_ok()); + } + + #[tokio::test] + async fn test_normalize_server_url_adds_https_by_default() { + let normalized = WebDAVConfig::normalize_server_url("nas.example.com"); + assert_eq!(normalized, "https://nas.example.com"); + } + + #[tokio::test] + async fn test_normalize_server_url_preserves_existing_protocol() { + let https_url = WebDAVConfig::normalize_server_url("https://nas.example.com"); + assert_eq!(https_url, "https://nas.example.com"); + + let http_url = WebDAVConfig::normalize_server_url("http://nas.example.com"); + assert_eq!(http_url, "http://nas.example.com"); + } + + #[tokio::test] + async fn test_get_alternative_protocol_url() { + // HTTPS to HTTP + let alt_http = WebDAVConfig::get_alternative_protocol_url("https://nas.example.com"); + assert_eq!(alt_http, Some("http://nas.example.com".to_string())); + + // HTTP to HTTPS + let alt_https = WebDAVConfig::get_alternative_protocol_url("http://nas.example.com"); + assert_eq!(alt_https, Some("https://nas.example.com".to_string())); + + // No protocol - should return None + let no_protocol = WebDAVConfig::get_alternative_protocol_url("nas.example.com"); + assert_eq!(no_protocol, None); + } + + #[tokio::test] + async fn test_webdav_url_uses_normalized_url() { + let config = create_test_config_without_protocol(); + let webdav_url = config.webdav_url(); + + // Should start with https:// (normalized) + assert!(webdav_url.starts_with("https://")); + assert_eq!(webdav_url, "https://nas.example.com/remote.php/dav/files/testuser"); + } + + #[tokio::test] + async fn test_service_creation_with_protocol_detection() { + let config = create_test_config_without_protocol(); + + // Should be able to create service without errors + let service = WebDAVService::new(config); + assert!(service.is_ok()); + } + + #[tokio::test] + async fn test_effective_server_url_defaults_to_normalized() { + let config = create_test_config_without_protocol(); + let service = WebDAVService::new(config).unwrap(); + + let effective_url = service.get_effective_server_url(); + assert_eq!(effective_url, "https://nas.example.com"); + } + + #[tokio::test] + async fn test_effective_server_url_with_existing_protocol() { + let config = create_test_config_with_http(); + let service = WebDAVService::new(config).unwrap(); + + let effective_url = service.get_effective_server_url(); + assert_eq!(effective_url, "http://nas.example.com"); + } + + #[tokio::test] + async fn test_working_protocol_initially_none() { + let config = create_test_config_without_protocol(); + let service = WebDAVService::new(config).unwrap(); + + // Initially, no working protocol should be detected + assert!(service.get_working_protocol().is_none()); + } + + #[tokio::test] + async fn test_is_connection_error_detection() { + let config = create_test_config_without_protocol(); + let service = WebDAVService::new(config).unwrap(); + + // Test various connection error patterns + let connection_errors = vec![ + anyhow::anyhow!("connection refused"), + anyhow::anyhow!("timeout occurred"), + anyhow::anyhow!("DNS resolution failed"), + anyhow::anyhow!("TLS handshake failed"), + 
anyhow::anyhow!("SSL certificate error"), + ]; + + for error in connection_errors { + assert!(service.is_connection_error(&error), "Should detect '{}' as connection error", error); + } + + // Test non-connection errors + let non_connection_errors = vec![ + anyhow::anyhow!("401 Unauthorized"), + anyhow::anyhow!("403 Forbidden"), + anyhow::anyhow!("invalid credentials"), + ]; + + for error in non_connection_errors { + assert!(!service.is_connection_error(&error), "Should NOT detect '{}' as connection error", error); + } + } + + #[tokio::test] + async fn test_config_validation_rejects_empty_url() { + let mut config = create_test_config_without_protocol(); + config.server_url = "".to_string(); + + assert!(config.validate().is_err()); + } + + #[tokio::test] + async fn test_config_validation_rejects_invalid_url() { + let mut config = create_test_config_without_protocol(); + config.server_url = "http://https://invalid".to_string(); + + assert!(config.validate().is_err()); + } + + #[tokio::test] + async fn test_webdav_fallback_urls_use_normalized_url() { + let config = create_test_config_without_protocol(); + let fallback_urls = config.webdav_fallback_urls(); + + // All fallback URLs should start with https:// (normalized) + for url in fallback_urls { + assert!(url.starts_with("https://"), "Fallback URL should be normalized: {}", url); + } + } + + #[tokio::test] + async fn test_backward_compatibility_with_existing_protocols() { + // Existing URLs with protocols should work unchanged + let https_config = create_test_config_with_https(); + let http_config = create_test_config_with_http(); + + let https_service = WebDAVService::new(https_config).unwrap(); + let http_service = WebDAVService::new(http_config).unwrap(); + + assert_eq!(https_service.get_effective_server_url(), "https://nas.example.com"); + assert_eq!(http_service.get_effective_server_url(), "http://nas.example.com"); + } + + #[tokio::test] + async fn test_url_construction_with_protocol_detection() { + let config = create_test_config_without_protocol(); + let service = WebDAVService::new(config).unwrap(); + + // Test URL construction for different paths + let test_paths = vec![ + "/Documents/file.pdf", + "Photos/image.jpg", + "/", + "", + ]; + + for path in test_paths { + let url = service.get_url_for_path(path); + // Should start with https:// (normalized default) + assert!(url.starts_with("https://"), "URL should be normalized for path '{}': {}", path, url); + } + } +} \ No newline at end of file diff --git a/src/services/webdav/service.rs b/src/services/webdav/service.rs index 4422edd..0a30a48 100644 --- a/src/services/webdav/service.rs +++ b/src/services/webdav/service.rs @@ -14,6 +14,7 @@ use crate::models::{ WebDAVFolderInfo, }; use crate::webdav_xml_parser::{parse_propfind_response, parse_propfind_response_with_directories}; +use crate::mime_detection::{detect_mime_from_content, update_mime_type_with_content, MimeDetectionResult}; use super::{config::{WebDAVConfig, RetryConfig, ConcurrencyConfig}, SyncProgress}; @@ -24,6 +25,15 @@ pub struct WebDAVDiscoveryResult { pub directories: Vec<FileIngestionInfo>, } +/// Result of downloading a file with MIME type detection +#[derive(Debug, Clone)] +pub struct WebDAVDownloadResult { + pub content: Vec<u8>, + pub file_info: FileIngestionInfo, + pub mime_detection: MimeDetectionResult, + pub mime_type_updated: bool, +} + /// Server capabilities information #[derive(Debug, Clone)] pub struct ServerCapabilities { @@ -135,6 +145,8 @@ pub struct WebDAVService { concurrency_config: ConcurrencyConfig, scan_semaphore: Arc<Semaphore>, download_semaphore: Arc<Semaphore>, + /// Stores the working protocol (updated after successful protocol detection) + working_protocol: Arc<std::sync::RwLock<Option<String>>>, } impl WebDAVService { @@ -173,9 +185,156 @@ concurrency_config, scan_semaphore, download_semaphore, + working_protocol: Arc::new(std::sync::RwLock::new(None)), }) } + // ============================================================================ + // Protocol Detection Methods + // ============================================================================ + + /// Detects the working protocol by trying HTTPS first, then HTTP + /// This method handles smart protocol detection for URLs without explicit protocols + async fn detect_working_protocol(&self) -> Result<String> { + info!("🔍 Starting smart protocol detection for: {}", self.config.server_url); + + // If URL already has a protocol, use it directly + if self.config.server_url.starts_with("http://") || self.config.server_url.starts_with("https://") { + let protocol = if self.config.server_url.starts_with("https://") { "https" } else { "http" }; + info!("✅ Protocol already specified: {}", protocol); + return Ok(protocol.to_string()); + } + + // Try HTTPS first (more secure default) + let https_url = format!("https://{}", self.config.server_url.trim()); + info!("🔍 Trying HTTPS first: {}", https_url); + + match self.test_protocol_connection(&https_url).await { + Ok(()) => { + info!("✅ HTTPS connection successful"); + // Store the working protocol for future use + if let Ok(mut working_protocol) = self.working_protocol.write() { + *working_protocol = Some("https".to_string()); + } + return Ok("https".to_string()); + } + Err(https_error) => { + warn!("❌ HTTPS connection failed: {}", https_error); + + // Check if this is a connection-related error (not auth error) + if self.is_connection_error(&https_error) { + info!("🔄 HTTPS failed with connection error, trying HTTP fallback"); + + // Try HTTP fallback + let http_url = format!("http://{}", self.config.server_url.trim()); + info!("🔓 Trying HTTP fallback: {}", http_url); + + match self.test_protocol_connection(&http_url).await { + Ok(()) => { + warn!("⚠️ HTTP connection successful - consider configuring HTTPS for security"); + // Store the working protocol for future use + if let Ok(mut working_protocol) = self.working_protocol.write() { + *working_protocol = Some("http".to_string()); + } + return Ok("http".to_string()); + } + Err(http_error) => { + error!("❌ Both HTTPS and HTTP failed"); + error!(" HTTPS error: {}", https_error); + error!(" HTTP error: {}", http_error); + return Err(anyhow!( + "Protocol detection failed. Both HTTPS and HTTP connections failed. \ + HTTPS error: {}. HTTP error: {}. \ + Please verify the server URL and ensure WebDAV is properly configured.", + https_error, http_error + )); + } + } + } else { + // Auth or other non-connection error with HTTPS - don't try HTTP + error!("❌ HTTPS failed with non-connection error (likely auth or server config): {}", https_error); + return Err(anyhow!( + "HTTPS connection failed with authentication or server configuration error: {}. \ + Please check your credentials and server settings.", + https_error + )); + } + } + } + } + + /// Tests connection with a specific protocol URL + async fn test_protocol_connection(&self, full_url: &str) -> Result<()> { + debug!("🧪 Testing protocol connection to: {}", full_url); + + // Create a temporary config with the full URL for testing + let temp_config = WebDAVConfig { + server_url: full_url.to_string(), + username: self.config.username.clone(), + password: self.config.password.clone(), + watch_folders: self.config.watch_folders.clone(), + file_extensions: self.config.file_extensions.clone(), + timeout_seconds: self.config.timeout_seconds, + server_type: self.config.server_type.clone(), + }; + + // Test basic OPTIONS request + let webdav_url = temp_config.webdav_url(); + debug!("📍 Testing WebDAV URL: {}", webdav_url); + + let response = self.client + .request(Method::OPTIONS, &webdav_url) + .basic_auth(&self.config.username, Some(&self.config.password)) + .send() + .await + .map_err(|e| anyhow!("Connection failed: {}", e))?; + + if !response.status().is_success() { + return Err(anyhow!( + "Protocol test failed with status: {} - {}", + response.status(), + response.text().await.unwrap_or_default() + )); + } + + debug!("✅ Protocol connection test successful"); + Ok(()) + } + + /// Determines if an error is connection-related (vs auth or other errors) + pub fn is_connection_error(&self, error: &anyhow::Error) -> bool { + let error_str = error.to_string().to_lowercase(); + + // Connection-related errors that suggest trying different protocol + error_str.contains("connection refused") || + error_str.contains("timeout") || + error_str.contains("dns") || + error_str.contains("network") || + error_str.contains("unreachable") || + error_str.contains("tls") || + error_str.contains("ssl") || + error_str.contains("certificate") || + error_str.contains("handshake") + } + + /// Gets the currently working protocol (if detected) + pub fn get_working_protocol(&self) -> Option<String> { + self.working_protocol.read().ok().and_then(|p| p.clone()) + } + + /// Gets the effective server URL with the working protocol + pub fn get_effective_server_url(&self) -> String { + // If we have a detected working protocol, use it + if let Some(protocol) = self.get_working_protocol() { + if !self.config.server_url.starts_with("http://") && !self.config.server_url.starts_with("https://") { + return format!("{}://{}", protocol, self.config.server_url.trim()); + } + } + + // Otherwise use the configured URL (normalized) + WebDAVConfig::normalize_server_url(&self.config.server_url) + } + // ============================================================================ // Connection and Testing Methods // ============================================================================ @@ -194,13 +353,31 @@ }); } - // Test basic connectivity with OPTIONS request + // Perform protocol detection if needed + let working_protocol = match self.detect_working_protocol().await { + Ok(protocol) => { + info!("✅ Protocol detection successful: {}", protocol); + protocol + } + Err(e) => { + error!("❌ Protocol detection failed: {}", e); + return Ok(WebDAVConnectionResult { + success: false, + message: format!("Protocol detection failed: {}", e), + server_version: None, + server_type: None, + }); + } + }; + + // Test basic connectivity with OPTIONS request using detected protocol match self.test_options_request().await { Ok((server_version, server_type)) => { - info!("✅ WebDAV connection successful"); + let effective_url = self.get_effective_server_url(); + info!("✅ WebDAV connection successful using {} ({})", working_protocol.to_uppercase(), effective_url); Ok(WebDAVConnectionResult { success: true, - message: "Connection successful".to_string(), + message: format!("Connection successful using {}", working_protocol.to_uppercase()), server_version, server_type, }) @@ -235,7 +412,18 @@ /// Performs OPTIONS request to test basic connectivity async fn test_options_request(&self) -> Result<(Option<String>, Option<String>)> { - let webdav_url = self.config.webdav_url(); + // Create a temporary config with the effective server URL for WebDAV operations + let effective_server_url = self.get_effective_server_url(); + let temp_config = WebDAVConfig { + server_url: effective_server_url, + username: self.config.username.clone(), + password: self.config.password.clone(), + watch_folders: self.config.watch_folders.clone(), + file_extensions: self.config.file_extensions.clone(), + timeout_seconds: self.config.timeout_seconds, + server_type: self.config.server_type.clone(), + }; + let webdav_url = temp_config.webdav_url(); let response = self.client .request(Method::OPTIONS, &webdav_url) @@ -304,8 +492,9 @@ /// Tests for Nextcloud-specific capabilities async fn test_nextcloud_capabilities(&self) -> Result<()> { + let effective_server_url = self.get_effective_server_url(); let capabilities_url = format!("{}/ocs/v1.php/cloud/capabilities", - self.config.server_url.trim_end_matches('/')); + effective_server_url.trim_end_matches('/')); let response = self.client .get(&capabilities_url) @@ -592,7 +781,18 @@ /// Gets the WebDAV URL for a specific path pub fn get_url_for_path(&self, path: &str) -> String { - let base_url = self.config.webdav_url(); + // Create a temporary config with the effective server URL + let effective_server_url = self.get_effective_server_url(); + let temp_config = WebDAVConfig { + server_url: effective_server_url, + username: self.config.username.clone(), + password: self.config.password.clone(), + watch_folders: self.config.watch_folders.clone(), + file_extensions: self.config.file_extensions.clone(), + timeout_seconds: self.config.timeout_seconds, + server_type: self.config.server_type.clone(), + }; + let base_url = temp_config.webdav_url(); let clean_path = path.trim_start_matches('/'); let final_url = if clean_path.is_empty() { @@ -652,7 +852,18 @@ /// Convert file paths to the proper URL format for the server pub fn path_to_url(&self, relative_path: &str) -> String { let clean_path = relative_path.trim_start_matches('/'); - let base_url = self.config.webdav_url(); + // Create a temporary config with the effective server URL + let effective_server_url = self.get_effective_server_url(); + let temp_config = WebDAVConfig { + server_url: effective_server_url, + username: self.config.username.clone(), + password: self.config.password.clone(), + watch_folders: self.config.watch_folders.clone(), + file_extensions: self.config.file_extensions.clone(), + timeout_seconds: self.config.timeout_seconds, + server_type: self.config.server_type.clone(), + }; + let base_url = temp_config.webdav_url(); if clean_path.is_empty() { base_url @@ -777,42 +988,64 @@ async fn discover_files_recursive(&self, directory_path: &str) -> Result<Vec<FileIngestionInfo>> { let mut all_files = Vec::new(); let mut directories_to_scan = vec![directory_path.to_string()]; + let mut scanned_directories = std::collections::HashSet::new(); let semaphore = Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); + debug!("Starting recursive file scan from: {}", directory_path); + while !directories_to_scan.is_empty() { - let current_directories = directories_to_scan.clone(); - directories_to_scan.clear(); + // Take a batch of directories to process + let batch_size = std::cmp::min(directories_to_scan.len(), self.concurrency_config.max_concurrent_scans); + let current_batch: Vec<String> = directories_to_scan.drain(..batch_size).collect(); + + debug!("Processing batch of {} directories, {} remaining in queue", + current_batch.len(), directories_to_scan.len()); // Process directories concurrently - let tasks = current_directories.into_iter().map(|dir| { + let tasks = current_batch.into_iter().filter_map(|dir| { + // Skip if already scanned + if scanned_directories.contains(&dir) { + debug!("Skipping already scanned directory: {}", dir); + return None; + } + scanned_directories.insert(dir.clone()); + let permit = semaphore.clone(); let service = self.clone(); - async move { + Some(async move { let _permit = permit.acquire().await.unwrap(); - service.discover_files_and_directories_single(&dir).await - } + let result = service.discover_files_and_directories_single(&dir).await; + (dir, result) + }) }); let results = futures_util::future::join_all(tasks).await; - for result in results { + for (scanned_dir, result) in results { match result { Ok(discovery_result) => { + debug!("Directory '{}' scan complete: {} files, {} subdirectories", + scanned_dir, discovery_result.files.len(), discovery_result.directories.len()); + all_files.extend(discovery_result.files); // Add subdirectories to the queue for the next iteration for dir in discovery_result.directories { - if dir.is_directory { - directories_to_scan.push(dir.relative_path); + if dir.is_directory && !scanned_directories.contains(&dir.relative_path) { + directories_to_scan.push(dir.relative_path.clone()); + debug!("Added subdirectory to scan queue: {}", dir.relative_path); } } } Err(e) => { - warn!("Failed to scan directory: {}", e); + warn!("Failed to scan directory '{}': {}", scanned_dir, e); } } } + + debug!("Batch complete. Total files found: {}. Queue size: {}", + all_files.len(), directories_to_scan.len()); } info!("Recursive scan completed. Found {} files total", all_files.len()); @@ -908,12 +1141,19 @@ let body = response.text().await?; let all_items = parse_propfind_response_with_directories(&body)?; + // Process the items to convert href to relative paths + let processed_items = self.process_file_infos(all_items); + // Separate files and directories, excluding the parent directory itself let mut files = Vec::new(); let mut directories = Vec::new(); - for item in all_items { - if item.relative_path == directory_path { + for item in processed_items { + // Skip the directory itself (handle both with and without trailing slash) + let normalized_item_path = item.relative_path.trim_end_matches('/'); + let normalized_directory_path = directory_path.trim_end_matches('/'); + + if normalized_item_path == normalized_directory_path { continue; // Skip the directory itself } @@ -933,41 +1173,69 @@ let mut all_files = Vec::new(); let mut all_directories = Vec::new(); let mut directories_to_scan = vec![directory_path.to_string()]; + let mut scanned_directories = std::collections::HashSet::new(); let semaphore = Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans)); + debug!("Starting recursive scan from: {}", directory_path); + while !directories_to_scan.is_empty() { - let current_directories = directories_to_scan.clone(); - directories_to_scan.clear(); + // Take a batch of directories to process (limit batch size for better progress tracking) + let batch_size = std::cmp::min(directories_to_scan.len(), self.concurrency_config.max_concurrent_scans); + let current_batch: Vec<String> = directories_to_scan.drain(..batch_size).collect(); + + debug!("Processing batch of {} directories, {} remaining in queue", + current_batch.len(), directories_to_scan.len()); // Process directories concurrently - let tasks = current_directories.into_iter().map(|dir| { + let tasks = current_batch.into_iter().filter_map(|dir| { + // Skip if already scanned (prevent infinite loops) + if scanned_directories.contains(&dir) { + debug!("Skipping already scanned directory: {}", dir); + return None; + } + scanned_directories.insert(dir.clone()); + let permit = semaphore.clone(); + let service = self.clone(); - async move { + Some(async move { + let _permit = permit.acquire().await.unwrap(); - service.discover_files_and_directories_single(&dir).await - } + let result = service.discover_files_and_directories_single(&dir).await; + (dir, result) + }) }); let results = futures_util::future::join_all(tasks).await; - for result in results { + for (scanned_dir, result) in results { match result { Ok(discovery_result) => { + debug!("Directory '{}' scan complete: {} files, {} subdirectories", + scanned_dir, discovery_result.files.len(), discovery_result.directories.len()); + all_files.extend(discovery_result.files); // Add directories to our results and to the scan queue for dir in discovery_result.directories { - directories_to_scan.push(dir.relative_path.clone()); + // Only add to scan queue if not already scanned + if !scanned_directories.contains(&dir.relative_path) { + directories_to_scan.push(dir.relative_path.clone()); + debug!("Added subdirectory to scan queue: {} (scanned set size: {})", + dir.relative_path, scanned_directories.len()); + } else { + debug!("Skipping already scanned directory: {} (already in scanned set)", dir.relative_path); + } + all_directories.push(dir); } } Err(e) => { - warn!("Failed to scan directory: {}", e); + warn!("Failed to scan directory '{}': {}", scanned_dir, e); } } } + + debug!("Batch complete. 
Total progress: {} files, {} directories found. Queue size: {}", + all_files.len(), all_directories.len(), directories_to_scan.len()); } info!("Recursive scan completed. Found {} files and {} directories", all_files.len(), all_directories.len()); @@ -1172,6 +1440,131 @@ Ok(results) } + /// Downloads a file with enhanced MIME type detection based on content + /// + /// This method downloads the file and performs content-based MIME type detection + /// using magic bytes, providing more accurate type identification than the initial + /// discovery phase which only has access to filenames and server-provided types. + /// + /// # Arguments + /// * `file_info` - The file information from WebDAV discovery + /// + /// # Returns + /// A `WebDAVDownloadResult` containing the file content, updated file info, and MIME detection details + pub async fn download_file_with_mime_detection(&self, file_info: &FileIngestionInfo) -> Result<WebDAVDownloadResult> { + let _permit = self.download_semaphore.acquire().await?; + + debug!("⬇️🔍 Downloading file with MIME detection: {}", file_info.relative_path); + + // Use the relative path directly since it's already processed + let relative_path = &file_info.relative_path; + let url = self.get_url_for_path(&relative_path); + + let response = self.authenticated_request( + reqwest::Method::GET, + &url, + None, + None, + ).await?; + + if !response.status().is_success() { + return Err(anyhow!( + "Failed to download file '{}': HTTP {}", + file_info.relative_path, + response.status() + )); + } + + // Get server-provided content type from response headers + let server_content_type = response + .headers() + .get("content-type") + .and_then(|header| header.to_str().ok()) + .map(|s| s.split(';').next().unwrap_or(s).trim().to_string()); // Remove charset info and convert to owned + + let content = response.bytes().await?; + debug!("✅ Downloaded {} bytes for file: {}", content.len(), file_info.relative_path); + + // Perform content-based MIME type detection + let mime_detection_result = detect_mime_from_content( + &content, + &file_info.name, + server_content_type.as_deref() + ); + + // Check if MIME type should be updated + let mime_type_updated = mime_detection_result.mime_type != file_info.mime_type; + + if mime_type_updated { + debug!("🔄 MIME type updated for {}: '{}' -> '{}' (method: {:?}, confidence: {:?})", + file_info.name, + file_info.mime_type, + mime_detection_result.mime_type, + mime_detection_result.detection_method, + mime_detection_result.confidence); + } else { + debug!("✅ MIME type confirmed for {}: '{}' (method: {:?}, confidence: {:?})", + file_info.name, + mime_detection_result.mime_type, + mime_detection_result.detection_method, + mime_detection_result.confidence); + } + + // Create updated file info if MIME type changed + let updated_file_info = if mime_type_updated { + let mut updated = file_info.clone(); + updated.mime_type = mime_detection_result.mime_type.clone(); + updated + } else { + file_info.clone() + }; + + Ok(WebDAVDownloadResult { + content: content.to_vec(), + file_info: updated_file_info, + mime_detection: mime_detection_result, + mime_type_updated, + }) + } + + /// Downloads multiple files with MIME type detection concurrently + /// + /// Similar to `download_files` but includes content-based MIME type detection + /// for each downloaded file. 
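+/// +/// # Example +/// +/// A hedged usage sketch; `service` is assumed to be a connected `WebDAVService` and `discovered` a list of `FileIngestionInfo` entries from an earlier discovery pass: +/// +/// ```ignore +/// let results = service.download_files_with_mime_detection(&discovered).await?; +/// for (original, download) in results { +///     if let Ok(d) = download { +///         if d.mime_type_updated { +///             // The content-based type replaces the one guessed during discovery +///             println!("{}: '{}' -> '{}'", original.name, original.mime_type, d.mime_detection.mime_type); +///         } +///     } +/// } +/// ```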
+/// +/// # Arguments +/// * `files` - The files to download +/// +/// # Returns +/// A vector of tuples containing the original file info and download result +pub async fn download_files_with_mime_detection(&self, files: &[FileIngestionInfo]) -> Result<Vec<(FileIngestionInfo, Result<WebDAVDownloadResult>)>> { + info!("⬇️🔍 Downloading {} files with MIME detection concurrently", files.len()); + + let tasks = files.iter().map(|file| { + let file_clone = file.clone(); + let service_clone = self.clone(); + + async move { + let result = service_clone.download_file_with_mime_detection(&file_clone).await; + (file_clone, result) + } + }); + + let results = futures_util::future::join_all(tasks).await; + + let success_count = results.iter().filter(|(_, result)| result.is_ok()).count(); + let failure_count = results.len() - success_count; + let mime_updated_count = results.iter() + .filter_map(|(_, result)| result.as_ref().ok()) + .filter(|download_result| download_result.mime_type_updated) + .count(); + + info!("📊 Download with MIME detection completed: {} successful, {} failed, {} MIME types updated", + success_count, failure_count, mime_updated_count); + + Ok(results) + } + /// Gets file metadata without downloading content pub async fn get_file_metadata(&self, file_path: &str) -> Result<FileIngestionInfo> { debug!("📋 Getting metadata for file: {}", file_path); @@ -1226,9 +1619,21 @@ pub async fn get_server_capabilities(&self) -> Result<ServerCapabilities> { debug!("🔍 Checking server capabilities"); + // Create a temporary config with the effective server URL + let effective_server_url = self.get_effective_server_url(); + let temp_config = WebDAVConfig { + server_url: effective_server_url, + username: self.config.username.clone(), + password: self.config.password.clone(), + watch_folders: self.config.watch_folders.clone(), + file_extensions: self.config.file_extensions.clone(), + timeout_seconds: self.config.timeout_seconds, + server_type: self.config.server_type.clone(), + }; + let options_response = self.authenticated_request( reqwest::Method::OPTIONS, - &self.config.webdav_url(), + &temp_config.webdav_url(), None, None, ).await?; @@ -1550,6 +1955,7 @@ } } + // Implement Clone to allow sharing the service impl Clone for WebDAVService { fn clone(&self) -> Self { @@ -1560,6 +1966,7 @@ concurrency_config: self.concurrency_config.clone(), scan_semaphore: Arc::clone(&self.scan_semaphore), download_semaphore: Arc::clone(&self.download_semaphore), + working_protocol: Arc::clone(&self.working_protocol), } } } diff --git a/src/services/webdav/tests/mod.rs b/src/services/webdav/tests/mod.rs index 50595e6..8e7ebb6 100644 --- a/src/services/webdav/tests/mod.rs +++ b/src/services/webdav/tests/mod.rs @@ -1,2 +1,3 @@ pub mod etag_comparison_tests; -pub mod deletion_detection_tests; \ No newline at end of file +pub mod deletion_detection_tests; +pub mod path_processing_tests; \ No newline at end of file diff --git a/src/services/webdav/tests/path_processing_tests.rs b/src/services/webdav/tests/path_processing_tests.rs new file mode 100644 index 0000000..23bb659 --- /dev/null +++ b/src/services/webdav/tests/path_processing_tests.rs @@ -0,0 +1,452 @@ +#[cfg(test)] +mod path_processing_tests { + use crate::models::FileIngestionInfo; + use crate::services::webdav::{WebDAVConfig, WebDAVService}; + use crate::webdav_xml_parser::parse_propfind_response_with_directories; + use wiremock::{ + matchers::{method, path, header}, + Mock, MockServer, ResponseTemplate, + }; + + /// Creates a test WebDAV service with mock 
server + fn create_test_service(mock_server_url: &str) -> WebDAVService { + let config = WebDAVConfig { + server_url: mock_server_url.to_string(), + username: "testuser".to_string(), + password: "testpass".to_string(), + watch_folders: vec!["/TestDocuments".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + timeout_seconds: 30, + server_type: Some("nextcloud".to_string()), + }; + WebDAVService::new(config).expect("Failed to create test service") + } + + /// Mock WebDAV PROPFIND response with directories and files + fn mock_propfind_response() -> String { + r#"<?xml version="1.0"?> + <d:multistatus xmlns:d="DAV:"> + <d:response> + <d:href>/remote.php/dav/files/testuser/TestDocuments/</d:href> + <d:propstat> + <d:prop> + <d:displayname>TestDocuments</d:displayname> + <d:getlastmodified>Tue, 29 Jul 2025 01:34:17 GMT</d:getlastmodified> + <d:getetag>"parent123etag"</d:getetag> + <d:resourcetype><d:collection/></d:resourcetype> + </d:prop> + <d:status>HTTP/1.1 200 OK</d:status> + </d:propstat> + </d:response> + <d:response> + <d:href>/remote.php/dav/files/testuser/TestDocuments/SubDir1/</d:href> + <d:propstat> + <d:prop> + <d:displayname>SubDir1</d:displayname> + <d:getlastmodified>Fri, 20 Jun 2025 23:35:17 GMT</d:getlastmodified> + <d:getetag>"subdir1etag"</d:getetag> + <d:resourcetype><d:collection/></d:resourcetype> + </d:prop> + <d:status>HTTP/1.1 200 OK</d:status> + </d:propstat> + </d:response> + <d:response> + <d:href>/remote.php/dav/files/testuser/TestDocuments/SubDir2/</d:href> + <d:propstat> + <d:prop> + <d:displayname>SubDir2</d:displayname> + <d:getlastmodified>Tue, 29 Jul 2025 01:34:17 GMT</d:getlastmodified> + <d:getetag>"subdir2etag"</d:getetag> + <d:resourcetype><d:collection/></d:resourcetype> + </d:prop> + <d:status>HTTP/1.1 200 OK</d:status> + </d:propstat> + </d:response> + <d:response> + <d:href>/remote.php/dav/files/testuser/TestDocuments/test.pdf</d:href> + <d:propstat> + <d:prop> + <d:displayname>test.pdf</d:displayname> + <d:getlastmodified>Thu, 24 Jul 2025 19:16:19 GMT</d:getlastmodified> + <d:getetag>"fileetag123"</d:getetag> + <d:getcontentlength>1234567</d:getcontentlength> + <d:resourcetype/> + </d:prop> + <d:status>HTTP/1.1 200 OK</d:status> + </d:propstat> + </d:response> + </d:multistatus>"#.to_string() + } + + /// Mock WebDAV response for empty directory + fn mock_empty_directory_response() -> String { + r#"<?xml version="1.0"?> + <d:multistatus xmlns:d="DAV:"> + <d:response> + <d:href>/remote.php/dav/files/testuser/TestDocuments/SubDir1/</d:href> + <d:propstat> + <d:prop> + <d:displayname>SubDir1</d:displayname> + <d:getlastmodified>Fri, 20 Jun 2025 23:35:17 GMT</d:getlastmodified> + <d:getetag>"subdir1etag"</d:getetag> + <d:resourcetype><d:collection/></d:resourcetype> + </d:prop> + <d:status>HTTP/1.1 200 OK</d:status> + </d:propstat> + </d:response> + </d:multistatus>"#.to_string() + } + + #[test] + fn test_xml_parser_returns_temp_paths() { + // This test ensures the XML parser behavior is documented + let xml_response = mock_propfind_response(); + let parsed_items = parse_propfind_response_with_directories(&xml_response) + .expect("Failed to parse XML response"); + + // All parsed items should have relative_path as "TEMP" initially + for item in &parsed_items { + assert_eq!(item.relative_path, "TEMP", + "XML parser should set relative_path to TEMP for processing by discovery layer"); + } + + // Should find the correct number of items + assert_eq!(parsed_items.len(), 4, "Should parse all 4 items from XML"); + + // Verify we get both directories and files + let directories: Vec<_> = parsed_items.iter().filter(|i| i.is_directory).collect(); + let files: Vec<_> = parsed_items.iter().filter(|i| !i.is_directory).collect(); + + assert_eq!(directories.len(), 3, "Should find 3 directories"); + assert_eq!(files.len(), 1, "Should find 1 file"); + } + + #[test] + fn test_path_processing_converts_temp_to_relative_paths() { + let service = create_test_service("http://test.example.com"); + + // Create mock parsed items with TEMP paths (simulating XML parser output) + let mock_items = vec![ + FileIngestionInfo { + relative_path: "TEMP".to_string(), + full_path: "/remote.php/dav/files/testuser/TestDocuments/".to_string(), + #[allow(deprecated)] + path: "/remote.php/dav/files/testuser/TestDocuments/".to_string(), + name: "TestDocuments".to_string(), + size: 0, + mime_type: "application/octet-stream".to_string(), + last_modified: None, + etag: "parent123etag".to_string(), + is_directory: true, + created_at: None, + permissions: None, + owner: None, + group: None, + metadata: None, + }, + FileIngestionInfo { + relative_path: "TEMP".to_string(), + full_path: "/remote.php/dav/files/testuser/TestDocuments/SubDir1/".to_string(), + #[allow(deprecated)] + path: "/remote.php/dav/files/testuser/TestDocuments/SubDir1/".to_string(), + name: "SubDir1".to_string(), + size: 0, + mime_type: "application/octet-stream".to_string(), + last_modified: 
+
+    #[test]
+    fn test_xml_parser_returns_temp_paths() {
+        // This test ensures the XML parser behavior is documented
+        let xml_response = mock_propfind_response();
+        let parsed_items = parse_propfind_response_with_directories(&xml_response)
+            .expect("Failed to parse XML response");
+
+        // All parsed items should have relative_path as "TEMP" initially
+        for item in &parsed_items {
+            assert_eq!(item.relative_path, "TEMP",
+                "XML parser should set relative_path to TEMP for processing by discovery layer");
+        }
+
+        // Should find the correct number of items
+        assert_eq!(parsed_items.len(), 4, "Should parse all 4 items from XML");
+
+        // Verify we get both directories and files
+        let directories: Vec<_> = parsed_items.iter().filter(|i| i.is_directory).collect();
+        let files: Vec<_> = parsed_items.iter().filter(|i| !i.is_directory).collect();
+
+        assert_eq!(directories.len(), 3, "Should find 3 directories");
+        assert_eq!(files.len(), 1, "Should find 1 file");
+    }
+
+    #[test]
+    fn test_path_processing_converts_temp_to_relative_paths() {
+        let service = create_test_service("http://test.example.com");
+
+        // Create mock parsed items with TEMP paths (simulating XML parser output)
+        let mock_items = vec![
+            FileIngestionInfo {
+                relative_path: "TEMP".to_string(),
+                full_path: "/remote.php/dav/files/testuser/TestDocuments/".to_string(),
+                #[allow(deprecated)]
+                path: "/remote.php/dav/files/testuser/TestDocuments/".to_string(),
+                name: "TestDocuments".to_string(),
+                size: 0,
+                mime_type: "application/octet-stream".to_string(),
+                last_modified: None,
+                etag: "parent123etag".to_string(),
+                is_directory: true,
+                created_at: None,
+                permissions: None,
+                owner: None,
+                group: None,
+                metadata: None,
+            },
+            FileIngestionInfo {
+                relative_path: "TEMP".to_string(),
+                full_path: "/remote.php/dav/files/testuser/TestDocuments/SubDir1/".to_string(),
+                #[allow(deprecated)]
+                path: "/remote.php/dav/files/testuser/TestDocuments/SubDir1/".to_string(),
+                name: "SubDir1".to_string(),
+                size: 0,
+                mime_type: "application/octet-stream".to_string(),
+                last_modified: None,
+                etag: "subdir1etag".to_string(),
+                is_directory: true,
+                created_at: None,
+                permissions: None,
+                owner: None,
+                group: None,
+                metadata: None,
+            },
+        ];
+
+        // Process the items
+        let processed_items = service.process_file_infos(mock_items);
+
+        // Verify paths are correctly converted
+        assert_eq!(processed_items[0].relative_path, "/TestDocuments/");
+        assert_eq!(processed_items[1].relative_path, "/TestDocuments/SubDir1/");
+
+        // Verify full_path remains unchanged
+        assert_eq!(processed_items[0].full_path, "/remote.php/dav/files/testuser/TestDocuments/");
+        assert_eq!(processed_items[1].full_path, "/remote.php/dav/files/testuser/TestDocuments/SubDir1/");
+    }
+
+    #[test]
+    fn test_directory_filtering_excludes_parent() {
+        // Create processed items including parent directory
+        let processed_items = vec![
+            FileIngestionInfo {
+                relative_path: "/TestDocuments/".to_string(),
+                full_path: "/remote.php/dav/files/testuser/TestDocuments/".to_string(),
+                #[allow(deprecated)]
+                path: "/TestDocuments/".to_string(),
+                name: "TestDocuments".to_string(),
+                size: 0,
+                mime_type: "application/octet-stream".to_string(),
+                last_modified: None,
+                etag: "parent123etag".to_string(),
+                is_directory: true,
+                created_at: None,
+                permissions: None,
+                owner: None,
+                group: None,
+                metadata: None,
+            },
+            FileIngestionInfo {
+                relative_path: "/TestDocuments/SubDir1/".to_string(),
+                full_path: "/remote.php/dav/files/testuser/TestDocuments/SubDir1/".to_string(),
+                #[allow(deprecated)]
+                path: "/TestDocuments/SubDir1/".to_string(),
+                name: "SubDir1".to_string(),
+                size: 0,
+                mime_type: "application/octet-stream".to_string(),
+                last_modified: None,
+                etag: "subdir1etag".to_string(),
+                is_directory: true,
+                created_at: None,
+                permissions: None,
+                owner: None,
+                group: None,
+                metadata: None,
+            },
+        ];
+
+        // Simulate the filtering logic from discover_files_and_directories_single_with_url
+        let directory_path = "/TestDocuments";
+        let mut files = Vec::new();
+        let mut directories = Vec::new();
+
+        for item in processed_items {
+            // Skip the directory itself (handle both with and without trailing slash)
+            let normalized_item_path = item.relative_path.trim_end_matches('/');
+            let normalized_directory_path = directory_path.trim_end_matches('/');
+
+            if normalized_item_path == normalized_directory_path {
+                continue; // Skip the directory itself
+            }
+
+            if item.is_directory {
+                directories.push(item);
+            } else {
+                files.push(item);
+            }
+        }
+
+        // Should exclude parent directory but include subdirectory
+        assert_eq!(files.len(), 0);
+        assert_eq!(directories.len(), 1);
+        assert_eq!(directories[0].relative_path, "/TestDocuments/SubDir1/");
+    }
+
+    #[tokio::test]
+    async fn test_single_directory_discovery_integration() {
+        let mock_server = MockServer::start().await;
+
+        // Mock the PROPFIND request
+        Mock::given(method("PROPFIND"))
+            .and(path("/remote.php/dav/files/testuser/TestDocuments"))
+            .and(header("depth", "1"))
+            .and(header("content-type", "application/xml"))
+            .respond_with(
+                ResponseTemplate::new(207)
+                    .set_body_string(mock_propfind_response())
+                    .insert_header("content-type", "application/xml")
+            )
+            .mount(&mock_server)
+            .await;
+
+        let service = create_test_service(&mock_server.uri());
+
+        // Test single directory discovery
+        let result = service.discover_files_and_directories("/TestDocuments", false).await
+            .expect("Single directory discovery should succeed");
+
+        // Verify results
+        assert_eq!(result.files.len(), 1, "Should find 1 file");
+        assert_eq!(result.directories.len(), 2, "Should find 2
directories (excluding parent)"); + + // Verify directory paths are correct (not TEMP) + let dir_paths: Vec<&String> = result.directories.iter().map(|d| &d.relative_path).collect(); + assert!(dir_paths.contains(&&"/TestDocuments/SubDir1/".to_string())); + assert!(dir_paths.contains(&&"/TestDocuments/SubDir2/".to_string())); + + // Verify no directory has TEMP path + for dir in &result.directories { + assert_ne!(dir.relative_path, "TEMP", "Directory path should not be TEMP"); + } + + // Verify file path is correct + assert_eq!(result.files[0].relative_path, "/TestDocuments/test.pdf"); + assert_ne!(result.files[0].relative_path, "TEMP", "File path should not be TEMP"); + } + + #[tokio::test] + async fn test_recursive_directory_discovery_integration() { + let mock_server = MockServer::start().await; + + // Mock the initial PROPFIND request for root directory + Mock::given(method("PROPFIND")) + .and(path("/remote.php/dav/files/testuser/TestDocuments")) + .and(header("depth", "1")) + .and(header("content-type", "application/xml")) + .respond_with( + ResponseTemplate::new(207) + .set_body_string(mock_propfind_response()) + .insert_header("content-type", "application/xml") + ) + .mount(&mock_server) + .await; + + // Mock PROPFIND requests for subdirectories (return empty for simplicity) + Mock::given(method("PROPFIND")) + .and(path("/remote.php/dav/files/testuser/TestDocuments/SubDir1")) + .and(header("depth", "1")) + .and(header("content-type", "application/xml")) + .respond_with( + ResponseTemplate::new(207) + .set_body_string(mock_empty_directory_response()) + .insert_header("content-type", "application/xml") + ) + .mount(&mock_server) + .await; + + Mock::given(method("PROPFIND")) + .and(path("/remote.php/dav/files/testuser/TestDocuments/SubDir2")) + .and(header("depth", "1")) + .and(header("content-type", "application/xml")) + .respond_with( + ResponseTemplate::new(207) + .set_body_string(mock_empty_directory_response()) + .insert_header("content-type", "application/xml") + ) + .mount(&mock_server) + .await; + + let service = create_test_service(&mock_server.uri()); + + // Test recursive directory discovery + let result = service.discover_files_and_directories("/TestDocuments", true).await + .expect("Recursive directory discovery should succeed"); + + // Verify results + assert_eq!(result.files.len(), 1, "Should find 1 file"); + assert_eq!(result.directories.len(), 2, "Should find 2 directories (excluding parents)"); + + // Verify no paths are TEMP + for item in result.files.iter().chain(result.directories.iter()) { + assert_ne!(item.relative_path, "TEMP", "Paths should be processed, not TEMP"); + assert!(item.relative_path.starts_with("/TestDocuments"), + "All paths should start with /TestDocuments, got: {}", item.relative_path); + } + } + + #[test] + fn test_href_to_relative_path_conversion() { + let service = create_test_service("http://test.example.com"); + + // Test Nextcloud path conversion + assert_eq!( + service.href_to_relative_path("/remote.php/dav/files/testuser/Documents/file.pdf"), + "/Documents/file.pdf" + ); + + assert_eq!( + service.href_to_relative_path("/remote.php/dav/files/testuser/"), + "/" + ); + + assert_eq!( + service.href_to_relative_path("/remote.php/dav/files/testuser/Deep/Nested/Path/"), + "/Deep/Nested/Path/" + ); + } + + #[test] + fn test_url_construction() { + let service = create_test_service("http://test.example.com"); + + // Test URL construction for different paths + assert_eq!( + service.get_url_for_path("/TestDocuments"), + 
"http://test.example.com/remote.php/dav/files/testuser/TestDocuments" + ); + + assert_eq!( + service.get_url_for_path("/TestDocuments/SubDir"), + "http://test.example.com/remote.php/dav/files/testuser/TestDocuments/SubDir" + ); + + assert_eq!( + service.get_url_for_path("/"), + "http://test.example.com/remote.php/dav/files/testuser" + ); + } + + #[test] + fn test_regression_temp_paths_are_processed() { + // Regression test: Ensure TEMP paths from XML parser are always processed + let service = create_test_service("http://test.example.com"); + + // Simulate the exact scenario that caused the bug + let raw_xml_items = vec![ + FileIngestionInfo { + relative_path: "TEMP".to_string(), // This is what XML parser returns + full_path: "/remote.php/dav/files/testuser/TestDocuments/ImportantFolder/".to_string(), + #[allow(deprecated)] + path: "/remote.php/dav/files/testuser/TestDocuments/ImportantFolder/".to_string(), + name: "ImportantFolder".to_string(), + size: 0, + mime_type: "application/octet-stream".to_string(), + last_modified: None, + etag: "folder123etag".to_string(), + is_directory: true, + created_at: None, + permissions: None, + owner: None, + group: None, + metadata: None, + } + ]; + + // Process items as the service should do + let processed_items = service.process_file_infos(raw_xml_items); + + // Verify the bug is fixed + assert_eq!(processed_items.len(), 1); + assert_ne!(processed_items[0].relative_path, "TEMP", + "REGRESSION: relative_path should not remain as TEMP after processing"); + assert_eq!(processed_items[0].relative_path, "/TestDocuments/ImportantFolder/", + "relative_path should be properly converted from href"); + } + + #[tokio::test] + async fn test_discover_files_and_directories_processes_paths() { + // Integration test to ensure discover_files_and_directories always processes paths + let mock_server = MockServer::start().await; + + Mock::given(method("PROPFIND")) + .and(path("/remote.php/dav/files/testuser/TestDocuments")) + .respond_with( + ResponseTemplate::new(207) + .set_body_string(mock_propfind_response()) + .insert_header("content-type", "application/xml") + ) + .mount(&mock_server) + .await; + + let service = create_test_service(&mock_server.uri()); + + let result = service.discover_files_and_directories("/TestDocuments", false).await + .expect("Discovery should succeed"); + + // Ensure no items have TEMP paths (regression test) + for item in result.files.iter().chain(result.directories.iter()) { + assert_ne!(item.relative_path, "TEMP", + "REGRESSION: No items should have TEMP paths after discovery"); + } + } +} \ No newline at end of file diff --git a/src/webdav_xml_parser.rs b/src/webdav_xml_parser.rs index 10bb5aa..1376855 100644 --- a/src/webdav_xml_parser.rs +++ b/src/webdav_xml_parser.rs @@ -6,6 +6,7 @@ use std::str; use serde_json; use crate::models::FileIngestionInfo; +use crate::mime_detection::{detect_mime_for_discovery, DetectionStrategy}; #[derive(Debug, Default)] struct PropFindResponse { @@ -200,6 +201,14 @@ pub fn parse_propfind_response(xml_text: &str) -> Result> // Use the metadata collected during parsing let metadata = resp.metadata; + // Determine MIME type using improved detection + let mime_detection_result = detect_mime_for_discovery( + &name, + resp.content_type.as_deref(), + DetectionStrategy::Comprehensive + ); + let mime_type = mime_detection_result.mime_type; + let file_info = FileIngestionInfo { relative_path: "TEMP".to_string(), // Will be set by discovery layer full_path: resp.href.clone(), @@ -207,7 +216,7 @@ pub fn 
parse_propfind_response(xml_text: &str) -> Result> path: resp.href.clone(), // Legacy field - keep for compatibility name, size: resp.content_length.unwrap_or(0), - mime_type: resp.content_type.unwrap_or_else(|| "application/octet-stream".to_string()), + mime_type, last_modified: parse_http_date(&resp.last_modified.unwrap_or_default()), etag: resp.etag.unwrap_or_else(|| format!("\"{}\"", uuid::Uuid::new_v4())), is_directory: false, @@ -418,6 +427,18 @@ pub fn parse_propfind_response_with_directories(xml_text: &str) -> Result Result Arc { + let database_url = std::env::var("TEST_DATABASE_URL") + .or_else(|_| std::env::var("DATABASE_URL")) + .unwrap_or_else(|_| "postgresql://readur:readur@localhost:5432/readur".to_string()); + + let config = Config { + database_url, + server_address: "127.0.0.1:8080".to_string(), + jwt_secret: "test_secret_for_sync_cancellation".to_string(), + upload_path: "/tmp/test_uploads_sync_cancel".to_string(), + watch_folder: "/tmp/watch_sync_cancel".to_string(), + allowed_file_types: vec!["pdf".to_string(), "txt".to_string(), "jpg".to_string(), "png".to_string()], + watch_interval_seconds: Some(30), + file_stability_check_ms: Some(500), + max_file_age_hours: Some(24), + ocr_language: "eng".to_string(), + concurrent_ocr_jobs: 2, + ocr_timeout_seconds: 60, + max_file_size_mb: 50, + memory_limit_mb: 256, + cpu_priority: "normal".to_string(), + oidc_enabled: false, + oidc_client_id: None, + oidc_client_secret: None, + oidc_issuer_url: None, + oidc_redirect_uri: None, + }; + + let db = Database::new(&config.database_url).await.unwrap(); + let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new( + db.clone(), + db.pool.clone(), + 2, + )); + + let sync_progress_tracker = Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()); + + // Create initial app state + let mut app_state = AppState { + db: db.clone(), + config, + webdav_scheduler: None, + source_scheduler: None, + queue_service, + oidc_client: None, + sync_progress_tracker, + }; + + // Wrap in Arc for sharing + let state_arc = Arc::new(app_state); + + // Create the real source scheduler + let source_scheduler = Arc::new(readur::scheduling::source_scheduler::SourceScheduler::new(state_arc.clone())); + + // Now we need to update the AppState with the scheduler + // Since AppState is already wrapped in Arc, we need to use a different approach + // Let's create a new AppState with the scheduler + Arc::new(AppState { + db: state_arc.db.clone(), + config: state_arc.config.clone(), + webdav_scheduler: None, + source_scheduler: Some(source_scheduler), + queue_service: state_arc.queue_service.clone(), + oidc_client: None, + sync_progress_tracker: state_arc.sync_progress_tracker.clone(), + }) +} + +/// Create a test user for sync cancellation tests +async fn create_test_user(state: &AppState) -> User { + let user_id = Uuid::new_v4(); + let create_user = CreateUser { + username: format!("testuser_sync_cancel_{}", user_id), + email: format!("testuser_sync_cancel_{}@example.com", user_id), + password: "test_password".to_string(), + role: Some(UserRole::Admin), + }; + + state.db.create_user(create_user).await.unwrap() +} + +/// Create a test WebDAV source for cancellation testing +/// Uses a non-existent server so sync will fail, but we can test the cancellation workflow +async fn create_test_webdav_source(state: &AppState, user_id: Uuid, name: &str) -> Source { + let create_source = CreateSource { + name: name.to_string(), + source_type: SourceType::WebDAV, + enabled: Some(true), + config: 
json!({ + "server_url": "https://test-webdav-server-for-cancellation-testing.example.com/remote.php/webdav", + "username": "test_user", + "password": "test_password", + "watch_folders": ["/TestFolder"], + "file_extensions": [".pdf", ".txt", ".jpg", ".png"], + "auto_sync": false, + "sync_interval_minutes": 60, + "server_type": "nextcloud" + }), + }; + + state.db.create_source(user_id, &create_source).await.unwrap() +} + +/// Wait for a source to reach a specific status with timeout +async fn wait_for_source_status( + state: &AppState, + user_id: Uuid, + source_id: Uuid, + expected_status: SourceStatus, + timeout_ms: u64 +) -> bool { + let start_time = std::time::Instant::now(); + let timeout_duration = Duration::from_millis(timeout_ms); + + while start_time.elapsed() < timeout_duration { + if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await { + if source.status == expected_status { + return true; + } + } + sleep(Duration::from_millis(100)).await; + } + false +} + +/// Wait for sync to actually start (status becomes Syncing) +async fn wait_for_sync_to_start( + state: &AppState, + user_id: Uuid, + source_id: Uuid, + timeout_ms: u64 +) -> bool { + wait_for_source_status(state, user_id, source_id, SourceStatus::Syncing, timeout_ms).await +} + +/// Wait for sync to stop (status becomes Idle or Error) +async fn wait_for_sync_to_stop( + state: &AppState, + user_id: Uuid, + source_id: Uuid, + timeout_ms: u64 +) -> bool { + let start_time = std::time::Instant::now(); + let timeout_duration = Duration::from_millis(timeout_ms); + + while start_time.elapsed() < timeout_duration { + if let Ok(Some(source)) = state.db.get_source(user_id, source_id).await { + if matches!(source.status, SourceStatus::Idle | SourceStatus::Error) { + return true; + } + } + sleep(Duration::from_millis(100)).await; + } + false +} + +/// Create HTTP client for API testing +fn create_test_app(state: Arc) -> Router { + use axum::{routing::get, Router}; + + Router::new() + .route("/api/health", get(readur::health_check)) + .nest("/api/auth", readur::routes::auth::router()) + .nest("/api/documents", readur::routes::documents::router()) + .nest("/api/ignored-files", readur::routes::ignored_files::ignored_files_routes()) + .nest("/api/labels", readur::routes::labels::router()) + .nest("/api/metrics", readur::routes::metrics::router()) + .nest("/metrics", readur::routes::prometheus_metrics::router()) + .nest("/api/notifications", readur::routes::notifications::router()) + .nest("/api/ocr", readur::routes::ocr::router()) + .nest("/api/queue", readur::routes::queue::router()) + .nest("/api/search", readur::routes::search::router()) + .nest("/api/settings", readur::routes::settings::router()) + .nest("/api/sources", readur::routes::sources::router()) + .nest("/api/users", readur::routes::users::router()) + .nest("/api/webdav", readur::routes::webdav::router()) + .with_state(state) +} + +/// Create authorization header for test user +fn create_auth_header(user: &User, jwt_secret: &str) -> String { + let claims = Claims { + sub: user.id, + username: user.username.clone(), + exp: (chrono::Utc::now() + chrono::Duration::hours(24)).timestamp() as usize, + }; + + let token = jsonwebtoken::encode( + &jsonwebtoken::Header::default(), + &claims, + &jsonwebtoken::EncodingKey::from_secret(jwt_secret.as_ref()), + ).unwrap(); + + format!("Bearer {}", token) +} + +#[tokio::test] +async fn test_complete_sync_cancellation_workflow() { + println!("๐Ÿงช Testing complete sync cancellation workflow"); + + let state = 
create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Test Cancellation Source").await; + let app = create_test_app(state.clone()); + + // Create auth header + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test user and source: {}", source.id); + + // Step 1: Verify source is initially idle + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri(format!("/api/sources/{}/sync/status", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Source is initially idle"); + + // Step 2: Start sync using the real scheduler + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let actual_status = response.status(); + println!("๐Ÿ” Sync start actual status: {}", actual_status); + + // With real scheduler, should return OK (unless already running) + assert!(matches!(actual_status, StatusCode::OK | StatusCode::CONFLICT)); + println!("โœ… Sync start request completed with status: {}", actual_status); + + // Step 3: Wait for sync to actually start (with real scheduler) + let sync_started = wait_for_sync_to_start(&state, user.id, source.id, 5000).await; + if sync_started { + println!("โœ… Sync actually started - status changed to Syncing"); + + // Give it a moment to establish the sync + sleep(Duration::from_millis(500)).await; + } else { + println!("โš ๏ธ Sync did not start within timeout (may fail quickly due to invalid server)"); + // The sync might fail immediately due to invalid server, which is fine for testing cancellation + } + + // Step 4: Cancel the sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync cancellation request successful"); + + // Step 5: Wait for sync to actually stop with real scheduler + let sync_stopped = wait_for_sync_to_stop(&state, user.id, source.id, 10000).await; + if sync_stopped { + println!("โœ… Sync actually stopped - status changed to Idle/Error"); + } else { + println!("โš ๏ธ Sync did not stop within timeout, checking current status"); + } + + let source_after_cancel = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("โœ… Source status after cancellation: {:?}", source_after_cancel.status); + + // With real scheduler, we should see proper status transitions + assert!(matches!(source_after_cancel.status, SourceStatus::Idle | SourceStatus::Error), + "Source should be Idle or Error after cancellation, got: {:?}", source_after_cancel.status); + + // Step 6: Verify sync status API shows no active sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri(format!("/api/sources/{}/sync/status", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync status API accessible after cancellation"); + + // Cleanup + state.db.delete_source(user.id, 
source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Complete sync cancellation workflow test passed"); +} + +#[tokio::test] +async fn test_multiple_cancellation_requests() { + println!("๐Ÿงช Testing multiple cancellation requests handling"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Multiple Cancel Test Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for multiple cancellation test"); + + // Start sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Sync started with status: {}", response.status()); + + // Wait briefly + sleep(Duration::from_millis(200)).await; + + // Send multiple cancellation requests concurrently + let mut cancel_handles = Vec::new(); + + for i in 0..3 { + let app_clone = app.clone(); + let auth_header_clone = auth_header.clone(); + let source_id = source.id; + + let handle = tokio::spawn(async move { + let response = app_clone + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source_id)) + .header("Authorization", auth_header_clone) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Cancellation request {} completed with status: {}", i + 1, response.status()); + response.status() + }); + + cancel_handles.push(handle); + } + + // Wait for all cancellation requests to complete + let mut success_count = 0; + for handle in cancel_handles { + let status = handle.await.unwrap(); + if status == StatusCode::OK { + success_count += 1; + } + } + + // All cancellation requests should succeed (idempotent) + assert_eq!(success_count, 3); + println!("โœ… All {} cancellation requests succeeded", success_count); + + // Verify final state + sleep(Duration::from_millis(1000)).await; + let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("โœ… Final source status: {:?}", final_source.status); + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Multiple cancellation requests test passed"); +} + +#[tokio::test] +async fn test_cancellation_without_active_sync() { + println!("๐Ÿงช Testing cancellation when no sync is active"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "No Active Sync Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for no active sync test"); + + // Verify source is idle + let initial_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + assert_eq!(initial_source.status, SourceStatus::Idle); + println!("โœ… Source is initially idle: {:?}", initial_source.status); + + // Try to cancel sync when none is active + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // 
Should succeed (idempotent behavior) + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Cancellation without active sync succeeded"); + + // Verify source remains idle + let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + assert_eq!(final_source.status, SourceStatus::Idle); + println!("โœ… Source remains idle after cancellation: {:?}", final_source.status); + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Cancellation without active sync test passed"); +} + +#[tokio::test] +async fn test_sync_status_monitoring_during_cancellation() { + println!("๐Ÿงช Testing sync status monitoring during cancellation"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Status Monitor Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for status monitoring test"); + + // Start sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Sync started with status: {}", response.status()); + + // Monitor sync status before cancellation + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri(format!("/api/sources/{}/sync/status", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync status API accessible before cancellation"); + + // Cancel sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync cancellation successful"); + + // Monitor sync status after cancellation + sleep(Duration::from_millis(500)).await; + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri(format!("/api/sources/{}/sync/status", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync status API accessible after cancellation"); + + // Check database state + let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("โœ… Final database status: {:?}", final_source.status); + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Sync status monitoring during cancellation test passed"); +} + +#[tokio::test] +async fn test_cancellation_with_unauthorized_user() { + println!("๐Ÿงช Testing cancellation with unauthorized user"); + + let state = create_test_app_state().await; + let owner_user = create_test_user(&state).await; + let unauthorized_user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, owner_user.id, "Unauthorized Test Source").await; + let app = create_test_app(state.clone()); + + let unauthorized_auth_header = 
create_auth_header(&unauthorized_user, &state.config.jwt_secret); + + println!("โœ… Created test setup with unauthorized user"); + + // Try to cancel sync with unauthorized user + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &unauthorized_auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Should return 404 (source not found for this user) + assert_eq!(response.status(), StatusCode::NOT_FOUND); + println!("โœ… Unauthorized cancellation properly rejected with 404"); + + // Cleanup + state.db.delete_source(owner_user.id, source.id).await.unwrap(); + state.db.delete_user(owner_user.id).await.unwrap(); + state.db.delete_user(unauthorized_user.id).await.unwrap(); + + println!("๐ŸŽ‰ Unauthorized user cancellation test passed"); +} + +#[tokio::test] +async fn test_cancellation_of_nonexistent_source() { + println!("๐Ÿงช Testing cancellation of nonexistent source"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + let nonexistent_source_id = Uuid::new_v4(); + + println!("โœ… Created test setup for nonexistent source test"); + + // Try to cancel sync for nonexistent source + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", nonexistent_source_id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Should return 404 + assert_eq!(response.status(), StatusCode::NOT_FOUND); + println!("โœ… Nonexistent source cancellation properly rejected with 404"); + + // Cleanup + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Nonexistent source cancellation test passed"); +} + +#[tokio::test] +async fn test_sync_start_cancel_start_sequence() { + println!("๐Ÿงช Testing sync start -> cancel -> start sequence"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Start Cancel Start Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for start-cancel-start sequence"); + + // Step 1: Start sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… First sync start: {}", response.status()); + + // Step 2: Wait briefly then cancel + sleep(Duration::from_millis(300)).await; + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Sync cancellation successful"); + + // Step 3: Wait for cancellation to complete + sleep(Duration::from_millis(1000)).await; + + // Step 4: Start sync again + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + 
.unwrap(); + + // Should succeed or return expected error status + let status = response.status(); + assert!(matches!(status, StatusCode::OK | StatusCode::CONFLICT | StatusCode::INTERNAL_SERVER_ERROR)); + println!("โœ… Second sync start after cancellation: {}", status); + + // Step 5: Cancel the second sync + sleep(Duration::from_millis(300)).await; + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + println!("โœ… Second cancellation successful"); + + // Verify final state + sleep(Duration::from_millis(1000)).await; + let final_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("โœ… Final source status: {:?}", final_source.status); + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Start-cancel-start sequence test passed"); +} + +/// Test that validates sync actually stops working, not just changes status +#[tokio::test] +async fn test_sync_actually_stops_working() { + println!("๐Ÿงช Testing that sync cancellation actually stops sync work"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Actual Stop Test Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for actual sync stop validation"); + + // First check that progress tracker shows no active syncs + let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + assert!(initial_active_syncs.is_empty(), "Should have no active syncs initially"); + assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source should not be syncing initially"); + + // Step 1: Start sync and verify it's actually registered as active + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Sync start response: {}", response.status()); + + // Step 2: Wait for sync to actually start and be registered + let mut sync_became_active = false; + for attempt in 1..=20 { // Wait up to 2 seconds + sleep(Duration::from_millis(100)).await; + + if state.sync_progress_tracker.is_syncing(source.id) { + sync_became_active = true; + println!("โœ… Sync became active after {} attempts ({}ms)", attempt, attempt * 100); + break; + } + } + + // Verify sync actually became active + if !sync_became_active { + println!("โš ๏ธ Sync never became active - checking database status"); + let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("Database source status: {:?}", db_source.status); + + // If sync didn't start due to no scheduler or other issues, that's fine for this test + // The important part is that we test stopping when sync IS active + if db_source.status != SourceStatus::Syncing { + println!("โš ๏ธ Skipping actual stop test - sync never started (likely no scheduler available)"); + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + return; + } + } + + // Step 3: 
Verify sync is tracked in multiple places + let active_syncs_before_stop = state.sync_progress_tracker.get_active_source_ids(); + println!("๐Ÿ“Š Active syncs before stop: {:?}", active_syncs_before_stop); + + let progress_before_stop = state.sync_progress_tracker.get_progress(source.id); + println!("๐Ÿ“Š Progress before stop: {:?}", progress_before_stop); + + let db_source_before_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("๐Ÿ“Š Database status before stop: {:?}", db_source_before_stop.status); + + // Step 4: Stop the sync + let stop_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(stop_response.status(), StatusCode::OK); + println!("โœ… Stop sync request successful"); + + // Step 5: Verify sync actually stops working + // Check progress tracker immediately (should be unregistered) + let progress_after_stop_immediate = state.sync_progress_tracker.get_progress(source.id); + println!("๐Ÿ“Š Progress immediately after stop: {:?}", progress_after_stop_immediate); + + // Wait a bit for all cleanup to complete + sleep(Duration::from_millis(500)).await; + + let active_syncs_after_stop = state.sync_progress_tracker.get_active_source_ids(); + println!("๐Ÿ“Š Active syncs after stop: {:?}", active_syncs_after_stop); + + let progress_after_stop = state.sync_progress_tracker.get_progress(source.id); + println!("๐Ÿ“Š Progress after stop with delay: {:?}", progress_after_stop); + + let db_source_after_stop = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + println!("๐Ÿ“Š Database status after stop: {:?}", db_source_after_stop.status); + + // Step 6: Assertions to verify sync actually stopped + + // The source should no longer be tracked as actively syncing + assert!(!state.sync_progress_tracker.is_syncing(source.id), + "Source should not be tracked as syncing after stop"); + + // The source should not be in the active syncs list + assert!(!active_syncs_after_stop.contains(&source.id), + "Source should not be in active syncs list after stop"); + + // Database status should be Idle (not Syncing) + assert_eq!(db_source_after_stop.status, SourceStatus::Idle, + "Database status should be Idle after stop"); + + // Progress should either be None or show as not active + if let Some(progress) = progress_after_stop { + assert!(!progress.is_active, "Progress should show as not active after stop"); + } + + println!("โœ… All sync stop validations passed"); + + // Step 7: Test that sync can be restarted after stop + sleep(Duration::from_millis(1000)).await; // Wait for complete cleanup + + let restart_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Restart after stop response: {}", restart_response.status()); + + // Stop the restarted sync for cleanup + sleep(Duration::from_millis(200)).await; + let final_stop_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Final stop response: {}", final_stop_response.status()); + + // Cleanup + 
sleep(Duration::from_millis(500)).await; + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Actual sync stop validation test passed"); +} + +/// Test that validates sync cancellation during different phases +#[tokio::test] +async fn test_sync_cancellation_during_different_phases() { + println!("๐Ÿงช Testing sync cancellation during different phases"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Phase Cancellation Test Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created test setup for phase-based cancellation"); + + // Test cancellation at different timing intervals to catch different phases + let test_delays = vec![50, 150, 300, 500]; // Different delays in milliseconds + + for (i, delay_ms) in test_delays.iter().enumerate() { + println!("๐Ÿ”„ Testing cancellation after {}ms delay (iteration {})", delay_ms, i + 1); + + // Start sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!(" ๐Ÿ“ก Sync start status: {}", response.status()); + + // Wait for the specified delay to let sync progress + sleep(Duration::from_millis(*delay_ms)).await; + + // Check what phase we might be in (if any) + let progress_info = state.sync_progress_tracker.get_progress(source.id); + if let Some(progress) = &progress_info { + println!(" ๐Ÿ“Š Cancelling during phase: {} ({})", progress.phase, progress.phase_description); + } else { + println!(" ๐Ÿ“Š No progress info available - sync may not have started or already completed"); + } + + // Cancel the sync + let cancel_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(cancel_response.status(), StatusCode::OK); + println!(" โœ… Cancellation successful"); + + // Verify cleanup + sleep(Duration::from_millis(300)).await; + + assert!(!state.sync_progress_tracker.is_syncing(source.id), + "Source should not be syncing after cancellation in iteration {}", i + 1); + + let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + assert_eq!(db_source.status, SourceStatus::Idle, + "Source should be idle after cancellation in iteration {}", i + 1); + + println!(" โœ… Cleanup verified for iteration {}", i + 1); + + // Wait before next iteration to ensure complete cleanup + sleep(Duration::from_millis(500)).await; + } + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Phase-based cancellation test passed"); +} + +/// Test resource cleanup validation after sync cancellation +#[tokio::test] +async fn test_resource_cleanup_after_cancellation() { + println!("๐Ÿงช Testing resource cleanup after sync cancellation"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Resource Cleanup Test Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, 
&state.config.jwt_secret); + + println!("โœ… Created test setup for resource cleanup validation"); + + // Record initial state + let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + let initial_progress = state.sync_progress_tracker.get_progress(source.id); + + println!("๐Ÿ“Š Initial active syncs: {:?}", initial_active_syncs); + println!("๐Ÿ“Š Initial progress: {:?}", initial_progress); + + // Start sync + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("โœ… Sync started with status: {}", response.status()); + + // Wait for sync to become active + sleep(Duration::from_millis(200)).await; + + // Record active state + let active_syncs_during = state.sync_progress_tracker.get_active_source_ids(); + let progress_during = state.sync_progress_tracker.get_progress(source.id); + let is_syncing_during = state.sync_progress_tracker.is_syncing(source.id); + + println!("๐Ÿ“Š Active syncs during: {:?}", active_syncs_during); + println!("๐Ÿ“Š Progress during: {:?}", progress_during); + println!("๐Ÿ“Š Is syncing during: {}", is_syncing_during); + + // Cancel sync + let cancel_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(cancel_response.status(), StatusCode::OK); + println!("โœ… Sync cancellation successful"); + + // Wait for cleanup to complete + sleep(Duration::from_millis(1000)).await; + + // Verify complete cleanup + let final_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + let final_progress = state.sync_progress_tracker.get_progress(source.id); + let is_syncing_final = state.sync_progress_tracker.is_syncing(source.id); + let db_source_final = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + + println!("๐Ÿ“Š Final active syncs: {:?}", final_active_syncs); + println!("๐Ÿ“Š Final progress: {:?}", final_progress); + println!("๐Ÿ“Š Is syncing final: {}", is_syncing_final); + println!("๐Ÿ“Š Final DB status: {:?}", db_source_final.status); + + // Assertions for complete cleanup + assert!(!final_active_syncs.contains(&source.id), + "Source should be removed from active syncs list"); + + assert!(!is_syncing_final, + "Progress tracker should not show source as syncing"); + + assert_eq!(db_source_final.status, SourceStatus::Idle, + "Database should show source as Idle"); + + // If progress exists, it should not be active + if let Some(progress) = final_progress { + assert!(!progress.is_active, "Any remaining progress should show as inactive"); + } + + // Test multiple rapid start/stop cycles to stress test cleanup + println!("๐Ÿ”„ Testing rapid start/stop cycles"); + + for cycle in 1..=3 { + println!(" ๐Ÿ”„ Cycle {}", cycle); + + // Start + let start_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!(" ๐Ÿ“ก Start: {}", start_response.status()); + + // Brief wait + sleep(Duration::from_millis(100)).await; + + // Stop + let stop_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", 
source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!(" ๐Ÿ›‘ Stop: {}", stop_response.status()); + + // Verify cleanup after each cycle + sleep(Duration::from_millis(300)).await; + + assert!(!state.sync_progress_tracker.is_syncing(source.id), + "Source should not be syncing after cycle {}", cycle); + + let db_check = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + assert_eq!(db_check.status, SourceStatus::Idle, + "Source should be idle after cycle {}", cycle); + } + + println!("โœ… Rapid cycle cleanup verified"); + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Resource cleanup validation test passed"); +} + +/// Test that validates cancellation token propagation through sync layers +#[tokio::test] +async fn test_cancellation_token_propagation() { + println!("๐Ÿงช Testing cancellation token propagation through sync layers"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Token Propagation Test Source").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + // Create multiple sources to test concurrent cancellation handling + let source2 = create_test_webdav_source(&state, user.id, "Second Token Test Source").await; + let source3 = create_test_webdav_source(&state, user.id, "Third Token Test Source").await; + + println!("โœ… Created test setup with multiple sources for token propagation"); + + // Start multiple syncs concurrently + let sync_futures = vec![ + app.clone().oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ), + app.clone().oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source2.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ), + app.clone().oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source3.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ), + ]; + + let results = futures::future::join_all(sync_futures).await; + for (i, result) in results.iter().enumerate() { + if let Ok(response) = result { + println!("โœ… Source {} sync start: {}", i + 1, response.status()); + } + } + + // Wait for syncs to potentially start + sleep(Duration::from_millis(300)).await; + + // Record which sources are actually active + let active_before = state.sync_progress_tracker.get_active_source_ids(); + println!("๐Ÿ“Š Active syncs before cancellation: {:?}", active_before); + + // Test individual cancellation (should only affect specific source) + let cancel_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(cancel_response.status(), StatusCode::OK); + println!("โœ… Individual source cancellation successful"); + + // Wait for cancellation to propagate + sleep(Duration::from_millis(500)).await; + + // Verify that only the cancelled source stopped + let active_after_individual = state.sync_progress_tracker.get_active_source_ids(); + println!("๐Ÿ“Š Active syncs after 
individual cancellation: {:?}", active_after_individual); + + // The cancelled source should not be active + assert!(!state.sync_progress_tracker.is_syncing(source.id), + "Cancelled source should not be syncing"); + + // Other sources might still be active (depending on implementation) + // The key test is that the cancellation was isolated to the correct source + + // Cancel the remaining sources + let remaining_cancel_futures = vec![ + app.clone().oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source2.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ), + app.clone().oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source3.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ), + ]; + + let cancel_results = futures::future::join_all(remaining_cancel_futures).await; + for (i, result) in cancel_results.iter().enumerate() { + if let Ok(response) = result { + println!("โœ… Remaining source {} cancel: {}", i + 2, response.status()); + } + } + + // Wait for all cancellations to complete + sleep(Duration::from_millis(1000)).await; + + // Verify all sources are now idle + let final_active = state.sync_progress_tracker.get_active_source_ids(); + println!("๐Ÿ“Š Final active syncs: {:?}", final_active); + + assert!(!state.sync_progress_tracker.is_syncing(source.id), "Source 1 should not be syncing"); + assert!(!state.sync_progress_tracker.is_syncing(source2.id), "Source 2 should not be syncing"); + assert!(!state.sync_progress_tracker.is_syncing(source3.id), "Source 3 should not be syncing"); + + // Verify database states + let db_sources = vec![ + state.db.get_source(user.id, source.id).await.unwrap().unwrap(), + state.db.get_source(user.id, source2.id).await.unwrap().unwrap(), + state.db.get_source(user.id, source3.id).await.unwrap().unwrap(), + ]; + + for (i, db_source) in db_sources.iter().enumerate() { + assert_eq!(db_source.status, SourceStatus::Idle, + "Database source {} should be idle", i + 1); + println!("๐Ÿ“Š Database source {} status: {:?}", i + 1, db_source.status); + } + + // Cleanup + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_source(user.id, source2.id).await.unwrap(); + state.db.delete_source(user.id, source3.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ Cancellation token propagation test passed"); +} + +/// Comprehensive test that validates the complete sync cancellation workflow +/// This is the main test that covers all aspects of sync cancellation +#[tokio::test] +async fn test_comprehensive_sync_cancellation_validation() { + println!("๐Ÿงช COMPREHENSIVE TEST: Complete sync cancellation validation"); + + let state = create_test_app_state().await; + let user = create_test_user(&state).await; + let source = create_test_webdav_source(&state, user.id, "Comprehensive Cancellation Test").await; + let app = create_test_app(state.clone()); + + let auth_header = create_auth_header(&user, &state.config.jwt_secret); + + println!("โœ… Created comprehensive test environment"); + + // PHASE 1: Validate initial state + println!("๐Ÿ“ PHASE 1: Initial state validation"); + + let initial_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + let initial_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + let initial_is_syncing = state.sync_progress_tracker.is_syncing(source.id); + let initial_progress = 
state.sync_progress_tracker.get_progress(source.id); + + assert_eq!(initial_db_source.status, SourceStatus::Idle, "Initial DB status should be Idle"); + assert!(initial_active_syncs.is_empty(), "Initial active syncs should be empty"); + assert!(!initial_is_syncing, "Initial sync state should be false"); + assert!(initial_progress.is_none(), "Initial progress should be None"); + + println!("โœ… PHASE 1 PASSED: All initial states correct"); + + // PHASE 2: Start sync and validate activation + println!("๐Ÿ“ PHASE 2: Sync activation validation"); + + let start_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("๐Ÿ“ก Sync start response: {}", start_response.status()); + + // Wait for sync to activate and check multiple indicators + let mut sync_activation_verified = false; + for attempt in 1..=30 { // Wait up to 3 seconds + sleep(Duration::from_millis(100)).await; + + let db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + let is_syncing = state.sync_progress_tracker.is_syncing(source.id); + let active_syncs = state.sync_progress_tracker.get_active_source_ids(); + + if db_source.status == SourceStatus::Syncing || is_syncing || active_syncs.contains(&source.id) { + sync_activation_verified = true; + println!("โœ… Sync activation verified after {} attempts:", attempt); + println!(" ๐Ÿ“Š DB Status: {:?}", db_source.status); + println!(" ๐Ÿ“Š Is Syncing: {}", is_syncing); + println!(" ๐Ÿ“Š Active Syncs: {:?}", active_syncs); + break; + } + } + + if !sync_activation_verified { + println!("โš ๏ธ PHASE 2 CONDITIONAL PASS: Sync never activated (likely no scheduler)"); + // Cleanup and exit gracefully + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + return; + } + + println!("โœ… PHASE 2 PASSED: Sync activation verified"); + + // PHASE 3: Validate active sync state across all systems + println!("๐Ÿ“ PHASE 3: Active sync state validation"); + + let active_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + let active_is_syncing = state.sync_progress_tracker.is_syncing(source.id); + let active_syncs_list = state.sync_progress_tracker.get_active_source_ids(); + let active_progress = state.sync_progress_tracker.get_progress(source.id); + + println!("๐Ÿ“Š Active state summary:"); + println!(" ๐Ÿ“Š DB Status: {:?}", active_db_source.status); + println!(" ๐Ÿ“Š Is Syncing: {}", active_is_syncing); + println!(" ๐Ÿ“Š Active Syncs: {:?}", active_syncs_list); + println!(" ๐Ÿ“Š Progress Active: {:?}", active_progress.as_ref().map(|p| p.is_active)); + + // At least one indicator should show sync is active + let sync_indicators_active = active_db_source.status == SourceStatus::Syncing || + active_is_syncing || + active_syncs_list.contains(&source.id); + + assert!(sync_indicators_active, "At least one sync indicator should show active state"); + + println!("โœ… PHASE 3 PASSED: Active sync state validated"); + + // PHASE 4: Cancel sync and validate immediate response + println!("๐Ÿ“ PHASE 4: Sync cancellation execution"); + + let cancel_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(cancel_response.status(), 
StatusCode::OK); + println!("โœ… PHASE 4 PASSED: Cancellation request successful"); + + // PHASE 5: Validate cancellation propagation and cleanup + println!("๐Ÿ“ PHASE 5: Cancellation cleanup validation"); + + // Check immediate state (some cleanup might be instant) + let immediate_is_syncing = state.sync_progress_tracker.is_syncing(source.id); + let immediate_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + + println!("๐Ÿ“Š Immediate post-cancel state:"); + println!(" ๐Ÿ“Š Is Syncing: {}", immediate_is_syncing); + println!(" ๐Ÿ“Š Active Syncs: {:?}", immediate_active_syncs); + + // Wait for complete cleanup + sleep(Duration::from_millis(1500)).await; + + let final_db_source = state.db.get_source(user.id, source.id).await.unwrap().unwrap(); + let final_is_syncing = state.sync_progress_tracker.is_syncing(source.id); + let final_active_syncs = state.sync_progress_tracker.get_active_source_ids(); + let final_progress = state.sync_progress_tracker.get_progress(source.id); + + println!("๐Ÿ“Š Final post-cancel state:"); + println!(" ๐Ÿ“Š DB Status: {:?}", final_db_source.status); + println!(" ๐Ÿ“Š Is Syncing: {}", final_is_syncing); + println!(" ๐Ÿ“Š Active Syncs: {:?}", final_active_syncs); + println!(" ๐Ÿ“Š Progress: {:?}", final_progress.as_ref().map(|p| (p.is_active, &p.phase))); + + // CRITICAL ASSERTIONS: These must all pass for proper cancellation + + assert_eq!(final_db_source.status, SourceStatus::Idle, + "CRITICAL: Database status must be Idle after cancellation"); + + assert!(!final_is_syncing, + "CRITICAL: Progress tracker must not show source as syncing"); + + assert!(!final_active_syncs.contains(&source.id), + "CRITICAL: Source must not be in active syncs list"); + + if let Some(progress) = final_progress { + assert!(!progress.is_active, + "CRITICAL: Any remaining progress must show as inactive"); + } + + println!("โœ… PHASE 5 PASSED: Complete cancellation cleanup verified"); + + // PHASE 6: Validate restart capability after cancellation + println!("๐Ÿ“ PHASE 6: Post-cancellation restart validation"); + + sleep(Duration::from_millis(500)).await; // Ensure complete cleanup + + let restart_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("๐Ÿ“ก Restart response: {}", restart_response.status()); + + // The restart should succeed (or fail with expected reasons, not due to lingering state) + let acceptable_restart_statuses = [StatusCode::OK, StatusCode::CONFLICT, + StatusCode::INTERNAL_SERVER_ERROR, StatusCode::NOT_IMPLEMENTED]; + assert!(acceptable_restart_statuses.contains(&restart_response.status()), + "Restart should succeed or fail with expected status, got: {}", restart_response.status()); + + // Clean up the restarted sync + sleep(Duration::from_millis(200)).await; + let final_cleanup_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!("/api/sources/{}/sync/stop", source.id)) + .header("Authorization", &auth_header) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + println!("๐Ÿ“ก Final cleanup response: {}", final_cleanup_response.status()); + + println!("โœ… PHASE 6 PASSED: Restart capability validated"); + + // Final cleanup + sleep(Duration::from_millis(500)).await; + state.db.delete_source(user.id, source.id).await.unwrap(); + state.db.delete_user(user.id).await.unwrap(); + + println!("๐ŸŽ‰ COMPREHENSIVE 
TEST PASSED: Complete sync cancellation validation successful");
+    println!("   ✅ All 6 phases validated successfully");
+    println!("   ✅ Sync actually stops working (not just status changes)");
+    println!("   ✅ Resources properly cleaned up");
+    println!("   ✅ System remains in consistent state");
+}
\ No newline at end of file
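These integration tests drive cancellation from the outside, through `POST /api/sources/{id}/sync/stop`. Internally, a stop signal like this is typically a cooperative cancellation token that long-running work checks between units. A minimal sketch of that pattern using `tokio-util`'s `CancellationToken` (illustrative only; the scheduler's actual internals are not part of this diff, so the names below are hypothetical, and the sketch assumes the `tokio` crate with the macros feature plus `tokio-util`):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Hypothetical sync loop: does work in small batches and checks the
// token between batches, so a stop request takes effect quickly.
async fn run_sync(token: CancellationToken) -> &'static str {
    for _batch in 0..100 {
        tokio::select! {
            _ = token.cancelled() => return "cancelled", // stop between units of work
            _ = tokio::time::sleep(Duration::from_millis(50)) => {} // simulate one batch
        }
    }
    "completed"
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let handle = tokio::spawn(run_sync(token.clone()));

    tokio::time::sleep(Duration::from_millis(120)).await;
    token.cancel(); // roughly what the stop endpoint triggers internally

    println!("sync outcome: {}", handle.await.unwrap()); // prints "cancelled"
}
```

Checking the token only at batch boundaries is what lets a stop endpoint return immediately while in-flight work winds down, which is the behavior `test_sync_actually_stops_working` asserts on.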