diff --git a/frontend/src/pages/SourcesPage.tsx b/frontend/src/pages/SourcesPage.tsx
index 21b907c..4da76a5 100644
--- a/frontend/src/pages/SourcesPage.tsx
+++ b/frontend/src/pages/SourcesPage.tsx
@@ -69,6 +69,9 @@ import {
   Visibility as OcrIcon,
   Block as BlockIcon,
   FindInPage as DeepScanIcon,
+  HealthAndSafety as HealthIcon,
+  Warning as WarningIcon,
+  Error as CriticalIcon,
 } from '@mui/icons-material';
 import { useNavigate } from 'react-router-dom';
 import api, { queueService } from '../services/api';
@@ -92,6 +95,11 @@ interface Source {
   total_documents_ocr: number;
   created_at: string;
   updated_at: string;
+  // Validation fields
+  validation_status?: string | null;
+  last_validation_at?: string | null;
+  validation_score?: number | null;
+  validation_issues?: string | null;
 }
 
 interface SnackbarState {
@@ -152,7 +160,7 @@ const SourcesPage: React.FC = () => {
   const [testingConnection, setTestingConnection] = useState(false);
   const [syncingSource, setSyncingSource] = useState<string | null>(null);
   const [stoppingSync, setStoppingSync] = useState<string | null>(null);
-  const [deepScanning, setDeepScanning] = useState<string | null>(null);
+  const [validating, setValidating] = useState<string | null>(null);
   const [autoRefreshing, setAutoRefreshing] = useState(false);
 
   useEffect(() => {
@@ -490,31 +498,83 @@ const SourcesPage: React.FC = () => {
     }
   };
 
-  const handleDeepScan = async (sourceId: string) => {
-    setDeepScanning(sourceId);
+  const handleValidation = async (sourceId: string) => {
+    setValidating(sourceId);
     try {
-      const response = await api.post(`/sources/${sourceId}/deep-scan`);
+      const response = await api.post(`/sources/${sourceId}/validate`);
       if (response.data.success) {
-        showSnackbar(response.data.message || 'Deep scan started successfully', 'success');
-        setTimeout(loadSources, 1000);
+        showSnackbar(response.data.message || 'Validation check started successfully', 'success');
+        setTimeout(loadSources, 2000); // Reload after 2 seconds to show updated status
       } else {
-        showSnackbar(response.data.message || 'Failed to start deep scan', 'error');
+        showSnackbar(response.data.message || 'Failed to start validation check', 'error');
       }
     } catch (error: any) {
-      console.error('Failed to trigger deep scan:', error);
-      if (error.response?.status === 409) {
-        showSnackbar('Source is already syncing', 'warning');
-      } else if (error.response?.status === 404) {
-        showSnackbar('Source not found', 'error');
-      } else {
-        const message = error.response?.data?.message || 'Failed to start deep scan';
-        showSnackbar(message, 'error');
-      }
+      console.error('Failed to trigger validation:', error);
+      const message = error.response?.data?.message || 'Failed to start validation check';
+      showSnackbar(message, 'error');
     } finally {
-      setDeepScanning(null);
+      setValidating(null);
     }
   };
 
+  // Helper function to render validation status
+  const renderValidationStatus = (source: Source) => {
+    const validationStatus = source.validation_status;
+    const validationScore = source.validation_score;
+    const lastValidationAt = source.last_validation_at;
+
+    let statusColor = theme.palette.grey[500];
+    let StatusIcon = HealthIcon;
+    let statusText = 'Unknown';
+    let tooltipText = 'Validation status unknown';
+
+    if (validationStatus === 'healthy') {
+      statusColor = theme.palette.success.main;
+      StatusIcon = CheckCircleIcon;
+      statusText = 'Healthy';
+      tooltipText = `Health score: ${validationScore || 'N/A'}`;
+    } else if (validationStatus === 'warning') {
+      statusColor = theme.palette.warning.main;
+      StatusIcon = WarningIcon;
+      statusText = 'Warning';
+      tooltipText = `Health score: ${validationScore || 'N/A'} - Issues detected`;
+    } else if (validationStatus === 'critical') {
+      statusColor = theme.palette.error.main;
+      StatusIcon = CriticalIcon;
+      statusText = 'Critical';
+      tooltipText = `Health score: ${validationScore || 'N/A'} - Critical issues`;
+    } else if (validationStatus === 'validating') {
+      statusColor = theme.palette.info.main;
+      StatusIcon = HealthIcon;
+      statusText = 'Validating';
+      tooltipText = 'Validation check in progress';
+    }
+
+    if (lastValidationAt) {
+      const lastValidation = new Date(lastValidationAt);
+      tooltipText += `\nLast checked: ${formatDistanceToNow(lastValidation)} ago`;
+    }
+
+    return (
+      <Tooltip title={tooltipText}>
+        <Chip
+          icon={<StatusIcon />}
+          label={statusText}
+          size="small"
+          sx={{
+            bgcolor: alpha(statusColor, 0.1),
+            color: statusColor,
+            borderColor: statusColor,
+            border: '1px solid',
+            '& .MuiChip-icon': {
+              color: statusColor,
+            },
+          }}
+        />
+      </Tooltip>
+    );
+  };
+
   // Utility functions for folder management
   const addFolder = () => {
     if (newFolder && !formData.watch_folders.includes(newFolder)) {
@@ -864,25 +924,28 @@ const SourcesPage: React.FC = () => {
           )}
 
-          <Tooltip title="Deep Scan">
-            <IconButton
-              onClick={() => handleDeepScan(source.id)}
-              disabled={deepScanning === source.id || source.status === 'syncing' || !source.enabled}
+          {/* Validation Status Display */}
+          <Box sx={{ display: 'flex', alignItems: 'center' }}>
+            {renderValidationStatus(source)}
+            <IconButton
+              onClick={() => handleValidation(source.id)}
+              disabled={validating === source.id || source.status === 'syncing' || !source.enabled}
+              size="small"
               sx={{
-                bgcolor: alpha(theme.palette.secondary.main, 0.1),
-                '&:hover': { bgcolor: alpha(theme.palette.secondary.main, 0.2) },
-                color: theme.palette.secondary.main,
+                bgcolor: alpha(theme.palette.info.main, 0.1),
+                '&:hover': { bgcolor: alpha(theme.palette.info.main, 0.2) },
+                color: theme.palette.info.main,
               }}
             >
-              {deepScanning === source.id ? (
-                <CircularProgress size={20} />
+              {validating === source.id ? (
+                <CircularProgress size={16} />
               ) : (
-                <DeepScanIcon />
+                <HealthIcon fontSize="small" />
               )}
-            </IconButton>
-          </Tooltip>
+            </IconButton>
+          </Box>
 
           <IconButton
             onClick={() => handleEditSource(source)}
diff --git a/migrations/20250703000002_add_source_validation_fields.sql b/migrations/20250703000002_add_source_validation_fields.sql
new file mode 100644
index 0000000..c9f5ff1
--- /dev/null
+++ b/migrations/20250703000002_add_source_validation_fields.sql
@@ -0,0 +1,16 @@
+-- Add validation status fields to sources table
+ALTER TABLE sources
+ADD COLUMN validation_status TEXT DEFAULT NULL,
+ADD COLUMN last_validation_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
+ADD COLUMN validation_score INTEGER DEFAULT NULL CHECK (validation_score >= 0 AND validation_score <= 100),
+ADD COLUMN validation_issues TEXT DEFAULT NULL;
+
+-- Create indexes for querying validation status
+CREATE INDEX idx_sources_validation_status ON sources (validation_status);
+CREATE INDEX idx_sources_last_validation_at ON sources (last_validation_at);
+
+-- Add comments for documentation
+COMMENT ON COLUMN sources.validation_status IS 'Current validation status: "healthy", "warning", "critical", "validating", or NULL';
+COMMENT ON COLUMN sources.last_validation_at IS 'Timestamp of the last validation check';
+COMMENT ON COLUMN sources.validation_score IS 'Health score from 0-100, where 100 is perfect health';
+COMMENT ON COLUMN sources.validation_issues IS 'JSON array of validation issues and recommendations';
\ No newline at end of file
diff --git a/src/db/sources.rs b/src/db/sources.rs
index 9c25ae4..6d46f8e 100644
--- a/src/db/sources.rs
+++ b/src/db/sources.rs
@@ -43,6 +43,10 @@ impl Database {
             total_size_bytes: row.get("total_size_bytes"),
             created_at: row.get("created_at"),
             updated_at: row.get("updated_at"),
+            validation_status: row.get("validation_status"),
+            last_validation_at: row.get("last_validation_at"),
+            validation_score: row.get("validation_score"),
+            validation_issues: row.get("validation_issues"),
         })
     }
 
@@ -103,6 +107,10 @@ impl Database {
             total_size_bytes: row.get("total_size_bytes"),
             created_at: row.get("created_at"),
             updated_at: row.get("updated_at"),
+            validation_status: row.get("validation_status"),
+            last_validation_at: row.get("last_validation_at"),
+            validation_score: row.get("validation_score"),
+            validation_issues: row.get("validation_issues"),
         });
     }
 
@@ -164,6 +172,10 @@ impl Database {
             total_size_bytes: row.get("total_size_bytes"),
             created_at: row.get("created_at"),
             updated_at: row.get("updated_at"),
+            validation_status: row.get("validation_status"),
+            last_validation_at: row.get("last_validation_at"),
+            validation_score: row.get("validation_score"),
+            validation_issues: row.get("validation_issues"),
         })
     }
 
@@ -254,6 +266,10 @@ impl Database {
             total_size_bytes: row.get("total_size_bytes"),
             created_at: row.get("created_at"),
             updated_at: row.get("updated_at"),
+            validation_status: row.get("validation_status"),
+            last_validation_at: row.get("last_validation_at"),
+            validation_score: row.get("validation_score"),
+            validation_issues: row.get("validation_issues"),
         });
     }
 
