feat(webdav): move to tracking all subdirectory etags
This commit is contained in:
parent
3c09e80d6d
commit
8c9d5dc12b
|
|
@ -270,20 +270,28 @@ pub async fn trigger_deep_scan(
|
|||
tokio::spawn(async move {
|
||||
let start_time = chrono::Utc::now();
|
||||
|
||||
// Use guaranteed completeness deep scan method
|
||||
match webdav_service.discover_all_files().await {
|
||||
Ok(all_discovered_files) => {
|
||||
info!("Deep scan with guaranteed completeness discovered {} files", all_discovered_files.len());
|
||||
// Use smart sync service for deep scans - this will properly reset directory ETags
|
||||
let smart_sync_service = crate::services::webdav::SmartSyncService::new(state_clone.clone());
|
||||
let mut all_files_to_process = Vec::new();
|
||||
let mut total_directories_tracked = 0;
|
||||
|
||||
if !all_discovered_files.is_empty() {
|
||||
info!("Deep scan discovery completed for source {}: {} files found", source_id_clone, all_discovered_files.len());
|
||||
// Process each watch folder using smart sync
|
||||
for watch_folder in &webdav_config.watch_folders {
|
||||
info!("🔍 Deep scan processing watch folder: {}", watch_folder);
|
||||
|
||||
// Filter files by extensions and process them
|
||||
let files_to_process: Vec<_> = all_discovered_files.into_iter()
|
||||
match smart_sync_service.perform_smart_sync(
|
||||
user_id,
|
||||
&webdav_service,
|
||||
watch_folder,
|
||||
crate::services::webdav::SmartSyncStrategy::FullDeepScan // Force deep scan for directory reset
|
||||
).await {
|
||||
Ok(sync_result) => {
|
||||
info!("Deep scan found {} files and {} directories in {}",
|
||||
sync_result.files.len(), sync_result.directories.len(), watch_folder);
|
||||
|
||||
// Filter files by extensions
|
||||
let filtered_files: Vec<_> = sync_result.files.into_iter()
|
||||
.filter(|file_info| {
|
||||
if file_info.is_directory {
|
||||
return false;
|
||||
}
|
||||
let file_extension = std::path::Path::new(&file_info.name)
|
||||
.extension()
|
||||
.and_then(|ext| ext.to_str())
|
||||
|
|
@ -293,14 +301,26 @@ pub async fn trigger_deep_scan(
|
|||
})
|
||||
.collect();
|
||||
|
||||
info!("Deep scan will process {} files for source {}", files_to_process.len(), source_id_clone);
|
||||
all_files_to_process.extend(filtered_files);
|
||||
total_directories_tracked += sync_result.directories.len();
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Deep scan failed for watch folder {}: {}", watch_folder, e);
|
||||
// Continue with other folders rather than failing completely
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !all_files_to_process.is_empty() {
|
||||
info!("Deep scan will process {} files from {} directories for source {}",
|
||||
all_files_to_process.len(), total_directories_tracked, source_id_clone);
|
||||
|
||||
// Process files using the existing sync mechanism
|
||||
match crate::routes::webdav::webdav_sync::process_files_for_deep_scan(
|
||||
state_clone.clone(),
|
||||
user_id,
|
||||
&webdav_service,
|
||||
&files_to_process,
|
||||
&all_files_to_process,
|
||||
true, // enable background OCR
|
||||
Some(source_id_clone)
|
||||
).await {
|
||||
|
|
@ -323,9 +343,10 @@ pub async fn trigger_deep_scan(
|
|||
notification_type: "success".to_string(),
|
||||
title: "Deep Scan Completed".to_string(),
|
||||
message: format!(
|
||||
"Deep scan of {} completed successfully. {} files processed in {:.1} minutes.",
|
||||
"Smart deep scan of {} completed successfully. {} files processed, {} directories tracked in {:.1} minutes.",
|
||||
source_name,
|
||||
files_processed,
|
||||
total_directories_tracked,
|
||||
duration.num_seconds() as f64 / 60.0
|
||||
),
|
||||
action_url: Some("/documents".to_string()),
|
||||
|
|
@ -373,48 +394,18 @@ pub async fn trigger_deep_scan(
|
|||
}
|
||||
|
||||
} else {
|
||||
info!("Deep scan found no files for source {}", source_id_clone);
|
||||
info!("Deep scan found no files but tracked {} directories for source {}",
|
||||
total_directories_tracked, source_id_clone);
|
||||
|
||||
// Update source status to idle even if no files found
|
||||
if let Err(e) = state_clone.db.update_source_status(
|
||||
source_id_clone,
|
||||
SourceStatus::Idle,
|
||||
Some("Deep scan completed: no files found".to_string()),
|
||||
Some(format!("Smart deep scan completed: {} directories tracked, no files found", total_directories_tracked)),
|
||||
).await {
|
||||
error!("Failed to update source status after empty deep scan: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Deep scan with guaranteed completeness failed for source {}: {}", source_id_clone, e);
|
||||
|
||||
// Update source status to error
|
||||
if let Err(e2) = state_clone.db.update_source_status(
|
||||
source_id_clone,
|
||||
SourceStatus::Error,
|
||||
Some(format!("Deep scan failed: {}", e)),
|
||||
).await {
|
||||
error!("Failed to update source status after deep scan error: {}", e2);
|
||||
}
|
||||
|
||||
// Send error notification
|
||||
let notification = crate::models::CreateNotification {
|
||||
notification_type: "error".to_string(),
|
||||
title: "Deep Scan Failed".to_string(),
|
||||
message: format!("Deep scan of {} failed: {}", source_name, e),
|
||||
action_url: Some("/sources".to_string()),
|
||||
metadata: Some(serde_json::json!({
|
||||
"source_id": source_id_clone,
|
||||
"scan_type": "deep_scan",
|
||||
"error": e.to_string()
|
||||
})),
|
||||
};
|
||||
|
||||
if let Err(e) = state_clone.db.create_notification(user_id, ¬ification).await {
|
||||
error!("Failed to create deep scan error notification: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Json(serde_json::json!({
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ use crate::{
|
|||
models::{CreateWebDAVFile, UpdateWebDAVSyncState},
|
||||
services::file_service::FileService,
|
||||
ingestion::document_ingestion::{DocumentIngestionService, IngestionResult},
|
||||
services::webdav::{WebDAVConfig, WebDAVService},
|
||||
services::webdav::{WebDAVConfig, WebDAVService, SmartSyncService},
|
||||
};
|
||||
|
||||
pub async fn perform_webdav_sync_with_tracking(
|
||||
|
|
@ -114,19 +114,17 @@ async fn perform_sync_internal(
|
|||
warn!("Failed to update sync folder state: {}", e);
|
||||
}
|
||||
|
||||
// Discover files in the folder
|
||||
match webdav_service.discover_files_in_directory(folder_path, true).await {
|
||||
Ok(files) => {
|
||||
info!("Found {} files in folder {}", files.len(), folder_path);
|
||||
// Use smart sync service for intelligent scanning
|
||||
let smart_sync_service = SmartSyncService::new(state.clone());
|
||||
|
||||
// Filter files for processing
|
||||
let files_to_process: Vec<_> = files.into_iter()
|
||||
match smart_sync_service.evaluate_and_sync(user_id, &webdav_service, folder_path).await {
|
||||
Ok(Some(sync_result)) => {
|
||||
info!("🧠 Smart sync completed for {}: {} files found using {:?}",
|
||||
folder_path, sync_result.files.len(), sync_result.strategy_used);
|
||||
|
||||
// Filter files for processing (directories already handled by smart sync service)
|
||||
let files_to_process: Vec<_> = sync_result.files.into_iter()
|
||||
.filter(|file_info| {
|
||||
// Skip directories
|
||||
if file_info.is_directory {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if file extension is supported
|
||||
let file_extension = Path::new(&file_info.name)
|
||||
.extension()
|
||||
|
|
@ -214,9 +212,13 @@ async fn perform_sync_internal(
|
|||
|
||||
total_files_processed += folder_files_processed;
|
||||
}
|
||||
Ok(None) => {
|
||||
info!("✅ Smart sync: No changes detected for {}, skipping folder", folder_path);
|
||||
// No files to process, continue to next folder
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to discover files in folder {}: {}", folder_path, e);
|
||||
sync_errors.push(format!("Failed to list folder {}: {}", folder_path, e));
|
||||
error!("Smart sync failed for folder {}: {}", folder_path, e);
|
||||
sync_errors.push(format!("Smart sync failed for folder {}: {}", folder_path, e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -444,3 +446,4 @@ pub async fn process_files_for_deep_scan(
|
|||
Ok(files_processed)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -685,19 +685,41 @@ impl SourceScheduler {
|
|||
}
|
||||
)?;
|
||||
|
||||
// Run deep scan in background
|
||||
// Run smart deep scan in background
|
||||
let source_clone = source.clone();
|
||||
let state_clone = state.clone();
|
||||
tokio::spawn(async move {
|
||||
match webdav_service.discover_all_files().await {
|
||||
Ok(files) => {
|
||||
info!("🎉 Automatic deep scan completed for {}: {} files found", source_clone.name, files.len());
|
||||
// Use smart sync service for automatic deep scans
|
||||
let smart_sync_service = crate::services::webdav::SmartSyncService::new(state_clone.clone());
|
||||
let mut all_files_to_process = Vec::new();
|
||||
let mut total_directories_tracked = 0;
|
||||
|
||||
// Process all watch folders using smart deep scan
|
||||
for watch_folder in &webdav_config.watch_folders {
|
||||
match smart_sync_service.perform_smart_sync(
|
||||
source_clone.user_id,
|
||||
&webdav_service,
|
||||
watch_folder,
|
||||
crate::services::webdav::SmartSyncStrategy::FullDeepScan // Force deep scan for automatic triggers
|
||||
).await {
|
||||
Ok(sync_result) => {
|
||||
all_files_to_process.extend(sync_result.files);
|
||||
total_directories_tracked += sync_result.directories.len();
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Automatic smart deep scan failed for watch folder {}: {}", watch_folder, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("🎉 Automatic smart deep scan completed for {}: {} files found, {} directories tracked",
|
||||
source_clone.name, all_files_to_process.len(), total_directories_tracked);
|
||||
|
||||
// Process the files if any were found
|
||||
let files_processed = if !files.is_empty() {
|
||||
let total_files = files.len();
|
||||
let files_processed = if !all_files_to_process.is_empty() {
|
||||
let total_files = all_files_to_process.len();
|
||||
// Filter and process files as in the manual deep scan
|
||||
let files_to_process: Vec<_> = files.into_iter()
|
||||
let files_to_process: Vec<_> = all_files_to_process.into_iter()
|
||||
.filter(|file_info| {
|
||||
if file_info.is_directory {
|
||||
return false;
|
||||
|
|
@ -747,30 +769,6 @@ impl SourceScheduler {
|
|||
if let Err(e) = state_clone.db.create_notification(source_clone.user_id, ¬ification).await {
|
||||
error!("Failed to create success notification: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Automatic deep scan failed for {}: {}", source_clone.name, e);
|
||||
|
||||
// Error notification
|
||||
let notification = crate::models::CreateNotification {
|
||||
notification_type: "error".to_string(),
|
||||
title: "Automatic Deep Scan Failed".to_string(),
|
||||
message: format!("Deep scan of {} failed: {}", source_clone.name, e),
|
||||
action_url: Some("/sources".to_string()),
|
||||
metadata: Some(serde_json::json!({
|
||||
"source_type": source_clone.source_type.to_string(),
|
||||
"source_id": source_clone.id,
|
||||
"scan_type": "deep_scan",
|
||||
"automatic": true,
|
||||
"error": e.to_string()
|
||||
})),
|
||||
};
|
||||
|
||||
if let Err(e) = state_clone.db.create_notification(source_clone.user_id, ¬ification).await {
|
||||
error!("Failed to create error notification: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -125,20 +125,28 @@ impl SourceSyncService {
|
|||
|folder_path| {
|
||||
let service = webdav_service.clone();
|
||||
let state_clone = self.state.clone();
|
||||
let user_id = source.user_id; // Capture user_id from source
|
||||
async move {
|
||||
info!("🚀 Using WebDAV discovery for: {}", folder_path);
|
||||
let result = service.discover_files_in_directory(&folder_path, true).await;
|
||||
match &result {
|
||||
Ok(files) => {
|
||||
if files.is_empty() {
|
||||
info!("✅ Directory {} unchanged, skipped deep scan", folder_path);
|
||||
} else {
|
||||
info!("🔄 Directory {} changed, discovered {} files", folder_path, files.len());
|
||||
}
|
||||
info!("🧠 Using smart sync for scheduled sync: {}", folder_path);
|
||||
|
||||
// Use smart sync service for intelligent discovery
|
||||
let smart_sync_service = crate::services::webdav::SmartSyncService::new(state_clone);
|
||||
|
||||
match smart_sync_service.evaluate_and_sync(user_id, &service, &folder_path).await {
|
||||
Ok(Some(sync_result)) => {
|
||||
info!("✅ Smart sync completed for {}: {} files found using {:?}",
|
||||
folder_path, sync_result.files.len(), sync_result.strategy_used);
|
||||
Ok(sync_result.files)
|
||||
},
|
||||
Err(e) => error!("WebDAV discovery failed for folder {}: {}", folder_path, e),
|
||||
Ok(None) => {
|
||||
info!("🔍 Smart sync: No changes detected for {}, skipping", folder_path);
|
||||
Ok(Vec::new()) // No files to process
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Smart sync failed for scheduled sync {}: {}", folder_path, e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
},
|
||||
|file_path| {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,13 @@ use crate::webdav_xml_parser::{parse_propfind_response, parse_propfind_response_
|
|||
use super::config::{WebDAVConfig, ConcurrencyConfig};
|
||||
use super::connection::WebDAVConnection;
|
||||
|
||||
/// Results from WebDAV discovery including both files and directories.
///
/// Produced by the `discover_files_and_directories*` methods: `files` holds
/// discovered files (already filtered to supported extensions by the
/// discovery code), while `directories` holds directory entries whose ETags
/// are persisted for smart-sync change tracking.
#[derive(Debug, Clone)]
pub struct WebDAVDiscoveryResult {
    /// Files found during discovery (filtered to supported extensions).
    pub files: Vec<FileIngestionInfo>,
    /// Directories found during discovery; their ETags drive smart sync.
    pub directories: Vec<FileIngestionInfo>,
}
|
||||
|
||||
pub struct WebDAVDiscovery {
|
||||
connection: WebDAVConnection,
|
||||
config: WebDAVConfig,
|
||||
|
|
@ -40,6 +47,17 @@ impl WebDAVDiscovery {
|
|||
}
|
||||
}
|
||||
|
||||
/// Discovers both files and directories with their ETags for directory tracking
|
||||
pub async fn discover_files_and_directories(&self, directory_path: &str, recursive: bool) -> Result<WebDAVDiscoveryResult> {
|
||||
info!("🔍 Discovering files and directories in: {}", directory_path);
|
||||
|
||||
if recursive {
|
||||
self.discover_files_and_directories_recursive(directory_path).await
|
||||
} else {
|
||||
self.discover_files_and_directories_single(directory_path).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Discovers files in a single directory (non-recursive)
|
||||
async fn discover_files_single_directory(&self, directory_path: &str) -> Result<Vec<FileIngestionInfo>> {
|
||||
let url = self.connection.get_url_for_path(directory_path);
|
||||
|
|
@ -83,6 +101,55 @@ impl WebDAVDiscovery {
|
|||
Ok(filtered_files)
|
||||
}
|
||||
|
||||
/// Discovers both files and directories in a single directory (non-recursive).
///
/// Issues a PROPFIND with `Depth: 1` against the directory URL, parses the
/// multistatus response (including directory entries), and partitions the
/// items: directories go to `directories` unconditionally, files are kept
/// only when their extension is supported by the configured filter.
///
/// # Errors
/// Propagates HTTP/transport errors from the authenticated request and any
/// parse failure of the PROPFIND response body.
async fn discover_files_and_directories_single(&self, directory_path: &str) -> Result<WebDAVDiscoveryResult> {
    let url = self.connection.get_url_for_path(directory_path);

    // Request the properties needed for sync: name, size, mtime, ETag,
    // resource type (file vs collection) and creation date.
    let propfind_body = r#"<?xml version="1.0" encoding="utf-8"?>
<D:propfind xmlns:D="DAV:">
    <D:prop>
        <D:displayname/>
        <D:getcontentlength/>
        <D:getlastmodified/>
        <D:getetag/>
        <D:resourcetype/>
        <D:creationdate/>
    </D:prop>
</D:propfind>"#;

    // Depth: 1 limits the listing to the directory itself plus its
    // immediate children (non-recursive).
    let response = self.connection
        .authenticated_request(
            Method::from_bytes(b"PROPFIND")?,
            &url,
            Some(propfind_body.to_string()),
            Some(vec![
                ("Depth", "1"),
                ("Content-Type", "application/xml"),
            ]),
        )
        .await?;

    let body = response.text().await?;
    // Parser variant that keeps directory entries instead of dropping them.
    let all_items = parse_propfind_response_with_directories(&body)?;

    // Separate files and directories
    let mut files = Vec::new();
    let mut directories = Vec::new();

    for item in all_items {
        if item.is_directory {
            directories.push(item);
        } else if self.config.is_supported_extension(&item.name) {
            // Unsupported file extensions are silently skipped.
            files.push(item);
        }
    }

    debug!("Single directory '{}': {} files, {} directories",
        directory_path, files.len(), directories.len());

    Ok(WebDAVDiscoveryResult { files, directories })
}
|
||||
|
||||
/// Discovers files recursively in directory tree
|
||||
async fn discover_files_recursive(&self, root_directory: &str) -> Result<Vec<FileIngestionInfo>> {
|
||||
let mut all_files = Vec::new();
|
||||
|
|
@ -125,6 +192,54 @@ impl WebDAVDiscovery {
|
|||
Ok(all_files)
|
||||
}
|
||||
|
||||
/// Discovers both files and directories recursively in directory tree
|
||||
async fn discover_files_and_directories_recursive(&self, root_directory: &str) -> Result<WebDAVDiscoveryResult> {
|
||||
let mut all_files = Vec::new();
|
||||
let mut all_directories = Vec::new();
|
||||
let mut directories_to_scan = vec![root_directory.to_string()];
|
||||
let semaphore = Semaphore::new(self.concurrency_config.max_concurrent_scans);
|
||||
|
||||
while !directories_to_scan.is_empty() {
|
||||
let current_batch: Vec<String> = directories_to_scan
|
||||
.drain(..)
|
||||
.take(self.concurrency_config.max_concurrent_scans)
|
||||
.collect();
|
||||
|
||||
let tasks = current_batch.into_iter().map(|dir| {
|
||||
let semaphore = &semaphore;
|
||||
async move {
|
||||
let _permit = semaphore.acquire().await.unwrap();
|
||||
self.scan_directory_with_all_info(&dir).await
|
||||
}
|
||||
});
|
||||
|
||||
let results = stream::iter(tasks)
|
||||
.buffer_unordered(self.concurrency_config.max_concurrent_scans)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
for result in results {
|
||||
match result {
|
||||
Ok((files, directories, subdirs_to_scan)) => {
|
||||
all_files.extend(files);
|
||||
all_directories.extend(directories);
|
||||
directories_to_scan.extend(subdirs_to_scan);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to scan directory: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Recursive discovery found {} total files and {} directories",
|
||||
all_files.len(), all_directories.len());
|
||||
Ok(WebDAVDiscoveryResult {
|
||||
files: all_files,
|
||||
directories: all_directories
|
||||
})
|
||||
}
|
||||
|
||||
/// Scans a directory and returns both files and subdirectories
|
||||
async fn scan_directory_with_subdirs(&self, directory_path: &str) -> Result<(Vec<FileIngestionInfo>, Vec<String>)> {
|
||||
let url = self.connection.get_url_for_path(directory_path);
|
||||
|
|
@ -182,6 +297,69 @@ impl WebDAVDiscovery {
|
|||
Ok((filtered_files, full_dir_paths))
|
||||
}
|
||||
|
||||
/// Scans a directory and returns files, directories, and subdirectory paths for queue.
///
/// Single-level (`Depth: 1`) PROPFIND used as the worker step of the
/// recursive discovery: returns `(supported_files, directory_infos,
/// subdirectory_paths)` where `subdirectory_paths` feeds the caller's
/// breadth-first scan queue and `directory_infos` carries the ETags that
/// get persisted for change tracking.
///
/// # Errors
/// Propagates HTTP/transport errors and PROPFIND parse failures.
async fn scan_directory_with_all_info(&self, directory_path: &str) -> Result<(Vec<FileIngestionInfo>, Vec<FileIngestionInfo>, Vec<String>)> {
    let url = self.connection.get_url_for_path(directory_path);

    // Same property set as the single-directory discovery: identity,
    // size, timestamps, ETag, and resource type.
    let propfind_body = r#"<?xml version="1.0" encoding="utf-8"?>
<D:propfind xmlns:D="DAV:">
    <D:prop>
        <D:displayname/>
        <D:getcontentlength/>
        <D:getlastmodified/>
        <D:getetag/>
        <D:resourcetype/>
        <D:creationdate/>
    </D:prop>
</D:propfind>"#;

    let response = self.connection
        .authenticated_request(
            Method::from_bytes(b"PROPFIND")?,
            &url,
            Some(propfind_body.to_string()),
            Some(vec![
                ("Depth", "1"),
                ("Content-Type", "application/xml"),
            ]),
        )
        .await?;

    let body = response.text().await?;
    let all_items = parse_propfind_response_with_directories(&body)?;

    // Separate files and directories
    let mut filtered_files = Vec::new();
    let mut directories = Vec::new();
    let mut subdirectory_paths = Vec::new();

    for item in all_items {
        if item.is_directory {
            // Fix the directory path to be absolute
            // NOTE(review): this assumes the parser returns child paths
            // relative to the scanned directory; if `item.path` is already
            // server-absolute this would double-prefix — confirm against
            // parse_propfind_response_with_directories.
            let full_path = if directory_path == "/" {
                format!("/{}", item.path.trim_start_matches('/'))
            } else {
                format!("{}/{}", directory_path.trim_end_matches('/'), item.path.trim_start_matches('/'))
            };

            // Create a directory info with the corrected path
            let mut directory_info = item.clone();
            directory_info.path = full_path.clone();
            directories.push(directory_info);

            // Add to paths for further scanning
            subdirectory_paths.push(full_path);
        } else if self.config.is_supported_extension(&item.name) {
            // Unsupported extensions are dropped here, before download.
            filtered_files.push(item);
        }
    }

    debug!("Directory '{}': {} files, {} directories, {} paths to scan",
        directory_path, filtered_files.len(), directories.len(), subdirectory_paths.len());

    Ok((filtered_files, directories, subdirectory_paths))
}
|
||||
|
||||
/// Estimates crawl time and file counts for watch folders
|
||||
pub async fn estimate_crawl(&self) -> Result<WebDAVCrawlEstimate> {
|
||||
info!("📊 Estimating crawl for WebDAV watch folders");
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ pub mod connection;
|
|||
pub mod discovery;
|
||||
pub mod validation;
|
||||
pub mod service;
|
||||
pub mod smart_sync;
|
||||
|
||||
// Re-export main types for convenience
|
||||
pub use config::{WebDAVConfig, RetryConfig, ConcurrencyConfig};
|
||||
|
|
@ -15,6 +16,7 @@ pub use validation::{
|
|||
ValidationSeverity, ValidationRecommendation, ValidationAction, ValidationSummary
|
||||
};
|
||||
pub use service::{WebDAVService, ServerCapabilities, HealthStatus, test_webdav_connection};
|
||||
pub use smart_sync::{SmartSyncService, SmartSyncDecision, SmartSyncStrategy, SmartSyncResult};
|
||||
|
||||
// Test modules
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use crate::models::{
|
|||
|
||||
use super::config::{WebDAVConfig, RetryConfig, ConcurrencyConfig};
|
||||
use super::connection::WebDAVConnection;
|
||||
use super::discovery::WebDAVDiscovery;
|
||||
use super::discovery::{WebDAVDiscovery, WebDAVDiscoveryResult};
|
||||
use super::validation::{WebDAVValidator, ValidationReport};
|
||||
|
||||
/// Main WebDAV service that coordinates all WebDAV operations
|
||||
|
|
@ -150,6 +150,12 @@ impl WebDAVService {
|
|||
self.discovery.discover_files(directory_path, recursive).await
|
||||
}
|
||||
|
||||
/// Discovers both files and directories with their ETags for smart sync.
///
/// Public facade on [`WebDAVService`]: logs the request and delegates
/// directly to the discovery component's
/// `discover_files_and_directories`, preserving its result and errors.
pub async fn discover_files_and_directories(&self, directory_path: &str, recursive: bool) -> Result<WebDAVDiscoveryResult> {
    info!("🔍 Discovering files and directories: {} (recursive: {})", directory_path, recursive);
    self.discovery.discover_files_and_directories(directory_path, recursive).await
}
|
||||
|
||||
/// Downloads a file from WebDAV server by path
|
||||
pub async fn download_file(&self, file_path: &str) -> Result<Vec<u8>> {
|
||||
let _permit = self.download_semaphore.acquire().await?;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,268 @@
|
|||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use anyhow::Result;
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{AppState, models::{CreateWebDAVDirectory, FileIngestionInfo}};
|
||||
use super::WebDAVService;
|
||||
|
||||
/// Smart sync service that provides intelligent WebDAV synchronization
/// by comparing directory ETags to avoid unnecessary scans.
///
/// Holds shared application state (database access for persisted directory
/// ETags); cheap to construct per use since it only clones the `Arc`.
pub struct SmartSyncService {
    /// Shared app state; used for reading/writing tracked directory ETags.
    state: Arc<AppState>,
}
|
||||
|
||||
/// Result of smart sync evaluation.
///
/// Returned by `SmartSyncService::evaluate_sync_need` to tell the caller
/// whether any scanning work is required and, if so, how to do it.
#[derive(Debug, Clone)]
pub enum SmartSyncDecision {
    /// No changes detected, sync can be skipped entirely
    SkipSync,
    /// Smart sync detected changes, need to perform discovery
    RequiresSync(SmartSyncStrategy),
}
|
||||
|
||||
/// Strategy for performing sync after smart evaluation.
///
/// Chosen by `evaluate_sync_need` based on how many directories changed
/// relative to the number of known directories.
#[derive(Debug, Clone)]
pub enum SmartSyncStrategy {
    /// Full deep scan needed (first time, too many changes, or fallback)
    FullDeepScan,
    /// Targeted scan of specific changed directories
    TargetedScan(Vec<String>),
}
|
||||
|
||||
/// Complete result from smart sync operation.
///
/// Aggregates everything a sync pass discovered plus bookkeeping about how
/// the scan was performed, for logging and notifications.
#[derive(Debug, Clone)]
pub struct SmartSyncResult {
    /// Files discovered by the scan (may include directory entries the
    /// caller still filters by `is_directory`).
    pub files: Vec<FileIngestionInfo>,
    /// Directories discovered; their ETags have been persisted.
    pub directories: Vec<FileIngestionInfo>,
    /// Which strategy actually ran (deep vs targeted).
    pub strategy_used: SmartSyncStrategy,
    /// Number of directories that were scanned.
    pub directories_scanned: usize,
    /// Number of directories skipped as unchanged (currently always 0).
    pub directories_skipped: usize,
}
|
||||
|
||||
impl SmartSyncService {
    /// Creates a new smart sync service sharing the given app state.
    pub fn new(state: Arc<AppState>) -> Self {
        Self { state }
    }

    /// Evaluates whether sync is needed and determines the best strategy.
    ///
    /// Loads all known directory ETags for the user from the database, then
    /// does a *shallow* (non-recursive) discovery of `folder_path` and
    /// compares the immediate subdirectories' ETags against the stored ones:
    /// - no stored directories → `RequiresSync(FullDeepScan)` (first run);
    /// - no changed/new subdirectories → `SkipSync`;
    /// - >30% changed or >5 new directories → `RequiresSync(FullDeepScan)`;
    /// - otherwise → `RequiresSync(TargetedScan(changed ∪ new))`.
    /// Any discovery error falls back to a full deep scan.
    ///
    /// NOTE(review): only immediate subdirectories of `folder_path` are
    /// compared here; detecting deeper changes presumably relies on the
    /// server propagating ETag changes upward — confirm for the target
    /// WebDAV servers.
    pub async fn evaluate_sync_need(
        &self,
        user_id: Uuid,
        webdav_service: &WebDAVService,
        folder_path: &str,
    ) -> Result<SmartSyncDecision> {
        info!("🧠 Evaluating smart sync for folder: {}", folder_path);

        // Get all known directory ETags from database in bulk
        let known_directories = self.state.db.list_webdav_directories(user_id).await
            .map_err(|e| anyhow::anyhow!("Failed to fetch known directories: {}", e))?;

        // Filter to only directories under the current folder path
        // NOTE(review): plain prefix match — "/foo" also matches "/foobar";
        // confirm whether paths are normalized with trailing slashes.
        let relevant_dirs: HashMap<String, String> = known_directories
            .into_iter()
            .filter(|dir| dir.directory_path.starts_with(folder_path))
            .map(|dir| (dir.directory_path, dir.directory_etag))
            .collect();

        if relevant_dirs.is_empty() {
            // First sync for this folder: nothing to compare against.
            info!("No known directories for {}, requires full deep scan", folder_path);
            return Ok(SmartSyncDecision::RequiresSync(SmartSyncStrategy::FullDeepScan));
        }

        info!("Found {} known directories for smart sync comparison", relevant_dirs.len());

        // Do a shallow discovery of the root folder to check immediate changes
        match webdav_service.discover_files_and_directories(folder_path, false).await {
            Ok(root_discovery) => {
                let mut changed_directories = Vec::new();
                let mut new_directories = Vec::new();

                // Check if any immediate subdirectories have changed ETags
                for directory in &root_discovery.directories {
                    match relevant_dirs.get(&directory.path) {
                        Some(known_etag) => {
                            if known_etag != &directory.etag {
                                info!("Directory changed: {} (old: {}, new: {})",
                                    directory.path, known_etag, directory.etag);
                                changed_directories.push(directory.path.clone());
                            }
                        }
                        None => {
                            info!("New directory discovered: {}", directory.path);
                            new_directories.push(directory.path.clone());
                        }
                    }
                }

                // If no changes detected in immediate subdirectories, we can skip
                if changed_directories.is_empty() && new_directories.is_empty() {
                    info!("✅ Smart sync: No directory changes detected, sync can be skipped");
                    return Ok(SmartSyncDecision::SkipSync);
                }

                // Determine strategy based on scope of changes
                let total_changes = changed_directories.len() + new_directories.len();
                let total_known = relevant_dirs.len();
                let change_ratio = total_changes as f64 / total_known.max(1) as f64;

                if change_ratio > 0.3 || new_directories.len() > 5 {
                    // Too many changes, do full deep scan for efficiency
                    info!("📁 Smart sync: Large changes detected ({} changed, {} new), using full deep scan",
                        changed_directories.len(), new_directories.len());
                    return Ok(SmartSyncDecision::RequiresSync(SmartSyncStrategy::FullDeepScan));
                } else {
                    // Targeted scan of changed directories
                    let mut targets = changed_directories;
                    targets.extend(new_directories);
                    info!("🎯 Smart sync: Targeted changes detected, scanning {} directories", targets.len());
                    return Ok(SmartSyncDecision::RequiresSync(SmartSyncStrategy::TargetedScan(targets)));
                }
            }
            Err(e) => {
                // Evaluation itself failed; be safe and do the full scan.
                warn!("Smart sync evaluation failed, falling back to deep scan: {}", e);
                return Ok(SmartSyncDecision::RequiresSync(SmartSyncStrategy::FullDeepScan));
            }
        }
    }

    /// Performs smart sync based on the strategy determined by evaluation.
    ///
    /// Dispatches to the full deep scan or the targeted per-directory scan.
    pub async fn perform_smart_sync(
        &self,
        user_id: Uuid,
        webdav_service: &WebDAVService,
        folder_path: &str,
        strategy: SmartSyncStrategy,
    ) -> Result<SmartSyncResult> {
        match strategy {
            SmartSyncStrategy::FullDeepScan => {
                info!("🔍 Performing full deep scan for: {}", folder_path);
                self.perform_full_deep_scan(user_id, webdav_service, folder_path).await
            }
            SmartSyncStrategy::TargetedScan(target_dirs) => {
                info!("🎯 Performing targeted scan of {} directories", target_dirs.len());
                self.perform_targeted_scan(user_id, webdav_service, target_dirs).await
            }
        }
    }

    /// Combined evaluation and execution for convenience.
    ///
    /// Returns `Ok(None)` when no sync was needed, otherwise the result of
    /// running the selected strategy.
    pub async fn evaluate_and_sync(
        &self,
        user_id: Uuid,
        webdav_service: &WebDAVService,
        folder_path: &str,
    ) -> Result<Option<SmartSyncResult>> {
        match self.evaluate_sync_need(user_id, webdav_service, folder_path).await? {
            SmartSyncDecision::SkipSync => {
                info!("✅ Smart sync: Skipping sync for {} - no changes detected", folder_path);
                Ok(None)
            }
            SmartSyncDecision::RequiresSync(strategy) => {
                let result = self.perform_smart_sync(user_id, webdav_service, folder_path, strategy).await?;
                Ok(Some(result))
            }
        }
    }

    /// Performs a full deep scan and saves all directory ETags.
    ///
    /// Recursively discovers the whole tree under `folder_path`, then
    /// upserts every discovered directory's ETag into the database
    /// (failures per directory are logged, not fatal).
    async fn perform_full_deep_scan(
        &self,
        user_id: Uuid,
        webdav_service: &WebDAVService,
        folder_path: &str,
    ) -> Result<SmartSyncResult> {
        let discovery_result = webdav_service.discover_files_and_directories(folder_path, true).await?;

        info!("Deep scan found {} files and {} directories in folder {}",
            discovery_result.files.len(), discovery_result.directories.len(), folder_path);

        // Save all discovered directories to database for ETag tracking
        let mut directories_saved = 0;
        for directory_info in &discovery_result.directories {
            let webdav_directory = CreateWebDAVDirectory {
                user_id,
                directory_path: directory_info.path.clone(),
                directory_etag: directory_info.etag.clone(),
                file_count: 0, // Will be updated by stats
                total_size_bytes: 0, // Will be updated by stats
            };

            match self.state.db.create_or_update_webdav_directory(&webdav_directory).await {
                Ok(_) => {
                    debug!("Saved directory ETag: {} -> {}", directory_info.path, directory_info.etag);
                    directories_saved += 1;
                }
                Err(e) => {
                    // Non-fatal: a failed upsert only means this directory
                    // will be rescanned next time.
                    warn!("Failed to save directory ETag for {}: {}", directory_info.path, e);
                }
            }
        }

        info!("Saved ETags for {}/{} directories", directories_saved, discovery_result.directories.len());

        Ok(SmartSyncResult {
            files: discovery_result.files,
            directories: discovery_result.directories.clone(),
            strategy_used: SmartSyncStrategy::FullDeepScan,
            directories_scanned: discovery_result.directories.len(),
            directories_skipped: 0,
        })
    }

    /// Performs targeted scans of specific directories.
    ///
    /// Recursively scans each target directory, persisting discovered
    /// directory ETags as it goes. A failed target is logged and skipped so
    /// the remaining targets still get scanned.
    async fn perform_targeted_scan(
        &self,
        user_id: Uuid,
        webdav_service: &WebDAVService,
        target_directories: Vec<String>,
    ) -> Result<SmartSyncResult> {
        let mut all_files = Vec::new();
        let mut all_directories = Vec::new();
        let mut directories_scanned = 0;

        // Scan each target directory recursively
        for target_dir in &target_directories {
            match webdav_service.discover_files_and_directories(target_dir, true).await {
                Ok(discovery_result) => {
                    all_files.extend(discovery_result.files);

                    // Save directory ETags for this scan
                    for directory_info in &discovery_result.directories {
                        let webdav_directory = CreateWebDAVDirectory {
                            user_id,
                            directory_path: directory_info.path.clone(),
                            directory_etag: directory_info.etag.clone(),
                            file_count: 0,
                            total_size_bytes: 0,
                        };

                        if let Err(e) = self.state.db.create_or_update_webdav_directory(&webdav_directory).await {
                            warn!("Failed to save directory ETag for {}: {}", directory_info.path, e);
                        } else {
                            debug!("Updated directory ETag: {} -> {}", directory_info.path, directory_info.etag);
                        }
                    }

                    all_directories.extend(discovery_result.directories);
                    // Counts targets scanned, not directories discovered.
                    directories_scanned += 1;
                }
                Err(e) => {
                    warn!("Failed to scan target directory {}: {}", target_dir, e);
                }
            }
        }

        info!("Targeted scan completed: {} directories scanned, {} files found",
            directories_scanned, all_files.len());

        Ok(SmartSyncResult {
            files: all_files,
            directories: all_directories,
            strategy_used: SmartSyncStrategy::TargetedScan(target_directories),
            directories_scanned,
            directories_skipped: 0, // TODO: Could track this if needed
        })
    }
}
|
||||
Loading…
Reference in New Issue