feat(server): set up new storage service

parent 68ceb1f9cb
commit 3ad0dd3600
src/lib.rs
@@ -14,6 +14,7 @@ pub mod routes;
 pub mod scheduling;
 pub mod seed;
 pub mod services;
+pub mod storage;
 pub mod swagger;
 pub mod utils;
 pub mod webdav_xml_parser;
src/main.rs (50 lines changed)
@@ -121,45 +121,27 @@ async fn main() -> anyhow::Result<()> {
     println!("📁 Upload directory: {}", config.upload_path);
     println!("👁️ Watch directory: {}", config.watch_folder);
 
-    // Initialize file service with S3 support if configured
-    info!("Initializing file service...");
-    let file_service = if config.s3_enabled {
-        if let Some(s3_config) = &config.s3_config {
-            info!("S3 storage enabled, initializing S3Service...");
-            match readur::services::s3_service::S3Service::new(s3_config.clone()).await {
-                Ok(s3_service) => {
-                    info!("✅ S3Service initialized successfully");
-                    readur::services::file_service::FileService::new_with_s3(
-                        config.upload_path.clone(),
-                        Arc::new(s3_service)
-                    )
-                }
-                Err(e) => {
-                    error!("Failed to initialize S3Service: {}", e);
-                    warn!("Falling back to local storage only");
-                    readur::services::file_service::FileService::new(config.upload_path.clone())
-                }
-            }
-        } else {
-            warn!("S3 enabled but no S3 configuration provided, using local storage");
+    // Initialize file service using the new storage backend architecture
+    info!("Initializing file service with storage backend...");
+    let storage_config = readur::storage::factory::storage_config_from_env(&config)?;
+    let file_service = match readur::services::file_service::FileService::from_config(storage_config, config.upload_path.clone()).await {
+        Ok(service) => {
+            info!("✅ File service initialized with {} storage backend", service.storage_type());
+            service
+        }
+        Err(e) => {
+            error!("Failed to initialize file service with configured storage backend: {}", e);
+            warn!("Falling back to local storage only");
             readur::services::file_service::FileService::new(config.upload_path.clone())
-        }
-    } else {
-        info!("Using local file storage");
-        readur::services::file_service::FileService::new(config.upload_path.clone())
+        }
     };
 
-    if !file_service.is_s3_enabled() {
-        // Only initialize local directory structure if not using S3
-        info!("Initializing local upload directory structure...");
-        if let Err(e) = file_service.initialize_directory_structure().await {
-            error!("Failed to initialize directory structure: {}", e);
-            return Err(e.into());
-        }
-        info!("✅ Local upload directory structure initialized");
-    } else {
-        info!("✅ File service initialized with S3 storage backend");
+    // Initialize the storage backend (creates directories, validates access, etc.)
+    if let Err(e) = file_service.initialize_storage().await {
+        error!("Failed to initialize storage backend: {}", e);
+        return Err(e.into());
+    }
+    info!("✅ Storage backend initialized successfully");
 
     // Migrate existing files to new structure (one-time operation)
     info!("Migrating existing files to structured directories...");
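Note: the new startup path collapses the old S3-or-local branching into one factory call plus one backend-agnostic initialize call. A minimal sketch of the resulting flow, assuming the crate paths shown in this diff (error handling and the fallback logging above are elided):

```rust
async fn init_file_service(
    config: &readur::config::Config, // assumed location of the app Config
) -> anyhow::Result<readur::services::file_service::FileService> {
    // Map the application config to a StorageConfig (Local or S3).
    let storage_config = readur::storage::factory::storage_config_from_env(config)?;
    // The factory constructs and initializes the matching backend.
    let file_service = readur::services::file_service::FileService::from_config(
        storage_config,
        config.upload_path.clone(),
    )
    .await?;
    // One initialization call regardless of backend type.
    file_service.initialize_storage().await?;
    Ok(file_service)
}
```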
src/services/file_service.rs
@@ -8,6 +8,7 @@ use tracing::{info, warn, error};
 
 use crate::models::Document;
 use crate::services::s3_service::S3Service;
+use crate::storage::{StorageBackend, StorageConfig, factory};
 
 #[cfg(feature = "ocr")]
 use image::{DynamicImage, ImageFormat, imageops::FilterType};
@@ -15,31 +16,75 @@ use image::{DynamicImage, ImageFormat, imageops::FilterType};
 #[derive(Clone)]
 pub struct FileService {
     upload_path: String,
+    /// Storage backend for all file operations
+    storage: Arc<dyn StorageBackend>,
+    /// Legacy S3 service reference for backward compatibility
+    /// TODO: Remove this after all usage sites are migrated
     s3_service: Option<Arc<S3Service>>,
 }
 
 impl FileService {
+    /// Create a new FileService with local storage (backward compatible)
     pub fn new(upload_path: String) -> Self {
+        use crate::storage::local::LocalStorageBackend;
+        let local_backend = LocalStorageBackend::new(upload_path.clone());
         Self {
             upload_path,
+            storage: Arc::new(local_backend),
             s3_service: None,
         }
     }
 
+    /// Create a new FileService with S3 storage (backward compatible)
     pub fn new_with_s3(upload_path: String, s3_service: Arc<S3Service>) -> Self {
+        let storage_backend = s3_service.clone() as Arc<dyn StorageBackend>;
         Self {
             upload_path,
+            storage: storage_backend,
             s3_service: Some(s3_service),
         }
     }
 
+    /// Create a new FileService with a specific storage backend (new API)
+    pub fn with_storage(upload_path: String, storage: Arc<dyn StorageBackend>) -> Self {
+        Self {
+            upload_path,
+            storage,
+            s3_service: None, // New API doesn't need legacy S3 reference
+        }
+    }
+
+    /// Create FileService from storage configuration (factory pattern)
+    pub async fn from_config(config: StorageConfig, upload_path: String) -> Result<Self> {
+        let storage = factory::create_storage_backend(config).await?;
+        Ok(Self::with_storage(upload_path, storage))
+    }
+
     /// Check if S3 storage is enabled
     pub fn is_s3_enabled(&self) -> bool {
-        self.s3_service.is_some()
+        // Check if storage backend is S3 type
+        self.storage.storage_type() == "s3" || self.s3_service.is_some()
     }
 
+    /// Get the storage backend type
+    pub fn storage_type(&self) -> &'static str {
+        self.storage.storage_type()
+    }
+
-    /// Initialize the upload directory structure
+    /// Initialize the storage backend and directory structure
+    pub async fn initialize_storage(&self) -> Result<()> {
+        // Initialize the storage backend first
+        self.storage.initialize().await?;
+        Ok(())
+    }
+
+    /// Initialize the upload directory structure (legacy method for local storage)
     pub async fn initialize_directory_structure(&self) -> Result<()> {
+        // For non-local storage, this is a no-op
+        if self.storage.storage_type() != "local" {
+            info!("Skipping directory structure initialization for {} storage", self.storage.storage_type());
+            return Ok(());
+        }
         let base_path = Path::new(&self.upload_path);
 
         // Create subdirectories for organized file storage
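The constructors deliberately coexist during the migration: `new` and `new_with_s3` keep old call sites compiling, while `with_storage` and `from_config` are the new entry points. A short usage sketch (paths are illustrative):

```rust
use std::sync::Arc;
use readur::services::file_service::FileService;
use readur::storage::{StorageBackend, StorageConfig, local::LocalStorageBackend};

async fn construction_examples() -> anyhow::Result<()> {
    // Legacy constructor: wires up a LocalStorageBackend internally.
    let legacy = FileService::new("./uploads".to_string());

    // New API: inject any StorageBackend implementation.
    let backend: Arc<dyn StorageBackend> =
        Arc::new(LocalStorageBackend::new("./uploads".to_string()));
    let injected = FileService::with_storage("./uploads".to_string(), backend);

    // Factory path: async because an S3 backend may need to connect.
    let from_cfg = FileService::from_config(
        StorageConfig::Local { upload_path: "./uploads".to_string() },
        "./uploads".to_string(),
    )
    .await?;

    assert_eq!(legacy.storage_type(), "local");
    assert_eq!(injected.storage_type(), "local");
    assert_eq!(from_cfg.storage_type(), "local");
    Ok(())
}
```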
@@ -168,63 +213,23 @@ impl FileService {
 
     /// Save file for a specific document (works with both local and S3)
     pub async fn save_document_file(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String> {
-        if let Some(s3_service) = &self.s3_service {
-            // Use S3 storage
-            let s3_key = s3_service.store_document(user_id, document_id, filename, data).await?;
-            info!("Saved document to S3: {}", s3_key);
-            Ok(format!("s3://{}", s3_key))
-        } else {
-            // Use local storage
-            self.save_file(filename, data).await
-        }
+        let storage_path = self.storage.store_document(user_id, document_id, filename, data).await?;
+        info!("Saved document via storage backend: {}", storage_path);
+        Ok(storage_path)
     }
 
     /// Save thumbnail (works with both local and S3)
     pub async fn save_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
-        if let Some(s3_service) = &self.s3_service {
-            // Use S3 storage
-            let s3_key = s3_service.store_thumbnail(user_id, document_id, data).await?;
-            info!("Saved thumbnail to S3: {}", s3_key);
-            Ok(format!("s3://{}", s3_key))
-        } else {
-            // Use local storage
-            let thumbnails_dir = self.get_thumbnails_path();
-            if let Err(e) = fs::create_dir_all(&thumbnails_dir).await {
-                error!("Failed to create thumbnails directory: {}", e);
-                return Err(anyhow::anyhow!("Failed to create thumbnails directory: {}", e));
-            }
-
-            let thumbnail_filename = format!("{}_thumb.jpg", document_id);
-            let thumbnail_path = thumbnails_dir.join(&thumbnail_filename);
-
-            fs::write(&thumbnail_path, data).await?;
-            info!("Saved thumbnail locally: {}", thumbnail_path.display());
-            Ok(thumbnail_path.to_string_lossy().to_string())
-        }
+        let storage_path = self.storage.store_thumbnail(user_id, document_id, data).await?;
+        info!("Saved thumbnail via storage backend: {}", storage_path);
+        Ok(storage_path)
     }
 
     /// Save processed image (works with both local and S3)
     pub async fn save_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
-        if let Some(s3_service) = &self.s3_service {
-            // Use S3 storage
-            let s3_key = s3_service.store_processed_image(user_id, document_id, data).await?;
-            info!("Saved processed image to S3: {}", s3_key);
-            Ok(format!("s3://{}", s3_key))
-        } else {
-            // Use local storage
-            let processed_dir = self.get_processed_images_path();
-            if let Err(e) = fs::create_dir_all(&processed_dir).await {
-                error!("Failed to create processed images directory: {}", e);
-                return Err(anyhow::anyhow!("Failed to create processed images directory: {}", e));
-            }
-
-            let processed_filename = format!("{}_processed.png", document_id);
-            let processed_path = processed_dir.join(&processed_filename);
-
-            fs::write(&processed_path, data).await?;
-            info!("Saved processed image locally: {}", processed_path.display());
-            Ok(processed_path.to_string_lossy().to_string())
-        }
+        let storage_path = self.storage.store_processed_image(user_id, document_id, data).await?;
+        info!("Saved processed image via storage backend: {}", storage_path);
+        Ok(storage_path)
     }
 
     pub fn create_document(
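All three save paths now reduce to one delegation to the backend, which returns a backend-specific path. A sketch (IDs and payload invented):

```rust
use readur::services::file_service::FileService;
use uuid::Uuid;

async fn save_example(svc: &FileService) -> anyhow::Result<()> {
    let user_id = Uuid::new_v4();
    let document_id = Uuid::new_v4();
    // The local backend returns a filesystem path; the S3 backend
    // returns an "s3://<key>" URI, as in the impl further down.
    let path = svc
        .save_document_file(user_id, document_id, "report.pdf", b"%PDF-1.4")
        .await?;
    println!("stored at {}", path);
    Ok(())
}
```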
@@ -363,20 +368,24 @@ impl FileService {
     }
 
     pub async fn read_file(&self, file_path: &str) -> Result<Vec<u8>> {
-        // Check if this is an S3 path
+        // Check if this is a storage backend path (s3:// or other prefixes)
         if file_path.starts_with("s3://") {
-            if let Some(s3_service) = &self.s3_service {
-                let s3_key = file_path.strip_prefix("s3://").unwrap_or(file_path);
-                return s3_service.retrieve_file(s3_key).await;
-            } else {
-                return Err(anyhow::anyhow!("S3 path provided but S3 service not configured: {}", file_path));
-            }
+            // Strip the s3:// prefix and delegate to storage backend
+            let storage_key = file_path.strip_prefix("s3://").unwrap_or(file_path);
+            return self.storage.retrieve_file(storage_key).await;
         }
 
-        // Handle local file path
-        let resolved_path = self.resolve_file_path(file_path).await?;
-        let data = fs::read(&resolved_path).await?;
-        Ok(data)
+        // For local files, we might need to use the storage backend or fall back to direct file access
+        // Try storage backend first, then fall back to legacy file resolution
+        match self.storage.retrieve_file(file_path).await {
+            Ok(data) => Ok(data),
+            Err(_) => {
+                // Fall back to legacy file resolution for backward compatibility
+                let resolved_path = self.resolve_file_path(file_path).await?;
+                let data = fs::read(&resolved_path).await?;
+                Ok(data)
+            }
+        }
     }
 
     #[cfg(feature = "ocr")]
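The read path accepts both addressing styles, and the try-backend-then-legacy fallback means pre-migration paths keep resolving. A sketch with invented paths:

```rust
use readur::services::file_service::FileService;

async fn read_examples(svc: &FileService) -> anyhow::Result<()> {
    // s3:// key: the prefix is stripped and handed to the backend.
    let _doc = svc.read_file("s3://documents/abc.pdf").await?;
    // Plain path: the backend is tried first, then the legacy resolver,
    // which also probes the ./uploads/documents/ layout.
    let _old = svc.read_file("./uploads/abc.pdf").await?;
    Ok(())
}
```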
@@ -639,13 +648,15 @@ impl FileService {
     }
 
     pub async fn delete_document_files(&self, document: &Document) -> Result<()> {
-        // Check if this document uses S3 storage
-        if document.file_path.starts_with("s3://") {
-            if let Some(s3_service) = &self.s3_service {
-                // Use S3 deletion
-                return s3_service.delete_document_files(document.user_id, document.id, &document.filename).await;
-            } else {
-                return Err(anyhow::anyhow!("Document stored in S3 but S3 service not configured"));
+        // Use storage backend for deletion - it handles both S3 and local storage
+        match self.storage.delete_document_files(document.user_id, document.id, &document.filename).await {
+            Ok(_) => {
+                info!("Successfully deleted files for document {} via storage backend", document.id);
+                return Ok(());
+            }
+            Err(e) => {
+                warn!("Storage backend deletion failed for document {}: {}. Falling back to legacy deletion.", document.id, e);
+                // Fall back to legacy deletion logic for backward compatibility
             }
         }
 
@@ -707,7 +718,7 @@ impl FileService {
         if deleted_files.is_empty() {
             info!("No files needed deletion for document {} (all files already removed)", document.id);
         } else {
-            info!("Successfully deleted {} files for document {}", deleted_files.len(), document.id);
+            info!("Successfully deleted {} files for document {} via legacy method", deleted_files.len(), document.id);
         }
 
         Ok(())
src/services/s3_service.rs
@@ -1,4 +1,5 @@
 use anyhow::{anyhow, Result};
+use async_trait::async_trait;
 use chrono::{DateTime, Datelike};
 use tracing::{debug, info, warn, error};
 use serde_json;
@@ -16,6 +17,7 @@ use aws_types::region::Region as AwsRegion;
 use aws_sdk_s3::primitives::ByteStream;
 
 use crate::models::{FileIngestionInfo, S3SourceConfig};
+use crate::storage::StorageBackend;
 
 #[derive(Debug, Clone)]
 pub struct S3Service {
@@ -655,6 +657,168 @@ impl S3Service {
     }
 }
 
+// Implement StorageBackend trait for S3Service
+#[async_trait]
+impl StorageBackend for S3Service {
+    fn as_any(&self) -> Option<&dyn std::any::Any> {
+        Some(self)
+    }
+    async fn store_document(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String> {
+        // Generate S3 key
+        let key = self.generate_document_key(user_id, document_id, filename);
+        self.store_file(&key, data, None).await?;
+        Ok(format!("s3://{}", key))
+    }
+
+    async fn store_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
+        let key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id);
+        self.store_file(&key, data, Some(self.get_image_metadata())).await?;
+        Ok(format!("s3://{}", key))
+    }
+
+    async fn store_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
+        let key = format!("processed_images/{}/{}_processed.png", user_id, document_id);
+        self.store_file(&key, data, Some(self.get_image_metadata())).await?;
+        Ok(format!("s3://{}", key))
+    }
+
+    async fn retrieve_file(&self, path: &str) -> Result<Vec<u8>> {
+        // Handle s3:// prefix if present
+        let key = if path.starts_with("s3://") {
+            path.strip_prefix("s3://").unwrap_or(path)
+        } else {
+            path
+        };
+
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            info!("Retrieving file from S3: {}/{}", self.config.bucket_name, key);
+
+            let key_owned = key.to_string();
+            let bucket_name = self.config.bucket_name.clone();
+            let client = self.client.clone();
+
+            let bytes = self.retry_operation(&format!("retrieve_file: {}", key), || {
+                let key = key_owned.clone();
+                let bucket_name = bucket_name.clone();
+                let client = client.clone();
+
+                async move {
+                    let response = client
+                        .get_object()
+                        .bucket(&bucket_name)
+                        .key(&key)
+                        .send()
+                        .await
+                        .map_err(|e| anyhow!("Failed to retrieve file {}: {}", key, e))?;
+
+                    let body = response.body.collect().await
+                        .map_err(|e| anyhow!("Failed to read file body: {}", e))?;
+
+                    Ok(body.into_bytes().to_vec())
+                }
+            }).await?;
+
+            info!("Successfully retrieved file: {} ({} bytes)", key, bytes.len());
+            Ok(bytes)
+        }
+    }
+
+    async fn delete_document_files(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> Result<()> {
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            let document_key = self.generate_document_key(user_id, document_id, filename);
+            let thumbnail_key = format!("thumbnails/{}/{}_thumb.jpg", user_id, document_id);
+            let processed_key = format!("processed_images/{}/{}_processed.png", user_id, document_id);
+
+            let mut errors = Vec::new();
+
+            // Delete document file
+            if let Err(e) = self.delete_file(&document_key).await {
+                if !e.to_string().contains("NotFound") {
+                    errors.push(format!("Document: {}", e));
+                }
+            }
+
+            // Delete thumbnail
+            if let Err(e) = self.delete_file(&thumbnail_key).await {
+                if !e.to_string().contains("NotFound") {
+                    errors.push(format!("Thumbnail: {}", e));
+                }
+            }
+
+            // Delete processed image
+            if let Err(e) = self.delete_file(&processed_key).await {
+                if !e.to_string().contains("NotFound") {
+                    errors.push(format!("Processed image: {}", e));
+                }
+            }
+
+            if !errors.is_empty() {
+                return Err(anyhow!("Failed to delete some files: {}", errors.join("; ")));
+            }
+
+            info!("Successfully deleted all files for document {}", document_id);
+            Ok(())
+        }
+    }
+
+    async fn file_exists(&self, path: &str) -> Result<bool> {
+        // Handle s3:// prefix if present
+        let key = if path.starts_with("s3://") {
+            path.strip_prefix("s3://").unwrap_or(path)
+        } else {
+            path
+        };
+
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            match self.client
+                .head_object()
+                .bucket(&self.config.bucket_name)
+                .key(key)
+                .send()
+                .await
+            {
+                Ok(_) => Ok(true),
+                Err(e) => {
+                    let error_msg = e.to_string();
+                    if error_msg.contains("NotFound") || error_msg.contains("404") {
+                        Ok(false)
+                    } else {
+                        Err(anyhow!("Failed to check file existence {}: {}", key, e))
+                    }
+                }
+            }
+        }
+    }
+
+    fn storage_type(&self) -> &'static str {
+        "s3"
+    }
+
+    async fn initialize(&self) -> Result<()> {
+        self.test_connection().await?;
+        info!("S3 storage backend initialized successfully");
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
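With this impl, an `S3Service` can sit behind `Arc<dyn StorageBackend>`, and `as_any` lets legacy call sites recover the concrete type. A sketch, assuming the `s3` feature is enabled and `cfg` is a valid `S3SourceConfig`:

```rust
use std::sync::Arc;
use readur::models::S3SourceConfig;
use readur::services::s3_service::S3Service;
use readur::storage::StorageBackend;

async fn as_backend(cfg: S3SourceConfig) -> anyhow::Result<()> {
    let backend: Arc<dyn StorageBackend> = Arc::new(S3Service::new(cfg).await?);
    backend.initialize().await?; // runs test_connection() per the impl above

    // Downcast path for call sites that still need the concrete service.
    if let Some(any) = backend.as_any() {
        if let Some(s3) = any.downcast_ref::<S3Service>() {
            let _ = s3; // S3-only methods remain reachable here
        }
    }
    Ok(())
}
```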
src/storage/factory.rs
@@ -0,0 +1,59 @@
+//! Factory for creating storage backends based on configuration
+
+use anyhow::Result;
+use std::sync::Arc;
+
+use super::{StorageBackend, StorageConfig};
+use super::local::LocalStorageBackend;
+
+#[cfg(feature = "s3")]
+use crate::services::s3_service::S3Service;
+
+/// Create a storage backend based on the provided configuration
+pub async fn create_storage_backend(config: StorageConfig) -> Result<Arc<dyn StorageBackend>> {
+    match config {
+        StorageConfig::Local { upload_path } => {
+            let backend = LocalStorageBackend::new(upload_path);
+            backend.initialize().await?;
+            Ok(Arc::new(backend))
+        }
+        #[cfg(feature = "s3")]
+        StorageConfig::S3 { s3_config, .. } => {
+            let backend = S3Service::new(s3_config).await?;
+            backend.initialize().await?;
+            Ok(Arc::new(backend))
+        }
+    }
+}
+
+/// Create storage configuration from environment variables
+pub fn storage_config_from_env(config: &crate::config::Config) -> Result<StorageConfig> {
+    if config.s3_enabled {
+        #[cfg(feature = "s3")]
+        {
+            if let Some(s3_config) = &config.s3_config {
+                Ok(StorageConfig::S3 {
+                    s3_config: s3_config.clone(),
+                    fallback_path: Some(config.upload_path.clone()),
+                })
+            } else {
+                // S3 enabled but no config, fall back to local
+                Ok(StorageConfig::Local {
+                    upload_path: config.upload_path.clone(),
+                })
+            }
+        }
+        #[cfg(not(feature = "s3"))]
+        {
+            // S3 requested but not compiled in
+            tracing::warn!("S3 storage requested but S3 feature not compiled in, using local storage");
+            Ok(StorageConfig::Local {
+                upload_path: config.upload_path.clone(),
+            })
+        }
+    } else {
+        Ok(StorageConfig::Local {
+            upload_path: config.upload_path.clone(),
+        })
+    }
+}
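Typical call sequence for the factory, matching its use in main.rs above (sketch; `config` is the application `Config`):

```rust
use readur::storage::factory;

async fn build_backend(config: &readur::config::Config) -> anyhow::Result<()> {
    // Honors config.s3_enabled and the compile-time "s3" feature.
    let storage_config = factory::storage_config_from_env(config)?;
    // Constructs and initializes the backend in one step.
    let backend = factory::create_storage_backend(storage_config).await?;
    println!("using {} storage", backend.storage_type());
    Ok(())
}
```

Note the quiet fallback: with `s3_enabled` set but no `s3_config` present, the factory returns a Local config rather than an error.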
src/storage/local.rs
@@ -0,0 +1,276 @@
+//! Local filesystem storage backend implementation
+
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::{Path, PathBuf};
+use tokio::fs;
+use tracing::{info, error};
+use uuid::Uuid;
+
+use super::StorageBackend;
+
+/// Local filesystem storage backend
+pub struct LocalStorageBackend {
+    upload_path: String,
+}
+
+impl LocalStorageBackend {
+    /// Create a new local storage backend
+    pub fn new(upload_path: String) -> Self {
+        Self { upload_path }
+    }
+
+    /// Get the base upload path
+    pub fn get_upload_path(&self) -> &str {
+        &self.upload_path
+    }
+
+    /// Get path for documents subdirectory
+    pub fn get_documents_path(&self) -> PathBuf {
+        Path::new(&self.upload_path).join("documents")
+    }
+
+    /// Get path for thumbnails subdirectory
+    pub fn get_thumbnails_path(&self) -> PathBuf {
+        Path::new(&self.upload_path).join("thumbnails")
+    }
+
+    /// Get path for processed images subdirectory
+    pub fn get_processed_images_path(&self) -> PathBuf {
+        Path::new(&self.upload_path).join("processed_images")
+    }
+
+    /// Get path for temporary files subdirectory
+    pub fn get_temp_path(&self) -> PathBuf {
+        Path::new(&self.upload_path).join("temp")
+    }
+
+    /// Get path for backups subdirectory
+    pub fn get_backups_path(&self) -> PathBuf {
+        Path::new(&self.upload_path).join("backups")
+    }
+
+    /// Resolve file path, handling both old and new directory structures
+    pub async fn resolve_file_path(&self, file_path: &str) -> Result<String> {
+        // If the file exists at the given path, use it
+        if Path::new(file_path).exists() {
+            return Ok(file_path.to_string());
+        }
+
+        // Try to find the file in the new structured directory
+        if file_path.starts_with("./uploads/") && !file_path.contains("/documents/") {
+            let new_path = file_path.replace("./uploads/", "./uploads/documents/");
+            if Path::new(&new_path).exists() {
+                info!("Found file in new structured directory: {} -> {}", file_path, new_path);
+                return Ok(new_path);
+            }
+        }
+
+        // Try without the ./ prefix
+        if file_path.starts_with("uploads/") && !file_path.contains("/documents/") {
+            let new_path = file_path.replace("uploads/", "uploads/documents/");
+            if Path::new(&new_path).exists() {
+                info!("Found file in new structured directory: {} -> {}", file_path, new_path);
+                return Ok(new_path);
+            }
+        }
+
+        // File not found in any expected location
+        Err(anyhow::anyhow!("File not found: {} (checked original path and structured directory)", file_path))
+    }
+
+    /// Save a file with generated UUID filename (legacy method)
+    pub async fn save_file(&self, filename: &str, data: &[u8]) -> Result<String> {
+        let file_id = Uuid::new_v4();
+        let extension = Path::new(filename)
+            .extension()
+            .and_then(|ext| ext.to_str())
+            .unwrap_or("");
+
+        let saved_filename = if extension.is_empty() {
+            file_id.to_string()
+        } else {
+            format!("{}.{}", file_id, extension)
+        };
+
+        // Save to documents subdirectory
+        let documents_dir = self.get_documents_path();
+        let file_path = documents_dir.join(&saved_filename);
+
+        // Ensure the documents directory exists
+        if let Err(e) = fs::create_dir_all(&documents_dir).await {
+            error!("Failed to create documents directory: {}", e);
+            return Err(anyhow::anyhow!("Failed to create documents directory: {}", e));
+        }
+
+        fs::write(&file_path, data).await?;
+
+        Ok(file_path.to_string_lossy().to_string())
+    }
+}
+
+#[async_trait]
+impl StorageBackend for LocalStorageBackend {
+    async fn store_document(&self, _user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String> {
+        let extension = Path::new(filename)
+            .extension()
+            .and_then(|ext| ext.to_str())
+            .unwrap_or("");
+
+        let document_filename = if extension.is_empty() {
+            document_id.to_string()
+        } else {
+            format!("{}.{}", document_id, extension)
+        };
+
+        let documents_dir = self.get_documents_path();
+        let file_path = documents_dir.join(&document_filename);
+
+        // Ensure the documents directory exists
+        fs::create_dir_all(&documents_dir).await?;
+
+        fs::write(&file_path, data).await?;
+
+        info!("Stored document locally: {}", file_path.display());
+        Ok(file_path.to_string_lossy().to_string())
+    }
+
+    async fn store_thumbnail(&self, _user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
+        let thumbnails_dir = self.get_thumbnails_path();
+        fs::create_dir_all(&thumbnails_dir).await?;
+
+        let thumbnail_filename = format!("{}_thumb.jpg", document_id);
+        let thumbnail_path = thumbnails_dir.join(&thumbnail_filename);
+
+        fs::write(&thumbnail_path, data).await?;
+
+        info!("Stored thumbnail locally: {}", thumbnail_path.display());
+        Ok(thumbnail_path.to_string_lossy().to_string())
+    }
+
+    async fn store_processed_image(&self, _user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String> {
+        let processed_dir = self.get_processed_images_path();
+        fs::create_dir_all(&processed_dir).await?;
+
+        let processed_filename = format!("{}_processed.png", document_id);
+        let processed_path = processed_dir.join(&processed_filename);
+
+        fs::write(&processed_path, data).await?;
+
+        info!("Stored processed image locally: {}", processed_path.display());
+        Ok(processed_path.to_string_lossy().to_string())
+    }
+
+    async fn retrieve_file(&self, path: &str) -> Result<Vec<u8>> {
+        let resolved_path = self.resolve_file_path(path).await?;
+        let data = fs::read(&resolved_path).await?;
+        Ok(data)
+    }
+
+    async fn delete_document_files(&self, _user_id: Uuid, document_id: Uuid, filename: &str) -> Result<()> {
+        let mut deleted_files = Vec::new();
+        let mut serious_errors = Vec::new();
+
+        // Helper function to safely delete a file
+        async fn safe_delete(path: &Path, serious_errors: &mut Vec<String>) -> Option<String> {
+            match fs::remove_file(path).await {
+                Ok(_) => {
+                    info!("Deleted file: {}", path.display());
+                    Some(path.to_string_lossy().to_string())
+                }
+                Err(e) => {
+                    match e.kind() {
+                        std::io::ErrorKind::NotFound => {
+                            info!("File already deleted: {}", path.display());
+                            None
+                        }
+                        _ => {
+                            serious_errors.push(format!("Failed to delete file {}: {}", path.display(), e));
+                            None
+                        }
+                    }
+                }
+            }
+        }
+
+        // Delete main document file (try to find it first)
+        let extension = Path::new(filename)
+            .extension()
+            .and_then(|ext| ext.to_str())
+            .unwrap_or("");
+
+        let document_filename = if extension.is_empty() {
+            document_id.to_string()
+        } else {
+            format!("{}.{}", document_id, extension)
+        };
+
+        let main_file = self.get_documents_path().join(&document_filename);
+        if let Some(deleted_path) = safe_delete(&main_file, &mut serious_errors).await {
+            deleted_files.push(deleted_path);
+        }
+
+        // Delete thumbnail if it exists
+        let thumbnail_filename = format!("{}_thumb.jpg", document_id);
+        let thumbnail_path = self.get_thumbnails_path().join(&thumbnail_filename);
+        if let Some(deleted_path) = safe_delete(&thumbnail_path, &mut serious_errors).await {
+            deleted_files.push(deleted_path);
+        }
+
+        // Delete processed image if it exists
+        let processed_image_filename = format!("{}_processed.png", document_id);
+        let processed_image_path = self.get_processed_images_path().join(&processed_image_filename);
+        if let Some(deleted_path) = safe_delete(&processed_image_path, &mut serious_errors).await {
+            deleted_files.push(deleted_path);
+        }
+
+        // Only fail if there were serious errors (not "file not found")
+        if !serious_errors.is_empty() {
+            error!("Serious errors occurred while deleting files for document {}: {}", document_id, serious_errors.join("; "));
+            return Err(anyhow::anyhow!("File deletion errors: {}", serious_errors.join("; ")));
+        }
+
+        if deleted_files.is_empty() {
+            info!("No files needed deletion for document {} (all files already removed)", document_id);
+        } else {
+            info!("Successfully deleted {} files for document {}", deleted_files.len(), document_id);
+        }
+
+        Ok(())
+    }
+
+    async fn file_exists(&self, path: &str) -> Result<bool> {
+        match self.resolve_file_path(path).await {
+            Ok(_) => Ok(true),
+            Err(_) => Ok(false),
+        }
+    }
+
+    fn storage_type(&self) -> &'static str {
+        "local"
+    }
+
+    async fn initialize(&self) -> Result<()> {
+        let base_path = Path::new(&self.upload_path);
+
+        // Create subdirectories for organized file storage
+        let directories = [
+            "documents",          // Final uploaded documents
+            "thumbnails",         // Document thumbnails
+            "processed_images",   // OCR processed images for review
+            "temp",               // Temporary files during processing
+            "backups",            // Document backups
+        ];
+
+        for dir in directories.iter() {
+            let dir_path = base_path.join(dir);
+            if let Err(e) = fs::create_dir_all(&dir_path).await {
+                error!("Failed to create directory {:?}: {}", dir_path, e);
+                return Err(anyhow::anyhow!("Failed to create directory structure: {}", e));
+            }
+            info!("Ensured directory exists: {:?}", dir_path);
+        }
+
+        Ok(())
+    }
+}
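A quick round-trip through the local backend (test sketch; a tokio test harness and a throwaway temp directory are assumed):

```rust
use readur::storage::{StorageBackend, local::LocalStorageBackend};
use uuid::Uuid;

#[tokio::test]
async fn local_round_trip() -> anyhow::Result<()> {
    let dir = std::env::temp_dir().join(format!("readur-test-{}", Uuid::new_v4()));
    let backend = LocalStorageBackend::new(dir.to_string_lossy().to_string());
    backend.initialize().await?; // creates documents/, thumbnails/, ...

    let (user, doc) = (Uuid::new_v4(), Uuid::new_v4());
    let path = backend.store_document(user, doc, "note.txt", b"hello").await?;
    assert!(backend.file_exists(&path).await?);
    assert_eq!(backend.retrieve_file(&path).await?, b"hello");

    backend.delete_document_files(user, doc, "note.txt").await?;
    assert!(!backend.file_exists(&path).await?);
    Ok(())
}
```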
src/storage/mod.rs
@@ -0,0 +1,62 @@
+//! Storage backend abstraction for document management
+//!
+//! This module provides a clean abstraction over different storage backends
+//! (local filesystem, S3, etc.) with a unified interface.
+
+use anyhow::Result;
+use async_trait::async_trait;
+use uuid::Uuid;
+
+pub mod local;
+pub mod factory;
+
+/// Core storage backend trait that all storage implementations must implement
+#[async_trait]
+pub trait StorageBackend: Send + Sync {
+    /// Support for downcasting to concrete types (for backward compatibility)
+    fn as_any(&self) -> Option<&dyn std::any::Any> {
+        None // Default implementation returns None
+    }
+    /// Store a document file
+    /// Returns the storage path/key where the document was stored
+    async fn store_document(&self, user_id: Uuid, document_id: Uuid, filename: &str, data: &[u8]) -> Result<String>;
+
+    /// Store a thumbnail image
+    /// Returns the storage path/key where the thumbnail was stored
+    async fn store_thumbnail(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String>;
+
+    /// Store a processed image (e.g., OCR processed image)
+    /// Returns the storage path/key where the processed image was stored
+    async fn store_processed_image(&self, user_id: Uuid, document_id: Uuid, data: &[u8]) -> Result<String>;
+
+    /// Retrieve file data by storage path/key
+    async fn retrieve_file(&self, path: &str) -> Result<Vec<u8>>;
+
+    /// Delete all files associated with a document (document, thumbnail, processed image)
+    async fn delete_document_files(&self, user_id: Uuid, document_id: Uuid, filename: &str) -> Result<()>;
+
+    /// Check if a file exists at the given path/key
+    async fn file_exists(&self, path: &str) -> Result<bool>;
+
+    /// Get a human-readable identifier for this storage backend type
+    fn storage_type(&self) -> &'static str;
+
+    /// Initialize the storage backend (create directories, validate access, etc.)
+    async fn initialize(&self) -> Result<()>;
+}
+
+/// Storage configuration enum for different backend types
+#[derive(Debug, Clone)]
+pub enum StorageConfig {
+    /// Local filesystem storage
+    Local {
+        upload_path: String,
+    },
+    /// S3-compatible storage
+    #[cfg(feature = "s3")]
+    S3 {
+        s3_config: crate::models::S3SourceConfig,
+        /// Optional local fallback path for hybrid scenarios
+        fallback_path: Option<String>,
+    },
+}
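Any new backend only needs to satisfy this trait; `as_any` has a default, so only the eight required items must be written. As an illustration, a hypothetical in-memory backend (not part of this commit, but handy for tests):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use uuid::Uuid;

use readur::storage::StorageBackend;

#[derive(Default)]
struct MemoryBackend {
    files: Mutex<HashMap<String, Vec<u8>>>,
}

#[async_trait]
impl StorageBackend for MemoryBackend {
    async fn store_document(&self, user: Uuid, doc: Uuid, filename: &str, data: &[u8]) -> Result<String> {
        let key = format!("mem://documents/{}/{}/{}", user, doc, filename);
        self.files.lock().unwrap().insert(key.clone(), data.to_vec());
        Ok(key)
    }

    async fn store_thumbnail(&self, user: Uuid, doc: Uuid, data: &[u8]) -> Result<String> {
        let key = format!("mem://thumbnails/{}/{}_thumb.jpg", user, doc);
        self.files.lock().unwrap().insert(key.clone(), data.to_vec());
        Ok(key)
    }

    async fn store_processed_image(&self, user: Uuid, doc: Uuid, data: &[u8]) -> Result<String> {
        let key = format!("mem://processed_images/{}/{}_processed.png", user, doc);
        self.files.lock().unwrap().insert(key.clone(), data.to_vec());
        Ok(key)
    }

    async fn retrieve_file(&self, path: &str) -> Result<Vec<u8>> {
        self.files
            .lock()
            .unwrap()
            .get(path)
            .cloned()
            .ok_or_else(|| anyhow!("not found: {}", path))
    }

    async fn delete_document_files(&self, _user: Uuid, doc: Uuid, _filename: &str) -> Result<()> {
        // Every key embeds the document id, so one retain pass removes them all.
        let needle = doc.to_string();
        self.files.lock().unwrap().retain(|k, _| !k.contains(&needle));
        Ok(())
    }

    async fn file_exists(&self, path: &str) -> Result<bool> {
        Ok(self.files.lock().unwrap().contains_key(path))
    }

    fn storage_type(&self) -> &'static str {
        "memory"
    }

    async fn initialize(&self) -> Result<()> {
        Ok(())
    }
}
```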