feat(server): fix recursively scanning the uploads folder, and the quick search bar

perf3ct 2025-06-15 04:37:49 +00:00
parent 99521b4ca0
commit 4aa3d77e40
GPG Key ID: 569C4EEC436F5232
9 changed files with 226 additions and 49 deletions

Cargo.lock generated
View File

@@ -2929,6 +2929,7 @@ dependencies = [
  "reqwest",
  "serde",
  "serde_json",
+ "sha2",
  "sqlx",
  "sysinfo",
  "tempfile",

View File

@@ -48,6 +48,7 @@ hostname = "0.4"
 walkdir = "2"
 clap = { version = "4", features = ["derive"] }
 utoipa = { version = "5", features = ["axum_extras", "chrono", "uuid"] }
+sha2 = "0.10"
 utoipa-swagger-ui = { version = "9", features = ["axum"] }

 [features]

View File

@@ -33,35 +33,18 @@ import {
   AccessTime as TimeIcon,
 } from '@mui/icons-material';
 import { useNavigate } from 'react-router-dom';
-import { documentService, SearchRequest } from '../../services/api';
+import { documentService, SearchRequest, EnhancedDocument, SearchResponse } from '../../services/api';

 interface GlobalSearchBarProps {
   sx?: SxProps<Theme>;
   [key: string]: any;
 }
-
-interface Document {
-  id: string;
-  original_filename: string;
-  filename?: string;
-  file_size: number;
-  mime_type: string;
-  has_ocr_text?: boolean;
-  search_rank?: number;
-  snippets?: Array<{ text: string }>;
-}
-
-interface SearchResponse {
-  documents: Document[];
-  total_count: number;
-  search_time_ms: number;
-}

 const GlobalSearchBar: React.FC<GlobalSearchBarProps> = ({ sx, ...props }) => {
   const navigate = useNavigate();
   const theme = useTheme();
   const [query, setQuery] = useState<string>('');
-  const [results, setResults] = useState<Document[]>([]);
+  const [results, setResults] = useState<EnhancedDocument[]>([]);
   const [loading, setLoading] = useState<boolean>(false);
   const [showResults, setShowResults] = useState<boolean>(false);
   const [recentSearches, setRecentSearches] = useState<string[]>([]);
@@ -221,7 +204,7 @@ const GlobalSearchBar: React.FC<GlobalSearchBarProps> = ({ sx, ...props }) => {
     setSearchProgress(0);
   };

-  const handleDocumentClick = (doc: Document): void => {
+  const handleDocumentClick = (doc: EnhancedDocument): void => {
     saveRecentSearch(query);
     setShowResults(false);
     navigate(`/documents/${doc.id}`);
@@ -661,7 +644,7 @@ const GlobalSearchBar: React.FC<GlobalSearchBarProps> = ({ sx, ...props }) => {
                   flex: 1,
                 }}
               >
-                {highlightText(generateContextSnippet(doc.original_filename, query), query)}
+                {highlightText(doc.original_filename || doc.filename, query)}
               </Typography>
             }
             secondary={
@@ -725,7 +708,7 @@ const GlobalSearchBar: React.FC<GlobalSearchBarProps> = ({ sx, ...props }) => {
                   flex: 1,
                 }}
               >
-                {highlightText(doc.snippets[0].text.substring(0, 80) + '...', query)}
+                {highlightText(doc.snippets[0]?.text?.substring(0, 80) + '...' || '', query)}
               </Typography>
             )}
           </Box>

View File

@@ -80,23 +80,22 @@ const DocumentDetailsPage: React.FC = () => {
   }, [document]);

   const fetchDocumentDetails = async (): Promise<void> => {
+    if (!id) {
+      setError('No document ID provided');
+      setLoading(false);
+      return;
+    }
+
     try {
       setLoading(true);
       setError(null);
-      // Since we don't have a direct document details endpoint,
-      // we'll fetch the document from the list and find the matching one
-      const response = await documentService.list(1000, 0);
-      const foundDoc = response.data.find(doc => doc.id === id);
-
-      if (foundDoc) {
-        setDocument(foundDoc);
-      } else {
-        setError('Document not found');
-      }
-    } catch (err) {
-      setError('Failed to load document details');
-      console.error(err);
+      const response = await documentService.getById(id);
+      setDocument(response.data);
+    } catch (err: any) {
+      const errorMessage = err.message || 'Failed to load document details';
+      setError(errorMessage);
+      console.error('Failed to fetch document details:', err);
     } finally {
       setLoading(false);
     }

View File

@@ -111,6 +111,23 @@ export const documentService = {
     })
   },

+  getById: (id: string) => {
+    // Use the document list endpoint with pagination to find the specific document
+    // This is a temporary solution until we have a proper document details endpoint
+    return api.get<Document[]>('/documents', {
+      params: {
+        limit: 1000, // Fetch a reasonable amount to find our document
+        offset: 0
+      }
+    }).then(response => {
+      const document = response.data.find(doc => doc.id === id);
+      if (!document) {
+        throw new Error('Document not found');
+      }
+      return { data: document };
+    })
+  },
+
   download: (id: string) => {
     return api.get(`/documents/${id}/download`, {
       responseType: 'blob',

View File

@@ -28,7 +28,7 @@ impl Config {
     pub fn from_env() -> Result<Self> {
         dotenvy::dotenv().ok();

-        Ok(Config {
+        let config = Config {
             database_url: env::var("DATABASE_URL")
                 .unwrap_or_else(|_| "postgresql://readur:readur@localhost/readur".to_string()),
             server_address: {
@@ -85,6 +85,68 @@ impl Config {
                 .unwrap_or(512),
             cpu_priority: env::var("CPU_PRIORITY")
                 .unwrap_or_else(|_| "normal".to_string()),
-        })
+        };
+
+        // Validate configuration to prevent recursion issues
+        config.validate_paths()?;
+
+        Ok(config)
+    }
+
+    fn validate_paths(&self) -> Result<()> {
+        use std::path::Path;
+
+        let upload_path = Path::new(&self.upload_path);
+        let watch_path = Path::new(&self.watch_folder);
+
+        // Normalize paths to handle relative paths and symlinks
+        let upload_canonical = upload_path.canonicalize()
+            .unwrap_or_else(|_| upload_path.to_path_buf());
+        let watch_canonical = watch_path.canonicalize()
+            .unwrap_or_else(|_| watch_path.to_path_buf());
+
+        // Check if paths are the same
+        if upload_canonical == watch_canonical {
+            return Err(anyhow::anyhow!(
+                "Configuration Error: UPLOAD_PATH and WATCH_FOLDER cannot be the same directory.\n\
+                This would cause infinite recursion where WebDAV files are downloaded to the upload \n\
+                directory and then immediately reprocessed by the watcher.\n\
+                Current config:\n\
+                - UPLOAD_PATH: {}\n\
+                - WATCH_FOLDER: {}\n\
+                Please set them to different directories.",
+                self.upload_path, self.watch_folder
+            ));
+        }
+
+        // Check if watch folder is inside upload folder
+        if watch_canonical.starts_with(&upload_canonical) {
+            return Err(anyhow::anyhow!(
+                "Configuration Error: WATCH_FOLDER cannot be inside UPLOAD_PATH.\n\
+                This would cause recursion where WebDAV files downloaded to uploads are \n\
+                detected by the watcher as new files.\n\
+                Current config:\n\
+                - UPLOAD_PATH: {}\n\
+                - WATCH_FOLDER: {}\n\
+                Please move the watch folder outside the upload directory.",
+                self.upload_path, self.watch_folder
+            ));
+        }
+
+        // Check if upload folder is inside watch folder
+        if upload_canonical.starts_with(&watch_canonical) {
+            return Err(anyhow::anyhow!(
+                "Configuration Error: UPLOAD_PATH cannot be inside WATCH_FOLDER.\n\
+                This would cause recursion where files from the watch folder are \n\
+                copied to uploads (inside the watch folder) and reprocessed.\n\
+                Current config:\n\
+                - UPLOAD_PATH: {}\n\
+                - WATCH_FOLDER: {}\n\
+                Please move the upload directory outside the watch folder.",
+                self.upload_path, self.watch_folder
+            ));
+        }
+
+        Ok(())
     }
 }
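
The validation added here canonicalizes both configured directories and rejects any overlap, since an overlapping layout makes the watcher reprocess files the server itself wrote. A minimal standalone sketch of the same canonicalize-then-`starts_with` check (re-implemented outside `Config` purely for illustration; the example paths below are made up):

```rust
use std::path::{Path, PathBuf};

// Illustrative re-implementation of the overlap check above; not the real
// Config::validate_paths, just the same canonicalize + starts_with logic.
fn overlap_error(upload: &str, watch: &str) -> Option<&'static str> {
    // Fall back to the literal path when the directory does not exist yet,
    // mirroring the unwrap_or_else calls in the hunk.
    let canon = |p: &str| -> PathBuf {
        Path::new(p)
            .canonicalize()
            .unwrap_or_else(|_| Path::new(p).to_path_buf())
    };
    let (upload, watch) = (canon(upload), canon(watch));

    if upload == watch {
        Some("UPLOAD_PATH and WATCH_FOLDER are the same directory")
    } else if watch.starts_with(&upload) {
        Some("WATCH_FOLDER is inside UPLOAD_PATH")
    } else if upload.starts_with(&watch) {
        Some("UPLOAD_PATH is inside WATCH_FOLDER")
    } else {
        None
    }
}

fn main() {
    // Overlapping layouts would make the watcher reprocess files the server wrote.
    assert!(overlap_error("/srv/readur/files", "/srv/readur/files").is_some());
    assert!(overlap_error("/srv/readur/files", "/srv/readur/files/watch").is_some());
    // Disjoint directories pass validation.
    assert!(overlap_error("/srv/readur/files", "/srv/readur/watch").is_none());
}
```

Because `canonicalize` falls back to the literal path when a directory does not yet exist, the check still catches textual overlaps before the directories are created.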

View File

@@ -8,6 +8,7 @@ use axum::{
 use serde::Deserialize;
 use std::sync::Arc;
 use utoipa::ToSchema;
+use sha2::{Sha256, Digest};

 use crate::{
     auth::AuthUser,
@@ -85,6 +86,27 @@ async fn upload_document(
             return Err(StatusCode::PAYLOAD_TOO_LARGE);
         }

+        // Calculate file hash for deduplication
+        let file_hash = calculate_file_hash(&data);
+
+        // Check if this exact file content already exists in the system
+        // This prevents uploading and processing duplicate files
+        if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(auth_user.user.id, auth_user.user.role, 1000, 0).await {
+            for existing_doc in existing_docs {
+                // Quick size check first (much faster than hash comparison)
+                if existing_doc.file_size == file_size {
+                    // Read the existing file and compare hashes
+                    if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
+                        let existing_hash = calculate_file_hash(&existing_file_data);
+                        if file_hash == existing_hash {
+                            // Return the existing document instead of creating a duplicate
+                            return Ok(Json(existing_doc.into()));
+                        }
+                    }
+                }
+            }
+        }
+
         let mime_type = mime_guess::from_path(&filename)
             .first_or_octet_stream()
             .to_string();
@@ -135,6 +157,13 @@ async fn upload_document(
     Err(StatusCode::BAD_REQUEST)
 }

+fn calculate_file_hash(data: &[u8]) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(data);
+    let result = hasher.finalize();
+    format!("{:x}", result)
+}
+
 #[utoipa::path(
     get,
     path = "/api/documents",
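
Duplicate detection in the upload handler keys on the hex-encoded SHA-256 of the raw bytes, computed with the `sha2` crate added in this commit, and only hashes candidates whose size already matches. A small self-contained sketch of that size-then-hash comparison over in-memory buffers (the sample data is invented for illustration):

```rust
use sha2::{Digest, Sha256};

// Same helper as in the hunk above: hex-encoded SHA-256 of the file bytes.
fn calculate_file_hash(data: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(data);
    let result = hasher.finalize();
    format!("{:x}", result)
}

// Illustrative dedup check: compare lengths first (cheap), and only hash
// when the sizes match.
fn is_duplicate(new_data: &[u8], existing: &[Vec<u8>]) -> bool {
    let new_hash = calculate_file_hash(new_data);
    existing
        .iter()
        .any(|bytes| bytes.len() == new_data.len() && calculate_file_hash(bytes) == new_hash)
}

fn main() {
    let stored = vec![b"report v1".to_vec(), b"invoice #42".to_vec()];
    // Re-uploading identical bytes is detected regardless of filename.
    assert!(is_duplicate(b"invoice #42", &stored));
    // Same length, different content: the hashes differ.
    assert!(!is_duplicate(b"invoice #43", &stored));
}
```

In the handler itself the candidate bytes come from re-reading each same-sized existing document from disk rather than from memory.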

View File

@@ -4,6 +4,7 @@ use tracing::{error, info, warn};
 use chrono::Utc;
 use tokio::sync::Semaphore;
 use futures::stream::{FuturesUnordered, StreamExt};
+use sha2::{Sha256, Digest};

 use crate::{
     AppState,
@@ -241,6 +242,46 @@ async fn process_single_file(
     info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

+    // Calculate file hash for deduplication
+    let file_hash = calculate_file_hash(&file_data);
+
+    // Check if this exact file content already exists in the system
+    // This prevents downloading and processing duplicate files from WebDAV
+    if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(user_id, crate::models::UserRole::User, 1000, 0).await {
+        for existing_doc in existing_docs {
+            // Quick size check first (much faster than hash comparison)
+            if existing_doc.file_size == file_data.len() as i64 {
+                // Read the existing file and compare hashes
+                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
+                    let existing_hash = calculate_file_hash(&existing_file_data);
+                    if file_hash == existing_hash {
+                        info!("Skipping duplicate WebDAV file content: {} (hash: {}, already exists as: {})",
+                              file_info.name, &file_hash[..8], existing_doc.original_filename);
+
+                        // Still record this WebDAV file in the tracking table to prevent re-downloading
+                        let webdav_file = CreateWebDAVFile {
+                            user_id,
+                            webdav_path: file_info.path.clone(),
+                            etag: file_info.etag.clone(),
+                            last_modified: file_info.last_modified,
+                            file_size: file_info.size,
+                            mime_type: file_info.mime_type.clone(),
+                            document_id: Some(existing_doc.id), // Link to existing document
+                            sync_status: "duplicate_content".to_string(),
+                            sync_error: None,
+                        };
+
+                        if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
+                            error!("Failed to record duplicate WebDAV file: {}", e);
+                        }
+
+                        return Ok(false); // Not processed (duplicate)
+                    }
+                }
+            }
+        }
+    }
+
     // Create file service and save file to disk
     let file_service = FileService::new(state.config.upload_path.clone());
@@ -312,4 +353,11 @@ async fn process_single_file(
     }

     Ok(true) // Successfully processed
+}
+
+fn calculate_file_hash(data: &[u8]) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(data);
+    let result = hasher.finalize();
+    format!("{:x}", result)
 }
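
The WebDAV sync performs the same comparison asynchronously, re-reading each same-sized document from disk before hashing. A minimal async sketch of that loop (the `StoredDoc` struct, the demo path, and the tokio `main` are assumptions for illustration; only the `file_size`/`file_path` field names mirror the hunk above):

```rust
use sha2::{Digest, Sha256};

// Hypothetical stand-in for the fields the dedup loop reads from the database.
struct StoredDoc {
    file_size: i64,
    file_path: String,
}

fn calculate_file_hash(data: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(data);
    let result = hasher.finalize();
    format!("{:x}", result)
}

// Returns true if any stored document on disk has exactly the same bytes.
async fn has_duplicate_content(data: &[u8], docs: &[StoredDoc]) -> bool {
    let new_hash = calculate_file_hash(data);
    for doc in docs {
        // Cheap size check before touching the filesystem.
        if doc.file_size != data.len() as i64 {
            continue;
        }
        // Re-read the candidate from disk and compare hashes, as the sync does.
        if let Ok(bytes) = tokio::fs::read(&doc.file_path).await {
            if calculate_file_hash(&bytes) == new_hash {
                return true;
            }
        }
    }
    false
}

#[tokio::main]
async fn main() {
    let docs = vec![StoredDoc {
        file_size: 3,
        file_path: "/nonexistent/demo.bin".into(),
    }];
    // The size matches but the file cannot be read, so no duplicate is reported.
    assert!(!has_duplicate_content(b"abc", &docs).await);
}
```

When a duplicate is found, the sync above still records the remote file in the WebDAV tracking table with `sync_status` set to "duplicate_content" and links it to the existing document, so the same remote file is not downloaded again on the next pass.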

View File

@@ -7,6 +7,7 @@ use tokio::sync::mpsc;
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, warn};
 use walkdir::WalkDir;
+use sha2::{Sha256, Digest};

 use crate::{config::Config, db::Database, file_service::FileService, ocr_queue::OcrQueueService};
@@ -134,7 +135,9 @@ async fn start_polling_watcher(
     let mut interval = interval(Duration::from_secs(config.watch_interval_seconds.unwrap_or(30)));

     // Initial scan
+    info!("Starting initial scan of watch directory: {}", config.watch_folder);
     scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await?;
+    info!("Initial scan completed. Found {} files to track", known_files.len());

     loop {
         interval.tick().await;
@@ -242,6 +245,19 @@ async fn process_file(
         return Ok(());
     }

+    // CRITICAL: Skip files that are in the upload directory - these are managed by WebDAV/manual uploads
+    let path_str = path.to_string_lossy();
+    let upload_path_normalized = std::path::Path::new(&config.upload_path)
+        .canonicalize()
+        .unwrap_or_else(|_| std::path::PathBuf::from(&config.upload_path));
+
+    if let Ok(file_canonical) = path.canonicalize() {
+        if file_canonical.starts_with(&upload_path_normalized) {
+            debug!("Skipping file in upload directory (managed by WebDAV/manual upload): {}", filename);
+            return Ok(());
+        }
+    }
+
     // Check file age if configured
     if let Some(max_age_hours) = config.max_file_age_hours {
         if let Ok(metadata) = tokio::fs::metadata(path).await {
@@ -255,7 +271,7 @@ async fn process_file(
         }
     }

-    info!("Processing new file: {:?}", path);
+    info!("Processing new file: {:?} (from watch directory: {})", path, config.watch_folder);

     let file_data = tokio::fs::read(path).await?;
     let file_size = file_data.len() as i64;
@@ -283,16 +299,35 @@ async fn process_file(
         return Ok(());
     }

-    // Check for duplicate files (same filename and size)
-    if let Ok(existing_docs) = db.find_documents_by_filename(&filename).await {
-        for doc in existing_docs {
-            if doc.file_size == file_size {
-                info!("Skipping duplicate file: {} (already exists with same size)", filename);
-                return Ok(());
+    // Fetch admin user ID from database for watch folder documents
+    let admin_user = db.get_user_by_username("admin").await?
+        .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?;
+    let admin_user_id = admin_user.id;
+
+    // Calculate file hash for deduplication
+    let file_hash = calculate_file_hash(&file_data);
+
+    // Check if this exact file content already exists in the system by comparing
+    // against existing files with the same size (performance optimization)
+    if let Ok(existing_docs) = db.get_documents_by_user_with_role(admin_user_id, crate::models::UserRole::Admin, 1000, 0).await {
+        for existing_doc in existing_docs {
+            // Quick size check first (much faster than hash comparison)
+            if existing_doc.file_size == file_size {
+                // Read the existing file and compare hashes
+                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
+                    let existing_hash = calculate_file_hash(&existing_file_data);
+                    if file_hash == existing_hash {
+                        info!("Skipping duplicate file content: {} (hash: {}, already exists as: {})",
+                              filename, &file_hash[..8], existing_doc.original_filename);
+                        return Ok(());
+                    }
+                }
             }
         }
     }
+
+    debug!("File content is unique: {} (hash: {})", filename, &file_hash[..8]);

     // Validate PDF files before processing
     if mime_type == "application/pdf" {
         if !is_valid_pdf(&file_data) {
@@ -310,11 +345,6 @@ async fn process_file(

     let saved_file_path = file_service.save_file(&filename, &file_data).await?;

-    // Fetch admin user ID from database for watch folder documents
-    let admin_user = db.get_user_by_username("admin").await?
-        .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?;
-    let admin_user_id = admin_user.id;
-
     let document = file_service.create_document(
         &filename,
         &filename,
@@ -410,4 +440,11 @@ fn clean_pdf_data(data: &[u8]) -> &[u8] {

     // If no PDF header found, return original data
     data
+}
+
+fn calculate_file_hash(data: &[u8]) -> String {
+    let mut hasher = Sha256::new();
+    hasher.update(data);
+    let result = hasher.finalize();
+    format!("{:x}", result)
 }