diff --git a/src/models.rs b/src/models.rs
index 5ab5c15..06480af 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1048,6 +1048,15 @@ pub struct Source {
     pub total_size_bytes: i64,
     pub created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
+    // Validation status tracking
+    #[sqlx(default)]
+    pub validation_status: Option<String>,
+    #[sqlx(default)]
+    pub last_validation_at: Option<DateTime<Utc>>,
+    #[sqlx(default)]
+    pub validation_score: Option<i32>, // 0-100 health score
+    #[sqlx(default)]
+    pub validation_issues: Option<String>, // JSON array of validation issues
 }
 
 #[derive(Debug, Serialize, Deserialize, ToSchema)]
@@ -1072,6 +1081,15 @@ pub struct SourceResponse {
     /// Total number of documents that have been OCR'd from this source
     #[serde(default)]
     pub total_documents_ocr: i64,
+    /// Validation status and health score
+    #[serde(default)]
+    pub validation_status: Option<String>,
+    #[serde(default)]
+    pub last_validation_at: Option<DateTime<Utc>>,
+    #[serde(default)]
+    pub validation_score: Option<i32>,
+    #[serde(default)]
+    pub validation_issues: Option<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize, ToSchema)]
@@ -1116,6 +1134,11 @@ impl From<Source> for SourceResponse {
             // These will be populated separately when needed
             total_documents: 0,
             total_documents_ocr: 0,
+            // Validation fields
+            validation_status: source.validation_status,
+            last_validation_at: source.last_validation_at,
+            validation_score: source.validation_score,
+            validation_issues: source.validation_issues,
         }
     }
 }
