feat(server/client): working s3 and local source types

perf3ct 2025-06-15 17:51:04 +00:00
parent df51e61d06
commit ea8ad2c262
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
12 changed files with 2471 additions and 65 deletions

Cargo.lock (generated, 972 lines changed): file diff suppressed because it is too large.


@@ -48,12 +48,17 @@ hostname = "0.4"
walkdir = "2"
clap = { version = "4", features = ["derive"] }
utoipa = { version = "5", features = ["axum_extras", "chrono", "uuid"] }
aws-config = { version = "1.0", optional = true }
aws-sdk-s3 = { version = "1.0", optional = true }
aws-credential-types = { version = "1.0", optional = true }
aws-types = { version = "1.0", optional = true }
sha2 = "0.10"
utoipa-swagger-ui = { version = "9", features = ["axum"] }
[features]
default = ["ocr"]
default = ["ocr", "s3"]
ocr = ["tesseract", "pdf-extract", "image", "imageproc", "raw-cpuid"]
s3 = ["aws-config", "aws-sdk-s3", "aws-credential-types", "aws-types"]
[dev-dependencies]
tempfile = "3"
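With "s3" now in the default feature set, the AWS SDK crates are pulled into a standard build; a build without S3 support would presumably be produced with cargo build --no-default-features --features ocr.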


@@ -108,14 +108,26 @@ const SourcesPage: React.FC = () => {
name: '',
source_type: 'webdav' as 'webdav' | 'local_folder' | 's3',
enabled: true,
// WebDAV fields
server_url: '',
username: '',
password: '',
server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic',
// Local Folder fields
recursive: true,
follow_symlinks: false,
// S3 fields
bucket_name: '',
region: 'us-east-1',
access_key_id: '',
secret_access_key: '',
endpoint_url: '',
prefix: '',
// Common fields
watch_folders: ['/Documents'],
file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
auto_sync: false,
sync_interval_minutes: 60,
server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic',
});
// Additional state for enhanced features
@@ -153,14 +165,26 @@ const SourcesPage: React.FC = () => {
name: '',
source_type: 'webdav',
enabled: true,
// WebDAV fields
server_url: '',
username: '',
password: '',
server_type: 'generic',
// Local Folder fields
recursive: true,
follow_symlinks: false,
// S3 fields
bucket_name: '',
region: 'us-east-1',
access_key_id: '',
secret_access_key: '',
endpoint_url: '',
prefix: '',
// Common fields
watch_folders: ['/Documents'],
file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
auto_sync: false,
sync_interval_minutes: 60,
server_type: 'generic',
});
setCrawlEstimate(null);
setNewFolder('');
@@ -175,14 +199,26 @@ const SourcesPage: React.FC = () => {
name: source.name,
source_type: source.source_type,
enabled: source.enabled,
// WebDAV fields
server_url: config.server_url || '',
username: config.username || '',
password: config.password || '',
server_type: config.server_type || 'generic',
// Local Folder fields
recursive: config.recursive !== undefined ? config.recursive : true,
follow_symlinks: config.follow_symlinks || false,
// S3 fields
bucket_name: config.bucket_name || '',
region: config.region || 'us-east-1',
access_key_id: config.access_key_id || '',
secret_access_key: config.secret_access_key || '',
endpoint_url: config.endpoint_url || '',
prefix: config.prefix || '',
// Common fields
watch_folders: config.watch_folders || ['/Documents'],
file_extensions: config.file_extensions || ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
auto_sync: config.auto_sync || false,
sync_interval_minutes: config.sync_interval_minutes || 60,
server_type: config.server_type || 'generic',
});
setCrawlEstimate(null);
setNewFolder('');
@@ -841,24 +877,24 @@ const SourcesPage: React.FC = () => {
</Box>
</Stack>
</MenuItem>
<MenuItem value="local_folder" disabled>
<MenuItem value="local_folder">
<Stack direction="row" alignItems="center" spacing={2}>
<StorageIcon />
<FolderIcon />
<Box>
<Typography variant="body1">Local Folder</Typography>
<Typography variant="caption" color="text.secondary">
Coming Soon
Monitor local filesystem directories
</Typography>
</Box>
</Stack>
</MenuItem>
<MenuItem value="s3" disabled>
<MenuItem value="s3">
<Stack direction="row" alignItems="center" spacing={2}>
<CloudIcon />
<Box>
<Typography variant="body1">S3 Compatible</Typography>
<Typography variant="caption" color="text.secondary">
Coming Soon
AWS S3, MinIO, and other S3-compatible storage
</Typography>
</Box>
</Stack>

src/db.rs (111 lines changed)

@@ -2104,4 +2104,115 @@ impl Database {
Ok(documents)
}
// Source management operations
pub async fn get_all_sources(&self) -> Result<Vec<crate::models::Source>> {
let rows = sqlx::query(
r#"SELECT id, user_id, name, source_type, enabled, config, status,
last_sync_at, last_error, last_error_at, total_files_synced,
total_files_pending, total_size_bytes, created_at, updated_at
FROM sources ORDER BY created_at DESC"#
)
.fetch_all(&self.pool)
.await?;
let mut sources = Vec::new();
for row in rows {
sources.push(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
});
}
Ok(sources)
}
pub async fn get_sources_for_sync(&self) -> Result<Vec<crate::models::Source>> {
let rows = sqlx::query(
r#"SELECT id, user_id, name, source_type, enabled, config, status,
last_sync_at, last_error, last_error_at, total_files_synced,
total_files_pending, total_size_bytes, created_at, updated_at
FROM sources
WHERE enabled = true AND status != 'syncing'
ORDER BY last_sync_at ASC NULLS FIRST"#
)
.fetch_all(&self.pool)
.await?;
let mut sources = Vec::new();
for row in rows {
sources.push(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
});
}
Ok(sources)
}
pub async fn get_source_by_id(&self, source_id: Uuid) -> Result<Option<crate::models::Source>> {
let row = sqlx::query(
r#"SELECT id, user_id, name, source_type, enabled, config, status,
last_sync_at, last_error, last_error_at, total_files_synced,
total_files_pending, total_size_bytes, created_at, updated_at
FROM sources WHERE id = $1"#
)
.bind(source_id)
.fetch_optional(&self.pool)
.await?;
if let Some(row) = row {
Ok(Some(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into()
.map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
}))
} else {
Ok(None)
}
}
}


@@ -4,6 +4,7 @@ pub mod config;
pub mod db;
pub mod enhanced_ocr;
pub mod file_service;
pub mod local_folder_service;
pub mod models;
pub mod ocr;
pub mod ocr_api;
@@ -13,7 +14,10 @@ pub mod ocr_health;
pub mod ocr_queue;
pub mod ocr_tests;
pub mod routes;
pub mod s3_service;
pub mod seed;
pub mod source_scheduler;
pub mod source_sync;
pub mod swagger;
pub mod watcher;
pub mod webdav_service;

src/local_folder_service.rs (new file, 282 lines)

@@ -0,0 +1,282 @@
use std::path::Path;
use std::fs;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use tracing::{debug, info, warn};
use walkdir::WalkDir;
use sha2::{Sha256, Digest};
use crate::models::{FileInfo, LocalFolderSourceConfig};
#[derive(Debug, Clone)]
pub struct LocalFolderService {
config: LocalFolderSourceConfig,
}
impl LocalFolderService {
pub fn new(config: LocalFolderSourceConfig) -> Result<Self> {
// Validate that watch folders exist and are accessible
for folder in &config.watch_folders {
let path = Path::new(folder);
if !path.exists() {
return Err(anyhow!("Watch folder does not exist: {}", folder));
}
if !path.is_dir() {
return Err(anyhow!("Watch folder is not a directory: {}", folder));
}
}
Ok(Self { config })
}
/// Discover files in a specific folder
pub async fn discover_files_in_folder(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
let path = Path::new(folder_path);
if !path.exists() {
return Err(anyhow!("Folder does not exist: {}", folder_path));
}
let mut files: Vec<FileInfo> = Vec::new();
info!("Scanning local folder: {} (recursive: {})", folder_path, self.config.recursive);
// Use tokio::task::spawn_blocking for file system operations
let folder_path_clone = folder_path.to_string();
let config = self.config.clone();
let discovered_files = tokio::task::spawn_blocking(move || -> Result<Vec<FileInfo>> {
let mut files: Vec<FileInfo> = Vec::new();
let walker = if config.recursive {
WalkDir::new(&folder_path_clone)
.follow_links(config.follow_symlinks)
.into_iter()
} else {
WalkDir::new(&folder_path_clone)
.max_depth(1)
.follow_links(config.follow_symlinks)
.into_iter()
};
for entry_result in walker {
match entry_result {
Ok(entry) => {
let path = entry.path();
// Skip directories and the root folder itself
if path.is_dir() {
continue;
}
// Check file extension
let extension = path.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
if !config.file_extensions.contains(&extension) {
debug!("Skipping file with unsupported extension: {}", path.display());
continue;
}
// Get file metadata
match fs::metadata(path) {
Ok(metadata) => {
let modified_time = metadata.modified()
.ok()
.and_then(|time| {
let duration = time.duration_since(std::time::UNIX_EPOCH).ok()?;
DateTime::from_timestamp(duration.as_secs() as i64, 0)
});
let file_name = path.file_name()
.and_then(|name| name.to_str())
.unwrap_or("unknown")
.to_string();
// Generate a simple hash-based ETag from the file path, modification time, and size
let etag = Self::generate_etag(path, &metadata);
// Determine MIME type based on extension
let mime_type = Self::get_mime_type(&extension);
let file_info = FileInfo {
path: path.to_string_lossy().to_string(),
name: file_name,
size: metadata.len() as i64,
mime_type,
last_modified: modified_time,
etag,
is_directory: false,
};
files.push(file_info);
}
Err(e) => {
warn!("Failed to get metadata for {}: {}", path.display(), e);
}
}
}
Err(e) => {
warn!("Error walking directory: {}", e);
}
}
}
Ok(files)
}).await??;
info!("Found {} files in local folder {}", discovered_files.len(), folder_path);
Ok(discovered_files)
}
/// Read file content for processing
pub async fn read_file(&self, file_path: &str) -> Result<Vec<u8>> {
let file_path = file_path.to_string();
tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
let content = fs::read(&file_path)
.map_err(|e| anyhow!("Failed to read file {}: {}", file_path, e))?;
Ok(content)
}).await?
}
/// Test if the service can access the configured folders
pub async fn test_connection(&self) -> Result<String> {
let mut accessible_folders = 0;
let mut total_files = 0;
for folder in &self.config.watch_folders {
match self.discover_files_in_folder(folder).await {
Ok(files) => {
accessible_folders += 1;
total_files += files.len();
info!("Local folder {} is accessible with {} files", folder, files.len());
}
Err(e) => {
return Err(anyhow!("Cannot access folder {}: {}", folder, e));
}
}
}
Ok(format!(
"Successfully accessed {} folders with {} total files",
accessible_folders, total_files
))
}
/// Generate an ETag for a file based on its path, modification time, and size
fn generate_etag(path: &Path, metadata: &fs::Metadata) -> String {
let mut hasher = Sha256::new();
hasher.update(path.to_string_lossy().as_bytes());
if let Ok(modified) = metadata.modified() {
if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
hasher.update(duration.as_secs().to_be_bytes());
}
}
hasher.update(metadata.len().to_be_bytes());
let result = hasher.finalize();
format!("{:x}", result)[..16].to_string() // Use first 16 chars as ETag
}
/// Get MIME type based on file extension
fn get_mime_type(extension: &str) -> String {
match extension {
"pdf" => "application/pdf",
"txt" => "text/plain",
"png" => "image/png",
"jpg" | "jpeg" => "image/jpeg",
"tiff" | "tif" => "image/tiff",
"bmp" => "image/bmp",
"gif" => "image/gif",
"webp" => "image/webp",
"doc" => "application/msword",
"docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"xls" => "application/vnd.ms-excel",
"xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"ppt" => "application/vnd.ms-powerpoint",
"pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation",
_ => "application/octet-stream",
}.to_string()
}
pub fn get_config(&self) -> &LocalFolderSourceConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::fs::File;
use std::io::Write;
#[tokio::test]
async fn test_local_folder_discovery() {
// Create a temporary directory with test files
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().to_str().unwrap();
// Create test files
let mut pdf_file = File::create(temp_dir.path().join("test.pdf")).unwrap();
pdf_file.write_all(b"fake pdf content").unwrap();
let mut txt_file = File::create(temp_dir.path().join("test.txt")).unwrap();
txt_file.write_all(b"test content").unwrap();
// Create unsupported file
let mut bin_file = File::create(temp_dir.path().join("test.bin")).unwrap();
bin_file.write_all(b"binary content").unwrap();
// Create config
let config = LocalFolderSourceConfig {
watch_folders: vec![temp_path.to_string()],
file_extensions: vec!["pdf".to_string(), "txt".to_string()],
auto_sync: true,
sync_interval_minutes: 60,
recursive: false,
follow_symlinks: false,
};
let service = LocalFolderService::new(config).unwrap();
let files = service.discover_files_in_folder(temp_path).await.unwrap();
// Should find 2 files (pdf and txt), but not bin
assert_eq!(files.len(), 2);
let pdf_file = files.iter().find(|f| f.name == "test.pdf").unwrap();
assert_eq!(pdf_file.mime_type, "application/pdf");
assert_eq!(pdf_file.size, 16);
let txt_file = files.iter().find(|f| f.name == "test.txt").unwrap();
assert_eq!(txt_file.mime_type, "text/plain");
assert_eq!(txt_file.size, 12);
}
#[tokio::test]
async fn test_file_reading() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.txt");
let test_content = b"Hello, World!";
let mut file = File::create(&file_path).unwrap();
file.write_all(test_content).unwrap();
let config = LocalFolderSourceConfig {
watch_folders: vec![temp_dir.path().to_str().unwrap().to_string()],
file_extensions: vec!["txt".to_string()],
auto_sync: false,
sync_interval_minutes: 60,
recursive: false,
follow_symlinks: false,
};
let service = LocalFolderService::new(config).unwrap();
let content = service.read_file(file_path.to_str().unwrap()).await.unwrap();
assert_eq!(content, test_content);
}
}


@@ -209,10 +209,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Create WebDAV scheduler with background state
// Create universal source scheduler with background state (handles WebDAV, Local, S3)
let source_scheduler = Arc::new(readur::source_scheduler::SourceScheduler::new(background_state.clone()));
// Keep WebDAV scheduler for backward compatibility with existing WebDAV endpoints
let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
// Update the web state to include the scheduler reference
// Update the web state to include scheduler references
let updated_web_state = AppState {
db: web_state.db.clone(),
config: web_state.config.clone(),
@@ -220,13 +223,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let web_state = Arc::new(updated_web_state);
// Start WebDAV background sync scheduler on background runtime
let scheduler_for_background = webdav_scheduler.clone();
// Start universal source scheduler on background runtime
let scheduler_for_background = source_scheduler.clone();
background_runtime.spawn(async move {
info!("Starting WebDAV background sync scheduler with 30-second startup delay");
info!("Starting universal source sync scheduler with 30-second startup delay");
// Wait 30 seconds before starting scheduler to allow server to fully initialize
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
info!("WebDAV background sync scheduler starting after startup delay");
info!("Universal source sync scheduler starting after startup delay");
scheduler_for_background.start().await;
});


@@ -781,3 +781,27 @@ pub struct WebDAVSourceConfig {
pub sync_interval_minutes: i32,
pub server_type: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct LocalFolderSourceConfig {
pub watch_folders: Vec<String>,
pub file_extensions: Vec<String>,
pub auto_sync: bool,
pub sync_interval_minutes: i32,
pub recursive: bool,
pub follow_symlinks: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct S3SourceConfig {
pub bucket_name: String,
pub region: String,
pub access_key_id: String,
pub secret_access_key: String,
pub endpoint_url: Option<String>, // For S3-compatible services
pub prefix: Option<String>, // Optional path prefix
pub watch_folders: Vec<String>, // S3 prefixes to monitor
pub file_extensions: Vec<String>,
pub auto_sync: bool,
pub sync_interval_minutes: i32,
}
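Sources store their settings in a JSON config column, and the scheduler and sync code below deserialize that blob with serde_json::from_value. A minimal sketch of what a stored S3 config might look like when round-tripped into S3SourceConfig; every concrete value here is an illustrative assumption:
// Sketch only: example sources.config blob deserialized into S3SourceConfig.
// All field values below are made up for illustration.
use serde_json::json;
fn example_s3_config() -> anyhow::Result<S3SourceConfig> {
    let raw = json!({
        "bucket_name": "my-bucket",
        "region": "us-east-1",
        "access_key_id": "AKIAEXAMPLE",
        "secret_access_key": "example-secret",
        "endpoint_url": "http://localhost:9000", // e.g. a MinIO endpoint
        "prefix": "documents/",
        "watch_folders": ["documents/"],
        "file_extensions": ["pdf", "txt"],
        "auto_sync": true,
        "sync_interval_minutes": 60
    });
    Ok(serde_json::from_value(raw)?)
}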

src/s3_service.rs (new file, 330 lines)

@@ -0,0 +1,330 @@
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use tracing::{debug, error, info, warn};
#[cfg(feature = "s3")]
use aws_sdk_s3::Client;
#[cfg(feature = "s3")]
use aws_config::{BehaviorVersion, load_from_env};
#[cfg(feature = "s3")]
use aws_credential_types::Credentials;
#[cfg(feature = "s3")]
use aws_types::region::Region as AwsRegion;
use crate::models::{FileInfo, S3SourceConfig};
#[derive(Debug, Clone)]
pub struct S3Service {
#[cfg(feature = "s3")]
client: Client,
config: S3SourceConfig,
}
impl S3Service {
pub async fn new(config: S3SourceConfig) -> Result<Self> {
#[cfg(not(feature = "s3"))]
{
return Err(anyhow!("S3 support not compiled in. Enable the 's3' feature to use S3 sources."));
}
#[cfg(feature = "s3")]
{
// Validate required fields
if config.bucket_name.is_empty() {
return Err(anyhow!("Bucket name is required"));
}
if config.access_key_id.is_empty() {
return Err(anyhow!("Access key ID is required"));
}
if config.secret_access_key.is_empty() {
return Err(anyhow!("Secret access key is required"));
}
// Create S3 client with custom configuration
let credentials = Credentials::new(
&config.access_key_id,
&config.secret_access_key,
None, // session token
None, // expiry
"readur-s3-source"
);
let region = if config.region.is_empty() {
"us-east-1".to_string()
} else {
config.region.clone()
};
let mut s3_config_builder = aws_sdk_s3::config::Builder::new()
.region(AwsRegion::new(region))
.credentials_provider(credentials);
// Set custom endpoint if provided (for S3-compatible services)
if let Some(endpoint_url) = &config.endpoint_url {
if !endpoint_url.is_empty() {
s3_config_builder = s3_config_builder.endpoint_url(endpoint_url);
info!("Using custom S3 endpoint: {}", endpoint_url);
}
}
let s3_config = s3_config_builder.build();
let client = Client::from_conf(s3_config);
Ok(Self {
#[cfg(feature = "s3")]
client,
config
})
}
}
/// Discover files in a specific S3 prefix (folder)
pub async fn discover_files_in_folder(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
#[cfg(not(feature = "s3"))]
{
return Err(anyhow!("S3 support not compiled in"));
}
#[cfg(feature = "s3")]
{
info!("Scanning S3 bucket: {} prefix: {}", self.config.bucket_name, folder_path);
let mut files = Vec::new();
let mut continuation_token: Option<String> = None;
loop {
let mut list_request = self.client
.list_objects_v2()
.bucket(&self.config.bucket_name)
.prefix(folder_path);
if let Some(token) = &continuation_token {
list_request = list_request.continuation_token(token);
}
match list_request.send().await {
Ok(response) => {
if let Some(contents) = response.contents {
for object in contents {
if let Some(key) = object.key {
// Skip "directories" (keys ending with /)
if key.ends_with('/') {
continue;
}
// Check file extension
let extension = std::path::Path::new(&key)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
if !self.config.file_extensions.contains(&extension) {
debug!("Skipping S3 object with unsupported extension: {}", key);
continue;
}
let file_name = std::path::Path::new(&key)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or(&key)
.to_string();
let size = object.size.unwrap_or(0);
let last_modified = object.last_modified
.and_then(|dt| {
// Convert AWS DateTime to chrono DateTime
let timestamp = dt.secs();
DateTime::from_timestamp(timestamp, 0)
});
let etag = object.e_tag.unwrap_or_else(|| {
// Generate a fallback ETag if none provided
format!("fallback-{}", &key.chars().take(16).collect::<String>())
});
// Remove quotes from ETag if present
let etag = etag.trim_matches('"').to_string();
let mime_type = Self::get_mime_type(&extension);
let file_info = FileInfo {
path: key.clone(),
name: file_name,
size,
mime_type,
last_modified,
etag,
is_directory: false,
};
files.push(file_info);
}
}
}
// Check if there are more results
if response.is_truncated == Some(true) {
continuation_token = response.next_continuation_token;
} else {
break;
}
}
Err(e) => {
return Err(anyhow!("Failed to list S3 objects: {}", e));
}
}
}
info!("Found {} files in S3 bucket {} prefix {}", files.len(), self.config.bucket_name, folder_path);
Ok(files)
}
}
/// Download file content from S3
pub async fn download_file(&self, object_key: &str) -> Result<Vec<u8>> {
#[cfg(not(feature = "s3"))]
{
return Err(anyhow!("S3 support not compiled in"));
}
#[cfg(feature = "s3")]
{
info!("Downloading S3 object: {}/{}", self.config.bucket_name, object_key);
let response = self.client
.get_object()
.bucket(&self.config.bucket_name)
.key(object_key)
.send()
.await
.map_err(|e| anyhow!("Failed to download S3 object {}: {}", object_key, e))?;
let body = response.body.collect().await
.map_err(|e| anyhow!("Failed to read S3 object body: {}", e))?;
let bytes = body.into_bytes().to_vec();
info!("Downloaded S3 object {} ({} bytes)", object_key, bytes.len());
Ok(bytes)
}
}
/// Test S3 connection and access to bucket
pub async fn test_connection(&self) -> Result<String> {
#[cfg(not(feature = "s3"))]
{
return Err(anyhow!("S3 support not compiled in"));
}
#[cfg(feature = "s3")]
{
info!("Testing S3 connection to bucket: {}", self.config.bucket_name);
// Test bucket access by listing objects with a limit
let response = self.client
.list_objects_v2()
.bucket(&self.config.bucket_name)
.max_keys(1)
.send()
.await
.map_err(|e| anyhow!("Failed to access S3 bucket {}: {}", self.config.bucket_name, e))?;
// Test if we can get bucket region (additional validation)
let _head_bucket_response = self.client
.head_bucket()
.bucket(&self.config.bucket_name)
.send()
.await
.map_err(|e| anyhow!("Cannot access bucket {}: {}", self.config.bucket_name, e))?;
let object_count = response.key_count.unwrap_or(0);
Ok(format!(
"Successfully connected to S3 bucket '{}' (found {} objects)",
self.config.bucket_name, object_count
))
}
}
/// Get estimated file count and size for all watch folders
pub async fn estimate_sync(&self) -> Result<(usize, i64)> {
let mut total_files = 0;
let mut total_size = 0i64;
for folder in &self.config.watch_folders {
match self.discover_files_in_folder(folder).await {
Ok(files) => {
total_files += files.len();
total_size += files.iter().map(|f| f.size).sum::<i64>();
}
Err(e) => {
warn!("Failed to estimate folder {}: {}", folder, e);
}
}
}
Ok((total_files, total_size))
}
/// Get MIME type based on file extension
fn get_mime_type(extension: &str) -> String {
match extension {
"pdf" => "application/pdf",
"txt" => "text/plain",
"png" => "image/png",
"jpg" | "jpeg" => "image/jpeg",
"tiff" | "tif" => "image/tiff",
"bmp" => "image/bmp",
"gif" => "image/gif",
"webp" => "image/webp",
"doc" => "application/msword",
"docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"xls" => "application/vnd.ms-excel",
"xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"ppt" => "application/vnd.ms-powerpoint",
"pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation",
_ => "application/octet-stream",
}.to_string()
}
pub fn get_config(&self) -> &S3SourceConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_s3_config_creation() {
let config = S3SourceConfig {
bucket_name: "test-bucket".to_string(),
region: "us-east-1".to_string(),
access_key_id: "test-key".to_string(),
secret_access_key: "test-secret".to_string(),
endpoint_url: None,
prefix: None,
watch_folders: vec!["documents/".to_string()],
file_extensions: vec!["pdf".to_string(), "txt".to_string()],
auto_sync: true,
sync_interval_minutes: 60,
};
// This will create the client but won't test actual S3 access
let service = S3Service::new(config).await;
#[cfg(feature = "s3")]
assert!(service.is_ok());
#[cfg(not(feature = "s3"))]
assert!(service.is_err());
}
#[test]
fn test_mime_type_detection() {
assert_eq!(S3Service::get_mime_type("pdf"), "application/pdf");
assert_eq!(S3Service::get_mime_type("jpg"), "image/jpeg");
assert_eq!(S3Service::get_mime_type("txt"), "text/plain");
assert_eq!(S3Service::get_mime_type("unknown"), "application/octet-stream");
}
}

src/s3_service_stub.rs (new file, 37 lines)

@@ -0,0 +1,37 @@
// Stub implementation when S3 feature is not enabled
use anyhow::{anyhow, Result};
use tracing::warn;
use crate::models::{FileInfo, S3SourceConfig};
#[derive(Debug, Clone)]
pub struct S3Service {
config: S3SourceConfig,
}
impl S3Service {
pub async fn new(_config: S3SourceConfig) -> Result<Self> {
Err(anyhow!("S3 support not compiled in. Enable the 's3' feature to use S3 sources."))
}
pub async fn discover_files_in_folder(&self, _folder_path: &str) -> Result<Vec<FileInfo>> {
warn!("S3 support not compiled in");
Ok(Vec::new())
}
pub async fn download_file(&self, _object_key: &str) -> Result<Vec<u8>> {
Err(anyhow!("S3 support not compiled in"))
}
pub async fn test_connection(&self) -> Result<String> {
Err(anyhow!("S3 support not compiled in"))
}
pub async fn estimate_sync(&self) -> Result<(usize, i64)> {
Ok((0, 0))
}
pub fn get_config(&self) -> &S3SourceConfig {
&self.config
}
}
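The stub mirrors the public surface of the real service. lib.rs in this diff registers pub mod s3_service unconditionally, so presumably the stub would be wired in behind the feature flag, for example with a #[path] attribute; this is an assumption, not something shown in the commit:
// Hypothetical wiring (not in this diff): keep the module name stable for
// callers while selecting the stub when the "s3" feature is disabled.
#[cfg(feature = "s3")]
pub mod s3_service;
#[cfg(not(feature = "s3"))]
#[path = "s3_service_stub.rs"]
pub mod s3_service;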

src/source_scheduler.rs (new file, 319 lines)

@@ -0,0 +1,319 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{error, info, warn};
use chrono::Utc;
use crate::{
AppState,
models::{SourceType, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
source_sync::SourceSyncService,
};
pub struct SourceScheduler {
state: Arc<AppState>,
sync_service: SourceSyncService,
check_interval: Duration,
}
impl SourceScheduler {
pub fn new(state: Arc<AppState>) -> Self {
let sync_service = SourceSyncService::new(state.clone());
Self {
state,
sync_service,
check_interval: Duration::from_secs(60), // Check every minute for due syncs
}
}
pub async fn start(&self) {
info!("Starting universal source sync scheduler");
// First, check for any interrupted syncs that need to be resumed
if let Err(e) = self.resume_interrupted_syncs().await {
error!("Error resuming interrupted syncs: {}", e);
}
let mut interval_timer = interval(self.check_interval);
loop {
interval_timer.tick().await;
if let Err(e) = self.check_and_sync_sources().await {
error!("Error in source sync scheduler: {}", e);
}
}
}
async fn resume_interrupted_syncs(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Checking for interrupted source syncs to resume");
// Get all enabled sources that might have been interrupted
let sources = self.state.db.get_sources_for_sync().await?;
for source in sources {
// Check if this source was likely interrupted during sync
// This is a simplified check - you might want to add specific interrupted tracking
if source.status.to_string() == "syncing" {
info!("Found potentially interrupted sync for source {}, will resume", source.name);
// Reset status and trigger new sync
if let Err(e) = sqlx::query(
r#"UPDATE sources SET status = 'idle', updated_at = NOW() WHERE id = $1"#
)
.bind(source.id)
.execute(self.state.db.get_pool())
.await {
error!("Failed to reset interrupted source status: {}", e);
continue;
}
// Check if auto-sync is enabled for this source
let should_resume = match source.source_type {
SourceType::WebDAV => {
if let Ok(config) = serde_json::from_value::<WebDAVSourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
SourceType::LocalFolder => {
if let Ok(config) = serde_json::from_value::<LocalFolderSourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
SourceType::S3 => {
if let Ok(config) = serde_json::from_value::<S3SourceConfig>(source.config.clone()) {
config.auto_sync
} else { false }
}
};
if should_resume {
info!("Resuming interrupted sync for source {}", source.name);
let sync_service = self.sync_service.clone();
let source_clone = source.clone();
let state_clone = self.state.clone();
tokio::spawn(async move {
// Get user's OCR setting - simplified, you might want to store this in source config
let enable_background_ocr = true; // Default to true, could be made configurable per source
match sync_service.sync_source(&source_clone, enable_background_ocr).await {
Ok(files_processed) => {
info!("Resumed sync completed for source {}: {} files processed",
source_clone.name, files_processed);
// Create notification for successful resume
let notification = crate::models::CreateNotification {
notification_type: "success".to_string(),
title: "Source Sync Resumed".to_string(),
message: format!("Resumed sync for {} after server restart. Processed {} files",
source_clone.name, files_processed),
action_url: Some("/sources".to_string()),
metadata: Some(serde_json::json!({
"source_type": source_clone.source_type.to_string(),
"source_id": source_clone.id,
"files_processed": files_processed
})),
};
if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
error!("Failed to create resume notification: {}", e);
}
}
Err(e) => {
error!("Resumed sync failed for source {}: {}", source_clone.name, e);
}
}
});
}
}
}
Ok(())
}
async fn check_and_sync_sources(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Get all sources that might need syncing
let sources = self.state.db.get_sources_for_sync().await?;
for source in sources {
// Check if sync is due for this source
if self.is_sync_due(&source).await? {
info!("Starting background sync for source: {} ({})", source.name, source.source_type);
let sync_service = self.sync_service.clone();
let source_clone = source.clone();
let state_clone = self.state.clone();
// Start sync in background task
tokio::spawn(async move {
// Get user's OCR setting - simplified, you might want to store this in source config
let enable_background_ocr = true; // Default to true, could be made configurable per source
match sync_service.sync_source(&source_clone, enable_background_ocr).await {
Ok(files_processed) => {
info!("Background sync completed for source {}: {} files processed",
source_clone.name, files_processed);
// Update last sync time
if let Err(e) = sqlx::query(
r#"UPDATE sources
SET last_sync_at = NOW(),
total_files_synced = total_files_synced + $2,
updated_at = NOW()
WHERE id = $1"#
)
.bind(source_clone.id)
.bind(files_processed as i64)
.execute(state_clone.db.get_pool())
.await {
error!("Failed to update source sync time: {}", e);
}
// Send notification if files were processed
if files_processed > 0 {
let notification = crate::models::CreateNotification {
notification_type: "success".to_string(),
title: "Source Sync Completed".to_string(),
message: format!("Successfully processed {} files from {}",
files_processed, source_clone.name),
action_url: Some("/documents".to_string()),
metadata: Some(serde_json::json!({
"source_type": source_clone.source_type.to_string(),
"source_id": source_clone.id,
"files_processed": files_processed
})),
};
if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
error!("Failed to create success notification: {}", e);
}
}
}
Err(e) => {
error!("Background sync failed for source {}: {}", source_clone.name, e);
// Send error notification
let notification = crate::models::CreateNotification {
notification_type: "error".to_string(),
title: "Source Sync Failed".to_string(),
message: format!("Sync failed for {}: {}", source_clone.name, e),
action_url: Some("/sources".to_string()),
metadata: Some(serde_json::json!({
"source_type": source_clone.source_type.to_string(),
"source_id": source_clone.id,
"error": e.to_string()
})),
};
if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
error!("Failed to create error notification: {}", e);
}
}
}
});
}
}
Ok(())
}
async fn is_sync_due(&self, source: &crate::models::Source) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
// Get sync interval from source config
let sync_interval_minutes = match source.source_type {
SourceType::WebDAV => {
let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?;
if !config.auto_sync { return Ok(false); }
config.sync_interval_minutes
}
SourceType::LocalFolder => {
let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())?;
if !config.auto_sync { return Ok(false); }
config.sync_interval_minutes
}
SourceType::S3 => {
let config: S3SourceConfig = serde_json::from_value(source.config.clone())?;
if !config.auto_sync { return Ok(false); }
config.sync_interval_minutes
}
};
if sync_interval_minutes <= 0 {
warn!("Invalid sync interval for source {}: {} minutes", source.name, sync_interval_minutes);
return Ok(false);
}
// Check if a sync is already running
if source.status.to_string() == "syncing" {
info!("Sync already running for source {}", source.name);
return Ok(false);
}
// Check last sync time
if let Some(last_sync) = source.last_sync_at {
let elapsed = Utc::now() - last_sync;
let elapsed_minutes = elapsed.num_minutes();
if elapsed_minutes < sync_interval_minutes as i64 {
// Only log this occasionally to avoid spam
if elapsed_minutes % 10 == 0 {
info!("Sync not due for source {} (last sync {} minutes ago, interval {} minutes)",
source.name, elapsed_minutes, sync_interval_minutes);
}
return Ok(false);
}
info!("Sync is due for source {} (last sync {} minutes ago, interval {} minutes)",
source.name, elapsed_minutes, sync_interval_minutes);
} else {
info!("No previous sync found for source {}, sync is due", source.name);
}
// Sync is due
Ok(true)
}
pub async fn trigger_sync(&self, source_id: uuid::Uuid) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Triggering manual sync for source {}", source_id);
if let Some(source) = self.state.db.get_source_by_id(source_id).await? {
let sync_service = self.sync_service.clone();
let state_clone = self.state.clone();
tokio::spawn(async move {
let enable_background_ocr = true; // Could be made configurable
match sync_service.sync_source(&source, enable_background_ocr).await {
Ok(files_processed) => {
info!("Manual sync completed for source {}: {} files processed",
source.name, files_processed);
// Update sync stats
if let Err(e) = sqlx::query(
r#"UPDATE sources
SET last_sync_at = NOW(),
total_files_synced = total_files_synced + $2,
updated_at = NOW()
WHERE id = $1"#
)
.bind(source.id)
.bind(files_processed as i64)
.execute(state_clone.db.get_pool())
.await {
error!("Failed to update source sync stats: {}", e);
}
}
Err(e) => {
error!("Manual sync failed for source {}: {}", source.name, e);
}
}
});
Ok(())
} else {
Err("Source not found".into())
}
}
}
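trigger_sync spawns the sync in a background task and returns as soon as the source is found, so a manual sync endpoint only needs to hand it a source id. A minimal sketch, assuming the scheduler Arc is reachable from the caller; the helper name is hypothetical:
// Hypothetical helper: request a manual sync and report only whether it was
// accepted; progress is tracked through the sources table as above.
async fn request_manual_sync(
    scheduler: std::sync::Arc<readur::source_scheduler::SourceScheduler>,
    source_id: uuid::Uuid,
) -> bool {
    scheduler.trigger_sync(source_id).await.is_ok()
}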

src/source_sync.rs (new file, 383 lines)

@@ -0,0 +1,383 @@
use std::sync::Arc;
use std::path::Path;
use anyhow::{anyhow, Result};
use chrono::Utc;
use tokio::sync::Semaphore;
use futures::stream::{FuturesUnordered, StreamExt};
use sha2::{Sha256, Digest};
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::{
AppState,
models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
file_service::FileService,
local_folder_service::LocalFolderService,
s3_service::S3Service,
webdav_service::{WebDAVService, WebDAVConfig},
};
#[derive(Clone)]
pub struct SourceSyncService {
state: Arc<AppState>,
}
impl SourceSyncService {
pub fn new(state: Arc<AppState>) -> Self {
Self { state }
}
/// Perform sync for any source type
pub async fn sync_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
info!("Starting sync for source {} ({})", source.name, source.source_type);
// Update source status to syncing
if let Err(e) = self.update_source_status(source.id, SourceStatus::Syncing, None).await {
error!("Failed to update source status: {}", e);
}
let sync_result = match source.source_type {
SourceType::WebDAV => self.sync_webdav_source(source, enable_background_ocr).await,
SourceType::LocalFolder => self.sync_local_folder_source(source, enable_background_ocr).await,
SourceType::S3 => self.sync_s3_source(source, enable_background_ocr).await,
};
match &sync_result {
Ok(files_processed) => {
info!("Sync completed for source {}: {} files processed", source.name, files_processed);
if let Err(e) = self.update_source_status(source.id, SourceStatus::Idle, None).await {
error!("Failed to update source status after successful sync: {}", e);
}
}
Err(e) => {
error!("Sync failed for source {}: {}", source.name, e);
let error_msg = format!("Sync failed: {}", e);
if let Err(e) = self.update_source_status(source.id, SourceStatus::Error, Some(&error_msg)).await {
error!("Failed to update source status after error: {}", e);
}
}
}
sync_result
}
async fn sync_webdav_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
.map_err(|e| anyhow!("Invalid WebDAV config: {}", e))?;
let webdav_config = WebDAVConfig {
server_url: config.server_url,
username: config.username,
password: config.password,
watch_folders: config.watch_folders,
file_extensions: config.file_extensions,
timeout_seconds: 30,
server_type: config.server_type,
};
let webdav_service = WebDAVService::new(webdav_config.clone())
.map_err(|e| anyhow!("Failed to create WebDAV service: {}", e))?;
self.perform_sync_internal(
source.user_id,
source.id,
&webdav_config.watch_folders,
&webdav_config.file_extensions,
enable_background_ocr,
|folder_path| {
let service = webdav_service.clone();
async move { service.discover_files_in_folder(&folder_path).await }
},
|file_path| {
let service = webdav_service.clone();
async move { service.download_file(&file_path).await }
}
).await
}
async fn sync_local_folder_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())
.map_err(|e| anyhow!("Invalid LocalFolder config: {}", e))?;
let local_service = LocalFolderService::new(config.clone())
.map_err(|e| anyhow!("Failed to create LocalFolder service: {}", e))?;
self.perform_sync_internal(
source.user_id,
source.id,
&config.watch_folders,
&config.file_extensions,
enable_background_ocr,
|folder_path| {
let service = local_service.clone();
async move { service.discover_files_in_folder(&folder_path).await }
},
|file_path| {
let service = local_service.clone();
async move { service.read_file(&file_path).await }
}
).await
}
async fn sync_s3_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
let config: S3SourceConfig = serde_json::from_value(source.config.clone())
.map_err(|e| anyhow!("Invalid S3 config: {}", e))?;
let s3_service = S3Service::new(config.clone()).await
.map_err(|e| anyhow!("Failed to create S3 service: {}", e))?;
self.perform_sync_internal(
source.user_id,
source.id,
&config.watch_folders,
&config.file_extensions,
enable_background_ocr,
|folder_path| {
let service = s3_service.clone();
async move { service.discover_files_in_folder(&folder_path).await }
},
|file_path| {
let service = s3_service.clone();
async move { service.download_file(&file_path).await }
}
).await
}
async fn perform_sync_internal<F, D, Fut1, Fut2>(
&self,
user_id: Uuid,
_source_id: Uuid,
watch_folders: &[String],
file_extensions: &[String],
enable_background_ocr: bool,
discover_files: F,
download_file: D,
) -> Result<usize>
where
F: Fn(String) -> Fut1,
D: Fn(String) -> Fut2 + Clone,
Fut1: std::future::Future<Output = Result<Vec<FileInfo>>>,
Fut2: std::future::Future<Output = Result<Vec<u8>>>,
{
let mut total_files_processed = 0;
for folder_path in watch_folders {
info!("Syncing folder: {}", folder_path);
// Discover files in the folder
match discover_files(folder_path.clone()).await {
Ok(files) => {
info!("Found {} files in folder {}", files.len(), folder_path);
// Filter files for processing
let files_to_process: Vec<_> = files.into_iter()
.filter(|file_info| {
if file_info.is_directory {
return false;
}
let file_extension = Path::new(&file_info.name)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
file_extensions.contains(&file_extension)
})
.collect();
info!("Processing {} files from folder {}", files_to_process.len(), folder_path);
// Process files concurrently with a limit
let concurrent_limit = 5;
let semaphore = Arc::new(Semaphore::new(concurrent_limit));
let mut folder_files_processed = 0;
let mut file_futures = FuturesUnordered::new();
for file_info in files_to_process.iter() {
let state_clone = self.state.clone();
let file_info_clone = file_info.clone();
let semaphore_clone = semaphore.clone();
let download_file_clone = download_file.clone();
let future = async move {
Self::process_single_file(
state_clone,
user_id,
_source_id,
&file_info_clone,
enable_background_ocr,
semaphore_clone,
download_file_clone,
).await
};
file_futures.push(future);
}
// Process files concurrently
while let Some(result) = file_futures.next().await {
match result {
Ok(processed) => {
if processed {
folder_files_processed += 1;
info!("Successfully processed file ({} completed in this folder)", folder_files_processed);
}
}
Err(error) => {
error!("File processing error: {}", error);
}
}
}
total_files_processed += folder_files_processed;
}
Err(e) => {
error!("Failed to discover files in folder {}: {}", folder_path, e);
}
}
}
info!("Source sync completed: {} files processed", total_files_processed);
Ok(total_files_processed)
}
async fn process_single_file<D, Fut>(
state: Arc<AppState>,
user_id: Uuid,
_source_id: Uuid,
file_info: &FileInfo,
enable_background_ocr: bool,
semaphore: Arc<Semaphore>,
download_file: D,
) -> Result<bool>
where
D: Fn(String) -> Fut,
Fut: std::future::Future<Output = Result<Vec<u8>>>,
{
let _permit = semaphore.acquire().await
.map_err(|e| anyhow!("Semaphore error: {}", e))?;
info!("Processing file: {}", file_info.path);
// Check if we've already processed this file by looking for documents with same source
// This is a simplified version - you might want to implement source-specific tracking tables
// Download the file
let file_data = download_file(file_info.path.clone()).await
.map_err(|e| anyhow!("Failed to download {}: {}", file_info.path, e))?;
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Calculate file hash for deduplication
let file_hash = Self::calculate_file_hash(&file_data);
// Check for duplicate content
if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
user_id,
crate::models::UserRole::User,
1000,
0
).await {
let matching_docs: Vec<_> = existing_docs.into_iter()
.filter(|doc| doc.file_size == file_data.len() as i64)
.collect();
for existing_doc in matching_docs {
if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
let existing_hash = Self::calculate_file_hash(&existing_file_data);
if file_hash == existing_hash {
info!("File content already exists, skipping: {}", file_info.path);
return Ok(false);
}
}
}
}
// Save file to disk
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
.map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
// Create document record
let document = file_service.create_document(
&file_info.name,
&file_info.name,
&saved_file_path,
file_data.len() as i64,
&file_info.mime_type,
user_id,
);
let created_document = state.db.create_document(document).await
.map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
info!("Created document record for {}: {}", file_info.name, created_document.id);
// Queue for OCR if enabled
if enable_background_ocr {
info!("Background OCR enabled, queueing document {} for processing", created_document.id);
match state.db.pool.acquire().await {
Ok(_conn) => {
let queue_service = crate::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.pool.clone(),
4
);
let priority = if file_info.size <= 1024 * 1024 { 10 }
else if file_info.size <= 5 * 1024 * 1024 { 8 }
else if file_info.size <= 10 * 1024 * 1024 { 6 }
else if file_info.size <= 50 * 1024 * 1024 { 4 }
else { 2 };
if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", created_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
}
}
Ok(true)
}
async fn update_source_status(&self, source_id: Uuid, status: SourceStatus, error_message: Option<&str>) -> Result<()> {
let query = if let Some(error) = error_message {
sqlx::query(
r#"UPDATE sources
SET status = $2, last_error = $3, last_error_at = NOW(), updated_at = NOW()
WHERE id = $1"#
)
.bind(source_id)
.bind(status.to_string())
.bind(error)
} else {
sqlx::query(
r#"UPDATE sources
SET status = $2, last_error = NULL, last_error_at = NULL, updated_at = NOW()
WHERE id = $1"#
)
.bind(source_id)
.bind(status.to_string())
};
query.execute(self.state.db.get_pool()).await
.map_err(|e| anyhow!("Failed to update source status: {}", e))?;
Ok(())
}
fn calculate_file_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("{:x}", result)
}
}
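perform_sync_internal is what keeps the three backends interchangeable: each sync_*_source method only supplies a discover closure and a download closure. A hedged sketch of how a hypothetical fourth backend could slot into the same impl block; the folder list and extensions are placeholder assumptions:
// Hypothetical fourth backend, same shape as the three sync_*_source methods above.
async fn sync_example_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
    let watch_folders = vec!["example/".to_string()];
    let file_extensions = vec!["pdf".to_string(), "txt".to_string()];
    self.perform_sync_internal(
        source.user_id,
        source.id,
        &watch_folders,
        &file_extensions,
        enable_background_ocr,
        |_folder| async move { Ok(Vec::<FileInfo>::new()) }, // discover: list files for a folder
        |_path| async move { Ok(Vec::<u8>::new()) },         // download: return the file bytes
    ).await
}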