feat(server): rewrite nearly everything to be async/follow best practices

This commit is contained in:
perf3ct 2025-06-15 02:06:17 +00:00
parent f2136cbd7b
commit e8dd7a788e
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
19 changed files with 903 additions and 229 deletions

1
Cargo.lock generated
View File

@ -2454,6 +2454,7 @@ dependencies = [
"chrono",
"clap",
"dotenvy",
"futures",
"futures-util",
"hostname",
"image",

View File

@ -30,6 +30,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
futures-util = "0.3"
futures = "0.3"
notify = "6"
mime_guess = "2"
tesseract = { version = "0.15", optional = true }

View File

@ -0,0 +1,118 @@
import React, { useState, useEffect } from 'react';
import { Box } from '@mui/material';
import {
PictureAsPdf as PdfIcon,
Image as ImageIcon,
Description as DocIcon,
TextSnippet as TextIcon,
} from '@mui/icons-material';
import { documentService } from '../services/api';
interface DocumentThumbnailProps {
  documentId: string;
  mimeType: string;
  size?: 'small' | 'medium' | 'large';
  fallbackIcon?: boolean;
}

/**
 * Shows a server-generated thumbnail for a document, falling back to a
 * MIME-type-specific icon while loading or when no thumbnail exists.
 */
const DocumentThumbnail: React.FC<DocumentThumbnailProps> = ({
  documentId,
  mimeType,
  size = 'medium',
  fallbackIcon = true,
}) => {
  const [thumbnailUrl, setThumbnailUrl] = useState<string | null>(null);
  // `loading` is tracked but not currently read by the render path, which
  // simply shows the fallback icon until the thumbnail arrives.
  const [loading, setLoading] = useState<boolean>(true);
  const [error, setError] = useState<boolean>(false);

  useEffect(() => {
    // Keep the object URL in a local so cleanup revokes the URL created by
    // *this* effect run. Reading `thumbnailUrl` from state in the cleanup
    // would be a stale closure (always null at mount time), leaking every
    // blob URL ever created.
    let objectUrl: string | null = null;
    let cancelled = false;

    const loadThumbnail = async (): Promise<void> => {
      try {
        setLoading(true);
        setError(false);
        const response = await documentService.getThumbnail(documentId);
        const url = window.URL.createObjectURL(new Blob([response.data]));
        if (cancelled) {
          // Unmounted (or documentId changed) while the request was in
          // flight; release the URL immediately and skip the state update.
          window.URL.revokeObjectURL(url);
          return;
        }
        objectUrl = url;
        setThumbnailUrl(url);
      } catch (err) {
        if (!cancelled) setError(true);
      } finally {
        if (!cancelled) setLoading(false);
      }
    };

    loadThumbnail();

    // Cleanup: revoke the URL owned by this effect run.
    return () => {
      cancelled = true;
      if (objectUrl) {
        window.URL.revokeObjectURL(objectUrl);
      }
    };
  }, [documentId]);

  // Map a MIME type to a representative icon, sized by the `size` prop.
  const getFileIcon = (mimeType: string): React.ReactElement => {
    const iconProps = {
      sx: {
        fontSize: size === 'small' ? 24 : size === 'medium' ? 48 : 64,
        color: 'action.active',
      }
    };
    if (mimeType.includes('pdf')) return <PdfIcon {...iconProps} color="error" />;
    if (mimeType.includes('image')) return <ImageIcon {...iconProps} color="primary" />;
    if (mimeType.includes('text')) return <TextIcon {...iconProps} color="info" />;
    return <DocIcon {...iconProps} color="secondary" />;
  };

  // Pixel box for each thumbnail size variant.
  const dimensions = {
    small: { width: 40, height: 40 },
    medium: { width: 80, height: 80 },
    large: { width: 120, height: 120 },
  };

  if (thumbnailUrl && !error) {
    return (
      <Box
        sx={{
          width: dimensions[size].width,
          height: dimensions[size].height,
          display: 'flex',
          alignItems: 'center',
          justifyContent: 'center',
        }}
      >
        <img
          src={thumbnailUrl}
          alt="Document thumbnail"
          style={{
            maxWidth: '100%',
            maxHeight: '100%',
            objectFit: 'cover',
            borderRadius: '4px',
          }}
        />
      </Box>
    );
  }

  if (fallbackIcon) {
    return (
      <Box
        sx={{
          width: dimensions[size].width,
          height: dimensions[size].height,
          display: 'flex',
          alignItems: 'center',
          justifyContent: 'center',
        }}
      >
        {getFileIcon(mimeType)}
      </Box>
    );
  }

  return null;
};

export default DocumentThumbnail;

View File

@ -0,0 +1,216 @@
import React, { useState, useEffect } from 'react';
import {
Box,
Typography,
CircularProgress,
Alert,
Paper,
} from '@mui/material';
import { documentService } from '../services/api';
interface DocumentViewerProps {
  documentId: string;
  filename: string;
  mimeType: string;
}

/**
 * Fetches a document and renders an inline preview appropriate to its MIME
 * type: images directly, PDFs in an iframe, text via TextFileViewer, and a
 * "no preview" notice for anything else.
 */
const DocumentViewer: React.FC<DocumentViewerProps> = ({
  documentId,
  filename,
  mimeType,
}) => {
  const [loading, setLoading] = useState<boolean>(true);
  const [error, setError] = useState<string | null>(null);
  const [documentUrl, setDocumentUrl] = useState<string | null>(null);

  useEffect(() => {
    // Keep the object URL in a local so cleanup revokes the URL created by
    // *this* effect run. A cleanup that read `documentUrl` from state would
    // capture the stale mount-time value (null) and never revoke anything,
    // leaking one blob URL per document viewed.
    let objectUrl: string | null = null;
    let cancelled = false;

    const loadDocument = async (): Promise<void> => {
      try {
        setLoading(true);
        setError(null);
        const response = await documentService.view(documentId);
        const url = window.URL.createObjectURL(new Blob([response.data], { type: mimeType }));
        if (cancelled) {
          // Unmounted (or documentId changed) mid-request; drop the URL.
          window.URL.revokeObjectURL(url);
          return;
        }
        objectUrl = url;
        setDocumentUrl(url);
      } catch (err) {
        console.error('Failed to load document:', err);
        if (!cancelled) setError('Failed to load document for viewing');
      } finally {
        if (!cancelled) setLoading(false);
      }
    };

    loadDocument();

    // Cleanup: revoke the URL owned by this effect run.
    return () => {
      cancelled = true;
      if (objectUrl) {
        window.URL.revokeObjectURL(objectUrl);
      }
    };
  }, [documentId]);

  // Choose the preview widget for the loaded document.
  const renderDocumentContent = (): React.ReactElement => {
    if (!documentUrl) return <></>;

    // Handle images
    if (mimeType.startsWith('image/')) {
      return (
        <Box
          sx={{
            display: 'flex',
            justifyContent: 'center',
            alignItems: 'center',
            minHeight: '60vh',
            p: 2,
          }}
        >
          <img
            src={documentUrl}
            alt={filename}
            style={{
              maxWidth: '100%',
              maxHeight: '100%',
              objectFit: 'contain',
              borderRadius: '8px',
              boxShadow: '0 4px 12px rgba(0,0,0,0.1)',
            }}
          />
        </Box>
      );
    }

    // Handle PDFs via the browser's built-in viewer.
    if (mimeType === 'application/pdf') {
      return (
        <Box sx={{ height: '70vh', width: '100%' }}>
          <iframe
            src={documentUrl}
            width="100%"
            height="100%"
            style={{ border: 'none', borderRadius: '8px' }}
            title={filename}
          />
        </Box>
      );
    }

    // Handle text files
    if (mimeType.startsWith('text/')) {
      return (
        <TextFileViewer documentUrl={documentUrl} filename={filename} />
      );
    }

    // For other file types, show a message
    return (
      <Box sx={{ textAlign: 'center', py: 8 }}>
        <Typography variant="h6" color="text.secondary" sx={{ mb: 2 }}>
          Preview not available
        </Typography>
        <Typography variant="body2" color="text.secondary">
          This file type ({mimeType}) cannot be previewed in the browser.
        </Typography>
        <Typography variant="body2" color="text.secondary" sx={{ mt: 1 }}>
          Please download the file to view its contents.
        </Typography>
      </Box>
    );
  };

  if (loading) {
    return (
      <Box
        sx={{
          display: 'flex',
          flexDirection: 'column',
          justifyContent: 'center',
          alignItems: 'center',
          minHeight: '60vh',
        }}
      >
        <CircularProgress sx={{ mb: 2 }} />
        <Typography variant="body2" color="text.secondary">
          Loading document...
        </Typography>
      </Box>
    );
  }

  if (error) {
    return (
      <Box sx={{ p: 3 }}>
        <Alert severity="error">{error}</Alert>
      </Box>
    );
  }

  return (
    <Box sx={{ height: '100%', overflow: 'auto' }}>
      {renderDocumentContent()}
    </Box>
  );
};
// Component for viewing text files
const TextFileViewer: React.FC<{ documentUrl: string; filename: string }> = ({
  documentUrl,
  filename,
}) => {
  const [textContent, setTextContent] = useState<string>('');
  const [loading, setLoading] = useState<boolean>(true);

  useEffect(() => {
    // Guard against state updates after unmount, or after documentUrl
    // changes while a previous fetch is still in flight (a stale response
    // could otherwise overwrite the newer content).
    let cancelled = false;

    const loadTextContent = async (): Promise<void> => {
      try {
        // documentUrl is a blob: URL, so this reads from memory, not network.
        const response = await fetch(documentUrl);
        const text = await response.text();
        if (!cancelled) setTextContent(text);
      } catch (err) {
        console.error('Failed to load text content:', err);
        if (!cancelled) setTextContent('Failed to load text content');
      } finally {
        if (!cancelled) setLoading(false);
      }
    };

    loadTextContent();
    return () => {
      cancelled = true;
    };
  }, [documentUrl]);

  if (loading) {
    return (
      <Box sx={{ display: 'flex', justifyContent: 'center', p: 3 }}>
        <CircularProgress size={24} />
      </Box>
    );
  }

  return (
    <Paper
      sx={{
        p: 3,
        m: 2,
        backgroundColor: (theme) => theme.palette.mode === 'light' ? 'grey.50' : 'grey.900',
        border: '1px solid',
        borderColor: 'divider',
        borderRadius: 2,
        maxHeight: '70vh',
        overflow: 'auto',
      }}
    >
      <Typography
        variant="body2"
        sx={{
          fontFamily: 'monospace',
          whiteSpace: 'pre-wrap',
          lineHeight: 1.6,
          color: 'text.primary',
        }}
      >
        {textContent}
      </Typography>
    </Paper>
  );
};
export default DocumentViewer;

View File

@ -35,6 +35,7 @@ import {
Edit as EditIcon,
} from '@mui/icons-material';
import { documentService, OcrResponse } from '../services/api';
import DocumentViewer from '../components/DocumentViewer';
interface Document {
id: string;
@ -57,6 +58,8 @@ const DocumentDetailsPage: React.FC = () => {
const [ocrData, setOcrData] = useState<OcrResponse | null>(null);
const [showOcrDialog, setShowOcrDialog] = useState<boolean>(false);
const [ocrLoading, setOcrLoading] = useState<boolean>(false);
const [showViewDialog, setShowViewDialog] = useState<boolean>(false);
const [thumbnailUrl, setThumbnailUrl] = useState<string | null>(null);
useEffect(() => {
if (id) {
@ -70,6 +73,12 @@ const DocumentDetailsPage: React.FC = () => {
}
}, [document]);
useEffect(() => {
if (document) {
loadThumbnail();
}
}, [document]);
const fetchDocumentDetails = async (): Promise<void> => {
try {
setLoading(true);
@ -134,6 +143,23 @@ const DocumentDetailsPage: React.FC = () => {
}
};
// Fetches the document's thumbnail and exposes it to <img> as a blob URL.
// Failure is non-fatal: the page keeps the mime-type fallback icon.
const loadThumbnail = async (): Promise<void> => {
  if (!document) return;
  try {
    const response = await documentService.getThumbnail(document.id);
    const url = window.URL.createObjectURL(new Blob([response.data]));
    // NOTE(review): this object URL is never revoked — confirm and add
    // revokeObjectURL cleanup on unmount / document change.
    setThumbnailUrl(url);
  } catch (err) {
    console.log('Thumbnail not available:', err);
    // Thumbnail not available, use fallback icon
  }
};
const handleViewDocument = (): void => {
setShowViewDialog(true);
};
const getFileIcon = (mimeType?: string): React.ReactElement => {
if (mimeType?.includes('pdf')) return <PdfIcon color="error" sx={{ fontSize: 64 }} />;
if (mimeType?.includes('image')) return <ImageIcon color="primary" sx={{ fontSize: 64 }} />;
@ -227,18 +253,40 @@ const DocumentDetailsPage: React.FC = () => {
p: 3,
background: 'linear-gradient(135deg, rgba(99, 102, 241, 0.1) 0%, rgba(139, 92, 246, 0.1) 100%)',
borderRadius: 2,
minHeight: 200,
}}
>
{getFileIcon(document.mime_type)}
{thumbnailUrl ? (
<img
src={thumbnailUrl}
alt={document.original_filename}
style={{
maxWidth: '100%',
maxHeight: '200px',
borderRadius: '8px',
objectFit: 'contain',
}}
/>
) : (
getFileIcon(document.mime_type)
)}
</Box>
<Typography variant="h6" sx={{ mb: 2, fontWeight: 600 }}>
{document.original_filename}
</Typography>
<Stack direction="row" spacing={1} justifyContent="center" sx={{ mb: 3 }}>
<Stack direction="row" spacing={1} justifyContent="center" sx={{ mb: 3, flexWrap: 'wrap' }}>
<Button
variant="contained"
startIcon={<ViewIcon />}
onClick={handleViewDocument}
sx={{ borderRadius: 2 }}
>
View
</Button>
<Button
variant="outlined"
startIcon={<DownloadIcon />}
onClick={handleDownload}
sx={{ borderRadius: 2 }}
@ -252,7 +300,7 @@ const DocumentDetailsPage: React.FC = () => {
onClick={handleViewOcr}
sx={{ borderRadius: 2 }}
>
View OCR
OCR Text
</Button>
)}
</Stack>
@ -590,6 +638,49 @@ const DocumentDetailsPage: React.FC = () => {
</Button>
</DialogActions>
</Dialog>
{/* Document View Dialog */}
<Dialog
open={showViewDialog}
onClose={() => setShowViewDialog(false)}
maxWidth="lg"
fullWidth
PaperProps={{
sx: { height: '90vh' }
}}
>
<DialogTitle>
<Box sx={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center' }}>
<Typography variant="h6" sx={{ fontWeight: 600 }}>
{document?.original_filename}
</Typography>
<Box>
<Button
startIcon={<DownloadIcon />}
onClick={handleDownload}
size="small"
sx={{ mr: 1 }}
>
Download
</Button>
</Box>
</Box>
</DialogTitle>
<DialogContent sx={{ p: 0, display: 'flex', flexDirection: 'column' }}>
{document && (
<DocumentViewer
documentId={document.id}
filename={document.original_filename}
mimeType={document.mime_type}
/>
)}
</DialogContent>
<DialogActions>
<Button onClick={() => setShowViewDialog(false)}>
Close
</Button>
</DialogActions>
</Dialog>
</Box>
);
};

View File

@ -40,6 +40,7 @@ import {
Visibility as ViewIcon,
} from '@mui/icons-material';
import { documentService } from '../services/api';
import DocumentThumbnail from '../components/DocumentThumbnail';
interface Document {
id: string;
@ -375,9 +376,11 @@ const DocumentsPage: React.FC = () => {
background: 'linear-gradient(135deg, rgba(99, 102, 241, 0.1) 0%, rgba(139, 92, 246, 0.1) 100%)',
}}
>
<Box sx={{ fontSize: '3rem' }}>
{getFileIcon(doc.mime_type)}
</Box>
<DocumentThumbnail
documentId={doc.id}
mimeType={doc.mime_type}
size="large"
/>
</Box>
)}
@ -385,7 +388,11 @@ const DocumentsPage: React.FC = () => {
<Box sx={{ display: 'flex', alignItems: 'flex-start', gap: 1 }}>
{viewMode === 'list' && (
<Box sx={{ mr: 1, mt: 0.5 }}>
{getFileIcon(doc.mime_type)}
<DocumentThumbnail
documentId={doc.id}
mimeType={doc.mime_type}
size="small"
/>
</Box>
)}

View File

@ -121,6 +121,18 @@ export const documentService = {
return api.get<OcrResponse>(`/documents/${id}/ocr`)
},
// Fetch the raw document bytes for inline viewing (returned as a Blob).
view: (id: string) => {
  return api.get(`/documents/${id}/view`, {
    responseType: 'blob',
  })
},
// Fetch the document's JPEG thumbnail (returned as a Blob).
getThumbnail: (id: string) => {
  return api.get(`/documents/${id}/thumbnail`, {
    responseType: 'blob',
  })
},
search: (searchRequest: SearchRequest) => {
return api.get<SearchResponse>('/search', {
params: searchRequest,

View File

@ -78,10 +78,11 @@ impl BatchIngester {
let user_id_clone = user_id;
// Process file asynchronously
let db_clone = self.db.clone();
let handle = tokio::spawn(async move {
let permit = semaphore_clone.acquire().await.unwrap();
let _permit = permit;
process_single_file(path_clone, file_service, user_id_clone).await
process_single_file(path_clone, file_service, user_id_clone, db_clone).await
});
batch.push(handle);
@ -166,6 +167,7 @@ async fn process_single_file(
path: PathBuf,
file_service: FileService,
user_id: Uuid,
db: Database,
) -> Result<Option<(Uuid, i64)>> {
let filename = path
.file_name()
@ -204,7 +206,6 @@ async fn process_single_file(
);
// Save to database (without OCR)
let db = Database::new(&std::env::var("DATABASE_URL")?).await?;
let created_doc = db.create_document(document).await?;
Ok(Some((created_doc.id, file_size)))

View File

@ -54,9 +54,8 @@ async fn main() -> Result<()> {
let config = Config::from_env()?;
let db = Database::new(&config.database_url).await?;
let pool = sqlx::PgPool::connect(&config.database_url).await?;
let file_service = FileService::new(config.upload_path.clone());
let queue_service = OcrQueueService::new(db.clone(), pool, 1);
let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1);
let ingester = BatchIngester::new(db, queue_service, file_service, config);

View File

@ -1,6 +1,7 @@
use anyhow::Result;
use chrono::Utc;
use sqlx::{PgPool, Row};
use sqlx::{PgPool, Row, postgres::PgPoolOptions};
use std::time::Duration;
use uuid::Uuid;
use crate::models::{CreateUser, Document, SearchRequest, SearchMode, SearchSnippet, HighlightRange, EnhancedDocumentResponse, User};
@ -12,9 +13,19 @@ pub struct Database {
impl Database {
pub async fn new(database_url: &str) -> Result<Self> {
let pool = PgPool::connect(database_url).await?;
let pool = PgPoolOptions::new()
.max_connections(20) // Increase from default 10
.acquire_timeout(Duration::from_secs(3)) // 3 second timeout
.idle_timeout(Duration::from_secs(600)) // 10 minute idle timeout
.max_lifetime(Duration::from_secs(1800)) // 30 minute max lifetime
.connect(database_url)
.await?;
Ok(Self { pool })
}
/// Exposes the underlying connection pool so other services (e.g. the OCR
/// queue) can share it instead of opening their own connections.
pub fn get_pool(&self) -> &PgPool {
    &self.pool
}
pub async fn migrate(&self) -> Result<()> {
// Create extensions

View File

@ -58,16 +58,28 @@ impl EnhancedOcrService {
file_path.to_string()
};
// Configure Tesseract with optimal settings
let mut tesseract = self.configure_tesseract(&processed_image_path, settings)?;
// Move CPU-intensive OCR operations to blocking thread pool
let processed_image_path_clone = processed_image_path.clone();
let settings_clone = settings.clone();
let temp_dir = self.temp_dir.clone();
// Extract text with confidence
let text = tesseract.get_text()?.trim().to_string();
let confidence = self.calculate_overall_confidence(&mut tesseract)?;
let ocr_result = tokio::task::spawn_blocking(move || -> Result<(String, f32)> {
// Configure Tesseract with optimal settings
let ocr_service = EnhancedOcrService::new(temp_dir);
let mut tesseract = ocr_service.configure_tesseract(&processed_image_path_clone, &settings_clone)?;
// Extract text with confidence
let text = tesseract.get_text()?.trim().to_string();
let confidence = ocr_service.calculate_overall_confidence(&mut tesseract)?;
Ok((text, confidence))
}).await??;
let (text, confidence) = ocr_result;
// Clean up temporary files if created
if processed_image_path != file_path {
let _ = std::fs::remove_file(&processed_image_path);
let _ = tokio::fs::remove_file(&processed_image_path).await;
}
let processing_time = start_time.elapsed().as_millis() as u64;
@ -502,7 +514,7 @@ impl EnhancedOcrService {
let start_time = std::time::Instant::now();
info!("Extracting text from PDF: {}", file_path);
let bytes = std::fs::read(file_path)?;
let bytes = tokio::fs::read(file_path).await?;
// Check if it's a valid PDF (handles leading null bytes)
if !is_valid_pdf(&bytes) {
@ -566,7 +578,7 @@ impl EnhancedOcrService {
}
"text/plain" => {
let start_time = std::time::Instant::now();
let text = std::fs::read_to_string(file_path)?;
let text = tokio::fs::read_to_string(file_path).await?;
let processing_time = start_time.elapsed().as_millis() as u64;
let word_count = text.split_whitespace().count();

View File

@ -6,6 +6,9 @@ use uuid::Uuid;
use crate::models::Document;
#[cfg(feature = "ocr")]
use image::{DynamicImage, ImageFormat, imageops::FilterType};
#[derive(Clone)]
pub struct FileService {
upload_path: String,
@ -87,4 +90,105 @@ impl FileService {
let data = fs::read(file_path).await?;
Ok(data)
}
/// Returns a JPEG thumbnail for `file_path`, generating and caching it under
/// `<upload_path>/thumbnails` on first request.
///
/// # Errors
/// Fails if the thumbnails directory cannot be created, the source file
/// cannot be read, or encoding fails.
#[cfg(feature = "ocr")]
pub async fn get_or_generate_thumbnail(&self, file_path: &str, filename: &str) -> Result<Vec<u8>> {
    // create_dir_all is a no-op when the directory already exists, so the
    // previous blocking `Path::exists()` pre-check was redundant.
    let thumbnails_dir = Path::new(&self.upload_path).join("thumbnails");
    fs::create_dir_all(&thumbnails_dir).await?;

    // Cache key derived from the source file's stem.
    // NOTE(review): two source files with the same stem would collide on one
    // cache entry — confirm stored filenames are unique (e.g. UUID-based).
    let file_stem = Path::new(file_path)
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("unknown");
    let thumbnail_path = thumbnails_dir.join(format!("{}_thumb.jpg", file_stem));

    // Serve the cached thumbnail if present. Use an async metadata probe
    // instead of the blocking `Path::exists()` inside this async fn.
    if fs::metadata(&thumbnail_path).await.is_ok() {
        return self.read_file(&thumbnail_path.to_string_lossy()).await;
    }

    // Generate, cache, and return a fresh thumbnail.
    let thumbnail_data = self.generate_thumbnail(file_path, filename).await?;
    fs::write(&thumbnail_path, &thumbnail_data).await?;

    Ok(thumbnail_data)
}
/// Dispatches thumbnail generation based on the file's extension: raster
/// images get a real scaled-down thumbnail, everything else a colored
/// placeholder tile.
#[cfg(feature = "ocr")]
async fn generate_thumbnail(&self, file_path: &str, filename: &str) -> Result<Vec<u8>> {
    // Raster formats the `image` crate can decode directly.
    const RASTER_EXTENSIONS: [&str; 6] = ["jpg", "jpeg", "png", "bmp", "tiff", "gif"];

    let contents = self.read_file(file_path).await?;

    // Extension is taken from the display filename, lowercased for matching.
    let ext = Path::new(filename)
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("")
        .to_lowercase();

    if RASTER_EXTENSIONS.contains(&ext.as_str()) {
        self.generate_image_thumbnail(&contents).await
    } else if ext == "pdf" {
        // PDF rendering would need pdf2image or similar; placeholder for now.
        self.generate_placeholder_thumbnail("PDF").await
    } else {
        // Unknown types get a placeholder labeled with the extension.
        self.generate_placeholder_thumbnail(&ext.to_uppercase()).await
    }
}
/// Decodes `file_data`, scales it to fit within 200x200 (aspect ratio
/// preserved by `resize`), and re-encodes it as JPEG.
///
/// Decode/resize/encode is CPU-bound, so it runs on the blocking thread
/// pool rather than stalling the async executor (consistent with how the
/// OCR service offloads Tesseract work).
#[cfg(feature = "ocr")]
async fn generate_image_thumbnail(&self, file_data: &[u8]) -> Result<Vec<u8>> {
    // Copy the bytes so the closure can be 'static for spawn_blocking.
    let data = file_data.to_vec();
    let buffer = tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
        let img = image::load_from_memory(&data)?;
        let thumbnail = img.resize(200, 200, FilterType::Lanczos3);
        let mut buffer = Vec::new();
        thumbnail.write_to(&mut std::io::Cursor::new(&mut buffer), ImageFormat::Jpeg)?;
        Ok(buffer)
    })
    .await??; // outer ? = task join error, inner ? = image error
    Ok(buffer)
}
/// Builds a 200x200 solid-color JPEG used when a real thumbnail cannot be
/// produced; the fill color encodes the file type.
#[cfg(feature = "ocr")]
async fn generate_placeholder_thumbnail(&self, file_type: &str) -> Result<Vec<u8>> {
    use image::{RgbImage, Rgb};

    // Signature color per file type.
    let fill = match file_type {
        "PDF" => Rgb([220, 38, 27]),           // Red for PDF
        "TXT" => Rgb([34, 139, 34]),           // Green for text
        "DOC" | "DOCX" => Rgb([41, 128, 185]), // Blue for Word docs
        _ => Rgb([108, 117, 125]),             // Gray for unknown
    };

    // from_pixel fills the entire canvas with one color in a single call,
    // replacing the explicit per-pixel loop.
    let canvas = DynamicImage::ImageRgb8(RgbImage::from_pixel(200, 200, fill));

    let mut bytes = Vec::new();
    canvas.write_to(&mut std::io::Cursor::new(&mut bytes), ImageFormat::Jpeg)?;
    Ok(bytes)
}
/// Stub used when the binary is built without the `ocr` feature: thumbnail
/// generation is only supported with that feature enabled, so this always
/// returns an error.
#[cfg(not(feature = "ocr"))]
pub async fn get_or_generate_thumbnail(&self, _file_path: &str, _filename: &str) -> Result<Vec<u8>> {
    anyhow::bail!("Thumbnail generation requires OCR feature")
}
}

View File

@ -149,28 +149,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_state(state.clone());
let watcher_config = config.clone();
let watcher_db = state.db.clone();
tokio::spawn(async move {
if let Err(e) = readur::watcher::start_folder_watcher(watcher_config).await {
if let Err(e) = readur::watcher::start_folder_watcher(watcher_config, watcher_db).await {
error!("Folder watcher error: {}", e);
}
});
// Start OCR queue worker
let queue_db = Database::new(&config.database_url).await?;
let queue_pool = sqlx::PgPool::connect(&config.database_url).await?;
let concurrent_jobs = 4; // TODO: Get from config/settings
let queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(queue_db, queue_pool, concurrent_jobs));
// Create dedicated runtime for background tasks to prevent interference
let background_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) // Dedicated threads for background work
.thread_name("readur-background")
.enable_all()
.build()?;
// Start OCR queue worker on dedicated runtime
let concurrent_jobs = 4; // TODO: Get from config/settings
let queue_service = Arc::new(readur::ocr_queue::OcrQueueService::new(
state.db.clone(),
state.db.get_pool().clone(),
concurrent_jobs
));
let queue_worker = queue_service.clone();
tokio::spawn(async move {
background_runtime.spawn(async move {
if let Err(e) = queue_worker.start_worker().await {
error!("OCR queue worker error: {}", e);
}
});
// Start maintenance tasks
// Start maintenance tasks on background runtime
let queue_maintenance = queue_service.clone();
tokio::spawn(async move {
background_runtime.spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); // Every 5 minutes
loop {
interval.tick().await;
@ -187,9 +197,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Start WebDAV background sync scheduler
// Start WebDAV background sync scheduler on background runtime
let webdav_scheduler = readur::webdav_scheduler::WebDAVScheduler::new(state.clone());
tokio::spawn(async move {
background_runtime.spawn(async move {
info!("Starting WebDAV background sync scheduler");
webdav_scheduler.start().await;
});

View File

@ -269,7 +269,7 @@ pub struct UpdateUser {
pub password: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, FromRow, ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, FromRow, ToSchema)]
pub struct Settings {
pub id: Uuid,
pub user_id: Uuid,

View File

@ -1,7 +1,7 @@
use axum::{
extract::{Multipart, Path, Query, State},
http::StatusCode,
response::Json,
http::{StatusCode, header::CONTENT_TYPE},
response::{Json, Response},
routing::{get, post},
Router,
};
@ -28,6 +28,8 @@ pub fn router() -> Router<Arc<AppState>> {
.route("/", post(upload_document))
.route("/", get(list_documents))
.route("/:id/download", get(download_document))
.route("/:id/view", get(view_document))
.route("/:id/thumbnail", get(get_document_thumbnail))
.route("/:id/ocr", get(get_document_ocr))
}
@ -111,10 +113,7 @@ async fn upload_document(
let enable_background_ocr = settings.enable_background_ocr;
if enable_background_ocr {
// Use connection pool from state to enqueue the document
let pool = sqlx::PgPool::connect(&state.config.database_url).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);
let queue_service = OcrQueueService::new(state.db.clone(), state.db.pool.clone(), 1);
// Calculate priority based on file size
let priority = match file_size {
@ -212,6 +211,109 @@ async fn download_document(
Ok(file_data)
}
#[utoipa::path(
    get,
    path = "/api/documents/{id}/view",
    tag = "documents",
    security(
        ("bearer_auth" = [])
    ),
    params(
        ("id" = uuid::Uuid, Path, description = "Document ID")
    ),
    responses(
        (status = 200, description = "Document content for viewing in browser"),
        (status = 404, description = "Document not found"),
        (status = 401, description = "Unauthorized")
    )
)]
/// Serves the raw document bytes inline (with a guessed Content-Type) so the
/// browser can render them, as opposed to the download endpoint.
async fn view_document(
    State(state): State<Arc<AppState>>,
    auth_user: AuthUser,
    Path(document_id): Path<uuid::Uuid>,
) -> Result<Response, StatusCode> {
    // Authorization: list the user's visible documents and confirm the
    // requested id is among them.
    // NOTE(review): this fetches up to 1000 documents and scans linearly —
    // a document beyond the first 1000 would 404. Prefer a direct
    // "get document by id for user" query if the Database exposes one.
    let documents = state
        .db
        .get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 1000, 0)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    let document = documents
        .into_iter()
        .find(|doc| doc.id == document_id)
        .ok_or(StatusCode::NOT_FOUND)?;

    let file_service = FileService::new(state.config.upload_path.clone());

    // Whole file is buffered in memory before the response is built.
    let file_data = file_service
        .read_file(&document.file_path)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Determine content type from file extension
    // NOTE(review): guessed from the filename rather than the document's
    // stored mime type — confirm the two always agree.
    let content_type = mime_guess::from_path(&document.filename)
        .first_or_octet_stream()
        .to_string();

    let response = Response::builder()
        .header(CONTENT_TYPE, content_type)
        .header("Content-Length", file_data.len())
        .body(file_data.into())
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(response)
}
#[utoipa::path(
    get,
    path = "/api/documents/{id}/thumbnail",
    tag = "documents",
    security(
        ("bearer_auth" = [])
    ),
    params(
        ("id" = uuid::Uuid, Path, description = "Document ID")
    ),
    responses(
        (status = 200, description = "Document thumbnail image", content_type = "image/jpeg"),
        (status = 404, description = "Document not found or thumbnail not available"),
        (status = 401, description = "Unauthorized")
    )
)]
/// Serves a JPEG thumbnail for the document; the FileService generates and
/// caches it on first request. Thumbnail failures map to 404 so the client
/// can fall back to a type icon.
async fn get_document_thumbnail(
    State(state): State<Arc<AppState>>,
    auth_user: AuthUser,
    Path(document_id): Path<uuid::Uuid>,
) -> Result<Response, StatusCode> {
    // Authorization: list the user's visible documents and confirm the
    // requested id is among them.
    // NOTE(review): fetches up to 1000 documents and scans linearly — same
    // limitation as view_document; a direct by-id lookup would be better.
    let documents = state
        .db
        .get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 1000, 0)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    let document = documents
        .into_iter()
        .find(|doc| doc.id == document_id)
        .ok_or(StatusCode::NOT_FOUND)?;

    let file_service = FileService::new(state.config.upload_path.clone());

    // Try to generate or get cached thumbnail
    match file_service.get_or_generate_thumbnail(&document.file_path, &document.filename).await {
        Ok(thumbnail_data) => {
            Ok(Response::builder()
                .header(CONTENT_TYPE, "image/jpeg")
                .header("Content-Length", thumbnail_data.len())
                .header("Cache-Control", "public, max-age=3600") // Cache for 1 hour
                .body(thumbnail_data.into())
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?)
        }
        Err(_) => {
            // Return a placeholder thumbnail or 404
            Err(StatusCode::NOT_FOUND)
        }
    }
}
#[utoipa::path(
get,
path = "/api/documents/{id}/ocr",

View File

@ -32,11 +32,7 @@ async fn get_queue_stats(
State(state): State<Arc<AppState>>,
_auth_user: AuthUser, // Require authentication
) -> Result<Json<serde_json::Value>, StatusCode> {
let pool = sqlx::PgPool::connect(&state.config.database_url)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);
let queue_service = OcrQueueService::new(state.db.clone(), state.db.get_pool().clone(), 1);
let stats = queue_service
.get_stats()
@ -69,11 +65,7 @@ async fn requeue_failed(
State(state): State<Arc<AppState>>,
_auth_user: AuthUser, // Require authentication
) -> Result<Json<serde_json::Value>, StatusCode> {
let pool = sqlx::PgPool::connect(&state.config.database_url)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);
let queue_service = OcrQueueService::new(state.db.clone(), state.db.get_pool().clone(), 1);
let count = queue_service
.requeue_failed_items()

View File

@ -2,11 +2,12 @@ use std::sync::Arc;
use std::path::Path;
use tracing::{error, info, warn};
use chrono::Utc;
use tokio::sync::Semaphore;
use futures::stream::{FuturesUnordered, StreamExt};
use crate::{
AppState,
models::{CreateWebDAVFile, UpdateWebDAVSyncState},
ocr_queue::OcrQueueService,
file_service::FileService,
webdav_service::{WebDAVConfig, WebDAVService},
};
@ -107,51 +108,78 @@ async fn perform_sync_internal(
Ok(files) => {
info!("Found {} files in folder {}", files.len(), folder_path);
let mut folder_files_processed = 0;
let files_to_process = files.len();
// Filter files for processing
let files_to_process: Vec<_> = files.into_iter()
.filter(|file_info| {
// Skip directories
if file_info.is_directory {
return false;
}
// Check if file extension is supported
let file_extension = Path::new(&file_info.name)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
config.file_extensions.contains(&file_extension)
})
.collect();
for (idx, file_info) in files.into_iter().enumerate() {
if file_info.is_directory {
continue; // Skip directories
}
info!("Processing {} files from folder {}", files_to_process.len(), folder_path);
// Process files concurrently with a limit to avoid overwhelming the system
let concurrent_limit = 5; // Max 5 concurrent downloads
let semaphore = Arc::new(Semaphore::new(concurrent_limit));
let mut folder_files_processed = 0;
// Create futures for processing each file concurrently
let mut file_futures = FuturesUnordered::new();
for file_info in files_to_process.iter() {
let state_clone = state.clone();
let webdav_service_clone = webdav_service.clone();
let file_info_clone = file_info.clone();
let semaphore_clone = semaphore.clone();
// Check if file extension is supported
let file_extension = Path::new(&file_info.name)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
// Create a future for processing this file
let future = async move {
process_single_file(
state_clone,
user_id,
&webdav_service_clone,
&file_info_clone,
enable_background_ocr,
semaphore_clone,
).await
};
if !config.file_extensions.contains(&file_extension) {
continue; // Skip unsupported file types
}
// Check if we've already processed this file
match state.db.get_webdav_file_by_path(user_id, &file_info.path).await {
Ok(Some(existing_file)) => {
// Check if file has changed (compare ETags)
if existing_file.etag == file_info.etag {
info!("Skipping unchanged file: {} (ETag: {})", file_info.path, file_info.etag);
continue;
file_futures.push(future);
}
// Process files concurrently and collect results
while let Some(result) = file_futures.next().await {
match result {
Ok(processed) => {
if processed {
folder_files_processed += 1;
info!("Successfully processed file ({} completed in this folder)", folder_files_processed);
}
info!("File has changed: {} (old ETag: {}, new ETag: {})",
file_info.path, existing_file.etag, file_info.etag);
}
Ok(None) => {
info!("New file found: {}", file_info.path);
}
Err(e) => {
warn!("Error checking existing file {}: {}", file_info.path, e);
Err(error) => {
error!("File processing error: {}", error);
sync_errors.push(error);
}
}
// Update progress
// Update progress periodically
let progress_update = UpdateWebDAVSyncState {
last_sync_at: Some(Utc::now()),
sync_cursor: None,
is_running: true,
files_processed: (total_files_processed + folder_files_processed) as i64,
files_remaining: (files_to_process - idx - 1) as i64,
files_remaining: (files_to_process.len() - folder_files_processed) as i64,
current_folder: Some(folder_path.clone()),
errors: sync_errors.clone(),
};
@ -159,148 +187,6 @@ async fn perform_sync_internal(
if let Err(e) = state.db.update_webdav_sync_state(user_id, &progress_update).await {
warn!("Failed to update sync progress: {}", e);
}
// Download the file
match webdav_service.download_file(&file_info.path).await {
Ok(file_data) => {
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Create file service and save file to disk
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await {
Ok(path) => path,
Err(e) => {
error!("Failed to save file {}: {}", file_info.name, e);
sync_errors.push(format!("Failed to save {}: {}", file_info.name, e));
// Record failed file in database
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(e.to_string()),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
continue;
}
};
// Create document record
let document = file_service.create_document(
&file_info.name,
&file_info.name, // original filename same as name
&saved_file_path,
file_info.size,
&file_info.mime_type,
user_id,
);
// Save document to database
match state.db.create_document(document).await {
Ok(saved_document) => {
info!("Created document record: {} (ID: {})", file_info.name, saved_document.id);
// Record successful file in WebDAV tracking
let webdav_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(saved_document.id),
sync_status: "completed".to_string(),
sync_error: None,
};
if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
error!("Failed to record WebDAV file: {}", e);
}
// Add to OCR queue if enabled
if enable_background_ocr {
match sqlx::PgPool::connect(&state.config.database_url).await {
Ok(pool) => {
let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);
// Calculate priority based on file size
let priority = match file_info.size {
0..=1048576 => 10, // <= 1MB: highest priority
..=5242880 => 8, // 1-5MB: high priority
..=10485760 => 6, // 5-10MB: medium priority
..=52428800 => 4, // 10-50MB: low priority
_ => 2, // > 50MB: lowest priority
};
if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", saved_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
}
}
folder_files_processed += 1;
}
Err(e) => {
error!("Failed to create document record for {}: {}", file_info.name, e);
sync_errors.push(format!("Failed to create document {}: {}", file_info.name, e));
// Update WebDAV file status to failed
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(e.to_string()),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
}
}
}
Err(e) => {
error!("Failed to download file {}: {}", file_info.path, e);
sync_errors.push(format!("Failed to download {}: {}", file_info.path, e));
// Record download failure
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(format!("Download failed: {}", e)),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
}
}
}
total_files_processed += folder_files_processed;
@ -314,4 +200,116 @@ async fn perform_sync_internal(
info!("WebDAV sync completed for user {}: {} files processed", user_id, total_files_processed);
Ok(total_files_processed)
}
/// Processes a single WebDAV file end-to-end: change detection via ETag,
/// download, save to disk, document-record creation, sync-state tracking,
/// and (optionally) OCR enqueueing.
///
/// Returns `Ok(true)` when the file was processed, `Ok(false)` when it was
/// skipped as unchanged, and `Err` with a human-readable message on a fatal
/// step (download / save / document creation). Non-fatal bookkeeping failures
/// (WebDAV-file tracking, OCR enqueue) are logged but do not fail the call.
/// Concurrency is bounded by `semaphore`: the permit is held for the entire
/// function, so downloads and disk writes are throttled together.
async fn process_single_file(
    state: Arc<AppState>,
    user_id: uuid::Uuid,
    webdav_service: &WebDAVService,
    file_info: &crate::models::FileInfo,
    enable_background_ocr: bool,
    semaphore: Arc<Semaphore>,
) -> Result<bool, String> {
    // Acquire semaphore permit to limit concurrent downloads.
    let _permit = semaphore
        .acquire()
        .await
        .map_err(|e| format!("Semaphore error: {}", e))?;
    info!("Processing file: {}", file_info.path);

    // Skip files whose ETag matches the last synced version.
    match state.db.get_webdav_file_by_path(user_id, &file_info.path).await {
        Ok(Some(existing_file)) => {
            if existing_file.etag == file_info.etag {
                info!("Skipping unchanged file: {} (ETag: {})", file_info.path, file_info.etag);
                return Ok(false); // Not processed (no change)
            }
            info!("File has changed: {} (old ETag: {}, new ETag: {})",
                file_info.path, existing_file.etag, file_info.etag);
        }
        Ok(None) => {
            info!("New file found: {}", file_info.path);
        }
        Err(e) => {
            // Best-effort lookup: proceed with the download even if it failed.
            warn!("Error checking existing file {}: {}", file_info.path, e);
        }
    }

    // Download the file contents from the WebDAV server.
    // NOTE(review): unlike the old inline sync loop, a download/save failure
    // here does not record a "failed" CreateWebDAVFile row — confirm the
    // caller handles that from the returned Err.
    let file_data = webdav_service.download_file(&file_info.path).await
        .map_err(|e| format!("Failed to download {}: {}", file_info.path, e))?;
    info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

    // Persist the raw bytes to the upload directory. One FileService is
    // enough — the previous version constructed a second, redundant one
    // before create_document.
    let file_service = FileService::new(state.config.upload_path.clone());
    let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
        .map_err(|e| format!("Failed to save {}: {}", file_info.name, e))?;

    // Create document record, using the actual downloaded size rather than
    // the server-reported one.
    let document = file_service.create_document(
        &file_info.name,
        &file_info.name, // original filename same as name
        &saved_file_path,
        file_data.len() as i64,
        &file_info.mime_type,
        user_id,
    );

    // Save document to database.
    let created_document = state.db.create_document(document)
        .await
        .map_err(|e| format!("Failed to create document {}: {}", file_info.name, e))?;
    info!("Created document record for {}: {}", file_info.name, created_document.id);

    // Record the synced file; a failure here is logged but non-fatal since
    // the document itself was created successfully.
    let webdav_file = CreateWebDAVFile {
        user_id,
        webdav_path: file_info.path.clone(),
        etag: file_info.etag.clone(),
        last_modified: file_info.last_modified,
        file_size: file_info.size,
        mime_type: file_info.mime_type.clone(),
        document_id: Some(created_document.id),
        sync_status: "synced".to_string(),
        sync_error: None,
    };
    if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
        error!("Failed to record WebDAV file: {}", e);
    }

    // Queue for OCR processing if enabled. The queue service takes the shared
    // pool directly — the previous version also acquired a pooled connection
    // it never used, tying up a connection for nothing.
    if enable_background_ocr {
        let queue_service = crate::ocr_queue::OcrQueueService::new(
            state.db.clone(),
            state.db.pool.clone(),
            4,
        );

        // Smaller files get higher OCR priority because they complete quickly.
        let priority = match file_info.size {
            s if s <= 1024 * 1024 => 10,      // <= 1MB: highest priority
            s if s <= 5 * 1024 * 1024 => 8,   // <= 5MB: high priority
            s if s <= 10 * 1024 * 1024 => 6,  // <= 10MB: medium priority
            s if s <= 50 * 1024 * 1024 => 4,  // <= 50MB: low priority
            _ => 2,                           // > 50MB: lowest priority
        };

        if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
            error!("Failed to enqueue document for OCR: {}", e);
        } else {
            info!("Enqueued document {} for OCR processing", created_document.id);
        }
    }

    Ok(true) // Successfully processed
}

View File

@ -10,14 +10,12 @@ use walkdir::WalkDir;
use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService};
pub async fn start_folder_watcher(config: Config) -> Result<()> {
pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> {
info!("Starting hybrid folder watcher on: {}", config.watch_folder);
// Initialize services
let db = Database::new(&config.database_url).await?;
let pool = sqlx::PgPool::connect(&config.database_url).await?;
// Initialize services with shared database
let file_service = FileService::new(config.upload_path.clone());
let queue_service = OcrQueueService::new(db.clone(), pool, 1);
let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1);
// Determine watch strategy based on filesystem type
let watch_path = Path::new(&config.watch_folder);

View File

@ -46,6 +46,7 @@ impl Default for RetryConfig {
#[derive(Clone)]
pub struct WebDAVService {
client: Client,
config: WebDAVConfig,