From 68ceb1f9cb508e54cf627f69dc53523383d89434 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Fri, 1 Aug 2025 00:27:13 +0000 Subject: [PATCH] feat(storage): implement s3 for storage --- src/bin/migrate_to_s3.rs | 244 ++++++++++++++++++++ src/config.rs | 61 +++++ src/db/documents/crud.rs | 17 ++ src/ingestion/document_ingestion.rs | 12 +- src/main.rs | 45 +++- src/services/file_service.rs | 146 +++++++++++- src/services/s3_service.rs | 332 +++++++++++++++++++++++++++- src/test_utils.rs | 4 + tests/s3_storage_tests.rs | 78 +++++++ 9 files changed, 924 insertions(+), 15 deletions(-) create mode 100644 src/bin/migrate_to_s3.rs create mode 100644 tests/s3_storage_tests.rs diff --git a/src/bin/migrate_to_s3.rs b/src/bin/migrate_to_s3.rs new file mode 100644 index 0000000..83f9dbd --- /dev/null +++ b/src/bin/migrate_to_s3.rs @@ -0,0 +1,244 @@ +//! Migration utility to move existing local files to S3 storage +//! +//! Usage: cargo run --bin migrate_to_s3 --features s3 +//! +//! This utility will: +//! 1. Connect to the database +//! 2. Find all documents with local file paths +//! 3. Upload files to S3 with proper structure +//! 4. Update database records with S3 paths +//! 5. Optionally delete local files after successful upload + +use anyhow::Result; +use clap::Parser; +use std::path::Path; +use uuid::Uuid; +use tracing::{info, warn, error}; + +use readur::{ + config::Config, + db::Database, + services::{s3_service::S3Service, file_service::FileService}, +}; + +#[derive(Parser)] +#[command(name = "migrate_to_s3")] +#[command(about = "Migrate existing local files to S3 storage")] +struct Args { + /// Dry run - only show what would be migrated + #[arg(short, long)] + dry_run: bool, + + /// Delete local files after successful S3 upload + #[arg(long)] + delete_local: bool, + + /// Limit number of files to migrate (for testing) + #[arg(short, long)] + limit: Option, + + /// Only migrate files for specific user ID + #[arg(short, long)] + user_id: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt() + .with_env_filter("info") + .init(); + + let args = Args::parse(); + + info!("šŸš€ Starting S3 migration utility"); + + // Load configuration + let config = Config::from_env()?; + + if !config.s3_enabled { + error!("S3 is not enabled in configuration. Set S3_ENABLED=true and provide S3 configuration."); + std::process::exit(1); + } + + let s3_config = config.s3_config.as_ref() + .ok_or_else(|| anyhow::anyhow!("S3 configuration not found"))?; + + // Connect to database + info!("šŸ“Š Connecting to database..."); + let db = Database::new(&config.database_url).await?; + + // Initialize S3 service + info!("ā˜ļø Initializing S3 service..."); + let s3_service = S3Service::new(s3_config.clone()).await?; + + // Test S3 connection + match s3_service.test_connection().await { + Ok(_) => info!("āœ… S3 connection successful"), + Err(e) => { + error!("āŒ S3 connection failed: {}", e); + std::process::exit(1); + } + } + + // Get documents to migrate + info!("šŸ” Finding documents to migrate..."); + let mut documents = if let Some(user_id_str) = &args.user_id { + let user_id = Uuid::parse_str(user_id_str)?; + db.get_documents_by_user(user_id, args.limit.unwrap_or(1000) as i64, 0).await? 
+ } else { + // Get all documents (this might need pagination for large datasets) + let all_users = db.get_all_users().await?; + let mut all_docs = Vec::new(); + + for user in all_users { + let user_docs = db.get_documents_by_user(user.id, 500, 0).await?; + all_docs.extend(user_docs); + + if let Some(limit) = args.limit { + if all_docs.len() >= limit { + all_docs.truncate(limit); + break; + } + } + } + all_docs + }; + + // Filter documents that are not already in S3 + let local_documents: Vec<_> = documents.into_iter() + .filter(|doc| !doc.file_path.starts_with("s3://")) + .collect(); + + info!("šŸ“‹ Found {} documents with local file paths", local_documents.len()); + + if local_documents.is_empty() { + info!("āœ… No local documents found to migrate"); + return Ok(()); + } + + if args.dry_run { + info!("šŸ” DRY RUN - Would migrate the following files:"); + for doc in &local_documents { + info!(" - {} (User: {}, Size: {} bytes)", + doc.original_filename, doc.user_id, doc.file_size); + } + info!("šŸ’” Run without --dry-run to perform actual migration"); + return Ok(()); + } + + // Perform migration + let mut migrated_count = 0; + let mut failed_count = 0; + + for doc in local_documents { + info!("šŸ“¦ Migrating: {} ({})", doc.original_filename, doc.id); + + match migrate_document(&db, &s3_service, &doc, args.delete_local).await { + Ok(_) => { + migrated_count += 1; + info!("āœ… Successfully migrated: {}", doc.original_filename); + } + Err(e) => { + failed_count += 1; + error!("āŒ Failed to migrate {}: {}", doc.original_filename, e); + } + } + } + + info!("šŸŽ‰ Migration completed!"); + info!("āœ… Successfully migrated: {} files", migrated_count); + if failed_count > 0 { + warn!("āŒ Failed to migrate: {} files", failed_count); + } + + Ok(()) +} + +async fn migrate_document( + db: &Database, + s3_service: &S3Service, + document: &readur::models::Document, + delete_local: bool, +) -> Result<()> { + // Read local file + let local_path = Path::new(&document.file_path); + if !local_path.exists() { + return Err(anyhow::anyhow!("Local file not found: {}", document.file_path)); + } + + let file_data = tokio::fs::read(&local_path).await?; + + // Upload to S3 + let s3_key = s3_service.store_document( + document.user_id, + document.id, + &document.filename, + &file_data, + ).await?; + + let s3_path = format!("s3://{}", s3_key); + + // Update database record + db.update_document_file_path(document.id, &s3_path).await?; + + // Migrate associated files (thumbnails, processed images) + migrate_associated_files(s3_service, document, delete_local).await?; + + // Delete local file if requested + if delete_local { + if let Err(e) = tokio::fs::remove_file(&local_path).await { + warn!("Failed to delete local file {}: {}", document.file_path, e); + } else { + info!("šŸ—‘ļø Deleted local file: {}", document.file_path); + } + } + + Ok(()) +} + +async fn migrate_associated_files( + s3_service: &S3Service, + document: &readur::models::Document, + delete_local: bool, +) -> Result<()> { + let file_service = FileService::new("./uploads".to_string()); + + // Migrate thumbnail + let thumbnail_path = file_service.get_thumbnails_path().join(format!("{}_thumb.jpg", document.id)); + if thumbnail_path.exists() { + match tokio::fs::read(&thumbnail_path).await { + Ok(thumbnail_data) => { + if let Err(e) = s3_service.store_thumbnail(document.user_id, document.id, &thumbnail_data).await { + warn!("Failed to migrate thumbnail for {}: {}", document.id, e); + } else { + info!("šŸ“ø Migrated thumbnail for: {}", 
document.original_filename); + if delete_local { + let _ = tokio::fs::remove_file(&thumbnail_path).await; + } + } + } + Err(e) => warn!("Failed to read thumbnail {}: {}", thumbnail_path.display(), e), + } + } + + // Migrate processed image + let processed_path = file_service.get_processed_images_path().join(format!("{}_processed.png", document.id)); + if processed_path.exists() { + match tokio::fs::read(&processed_path).await { + Ok(processed_data) => { + if let Err(e) = s3_service.store_processed_image(document.user_id, document.id, &processed_data).await { + warn!("Failed to migrate processed image for {}: {}", document.id, e); + } else { + info!("šŸ–¼ļø Migrated processed image for: {}", document.original_filename); + if delete_local { + let _ = tokio::fs::remove_file(&processed_path).await; + } + } + } + Err(e) => warn!("Failed to read processed image {}: {}", processed_path.display(), e), + } + } + + Ok(()) +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 5cd3ed2..720046b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,8 @@ use anyhow::Result; use std::env; +use crate::models::S3SourceConfig; + #[derive(Clone, Debug)] pub struct Config { pub database_url: String, @@ -31,6 +33,10 @@ pub struct Config { pub oidc_client_secret: Option, pub oidc_issuer_url: Option, pub oidc_redirect_uri: Option, + + // S3 Configuration + pub s3_enabled: bool, + pub s3_config: Option, } impl Config { @@ -431,6 +437,61 @@ impl Config { None } }, + + // S3 Configuration + s3_enabled: match env::var("S3_ENABLED") { + Ok(val) => { + let enabled = val.to_lowercase() == "true"; + println!("āœ… S3_ENABLED: {} (loaded from env)", enabled); + enabled + } + Err(_) => { + println!("āš ļø S3_ENABLED: false (using default - env var not set)"); + false + } + }, + s3_config: if env::var("S3_ENABLED").unwrap_or_default().to_lowercase() == "true" { + // Only load S3 config if S3 is enabled + let bucket_name = env::var("S3_BUCKET_NAME").unwrap_or_default(); + let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".to_string()); + let access_key_id = env::var("S3_ACCESS_KEY_ID").unwrap_or_default(); + let secret_access_key = env::var("S3_SECRET_ACCESS_KEY").unwrap_or_default(); + let endpoint_url = env::var("S3_ENDPOINT_URL").ok(); + let prefix = env::var("S3_PREFIX").ok(); + + if !bucket_name.is_empty() && !access_key_id.is_empty() && !secret_access_key.is_empty() { + println!("āœ… S3_BUCKET_NAME: {} (loaded from env)", bucket_name); + println!("āœ… S3_REGION: {} (loaded from env)", region); + println!("āœ… S3_ACCESS_KEY_ID: {}***{} (loaded from env)", + &access_key_id[..2.min(access_key_id.len())], + &access_key_id[access_key_id.len().saturating_sub(2)..]); + println!("āœ… S3_SECRET_ACCESS_KEY: ***hidden*** (loaded from env, {} chars)", secret_access_key.len()); + if let Some(ref endpoint) = endpoint_url { + println!("āœ… S3_ENDPOINT_URL: {} (loaded from env)", endpoint); + } + if let Some(ref pref) = prefix { + println!("āœ… S3_PREFIX: {} (loaded from env)", pref); + } + + Some(S3SourceConfig { + bucket_name, + region, + access_key_id, + secret_access_key, + endpoint_url, + prefix, + watch_folders: vec![], // Will be configured separately for sources + file_extensions: vec![], // Will be configured separately for sources + auto_sync: false, // Not used for general storage + sync_interval_minutes: 0, // Not used for general storage + }) + } else { + println!("āŒ S3 enabled but missing required configuration (bucket_name, access_key_id, or secret_access_key)"); + None + 
} + } else { + None + }, }; println!("\nšŸ” CONFIGURATION VALIDATION:"); diff --git a/src/db/documents/crud.rs b/src/db/documents/crud.rs index 39d1b4a..e3ebc04 100644 --- a/src/db/documents/crud.rs +++ b/src/db/documents/crud.rs @@ -201,4 +201,21 @@ impl Database { Ok(rows.iter().map(map_row_to_document).collect()) } + + /// Update document file path (useful for S3 migration) + pub async fn update_document_file_path(&self, document_id: Uuid, new_file_path: &str) -> Result<()> { + sqlx::query( + r#" + UPDATE documents + SET file_path = $2, updated_at = NOW() + WHERE id = $1 + "# + ) + .bind(document_id) + .bind(new_file_path) + .execute(&self.pool) + .await?; + + Ok(()) + } } \ No newline at end of file diff --git a/src/ingestion/document_ingestion.rs b/src/ingestion/document_ingestion.rs index 9cecfbe..a5daec7 100644 --- a/src/ingestion/document_ingestion.rs +++ b/src/ingestion/document_ingestion.rs @@ -168,9 +168,12 @@ impl DocumentIngestionService { } } - // Save file to storage + // Generate document ID upfront so we can use it for storage path + let document_id = Uuid::new_v4(); + + // Save file to storage - use S3 if configured, otherwise local storage let file_path = match self.file_service - .save_file(&request.filename, &request.file_data) + .save_document_file(request.user_id, document_id, &request.filename, &request.file_data) .await { Ok(path) => path, Err(e) => { @@ -212,8 +215,9 @@ impl DocumentIngestionService { } }; - // Create document record - let document = self.file_service.create_document( + // Create document record with the same ID used for storage + let document = self.file_service.create_document_with_id( + document_id, &request.filename, &request.original_filename, &file_path, diff --git a/src/main.rs b/src/main.rs index 80b7878..09de930 100644 --- a/src/main.rs +++ b/src/main.rs @@ -121,14 +121,45 @@ async fn main() -> anyhow::Result<()> { println!("šŸ“ Upload directory: {}", config.upload_path); println!("šŸ‘ļø Watch directory: {}", config.watch_folder); - // Initialize upload directory structure - info!("Initializing upload directory structure..."); - let file_service = readur::services::file_service::FileService::new(config.upload_path.clone()); - if let Err(e) = file_service.initialize_directory_structure().await { - error!("Failed to initialize directory structure: {}", e); - return Err(e.into()); + // Initialize file service with S3 support if configured + info!("Initializing file service..."); + let file_service = if config.s3_enabled { + if let Some(s3_config) = &config.s3_config { + info!("S3 storage enabled, initializing S3Service..."); + match readur::services::s3_service::S3Service::new(s3_config.clone()).await { + Ok(s3_service) => { + info!("āœ… S3Service initialized successfully"); + readur::services::file_service::FileService::new_with_s3( + config.upload_path.clone(), + Arc::new(s3_service) + ) + } + Err(e) => { + error!("Failed to initialize S3Service: {}", e); + warn!("Falling back to local storage only"); + readur::services::file_service::FileService::new(config.upload_path.clone()) + } + } + } else { + warn!("S3 enabled but no S3 configuration provided, using local storage"); + readur::services::file_service::FileService::new(config.upload_path.clone()) + } + } else { + info!("Using local file storage"); + readur::services::file_service::FileService::new(config.upload_path.clone()) + }; + + if !file_service.is_s3_enabled() { + // Only initialize local directory structure if not using S3 + info!("Initializing local upload directory 
structure..."); + if let Err(e) = file_service.initialize_directory_structure().await { + error!("Failed to initialize directory structure: {}", e); + return Err(e.into()); + } + info!("āœ… Local upload directory structure initialized"); + } else { + info!("āœ… File service initialized with S3 storage backend"); } - info!("āœ… Upload directory structure initialized"); // Migrate existing files to new structure (one-time operation) info!("Migrating existing files to structured directories..."); diff --git a/src/services/file_service.rs b/src/services/file_service.rs index 514083d..5174a4a 100644 --- a/src/services/file_service.rs +++ b/src/services/file_service.rs @@ -1,11 +1,13 @@ use anyhow::Result; use chrono::Utc; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tokio::fs; use uuid::Uuid; use tracing::{info, warn, error}; use crate::models::Document; +use crate::services::s3_service::S3Service; #[cfg(feature = "ocr")] use image::{DynamicImage, ImageFormat, imageops::FilterType}; @@ -13,11 +15,27 @@ use image::{DynamicImage, ImageFormat, imageops::FilterType}; #[derive(Clone)] pub struct FileService { upload_path: String, + s3_service: Option>, } impl FileService { pub fn new(upload_path: String) -> Self { - Self { upload_path } + Self { + upload_path, + s3_service: None, + } + } + + pub fn new_with_s3(upload_path: String, s3_service: Arc) -> Self { + Self { + upload_path, + s3_service: Some(s3_service), + } + } + + /// Check if S3 storage is enabled + pub fn is_s3_enabled(&self) -> bool { + self.s3_service.is_some() } /// Initialize the upload directory structure @@ -148,6 +166,67 @@ impl FileService { Ok(file_path.to_string_lossy().to_string()) } + /// Save file for a specific document (works with both local and S3) + pub async fn save_document_file(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result { + if let Some(s3_service) = &self.s3_service { + // Use S3 storage + let s3_key = s3_service.store_document(user_id, document_id, filename, data).await?; + info!("Saved document to S3: {}", s3_key); + Ok(format!("s3://{}", s3_key)) + } else { + // Use local storage + self.save_file(filename, data).await + } + } + + /// Save thumbnail (works with both local and S3) + pub async fn save_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result { + if let Some(s3_service) = &self.s3_service { + // Use S3 storage + let s3_key = s3_service.store_thumbnail(user_id, document_id, data).await?; + info!("Saved thumbnail to S3: {}", s3_key); + Ok(format!("s3://{}", s3_key)) + } else { + // Use local storage + let thumbnails_dir = self.get_thumbnails_path(); + if let Err(e) = fs::create_dir_all(&thumbnails_dir).await { + error!("Failed to create thumbnails directory: {}", e); + return Err(anyhow::anyhow!("Failed to create thumbnails directory: {}", e)); + } + + let thumbnail_filename = format!("{}_thumb.jpg", document_id); + let thumbnail_path = thumbnails_dir.join(&thumbnail_filename); + + fs::write(&thumbnail_path, data).await?; + info!("Saved thumbnail locally: {}", thumbnail_path.display()); + Ok(thumbnail_path.to_string_lossy().to_string()) + } + } + + /// Save processed image (works with both local and S3) + pub async fn save_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result { + if let Some(s3_service) = &self.s3_service { + // Use S3 storage + let s3_key = s3_service.store_processed_image(user_id, document_id, data).await?; + info!("Saved processed image to S3: {}", s3_key); + Ok(format!("s3://{}", s3_key)) + } 
else { + // Use local storage + let processed_dir = self.get_processed_images_path(); + if let Err(e) = fs::create_dir_all(&processed_dir).await { + error!("Failed to create processed images directory: {}", e); + return Err(anyhow::anyhow!("Failed to create processed images directory: {}", e)); + } + + let processed_filename = format!("{}_processed.png", document_id); + let processed_path = processed_dir.join(&processed_filename); + + fs::write(&processed_path, data).await?; + info!("Saved processed image locally: {}", processed_path.display()); + Ok(processed_path.to_string_lossy().to_string()) + } + } + pub fn create_document( &self, filename: &str, @@ -166,9 +245,50 @@ impl FileService { file_owner: Option, file_group: Option, source_metadata: Option, + ) -> Document { + self.create_document_with_id( + Uuid::new_v4(), + filename, + original_filename, + file_path, + file_size, + mime_type, + user_id, + file_hash, + original_created_at, + original_modified_at, + source_path, + source_type, + source_id, + file_permissions, + file_owner, + file_group, + source_metadata, + ) + } + + pub fn create_document_with_id( + &self, + document_id: Uuid, + filename: &str, + original_filename: &str, + file_path: &str, + file_size: i64, + mime_type: &str, + user_id: Uuid, + file_hash: Option, + original_created_at: Option>, + original_modified_at: Option>, + source_path: Option, + source_type: Option, + source_id: Option, + file_permissions: Option, + file_owner: Option, + file_group: Option, + source_metadata: Option, ) -> Document { Document { - id: Uuid::new_v4(), + id: document_id, filename: filename.to_string(), original_filename: original_filename.to_string(), file_path: file_path.to_string(), @@ -243,6 +363,17 @@ impl FileService { } pub async fn read_file(&self, file_path: &str) -> Result> { + // Check if this is an S3 path + if file_path.starts_with("s3://") { + if let Some(s3_service) = &self.s3_service { + let s3_key = file_path.strip_prefix("s3://").unwrap_or(file_path); + return s3_service.retrieve_file(s3_key).await; + } else { + return Err(anyhow::anyhow!("S3 path provided but S3 service not configured: {}", file_path)); + } + } + + // Handle local file path let resolved_path = self.resolve_file_path(file_path).await?; let data = fs::read(&resolved_path).await?; Ok(data) @@ -508,6 +639,17 @@ impl FileService { } pub async fn delete_document_files(&self, document: &Document) -> Result<()> { + // Check if this document uses S3 storage + if document.file_path.starts_with("s3://") { + if let Some(s3_service) = &self.s3_service { + // Use S3 deletion + return s3_service.delete_document_files(document.user_id, document.id, &document.filename).await; + } else { + return Err(anyhow::anyhow!("Document stored in S3 but S3 service not configured")); + } + } + + // Handle local file deletion let mut deleted_files = Vec::new(); let mut serious_errors = Vec::new(); diff --git a/src/services/s3_service.rs b/src/services/s3_service.rs index 0973233..85abe38 100644 --- a/src/services/s3_service.rs +++ b/src/services/s3_service.rs @@ -1,7 +1,10 @@ use anyhow::{anyhow, Result}; -use chrono::DateTime; -use tracing::{debug, info, warn}; +use chrono::{DateTime, Datelike}; +use tracing::{debug, info, warn, error}; use serde_json; +use std::collections::HashMap; +use std::time::Duration; +use uuid::Uuid; #[cfg(feature = "s3")] use aws_sdk_s3::Client; @@ -9,6 +12,8 @@ use aws_sdk_s3::Client; use aws_credential_types::Credentials; #[cfg(feature = "s3")] use aws_types::region::Region as AwsRegion; +#[cfg(feature = 
"s3")] +use aws_sdk_s3::primitives::ByteStream; use crate::models::{FileIngestionInfo, S3SourceConfig}; @@ -325,6 +330,329 @@ impl S3Service { pub fn get_config(&self) -> &S3SourceConfig { &self.config } + + // ======================================== + // DIRECT STORAGE OPERATIONS + // ======================================== + + /// Store a file directly to S3 with structured path + pub async fn store_document(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + let key = self.generate_document_key(user_id, document_id, filename); + self.store_file(&key, data, None).await?; + Ok(key) + } + } + + /// Store a thumbnail to S3 + pub async fn store_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + let key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id); + self.store_file(&key, data, Some(self.get_image_metadata())).await?; + Ok(key) + } + } + + /// Store a processed image to S3 + pub async fn store_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + let key = format!("processed_images/{}/{}_processed.png", user_id, document_id); + self.store_file(&key, data, Some(self.get_image_metadata())).await?; + Ok(key) + } + } + + /// Generic file storage method + async fn store_file(&self, key: &str, data: &[u8], metadata: Option>) -> Result<()> { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + info!("Storing file to S3: {}/{}", self.config.bucket_name, key); + + let key_owned = key.to_string(); + let data_owned = data.to_vec(); + let metadata_owned = metadata.clone(); + let bucket_name = self.config.bucket_name.clone(); + let client = self.client.clone(); + + self.retry_operation(&format!("store_file: {}", key), || { + let key = key_owned.clone(); + let data = data_owned.clone(); + let metadata = metadata_owned.clone(); + let bucket_name = bucket_name.clone(); + let client = client.clone(); + let content_type = self.get_content_type_from_key(&key); + + async move { + let mut put_request = client + .put_object() + .bucket(&bucket_name) + .key(&key) + .body(ByteStream::from(data)); + + // Add metadata if provided + if let Some(meta) = metadata { + for (k, v) in meta { + put_request = put_request.metadata(k, v); + } + } + + // Set content type based on file extension + if let Some(ct) = content_type { + put_request = put_request.content_type(ct); + } + + put_request.send().await + .map_err(|e| anyhow!("Failed to store file {}: {}", key, e))?; + + Ok(()) + } + }).await?; + + info!("Successfully stored file: {}", key); + Ok(()) + } + } + + /// Retrieve a file from S3 + pub async fn retrieve_file(&self, key: &str) -> Result> { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + info!("Retrieving file from S3: {}/{}", self.config.bucket_name, key); + + let key_owned = key.to_string(); + let bucket_name = self.config.bucket_name.clone(); + let client = self.client.clone(); + + let bytes = self.retry_operation(&format!("retrieve_file: {}", key), || { + let key = key_owned.clone(); + let bucket_name = 
bucket_name.clone(); + let client = client.clone(); + + async move { + let response = client + .get_object() + .bucket(&bucket_name) + .key(&key) + .send() + .await + .map_err(|e| anyhow!("Failed to retrieve file {}: {}", key, e))?; + + let body = response.body.collect().await + .map_err(|e| anyhow!("Failed to read file body: {}", e))?; + + Ok(body.into_bytes().to_vec()) + } + }).await?; + + info!("Successfully retrieved file: {} ({} bytes)", key, bytes.len()); + Ok(bytes) + } + } + + /// Delete a file from S3 + pub async fn delete_file(&self, key: &str) -> Result<()> { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + info!("Deleting file from S3: {}/{}", self.config.bucket_name, key); + + self.client + .delete_object() + .bucket(&self.config.bucket_name) + .key(key) + .send() + .await + .map_err(|e| anyhow!("Failed to delete file {}: {}", key, e))?; + + info!("Successfully deleted file: {}", key); + Ok(()) + } + } + + /// Check if a file exists in S3 + pub async fn file_exists(&self, key: &str) -> Result { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + match self.client + .head_object() + .bucket(&self.config.bucket_name) + .key(key) + .send() + .await + { + Ok(_) => Ok(true), + Err(e) => { + let error_msg = e.to_string(); + if error_msg.contains("NotFound") || error_msg.contains("404") { + Ok(false) + } else { + Err(anyhow!("Failed to check file existence {}: {}", key, e)) + } + } + } + } + } + + /// Delete all files for a document (document, thumbnail, processed image) + pub async fn delete_document_files(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> Result<()> { + #[cfg(not(feature = "s3"))] + { + return Err(anyhow!("S3 support not compiled in")); + } + + #[cfg(feature = "s3")] + { + let document_key = self.generate_document_key(user_id, document_id, filename); + let thumbnail_key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id); + let processed_key = format!("processed_images/{}/{}_processed.png", user_id, document_id); + + let mut errors = Vec::new(); + + // Delete document file + if let Err(e) = self.delete_file(&document_key).await { + if !e.to_string().contains("NotFound") { + errors.push(format!("Document: {}", e)); + } + } + + // Delete thumbnail + if let Err(e) = self.delete_file(&thumbnail_key).await { + if !e.to_string().contains("NotFound") { + errors.push(format!("Thumbnail: {}", e)); + } + } + + // Delete processed image + if let Err(e) = self.delete_file(&processed_key).await { + if !e.to_string().contains("NotFound") { + errors.push(format!("Processed image: {}", e)); + } + } + + if !errors.is_empty() { + return Err(anyhow!("Failed to delete some files: {}", errors.join("; "))); + } + + info!("Successfully deleted all files for document {}", document_id); + Ok(()) + } + } + + // ======================================== + // HELPER METHODS + // ======================================== + + /// Generate a structured S3 key for a document + fn generate_document_key(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> String { + let now = chrono::Utc::now(); + let year = now.year(); + let month = now.month(); + + // Extract file extension + let extension = std::path::Path::new(filename) + .extension() + .and_then(|ext| ext.to_str()) + .unwrap_or(""); + + if extension.is_empty() { + format!("documents/{}/{:04}/{:02}/{}", user_id, year, month, document_id) + } else { + 
format!("documents/{}/{:04}/{:02}/{}.{}", user_id, year, month, document_id, extension) + } + } + + /// Get content type from S3 key/filename + fn get_content_type_from_key(&self, key: &str) -> Option { + let extension = std::path::Path::new(key) + .extension() + .and_then(|ext| ext.to_str()) + .unwrap_or("") + .to_lowercase(); + + Some(Self::get_mime_type(&extension)) + } + + /// Get metadata for image files + fn get_image_metadata(&self) -> HashMap { + let mut metadata = HashMap::new(); + metadata.insert("generated-by".to_string(), "readur".to_string()); + metadata.insert("created-at".to_string(), chrono::Utc::now().to_rfc3339()); + metadata + } + + /// Retry wrapper for S3 operations with exponential backoff + async fn retry_operation(&self, operation_name: &str, operation: F) -> Result + where + F: Fn() -> Fut, + Fut: std::future::Future>, + { + const MAX_RETRIES: u32 = 3; + const BASE_DELAY_MS: u64 = 100; + + let mut last_error = None; + + for attempt in 0..=MAX_RETRIES { + match operation().await { + Ok(result) => { + if attempt > 0 { + info!("S3 operation '{}' succeeded after {} retries", operation_name, attempt); + } + return Ok(result); + } + Err(e) => { + last_error = Some(e); + + if attempt < MAX_RETRIES { + let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt); + warn!("S3 operation '{}' failed (attempt {}/{}), retrying in {}ms: {}", + operation_name, attempt + 1, MAX_RETRIES + 1, delay_ms, last_error.as_ref().unwrap()); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + } + } + } + + error!("S3 operation '{}' failed after {} attempts: {}", + operation_name, MAX_RETRIES + 1, last_error.as_ref().unwrap()); + Err(last_error.unwrap()) + } } #[cfg(test)] diff --git a/src/test_utils.rs b/src/test_utils.rs index 89d012d..5179f91 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -816,6 +816,10 @@ impl TestConfigBuilder { oidc_client_secret: None, oidc_issuer_url: None, oidc_redirect_uri: None, + + // S3 Configuration + s3_enabled: false, + s3_config: None, } } } diff --git a/tests/s3_storage_tests.rs b/tests/s3_storage_tests.rs new file mode 100644 index 0000000..c3c3e15 --- /dev/null +++ b/tests/s3_storage_tests.rs @@ -0,0 +1,78 @@ +//! 
Basic S3 storage functionality tests + +use std::sync::Arc; + +use readur::services::file_service::FileService; + +#[cfg(feature = "s3")] +use readur::services::s3_service::S3Service; +#[cfg(feature = "s3")] +use readur::models::S3SourceConfig; + +#[cfg(feature = "s3")] +#[tokio::test] +async fn test_s3_service_new_validation() { + // Test S3Service creation fails with empty bucket name + let config = S3SourceConfig { + bucket_name: "".to_string(), + region: "us-east-1".to_string(), + access_key_id: "".to_string(), + secret_access_key: "".to_string(), + endpoint_url: None, + prefix: None, + watch_folders: vec![], + file_extensions: vec![], + auto_sync: false, + sync_interval_minutes: 0, + }; + + let result = S3Service::new(config).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Bucket name is required")); +} + +#[tokio::test] +async fn test_file_service_local_creation() { + // Test local-only FileService creation and functionality + let upload_path = "./test_uploads".to_string(); + let local_service = FileService::new(upload_path); + assert!(!local_service.is_s3_enabled()); +} + +#[cfg(feature = "s3")] +#[tokio::test] +async fn test_s3_service_configuration() { + // Test that S3 service can be created with proper configuration structure + let config = S3SourceConfig { + bucket_name: "test-bucket".to_string(), + region: "us-east-1".to_string(), + access_key_id: "test-key".to_string(), + secret_access_key: "test-secret".to_string(), + endpoint_url: Some("http://localhost:9000".to_string()), + prefix: None, + watch_folders: vec!["documents/".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + auto_sync: false, + sync_interval_minutes: 60, + }; + + // This test verifies the configuration structure is correct + // Actual S3 connection will fail since we don't have a real endpoint + match S3Service::new(config.clone()).await { + Ok(service) => { + // If it succeeds, verify the config was stored correctly + assert_eq!(service.get_config().bucket_name, "test-bucket"); + assert_eq!(service.get_config().region, "us-east-1"); + assert_eq!(service.get_config().watch_folders.len(), 1); + + // Test FileService integration + let s3_file_service = FileService::new_with_s3("./test".to_string(), Arc::new(service)); + assert!(s3_file_service.is_s3_enabled()); + } + Err(_) => { + // Expected to fail since we don't have a real S3 endpoint + // This test mainly verifies the structure compiles correctly + println!("S3 service creation failed as expected (no real S3 endpoint)"); + } + } +} \ No newline at end of file
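The migration utility above (src/bin/migrate_to_s3.rs) is driven by clap, so the fields on Args should surface with clap's usual kebab-case flag spelling. Assuming that default, a cautious rollout could look like the following (the UUID is a placeholder, not a value from this patch):

    # Preview what would be moved, capped at 10 documents
    cargo run --bin migrate_to_s3 --features s3 -- --dry-run --limit 10

    # Migrate one user's documents and remove the local copies after upload
    cargo run --bin migrate_to_s3 --features s3 -- --user-id <uuid> --delete-local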
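The new block in src/config.rs only constructs an S3SourceConfig when the bucket name and both credentials are present, so a quick sanity check for a deployment is to assert on the parsed Config. A minimal sketch: the variable names are the ones read by Config::from_env() in this patch, the values (including the MinIO endpoint) are placeholders, and the rest of the environment that Config::from_env() already requires is assumed to be set.

    // Expected environment (placeholder values):
    //   S3_ENABLED=true
    //   S3_BUCKET_NAME=readur-documents
    //   S3_ACCESS_KEY_ID=minioadmin
    //   S3_SECRET_ACCESS_KEY=minioadmin
    //   S3_REGION=us-east-1                    (optional, defaults to us-east-1)
    //   S3_ENDPOINT_URL=http://localhost:9000  (optional, e.g. a local MinIO)
    //   S3_PREFIX=readur/                      (optional)
    let config = readur::config::Config::from_env()?;
    assert!(config.s3_enabled, "S3_ENABLED was not picked up");
    assert!(config.s3_config.is_some(), "bucket or credentials missing, so the S3 config was dropped");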
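How the pieces in src/main.rs and src/services/file_service.rs compose is easiest to see end to end. A minimal sketch, assuming the s3 feature is compiled in and `config` carries the S3 settings from the note above; the filename and payload are made up:

    use std::sync::Arc;
    use uuid::Uuid;
    use readur::services::{file_service::FileService, s3_service::S3Service};

    async fn store_and_fetch(config: &readur::config::Config) -> anyhow::Result<()> {
        let s3_config = config.s3_config.clone().expect("s3_config must be set");
        let s3_service = S3Service::new(s3_config).await?;
        let file_service = FileService::new_with_s3(config.upload_path.clone(), Arc::new(s3_service));

        let (user_id, document_id) = (Uuid::new_v4(), Uuid::new_v4());
        let data = b"%PDF-1.7 placeholder".to_vec();

        // Returns an "s3://documents/<user>/<YYYY>/<MM>/<doc_id>.pdf" style path when S3 is
        // active, or a plain local path when it is not.
        let stored_path = file_service
            .save_document_file(user_id, document_id, "report.pdf", &data)
            .await?;

        // read_file() dispatches on the "s3://" prefix, so callers do not care which backend was used.
        let roundtrip = file_service.read_file(&stored_path).await?;
        assert_eq!(roundtrip, data);
        Ok(())
    }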
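For anyone inspecting the bucket while reviewing, the keys written by S3Service in this patch follow a fixed layout, and FileService records them in documents.file_path with an s3:// prefix:

    documents/{user_id}/{YYYY}/{MM}/{document_id}.{ext}       generate_document_key(); the extension is omitted when the filename has none
    thumbnails/{user_id}/{document_id}_thumb.jpg              store_thumbnail()
    processed_images/{user_id}/{document_id}_processed.png    store_processed_image()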
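As a concrete reading of retry_operation(): with MAX_RETRIES = 3 and BASE_DELAY_MS = 100, a persistently failing call is attempted four times in total, waiting 100 ms, 200 ms, and 400 ms between attempts (BASE_DELAY_MS Ɨ 2^attempt), and the last error is returned after the fourth failure.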