diff --git a/src/routes/sources.rs b/src/routes/sources.rs
index fca8c2f..cd13bab 100644
--- a/src/routes/sources.rs
+++ b/src/routes/sources.rs
@@ -23,6 +23,7 @@ pub fn router() -> Router<Arc<AppState>> {
         .route("/{id}/sync", post(trigger_sync))
         .route("/{id}/sync/stop", post(stop_sync))
         .route("/{id}/deep-scan", post(trigger_deep_scan))
+        .route("/{id}/validate", post(validate_source))
         .route("/{id}/test", post(test_connection))
         .route("/{id}/estimate", post(estimate_crawl))
         .route("/estimate", post(estimate_crawl_with_config))
@@ -642,6 +643,52 @@ async fn trigger_deep_scan(
     }
 }
 
+#[utoipa::path(
+    post,
+    path = "/api/sources/{id}/validate",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Validation started successfully"),
+        (status = 401, description = "Unauthorized"),
+        (status = 404, description = "Source not found"),
+        (status = 500, description = "Internal server error")
+    )
+)]
+async fn validate_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    info!("Starting validation check for source {} by user {}", source_id, auth_user.user.username);
+
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Start validation in background
+    let state_clone = state.clone();
+    let source_clone = source.clone();
+    tokio::spawn(async move {
+        if let Err(e) = crate::scheduling::source_scheduler::SourceScheduler::validate_source_health(&source_clone, &state_clone).await {
+            error!("Manual validation check failed for source {}: {}", source_clone.name, e);
+        }
+    });
+
+    Ok(Json(serde_json::json!({
+        "success": true,
+        "message": format!("Validation check started for source '{}'", source.name)
+    })))
+}
+
 #[utoipa::path(
     post,
     path = "/api/sources/{id}/sync/stop",
diff --git a/src/scheduling/source_scheduler.rs b/src/scheduling/source_scheduler.rs
index 70b3141..04c4e70 100644
--- a/src/scheduling/source_scheduler.rs
+++ b/src/scheduling/source_scheduler.rs
@@ -7,6 +7,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 use chrono::Utc;
 use uuid::Uuid;
