feat(storage): implement s3 for storage
parent deccac0f64
commit 68ceb1f9cb

@@ -0,0 +1,244 @@
//! Migration utility to move existing local files to S3 storage
//!
//! Usage: cargo run --bin migrate_to_s3 --features s3
//!
//! This utility will:
//! 1. Connect to the database
//! 2. Find all documents with local file paths
//! 3. Upload files to S3 with proper structure
//! 4. Update database records with S3 paths
//! 5. Optionally delete local files after successful upload (example invocations below)
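//!
//! Example invocations (a sketch; the flags correspond to the `Args` struct below):
//!
//!   # Preview what would be migrated, capped at 10 documents
//!   cargo run --bin migrate_to_s3 --features s3 -- --dry-run --limit 10
//!
//!   # Migrate one user's documents and delete local copies after upload
//!   cargo run --bin migrate_to_s3 --features s3 -- --user-id <uuid> --delete-local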

use anyhow::Result;
use clap::Parser;
use std::path::Path;
use uuid::Uuid;
use tracing::{info, warn, error};

use readur::{
    config::Config,
    db::Database,
    services::{s3_service::S3Service, file_service::FileService},
};

#[derive(Parser)]
#[command(name = "migrate_to_s3")]
#[command(about = "Migrate existing local files to S3 storage")]
struct Args {
    /// Dry run - only show what would be migrated
    #[arg(short, long)]
    dry_run: bool,

    /// Delete local files after successful S3 upload
    #[arg(long)]
    delete_local: bool,

    /// Limit number of files to migrate (for testing)
    #[arg(short, long)]
    limit: Option<usize>,

    /// Only migrate files for specific user ID
    #[arg(short, long)]
    user_id: Option<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt()
        .with_env_filter("info")
        .init();

    let args = Args::parse();

    info!("🚀 Starting S3 migration utility");

    // Load configuration
    let config = Config::from_env()?;

    if !config.s3_enabled {
        error!("S3 is not enabled in configuration. Set S3_ENABLED=true and provide S3 configuration.");
        std::process::exit(1);
    }

    let s3_config = config.s3_config.as_ref()
        .ok_or_else(|| anyhow::anyhow!("S3 configuration not found"))?;

    // Connect to database
    info!("📊 Connecting to database...");
    let db = Database::new(&config.database_url).await?;

    // Initialize S3 service
    info!("☁️ Initializing S3 service...");
    let s3_service = S3Service::new(s3_config.clone()).await?;

    // Test S3 connection
    match s3_service.test_connection().await {
        Ok(_) => info!("✅ S3 connection successful"),
        Err(e) => {
            error!("❌ S3 connection failed: {}", e);
            std::process::exit(1);
        }
    }

    // Get documents to migrate
    info!("🔍 Finding documents to migrate...");
    let mut documents = if let Some(user_id_str) = &args.user_id {
        let user_id = Uuid::parse_str(user_id_str)?;
        db.get_documents_by_user(user_id, args.limit.unwrap_or(1000) as i64, 0).await?
    } else {
        // Get all documents (this might need pagination for large datasets)
        let all_users = db.get_all_users().await?;
        let mut all_docs = Vec::new();

        for user in all_users {
            let user_docs = db.get_documents_by_user(user.id, 500, 0).await?;
            all_docs.extend(user_docs);

            if let Some(limit) = args.limit {
                if all_docs.len() >= limit {
                    all_docs.truncate(limit);
                    break;
                }
            }
        }
        all_docs
    };

    // Filter documents that are not already in S3
    let local_documents: Vec<_> = documents.into_iter()
        .filter(|doc| !doc.file_path.starts_with("s3://"))
        .collect();

    info!("📋 Found {} documents with local file paths", local_documents.len());

    if local_documents.is_empty() {
        info!("✅ No local documents found to migrate");
        return Ok(());
    }

    if args.dry_run {
        info!("🔍 DRY RUN - Would migrate the following files:");
        for doc in &local_documents {
            info!("  - {} (User: {}, Size: {} bytes)",
                  doc.original_filename, doc.user_id, doc.file_size);
        }
        info!("💡 Run without --dry-run to perform actual migration");
        return Ok(());
    }

    // Perform migration
    let mut migrated_count = 0;
    let mut failed_count = 0;

    for doc in local_documents {
        info!("📦 Migrating: {} ({})", doc.original_filename, doc.id);

        match migrate_document(&db, &s3_service, &doc, args.delete_local).await {
            Ok(_) => {
                migrated_count += 1;
                info!("✅ Successfully migrated: {}", doc.original_filename);
            }
            Err(e) => {
                failed_count += 1;
                error!("❌ Failed to migrate {}: {}", doc.original_filename, e);
            }
        }
    }

    info!("🎉 Migration completed!");
    info!("✅ Successfully migrated: {} files", migrated_count);
    if failed_count > 0 {
        warn!("❌ Failed to migrate: {} files", failed_count);
    }

    Ok(())
}

async fn migrate_document(
    db: &Database,
    s3_service: &S3Service,
    document: &readur::models::Document,
    delete_local: bool,
) -> Result<()> {
    // Read local file
    let local_path = Path::new(&document.file_path);
    if !local_path.exists() {
        return Err(anyhow::anyhow!("Local file not found: {}", document.file_path));
    }

    let file_data = tokio::fs::read(&local_path).await?;

    // Upload to S3
    let s3_key = s3_service.store_document(
        document.user_id,
        document.id,
        &document.filename,
        &file_data,
    ).await?;

    let s3_path = format!("s3://{}", s3_key);

    // Update database record
    db.update_document_file_path(document.id, &s3_path).await?;

    // Migrate associated files (thumbnails, processed images)
    migrate_associated_files(s3_service, document, delete_local).await?;

    // Delete local file if requested
    if delete_local {
        if let Err(e) = tokio::fs::remove_file(&local_path).await {
            warn!("Failed to delete local file {}: {}", document.file_path, e);
        } else {
            info!("🗑️ Deleted local file: {}", document.file_path);
        }
    }

    Ok(())
}

async fn migrate_associated_files(
    s3_service: &S3Service,
    document: &readur::models::Document,
    delete_local: bool,
) -> Result<()> {
    let file_service = FileService::new("./uploads".to_string());

    // Migrate thumbnail
    let thumbnail_path = file_service.get_thumbnails_path().join(format!("{}_thumb.jpg", document.id));
    if thumbnail_path.exists() {
        match tokio::fs::read(&thumbnail_path).await {
            Ok(thumbnail_data) => {
                if let Err(e) = s3_service.store_thumbnail(document.user_id, document.id, &thumbnail_data).await {
                    warn!("Failed to migrate thumbnail for {}: {}", document.id, e);
                } else {
                    info!("📸 Migrated thumbnail for: {}", document.original_filename);
                    if delete_local {
                        let _ = tokio::fs::remove_file(&thumbnail_path).await;
                    }
                }
            }
            Err(e) => warn!("Failed to read thumbnail {}: {}", thumbnail_path.display(), e),
        }
    }

    // Migrate processed image
    let processed_path = file_service.get_processed_images_path().join(format!("{}_processed.png", document.id));
    if processed_path.exists() {
        match tokio::fs::read(&processed_path).await {
            Ok(processed_data) => {
                if let Err(e) = s3_service.store_processed_image(document.user_id, document.id, &processed_data).await {
                    warn!("Failed to migrate processed image for {}: {}", document.id, e);
                } else {
                    info!("🖼️ Migrated processed image for: {}", document.original_filename);
                    if delete_local {
                        let _ = tokio::fs::remove_file(&processed_path).await;
                    }
                }
            }
            Err(e) => warn!("Failed to read processed image {}: {}", processed_path.display(), e),
        }
    }

    Ok(())
}

@@ -1,6 +1,8 @@
use anyhow::Result;
use std::env;

use crate::models::S3SourceConfig;

#[derive(Clone, Debug)]
pub struct Config {
    pub database_url: String,

@@ -31,6 +33,10 @@ pub struct Config {
    pub oidc_client_secret: Option<String>,
    pub oidc_issuer_url: Option<String>,
    pub oidc_redirect_uri: Option<String>,

    // S3 Configuration
    pub s3_enabled: bool,
    pub s3_config: Option<S3SourceConfig>,
}

impl Config {

@@ -431,6 +437,61 @@ impl Config {
                None
            }
        },

        // S3 Configuration
        s3_enabled: match env::var("S3_ENABLED") {
            Ok(val) => {
                let enabled = val.to_lowercase() == "true";
                println!("✅ S3_ENABLED: {} (loaded from env)", enabled);
                enabled
            }
            Err(_) => {
                println!("⚠️ S3_ENABLED: false (using default - env var not set)");
                false
            }
        },
        s3_config: if env::var("S3_ENABLED").unwrap_or_default().to_lowercase() == "true" {
            // Only load S3 config if S3 is enabled
            let bucket_name = env::var("S3_BUCKET_NAME").unwrap_or_default();
            let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".to_string());
            let access_key_id = env::var("S3_ACCESS_KEY_ID").unwrap_or_default();
            let secret_access_key = env::var("S3_SECRET_ACCESS_KEY").unwrap_or_default();
            let endpoint_url = env::var("S3_ENDPOINT_URL").ok();
            let prefix = env::var("S3_PREFIX").ok();

            if !bucket_name.is_empty() && !access_key_id.is_empty() && !secret_access_key.is_empty() {
                println!("✅ S3_BUCKET_NAME: {} (loaded from env)", bucket_name);
                println!("✅ S3_REGION: {} (loaded from env)", region);
                println!("✅ S3_ACCESS_KEY_ID: {}***{} (loaded from env)",
                    &access_key_id[..2.min(access_key_id.len())],
                    &access_key_id[access_key_id.len().saturating_sub(2)..]);
                println!("✅ S3_SECRET_ACCESS_KEY: ***hidden*** (loaded from env, {} chars)", secret_access_key.len());
                if let Some(ref endpoint) = endpoint_url {
                    println!("✅ S3_ENDPOINT_URL: {} (loaded from env)", endpoint);
                }
                if let Some(ref pref) = prefix {
                    println!("✅ S3_PREFIX: {} (loaded from env)", pref);
                }

                Some(S3SourceConfig {
                    bucket_name,
                    region,
                    access_key_id,
                    secret_access_key,
                    endpoint_url,
                    prefix,
                    watch_folders: vec![], // Will be configured separately for sources
                    file_extensions: vec![], // Will be configured separately for sources
                    auto_sync: false, // Not used for general storage
                    sync_interval_minutes: 0, // Not used for general storage
                })
            } else {
                println!("❌ S3 enabled but missing required configuration (bucket_name, access_key_id, or secret_access_key)");
                None
            }
        } else {
            None
        },
    };

    println!("\n🔍 CONFIGURATION VALIDATION:");
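A minimal example of the environment this block reads, with placeholder values (S3_ENDPOINT_URL and S3_PREFIX are optional, e.g. for a MinIO-style endpoint):

    S3_ENABLED=true
    S3_BUCKET_NAME=readur-documents
    S3_REGION=us-east-1
    S3_ACCESS_KEY_ID=AKIAEXAMPLEKEY
    S3_SECRET_ACCESS_KEY=example-secret-value
    S3_ENDPOINT_URL=http://localhost:9000
    S3_PREFIX=readur/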

@@ -201,4 +201,21 @@ impl Database {

        Ok(rows.iter().map(map_row_to_document).collect())
    }

    /// Update document file path (useful for S3 migration)
    pub async fn update_document_file_path(&self, document_id: Uuid, new_file_path: &str) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE documents
            SET file_path = $2, updated_at = NOW()
            WHERE id = $1
            "#
        )
        .bind(document_id)
        .bind(new_file_path)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}
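For reference, the migration utility above calls this with the key returned by S3Service::store_document:

    db.update_document_file_path(document.id, &format!("s3://{}", s3_key)).await?;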

@@ -168,9 +168,12 @@ impl DocumentIngestionService {
            }
        }

-       // Save file to storage
+       // Generate document ID upfront so we can use it for storage path
+       let document_id = Uuid::new_v4();
+
+       // Save file to storage - use S3 if configured, otherwise local storage
        let file_path = match self.file_service
-           .save_file(&request.filename, &request.file_data)
+           .save_document_file(request.user_id, document_id, &request.filename, &request.file_data)
            .await {
            Ok(path) => path,
            Err(e) => {

@@ -212,8 +215,9 @@ impl DocumentIngestionService {
            }
        };

-       // Create document record
-       let document = self.file_service.create_document(
+       // Create document record with the same ID used for storage
+       let document = self.file_service.create_document_with_id(
+           document_id,
            &request.filename,
            &request.original_filename,
            &file_path,

src/main.rs

@@ -121,14 +121,45 @@ async fn main() -> anyhow::Result<()> {
    println!("📁 Upload directory: {}", config.upload_path);
    println!("👁️ Watch directory: {}", config.watch_folder);

-   // Initialize upload directory structure
-   info!("Initializing upload directory structure...");
-   let file_service = readur::services::file_service::FileService::new(config.upload_path.clone());
-   if let Err(e) = file_service.initialize_directory_structure().await {
-       error!("Failed to initialize directory structure: {}", e);
-       return Err(e.into());
-   }
-   info!("✅ Upload directory structure initialized");
+   // Initialize file service with S3 support if configured
+   info!("Initializing file service...");
+   let file_service = if config.s3_enabled {
+       if let Some(s3_config) = &config.s3_config {
+           info!("S3 storage enabled, initializing S3Service...");
+           match readur::services::s3_service::S3Service::new(s3_config.clone()).await {
+               Ok(s3_service) => {
+                   info!("✅ S3Service initialized successfully");
+                   readur::services::file_service::FileService::new_with_s3(
+                       config.upload_path.clone(),
+                       Arc::new(s3_service)
+                   )
+               }
+               Err(e) => {
+                   error!("Failed to initialize S3Service: {}", e);
+                   warn!("Falling back to local storage only");
+                   readur::services::file_service::FileService::new(config.upload_path.clone())
+               }
+           }
+       } else {
+           warn!("S3 enabled but no S3 configuration provided, using local storage");
+           readur::services::file_service::FileService::new(config.upload_path.clone())
+       }
+   } else {
+       info!("Using local file storage");
+       readur::services::file_service::FileService::new(config.upload_path.clone())
+   };
+
+   if !file_service.is_s3_enabled() {
+       // Only initialize local directory structure if not using S3
+       info!("Initializing local upload directory structure...");
+       if let Err(e) = file_service.initialize_directory_structure().await {
+           error!("Failed to initialize directory structure: {}", e);
+           return Err(e.into());
+       }
+       info!("✅ Local upload directory structure initialized");
+   } else {
+       info!("✅ File service initialized with S3 storage backend");
+   }

    // Migrate existing files to new structure (one-time operation)
    info!("Migrating existing files to structured directories...");

@@ -1,11 +1,13 @@
use anyhow::Result;
use chrono::Utc;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use uuid::Uuid;
use tracing::{info, warn, error};

use crate::models::Document;
use crate::services::s3_service::S3Service;

#[cfg(feature = "ocr")]
use image::{DynamicImage, ImageFormat, imageops::FilterType};

@@ -13,11 +15,27 @@ use image::{DynamicImage, ImageFormat, imageops::FilterType};
#[derive(Clone)]
pub struct FileService {
    upload_path: String,
    s3_service: Option<Arc<S3Service>>,
}

impl FileService {
    pub fn new(upload_path: String) -> Self {
-       Self { upload_path }
+       Self {
+           upload_path,
+           s3_service: None,
+       }
    }

    pub fn new_with_s3(upload_path: String, s3_service: Arc<S3Service>) -> Self {
        Self {
            upload_path,
            s3_service: Some(s3_service),
        }
    }

    /// Check if S3 storage is enabled
    pub fn is_s3_enabled(&self) -> bool {
        self.s3_service.is_some()
    }

    /// Initialize the upload directory structure

@@ -148,6 +166,67 @@ impl FileService {
        Ok(file_path.to_string_lossy().to_string())
    }

    /// Save file for a specific document (works with both local and S3)
    pub async fn save_document_file(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String> {
        if let Some(s3_service) = &self.s3_service {
            // Use S3 storage
            let s3_key = s3_service.store_document(user_id, document_id, filename, data).await?;
            info!("Saved document to S3: {}", s3_key);
            Ok(format!("s3://{}", s3_key))
        } else {
            // Use local storage
            self.save_file(filename, data).await
        }
    }

    /// Save thumbnail (works with both local and S3)
    pub async fn save_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
        if let Some(s3_service) = &self.s3_service {
            // Use S3 storage
            let s3_key = s3_service.store_thumbnail(user_id, document_id, data).await?;
            info!("Saved thumbnail to S3: {}", s3_key);
            Ok(format!("s3://{}", s3_key))
        } else {
            // Use local storage
            let thumbnails_dir = self.get_thumbnails_path();
            if let Err(e) = fs::create_dir_all(&thumbnails_dir).await {
                error!("Failed to create thumbnails directory: {}", e);
                return Err(anyhow::anyhow!("Failed to create thumbnails directory: {}", e));
            }

            let thumbnail_filename = format!("{}_thumb.jpg", document_id);
            let thumbnail_path = thumbnails_dir.join(&thumbnail_filename);

            fs::write(&thumbnail_path, data).await?;
            info!("Saved thumbnail locally: {}", thumbnail_path.display());
            Ok(thumbnail_path.to_string_lossy().to_string())
        }
    }

    /// Save processed image (works with both local and S3)
    pub async fn save_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
        if let Some(s3_service) = &self.s3_service {
            // Use S3 storage
            let s3_key = s3_service.store_processed_image(user_id, document_id, data).await?;
            info!("Saved processed image to S3: {}", s3_key);
            Ok(format!("s3://{}", s3_key))
        } else {
            // Use local storage
            let processed_dir = self.get_processed_images_path();
            if let Err(e) = fs::create_dir_all(&processed_dir).await {
                error!("Failed to create processed images directory: {}", e);
                return Err(anyhow::anyhow!("Failed to create processed images directory: {}", e));
            }

            let processed_filename = format!("{}_processed.png", document_id);
            let processed_path = processed_dir.join(&processed_filename);

            fs::write(&processed_path, data).await?;
            info!("Saved processed image locally: {}", processed_path.display());
            Ok(processed_path.to_string_lossy().to_string())
        }
    }

    pub fn create_document(
        &self,
        filename: &str,

@@ -166,9 +245,50 @@ impl FileService {
        file_owner: Option<String>,
        file_group: Option<String>,
        source_metadata: Option<serde_json::Value>,
    ) -> Document {
        self.create_document_with_id(
            Uuid::new_v4(),
            filename,
            original_filename,
            file_path,
            file_size,
            mime_type,
            user_id,
            file_hash,
            original_created_at,
            original_modified_at,
            source_path,
            source_type,
            source_id,
            file_permissions,
            file_owner,
            file_group,
            source_metadata,
        )
    }

    pub fn create_document_with_id(
        &self,
        document_id: Uuid,
        filename: &str,
        original_filename: &str,
        file_path: &str,
        file_size: i64,
        mime_type: &str,
        user_id: Uuid,
        file_hash: Option<String>,
        original_created_at: Option<chrono::DateTime<chrono::Utc>>,
        original_modified_at: Option<chrono::DateTime<chrono::Utc>>,
        source_path: Option<String>,
        source_type: Option<String>,
        source_id: Option<Uuid>,
        file_permissions: Option<i32>,
        file_owner: Option<String>,
        file_group: Option<String>,
        source_metadata: Option<serde_json::Value>,
    ) -> Document {
        Document {
-           id: Uuid::new_v4(),
+           id: document_id,
            filename: filename.to_string(),
            original_filename: original_filename.to_string(),
            file_path: file_path.to_string(),

@@ -243,6 +363,17 @@ impl FileService {
    }

    pub async fn read_file(&self, file_path: &str) -> Result<Vec<u8>> {
        // Check if this is an S3 path
        if file_path.starts_with("s3://") {
            if let Some(s3_service) = &self.s3_service {
                let s3_key = file_path.strip_prefix("s3://").unwrap_or(file_path);
                return s3_service.retrieve_file(s3_key).await;
            } else {
                return Err(anyhow::anyhow!("S3 path provided but S3 service not configured: {}", file_path));
            }
        }

        // Handle local file path
        let resolved_path = self.resolve_file_path(file_path).await?;
        let data = fs::read(&resolved_path).await?;
        Ok(data)
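A minimal sketch of the round trip these two methods give callers (file_service, user_id, document_id, filename, and data are assumed to already be in scope from an upload request):

    // Store the upload; with S3 configured this returns an "s3://..." path,
    // otherwise a local path under the upload directory.
    let stored_path = file_service
        .save_document_file(user_id, document_id, &filename, &data)
        .await?;

    // Later reads go through the same service: the "s3://" prefix is detected
    // and routed to S3Service::retrieve_file, local paths are read from disk.
    let bytes = file_service.read_file(&stored_path).await?;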

@@ -508,6 +639,17 @@ impl FileService {
    }

    pub async fn delete_document_files(&self, document: &Document) -> Result<()> {
        // Check if this document uses S3 storage
        if document.file_path.starts_with("s3://") {
            if let Some(s3_service) = &self.s3_service {
                // Use S3 deletion
                return s3_service.delete_document_files(document.user_id, document.id, &document.filename).await;
            } else {
                return Err(anyhow::anyhow!("Document stored in S3 but S3 service not configured"));
            }
        }

        // Handle local file deletion
        let mut deleted_files = Vec::new();
        let mut serious_errors = Vec::new();

@@ -1,7 +1,10 @@
use anyhow::{anyhow, Result};
-use chrono::DateTime;
-use tracing::{debug, info, warn};
+use chrono::{DateTime, Datelike};
+use tracing::{debug, info, warn, error};
use serde_json;
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;

#[cfg(feature = "s3")]
use aws_sdk_s3::Client;

@@ -9,6 +12,8 @@ use aws_sdk_s3::Client;
use aws_credential_types::Credentials;
#[cfg(feature = "s3")]
use aws_types::region::Region as AwsRegion;
#[cfg(feature = "s3")]
use aws_sdk_s3::primitives::ByteStream;

use crate::models::{FileIngestionInfo, S3SourceConfig};

@@ -325,6 +330,329 @@ impl S3Service {
    pub fn get_config(&self) -> &S3SourceConfig {
        &self.config
    }

    // ========================================
    // DIRECT STORAGE OPERATIONS
    // ========================================

    /// Store a file directly to S3 with structured path
    pub async fn store_document(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            let key = self.generate_document_key(user_id, document_id, filename);
            self.store_file(&key, data, None).await?;
            Ok(key)
        }
    }

    /// Store a thumbnail to S3
    pub async fn store_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            let key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id);
            self.store_file(&key, data, Some(self.get_image_metadata())).await?;
            Ok(key)
        }
    }

    /// Store a processed image to S3
    pub async fn store_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            let key = format!("processed_images/{}/{}_processed.png", user_id, document_id);
            self.store_file(&key, data, Some(self.get_image_metadata())).await?;
            Ok(key)
        }
    }

    /// Generic file storage method
    async fn store_file(&self, key: &str, data: &[u8], metadata: Option<HashMap<String, String>>) -> Result<()> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            info!("Storing file to S3: {}/{}", self.config.bucket_name, key);

            let key_owned = key.to_string();
            let data_owned = data.to_vec();
            let metadata_owned = metadata.clone();
            let bucket_name = self.config.bucket_name.clone();
            let client = self.client.clone();

            self.retry_operation(&format!("store_file: {}", key), || {
                let key = key_owned.clone();
                let data = data_owned.clone();
                let metadata = metadata_owned.clone();
                let bucket_name = bucket_name.clone();
                let client = client.clone();
                let content_type = self.get_content_type_from_key(&key);

                async move {
                    let mut put_request = client
                        .put_object()
                        .bucket(&bucket_name)
                        .key(&key)
                        .body(ByteStream::from(data));

                    // Add metadata if provided
                    if let Some(meta) = metadata {
                        for (k, v) in meta {
                            put_request = put_request.metadata(k, v);
                        }
                    }

                    // Set content type based on file extension
                    if let Some(ct) = content_type {
                        put_request = put_request.content_type(ct);
                    }

                    put_request.send().await
                        .map_err(|e| anyhow!("Failed to store file {}: {}", key, e))?;

                    Ok(())
                }
            }).await?;

            info!("Successfully stored file: {}", key);
            Ok(())
        }
    }

    /// Retrieve a file from S3
    pub async fn retrieve_file(&self, key: &str) -> Result<Vec<u8>> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            info!("Retrieving file from S3: {}/{}", self.config.bucket_name, key);

            let key_owned = key.to_string();
            let bucket_name = self.config.bucket_name.clone();
            let client = self.client.clone();

            let bytes = self.retry_operation(&format!("retrieve_file: {}", key), || {
                let key = key_owned.clone();
                let bucket_name = bucket_name.clone();
                let client = client.clone();

                async move {
                    let response = client
                        .get_object()
                        .bucket(&bucket_name)
                        .key(&key)
                        .send()
                        .await
                        .map_err(|e| anyhow!("Failed to retrieve file {}: {}", key, e))?;

                    let body = response.body.collect().await
                        .map_err(|e| anyhow!("Failed to read file body: {}", e))?;

                    Ok(body.into_bytes().to_vec())
                }
            }).await?;

            info!("Successfully retrieved file: {} ({} bytes)", key, bytes.len());
            Ok(bytes)
        }
    }

    /// Delete a file from S3
    pub async fn delete_file(&self, key: &str) -> Result<()> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            info!("Deleting file from S3: {}/{}", self.config.bucket_name, key);

            self.client
                .delete_object()
                .bucket(&self.config.bucket_name)
                .key(key)
                .send()
                .await
                .map_err(|e| anyhow!("Failed to delete file {}: {}", key, e))?;

            info!("Successfully deleted file: {}", key);
            Ok(())
        }
    }

    /// Check if a file exists in S3
    pub async fn file_exists(&self, key: &str) -> Result<bool> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            match self.client
                .head_object()
                .bucket(&self.config.bucket_name)
                .key(key)
                .send()
                .await
            {
                Ok(_) => Ok(true),
                Err(e) => {
                    let error_msg = e.to_string();
                    if error_msg.contains("NotFound") || error_msg.contains("404") {
                        Ok(false)
                    } else {
                        Err(anyhow!("Failed to check file existence {}: {}", key, e))
                    }
                }
            }
        }
    }

    /// Delete all files for a document (document, thumbnail, processed image)
    pub async fn delete_document_files(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> Result<()> {
        #[cfg(not(feature = "s3"))]
        {
            return Err(anyhow!("S3 support not compiled in"));
        }

        #[cfg(feature = "s3")]
        {
            let document_key = self.generate_document_key(user_id, document_id, filename);
            let thumbnail_key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id);
            let processed_key = format!("processed_images/{}/{}_processed.png", user_id, document_id);

            let mut errors = Vec::new();

            // Delete document file
            if let Err(e) = self.delete_file(&document_key).await {
                if !e.to_string().contains("NotFound") {
                    errors.push(format!("Document: {}", e));
                }
            }

            // Delete thumbnail
            if let Err(e) = self.delete_file(&thumbnail_key).await {
                if !e.to_string().contains("NotFound") {
                    errors.push(format!("Thumbnail: {}", e));
                }
            }

            // Delete processed image
            if let Err(e) = self.delete_file(&processed_key).await {
                if !e.to_string().contains("NotFound") {
                    errors.push(format!("Processed image: {}", e));
                }
            }

            if !errors.is_empty() {
                return Err(anyhow!("Failed to delete some files: {}", errors.join("; ")));
            }

            info!("Successfully deleted all files for document {}", document_id);
            Ok(())
        }
    }

    // ========================================
    // HELPER METHODS
    // ========================================

    /// Generate a structured S3 key for a document
    fn generate_document_key(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> String {
        let now = chrono::Utc::now();
        let year = now.year();
        let month = now.month();

        // Extract file extension
        let extension = std::path::Path::new(filename)
            .extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("");

        if extension.is_empty() {
            format!("documents/{}/{:04}/{:02}/{}", user_id, year, month, document_id)
        } else {
            format!("documents/{}/{:04}/{:02}/{}.{}", user_id, year, month, document_id, extension)
        }
    }
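For illustration, the keys these storage methods produce look like the following (UUIDs abbreviated; year/month come from the time of storage):

    documents/9f3c.../2024/06/1b7a....pdf           // store_document
    thumbnails/9f3c.../1b7a..._thumb.jpg            // store_thumbnail
    processed_images/9f3c.../1b7a..._processed.png  // store_processed_image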
    /// Get content type from S3 key/filename
    fn get_content_type_from_key(&self, key: &str) -> Option<String> {
        let extension = std::path::Path::new(key)
            .extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("")
            .to_lowercase();

        Some(Self::get_mime_type(&extension))
    }

    /// Get metadata for image files
    fn get_image_metadata(&self) -> HashMap<String, String> {
        let mut metadata = HashMap::new();
        metadata.insert("generated-by".to_string(), "readur".to_string());
        metadata.insert("created-at".to_string(), chrono::Utc::now().to_rfc3339());
        metadata
    }

    /// Retry wrapper for S3 operations with exponential backoff
    async fn retry_operation<T, F, Fut>(&self, operation_name: &str, operation: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        const MAX_RETRIES: u32 = 3;
        const BASE_DELAY_MS: u64 = 100;

        let mut last_error = None;

        for attempt in 0..=MAX_RETRIES {
            match operation().await {
                Ok(result) => {
                    if attempt > 0 {
                        info!("S3 operation '{}' succeeded after {} retries", operation_name, attempt);
                    }
                    return Ok(result);
                }
                Err(e) => {
                    last_error = Some(e);

                    if attempt < MAX_RETRIES {
                        let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt);
                        warn!("S3 operation '{}' failed (attempt {}/{}), retrying in {}ms: {}",
                              operation_name, attempt + 1, MAX_RETRIES + 1, delay_ms, last_error.as_ref().unwrap());
                        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                    }
                }
            }
        }

        error!("S3 operation '{}' failed after {} attempts: {}",
               operation_name, MAX_RETRIES + 1, last_error.as_ref().unwrap());
        Err(last_error.unwrap())
    }
}
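With the constants above, a failing operation is attempted up to 4 times in total, sleeping 100 ms, 200 ms, then 400 ms between attempts before the last error is returned.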
#[cfg(test)]

@@ -816,6 +816,10 @@ impl TestConfigBuilder {
            oidc_client_secret: None,
            oidc_issuer_url: None,
            oidc_redirect_uri: None,

            // S3 Configuration
            s3_enabled: false,
            s3_config: None,
        }
    }
}

@@ -0,0 +1,78 @@
//! Basic S3 storage functionality tests

use std::sync::Arc;

use readur::services::file_service::FileService;

#[cfg(feature = "s3")]
use readur::services::s3_service::S3Service;
#[cfg(feature = "s3")]
use readur::models::S3SourceConfig;

#[cfg(feature = "s3")]
#[tokio::test]
async fn test_s3_service_new_validation() {
    // Test S3Service creation fails with empty bucket name
    let config = S3SourceConfig {
        bucket_name: "".to_string(),
        region: "us-east-1".to_string(),
        access_key_id: "".to_string(),
        secret_access_key: "".to_string(),
        endpoint_url: None,
        prefix: None,
        watch_folders: vec![],
        file_extensions: vec![],
        auto_sync: false,
        sync_interval_minutes: 0,
    };

    let result = S3Service::new(config).await;
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("Bucket name is required"));
}

#[tokio::test]
async fn test_file_service_local_creation() {
    // Test local-only FileService creation and functionality
    let upload_path = "./test_uploads".to_string();
    let local_service = FileService::new(upload_path);
    assert!(!local_service.is_s3_enabled());
}

#[cfg(feature = "s3")]
#[tokio::test]
async fn test_s3_service_configuration() {
    // Test that S3 service can be created with proper configuration structure
    let config = S3SourceConfig {
        bucket_name: "test-bucket".to_string(),
        region: "us-east-1".to_string(),
        access_key_id: "test-key".to_string(),
        secret_access_key: "test-secret".to_string(),
        endpoint_url: Some("http://localhost:9000".to_string()),
        prefix: None,
        watch_folders: vec!["documents/".to_string()],
        file_extensions: vec!["pdf".to_string(), "txt".to_string()],
        auto_sync: false,
        sync_interval_minutes: 60,
    };

    // This test verifies the configuration structure is correct
    // Actual S3 connection will fail since we don't have a real endpoint
    match S3Service::new(config.clone()).await {
        Ok(service) => {
            // If it succeeds, verify the config was stored correctly
            assert_eq!(service.get_config().bucket_name, "test-bucket");
            assert_eq!(service.get_config().region, "us-east-1");
            assert_eq!(service.get_config().watch_folders.len(), 1);

            // Test FileService integration
            let s3_file_service = FileService::new_with_s3("./test".to_string(), Arc::new(service));
            assert!(s3_file_service.is_s3_enabled());
        }
        Err(_) => {
            // Expected to fail since we don't have a real S3 endpoint
            // This test mainly verifies the structure compiles correctly
            println!("S3 service creation failed as expected (no real S3 endpoint)");
        }
    }
}
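These tests only compile against the S3 code when the feature is enabled, so a likely invocation is `cargo test --features s3` (the local-only test also runs without the feature).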