diff --git a/src/db.rs b/src/db.rs index cd5b6b5..eca92a5 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1590,4 +1590,173 @@ impl Database { recent_notifications, }) } + + // WebDAV sync state operations + pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result> { + let row = sqlx::query( + r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed, + files_remaining, current_folder, errors, created_at, updated_at + FROM webdav_sync_state WHERE user_id = $1"# + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await?; + + match row { + Some(row) => Ok(Some(crate::models::WebDAVSyncState { + id: row.get("id"), + user_id: row.get("user_id"), + last_sync_at: row.get("last_sync_at"), + sync_cursor: row.get("sync_cursor"), + is_running: row.get("is_running"), + files_processed: row.get("files_processed"), + files_remaining: row.get("files_remaining"), + current_folder: row.get("current_folder"), + errors: row.get("errors"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + })), + None => Ok(None), + } + } + + pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> { + sqlx::query( + r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running, + files_processed, files_remaining, current_folder, errors, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) + ON CONFLICT (user_id) DO UPDATE SET + last_sync_at = EXCLUDED.last_sync_at, + sync_cursor = EXCLUDED.sync_cursor, + is_running = EXCLUDED.is_running, + files_processed = EXCLUDED.files_processed, + files_remaining = EXCLUDED.files_remaining, + current_folder = EXCLUDED.current_folder, + errors = EXCLUDED.errors, + updated_at = NOW()"# + ) + .bind(user_id) + .bind(state.last_sync_at) + .bind(&state.sync_cursor) + .bind(state.is_running) + .bind(state.files_processed) + .bind(state.files_remaining) + .bind(&state.current_folder) + .bind(&state.errors) + .execute(&self.pool) + 
.await?; + + Ok(()) + } + + // WebDAV file tracking operations + pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result> { + let row = sqlx::query( + r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size, + mime_type, document_id, sync_status, sync_error, created_at, updated_at + FROM webdav_files WHERE user_id = $1 AND webdav_path = $2"# + ) + .bind(user_id) + .bind(webdav_path) + .fetch_optional(&self.pool) + .await?; + + match row { + Some(row) => Ok(Some(crate::models::WebDAVFile { + id: row.get("id"), + user_id: row.get("user_id"), + webdav_path: row.get("webdav_path"), + etag: row.get("etag"), + last_modified: row.get("last_modified"), + file_size: row.get("file_size"), + mime_type: row.get("mime_type"), + document_id: row.get("document_id"), + sync_status: row.get("sync_status"), + sync_error: row.get("sync_error"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + })), + None => Ok(None), + } + } + + pub async fn create_or_update_webdav_file(&self, file: &crate::models::CreateWebDAVFile) -> Result { + let row = sqlx::query( + r#"INSERT INTO webdav_files (user_id, webdav_path, etag, last_modified, file_size, + mime_type, document_id, sync_status, sync_error) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (user_id, webdav_path) DO UPDATE SET + etag = EXCLUDED.etag, + last_modified = EXCLUDED.last_modified, + file_size = EXCLUDED.file_size, + mime_type = EXCLUDED.mime_type, + document_id = EXCLUDED.document_id, + sync_status = EXCLUDED.sync_status, + sync_error = EXCLUDED.sync_error, + updated_at = NOW() + RETURNING id, user_id, webdav_path, etag, last_modified, file_size, + mime_type, document_id, sync_status, sync_error, created_at, updated_at"# + ) + .bind(file.user_id) + .bind(&file.webdav_path) + .bind(&file.etag) + .bind(file.last_modified) + .bind(file.file_size) + .bind(&file.mime_type) + .bind(file.document_id) + .bind(&file.sync_status) + .bind(&file.sync_error) + 
.fetch_one(&self.pool) + .await?; + + Ok(crate::models::WebDAVFile { + id: row.get("id"), + user_id: row.get("user_id"), + webdav_path: row.get("webdav_path"), + etag: row.get("etag"), + last_modified: row.get("last_modified"), + file_size: row.get("file_size"), + mime_type: row.get("mime_type"), + document_id: row.get("document_id"), + sync_status: row.get("sync_status"), + sync_error: row.get("sync_error"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }) + } + + pub async fn get_pending_webdav_files(&self, user_id: Uuid, limit: i64) -> Result> { + let rows = sqlx::query( + r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size, + mime_type, document_id, sync_status, sync_error, created_at, updated_at + FROM webdav_files + WHERE user_id = $1 AND sync_status = 'pending' + ORDER BY created_at ASC + LIMIT $2"# + ) + .bind(user_id) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + let mut files = Vec::new(); + for row in rows { + files.push(crate::models::WebDAVFile { + id: row.get("id"), + user_id: row.get("user_id"), + webdav_path: row.get("webdav_path"), + etag: row.get("etag"), + last_modified: row.get("last_modified"), + file_size: row.get("file_size"), + mime_type: row.get("mime_type"), + document_id: row.get("document_id"), + sync_status: row.get("sync_status"), + sync_error: row.get("sync_error"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + }); + } + + Ok(files) + } } \ No newline at end of file diff --git a/src/models.rs b/src/models.rs index 722adc4..13c6e85 100644 --- a/src/models.rs +++ b/src/models.rs @@ -554,4 +554,59 @@ pub struct CreateNotification { pub struct NotificationSummary { pub unread_count: i64, pub recent_notifications: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WebDAVSyncState { + pub id: Uuid, + pub user_id: Uuid, + pub last_sync_at: Option>, + pub sync_cursor: Option, + pub is_running: bool, + pub files_processed: i64, + pub 
files_remaining: i64, + pub current_folder: Option, + pub errors: Vec, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateWebDAVSyncState { + pub last_sync_at: Option>, + pub sync_cursor: Option, + pub is_running: bool, + pub files_processed: i64, + pub files_remaining: i64, + pub current_folder: Option, + pub errors: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WebDAVFile { + pub id: Uuid, + pub user_id: Uuid, + pub webdav_path: String, + pub etag: String, + pub last_modified: Option>, + pub file_size: i64, + pub mime_type: String, + pub document_id: Option, + pub sync_status: String, + pub sync_error: Option, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateWebDAVFile { + pub user_id: Uuid, + pub webdav_path: String, + pub etag: String, + pub last_modified: Option>, + pub file_size: i64, + pub mime_type: String, + pub document_id: Option, + pub sync_status: String, + pub sync_error: Option, } \ No newline at end of file diff --git a/src/routes/webdav.rs b/src/routes/webdav.rs index e42044c..9941441 100644 --- a/src/routes/webdav.rs +++ b/src/routes/webdav.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::path::Path; use axum::{ extract::State, @@ -16,13 +15,14 @@ use crate::{ WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVSyncStatus, WebDAVTestConnection, }, - ocr_queue::OcrQueueService, - file_service::FileService, AppState, }; use crate::webdav_service::WebDAVConfig; use crate::webdav_service::WebDAVService; +pub mod webdav_sync; +use webdav_sync::perform_webdav_sync_with_tracking; + pub fn router() -> Router> { Router::new() .route("/test-connection", post(test_webdav_connection)) @@ -246,18 +246,41 @@ async fn get_webdav_sync_status( } }; - // For now, return basic status - in production you'd query the webdav_sync_state table - // TODO: Read actual sync state from database - let status = 
WebDAVSyncStatus { - is_running: false, - last_sync: None, - files_processed: 0, - files_remaining: 0, - current_folder: None, - errors: Vec::new(), - }; - - Ok(Json(status)) + // Get sync state from database + match state.db.get_webdav_sync_state(auth_user.user.id).await { + Ok(Some(sync_state)) => { + Ok(Json(WebDAVSyncStatus { + is_running: sync_state.is_running, + last_sync: sync_state.last_sync_at, + files_processed: sync_state.files_processed, + files_remaining: sync_state.files_remaining, + current_folder: sync_state.current_folder, + errors: sync_state.errors, + })) + } + Ok(None) => { + // No sync state yet + Ok(Json(WebDAVSyncStatus { + is_running: false, + last_sync: None, + files_processed: 0, + files_remaining: 0, + current_folder: None, + errors: Vec::new(), + })) + } + Err(e) => { + error!("Failed to get WebDAV sync state: {}", e); + Ok(Json(WebDAVSyncStatus { + is_running: false, + last_sync: None, + files_processed: 0, + files_remaining: 0, + current_folder: None, + errors: vec![format!("Error retrieving sync state: {}", e)], + })) + } + } } #[utoipa::path( @@ -303,7 +326,7 @@ async fn start_webdav_sync( let enable_background_ocr = user_settings.enable_background_ocr; tokio::spawn(async move { - match perform_webdav_sync(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { + match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { Ok(files_processed) => { info!("WebDAV sync completed successfully for user {}: {} files processed", user_id, files_processed); @@ -355,123 +378,3 @@ async fn start_webdav_sync( }))) } -async fn perform_webdav_sync( - state: Arc, - user_id: uuid::Uuid, - webdav_service: WebDAVService, - config: WebDAVConfig, - enable_background_ocr: bool, -) -> Result> { - info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len()); - - let mut files_processed = 0; - - // Process each watch 
folder - for folder_path in &config.watch_folders { - info!("Syncing folder: {}", folder_path); - - // Discover files in the folder - match webdav_service.discover_files_in_folder(folder_path).await { - Ok(files) => { - info!("Found {} files in folder {}", files.len(), folder_path); - - for file_info in files { - if file_info.is_directory { - continue; // Skip directories - } - - // Check if file extension is supported - let file_extension = Path::new(&file_info.name) - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or("") - .to_lowercase(); - - if !config.file_extensions.contains(&file_extension) { - continue; // Skip unsupported file types - } - - // Check if we've already processed this file - // TODO: Check webdav_files table for existing files with same etag - - // Download the file - match webdav_service.download_file(&file_info.path).await { - Ok(file_data) => { - info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len()); - - // Create file service and save file to disk first - let file_service = FileService::new(state.config.upload_path.clone()); - - let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await { - Ok(path) => path, - Err(e) => { - error!("Failed to save file {}: {}", file_info.name, e); - continue; - } - }; - - // Create document record - let document = file_service.create_document( - &file_info.name, - &file_info.name, // original filename same as name - &saved_file_path, - file_info.size, - &file_info.mime_type, - user_id, - ); - - // Save document to database - match state.db.create_document(document).await { - Ok(saved_document) => { - info!("Created document record: {} (ID: {})", file_info.name, saved_document.id); - - // Add to OCR queue if enabled - if enable_background_ocr { - match sqlx::PgPool::connect(&state.config.database_url).await { - Ok(pool) => { - let queue_service = OcrQueueService::new(state.db.clone(), pool, 1); - - // Calculate priority based on file size - let priority = 
match file_info.size { - 0..=1048576 => 10, // <= 1MB: highest priority - ..=5242880 => 8, // 1-5MB: high priority - ..=10485760 => 6, // 5-10MB: medium priority - ..=52428800 => 4, // 10-50MB: low priority - _ => 2, // > 50MB: lowest priority - }; - - if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await { - error!("Failed to enqueue document for OCR: {}", e); - } else { - info!("Enqueued document {} for OCR processing", saved_document.id); - } - } - Err(e) => { - error!("Failed to connect to database for OCR queueing: {}", e); - } - } - } - - // TODO: Record in webdav_files table for tracking - files_processed += 1; - } - Err(e) => { - error!("Failed to create document record for {}: {}", file_info.name, e); - } - } - } - Err(e) => { - error!("Failed to download file {}: {}", file_info.path, e); - } - } - } - } - Err(e) => { - error!("Failed to discover files in folder {}: {}", folder_path, e); - } - } - } - - info!("WebDAV sync completed for user {}: {} files processed", user_id, files_processed); - Ok(files_processed) -} \ No newline at end of file diff --git a/src/routes/webdav/webdav_sync.rs b/src/routes/webdav/webdav_sync.rs new file mode 100644 index 0000000..41796bd --- /dev/null +++ b/src/routes/webdav/webdav_sync.rs @@ -0,0 +1,287 @@ +use std::sync::Arc; +use std::path::Path; +use tracing::{error, info, warn}; +use chrono::Utc; + +use crate::{ + AppState, + models::{CreateWebDAVFile, UpdateWebDAVSyncState}, + ocr_queue::OcrQueueService, + file_service::FileService, + webdav_service::{WebDAVConfig, WebDAVService}, +}; + +pub async fn perform_webdav_sync_with_tracking( + state: Arc, + user_id: uuid::Uuid, + webdav_service: WebDAVService, + config: WebDAVConfig, + enable_background_ocr: bool, +) -> Result> { + info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len()); + + // Update sync state to running + let sync_state_update = UpdateWebDAVSyncState { + last_sync_at: 
Some(Utc::now()), + sync_cursor: None, + is_running: true, + files_processed: 0, + files_remaining: 0, + current_folder: None, + errors: Vec::new(), + }; + + if let Err(e) = state.db.update_webdav_sync_state(user_id, &sync_state_update).await { + error!("Failed to update sync state: {}", e); + } + + let mut total_files_processed = 0; + let mut sync_errors = Vec::new(); + + // Process each watch folder + for folder_path in &config.watch_folders { + info!("Syncing folder: {}", folder_path); + + // Update current folder in sync state + let folder_update = UpdateWebDAVSyncState { + last_sync_at: Some(Utc::now()), + sync_cursor: None, + is_running: true, + files_processed: total_files_processed as i64, + files_remaining: 0, + current_folder: Some(folder_path.clone()), + errors: sync_errors.clone(), + }; + + if let Err(e) = state.db.update_webdav_sync_state(user_id, &folder_update).await { + warn!("Failed to update sync folder state: {}", e); + } + + // Discover files in the folder + match webdav_service.discover_files_in_folder(folder_path).await { + Ok(files) => { + info!("Found {} files in folder {}", files.len(), folder_path); + + let mut folder_files_processed = 0; + let files_to_process = files.len(); + + for (idx, file_info) in files.into_iter().enumerate() { + if file_info.is_directory { + continue; // Skip directories + } + + // Check if file extension is supported + let file_extension = Path::new(&file_info.name) + .extension() + .and_then(|ext| ext.to_str()) + .unwrap_or("") + .to_lowercase(); + + if !config.file_extensions.contains(&file_extension) { + continue; // Skip unsupported file types + } + + // Check if we've already processed this file + match state.db.get_webdav_file_by_path(user_id, &file_info.path).await { + Ok(Some(existing_file)) => { + // Check if file has changed (compare ETags) + if existing_file.etag == file_info.etag { + info!("Skipping unchanged file: {} (ETag: {})", file_info.path, file_info.etag); + continue; + } + info!("File has 
changed: {} (old ETag: {}, new ETag: {})", + file_info.path, existing_file.etag, file_info.etag); + } + Ok(None) => { + info!("New file found: {}", file_info.path); + } + Err(e) => { + warn!("Error checking existing file {}: {}", file_info.path, e); + } + } + + // Update progress + let progress_update = UpdateWebDAVSyncState { + last_sync_at: Some(Utc::now()), + sync_cursor: None, + is_running: true, + files_processed: (total_files_processed + folder_files_processed) as i64, + files_remaining: (files_to_process - idx - 1) as i64, + current_folder: Some(folder_path.clone()), + errors: sync_errors.clone(), + }; + + if let Err(e) = state.db.update_webdav_sync_state(user_id, &progress_update).await { + warn!("Failed to update sync progress: {}", e); + } + + // Download the file + match webdav_service.download_file(&file_info.path).await { + Ok(file_data) => { + info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len()); + + // Create file service and save file to disk + let file_service = FileService::new(state.config.upload_path.clone()); + + let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await { + Ok(path) => path, + Err(e) => { + error!("Failed to save file {}: {}", file_info.name, e); + sync_errors.push(format!("Failed to save {}: {}", file_info.name, e)); + + // Record failed file in database + let failed_file = CreateWebDAVFile { + user_id, + webdav_path: file_info.path.clone(), + etag: file_info.etag.clone(), + last_modified: file_info.last_modified, + file_size: file_info.size, + mime_type: file_info.mime_type.clone(), + document_id: None, + sync_status: "failed".to_string(), + sync_error: Some(e.to_string()), + }; + + if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await { + error!("Failed to record failed file: {}", db_err); + } + + continue; + } + }; + + // Create document record + let document = file_service.create_document( + &file_info.name, + &file_info.name, // original filename 
same as name + &saved_file_path, + file_info.size, + &file_info.mime_type, + user_id, + ); + + // Save document to database + match state.db.create_document(document).await { + Ok(saved_document) => { + info!("Created document record: {} (ID: {})", file_info.name, saved_document.id); + + // Record successful file in WebDAV tracking + let webdav_file = CreateWebDAVFile { + user_id, + webdav_path: file_info.path.clone(), + etag: file_info.etag.clone(), + last_modified: file_info.last_modified, + file_size: file_info.size, + mime_type: file_info.mime_type.clone(), + document_id: Some(saved_document.id), + sync_status: "completed".to_string(), + sync_error: None, + }; + + if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await { + error!("Failed to record WebDAV file: {}", e); + } + + // Add to OCR queue if enabled + if enable_background_ocr { + match sqlx::PgPool::connect(&state.config.database_url).await { + Ok(pool) => { + let queue_service = OcrQueueService::new(state.db.clone(), pool, 1); + + // Calculate priority based on file size + let priority = match file_info.size { + 0..=1048576 => 10, // <= 1MB: highest priority + ..=5242880 => 8, // 1-5MB: high priority + ..=10485760 => 6, // 5-10MB: medium priority + ..=52428800 => 4, // 10-50MB: low priority + _ => 2, // > 50MB: lowest priority + }; + + if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await { + error!("Failed to enqueue document for OCR: {}", e); + } else { + info!("Enqueued document {} for OCR processing", saved_document.id); + } + } + Err(e) => { + error!("Failed to connect to database for OCR queueing: {}", e); + } + } + } + + folder_files_processed += 1; + } + Err(e) => { + error!("Failed to create document record for {}: {}", file_info.name, e); + sync_errors.push(format!("Failed to create document {}: {}", file_info.name, e)); + + // Update WebDAV file status to failed + let failed_file = CreateWebDAVFile { + user_id, + webdav_path: 
file_info.path.clone(), + etag: file_info.etag.clone(), + last_modified: file_info.last_modified, + file_size: file_info.size, + mime_type: file_info.mime_type.clone(), + document_id: None, + sync_status: "failed".to_string(), + sync_error: Some(e.to_string()), + }; + + if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await { + error!("Failed to record failed file: {}", db_err); + } + } + } + } + Err(e) => { + error!("Failed to download file {}: {}", file_info.path, e); + sync_errors.push(format!("Failed to download {}: {}", file_info.path, e)); + + // Record download failure + let failed_file = CreateWebDAVFile { + user_id, + webdav_path: file_info.path.clone(), + etag: file_info.etag.clone(), + last_modified: file_info.last_modified, + file_size: file_info.size, + mime_type: file_info.mime_type.clone(), + document_id: None, + sync_status: "failed".to_string(), + sync_error: Some(format!("Download failed: {}", e)), + }; + + if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await { + error!("Failed to record failed file: {}", db_err); + } + } + } + } + + total_files_processed += folder_files_processed; + } + Err(e) => { + error!("Failed to discover files in folder {}: {}", folder_path, e); + sync_errors.push(format!("Failed to list folder {}: {}", folder_path, e)); + } + } + } + + // Update final sync state + let final_state = UpdateWebDAVSyncState { + last_sync_at: Some(Utc::now()), + sync_cursor: None, + is_running: false, + files_processed: total_files_processed as i64, + files_remaining: 0, + current_folder: None, + errors: sync_errors, + }; + + if let Err(e) = state.db.update_webdav_sync_state(user_id, &final_state).await { + error!("Failed to update final sync state: {}", e); + } + + info!("WebDAV sync completed for user {}: {} files processed", user_id, total_files_processed); + Ok(total_files_processed) +} \ No newline at end of file diff --git a/src/webdav_scheduler.rs b/src/webdav_scheduler.rs index 
b063338..4032bc0 100644 --- a/src/webdav_scheduler.rs +++ b/src/webdav_scheduler.rs @@ -10,6 +10,7 @@ use crate::{ AppState, }; use crate::webdav_service::{WebDAVConfig, WebDAVService}; +use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking; pub struct WebDAVScheduler { db: Database, @@ -66,7 +67,7 @@ impl WebDAVScheduler { let enable_background_ocr = user_settings.enable_background_ocr; tokio::spawn(async move { - match perform_webdav_sync(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { + match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { Ok(files_processed) => { info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed); @@ -121,9 +122,6 @@ impl WebDAVScheduler { } async fn is_sync_due(&self, user_settings: &crate::models::Settings) -> Result> { - // TODO: Add a webdav_sync_state table to track last sync time - // For now, we'll use a simple time-based check - // Get the sync interval in minutes let sync_interval_minutes = user_settings.webdav_sync_interval_minutes; @@ -132,8 +130,27 @@ impl WebDAVScheduler { return Ok(false); } - // TODO: Check actual last sync time from database - // For now, assume sync is always due (this will be refined when we add the webdav_sync_state table) + // Check if a sync is already running + if let Ok(Some(sync_state)) = self.db.get_webdav_sync_state(user_settings.user_id).await { + if sync_state.is_running { + info!("Sync already running for user {}", user_settings.user_id); + return Ok(false); + } + + // Check last sync time + if let Some(last_sync) = sync_state.last_sync_at { + let elapsed = chrono::Utc::now() - last_sync; + let elapsed_minutes = elapsed.num_minutes(); + + if elapsed_minutes < sync_interval_minutes as i64 { + info!("Sync not due for user {} (last sync {} minutes ago, interval {} minutes)", + user_settings.user_id, 
elapsed_minutes, sync_interval_minutes); + return Ok(false); + } + } + } + + // Sync is due Ok(true) } @@ -164,124 +181,3 @@ impl WebDAVScheduler { } } -// Re-use the sync function from webdav routes -async fn perform_webdav_sync( - state: Arc, - user_id: uuid::Uuid, - webdav_service: WebDAVService, - config: WebDAVConfig, - enable_background_ocr: bool, -) -> Result> { - use std::path::Path; - - info!("Performing background WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len()); - - let mut files_processed = 0; - - // Process each watch folder - for folder_path in &config.watch_folders { - info!("Syncing folder: {}", folder_path); - - // Discover files in the folder - match webdav_service.discover_files_in_folder(folder_path).await { - Ok(files) => { - info!("Found {} files in folder {}", files.len(), folder_path); - - for file_info in files { - if file_info.is_directory { - continue; // Skip directories - } - - // Check if file extension is supported - let file_extension = Path::new(&file_info.name) - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or("") - .to_lowercase(); - - if !config.file_extensions.contains(&file_extension) { - continue; // Skip unsupported file types - } - - // TODO: Check if we've already processed this file using ETag - - // Download the file - match webdav_service.download_file(&file_info.path).await { - Ok(file_data) => { - info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len()); - - // Create file service and save file - let file_service = FileService::new(state.config.upload_path.clone()); - - let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await { - Ok(path) => path, - Err(e) => { - error!("Failed to save file {}: {}", file_info.name, e); - continue; - } - }; - - // Create document record - let document = file_service.create_document( - &file_info.name, - &file_info.name, - &saved_file_path, - file_info.size, - &file_info.mime_type, - user_id, - ); - - // 
Save document to database - match state.db.create_document(document).await { - Ok(saved_document) => { - info!("Created document record: {} (ID: {})", file_info.name, saved_document.id); - - // Add to OCR queue if enabled - if enable_background_ocr { - match sqlx::PgPool::connect(&state.config.database_url).await { - Ok(pool) => { - let queue_service = OcrQueueService::new(state.db.clone(), pool, 1); - - // Calculate priority based on file size - let priority = match file_info.size { - 0..=1048576 => 10, // <= 1MB: highest priority - ..=5242880 => 8, // 1-5MB: high priority - ..=10485760 => 6, // 5-10MB: medium priority - ..=52428800 => 4, // 10-50MB: low priority - _ => 2, // > 50MB: lowest priority - }; - - if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await { - error!("Failed to enqueue document for OCR: {}", e); - } else { - info!("Enqueued document {} for OCR processing", saved_document.id); - } - } - Err(e) => { - error!("Failed to connect to database for OCR queueing: {}", e); - } - } - } - - files_processed += 1; - } - Err(e) => { - error!("Failed to create document record for {}: {}", file_info.name, e); - } - } - } - Err(e) => { - error!("Failed to download file {}: {}", file_info.path, e); - } - } - } - } - Err(e) => { - error!("Failed to discover files in folder {}: {}", folder_path, e); - } - } - } - - info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed); - Ok(files_processed) -} \ No newline at end of file diff --git a/src/webdav_service.rs b/src/webdav_service.rs index 57d3d0c..dc3e06c 100644 --- a/src/webdav_service.rs +++ b/src/webdav_service.rs @@ -536,16 +536,4 @@ impl WebDAVService { Ok(bytes.to_vec()) } - pub async fn get_sync_status(&self) -> Result { - // This would typically read from database/cache - // For now, return a placeholder - Ok(WebDAVSyncStatus { - is_running: false, - last_sync: None, - files_processed: 0, - files_remaining: 0, - 
current_folder: None, - errors: Vec::new(), - }) - } } \ No newline at end of file diff --git a/tests/webdav_enhanced_unit_tests.rs b/tests/webdav_enhanced_unit_tests.rs new file mode 100644 index 0000000..cdf9086 --- /dev/null +++ b/tests/webdav_enhanced_unit_tests.rs @@ -0,0 +1,615 @@ +use readur::webdav_service::{WebDAVService, WebDAVConfig, FileInfo, RetryConfig}; +use readur::models::*; +use tokio; +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +// Mock WebDAV responses for comprehensive testing +fn mock_nextcloud_propfind_response() -> String { + r#" + + + /remote.php/dav/files/admin/Documents/ + + + Documents + Tue, 01 Jan 2024 12:00:00 GMT + + + + "abc123" + + HTTP/1.1 200 OK + + + + /remote.php/dav/files/admin/Documents/report.pdf + + + report.pdf + 2048000 + Mon, 15 Jan 2024 14:30:00 GMT + application/pdf + "pdf123" + + + HTTP/1.1 200 OK + + + + /remote.php/dav/files/admin/Documents/photo.png + + + photo.png + 768000 + Wed, 10 Jan 2024 09:15:00 GMT + image/png + "png456" + + + HTTP/1.1 200 OK + + + + /remote.php/dav/files/admin/Documents/unsupported.docx + + + unsupported.docx + 102400 + Thu, 20 Jan 2024 16:45:00 GMT + application/vnd.openxmlformats-officedocument.wordprocessingml.document + "docx789" + + + HTTP/1.1 200 OK + + + "#.to_string() +} + +fn mock_empty_folder_response() -> String { + r#" + + + /webdav/EmptyFolder/ + + + EmptyFolder + Fri, 01 Jan 2024 12:00:00 GMT + + + + + HTTP/1.1 200 OK + + + "#.to_string() +} + +fn mock_malformed_xml_response() -> String { + r#" + + + /webdav/test.pdf + + + test.pdf + + + + + "#.to_string() +} + +#[test] +fn test_webdav_config_validation() { + // Test valid config + let valid_config = WebDAVConfig { + server_url: "https://cloud.example.com".to_string(), + username: "testuser".to_string(), + password: "testpass".to_string(), + watch_folders: vec!["/Documents".to_string(), "/Photos".to_string()], + file_extensions: vec!["pdf".to_string(), "png".to_string(), "jpg".to_string()], + timeout_seconds: 30, + 
// NOTE(review): the lines below complete a config-validation test whose
// opening lies in an earlier chunk of this file; they are kept unchanged.
        server_type: Some("nextcloud".to_string()),
    };

    assert!(WebDAVService::new(valid_config).is_ok());

    // Test config with empty server URL
    let invalid_config = WebDAVConfig {
        server_url: "".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    // Should still create service, validation happens during actual requests
    assert!(WebDAVService::new(invalid_config).is_ok());
}

/// Service construction succeeds for Nextcloud, ownCloud (trailing-slash
/// URL), and generic (no `server_type`) configurations. URL construction
/// itself is exercised implicitly during service creation.
#[test]
fn test_webdav_url_construction_comprehensive() {
    // Nextcloud-flavoured configuration.
    let nextcloud_config = WebDAVConfig {
        server_url: "https://nextcloud.example.com".to_string(),
        username: "admin".to_string(),
        password: "secret".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    // Was `let service = …unwrap()` with `service` never used; assert the
    // result instead, matching the other cases and avoiding the warning.
    assert!(WebDAVService::new(nextcloud_config).is_ok());

    // ownCloud-flavoured configuration — note the trailing slash in the URL.
    let owncloud_config = WebDAVConfig {
        server_url: "https://cloud.example.com/".to_string(), // with trailing slash
        username: "user123".to_string(),
        password: "pass123".to_string(),
        watch_folders: vec!["/Shared".to_string()],
        file_extensions: vec!["jpg".to_string()],
        timeout_seconds: 60,
        server_type: Some("owncloud".to_string()),
    };

    assert!(WebDAVService::new(owncloud_config).is_ok());

    // Generic WebDAV: no server type supplied at all.
    let generic_config = WebDAVConfig {
        server_url: "https://webdav.example.com".to_string(),
        username: "webdavuser".to_string(),
        password: "webdavpass".to_string(),
        watch_folders: vec!["/Files".to_string()],
        file_extensions: vec!["txt".to_string()],
        timeout_seconds: 45,
        server_type: None, // no server type = generic
    };

    assert!(WebDAVService::new(generic_config).is_ok());
}

/// Parses the mock Nextcloud PROPFIND response and checks that exactly the
/// three files (and not the directory entry) come back with the expected
/// size, MIME type, etag, and directory flag.
#[test]
fn test_webdav_response_parsing_comprehensive() {
    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "admin".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string(), "png".to_string(), "jpg".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    let service = WebDAVService::new(config).unwrap();

    let nextcloud_response = mock_nextcloud_propfind_response();
    let files = service.parse_webdav_response(&nextcloud_response);
    assert!(files.is_ok());

    let files = files.unwrap();
    assert_eq!(files.len(), 3); // 3 files; the directory entry is excluded

    // report.pdf
    let pdf_file = files.iter().find(|f| f.name == "report.pdf").unwrap();
    assert_eq!(pdf_file.size, 2048000);
    assert_eq!(pdf_file.mime_type, "application/pdf");
    assert_eq!(pdf_file.etag, "\"pdf123\"");
    assert!(!pdf_file.is_directory);

    // photo.png
    let png_file = files.iter().find(|f| f.name == "photo.png").unwrap();
    assert_eq!(png_file.size, 768000);
    assert_eq!(png_file.mime_type, "image/png");
    assert_eq!(png_file.etag, "\"png456\"");
    assert!(!png_file.is_directory);

    // unsupported.docx — present in the listing even though the extension
    // is not in `file_extensions`; filtering happens elsewhere.
    let docx_file = files.iter().find(|f| f.name == "unsupported.docx").unwrap();
    assert_eq!(docx_file.size, 102400);
    assert_eq!(
        docx_file.mime_type,
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    );
    assert_eq!(docx_file.etag, "\"docx789\"");
    assert!(!docx_file.is_directory);
}

/// An empty-folder PROPFIND response parses to an empty file list.
#[test]
fn test_empty_folder_parsing() {
    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/EmptyFolder".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("generic".to_string()),
    };

    let service = WebDAVService::new(config).unwrap();
    let response = mock_empty_folder_response();

    let files = service.parse_webdav_response(&response);
    assert!(files.is_ok());

    let files = files.unwrap();
    assert_eq!(files.len(), 0); // empty folder yields no files
}

/// Malformed XML must not panic the parser. Whether it returns `Ok` (the
/// simple parser may still extract some data) or `Err` is
/// implementation-defined, so no result variant is asserted here.
#[test]
fn test_malformed_xml_handling() {
    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    let service = WebDAVService::new(config).unwrap();
    let response = mock_malformed_xml_response();

    // Previously `assert!(result.is_ok() || result.is_err())`, which is a
    // tautology and can never fail. The meaningful guarantee is "does not
    // panic", which reaching this point (and returning) demonstrates.
    let _ = service.parse_webdav_response(&response);
}

/// A `RetryConfig` built with explicit values keeps those values, and a
/// service can be constructed with it via `new_with_retry`.
#[test]
fn test_retry_config_custom_values() {
    let custom_retry = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 500,
        max_delay_ms: 15000,
        backoff_multiplier: 1.5,
        timeout_seconds: 90,
    };

    assert_eq!(custom_retry.max_retries, 5);
    assert_eq!(custom_retry.initial_delay_ms, 500);
    assert_eq!(custom_retry.max_delay_ms, 15000);
    assert_eq!(custom_retry.backoff_multiplier, 1.5);
    assert_eq!(custom_retry.timeout_seconds, 90);

    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    assert!(WebDAVService::new_with_retry(config, custom_retry).is_ok());
}

/// Case-insensitive extension matching against the supported list; files
/// without an extension (including dot-files like `.hidden`, which
/// `Path::extension` treats as having none) never match.
#[test]
fn test_file_extension_matching() {
    let supported_extensions = vec!["pdf", "png", "jpg", "jpeg", "tiff", "bmp", "txt"];

    let test_cases = vec![
        ("document.pdf", true),
        ("image.PNG", true), // case insensitive
        ("photo.jpg", true),
        ("photo.JPEG", true),
        ("scan.tiff", true),
        ("bitmap.bmp", true),
        ("readme.txt", true),
        ("spreadsheet.xlsx", false),
        ("presentation.pptx", false),
        ("archive.zip", false),
        ("script.sh", false),
        ("no_extension", false),
        (".hidden", false),
    ];

    for (filename, should_match) in test_cases {
        let extension = std::path::Path::new(filename)
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.to_lowercase());

        let matches = extension
            .as_ref()
            .map(|ext| supported_extensions.contains(&ext.as_str()))
            .unwrap_or(false);

        assert_eq!(
            matches, should_match,
            "File '{}' extension matching failed. Expected: {}, Got: {}",
            filename, should_match, matches
        );
    }
}