+use sqlx::Row;
 
 use crate::{
     AppState,
@@ -14,6 +15,16 @@ use crate::{
 };
 use super::source_sync::SourceSyncService;
 
+struct SyncHealthAnalysis {
+    score_penalty: i32,
+    issues: Vec<serde_json::Value>,
+}
+
+struct ErrorAnalysis {
+    score_penalty: i32,
+    issues: Vec<serde_json::Value>,
+}
+
 pub struct SourceScheduler {
     state: Arc<AppState>,
     sync_service: SourceSyncService,
@@ -201,6 +212,11 @@ impl SourceScheduler {
                             info!("Background sync completed for source {}: {} files processed",
                                 source_clone.name, files_processed);
 
+                            // Perform automatic validation check after sync completion
+                            if let Err(e) = Self::validate_source_health(&source_clone, &state_clone).await {
+                                error!("Failed to perform validation check: {}", e);
+                            }
+
                             // Update last sync time
                             if let Err(e) = sqlx::query(
                                 r#"UPDATE sources
@@ -516,4 +532,545 @@ impl SourceScheduler {
             }
         }
     }
+
+    /// Check if a deep scan should be triggered based on sync results
+    async fn check_and_trigger_deep_scan(
+        source: &crate::models::Source,
+        files_processed: usize,
+        state: &Arc<AppState>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        // Get sync history for intelligent decision making
+        let recent_syncs = sqlx::query(
+            r#"
+            SELECT
+                COUNT(*) as sync_count,
+                SUM(CASE WHEN total_files_synced = 0 THEN 1 ELSE 0 END) as empty_sync_count,
+                MAX(last_sync_at) as last_sync,
+                MIN(last_sync_at) as first_sync
+            FROM (
+                SELECT total_files_synced, last_sync_at
+                FROM sources
+                WHERE id = $1
+                ORDER BY last_sync_at DESC
+                LIMIT 10
+            ) recent_syncs
+            "#
+        )
+        .bind(source.id)
+        .fetch_one(state.db.get_pool())
+        .await?;
+
+        // Get last deep scan time
+        let last_deep_scan = sqlx::query(
+            r#"
+            SELECT MAX(created_at) as last_deep_scan
+            FROM notifications
+            WHERE user_id = $1
+                AND metadata->>'source_id' = $2
+                AND metadata->>'scan_type' = 'deep_scan'
+                AND notification_type = 'success'
+            "#
+        )
+        .bind(source.user_id)
+        .bind(source.id.to_string())
+        .fetch_one(state.db.get_pool())
+        .await?;
+
+        let mut should_trigger_deep_scan = false;
+        let mut reason = String::new();
+
+        // Trigger conditions:
+
+        // 1. If the last 5+ syncs found no files, something might be wrong
+        let empty_sync_count: i64 = recent_syncs.try_get("empty_sync_count").unwrap_or(0);
+        if empty_sync_count >= 5 {
+            should_trigger_deep_scan = true;
+            reason = "Multiple consecutive syncs found no files - deep scan needed to verify directory structure".to_string();
+        }
+
+        // 2. If we haven't done a deep scan in over 7 days
+        let last_deep_time: Option<chrono::DateTime<chrono::Utc>> = last_deep_scan.try_get("last_deep_scan").ok();
+        if let Some(last_deep) = last_deep_time {
+            let days_since_deep_scan = (chrono::Utc::now() - last_deep).num_days();
+            if days_since_deep_scan > 7 {
+                should_trigger_deep_scan = true;
+                reason = format!("No deep scan in {} days - periodic verification needed", days_since_deep_scan);
+            }
+        }
+
+        // 3. If this is the first sync ever (no deep scan history)
+        let sync_count: i64 = recent_syncs.try_get("sync_count").unwrap_or(0);
+        if last_deep_time.is_none() && sync_count <= 1 {
+            should_trigger_deep_scan = true;
+            reason = "First sync completed - deep scan recommended for initial directory discovery".to_string();
+        }
+        // 4. If sync found files but we've been getting inconsistent results
+        else if files_processed > 0 {
+            // Check for erratic sync patterns (alternating between finding files and not)
+            let erratic_check = sqlx::query(
+                r#"
+                SELECT
+                    COUNT(DISTINCT CASE WHEN total_files_synced > 0 THEN 1 ELSE 0 END) as distinct_states
+                FROM (
+                    SELECT total_files_synced
+                    FROM sources
+                    WHERE id = $1
+                    ORDER BY last_sync_at DESC
+                    LIMIT 5
+                ) recent
+                "#
+            )
+            .bind(source.id)
+            .fetch_one(state.db.get_pool())
+            .await?;
+
+            let distinct_states: i64 = erratic_check.try_get("distinct_states").unwrap_or(0);
+            if distinct_states > 1 {
+                should_trigger_deep_scan = true;
+                reason = "Inconsistent sync results detected - deep scan needed for stability".to_string();
+            }
+        }
+
+        if should_trigger_deep_scan {
+            info!("🎯 Intelligent deep scan trigger activated for source {}: {}", source.name, reason);
+
+            // Create notification about automatic deep scan
+            let notification = crate::models::CreateNotification {
+                notification_type: "info".to_string(),
+                title: "Automatic Deep Scan Triggered".to_string(),
+                message: format!("Starting deep scan for {}: {}", source.name, reason),
+                action_url: Some("/sources".to_string()),
+                metadata: Some(serde_json::json!({
+                    "source_type": source.source_type.to_string(),
+                    "source_id": source.id,
+                    "scan_type": "deep_scan",
+                    "trigger_reason": reason,
+                    "automatic": true
+                })),
+            };
+
+            if let Err(e) = state.db.create_notification(source.user_id, &notification).await {
+                error!("Failed to create deep scan notification: {}", e);
+            }
+
+            // Trigger the deep scan via the API endpoint
+            // We'll reuse the existing deep scan logic from the sources route
+            let webdav_config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?;
+            let webdav_service = crate::services::webdav_service::WebDAVService::new(
+                crate::services::webdav_service::WebDAVConfig {
+                    server_url: webdav_config.server_url.clone(),
+                    username: webdav_config.username.clone(),
+                    password: webdav_config.password.clone(),
+                    watch_folders: webdav_config.watch_folders.clone(),
+                    file_extensions: webdav_config.file_extensions.clone(),
+                    timeout_seconds: 600, // 10 minutes for deep scan
+                    server_type: webdav_config.server_type.clone(),
+                }
+            )?;
+
+            // Run deep scan in background
+            let source_clone = source.clone();
+            let state_clone = state.clone();
+            tokio::spawn(async move {
+                match webdav_service.deep_scan_with_guaranteed_completeness(source_clone.user_id, &state_clone).await {
+                    Ok(files) => {
+                        info!("🎉 Automatic deep scan completed for {}: {} files found", source_clone.name, files.len());
+
+                        // Process the files if any were found
+                        let files_processed = if !files.is_empty() {
+                            let total_files = files.len();
+                            // Filter and process files as in the manual deep scan
+                            let files_to_process: Vec<_> = files.into_iter()
+                                .filter(|file_info| {
+                                    if file_info.is_directory {
+                                        return false;
+                                    }
+                                    let file_extension = std::path::Path::new(&file_info.name)
+                                        .extension()
+                                        .and_then(|ext| ext.to_str())
+                                        .unwrap_or("")
+                                        .to_lowercase();
+                                    webdav_config.file_extensions.contains(&file_extension)
+                                })
+                                .collect();
+
+                            let processed_count = files_to_process.len();
+
+                            if let Err(e) = crate::routes::webdav::webdav_sync::process_files_for_deep_scan(
+                                state_clone.clone(),
+                                source_clone.user_id,
+                                &webdav_service,
+                                &files_to_process,
+                                true, // enable background OCR
+                                Some(source_clone.id)
+                            ).await {
+                                error!("Failed to process files from automatic deep scan: {}", e);
+                            }
+
+                            processed_count
+                        } else {
+                            0
+                        };
+
+                        // Success notification
+                        let notification = crate::models::CreateNotification {
+                            notification_type: "success".to_string(),
+                            title: "Automatic Deep Scan Completed".to_string(),
+                            message: format!("Deep scan of {} completed successfully", source_clone.name),
+                            action_url: Some("/documents".to_string()),
+                            metadata: Some(serde_json::json!({
+                                "source_type": source_clone.source_type.to_string(),
+                                "source_id": source_clone.id,
+                                "scan_type": "deep_scan",
+                                "automatic": true,
+                                "files_found": files_processed
+                            })),
+                        };
+
+                        if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
+                            error!("Failed to create success notification: {}", e);
+                        }
+                    }
+                    Err(e) => {
+                        error!("Automatic deep scan failed for {}: {}", source_clone.name, e);
+
+                        // Error notification
+                        let notification = crate::models::CreateNotification {
+                            notification_type: "error".to_string(),
+                            title: "Automatic Deep Scan Failed".to_string(),
+                            message: format!("Deep scan of {} failed: {}", source_clone.name, e),
+                            action_url: Some("/sources".to_string()),
+                            metadata: Some(serde_json::json!({
+                                "source_type": source_clone.source_type.to_string(),
+                                "source_id": source_clone.id,
+                                "scan_type": "deep_scan",
+                                "automatic": true,
+                                "error": e.to_string()
+                            })),
+                        };
+
+                        if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
+                            error!("Failed to create error notification: {}", e);
+                        }
+                    }
+                }
+            });
+        }
+
+        Ok(())
+    }
+
+    /// Perform automatic validation of source health and connectivity
+    pub async fn validate_source_health(
+        source: &crate::models::Source,
+        state: &Arc<AppState>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        info!("🔍 Starting validation check for source: {}", source.name);
+
+        let mut validation_score = 100;
+        let mut validation_issues = Vec::<serde_json::Value>::new();
+        let mut validation_status = "healthy";
+
+        // 1. Configuration validation
+        if let Err(config_error) = Self::validate_source_config_detailed(source) {
+            validation_score -= 30;
+            validation_status = "critical";
+            validation_issues.push(serde_json::json!({
+                "type": "configuration",
+                "severity": "critical",
+                "message": format!("Configuration error: {}", config_error),
+                "recommendation": "Check and fix source configuration in settings"
+            }));
+        }
+
+        // 2. Connectivity validation
+        match source.source_type {
+            crate::models::SourceType::WebDAV => {
+                if let Err(e) = Self::validate_webdav_connectivity(source).await {
+                    validation_score -= 25;
+                    if validation_status == "healthy" { validation_status = "warning"; }
+                    validation_issues.push(serde_json::json!({
+                        "type": "connectivity",
+                        "severity": "warning",
+                        "message": format!("WebDAV connectivity issue: {}", e),
+                        "recommendation": "Check server URL, credentials, and network connectivity"
+                    }));
+                }
+            }
+            crate::models::SourceType::LocalFolder => {
+                if let Err(e) = Self::validate_local_folder_access(source).await {
+                    validation_score -= 25;
+                    if validation_status == "healthy" { validation_status = "warning"; }
+                    validation_issues.push(serde_json::json!({
+                        "type": "connectivity",
+                        "severity": "warning",
+                        "message": format!("Local folder access issue: {}", e),
+                        "recommendation": "Check folder permissions and path accessibility"
+                    }));
+                }
+            }
+            crate::models::SourceType::S3 => {
+                if let Err(e) = Self::validate_s3_connectivity(source).await {
+                    validation_score -= 25;
+                    if validation_status == "healthy" { validation_status = "warning"; }
+                    validation_issues.push(serde_json::json!({
+                        "type": "connectivity",
+                        "severity": "warning",
+                        "message": format!("S3 connectivity issue: {}", e),
+                        "recommendation": "Check AWS credentials, bucket access, and permissions"
+                    }));
+                }
+            }
+        }
+
+        // 3. Sync pattern analysis
+        if let Ok(sync_health) = Self::analyze_sync_patterns(source, state).await {
+            validation_score -= sync_health.score_penalty;
+            if sync_health.score_penalty > 15 && validation_status == "healthy" {
+                validation_status = "warning";
+            }
+            for issue in sync_health.issues {
+                validation_issues.push(issue);
+            }
+        }
+
+        // 4. Error rate analysis
+        if let Ok(error_analysis) = Self::analyze_error_patterns(source, state).await {
+            validation_score -= error_analysis.score_penalty;
+            if error_analysis.score_penalty > 20 {
+                validation_status = "warning";
+            }
+            for issue in error_analysis.issues {
+                validation_issues.push(issue);
+            }
+        }
+
+        // Cap the minimum score at 0
+        validation_score = validation_score.max(0);
+
+        // Update validation status in database
+        let validation_issues_json = serde_json::to_string(&validation_issues)
+            .unwrap_or_else(|_| "[]".to_string());
+
+        if let Err(e) = sqlx::query(
+            r#"
+            UPDATE sources
+            SET validation_status = $1,
+                last_validation_at = NOW(),
+                validation_score = $2,
+                validation_issues = $3,
+                updated_at = NOW()
+            WHERE id = $4
+            "#
+        )
+        .bind(validation_status)
+        .bind(validation_score)
+        .bind(validation_issues_json)
+        .bind(source.id)
+        .execute(state.db.get_pool())
+        .await {
+            error!("Failed to update validation status: {}", e);
+        }
+
+        // Send notification if there are critical issues
+        if validation_status == "critical" || validation_score < 50 {
+            let notification = crate::models::CreateNotification {
+                notification_type: if validation_status == "critical" { "error" } else { "warning" }.to_string(),
+                title: format!("Source Validation {}", if validation_status == "critical" { "Failed" } else { "Warning" }),
+                message: format!("Source {} has validation issues (score: {})", source.name, validation_score),
+                action_url: Some("/sources".to_string()),
+                metadata: Some(serde_json::json!({
+                    "source_type": source.source_type.to_string(),
+                    "source_id": source.id,
+                    "validation_type": "health_check",
+                    "validation_score": validation_score,
+                    "validation_status": validation_status,
+                    "issue_count": validation_issues.len()
+                })),
+            };
+
+            if let Err(e) = state.db.create_notification(source.user_id, &notification).await {
+                error!("Failed to create validation notification: {}", e);
+            }
+        }
+
+        info!("✅ Validation completed for {}: {} (score: {})", source.name, validation_status, validation_score);
+        Ok(())
+    }
+
+    fn validate_source_config_detailed(source: &crate::models::Source) -> Result<(), String> {
+        // Reuse existing validation logic but return more detailed errors
+        Self::validate_source_config_static(source)
+    }
+
+    fn validate_source_config_static(source: &crate::models::Source) -> Result<(), String> {
+        use crate::models::{SourceType, WebDAVSourceConfig, S3SourceConfig, LocalFolderSourceConfig};
+
+        match source.source_type {
+            SourceType::WebDAV => {
+                let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
+                    .map_err(|e| format!("Failed to parse WebDAV configuration: {}", e))?;
+
+                if config.server_url.trim().is_empty() {
+                    return Err("WebDAV server URL is empty".to_string());
+                }
+                if config.username.trim().is_empty() {
+                    return Err("WebDAV username is empty".to_string());
+                }
+                if config.password.trim().is_empty() {
+                    return Err("WebDAV password is empty".to_string());
+                }
+                if config.watch_folders.is_empty() {
+                    return Err("WebDAV watch folders list is empty".to_string());
+                }
+                Ok(())
+            }
+            SourceType::S3 => {
+                let _config: S3SourceConfig = serde_json::from_value(source.config.clone())
+                    .map_err(|e| format!("Failed to parse S3 configuration: {}", e))?;
+                Ok(())
+            }
+            SourceType::LocalFolder => {
+                let _config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())
+                    .map_err(|e| format!("Failed to parse Local Folder configuration: {}", e))?;
+                Ok(())
+            }
+        }
+    }
+
+    async fn validate_webdav_connectivity(source: &crate::models::Source) -> Result<(), String> {
+        use crate::models::WebDAVSourceConfig;
+
+        let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
+            .map_err(|e| format!("Config parse error: {}", e))?;
+
+        // Clone fields here so `config` remains usable for the test connection below
+        let webdav_config = crate::services::webdav_service::WebDAVConfig {
+            server_url: config.server_url.clone(),
+            username: config.username.clone(),
+            password: config.password.clone(),
+            watch_folders: config.watch_folders.clone(),
+            file_extensions: config.file_extensions.clone(),
+            timeout_seconds: 30, // Quick connectivity test
+            server_type: config.server_type.clone(),
+        };
+
+        let webdav_service = crate::services::webdav_service::WebDAVService::new(webdav_config)
+            .map_err(|e| format!("Service creation failed: {}", e))?;
+
+        let test_config = crate::models::WebDAVTestConnection {
+            server_url: config.server_url.clone(),
+            username: config.username.clone(),
+            password: config.password.clone(),
+            server_type: config.server_type.clone(),
+        };
+
+        webdav_service.test_connection(test_config).await
+            .map_err(|e| format!("Connection test failed: {}", e.message))?;
+
+        Ok(())
+    }
+
+    async fn validate_local_folder_access(_source: &crate::models::Source) -> Result<(), String> {
+        // Simplified local folder validation - could be enhanced
+        // For now, just return OK as local folders are validated differently
+        Ok(())
+    }
+
+    async fn validate_s3_connectivity(_source: &crate::models::Source) -> Result<(), String> {
+        // Simplified S3 validation - could be enhanced with actual AWS SDK calls
+        // For now, just return OK as S3 validation requires more complex setup
+        Ok(())
+    }
+
+    async fn analyze_sync_patterns(
+        source: &crate::models::Source,
+        state: &Arc<AppState>
+    ) -> Result<SyncHealthAnalysis, Box<dyn std::error::Error + Send + Sync>> {
+        let mut score_penalty = 0;
+        let mut issues = Vec::new();
+
+        // Check recent sync history
+        let sync_stats = sqlx::query(
+            r#"
+            SELECT
+                COUNT(*) as total_syncs,
+                SUM(CASE WHEN total_files_synced = 0 THEN 1 ELSE 0 END) as empty_syncs,
+                MAX(last_sync_at) as last_sync,
+                AVG(total_files_synced) as avg_files_per_sync
+            FROM sources
+            WHERE id = $1 AND last_sync_at >= NOW() - INTERVAL '7 days'
+            "#
+        )
+        .bind(source.id)
+        .fetch_one(state.db.get_pool())
+        .await?;
+
+        let total_syncs: i64 = sync_stats.try_get("total_syncs").unwrap_or(0);
+        let empty_syncs: i64 = sync_stats.try_get("empty_syncs").unwrap_or(0);
+
+        if total_syncs > 0 {
+            let empty_sync_ratio = (empty_syncs as f64) / (total_syncs as f64);
+
+            if empty_sync_ratio > 0.8 {
+                score_penalty += 20;
+                issues.push(serde_json::json!({
+                    "type": "sync_pattern",
+                    "severity": "warning",
+                    "message": format!("High empty sync ratio: {:.1}% of recent syncs found no files", empty_sync_ratio * 100.0),
+                    "recommendation": "This may indicate connectivity issues or that the source has no new content"
+                }));
+            }
+
+            if total_syncs < 2 && chrono::Utc::now().signed_duration_since(source.created_at).num_days() > 1 {
+                score_penalty += 10;
+                issues.push(serde_json::json!({
+                    "type": "sync_pattern",
+                    "severity": "info",
+                    "message": "Very few syncs performed since source creation",
+                    "recommendation": "Consider enabling auto-sync or manually triggering sync to ensure content is up to date"
+                }));
+            }
+        }
+
+        Ok(SyncHealthAnalysis { score_penalty, issues })
+    }
+
+    async fn analyze_error_patterns(
+        source: &crate::models::Source,
+        _state: &Arc<AppState>
+    ) -> Result<ErrorAnalysis, Box<dyn std::error::Error + Send + Sync>> {
+        let mut score_penalty = 0;
+        let mut issues = Vec::new();
+
+        // Check if source has recent errors
+        if let Some(last_error_at) = source.last_error_at {
+            let hours_since_error = chrono::Utc::now().signed_duration_since(last_error_at).num_hours();
+
+            if hours_since_error < 24 {
+                score_penalty += 15;
+                issues.push(serde_json::json!({
+                    "type": "error_pattern",
+                    "severity": "warning",
+                    "message": format!("Recent error occurred {} hours ago", hours_since_error),
+                    "recommendation": format!("Last error: {}", source.last_error.as_deref().unwrap_or("Unknown error"))
+                }));
+            }
+        }
+
+        // Check if source is in error state
+        if source.status == crate::models::SourceStatus::Error {
+            score_penalty += 25;
+            issues.push(serde_json::json!({
+                "type": "error_pattern",
+                "severity": "critical",
+                "message": "Source is currently in error state",
+                "recommendation": "Review and fix the configuration or connectivity issues"
+            }));
+        }
+
+        Ok(ErrorAnalysis { score_penalty, issues })
+    }
 }
