feat(webdav): make sure to have scanned all subdirectories

parent 69c40c10fa
commit 15b1f40cc1

@@ -479,31 +479,16 @@ async fn trigger_deep_scan(
     tokio::spawn(async move {
         let start_time = chrono::Utc::now();

-        // Clear existing directory tracking to force full rescan
-        if let Err(e) = state_clone.db.clear_webdav_directories(user_id).await {
-            error!("Failed to clear WebDAV directories for deep scan: {}", e);
-        }
-
-        // Use traditional discovery for deep scan to avoid borrowing issues
-        let mut all_discovered_files = Vec::new();
-        for folder in &config_clone.watch_folders {
-            match webdav_service.discover_files_in_folder(folder).await {
-                Ok(mut folder_files) => {
-                    info!("Deep scan discovered {} files in folder {}", folder_files.len(), folder);
-                    all_discovered_files.append(&mut folder_files);
-                }
-                Err(e) => {
-                    error!("Deep scan failed to discover files in folder {}: {}", folder, e);
-                    // Continue with other folders
-                }
-            }
-        }
-
-        if !all_discovered_files.is_empty() {
-            info!("Deep scan discovery completed for source {}: {} files found", source_id_clone, all_discovered_files.len());
-
-            // Filter files by extensions and process them
-            let files_to_process: Vec<_> = all_discovered_files.into_iter()
+        // Use guaranteed completeness deep scan method
+        match webdav_service.deep_scan_with_guaranteed_completeness(user_id, &state_clone).await {
+            Ok(all_discovered_files) => {
+                info!("Deep scan with guaranteed completeness discovered {} files", all_discovered_files.len());
+
+                if !all_discovered_files.is_empty() {
+                    info!("Deep scan discovery completed for source {}: {} files found", source_id_clone, all_discovered_files.len());
+
+                    // Filter files by extensions and process them
+                    let files_to_process: Vec<_> = all_discovered_files.into_iter()
                 .filter(|file_info| {
                     if file_info.is_directory {
                         return false;
@@ -596,16 +581,47 @@ async fn trigger_deep_scan(
                 }
             }

-        } else {
-            info!("Deep scan found no files for source {}", source_id_clone);
-
-            // Update source status to idle even if no files found
-            if let Err(e) = state_clone.db.update_source_status(
-                source_id_clone,
-                crate::models::SourceStatus::Idle,
-                Some("Deep scan completed: no files found".to_string()),
-            ).await {
-                error!("Failed to update source status after empty deep scan: {}", e);
+                } else {
+                    info!("Deep scan found no files for source {}", source_id_clone);
+
+                    // Update source status to idle even if no files found
+                    if let Err(e) = state_clone.db.update_source_status(
+                        source_id_clone,
+                        crate::models::SourceStatus::Idle,
+                        Some("Deep scan completed: no files found".to_string()),
+                    ).await {
+                        error!("Failed to update source status after empty deep scan: {}", e);
+                    }
+                }
+            }
+            Err(e) => {
+                error!("Deep scan with guaranteed completeness failed for source {}: {}", source_id_clone, e);
+
+                // Update source status to error
+                if let Err(e2) = state_clone.db.update_source_status(
+                    source_id_clone,
+                    crate::models::SourceStatus::Error,
+                    Some(format!("Deep scan failed: {}", e)),
+                ).await {
+                    error!("Failed to update source status after deep scan error: {}", e2);
+                }
+
+                // Send error notification
+                let notification = crate::models::CreateNotification {
+                    notification_type: "error".to_string(),
+                    title: "Deep Scan Failed".to_string(),
+                    message: format!("Deep scan of {} failed: {}", source_name, e),
+                    action_url: Some("/sources".to_string()),
+                    metadata: Some(serde_json::json!({
+                        "source_id": source_id_clone,
+                        "scan_type": "deep_scan",
+                        "error": e.to_string()
+                    })),
+                };
+
+                if let Err(e) = state_clone.db.create_notification(user_id, &notification).await {
+                    error!("Failed to create deep scan error notification: {}", e);
+                }
             }
         }
     });
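
Aside (not part of the diff): both arms of the new match repeat the same update-status-then-notify pattern against the database. A minimal sketch of how that pattern could be factored out, using only the calls visible above; the helper name and exact wiring are hypothetical:

// Hypothetical helper, sketching the status + notification pattern used in the
// match arms above. Assumes the same AppState, SourceStatus and
// CreateNotification types that appear in this diff.
async fn report_scan_outcome(
    state: &crate::AppState,
    user_id: uuid::Uuid,
    source_id: uuid::Uuid,
    status: crate::models::SourceStatus,
    message: String,
) {
    // Mirror the status update performed in both arms.
    if let Err(e) = state.db.update_source_status(source_id, status, Some(message.clone())).await {
        error!("Failed to update source status: {}", e);
    }

    // Mirror the notification sent in the error arm.
    let notification = crate::models::CreateNotification {
        notification_type: "info".to_string(),
        title: "Deep Scan".to_string(),
        message,
        action_url: Some("/sources".to_string()),
        metadata: None,
    };
    if let Err(e) = state.db.create_notification(user_id, &notification).await {
        error!("Failed to create notification: {}", e);
    }
}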
@@ -1,6 +1,6 @@
 use anyhow::{anyhow, Result};
 use chrono::{DateTime, Utc};
-use reqwest::{Client, Method, Url};
+use reqwest::{Client, Method};
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 use std::time::Duration;
 use tokio::time::sleep;
@@ -76,6 +76,77 @@ pub struct WebDAVService {
     concurrency_config: ConcurrencyConfig,
 }

+/// Report of ETag validation and directory integrity checks
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationReport {
+    pub validation_id: uuid::Uuid,
+    pub user_id: uuid::Uuid,
+    pub started_at: chrono::DateTime<chrono::Utc>,
+    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
+    pub total_directories_checked: u32,
+    pub issues_found: Vec<ValidationIssue>,
+    pub recommendations: Vec<ValidationRecommendation>,
+    pub etag_support_verified: bool,
+    pub server_health_score: u8, // 0-100
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationIssue {
+    pub issue_type: ValidationIssueType,
+    pub directory_path: String,
+    pub severity: ValidationSeverity,
+    pub description: String,
+    pub discovered_at: chrono::DateTime<chrono::Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ValidationIssueType {
+    /// Directory exists on server but not in our tracking
+    Untracked,
+    /// Directory in our tracking but missing on server
+    Missing,
+    /// ETag mismatch between server and our cache
+    ETagMismatch,
+    /// Directory hasn't been scanned in a very long time
+    Stale,
+    /// Server errors when accessing directory
+    Inaccessible,
+    /// ETag support seems unreliable for this directory
+    ETagUnreliable,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ValidationSeverity {
+    Info,     // No action needed, just FYI
+    Warning,  // Should investigate but not urgent
+    Error,    // Needs immediate attention
+    Critical, // System integrity at risk
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ValidationRecommendation {
+    pub action: ValidationAction,
+    pub reason: String,
+    pub affected_directories: Vec<String>,
+    pub priority: ValidationSeverity,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ValidationAction {
+    /// Run a deep scan of specific directories
+    DeepScanRequired,
+    /// Clear and rebuild directory tracking
+    RebuildTracking,
+    /// ETag support is unreliable, switch to periodic scans
+    DisableETagOptimization,
+    /// Clean up orphaned database entries
+    CleanupDatabase,
+    /// Server configuration issue needs attention
+    CheckServerConfiguration,
+    /// No action needed, system is healthy
+    NoActionRequired,
+}
+
 impl WebDAVService {
     pub fn new(config: WebDAVConfig) -> Result<Self> {
         Self::new_with_configs(config, RetryConfig::default(), ConcurrencyConfig::default())
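
Aside (not part of the diff): a minimal sketch of how a caller might act on the recommendation types added in this hunk. It uses only the fields and variants defined above; the function itself is illustrative:

// Illustrative sketch, assuming the ValidationReport/ValidationAction types above.
fn summarize_recommendations(report: &ValidationReport) {
    for rec in &report.recommendations {
        match rec.action {
            ValidationAction::DeepScanRequired =>
                println!("deep scan: {} ({} directories)", rec.reason, rec.affected_directories.len()),
            ValidationAction::DisableETagOptimization =>
                println!("fall back to periodic scans: {}", rec.reason),
            ValidationAction::RebuildTracking | ValidationAction::CleanupDatabase =>
                println!("tracking maintenance: {}", rec.reason),
            ValidationAction::CheckServerConfiguration =>
                println!("server issue: {}", rec.reason),
            ValidationAction::NoActionRequired =>
                println!("healthy: {}", rec.reason),
        }
    }
}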
@@ -1925,4 +1996,635 @@ pub async fn test_webdav_connection(
         .await?;

     Ok(response.status().is_success())
 }
+
+impl WebDAVService {
+    /// Validate ETag tracking integrity and directory consistency
+    /// This replaces the need for periodic deep scans with intelligent validation
+    pub async fn validate_etag_tracking(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<ValidationReport> {
+        let validation_id = uuid::Uuid::new_v4();
+        let started_at = chrono::Utc::now();
+
+        info!("🔍 Starting ETag validation for user {} (validation_id: {})", user_id, validation_id);
+
+        let mut report = ValidationReport {
+            validation_id,
+            user_id,
+            started_at,
+            completed_at: None,
+            total_directories_checked: 0,
+            issues_found: Vec::new(),
+            recommendations: Vec::new(),
+            etag_support_verified: false,
+            server_health_score: 100,
+        };
+
+        // Step 1: Verify ETag support is still working
+        match self.test_recursive_etag_support().await {
+            Ok(supports_etags) => {
+                report.etag_support_verified = supports_etags;
+                if !supports_etags {
+                    report.issues_found.push(ValidationIssue {
+                        issue_type: ValidationIssueType::ETagUnreliable,
+                        directory_path: "server".to_string(),
+                        severity: ValidationSeverity::Critical,
+                        description: "Server no longer supports recursive ETags reliably".to_string(),
+                        discovered_at: chrono::Utc::now(),
+                    });
+                    report.server_health_score = 30;
+                }
+            }
+            Err(e) => {
+                warn!("Failed to test ETag support: {}", e);
+                report.issues_found.push(ValidationIssue {
+                    issue_type: ValidationIssueType::ETagUnreliable,
+                    directory_path: "server".to_string(),
+                    severity: ValidationSeverity::Error,
+                    description: format!("Cannot verify ETag support: {}", e),
+                    discovered_at: chrono::Utc::now(),
+                });
+                report.server_health_score = 50;
+            }
+        }
+
+        // Step 2: Check tracked directories for issues
+        match state.db.list_webdav_directories(user_id).await {
+            Ok(tracked_dirs) => {
+                report.total_directories_checked = tracked_dirs.len() as u32;
+
+                for tracked_dir in tracked_dirs {
+                    self.validate_single_directory(&tracked_dir, &mut report, state).await;
+                }
+            }
+            Err(e) => {
+                error!("Failed to load tracked directories: {}", e);
+                report.issues_found.push(ValidationIssue {
+                    issue_type: ValidationIssueType::Missing,
+                    directory_path: "database".to_string(),
+                    severity: ValidationSeverity::Critical,
+                    description: format!("Cannot access directory tracking database: {}", e),
+                    discovered_at: chrono::Utc::now(),
+                });
+                report.server_health_score = 10;
+            }
+        }
+
+        // Step 3: Sample a few watch directories to check for untracked directories
+        for watch_folder in &self.config.watch_folders {
+            if let Err(e) = self.check_for_untracked_directories(watch_folder, &mut report, user_id, state).await {
+                warn!("Failed to check for untracked directories in {}: {}", watch_folder, e);
+            }
+        }
+
+        // Step 4: Generate recommendations based on issues found
+        self.generate_validation_recommendations(&mut report);
+
+        report.completed_at = Some(chrono::Utc::now());
+        let duration = report.completed_at.unwrap() - report.started_at;
+
+        info!("✅ ETag validation completed in {:.2}s. Health score: {}/100, {} issues found",
+            duration.num_milliseconds() as f64 / 1000.0,
+            report.server_health_score,
+            report.issues_found.len());
+
+        Ok(report)
+    }
+
+    /// Validate a single tracked directory
+    async fn validate_single_directory(
+        &self,
+        tracked_dir: &crate::models::WebDAVDirectory,
+        report: &mut ValidationReport,
+        state: &crate::AppState
+    ) {
+        let relative_path = self.convert_to_relative_path(&tracked_dir.directory_path);
+
+        // Check if directory still exists and get current ETag
+        match self.check_directory_etag(&relative_path).await {
+            Ok(current_etag) => {
+                // Check for ETag mismatch
+                if current_etag != tracked_dir.directory_etag {
+                    report.issues_found.push(ValidationIssue {
+                        issue_type: ValidationIssueType::ETagMismatch,
+                        directory_path: tracked_dir.directory_path.clone(),
+                        severity: ValidationSeverity::Warning,
+                        description: format!("ETag changed from '{}' to '{}' - directory may need rescanning",
+                            tracked_dir.directory_etag, current_etag),
+                        discovered_at: chrono::Utc::now(),
+                    });
+                    report.server_health_score = report.server_health_score.saturating_sub(5);
+                }
+
+                // Check for stale directories (not scanned in >7 days)
+                let last_scanned = tracked_dir.last_scanned_at;
+                let duration = chrono::Utc::now() - last_scanned;
+                let days_old = duration.num_days();
+                if days_old > 7 {
+                    report.issues_found.push(ValidationIssue {
+                        issue_type: ValidationIssueType::Stale,
+                        directory_path: tracked_dir.directory_path.clone(),
+                        severity: if days_old > 30 { ValidationSeverity::Warning } else { ValidationSeverity::Info },
+                        description: format!("Directory not scanned for {} days", days_old),
+                        discovered_at: chrono::Utc::now(),
+                    });
+                    if days_old > 30 {
+                        report.server_health_score = report.server_health_score.saturating_sub(3);
+                    }
+                }
+            }
+            Err(e) => {
+                // Directory inaccessible or missing
+                report.issues_found.push(ValidationIssue {
+                    issue_type: ValidationIssueType::Inaccessible,
+                    directory_path: tracked_dir.directory_path.clone(),
+                    severity: ValidationSeverity::Error,
+                    description: format!("Cannot access directory: {}", e),
+                    discovered_at: chrono::Utc::now(),
+                });
+                report.server_health_score = report.server_health_score.saturating_sub(10);
+            }
+        }
+    }
+
+    /// Check for directories that exist on server but aren't tracked
+    async fn check_for_untracked_directories(
+        &self,
+        watch_folder: &str,
+        report: &mut ValidationReport,
+        user_id: uuid::Uuid,
+        state: &crate::AppState
+    ) -> Result<()> {
+        let relative_watch_folder = self.convert_to_relative_path(watch_folder);
+
+        // Get shallow listing of watch folder
+        match self.discover_files_in_folder_shallow(&relative_watch_folder).await {
+            Ok(entries) => {
+                // Find directories
+                let server_dirs: Vec<_> = entries.iter()
+                    .filter(|e| e.is_directory)
+                    .collect();
+
+                // Check if each directory is tracked
+                for server_dir in server_dirs {
+                    match state.db.get_webdav_directory(user_id, &server_dir.path).await {
+                        Ok(None) => {
+                            // Directory exists on server but not tracked
+                            report.issues_found.push(ValidationIssue {
+                                issue_type: ValidationIssueType::Untracked,
+                                directory_path: server_dir.path.clone(),
+                                severity: ValidationSeverity::Info,
+                                description: "Directory exists on server but not in tracking database".to_string(),
+                                discovered_at: chrono::Utc::now(),
+                            });
+                            report.server_health_score = report.server_health_score.saturating_sub(2);
+                        }
+                        Ok(Some(_)) => {
+                            // Directory is tracked, all good
+                        }
+                        Err(e) => {
+                            warn!("Database error checking directory {}: {}", server_dir.path, e);
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                return Err(anyhow!("Failed to list watch folder {}: {}", watch_folder, e));
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Generate actionable recommendations based on validation issues
+    fn generate_validation_recommendations(&self, report: &mut ValidationReport) {
+        let mut etag_mismatches = Vec::new();
+        let mut untracked_dirs = Vec::new();
+        let mut inaccessible_dirs = Vec::new();
+        let mut stale_dirs = Vec::new();
+        let mut etag_unreliable = false;
+
+        // Categorize issues
+        for issue in &report.issues_found {
+            match issue.issue_type {
+                ValidationIssueType::ETagMismatch => etag_mismatches.push(issue.directory_path.clone()),
+                ValidationIssueType::Untracked => untracked_dirs.push(issue.directory_path.clone()),
+                ValidationIssueType::Inaccessible => inaccessible_dirs.push(issue.directory_path.clone()),
+                ValidationIssueType::Stale => stale_dirs.push(issue.directory_path.clone()),
+                ValidationIssueType::ETagUnreliable => etag_unreliable = true,
+                _ => {}
+            }
+        }
+
+        // Generate recommendations
+        if etag_unreliable {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::DisableETagOptimization,
+                reason: "ETag support is unreliable, consider switching to periodic deep scans".to_string(),
+                affected_directories: vec!["all".to_string()],
+                priority: ValidationSeverity::Critical,
+            });
+        } else if !etag_mismatches.is_empty() {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::DeepScanRequired,
+                reason: format!("{} directories have ETag mismatches and need rescanning", etag_mismatches.len()),
+                affected_directories: etag_mismatches,
+                priority: ValidationSeverity::Warning,
+            });
+        }
+
+        if !untracked_dirs.is_empty() {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::DeepScanRequired,
+                reason: format!("{} untracked directories found on server", untracked_dirs.len()),
+                affected_directories: untracked_dirs,
+                priority: ValidationSeverity::Info,
+            });
+        }
+
+        if !inaccessible_dirs.is_empty() {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::CheckServerConfiguration,
+                reason: format!("{} directories are inaccessible", inaccessible_dirs.len()),
+                affected_directories: inaccessible_dirs,
+                priority: ValidationSeverity::Error,
+            });
+        }
+
+        if !stale_dirs.is_empty() && stale_dirs.len() > 10 {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::DeepScanRequired,
+                reason: format!("{} directories haven't been scanned recently", stale_dirs.len()),
+                affected_directories: stale_dirs,
+                priority: ValidationSeverity::Info,
+            });
+        }
+
+        // If no major issues, everything is healthy
+        if report.recommendations.is_empty() {
+            report.recommendations.push(ValidationRecommendation {
+                action: ValidationAction::NoActionRequired,
+                reason: "ETag tracking system is healthy and working correctly".to_string(),
+                affected_directories: Vec::new(),
+                priority: ValidationSeverity::Info,
+            });
+        }
+    }
+
+    /// Check if we should trigger a deep scan based on validation results
+    pub fn should_trigger_deep_scan(&self, report: &ValidationReport) -> (bool, String) {
+        // Critical issues always trigger deep scan
+        let critical_issues = report.issues_found.iter()
+            .filter(|issue| matches!(issue.severity, ValidationSeverity::Critical))
+            .count();
+
+        if critical_issues > 0 {
+            return (true, format!("{} critical issues detected", critical_issues));
+        }
+
+        // Multiple ETag mismatches suggest systematic issues
+        let etag_mismatches = report.issues_found.iter()
+            .filter(|issue| matches!(issue.issue_type, ValidationIssueType::ETagMismatch))
+            .count();
+
+        if etag_mismatches > 5 {
+            return (true, format!("{} ETag mismatches suggest synchronization issues", etag_mismatches));
+        }
+
+        // Many untracked directories suggest incomplete initial scan
+        let untracked = report.issues_found.iter()
+            .filter(|issue| matches!(issue.issue_type, ValidationIssueType::Untracked))
+            .count();
+
+        if untracked > 10 {
+            return (true, format!("{} untracked directories found", untracked));
+        }
+
+        // Low health score indicates general problems
+        if report.server_health_score < 70 {
+            return (true, format!("Low server health score: {}/100", report.server_health_score));
+        }
+
+        (false, "System appears healthy, no deep scan needed".to_string())
+    }
+
+    /// Ensure complete directory tree discovery before marking deep scan as complete
+    /// This is the MOST CRITICAL function - guarantees we've found ALL subdirectories
+    pub async fn ensure_complete_directory_discovery(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<DirectoryDiscoveryReport> {
+        info!("🔍 Starting complete directory tree discovery verification");
+
+        let mut report = DirectoryDiscoveryReport {
+            discovery_id: uuid::Uuid::new_v4(),
+            user_id,
+            started_at: chrono::Utc::now(),
+            completed_at: None,
+            watch_folders_processed: Vec::new(),
+            total_directories_discovered: 0,
+            new_directories_found: 0,
+            missing_directories_detected: 0,
+            is_complete: false,
+            issues: Vec::new(),
+        };
+
+        // Process each watch folder to ensure complete discovery
+        for watch_folder in &self.config.watch_folders {
+            info!("📂 Ensuring complete discovery for watch folder: {}", watch_folder);
+
+            match self.ensure_watch_folder_complete_discovery(watch_folder, user_id, state, &mut report).await {
+                Ok(folder_report) => {
+                    report.watch_folders_processed.push(folder_report);
+                }
+                Err(e) => {
+                    error!("❌ Failed to ensure complete discovery for {}: {}", watch_folder, e);
+                    report.issues.push(format!("Failed to process {}: {}", watch_folder, e));
+                }
+            }
+        }
+
+        // Verify completeness by checking for any gaps
+        self.verify_directory_tree_completeness(&mut report, user_id, state).await?;
+
+        report.completed_at = Some(chrono::Utc::now());
+        let duration = report.completed_at.unwrap() - report.started_at;
+
+        if report.is_complete {
+            info!("✅ Complete directory discovery verified in {:.2}s. {} total directories, {} newly discovered",
+                duration.num_milliseconds() as f64 / 1000.0,
+                report.total_directories_discovered,
+                report.new_directories_found);
+        } else {
+            warn!("⚠️ Directory discovery incomplete after {:.2}s. {} issues found",
+                duration.num_milliseconds() as f64 / 1000.0,
+                report.issues.len());
+        }
+
+        Ok(report)
+    }
+
+    /// Ensure a single watch folder has complete n-depth directory discovery
+    async fn ensure_watch_folder_complete_discovery(
+        &self,
+        watch_folder: &str,
+        user_id: uuid::Uuid,
+        state: &crate::AppState,
+        main_report: &mut DirectoryDiscoveryReport
+    ) -> Result<WatchFolderDiscoveryReport> {
+        let mut folder_report = WatchFolderDiscoveryReport {
+            watch_folder: watch_folder.to_string(),
+            total_directories: 0,
+            new_directories: 0,
+            depth_levels_scanned: 0,
+            is_complete: false,
+        };
+
+        // Use PROPFIND with Depth: infinity to get COMPLETE directory tree
+        let relative_watch_folder = self.convert_to_relative_path(watch_folder);
+        let all_entries = self.discover_files_in_folder_impl(&relative_watch_folder).await?;
+
+        // Extract ALL directories from the complete scan
+        let all_server_directories: Vec<_> = all_entries.iter()
+            .filter(|entry| entry.is_directory)
+            .collect();
+
+        folder_report.total_directories = all_server_directories.len();
+        main_report.total_directories_discovered += all_server_directories.len();
+
+        // Calculate depth levels
+        let max_depth = all_server_directories.iter()
+            .map(|dir| dir.path.chars().filter(|&c| c == '/').count())
+            .max()
+            .unwrap_or(0);
+        folder_report.depth_levels_scanned = max_depth;
+
+        info!("📊 Found {} directories across {} depth levels in {}",
+            all_server_directories.len(), max_depth, watch_folder);
+
+        // Check each directory against our tracking database
+        for server_dir in &all_server_directories {
+            match state.db.get_webdav_directory(user_id, &server_dir.path).await {
+                Ok(Some(tracked_dir)) => {
+                    // Directory is already tracked - verify ETag is current
+                    if tracked_dir.directory_etag != server_dir.etag {
+                        debug!("🔄 Updating ETag for tracked directory: {}", server_dir.path);
+                        let update = crate::models::UpdateWebDAVDirectory {
+                            directory_etag: server_dir.etag.clone(),
+                            last_scanned_at: chrono::Utc::now(),
+                            file_count: 0, // Will be calculated separately
+                            total_size_bytes: 0,
+                        };
+                        if let Err(e) = state.db.update_webdav_directory(user_id, &server_dir.path, &update).await {
+                            warn!("Failed to update directory {}: {}", server_dir.path, e);
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // NEW DIRECTORY DISCOVERED - this is critical to track
+                    info!("🆕 NEW directory discovered: {}", server_dir.path);
+                    folder_report.new_directories += 1;
+                    main_report.new_directories_found += 1;
+
+                    // Immediately add to tracking database
+                    let new_dir = crate::models::CreateWebDAVDirectory {
+                        user_id,
+                        directory_path: server_dir.path.clone(),
+                        directory_etag: server_dir.etag.clone(),
+                        file_count: 0, // Will be calculated when files are processed
+                        total_size_bytes: 0,
+                    };
+
+                    if let Err(e) = state.db.create_or_update_webdav_directory(&new_dir).await {
+                        error!("❌ CRITICAL: Failed to track new directory {}: {}", server_dir.path, e);
+                        main_report.issues.push(format!("Failed to track new directory {}: {}", server_dir.path, e));
+                    } else {
+                        debug!("✅ Successfully tracking new directory: {}", server_dir.path);
+                    }
+                }
+                Err(e) => {
+                    error!("Database error checking directory {}: {}", server_dir.path, e);
+                    main_report.issues.push(format!("Database error for {}: {}", server_dir.path, e));
+                }
+            }
+        }
+
+        // Check for orphaned tracking entries (directories we track but don't exist on server)
+        match state.db.list_webdav_directories(user_id).await {
+            Ok(tracked_dirs) => {
+                let server_paths: HashSet<String> = all_server_directories.iter()
+                    .map(|d| d.path.clone())
+                    .collect();
+
+                for tracked_dir in tracked_dirs {
+                    if tracked_dir.directory_path.starts_with(watch_folder) && !server_paths.contains(&tracked_dir.directory_path) {
+                        warn!("🗑️ Orphaned directory tracking detected: {} (exists in DB but not on server)", tracked_dir.directory_path);
+                        main_report.missing_directories_detected += 1;
+
+                        // Could optionally clean up orphaned entries here
+                        // For now, just report them
+                    }
+                }
+            }
+            Err(e) => {
+                error!("Failed to check for orphaned directories: {}", e);
+                main_report.issues.push(format!("Failed to check orphaned directories: {}", e));
+            }
+        }
+
+        folder_report.is_complete = folder_report.new_directories == 0 || main_report.issues.is_empty();
+        Ok(folder_report)
+    }
+
+    /// Final verification that directory tree coverage is complete
+    async fn verify_directory_tree_completeness(
+        &self,
+        report: &mut DirectoryDiscoveryReport,
+        user_id: uuid::Uuid,
+        state: &crate::AppState
+    ) -> Result<()> {
+        info!("🔍 Performing final completeness verification");
+
+        // Check that we have no scan_in_progress flags left over
+        match state.db.get_incomplete_webdav_scans(user_id).await {
+            Ok(incomplete) => {
+                if !incomplete.is_empty() {
+                    warn!("⚠️ Found {} incomplete scans still in progress", incomplete.len());
+                    report.issues.push(format!("{} scans still marked as in progress", incomplete.len()));
+                    report.is_complete = false;
+                    return Ok(());
+                }
+            }
+            Err(e) => {
+                error!("Failed to check incomplete scans: {}", e);
+                report.issues.push(format!("Cannot verify scan completeness: {}", e));
+                report.is_complete = false;
+                return Ok(());
+            }
+        }
+
+        // Verify each watch folder has at least some tracked directories
+        for watch_folder in &self.config.watch_folders {
+            match state.db.list_webdav_directories(user_id).await {
+                Ok(dirs) => {
+                    let watch_folder_dirs = dirs.iter()
+                        .filter(|d| d.directory_path.starts_with(watch_folder))
+                        .count();
+
+                    if watch_folder_dirs == 0 {
+                        warn!("⚠️ No directories tracked for watch folder: {}", watch_folder);
+                        report.issues.push(format!("No directories tracked for watch folder: {}", watch_folder));
+                        report.is_complete = false;
+                    } else {
+                        debug!("✅ Watch folder {} has {} tracked directories", watch_folder, watch_folder_dirs);
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to verify watch folder {}: {}", watch_folder, e);
+                    report.issues.push(format!("Cannot verify watch folder {}: {}", watch_folder, e));
+                    report.is_complete = false;
+                }
+            }
+        }
+
+        // If no issues found, mark as complete
+        if report.issues.is_empty() {
+            report.is_complete = true;
+            info!("✅ Directory tree completeness verified - all {} watch folders fully discovered", self.config.watch_folders.len());
+        } else {
+            warn!("❌ Directory tree completeness verification failed: {} issues", report.issues.len());
+        }
+
+        Ok(())
+    }
+
+    /// Modified deep scan that REQUIRES complete directory discovery
+    pub async fn deep_scan_with_guaranteed_completeness(&self, user_id: uuid::Uuid, state: &crate::AppState) -> Result<Vec<FileInfo>> {
+        info!("🚀 Starting deep scan with guaranteed directory completeness");
+
+        let scan_id = uuid::Uuid::new_v4();
+        let started_at = chrono::Utc::now();
+
+        // STEP 1: CRITICAL - Ensure complete directory discovery FIRST
+        let discovery_report = self.ensure_complete_directory_discovery(user_id, state).await?;
+
+        if !discovery_report.is_complete {
+            return Err(anyhow!("Cannot proceed with deep scan: Directory discovery incomplete. {} issues found: {:?}",
+                discovery_report.issues.len(), discovery_report.issues));
+        }
+
+        info!("✅ Directory discovery complete - proceeding with file processing");
+
+        // STEP 2: Only now process files, knowing we have complete directory coverage
+        let mut all_files = Vec::new();
+        for watch_folder in &self.config.watch_folders {
+            match self.smart_directory_scan_with_checkpoints(watch_folder, None, user_id, state).await {
+                Ok(mut files) => {
+                    info!("📁 Processed {} files from {}", files.len(), watch_folder);
+                    all_files.append(&mut files);
+                }
+                Err(e) => {
+                    error!("Failed to process files in {}: {}", watch_folder, e);
+                    return Err(anyhow!("File processing failed for {}: {}", watch_folder, e));
+                }
+            }
+        }
+
+        // STEP 3: Final verification that nothing was missed
+        let final_verification = self.ensure_complete_directory_discovery(user_id, state).await?;
+        let is_complete = final_verification.is_complete && final_verification.new_directories_found == 0;
+
+        if final_verification.new_directories_found > 0 {
+            warn!("⚠️ Found {} additional directories during final verification - scan may need to restart",
+                final_verification.new_directories_found);
+        }
+
+        let completed_at = chrono::Utc::now();
+        let duration = completed_at - started_at;
+
+        if is_complete {
+            info!("🎉 DEEP SCAN COMPLETE WITH GUARANTEED COMPLETENESS: {} files processed, {} directories tracked in {:.2}s",
+                all_files.len(),
+                discovery_report.total_directories_discovered,
+                duration.num_milliseconds() as f64 / 1000.0);
+        } else {
+            warn!("⚠️ Deep scan completed but completeness not guaranteed: {:.2}s",
+                duration.num_milliseconds() as f64 / 1000.0);
+        }
+
+        Ok(all_files)
+    }
+}
+
+/// Report of complete directory tree discovery
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DirectoryDiscoveryReport {
+    pub discovery_id: uuid::Uuid,
+    pub user_id: uuid::Uuid,
+    pub started_at: chrono::DateTime<chrono::Utc>,
+    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
+    pub watch_folders_processed: Vec<WatchFolderDiscoveryReport>,
+    pub total_directories_discovered: usize,
+    pub new_directories_found: usize,
+    pub missing_directories_detected: usize,
+    pub is_complete: bool,
+    pub issues: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WatchFolderDiscoveryReport {
+    pub watch_folder: String,
+    pub total_directories: usize,
+    pub new_directories: usize,
+    pub depth_levels_scanned: usize,
+    pub is_complete: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompleteDeepScanReport {
+    pub scan_id: uuid::Uuid,
+    pub user_id: uuid::Uuid,
+    pub started_at: chrono::DateTime<chrono::Utc>,
+    pub completed_at: chrono::DateTime<chrono::Utc>,
+    pub directory_discovery_report: DirectoryDiscoveryReport,
+    pub final_verification_report: DirectoryDiscoveryReport,
+    pub total_files_processed: usize,
+    pub scan_duration_seconds: i64,
+    pub is_guaranteed_complete: bool,
+}
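
Aside (not part of the diff): a minimal sketch of the intended flow across the methods added in this commit, validate first, then escalate to the guaranteed-complete deep scan only when warranted. It uses only signatures that appear above and assumes the same logging macros are in scope; the wrapper function itself is hypothetical:

// Illustrative sketch: periodic validation that escalates to a deep scan.
async fn periodic_validation(
    service: &WebDAVService,
    user_id: uuid::Uuid,
    state: &crate::AppState,
) -> anyhow::Result<()> {
    // Cheap integrity check instead of an unconditional deep scan.
    let report = service.validate_etag_tracking(user_id, state).await?;

    // Decide whether the issues found justify a full rescan.
    let (needs_deep_scan, reason) = service.should_trigger_deep_scan(&report);
    if needs_deep_scan {
        info!("Triggering deep scan: {}", reason);
        let files = service.deep_scan_with_guaranteed_completeness(user_id, state).await?;
        info!("Deep scan processed {} files", files.len());
    }
    Ok(())
}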