From 0e84993afa1f1b80b8d73926267f0b857efdb651 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Thu, 3 Jul 2025 23:58:11 +0000 Subject: [PATCH] fix(server): resolve import issues --- src/db/documents/helpers.rs | 1 - src/db/documents/management.rs | 1 + src/db/documents/operations.rs | 2 +- src/ingestion/document_ingestion.rs | 1 - src/models/search.rs | 2 +- src/monitoring/request_throttler.rs | 2 +- src/routes/documents_old.rs.bak | 62 -- src/routes/settings.rs | 2 +- src/routes/sources_main.rs.bak | 41 - src/routes/sources_old.rs.bak | 1100 -------------------------- src/routes/users.rs | 2 +- src/scheduling/source_scheduler.rs | 2 +- src/scheduling/source_sync.rs | 3 +- src/scheduling/webdav_scheduler.rs | 2 - src/services/file_service.rs | 3 +- src/services/local_folder_service.rs | 2 +- src/services/s3_service.rs | 6 +- src/services/webdav/config.rs | 1 - src/services/webdav/discovery.rs | 4 +- src/services/webdav/service.rs | 2 +- src/services/webdav/validation.rs | 4 +- src/utils/debug.rs | 2 +- 22 files changed, 19 insertions(+), 1228 deletions(-) delete mode 100644 src/routes/documents_old.rs.bak delete mode 100644 src/routes/sources_main.rs.bak delete mode 100644 src/routes/sources_old.rs.bak diff --git a/src/db/documents/helpers.rs b/src/db/documents/helpers.rs index 63e9eee..a082056 100644 --- a/src/db/documents/helpers.rs +++ b/src/db/documents/helpers.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use sqlx::{Row, QueryBuilder, Postgres}; use uuid::Uuid; diff --git a/src/db/documents/management.rs b/src/db/documents/management.rs index d256c84..2cba832 100644 --- a/src/db/documents/management.rs +++ b/src/db/documents/management.rs @@ -63,6 +63,7 @@ impl Database { created_at: row.get("created_at"), updated_at: row.get("updated_at"), document_count: 0, + source_count: 0, }; if Some(doc_id) != current_doc_id { diff --git a/src/db/documents/operations.rs b/src/db/documents/operations.rs index ab029c8..e72613c 100644 --- a/src/db/documents/operations.rs +++ b/src/db/documents/operations.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use sqlx::{QueryBuilder, Postgres, Transaction}; +use sqlx::{QueryBuilder, Postgres, Row}; use uuid::Uuid; use crate::models::{Document, UserRole, FailedDocument}; diff --git a/src/ingestion/document_ingestion.rs b/src/ingestion/document_ingestion.rs index c8e9425..7c7d96c 100644 --- a/src/ingestion/document_ingestion.rs +++ b/src/ingestion/document_ingestion.rs @@ -9,7 +9,6 @@ use uuid::Uuid; use sha2::{Digest, Sha256}; use tracing::{debug, info, warn}; -use chrono::Utc; use serde_json; use crate::models::{Document, FileInfo}; diff --git a/src/models/search.rs b/src/models/search.rs index d9c8df5..2d276aa 100644 --- a/src/models/search.rs +++ b/src/models/search.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use utoipa::{ToSchema, IntoParams}; -use super::responses::{EnhancedDocumentResponse, SearchSnippet, HighlightRange}; +use super::responses::EnhancedDocumentResponse; #[derive(Debug, Serialize, Deserialize, ToSchema, IntoParams)] pub struct SearchRequest { diff --git a/src/monitoring/request_throttler.rs b/src/monitoring/request_throttler.rs index 477988c..e4f9f7c 100644 --- a/src/monitoring/request_throttler.rs +++ b/src/monitoring/request_throttler.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use tokio::sync::Semaphore; use tokio::time::{Duration, Instant}; -use tracing::{warn, info}; +use tracing::info; /// Request throttler to limit concurrent operations #[derive(Clone)] diff --git a/src/routes/documents_old.rs.bak b/src/routes/documents_old.rs.bak deleted file mode 100644 index 1342895..0000000 --- a/src/routes/documents_old.rs.bak +++ /dev/null @@ -1,62 +0,0 @@ -use axum::{ - routing::{get, post, delete}, - Router, -}; -use std::sync::Arc; - -use crate::AppState; - -// Import all the modularized functions -mod types; -mod crud; -mod ocr; -mod bulk; -mod debug; -mod failed; - -// Re-export types for external use -pub use types::*; - -// Use the individual module functions -use crud::*; -use ocr::*; -use bulk::*; -use debug::*; -use failed::*; - -/// Documents router with all document-related endpoints -pub fn router() -> Router> { - Router::new() - // Basic CRUD operations - .route("/", post(upload_document)) - .route("/", get(list_documents)) - .route("/", delete(bulk_delete_documents)) - .route("/{id}", get(get_document_by_id)) - .route("/{id}", delete(delete_document)) - .route("/{id}/download", get(download_document)) - .route("/{id}/view", get(view_document)) - - // OCR operations - .route("/{id}/ocr", get(get_document_ocr)) - .route("/{id}/retry-ocr", post(retry_ocr)) - .route("/ocr/bulk-retry", post(crate::routes::documents_ocr_retry::bulk_retry_ocr)) - .route("/ocr/retry-stats", get(crate::routes::documents_ocr_retry::get_ocr_retry_stats)) - .route("/ocr/retry-recommendations", get(crate::routes::documents_ocr_retry::get_retry_recommendations)) - .route("/{id}/ocr/retry-history", get(crate::routes::documents_ocr_retry::get_document_retry_history)) - - // Bulk operations - .route("/delete-low-confidence", post(delete_low_confidence_documents)) - .route("/delete-failed-ocr", post(delete_failed_ocr_documents)) - - // Debug and diagnostic operations - .route("/{id}/debug", get(get_document_debug_info)) - .route("/{id}/thumbnail", get(get_document_thumbnail)) - .route("/{id}/processed-image", get(get_processed_image)) - - // Failed document management - .route("/failed", get(get_failed_documents)) - .route("/failed/{id}/view", get(view_failed_document)) - - // Other operations - .route("/duplicates", get(get_user_duplicates)) -} \ No newline at end of file diff --git a/src/routes/settings.rs b/src/routes/settings.rs index a6457f7..4ca1bec 100644 --- a/src/routes/settings.rs +++ b/src/routes/settings.rs @@ -2,7 +2,7 @@ use axum::{ extract::State, http::StatusCode, response::Json, - routing::{get, put}, + routing::get, Router, }; use std::sync::Arc; diff --git a/src/routes/sources_main.rs.bak b/src/routes/sources_main.rs.bak deleted file mode 100644 index 5f6850c..0000000 --- a/src/routes/sources_main.rs.bak +++ /dev/null @@ -1,41 +0,0 @@ -use axum::{ - routing::{get, post, delete, put}, - Router, -}; -use std::sync::Arc; - -use crate::AppState; - -// Declare the modules -mod crud; -mod sync; -mod validation; -mod estimation; - -// Import all the modularized functions -use crud::*; -use sync::*; -use validation::*; -use estimation::*; - -/// Sources router with all source-related endpoints -pub fn router() -> Router> { - Router::new() - // Basic CRUD operations - .route("/", get(list_sources).post(create_source)) - .route("/{id}", get(get_source).put(update_source).delete(delete_source)) - - // Sync operations - .route("/{id}/sync", post(trigger_sync)) - .route("/{id}/sync/stop", post(stop_sync)) - .route("/{id}/deep-scan", post(trigger_deep_scan)) - - // Validation and testing - .route("/{id}/validate", post(validate_source)) - .route("/{id}/test", post(test_connection)) - .route("/test-connection", post(test_connection_with_config)) - - // Estimation - .route("/{id}/estimate", post(estimate_crawl)) - .route("/estimate", post(estimate_crawl_with_config)) -} \ No newline at end of file diff --git a/src/routes/sources_old.rs.bak b/src/routes/sources_old.rs.bak deleted file mode 100644 index 2e1d483..0000000 --- a/src/routes/sources_old.rs.bak +++ /dev/null @@ -1,1100 +0,0 @@ -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::Json, - routing::{delete, get, post, put}, - Router, -}; -use std::sync::Arc; -use uuid::Uuid; -use tracing::{error, info}; -use anyhow::Result; - -use crate::{ - auth::AuthUser, - models::{CreateSource, SourceResponse, SourceWithStats, UpdateSource}, - AppState, -}; - -pub fn router() -> Router> { - Router::new() - .route("/", get(list_sources).post(create_source)) - .route("/{id}", get(get_source).put(update_source).delete(delete_source)) - .route("/{id}/sync", post(trigger_sync)) - .route("/{id}/sync/stop", post(stop_sync)) - .route("/{id}/deep-scan", post(trigger_deep_scan)) - .route("/{id}/validate", post(validate_source)) - .route("/{id}/test", post(test_connection)) - .route("/{id}/estimate", post(estimate_crawl)) - .route("/estimate", post(estimate_crawl_with_config)) - .route("/test-connection", post(test_connection_with_config)) -} - -#[utoipa::path( - get, - path = "/api/sources", - tag = "sources", - security( - ("bearer_auth" = []) - ), - responses( - (status = 200, description = "List of user sources", body = Vec), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ) -)] -async fn list_sources( - auth_user: AuthUser, - State(state): State>, -) -> Result>, StatusCode> { - let sources = state - .db - .get_sources(auth_user.user.id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Get source IDs for batch counting - let source_ids: Vec = sources.iter().map(|s| s.id).collect(); - - // Get document counts for all sources in one query - let counts = state - .db - .count_documents_for_sources(&source_ids) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Create a map for quick lookup - let count_map: std::collections::HashMap = counts - .into_iter() - .map(|(id, total, ocr)| (id, (total, ocr))) - .collect(); - - let responses: Vec = sources - .into_iter() - .map(|s| { - let (total_docs, total_ocr) = count_map.get(&s.id).copied().unwrap_or((0, 0)); - let mut response: SourceResponse = s.into(); - response.total_documents = total_docs; - response.total_documents_ocr = total_ocr; - response - }) - .collect(); - - Ok(Json(responses)) -} - -#[utoipa::path( - post, - path = "/api/sources", - tag = "sources", - security( - ("bearer_auth" = []) - ), - request_body = CreateSource, - responses( - (status = 201, description = "Source created successfully", body = SourceResponse), - (status = 400, description = "Bad request - invalid source data"), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ) -)] -async fn create_source( - auth_user: AuthUser, - State(state): State>, - Json(source_data): Json, -) -> Result, StatusCode> { - // Validate source configuration based on type - if let Err(validation_error) = validate_source_config(&source_data) { - error!("Source validation failed: {}", validation_error); - error!("Invalid source data received: {:?}", source_data); - return Err(StatusCode::BAD_REQUEST); - } - - let source = state - .db - .create_source(auth_user.user.id, &source_data) - .await - .map_err(|e| { - error!("Failed to create source in database: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - let mut response: SourceResponse = source.into(); - // New sources have no documents yet - response.total_documents = 0; - response.total_documents_ocr = 0; - - Ok(Json(response)) -} - -#[utoipa::path( - get, - path = "/api/sources/{id}", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Source details with stats", body = SourceWithStats), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn get_source( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result, StatusCode> { - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Get recent documents for this source - let recent_documents = state - .db - .get_recent_documents_for_source(source_id, 10) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Get document counts - let (total_documents, total_documents_ocr) = state - .db - .count_documents_for_source(source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Calculate sync progress - let sync_progress = if source.total_files_pending > 0 { - Some( - (source.total_files_synced as f32 - / (source.total_files_synced + source.total_files_pending) as f32) - * 100.0, - ) - } else { - None - }; - - let mut source_response: SourceResponse = source.into(); - source_response.total_documents = total_documents; - source_response.total_documents_ocr = total_documents_ocr; - - let response = SourceWithStats { - source: source_response, - recent_documents: recent_documents.into_iter().map(|d| d.into()).collect(), - sync_progress, - }; - - Ok(Json(response)) -} - -#[utoipa::path( - put, - path = "/api/sources/{id}", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - request_body = UpdateSource, - responses( - (status = 200, description = "Source updated successfully", body = SourceResponse), - (status = 400, description = "Bad request - invalid update data"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn update_source( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, - Json(update_data): Json, -) -> Result, StatusCode> { - use tracing::info; - info!("Updating source {} with data: {:?}", source_id, update_data); - // Check if source exists - let existing = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Validate config if provided - if let Some(config) = &update_data.config { - if let Err(validation_error) = validate_config_for_type(&existing.source_type, config) { - error!("Config validation failed for source {}: {}", source_id, validation_error); - error!("Invalid config received: {:?}", config); - return Err(StatusCode::BAD_REQUEST); - } - } - - let source = state - .db - .update_source(auth_user.user.id, source_id, &update_data) - .await - .map_err(|e| { - error!("Failed to update source {} in database: {}", source_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - // Get document counts - let (total_documents, total_documents_ocr) = state - .db - .count_documents_for_source(source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let mut response: SourceResponse = source.into(); - response.total_documents = total_documents; - response.total_documents_ocr = total_documents_ocr; - - info!("Successfully updated source {}: {}", source_id, response.name); - Ok(Json(response)) -} - -#[utoipa::path( - delete, - path = "/api/sources/{id}", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 204, description = "Source deleted successfully"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn delete_source( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result { - let deleted = state - .db - .delete_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - if deleted { - Ok(StatusCode::NO_CONTENT) - } else { - Err(StatusCode::NOT_FOUND) - } -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/sync", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Sync triggered successfully"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 409, description = "Source is already syncing"), - (status = 500, description = "Internal server error"), - (status = 501, description = "Not implemented - Source type not supported") - ) -)] -async fn trigger_sync( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result { - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Check if already syncing - if matches!(source.status, crate::models::SourceStatus::Syncing) { - return Err(StatusCode::CONFLICT); - } - - // Update status to syncing - state - .db - .update_source_status(source_id, crate::models::SourceStatus::Syncing, None) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Trigger sync using the universal source scheduler - if let Some(scheduler) = &state.source_scheduler { - if let Err(e) = scheduler.trigger_sync(source_id).await { - error!("Failed to trigger sync for source {}: {}", source_id, e); - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Error, - Some(format!("Failed to trigger sync: {}", e)), - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - } else { - // Fallback to WebDAV scheduler for backward compatibility - match source.source_type { - crate::models::SourceType::WebDAV => { - if let Some(webdav_scheduler) = &state.webdav_scheduler { - webdav_scheduler.trigger_sync(source_id).await; - } else { - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Error, - Some("No scheduler available".to_string()), - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - } - _ => { - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Error, - Some("Source type not supported".to_string()), - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - return Err(StatusCode::NOT_IMPLEMENTED); - } - } - } - - Ok(StatusCode::OK) -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/deep-scan", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Deep scan started successfully"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 409, description = "Source is already syncing"), - (status = 500, description = "Internal server error") - ) -)] -async fn trigger_deep_scan( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result, StatusCode> { - info!("Starting deep scan for source {} by user {}", source_id, auth_user.user.username); - - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Check if source is already syncing - if matches!(source.status, crate::models::SourceStatus::Syncing) { - return Ok(Json(serde_json::json!({ - "success": false, - "error": "source_already_syncing", - "message": "Source is already syncing. Please wait for the current sync to complete before starting a deep scan." - }))); - } - - match source.source_type { - crate::models::SourceType::WebDAV => { - // Handle WebDAV deep scan - let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config) - .map_err(|e| { - error!("Failed to parse WebDAV config for source {}: {}", source_id, e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - // Create WebDAV service - let webdav_config = crate::services::webdav::WebDAVConfig { - server_url: config.server_url.clone(), - username: config.username.clone(), - password: config.password.clone(), - watch_folders: config.watch_folders.clone(), - file_extensions: config.file_extensions.clone(), - timeout_seconds: 600, // 10 minutes for deep scan - server_type: config.server_type.clone(), - }; - - let webdav_service = crate::services::webdav::WebDAVService::new(webdav_config.clone()) - .map_err(|e| { - error!("Failed to create WebDAV service for deep scan: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; - - // Update source status to syncing - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Syncing, - Some("Deep scan in progress".to_string()), - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - // Start deep scan in background - let state_clone = state.clone(); - let user_id = auth_user.user.id; - let source_name = source.name.clone(); - let source_id_clone = source_id; - let config_clone = config.clone(); - - tokio::spawn(async move { - let start_time = chrono::Utc::now(); - - // Use guaranteed completeness deep scan method - match webdav_service.deep_scan_with_guaranteed_completeness(user_id, &state_clone).await { - Ok(all_discovered_files) => { - info!("Deep scan with guaranteed completeness discovered {} files", all_discovered_files.len()); - - if !all_discovered_files.is_empty() { - info!("Deep scan discovery completed for source {}: {} files found", source_id_clone, all_discovered_files.len()); - - // Filter files by extensions and process them - let files_to_process: Vec<_> = all_discovered_files.into_iter() - .filter(|file_info| { - if file_info.is_directory { - return false; - } - let file_extension = std::path::Path::new(&file_info.name) - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or("") - .to_lowercase(); - config_clone.file_extensions.contains(&file_extension) - }) - .collect(); - - info!("Deep scan will process {} files for source {}", files_to_process.len(), source_id_clone); - - // Process files using the existing sync mechanism - match crate::routes::webdav::webdav_sync::process_files_for_deep_scan( - state_clone.clone(), - user_id, - &webdav_service, - &files_to_process, - true, // enable background OCR - Some(source_id_clone) - ).await { - Ok(files_processed) => { - let duration = chrono::Utc::now() - start_time; - info!("Deep scan completed for source {}: {} files processed in {:?}", - source_id_clone, files_processed, duration); - - // Update source status to idle - if let Err(e) = state_clone.db.update_source_status( - source_id_clone, - crate::models::SourceStatus::Idle, - Some(format!("Deep scan completed: {} files processed", files_processed)), - ).await { - error!("Failed to update source status after deep scan: {}", e); - } - - // Send success notification - let notification = crate::models::CreateNotification { - notification_type: "success".to_string(), - title: "Deep Scan Completed".to_string(), - message: format!( - "Deep scan of {} completed successfully. {} files processed in {:.1} minutes.", - source_name, - files_processed, - duration.num_seconds() as f64 / 60.0 - ), - action_url: Some("/documents".to_string()), - metadata: Some(serde_json::json!({ - "source_id": source_id_clone, - "scan_type": "deep_scan", - "files_processed": files_processed, - "duration_seconds": duration.num_seconds() - })), - }; - - if let Err(e) = state_clone.db.create_notification(user_id, ¬ification).await { - error!("Failed to create deep scan success notification: {}", e); - } - } - Err(e) => { - error!("Deep scan file processing failed for source {}: {}", source_id_clone, e); - - // Update source status to error - if let Err(e2) = state_clone.db.update_source_status( - source_id_clone, - crate::models::SourceStatus::Error, - Some(format!("Deep scan failed: {}", e)), - ).await { - error!("Failed to update source status after deep scan error: {}", e2); - } - - // Send error notification - let notification = crate::models::CreateNotification { - notification_type: "error".to_string(), - title: "Deep Scan Failed".to_string(), - message: format!("Deep scan of {} failed: {}", source_name, e), - action_url: Some("/sources".to_string()), - metadata: Some(serde_json::json!({ - "source_id": source_id_clone, - "scan_type": "deep_scan", - "error": e.to_string() - })), - }; - - if let Err(e) = state_clone.db.create_notification(user_id, ¬ification).await { - error!("Failed to create deep scan error notification: {}", e); - } - } - } - - } else { - info!("Deep scan found no files for source {}", source_id_clone); - - // Update source status to idle even if no files found - if let Err(e) = state_clone.db.update_source_status( - source_id_clone, - crate::models::SourceStatus::Idle, - Some("Deep scan completed: no files found".to_string()), - ).await { - error!("Failed to update source status after empty deep scan: {}", e); - } - } - } - Err(e) => { - error!("Deep scan with guaranteed completeness failed for source {}: {}", source_id_clone, e); - - // Update source status to error - if let Err(e2) = state_clone.db.update_source_status( - source_id_clone, - crate::models::SourceStatus::Error, - Some(format!("Deep scan failed: {}", e)), - ).await { - error!("Failed to update source status after deep scan error: {}", e2); - } - - // Send error notification - let notification = crate::models::CreateNotification { - notification_type: "error".to_string(), - title: "Deep Scan Failed".to_string(), - message: format!("Deep scan of {} failed: {}", source_name, e), - action_url: Some("/sources".to_string()), - metadata: Some(serde_json::json!({ - "source_id": source_id_clone, - "scan_type": "deep_scan", - "error": e.to_string() - })), - }; - - if let Err(e) = state_clone.db.create_notification(user_id, ¬ification).await { - error!("Failed to create deep scan error notification: {}", e); - } - } - } - }); - - Ok(Json(serde_json::json!({ - "success": true, - "message": format!("Deep scan started for source '{}'. This will perform a complete rescan of all configured folders.", source.name) - }))) - } - _ => { - error!("Deep scan not supported for source type: {:?}", source.source_type); - Ok(Json(serde_json::json!({ - "success": false, - "error": "unsupported_source_type", - "message": "Deep scan is currently only supported for WebDAV sources" - }))) - } - } -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/validate", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Validation started successfully"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn validate_source( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result, StatusCode> { - info!("Starting validation check for source {} by user {}", source_id, auth_user.user.username); - - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Start validation in background - let state_clone = state.clone(); - let source_clone = source.clone(); - tokio::spawn(async move { - if let Err(e) = crate::scheduling::source_scheduler::SourceScheduler::validate_source_health(&source_clone, &state_clone).await { - error!("Manual validation check failed for source {}: {}", source_clone.name, e); - } - }); - - Ok(Json(serde_json::json!({ - "success": true, - "message": format!("Validation check started for source '{}'", source.name) - }))) -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/sync/stop", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Sync stopped successfully"), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 409, description = "Source is not currently syncing"), - (status = 500, description = "Internal server error") - ) -)] -async fn stop_sync( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result { - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - // Allow stopping sync regardless of current status to handle edge cases - // where the database status might be out of sync with actual running tasks - - // Stop sync using the universal source scheduler - if let Some(scheduler) = &state.source_scheduler { - if let Err(e) = scheduler.stop_sync(source_id).await { - let error_msg = e.to_string(); - // If no sync is running, treat it as success since the desired state is achieved - if error_msg.contains("No running sync found") { - info!("No sync was running for source {}, updating status to idle", source_id); - // Update status to idle since no sync is running - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Idle, - None, - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } else { - error!("Failed to stop sync for source {}: {}", source_id, e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - } - } else { - // Update status directly if no scheduler available (fallback) - state - .db - .update_source_status( - source_id, - crate::models::SourceStatus::Idle, - Some("Sync cancelled by user".to_string()), - ) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } - - Ok(StatusCode::OK) -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/test", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Connection test result", body = serde_json::Value), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn test_connection( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result, StatusCode> { - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - match source.source_type { - crate::models::SourceType::WebDAV => { - // Test WebDAV connection - let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match crate::services::webdav::test_webdav_connection( - &config.server_url, - &config.username, - &config.password, - ) - .await - { - Ok(success) => Ok(Json(serde_json::json!({ - "success": success, - "message": if success { "Connection successful" } else { "Connection failed" } - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("Connection failed: {}", e) - }))), - } - } - crate::models::SourceType::LocalFolder => { - // Test Local Folder access - let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(source.config) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match crate::services::local_folder_service::LocalFolderService::new(config) { - Ok(service) => { - match service.test_connection().await { - Ok(message) => Ok(Json(serde_json::json!({ - "success": true, - "message": message - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("Local folder test failed: {}", e) - }))), - } - } - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("Local folder configuration error: {}", e) - }))), - } - } - crate::models::SourceType::S3 => { - // Test S3 connection - let config: crate::models::S3SourceConfig = serde_json::from_value(source.config) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match crate::services::s3_service::S3Service::new(config).await { - Ok(service) => { - match service.test_connection().await { - Ok(message) => Ok(Json(serde_json::json!({ - "success": true, - "message": message - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("S3 test failed: {}", e) - }))), - } - } - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("S3 configuration error: {}", e) - }))), - } - } - } -} - -fn validate_source_config(source: &CreateSource) -> Result<(), &'static str> { - validate_config_for_type(&source.source_type, &source.config) -} - -fn validate_config_for_type( - source_type: &crate::models::SourceType, - config: &serde_json::Value, -) -> Result<(), &'static str> { - match source_type { - crate::models::SourceType::WebDAV => { - let _: crate::models::WebDAVSourceConfig = - serde_json::from_value(config.clone()).map_err(|_| "Invalid WebDAV configuration")?; - Ok(()) - } - crate::models::SourceType::LocalFolder => { - let _: crate::models::LocalFolderSourceConfig = - serde_json::from_value(config.clone()).map_err(|_| "Invalid Local Folder configuration")?; - Ok(()) - } - crate::models::SourceType::S3 => { - let _: crate::models::S3SourceConfig = - serde_json::from_value(config.clone()).map_err(|_| "Invalid S3 configuration")?; - Ok(()) - } - } -} - -#[utoipa::path( - post, - path = "/api/sources/{id}/estimate", - tag = "sources", - security( - ("bearer_auth" = []) - ), - params( - ("id" = Uuid, Path, description = "Source ID") - ), - responses( - (status = 200, description = "Crawl estimate result", body = serde_json::Value), - (status = 401, description = "Unauthorized"), - (status = 404, description = "Source not found"), - (status = 500, description = "Internal server error") - ) -)] -async fn estimate_crawl( - auth_user: AuthUser, - Path(source_id): Path, - State(state): State>, -) -> Result, StatusCode> { - let source = state - .db - .get_source(auth_user.user.id, source_id) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .ok_or(StatusCode::NOT_FOUND)?; - - match source.source_type { - crate::models::SourceType::WebDAV => { - let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - estimate_webdav_crawl_internal(&config).await - } - _ => Ok(Json(serde_json::json!({ - "error": "Source type not supported for estimation" - }))), - } -} - -#[utoipa::path( - post, - path = "/api/sources/estimate", - tag = "sources", - security( - ("bearer_auth" = []) - ), - request_body = serde_json::Value, - responses( - (status = 200, description = "Crawl estimate result", body = serde_json::Value), - (status = 400, description = "Bad request - invalid configuration"), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ) -)] -async fn estimate_crawl_with_config( - _auth_user: AuthUser, - State(_state): State>, - Json(config_data): Json, -) -> Result, StatusCode> { - // Parse the WebDAV config from the request - let config: crate::models::WebDAVSourceConfig = serde_json::from_value(config_data) - .map_err(|_| StatusCode::BAD_REQUEST)?; - - estimate_webdav_crawl_internal(&config).await -} - -async fn estimate_webdav_crawl_internal( - config: &crate::models::WebDAVSourceConfig, -) -> Result, StatusCode> { - // Create WebDAV service config - let webdav_config = crate::services::webdav::WebDAVConfig { - server_url: config.server_url.clone(), - username: config.username.clone(), - password: config.password.clone(), - watch_folders: config.watch_folders.clone(), - file_extensions: config.file_extensions.clone(), - timeout_seconds: 300, - server_type: config.server_type.clone(), - }; - - // Create WebDAV service and estimate crawl - match crate::services::webdav::WebDAVService::new(webdav_config) { - Ok(webdav_service) => { - match webdav_service.estimate_crawl(&config.watch_folders).await { - Ok(estimate) => Ok(Json(serde_json::to_value(estimate).unwrap())), - Err(e) => Ok(Json(serde_json::json!({ - "error": format!("Crawl estimation failed: {}", e), - "folders": [], - "total_files": 0, - "total_supported_files": 0, - "total_estimated_time_hours": 0.0, - "total_size_mb": 0.0, - }))), - } - } - Err(e) => Ok(Json(serde_json::json!({ - "error": format!("Failed to create WebDAV service: {}", e), - "folders": [], - "total_files": 0, - "total_supported_files": 0, - "total_estimated_time_hours": 0.0, - "total_size_mb": 0.0, - }))), - } -} - -#[derive(serde::Deserialize, utoipa::ToSchema)] -struct TestConnectionRequest { - source_type: crate::models::SourceType, - config: serde_json::Value, -} - -#[utoipa::path( - post, - path = "/api/sources/test-connection", - tag = "sources", - security( - ("bearer_auth" = []) - ), - request_body = TestConnectionRequest, - responses( - (status = 200, description = "Connection test result", body = serde_json::Value), - (status = 400, description = "Bad request - invalid configuration"), - (status = 401, description = "Unauthorized"), - (status = 500, description = "Internal server error") - ) -)] -async fn test_connection_with_config( - _auth_user: AuthUser, - State(_state): State>, - Json(request): Json, -) -> Result, StatusCode> { - match request.source_type { - crate::models::SourceType::WebDAV => { - // Test WebDAV connection - let config: crate::models::WebDAVSourceConfig = serde_json::from_value(request.config) - .map_err(|_| StatusCode::BAD_REQUEST)?; - - match crate::services::webdav::test_webdav_connection( - &config.server_url, - &config.username, - &config.password, - ) - .await - { - Ok(success) => Ok(Json(serde_json::json!({ - "success": success, - "message": if success { "WebDAV connection successful" } else { "WebDAV connection failed" } - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("WebDAV connection failed: {}", e) - }))), - } - } - crate::models::SourceType::LocalFolder => { - // Test Local Folder access - let config: crate::models::LocalFolderSourceConfig = serde_json::from_value(request.config) - .map_err(|_| StatusCode::BAD_REQUEST)?; - - match crate::services::local_folder_service::LocalFolderService::new(config) { - Ok(service) => { - match service.test_connection().await { - Ok(message) => Ok(Json(serde_json::json!({ - "success": true, - "message": message - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("Local folder test failed: {}", e) - }))), - } - } - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("Local folder configuration error: {}", e) - }))), - } - } - crate::models::SourceType::S3 => { - // Test S3 connection - let config: crate::models::S3SourceConfig = serde_json::from_value(request.config) - .map_err(|_| StatusCode::BAD_REQUEST)?; - - match crate::services::s3_service::S3Service::new(config).await { - Ok(service) => { - match service.test_connection().await { - Ok(message) => Ok(Json(serde_json::json!({ - "success": true, - "message": message - }))), - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("S3 test failed: {}", e) - }))), - } - } - Err(e) => Ok(Json(serde_json::json!({ - "success": false, - "message": format!("S3 configuration error: {}", e) - }))), - } - } - } -} \ No newline at end of file diff --git a/src/routes/users.rs b/src/routes/users.rs index c3ce835..a9b539f 100644 --- a/src/routes/users.rs +++ b/src/routes/users.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Path, State}, http::StatusCode, response::Json, - routing::{get, post, put, delete}, + routing::get, Router, }; use std::sync::Arc; diff --git a/src/scheduling/source_scheduler.rs b/src/scheduling/source_scheduler.rs index 1f17923..ca74ec2 100644 --- a/src/scheduling/source_scheduler.rs +++ b/src/scheduling/source_scheduler.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use std::collections::HashMap; use tokio::time::interval; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use chrono::Utc; diff --git a/src/scheduling/source_sync.rs b/src/scheduling/source_sync.rs index 6d6dda4..de56f95 100644 --- a/src/scheduling/source_sync.rs +++ b/src/scheduling/source_sync.rs @@ -1,11 +1,10 @@ use std::sync::Arc; use std::path::Path; use anyhow::{anyhow, Result}; -use chrono::Utc; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use futures::stream::{FuturesUnordered, StreamExt}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ diff --git a/src/scheduling/webdav_scheduler.rs b/src/scheduling/webdav_scheduler.rs index d942339..4a50bb8 100644 --- a/src/scheduling/webdav_scheduler.rs +++ b/src/scheduling/webdav_scheduler.rs @@ -5,8 +5,6 @@ use tracing::{error, info, warn}; use crate::{ db::Database, - ocr::queue::OcrQueueService, - services::file_service::FileService, AppState, }; use crate::services::webdav::{WebDAVConfig, WebDAVService}; diff --git a/src/services/file_service.rs b/src/services/file_service.rs index e4e4e6e..6a293cb 100644 --- a/src/services/file_service.rs +++ b/src/services/file_service.rs @@ -1,7 +1,6 @@ use anyhow::Result; use chrono::Utc; use std::path::{Path, PathBuf}; -use std::panic::{catch_unwind, AssertUnwindSafe}; use tokio::fs; use uuid::Uuid; use tracing::{info, warn, error}; @@ -9,7 +8,7 @@ use tracing::{info, warn, error}; use crate::models::Document; #[cfg(feature = "ocr")] -use image::{DynamicImage, ImageFormat, imageops::FilterType, Rgb, RgbImage, Rgba, ImageBuffer}; +use image::{DynamicImage, ImageFormat, imageops::FilterType}; #[derive(Clone)] pub struct FileService { diff --git a/src/services/local_folder_service.rs b/src/services/local_folder_service.rs index 1aca90f..4ad2b23 100644 --- a/src/services/local_folder_service.rs +++ b/src/services/local_folder_service.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::fs; use anyhow::{anyhow, Result}; -use chrono::{DateTime, Utc}; +use chrono::DateTime; use tracing::{debug, info, warn}; use walkdir::WalkDir; use sha2::{Sha256, Digest}; diff --git a/src/services/s3_service.rs b/src/services/s3_service.rs index d3e1f88..08f175d 100644 --- a/src/services/s3_service.rs +++ b/src/services/s3_service.rs @@ -1,12 +1,12 @@ use anyhow::{anyhow, Result}; -use chrono::{DateTime, Utc}; -use tracing::{debug, error, info, warn}; +use chrono::DateTime; +use tracing::{debug, info, warn}; use serde_json; #[cfg(feature = "s3")] use aws_sdk_s3::Client; #[cfg(feature = "s3")] -use aws_config::{BehaviorVersion, load_from_env}; +use aws_config::load_from_env; #[cfg(feature = "s3")] use aws_credential_types::Credentials; #[cfg(feature = "s3")] diff --git a/src/services/webdav/config.rs b/src/services/webdav/config.rs index 528d725..1706452 100644 --- a/src/services/webdav/config.rs +++ b/src/services/webdav/config.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; /// WebDAV server configuration #[derive(Debug, Clone)] diff --git a/src/services/webdav/discovery.rs b/src/services/webdav/discovery.rs index e5d3e63..b687e03 100644 --- a/src/services/webdav/discovery.rs +++ b/src/services/webdav/discovery.rs @@ -1,9 +1,9 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use reqwest::Method; use std::collections::HashSet; use tokio::sync::Semaphore; use futures_util::stream::{self, StreamExt}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use crate::models::{FileInfo, WebDAVCrawlEstimate, WebDAVFolderInfo}; use crate::webdav_xml_parser::{parse_propfind_response, parse_propfind_response_with_directories}; diff --git a/src/services/webdav/service.rs b/src/services/webdav/service.rs index 9b15a0b..3205798 100644 --- a/src/services/webdav/service.rs +++ b/src/services/webdav/service.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Result}; use std::sync::Arc; use tokio::sync::Semaphore; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; use crate::models::{ FileInfo, WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVTestConnection, diff --git a/src/services/webdav/validation.rs b/src/services/webdav/validation.rs index a4b9f90..f2652af 100644 --- a/src/services/webdav/validation.rs +++ b/src/services/webdav/validation.rs @@ -1,7 +1,7 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use super::config::WebDAVConfig; use super::connection::WebDAVConnection; diff --git a/src/utils/debug.rs b/src/utils/debug.rs index 7956f44..723fd7c 100644 --- a/src/utils/debug.rs +++ b/src/utils/debug.rs @@ -1,5 +1,5 @@ use std::env; -use tracing::{debug, info, warn, error}; +use tracing::{info, warn, error}; /// Check if DEBUG environment variable is set to enable verbose debug output pub fn is_debug_enabled() -> bool {