/// Smoke-test that `WebDAVSyncState` can be populated and read back.
#[test]
fn test_webdav_sync_state_model() {
    let sync_state = WebDAVSyncState {
        id: Uuid::new_v4(),
        user_id: Uuid::new_v4(),
        last_sync_at: Some(Utc::now()),
        sync_cursor: Some("cursor123".to_string()),
        is_running: true,
        files_processed: 42,
        files_remaining: 58,
        current_folder: Some("/Documents".to_string()),
        errors: vec!["Error 1".to_string(), "Error 2".to_string()],
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    assert!(sync_state.is_running);
    assert_eq!(sync_state.files_processed, 42);
    assert_eq!(sync_state.files_remaining, 58);
    assert_eq!(sync_state.current_folder, Some("/Documents".to_string()));
    assert_eq!(sync_state.errors.len(), 2);
}

/// Smoke-test that `WebDAVFile` can be populated and read back.
#[test]
fn test_webdav_file_model() {
    let document_id = Uuid::new_v4();
    let webdav_file = WebDAVFile {
        id: Uuid::new_v4(),
        user_id: Uuid::new_v4(),
        webdav_path: "/Documents/report.pdf".to_string(),
        etag: "\"abc123\"".to_string(),
        last_modified: Some(Utc::now()),
        file_size: 2048000,
        mime_type: "application/pdf".to_string(),
        document_id: Some(document_id),
        sync_status: "completed".to_string(),
        sync_error: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    assert_eq!(webdav_file.webdav_path, "/Documents/report.pdf");
    assert_eq!(webdav_file.etag, "\"abc123\"");
    assert_eq!(webdav_file.file_size, 2048000);
    assert_eq!(webdav_file.sync_status, "completed");
    assert!(webdav_file.sync_error.is_none());
}

/// Smoke-test that `CreateWebDAVFile` can be populated and read back.
#[test]
fn test_create_webdav_file_model() {
    let user_id = Uuid::new_v4();
    let create_file = CreateWebDAVFile {
        user_id,
        webdav_path: "/Photos/vacation.jpg".to_string(),
        etag: "\"photo123\"".to_string(),
        last_modified: Some(Utc::now()),
        file_size: 1536000,
        mime_type: "image/jpeg".to_string(),
        document_id: None,
        sync_status: "pending".to_string(),
        sync_error: None,
    };

    assert_eq!(create_file.user_id, user_id);
    assert_eq!(create_file.webdav_path, "/Photos/vacation.jpg");
    assert_eq!(create_file.file_size, 1536000);
    assert_eq!(create_file.sync_status, "pending");
}

/// Smoke-test that `UpdateWebDAVSyncState` can represent a finished run
/// (not running, nothing remaining, no current folder, no errors).
#[test]
fn test_update_webdav_sync_state_model() {
    let update_state = UpdateWebDAVSyncState {
        last_sync_at: Some(Utc::now()),
        sync_cursor: Some("new_cursor".to_string()),
        is_running: false,
        files_processed: 100,
        files_remaining: 0,
        current_folder: None,
        errors: Vec::new(),
    };

    assert!(!update_state.is_running);
    assert_eq!(update_state.files_processed, 100);
    assert_eq!(update_state.files_remaining, 0);
    assert!(update_state.current_folder.is_none());
    assert!(update_state.errors.is_empty());
}

/// Verifies the tiered OCR-priority mapping at every size boundary.
///
/// NOTE(review): the priority table is re-implemented inline rather than
/// calling the production function, so this guards the table's shape, not
/// the real code path — consider wiring it to the actual implementation.
#[test]
fn test_ocr_priority_calculation_comprehensive() {
    // (file size in bytes, expected priority). Boundaries are inclusive on
    // the smaller tier: exactly 1MB/5MB/10MB/50MB keep the higher priority.
    let test_cases = vec![
        (0, 10),           // 0 bytes
        (1, 10),           // 1 byte
        (1048576, 10),     // exactly 1MB
        (1048577, 8),      // 1MB + 1 byte
        (5242880, 8),      // exactly 5MB
        (5242881, 6),      // 5MB + 1 byte
        (10485760, 6),     // exactly 10MB
        (10485761, 4),     // 10MB + 1 byte
        (52428800, 4),     // exactly 50MB
        (52428801, 2),     // 50MB + 1 byte
        (104857600, 2),    // 100MB
        (1073741824, 2),   // 1GB
    ];

    for (file_size, expected_priority) in test_cases {
        // Arms are evaluated top-down, so each `..=N` arm effectively means
        // "larger than the previous bound, up to N".
        let priority = match file_size {
            0..=1048576 => 10, // <= 1MB
            ..=5242880 => 8,   // 1-5MB
            ..=10485760 => 6,  // 5-10MB
            ..=52428800 => 4,  // 10-50MB
            _ => 2,            // > 50MB
        };

        assert_eq!(
            priority, expected_priority,
            "Priority calculation failed for file size {} bytes",
            file_size
        );
    }
}

/// `WebDAVSyncStatus` serializes to JSON with the expected field names and
/// values. Assumes serde_json's compact output (no spaces) and the struct's
/// declared field names — adjust if `#[serde(rename)]` is ever added.
#[test]
fn test_sync_status_serialization() {
    let sync_status = WebDAVSyncStatus {
        is_running: true,
        last_sync: Some(Utc::now()),
        files_processed: 25,
        files_remaining: 75,
        current_folder: Some("/Documents/Reports".to_string()),
        errors: vec!["Connection timeout".to_string()],
    };

    let json = serde_json::to_string(&sync_status);
    assert!(json.is_ok());

    let json_str = json.unwrap();
    assert!(json_str.contains("\"is_running\":true"));
    assert!(json_str.contains("\"files_processed\":25"));
    assert!(json_str.contains("\"files_remaining\":75"));
    assert!(json_str.contains("\"current_folder\":\"/Documents/Reports\""));
}

/// A crawl estimate aggregates per-folder file counts, times, and sizes.
#[test]
fn test_crawl_estimate_calculation() {
    let folder1 = WebDAVFolderInfo {
        path: "/Documents".to_string(),
        total_files: 100,
        supported_files: 80,
        estimated_time_hours: 0.044, // ~2.6 minutes
        total_size_mb: 150.0,
    };

    let folder2 = WebDAVFolderInfo {
        path: "/Photos".to_string(),
        total_files: 200,
        supported_files: 150,
        estimated_time_hours: 0.083, // ~5 minutes
        total_size_mb: 500.0,
    };

    let estimate = WebDAVCrawlEstimate {
        folders: vec![folder1, folder2],
        total_files: 300,
        total_supported_files: 230,
        total_estimated_time_hours: 0.127, // ~7.6 minutes
        total_size_mb: 650.0,
    };

    assert_eq!(estimate.folders.len(), 2);
    assert_eq!(estimate.total_files, 300);
    assert_eq!(estimate.total_supported_files, 230);
    // Float tolerance for the summed hours; the MB total is an exact literal.
    assert!((estimate.total_estimated_time_hours - 0.127).abs() < 0.001);
    assert_eq!(estimate.total_size_mb, 650.0);
}

/// Success and failure shapes of `WebDAVConnectionResult`: success carries
/// server version/type, failure carries neither plus an explanatory message.
#[test]
fn test_connection_result_variants() {
    // Success case
    let success_result = WebDAVConnectionResult {
        success: true,
        message: "Connected successfully to Nextcloud 28.0.1".to_string(),
        server_version: Some("28.0.1".to_string()),
        server_type: Some("nextcloud".to_string()),
    };

    assert!(success_result.success);
    assert!(success_result.server_version.is_some());
    assert_eq!(success_result.server_type, Some("nextcloud".to_string()));

    // Failure case
    let failure_result = WebDAVConnectionResult {
        success: false,
        message: "Authentication failed: 401 Unauthorized".to_string(),
        server_version: None,
        server_type: None,
    };

    assert!(!failure_result.success);
    assert!(failure_result.server_version.is_none());
    assert!(failure_result.server_type.is_none());
    assert!(failure_result.message.contains("401"));
}

/// A WebDAV-sync notification carries type, title, action URL, and a JSON
/// metadata payload that round-trips through `serde_json::json!`.
#[test]
fn test_notification_creation_for_webdav() {
    let notification = CreateNotification {
        notification_type: "info".to_string(),
        title: "WebDAV Sync Started".to_string(),
        message: "Synchronizing files from Nextcloud server".to_string(),
        action_url: Some("/sync-status".to_string()),
        metadata: Some(serde_json::json!({
            "sync_type": "webdav",
            "folders": ["/Documents", "/Photos"],
            "estimated_files": 150
        })),
    };

    assert_eq!(notification.notification_type, "info");
    assert_eq!(notification.title, "WebDAV Sync Started");
    assert!(notification.action_url.is_some());

    let metadata = notification.metadata.unwrap();
    assert_eq!(metadata["sync_type"], "webdav");
    assert!(metadata["folders"].is_array());
    assert_eq!(metadata["estimated_files"], 150);
}

/// File names are extracted correctly from paths containing spaces,
/// non-ASCII characters, and URL-special characters (%, &, #).
#[test]
fn test_special_characters_in_paths() {
    let test_paths = vec![
        "/Documents/File with spaces.pdf",
        "/Documents/Ñoño/archivo.pdf",
        "/Documents/测试文件.pdf",
        "/Documents/файл.pdf",
        "/Documents/50%.pdf",
        "/Documents/file&name.pdf",
        "/Documents/file#1.pdf",
    ];

    for path in test_paths {
        let file_info = FileInfo {
            path: path.to_string(),
            // `unwrap`s are fine here: every test path has a final component
            // and is valid UTF-8 (Rust string literals always are).
            name: std::path::Path::new(path)
                .file_name()
                .unwrap()
                .to_str()
                .unwrap()
                .to_string(),
            size: 1024,
            mime_type: "application/pdf".to_string(),
            last_modified: Some(Utc::now()),
            etag: "\"test123\"".to_string(),
            is_directory: false,
        };

        assert!(!file_info.name.is_empty());
        assert!(file_info.name.ends_with(".pdf"));
    }
}

/// Exponential backoff doubles the delay each retry and is capped at
/// `max_delay_ms`. Assumes `RetryConfig::default()` uses a 1000ms initial
/// delay and a 2.0 multiplier — TODO confirm against the default impl.
#[test]
fn test_backoff_delay_calculation() {
    let retry_config = RetryConfig::default();

    let mut delays = Vec::new();
    let mut delay = retry_config.initial_delay_ms;

    for _ in 0..5 {
        delays.push(delay);
        delay = ((delay as f64 * retry_config.backoff_multiplier) as u64)
            .min(retry_config.max_delay_ms);
    }

    assert_eq!(delays[0], 1000); // 1s
    assert_eq!(delays[1], 2000); // 2s
    assert_eq!(delays[2], 4000); // 4s
    assert_eq!(delays[3], 8000); // 8s
    assert_eq!(delays[4], 16000); // 16s

    // Keep multiplying: the cap must hold no matter how many retries occur.
    for _ in 0..10 {
        delay = ((delay as f64 * retry_config.backoff_multiplier) as u64)
            .min(retry_config.max_delay_ms);
    }
    assert_eq!(delay, retry_config.max_delay_ms);
}