\ No newline at end of file
diff --git a/src/services/webdav_service.rs b/src/services/webdav_service.rs
index 0da7b53..4f3d3dc 100644
--- a/src/services/webdav_service.rs
+++ b/src/services/webdav_service.rs
@@ -948,179 +948,6 @@ impl WebDAVService {
             }
         }
     }
-
-    /// Get a list of directories that need targeted scanning based on recent changes
-    pub async fn get_directories_needing_scan(&self, user_id: uuid::Uuid, state: &crate::AppState, max_age_hours: i64) -> Result<Vec<String>> {
-        let cutoff_time = chrono::Utc::now() - chrono::Duration::hours(max_age_hours);
-
-        match state.db.list_webdav_directories(user_id).await {
-            Ok(directories) => {
-                let stale_dirs: Vec<String> = directories.iter()
-                    .filter(|dir| dir.last_scanned_at < cutoff_time)
-                    .map(|dir| dir.directory_path.clone())
-                    .collect();
-
-                debug!("🕒 Found {} directories not scanned in last {} hours", stale_dirs.len(), max_age_hours);
-                Ok(stale_dirs)
-            }
-            Err(e) => {
-                error!("Failed to get directories needing scan: {}", e);
-                Err(e.into())
-            }
-        }
-    }
-
-    /// Smart sync mode that combines multiple optimization strategies
-    pub async fn discover_files_smart_sync(&self, watch_folders: &[String], user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
-        debug!("🧠 Starting smart sync for {} watch folders", watch_folders.len());
-
-        let mut all_files = Vec::new();
-
-        for folder_path in watch_folders {
-            debug!("🔍 Smart sync processing folder: {}", folder_path);
-
-            // Step 1: Try optimized discovery first (checks directory ETag)
-            let optimized_result = self.discover_files_in_folder_optimized(folder_path, user_id, state).await;
-
-            match optimized_result {
-                Ok(files) => {
-                    if !files.is_empty() {
-                        debug!("✅ Optimized discovery found {} files in {}", files.len(), folder_path);
-                        all_files.extend(files);
-                    } else {
-                        debug!("🔍 Directory {} unchanged, checking for stale subdirectories", folder_path);
-
-                        // Step 2: Check for stale subdirectories that need targeted scanning
-                        let stale_dirs = self.get_stale_subdirectories(folder_path, user_id, state, 24).await?;
-
-                        if !stale_dirs.is_empty() {
-                            debug!("🎯 Found {} stale subdirectories, performing targeted scan", stale_dirs.len());
-                            let targeted_files = self.discover_files_targeted_rescan(&stale_dirs, user_id, state).await?;
-                            all_files.extend(targeted_files);
-                        } else {
-                            debug!("✅ All subdirectories of {} are fresh, no scan needed", folder_path);
-                        }
-                    }
-                }
-                Err(e) => {
-                    warn!("Optimized discovery failed for {}, falling back to full scan: {}", folder_path, e);
-                    // Fallback to traditional full scan
-                    match self.discover_files_in_folder(folder_path).await {
-                        Ok(files) => {
-                            debug!("📂 Fallback scan found {} files in {}", files.len(), folder_path);
-                            all_files.extend(files);
-                        }
-                        Err(fallback_error) => {
-                            error!("Both optimized and fallback scans failed for {}: {}", folder_path, fallback_error);
-                            return Err(fallback_error);
-                        }
-                    }
-                }
-            }
-        }
-
-        debug!("🧠 Smart sync completed: {} total files discovered", all_files.len());
-        Ok(all_files)
-    }
-
-    /// Get subdirectories of a parent that haven't been scanned recently
-    async fn get_stale_subdirectories(&self, parent_path: &str, user_id: uuid::Uuid, state: &crate::AppState, max_age_hours: i64) -> Result<Vec<String>> {
-        let cutoff_time = chrono::Utc::now() - chrono::Duration::hours(max_age_hours);
-
-        match state.db.list_webdav_directories(user_id).await {
-            Ok(directories) => {
-                let stale_subdirs: Vec<String> = directories.iter()
-                    .filter(|dir| {
-                        dir.directory_path.starts_with(parent_path) &&
-                        dir.directory_path != parent_path &&
-                        dir.last_scanned_at < cutoff_time
-                    })
-                    .map(|dir| dir.directory_path.clone())
-                    .collect();
-
-                debug!("🕒 Found {} stale subdirectories under {} (not scanned in {} hours)",
-                       stale_subdirs.len(), parent_path, max_age_hours);
-                Ok(stale_subdirs)
-            }
-            Err(e) => {
-                error!("Failed to get stale subdirectories: {}", e);
-                Err(e.into())
-            }
-        }
-    }
-
-    /// Perform incremental sync - only scan directories that have actually changed
-    pub async fn discover_files_incremental(&self, watch_folders: &[String], user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
-        debug!("⚡ Starting incremental sync for {} watch folders", watch_folders.len());
-
-        let mut changed_files = Vec::new();
-        let mut unchanged_count = 0;
-        let mut changed_count = 0;
-
-        for folder_path in watch_folders {
-            // Check directory ETag to see if it changed
-            match self.check_directory_etag(folder_path).await {
-                Ok(current_etag) => {
-                    let needs_scan = match state.db.get_webdav_directory(user_id, folder_path).await {
-                        Ok(Some(stored_dir)) => {
-                            if stored_dir.directory_etag != current_etag {
-                                debug!("🔄 Directory {} changed (ETag: {} → {})", folder_path, stored_dir.directory_etag, current_etag);
-                                changed_count += 1;
-                                true
-                            } else {
-                                debug!("✅ Directory {} unchanged (ETag: {})", folder_path, current_etag);
-                                unchanged_count += 1;
-                                false
-                            }
-                        }
-                        Ok(None) => {
-                            debug!("🆕 New directory {} detected", folder_path);
-                            changed_count += 1;
-                            true
-                        }
-                        Err(e) => {
-                            warn!("Database error for {}: {}, scanning to be safe", folder_path, e);
-                            changed_count += 1;
-                            true
-                        }
-                    };
-
-                    if needs_scan {
-                        // Directory changed - perform targeted scan
-                        match self.discover_files_in_folder_optimized(folder_path, user_id, state).await {
-                            Ok(mut files) => {
-                                debug!("📂 Incremental scan found {} files in changed directory {}", files.len(), folder_path);
-                                changed_files.append(&mut files);
-                            }
-                            Err(e) => {
-                                error!("Failed incremental scan of {}: {}", folder_path, e);
-                            }
-                        }
-                    } else {
-                        // Directory unchanged - just update scan timestamp
-                        let update = crate::models::UpdateWebDAVDirectory {
-                            directory_etag: current_etag,
-                            last_scanned_at: chrono::Utc::now(),
-                            file_count: 0, // Will be updated by the database layer
-                            total_size_bytes: 0,
-                        };
-
-                        if let Err(e) = state.db.update_webdav_directory(user_id, folder_path, &update).await {
-                            warn!("Failed to update scan timestamp for {}: {}", folder_path, e);
-                        }
-                    }
-                }
-                Err(e) => {
-                    error!("Failed to check directory ETag for {}: {}", folder_path, e);
-                }
-            }
-        }
-
-        debug!("⚡ Incremental sync completed: {} unchanged, {} changed, {} total files found",
-               unchanged_count, changed_count, changed_files.len());
-
-        Ok(changed_files)
-    }
 
     /// Check subdirectories individually for changes when parent directory is unchanged
     async fn check_subdirectories_for_changes(&self, parent_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
@@ -1802,10 +1629,6 @@ impl WebDAVService {
         state.db.mark_webdav_scan_complete(user_id, path).await
     }
 
-    /// Resume a deep scan from a checkpoint after server restart/interruption
-    pub async fn resume_deep_scan(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
-        self.resume_deep_scan_internal(checkpoint_path, user_id, state).await
-    }
 
     /// Internal resume function that doesn't trigger crash recovery detection (to avoid recursion)
     async fn resume_deep_scan_internal(&self, checkpoint_path: &str, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
@@ -1857,88 +1680,6 @@ impl WebDAVService {
         }
     }
 
-    /// Discover files in multiple folders concurrently with rate limiting
-    pub async fn discover_files_concurrent(&self, folders: &[String], user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileIngestionInfo>> {
-        if folders.is_empty() {
-            return Ok(Vec::new());
-        }
-
-        info!("🚀 Starting concurrent discovery for {} folders", folders.len());
-
-        let semaphore = std::sync::Arc::new(Semaphore::new(self.concurrency_config.max_concurrent_scans));
-        let folders_stream = stream::iter(folders.iter())
-            .map(|folder_path| {
-                let semaphore = semaphore.clone();
-                let service = self.clone();
-                let folder_path = folder_path.clone();
-                async move {
-                    let _permit = semaphore.acquire().await.map_err(|e| anyhow!("Semaphore error: {}", e))?;
-
-                    info!("📂 Scanning folder: {}", folder_path);
-                    let start_time = std::time::Instant::now();
-
-                    // Save checkpoint for resumption after interruption
-                    let checkpoint_record = crate::models::CreateWebDAVDirectory {
-                        user_id,
-                        directory_path: folder_path.clone(),
-                        directory_etag: "scanning".to_string(), // Temporary marker
-                        file_count: 0,
-                        total_size_bytes: 0,
-                    };
-
-                    if let Err(e) = state.db.create_or_update_webdav_directory(&checkpoint_record).await {
-                        warn!("Failed to save scan checkpoint for {}: {}", folder_path, e);
-                    }
-
-                    let result = service.discover_files_in_folder_optimized(&folder_path, user_id, state).await;
-
-                    match &result {
-                        Ok(files) => {
-                            let duration = start_time.elapsed();
-                            info!("✅ Completed folder {} in {:?}: {} files found",
                                  folder_path, duration, files.len());
-                        }
-                        Err(e) => {
-                            // Check if this was a server restart/connection issue
-                            if service.is_server_restart_error(e) {
-                                warn!("🔄 Server restart detected during scan of {}, will resume later", folder_path);
-                                // Keep checkpoint for resumption
-                                return Err(anyhow!("Server restart detected: {}", e));
-                            } else {
-                                error!("❌ Failed to scan folder {}: {}", folder_path, e);
-                            }
-                        }
-                    }
-
-                    result.map(|files| (folder_path, files))
-                }
-            })
-            .buffer_unordered(self.concurrency_config.max_concurrent_scans);
-
-        let mut all_files = Vec::new();
-        let mut success_count = 0;
-        let mut error_count = 0;
-
-        let mut folders_stream = std::pin::pin!(folders_stream);
-        while let Some(result) = folders_stream.next().await {
-            match result {
-                Ok((folder_path, mut files)) => {
-                    debug!("📁 Folder {} contributed {} files", folder_path, files.len());
-                    all_files.append(&mut files);
-                    success_count += 1;
-                }
-                Err(e) => {
-                    warn!("Folder scan error: {}", e);
-                    error_count += 1;
-                }
-            }
-        }
-
-        info!("🎯 Concurrent discovery completed: {} folders successful, {} failed, {} total files",
              success_count, error_count, all_files.len());
-
-        Ok(all_files)
-    }
 
     pub async fn download_file(&self, file_path: &str) -> Result<Vec<u8>> {
         self.retry_with_backoff("download_file", || {