diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx
index 9d6de7d..ffc4678 100644
--- a/frontend/src/App.tsx
+++ b/frontend/src/App.tsx
@@ -12,6 +12,7 @@ import DocumentsPage from './pages/DocumentsPage';
 import SearchPage from './pages/SearchPage';
 import DocumentDetailsPage from './pages/DocumentDetailsPage';
 import SettingsPage from './pages/SettingsPage';
+import SourcesPage from './pages/SourcesPage';
 import WatchFolderPage from './pages/WatchFolderPage';
 
 function App(): JSX.Element {
@@ -65,6 +66,7 @@ function App(): JSX.Element {
         <Route path="/documents" element={<DocumentsPage />} />
         <Route path="/search" element={<SearchPage />} />
         <Route path="/settings" element={<SettingsPage />} />
+        <Route path="/sources" element={<SourcesPage />} />
         <Route path="/watch" element={<WatchFolderPage />} />
         <Route path="/documents/:id" element={<DocumentDetailsPage />} />
         <Route path="/profile" element={<div>Profile Page - Coming Soon</div>} />
diff --git a/frontend/src/components/Layout/AppLayout.tsx b/frontend/src/components/Layout/AppLayout.tsx
index b9feea6..65a1fa9 100644
--- a/frontend/src/components/Layout/AppLayout.tsx
+++ b/frontend/src/components/Layout/AppLayout.tsx
@@ -31,6 +31,7 @@ import {
   AccountCircle as AccountIcon,
   Logout as LogoutIcon,
   Description as DocumentIcon,
+  Storage as StorageIcon,
 } from '@mui/icons-material';
 import { useNavigate, useLocation } from 'react-router-dom';
 import { useAuth } from '../../contexts/AuthContext';
@@ -61,6 +62,7 @@ const navigationItems: NavigationItem[] = [
   { text: 'Upload', icon: UploadIcon, path: '/upload' },
   { text: 'Documents', icon: DocumentIcon, path: '/documents' },
   { text: 'Search', icon: SearchIcon, path: '/search' },
+  { text: 'Sources', icon: StorageIcon, path: '/sources' },
   { text: 'Watch Folder', icon: FolderIcon, path: '/watch' },
 ];
 
diff --git a/frontend/src/pages/SourcesPage.tsx b/frontend/src/pages/SourcesPage.tsx
new file mode 100644
index 0000000..a91e119
--- /dev/null
+++ b/frontend/src/pages/SourcesPage.tsx
@@ -0,0 +1,1078 @@
+import React, { useState, useEffect } from 'react';
+import {
+  Box,
+  Container,
+  Typography,
+  Paper,
+  Button,
+  Grid,
+  Card,
+  CardContent,
+  Chip,
+  IconButton,
+  Dialog,
+  DialogTitle,
+  DialogContent,
+  DialogActions,
+  TextField,
+  FormControl,
+  InputLabel,
+  Select,
+  MenuItem,
+  Alert,
+  LinearProgress,
+  Snackbar,
+  Divider,
+  FormControlLabel,
+  Switch,
+  Tooltip,
+  CircularProgress,
+  Fade,
+  Stack,
+  Avatar,
+  Badge,
+  useTheme,
+  alpha,
+  Table,
+  TableBody,
+  TableCell,
+  TableContainer,
+  TableHead,
+  TableRow,
+} from '@mui/material';
+import {
+  Add as AddIcon,
+  CloudSync as CloudSyncIcon,
+  Error as ErrorIcon,
+  CheckCircle as CheckCircleIcon,
+  Edit as EditIcon,
+  Delete as DeleteIcon,
+  PlayArrow as PlayArrowIcon,
+  Storage as StorageIcon,
+  Cloud as CloudIcon,
+  Speed as SpeedIcon,
+  Timeline as TimelineIcon,
+  TrendingUp as TrendingUpIcon,
+  Security as SecurityIcon,
+  AutoFixHigh as AutoFixHighIcon,
+  Sync as SyncIcon,
+  MoreVert as MoreVertIcon,
+  Menu as MenuIcon,
+  Folder as FolderIcon,
+  Assessment as AssessmentIcon,
+  Extension as ExtensionIcon,
+  Dns as ServerIcon, // @mui/icons-material exports no 'Server' icon; Dns is the closest match
+} from '@mui/icons-material';
+import { useNavigate } from 'react-router-dom';
+import api from '../services/api';
+import { formatDistanceToNow } from 'date-fns';
+
+interface Source {
+  id: string;
+  name: string;
+  source_type: 'webdav' | 'local_folder' | 's3';
+  enabled: boolean;
+  config: any;
+  status: 'idle' | 'syncing' | 'error';
+  last_sync_at: string | null;
+  last_error: string | null;
+  last_error_at: string | null;
+  total_files_synced: number;
+  total_files_pending: number;
+  total_size_bytes: number;
+  created_at: string;
+  updated_at: string;
+}
+
+interface SnackbarState {
+  open: boolean;
+  message: string;
+  severity: 'success' | 'error' | 'warning' | 'info';
+}
+
+const SourcesPage: React.FC = () => {
+  const theme = useTheme();
+  const navigate = useNavigate();
+  const [sources, setSources] = useState<Source[]>([]);
+  const [loading, setLoading] = useState(true);
+  const [dialogOpen, setDialogOpen] = useState(false);
+  const [editingSource, setEditingSource] = useState<Source | null>(null);
+  const [snackbar, setSnackbar] = useState<SnackbarState>({
+    open: false,
+    message: '',
+    severity: 'info',
+  });
+
+  // Form state
+  const [formData, setFormData] = useState({
+    name: '',
+    source_type: 'webdav' as 'webdav' | 'local_folder' | 's3',
+    enabled: true,
+    server_url: '',
+    username: '',
+    password: '',
+    watch_folders: ['/Documents'],
+    file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
+    auto_sync: false,
+    sync_interval_minutes: 60,
+    server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic',
+  });
+
+  // Additional state for enhanced features
+  const [newFolder, setNewFolder] = useState('');
+  const [newExtension, setNewExtension] = useState('');
+  const [crawlEstimate, setCrawlEstimate] = useState<any>(null);
+  const [estimatingCrawl, setEstimatingCrawl] = useState(false);
+
+  const [testingConnection, setTestingConnection] = useState(false);
+  const [syncingSource, setSyncingSource] = useState<string | null>(null);
+
+  useEffect(() => {
+    loadSources();
+  }, []);
+
+  const loadSources = async () => {
+    try {
+      const response = await api.get('/sources');
+      setSources(response.data);
+    } catch (error) {
+      console.error('Failed to load sources:', error);
+      showSnackbar('Failed to load sources', 'error');
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  const showSnackbar = (message: string, severity: SnackbarState['severity']) => {
+    setSnackbar({ open: true, message, severity });
+  };
+
+  const handleCreateSource = () => {
+    setEditingSource(null);
+    setFormData({
+      name: '',
+      source_type: 'webdav',
+      enabled: true,
+      server_url: '',
+      username: '',
+      password: '',
+      watch_folders: ['/Documents'],
+      file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
+      auto_sync: false,
+      sync_interval_minutes: 60,
+      server_type: 'generic',
+    });
+    setCrawlEstimate(null);
+    setNewFolder('');
+    setNewExtension('');
+    setDialogOpen(true);
+  };
+
+  const handleEditSource = (source: Source) => {
+    setEditingSource(source);
+    const config = source.config;
+    setFormData({
+      name: source.name,
+      source_type: source.source_type,
+      enabled: source.enabled,
+      server_url: config.server_url || '',
+      username: config.username || '',
+      password: config.password || '',
+      watch_folders: config.watch_folders || ['/Documents'],
+      file_extensions: config.file_extensions || ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
+      auto_sync: config.auto_sync || false,
+      sync_interval_minutes: config.sync_interval_minutes || 60,
+      server_type: config.server_type || 'generic',
+    });
+    setCrawlEstimate(null);
+    setNewFolder('');
+    setNewExtension('');
+    setDialogOpen(true);
+  };
+
+  const handleSaveSource = async () => {
+    try {
+      const config = {
+        server_url: formData.server_url,
+        username: formData.username,
+        password: formData.password,
+        watch_folders: formData.watch_folders,
+        file_extensions: formData.file_extensions,
+        auto_sync: formData.auto_sync,
+        sync_interval_minutes: formData.sync_interval_minutes,
+        server_type: formData.server_type,
+      };
+
+      if (editingSource) {
+        await api.put(`/sources/${editingSource.id}`, {
+          name: formData.name,
+          enabled: formData.enabled,
+          config,
+        });
+        showSnackbar('Source updated successfully', 'success');
+      } else {
+        await api.post('/sources', {
+          name: formData.name,
+          source_type: formData.source_type,
+          enabled: formData.enabled,
+          config,
+        });
+        showSnackbar('Source created successfully', 'success');
+      }
+
+      setDialogOpen(false);
+      loadSources();
+    } catch (error) {
+      console.error('Failed to save source:', error);
+      showSnackbar('Failed to save source', 'error');
+    }
+  };
+
+  const handleDeleteSource = async (source: Source) => {
+    if (!window.confirm(`Are you sure you want to delete "${source.name}"?`)) {
+      return;
+    }
+
+    try {
+      await api.delete(`/sources/${source.id}`);
+      showSnackbar('Source deleted successfully', 'success');
+      loadSources();
+    } catch (error) {
+      console.error('Failed to delete source:', error);
+      showSnackbar('Failed to delete source', 'error');
+    }
+  };
+
+  const handleTestConnection = async () => {
+    if (!editingSource) return;
+
+    setTestingConnection(true);
+    try {
+      const response = await api.post(`/sources/${editingSource.id}/test`);
+      if (response.data.success) {
+        showSnackbar('Connection successful!', 'success');
+      } else {
+        showSnackbar(response.data.message || 'Connection failed', 'error');
+      }
+    } catch (error) {
+      console.error('Failed to test connection:', error);
+      showSnackbar('Failed to test connection', 'error');
+    } finally {
+      setTestingConnection(false);
+    }
+  };
+
+  const handleTriggerSync = async (sourceId: string) => {
+    setSyncingSource(sourceId);
+    try {
+      await api.post(`/sources/${sourceId}/sync`);
+      showSnackbar('Sync started successfully', 'success');
+      setTimeout(loadSources, 1000);
+    } catch (error: any) {
+      console.error('Failed to trigger sync:', error);
+      if (error.response?.status === 409) {
+        showSnackbar('Source is already syncing', 'warning');
+      } else {
+        showSnackbar('Failed to start sync', 'error');
+      }
+    } finally {
+      setSyncingSource(null);
+    }
+  };
+
+  // Utility functions for folder management
+  const addFolder = () => {
+    if (newFolder && !formData.watch_folders.includes(newFolder)) {
+      setFormData({
+        ...formData,
+        watch_folders: [...formData.watch_folders, newFolder]
+      });
+      setNewFolder('');
+    }
+  };
+
+  const removeFolder = (folderToRemove: string) => {
+    setFormData({
+      ...formData,
+      watch_folders: formData.watch_folders.filter(folder => folder !== folderToRemove)
+    });
+  };
+
+  // Utility functions for file extension management
+  const addFileExtension = () => {
+    if (newExtension && !formData.file_extensions.includes(newExtension)) {
+      setFormData({
+        ...formData,
+        file_extensions: [...formData.file_extensions, newExtension]
+      });
+      setNewExtension('');
+    }
+  };
+
+  const removeFileExtension = (extensionToRemove: string) => {
+    setFormData({
+      ...formData,
+      file_extensions: formData.file_extensions.filter(ext => ext !== extensionToRemove)
+    });
+  };
+
+  // Crawl estimation function
+  const estimateCrawl = async () => {
+    if (!editingSource) return;
+
+    setEstimatingCrawl(true);
+    try {
+      const response = await api.post('/webdav/estimate', {
+        server_url: formData.server_url,
+        username: formData.username,
+        password: formData.password,
+        watch_folders: formData.watch_folders,
+        file_extensions: formData.file_extensions,
+        server_type: formData.server_type,
+      });
+      setCrawlEstimate(response.data);
+      showSnackbar('Crawl estimation completed', 'success');
+    } catch (error) {
+      console.error('Failed to estimate crawl:', error);
+      showSnackbar('Failed to estimate crawl', 'error');
+    } finally {
+      setEstimatingCrawl(false);
+    }
+  };
+
+  const getSourceIcon = (sourceType: string) => {
+    switch (sourceType) {
+      case 'webdav':
+        return <CloudIcon />;
+      case 's3':
+        return <StorageIcon />;
+      case 'local_folder':
+        return <FolderIcon />;
+      default:
+        return <ExtensionIcon />;
+    }
+  };
+
+  const getStatusIcon = (source: Source) => {
+    if (source.status === 'syncing') {
+      return <SyncIcon color="info" />;
+    } else if (source.status === 'error') {
+      return <ErrorIcon color="error" />;
+    } else {
+      return <CheckCircleIcon color="success" />;
+    }
+  };
+
+  const getStatusColor = (status: string) => {
+    switch (status) {
+      case 'syncing':
+        return theme.palette.info.main;
+      case 'error':
+        return theme.palette.error.main;
+      default:
+        return theme.palette.success.main;
+    }
+  };
+
+  const formatBytes = (bytes: number) => {
+    if (bytes === 0) return '0 B';
+    const k = 1024;
+    const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
+    const i = Math.floor(Math.log(bytes) / Math.log(k));
+    return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i];
+  };
+
+  const StatCard = ({ icon, label, value, color = 'primary' }: {
+    icon: React.ReactNode;
+    label: string;
+    value: string | number;
+    color?: 'primary' | 'success' | 'warning' | 'error'
+  }) => (
+    <Paper sx={{ p: 2, borderRadius: 3 }}>
+      <Stack direction="row" spacing={2} alignItems="center">
+        <Avatar sx={{ bgcolor: alpha(theme.palette[color].main, 0.1), color: theme.palette[color].main }}>
+          {icon}
+        </Avatar>
+        <Box>
+          <Typography variant="h6" fontWeight="bold">
+            {typeof value === 'number' ? value.toLocaleString() : value}
+          </Typography>
+          <Typography variant="body2" color="text.secondary">
+            {label}
+          </Typography>
+        </Box>
+      </Stack>
+    </Paper>
+  );
+
+  const renderSourceCard = (source: Source) => (
+    <Grid item xs={12} key={source.id}>
+      <Card sx={{ borderRadius: 3 }}>
+        <CardContent>
+          {/* Header */}
+          <Stack direction="row" justifyContent="space-between" alignItems="center">
+            <Stack direction="row" spacing={2} alignItems="center">
+              <Avatar sx={{ bgcolor: alpha(getStatusColor(source.status), 0.1) }}>
+                {getSourceIcon(source.source_type)}
+              </Avatar>
+              <Box>
+                <Typography variant="h6" fontWeight="bold">
+                  {source.name}
+                </Typography>
+                {getStatusIcon(source)}
+              </Box>
+              {!source.enabled && (
+                <Chip label="Disabled" size="small" />
+              )}
+            </Stack>
+
+            {/* Action Buttons */}
+            <Stack direction="row" spacing={1}>
+              <Tooltip title="Sync now">
+                <IconButton
+                  onClick={() => handleTriggerSync(source.id)}
+                  disabled={source.status === 'syncing' || !source.enabled}
+                  sx={{
+                    bgcolor: alpha(theme.palette.primary.main, 0.1),
+                    '&:hover': { bgcolor: alpha(theme.palette.primary.main, 0.2) },
+                  }}
+                >
+                  {syncingSource === source.id ? (
+                    <CircularProgress size={20} />
+                  ) : (
+                    <PlayArrowIcon />
+                  )}
+                </IconButton>
+              </Tooltip>
+              <Tooltip title="Edit">
+                <IconButton
+                  onClick={() => handleEditSource(source)}
+                  sx={{
+                    bgcolor: alpha(theme.palette.grey[500], 0.1),
+                    '&:hover': { bgcolor: alpha(theme.palette.grey[500], 0.2) },
+                  }}
+                >
+                  <EditIcon />
+                </IconButton>
+              </Tooltip>
+              <Tooltip title="Delete">
+                <IconButton
+                  onClick={() => handleDeleteSource(source)}
+                  sx={{
+                    bgcolor: alpha(theme.palette.error.main, 0.1),
+                    '&:hover': { bgcolor: alpha(theme.palette.error.main, 0.2) },
+                    color: theme.palette.error.main,
+                  }}
+                >
+                  <DeleteIcon />
+                </IconButton>
+              </Tooltip>
+            </Stack>
+          </Stack>
+
+          {/* Stats Grid */}
+          <Grid container spacing={2} sx={{ mt: 1 }}>
+            <Grid item xs={6} md={3}>
+              <StatCard
+                icon={<CheckCircleIcon />}
+                label="Files Synced"
+                value={source.total_files_synced}
+                color="success"
+              />
+            </Grid>
+            <Grid item xs={6} md={3}>
+              <StatCard
+                icon={<SyncIcon />}
+                label="Files Pending"
+                value={source.total_files_pending}
+                color="warning"
+              />
+            </Grid>
+            <Grid item xs={6} md={3}>
+              <StatCard
+                icon={<StorageIcon />}
+                label="Total Size"
+                value={formatBytes(source.total_size_bytes)}
+                color="primary"
+              />
+            </Grid>
+            <Grid item xs={6} md={3}>
+              <StatCard
+                icon={<TimelineIcon />}
+                label="Last Sync"
+                value={source.last_sync_at
+                  ? formatDistanceToNow(new Date(source.last_sync_at), { addSuffix: true })
+                  : 'Never'}
+                color="primary"
+              />
+            </Grid>
+          </Grid>
+
+          {/* Error Alert */}
+          {source.last_error && (
+            <Alert severity="error" sx={{ mt: 2, borderRadius: 2 }}>
+              <Typography variant="body2">
+                {source.last_error}
+              </Typography>
+              {source.last_error_at && (
+                <Typography variant="caption" color="text.secondary">
+                  {formatDistanceToNow(new Date(source.last_error_at), { addSuffix: true })}
+                </Typography>
+              )}
+            </Alert>
+          )}
+        </CardContent>
+      </Card>
+    </Grid>
+  );
+
+  return (
+    <Container maxWidth="lg" sx={{ py: 4 }}>
+      {/* Header */}
+      <Stack direction="row" justifyContent="space-between" alignItems="center" sx={{ mb: 4 }}>
+        <Box>
+          <Typography variant="h4" fontWeight="bold">
+            Document Sources
+          </Typography>
+          <Typography variant="body1" color="text.secondary">
+            Connect and manage your document sources with intelligent syncing
+          </Typography>
+        </Box>
+        <Button
+          variant="contained"
+          startIcon={<AddIcon />}
+          onClick={handleCreateSource}
+          sx={{ borderRadius: 3 }}
+        >
+          Add Source
+        </Button>
+      </Stack>
+
+      {/* Content */}
+      {loading ? (
+        <Box sx={{ display: 'flex', justifyContent: 'center', py: 8 }}>
+          <CircularProgress />
+        </Box>
+      ) : sources.length === 0 ? (
+        <Paper sx={{ p: 6, textAlign: 'center', borderRadius: 4 }}>
+          <Avatar sx={{ mx: 'auto', mb: 2, width: 64, height: 64 }}>
+            <CloudSyncIcon fontSize="large" />
+          </Avatar>
+          <Typography variant="h5" gutterBottom>
+            No Sources Configured
+          </Typography>
+          <Typography variant="body1" color="text.secondary">
+            Connect your first document source to start automatically syncing and processing your files with AI-powered OCR.
+          </Typography>
+        </Paper>
+      ) : (
+        <Grid container spacing={3}>
+          {sources.map(renderSourceCard)}
+        </Grid>
+      )}
+
+      {/* Create/Edit Dialog - Enhanced */}
+      <Dialog
+        open={dialogOpen}
+        onClose={() => setDialogOpen(false)}
+        maxWidth="md"
+        fullWidth
+        PaperProps={{
+          sx: {
+            borderRadius: 4,
+            background: theme.palette.background.paper,
+          }
+        }}
+      >
+        <DialogTitle>
+          <Stack direction="row" spacing={2} alignItems="center">
+            <Avatar>
+              {editingSource ? <EditIcon /> : <AddIcon />}
+            </Avatar>
+            <Box>
+              <Typography variant="h6">
+                {editingSource ? 'Edit Source' : 'Create New Source'}
+              </Typography>
+              <Typography variant="body2" color="text.secondary">
+                {editingSource ? 'Update your source configuration' : 'Connect a new document source'}
+              </Typography>
+            </Box>
+          </Stack>
+        </DialogTitle>
+        <DialogContent>
+          <Stack spacing={3} sx={{ mt: 1 }}>
+            <TextField
+              fullWidth
+              label="Source Name"
+              value={formData.name}
+              onChange={(e) => setFormData({ ...formData, name: e.target.value })}
+              placeholder="My Document Server"
+              sx={{
+                '& .MuiOutlinedInput-root': {
+                  borderRadius: 2,
+                }
+              }}
+            />
+
+            {!editingSource && (
+              <FormControl fullWidth>
+                <InputLabel>Source Type</InputLabel>
+                <Select
+                  value={formData.source_type}
+                  label="Source Type"
+                  onChange={(e) => setFormData({ ...formData, source_type: e.target.value as typeof formData.source_type })}
+                >
+                  <MenuItem value="webdav">WebDAV</MenuItem>
+                  <MenuItem value="local_folder">Local Folder</MenuItem>
+                  <MenuItem value="s3">S3</MenuItem>
+                </Select>
+              </FormControl>
+            )}
+
+            {formData.source_type === 'webdav' && (
+              <Box>
+                <Divider sx={{ my: 1 }} />
+                <Stack direction="row" spacing={1} alignItems="center" sx={{ mb: 2 }}>
+                  <CloudIcon color="primary" />
+                  <Typography variant="subtitle1" fontWeight="medium">
+                    WebDAV Configuration
+                  </Typography>
+                </Stack>
+                <Stack spacing={3}>
+                  <TextField
+                    fullWidth
+                    label="Server URL"
+                    value={formData.server_url}
+                    onChange={(e) => setFormData({ ...formData, server_url: e.target.value })}
+                    placeholder="https://nextcloud.example.com/remote.php/dav/files/username/"
+                    sx={{ '& .MuiOutlinedInput-root': { borderRadius: 2 } }}
+                  />
+                  <Grid container spacing={2}>
+                    <Grid item xs={12} sm={6}>
+                      <TextField
+                        fullWidth
+                        label="Username"
+                        value={formData.username}
+                        onChange={(e) => setFormData({ ...formData, username: e.target.value })}
+                        sx={{ '& .MuiOutlinedInput-root': { borderRadius: 2 } }}
+                      />
+                    </Grid>
+                    <Grid item xs={12} sm={6}>
+                      <TextField
+                        fullWidth
+                        type="password"
+                        label="Password"
+                        value={formData.password}
+                        onChange={(e) => setFormData({ ...formData, password: e.target.value })}
+                        sx={{ '& .MuiOutlinedInput-root': { borderRadius: 2 } }}
+                      />
+                    </Grid>
+                  </Grid>
+                  <FormControl fullWidth>
+                    <InputLabel>Server Type</InputLabel>
+                    <Select
+                      value={formData.server_type}
+                      label="Server Type"
+                      onChange={(e) => setFormData({ ...formData, server_type: e.target.value as typeof formData.server_type })}
+                    >
+                      <MenuItem value="nextcloud">Nextcloud</MenuItem>
+                      <MenuItem value="owncloud">ownCloud</MenuItem>
+                      <MenuItem value="generic">Generic WebDAV</MenuItem>
+                    </Select>
+                  </FormControl>
+                  <FormControlLabel
+                    control={
+                      <Switch
+                        checked={formData.auto_sync}
+                        onChange={(e) => setFormData({ ...formData, auto_sync: e.target.checked })}
+                      />
+                    }
+                    label={
+                      <Box>
+                        <Typography variant="body1">
+                          Enable Automatic Sync
+                        </Typography>
+                        <Typography variant="body2" color="text.secondary">
+                          Automatically sync files on a schedule
+                        </Typography>
+                      </Box>
+                    }
+                  />
+                  {formData.auto_sync && (
+                    <TextField
+                      fullWidth
+                      type="number"
+                      label="Sync Interval (minutes)"
+                      value={formData.sync_interval_minutes}
+                      onChange={(e) => setFormData({ ...formData, sync_interval_minutes: parseInt(e.target.value) || 60 })}
+                      inputProps={{ min: 15, max: 1440 }}
+                      helperText="How often to check for new files (15 min - 24 hours)"
+                      sx={{ '& .MuiOutlinedInput-root': { borderRadius: 2 } }}
+                    />
+                  )}
+                </Stack>
+              </Box>
+            )}
+
+            <FormControlLabel
+              control={
+                <Switch
+                  checked={formData.enabled}
+                  onChange={(e) => setFormData({ ...formData, enabled: e.target.checked })}
+                />
+              }
+              label={
+                <Box>
+                  <Typography variant="body1">
+                    Source Enabled
+                  </Typography>
+                  <Typography variant="body2" color="text.secondary">
+                    Enable this source for syncing
+                  </Typography>
+                </Box>
+              }
+            />
+          </Stack>
+        </DialogContent>
+        <DialogActions sx={{ p: 3 }}>
+          {editingSource && formData.source_type === 'webdav' && (
+            <Button
+              onClick={handleTestConnection}
+              disabled={testingConnection}
+              startIcon={testingConnection ? <CircularProgress size={16} /> : <SpeedIcon />}
+            >
+              Test Connection
+            </Button>
+          )}
+          <Button onClick={() => setDialogOpen(false)}>
+            Cancel
+          </Button>
+          <Button variant="contained" onClick={handleSaveSource}>
+            {editingSource ? 'Update' : 'Create'}
+          </Button>
+        </DialogActions>
+      </Dialog>
+
+      {/* Snackbar */}
+      <Snackbar
+        open={snackbar.open}
+        autoHideDuration={6000}
+        onClose={() => setSnackbar({ ...snackbar, open: false })}
+        anchorOrigin={{ vertical: 'bottom', horizontal: 'right' }}
+      >
+        <Alert
+          onClose={() => setSnackbar({ ...snackbar, open: false })}
+          severity={snackbar.severity}
+          sx={{
+            width: '100%',
+            borderRadius: 3,
+          }}
+        >
+          {snackbar.message}
+        </Alert>
+      </Snackbar>
+
+      {/* Custom CSS for animations */}
+    </Container>
+  );
+};
+
+export default SourcesPage;
\ No newline at end of file
diff --git a/migrations/20240101000011_add_sources_table.sql b/migrations/20240101000011_add_sources_table.sql
new file mode 100644
index 0000000..c1977c6
--- /dev/null
+++ b/migrations/20240101000011_add_sources_table.sql
@@ -0,0 +1,87 @@
+-- Create sources table to support multiple document sources per user
+CREATE TABLE IF NOT EXISTS sources (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
+    name TEXT NOT NULL,
+    source_type TEXT NOT NULL, -- 'webdav', 'local_folder', 's3', etc.
+    enabled BOOLEAN DEFAULT TRUE,
+
+    -- Configuration (JSON to allow flexibility for different source types)
+    config JSONB NOT NULL DEFAULT '{}',
+
+    -- Status tracking
+    status TEXT DEFAULT 'idle', -- 'idle', 'syncing', 'error'
+    last_sync_at TIMESTAMPTZ,
+    last_error TEXT,
+    last_error_at TIMESTAMPTZ,
+
+    -- Statistics
+    total_files_synced BIGINT DEFAULT 0,
+    total_files_pending BIGINT DEFAULT 0,
+    total_size_bytes BIGINT DEFAULT 0,
+
+    created_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_at TIMESTAMPTZ DEFAULT NOW(),
+
+    UNIQUE(user_id, name)
+);
+
+-- Create indexes for performance
+CREATE INDEX IF NOT EXISTS idx_sources_user_id ON sources(user_id);
+CREATE INDEX IF NOT EXISTS idx_sources_source_type ON sources(source_type);
+CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
+
+-- Update documents table to link to sources
+ALTER TABLE documents ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE SET NULL;
+CREATE INDEX IF NOT EXISTS idx_documents_source_id ON documents(source_id);
+
+-- Update webdav_files table to link to sources instead of users directly
+ALTER TABLE webdav_files ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE CASCADE;
+
+-- Migrate existing WebDAV settings to sources table
+INSERT INTO sources (user_id, name, source_type, enabled, config, created_at, updated_at)
+SELECT
+    s.user_id,
+    'WebDAV Server' as name,
+    'webdav' as source_type,
+    s.webdav_enabled as enabled,
+    jsonb_build_object(
+        'server_url', s.webdav_server_url,
+        'username', s.webdav_username,
+        'password', s.webdav_password,
+        'watch_folders', s.webdav_watch_folders,
+        'file_extensions', s.webdav_file_extensions,
+        'auto_sync', s.webdav_auto_sync,
+        'sync_interval_minutes', s.webdav_sync_interval_minutes
+    ) as config,
+    NOW() as created_at,
+    NOW() as updated_at
+FROM settings s
+WHERE s.webdav_enabled = TRUE
+    AND s.webdav_server_url IS NOT NULL
+    AND s.webdav_username IS NOT NULL;
+
+-- Update webdav_files to link to the newly created sources
+UPDATE webdav_files wf
+SET source_id = s.id
+FROM sources s
+WHERE wf.user_id = s.user_id
+    AND s.source_type = 'webdav';
+
+-- Create a function to update the updated_at timestamp
+CREATE OR REPLACE FUNCTION update_sources_updated_at()
+RETURNS TRIGGER AS $$
+BEGIN
+    NEW.updated_at = NOW();
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Create trigger to auto-update updated_at
+CREATE TRIGGER sources_updated_at_trigger
+BEFORE UPDATE ON sources
+FOR EACH ROW
+EXECUTE FUNCTION update_sources_updated_at();
+
+-- Note: We're keeping the webdav fields in settings table for now to ensure backward compatibility
+-- They will be removed in a future migration after ensuring all code is updated
\ No newline at end of file
diff --git a/src/db.rs b/src/db.rs
index b3283f7..503ef15 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1836,4 +1836,260 @@ impl Database {
 
         Ok(files)
     }
+
+    // Sources methods
+    pub async fn create_source(&self, user_id: Uuid, source: &crate::models::CreateSource) -> Result<crate::models::Source> {
+        let id = Uuid::new_v4();
+        let now = Utc::now();
+
+        let row = sqlx::query(
+            r#"INSERT INTO sources (id, user_id, name, source_type, enabled, config, status, created_at, updated_at)
+               VALUES ($1, $2, $3, $4, $5, $6, 'idle', $7, $8)
+               RETURNING *"#
+        )
+        .bind(id)
+        .bind(user_id)
+        .bind(&source.name)
+        .bind(source.source_type.to_string())
+        .bind(source.enabled.unwrap_or(true))
+        .bind(&source.config)
+        .bind(now)
+        .bind(now)
+        .fetch_one(&self.pool)
+        .await?;
+
+        Ok(crate::models::Source {
+            id: row.get("id"),
+            user_id: row.get("user_id"),
+            name: row.get("name"),
+            source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            enabled: row.get("enabled"),
+            config: row.get("config"),
+            status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            last_sync_at: row.get("last_sync_at"),
+            last_error: row.get("last_error"),
+            last_error_at: row.get("last_error_at"),
+            total_files_synced: row.get("total_files_synced"),
+            total_files_pending: row.get("total_files_pending"),
+            total_size_bytes: row.get("total_size_bytes"),
+            created_at: row.get("created_at"),
+            updated_at: row.get("updated_at"),
+        })
+    }
+
+    pub async fn get_source(&self, user_id: Uuid, source_id: Uuid) -> Result<Option<crate::models::Source>> {
+        let row = sqlx::query(
+            r#"SELECT * FROM sources WHERE id = $1 AND user_id = $2"#
+        )
+        .bind(source_id)
+        .bind(user_id)
+        .fetch_optional(&self.pool)
+        .await?;
+
+        match row {
+            Some(row) => Ok(Some(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            })),
+            None => Ok(None),
+        }
+    }
+
+    pub async fn get_sources(&self, user_id: Uuid) -> Result<Vec<crate::models::Source>> {
+        let rows = sqlx::query(
+            r#"SELECT * FROM sources WHERE user_id = $1 ORDER BY created_at DESC"#
+        )
+        .bind(user_id)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut sources = Vec::new();
+        for row in rows {
+            sources.push(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            });
+        }
+
+        Ok(sources)
+    }
+
+    pub async fn update_source(&self, user_id: Uuid, source_id: Uuid, update: &crate::models::UpdateSource) -> Result<crate::models::Source> {
+        let mut query = String::from("UPDATE sources SET updated_at = NOW()");
+        let mut bind_count = 0;
+
+        if update.name.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", name = ${}", bind_count));
+        }
+        if update.enabled.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", enabled = ${}", bind_count));
+        }
+        if update.config.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", config = ${}", bind_count));
+        }
+
+        bind_count += 1;
+        query.push_str(&format!(" WHERE id = ${}", bind_count));
+        bind_count += 1;
+        query.push_str(&format!(" AND user_id = ${} RETURNING *", bind_count));
+
+        let mut query_builder = sqlx::query(&query);
+
+        // Bind values in order
+        if let Some(name) = &update.name {
+            query_builder = query_builder.bind(name);
+        }
+        if let Some(enabled) = &update.enabled {
+            query_builder = query_builder.bind(enabled);
+        }
+        if let Some(config) = &update.config {
+            query_builder = query_builder.bind(config);
+        }
+        query_builder = query_builder.bind(source_id);
+        query_builder = query_builder.bind(user_id);
+
+        let row = query_builder.fetch_one(&self.pool).await?;
+
+        Ok(crate::models::Source {
+            id: row.get("id"),
+            user_id: row.get("user_id"),
+            name: row.get("name"),
+            source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            enabled: row.get("enabled"),
+            config: row.get("config"),
+            status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            last_sync_at: row.get("last_sync_at"),
+            last_error: row.get("last_error"),
+            last_error_at: row.get("last_error_at"),
+            total_files_synced: row.get("total_files_synced"),
+            total_files_pending: row.get("total_files_pending"),
+            total_size_bytes: row.get("total_size_bytes"),
+            created_at: row.get("created_at"),
+            updated_at: row.get("updated_at"),
+        })
+    }
+
+    pub async fn delete_source(&self, user_id: Uuid, source_id: Uuid) -> Result<bool> {
+        let result = sqlx::query(
+            r#"DELETE FROM sources WHERE id = $1 AND user_id = $2"#
+        )
+        .bind(source_id)
+        .bind(user_id)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(result.rows_affected() > 0)
+    }
+
+    pub async fn update_source_status(&self, source_id: Uuid, status: crate::models::SourceStatus, error: Option<String>) -> Result<()> {
+        if let Some(error_msg) = error {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $1, last_error = $2, last_error_at = NOW(), updated_at = NOW()
+                   WHERE id = $3"#
+            )
+            .bind(status.to_string())
+            .bind(error_msg)
+            .bind(source_id)
+            .execute(&self.pool)
+            .await?;
+        } else {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $1, updated_at = NOW()
+                   WHERE id = $2"#
+            )
+            .bind(status.to_string())
+            .bind(source_id)
+            .execute(&self.pool)
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn update_source_sync_stats(&self, source_id: Uuid, files_synced: i64, files_pending: i64, size_bytes: i64) -> Result<()> {
+        sqlx::query(
+            r#"UPDATE sources
+               SET total_files_synced = $1, total_files_pending = $2, total_size_bytes = $3,
+                   last_sync_at = NOW(), updated_at = NOW()
+               WHERE id = $4"#
+        )
+        .bind(files_synced)
+        .bind(files_pending)
+        .bind(size_bytes)
+        .bind(source_id)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(())
+    }
+
+    pub async fn get_recent_documents_for_source(&self, source_id: Uuid, limit: i64) -> Result<Vec<Document>> {
+        let rows = sqlx::query(
+            r#"SELECT * FROM documents
+               WHERE source_id = $1
+               ORDER BY created_at DESC
+               LIMIT $2"#
+        )
+        .bind(source_id)
+        .bind(limit)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut documents = Vec::new();
+        for row in rows {
+            documents.push(Document {
+                id: row.get("id"),
+                filename: row.get("filename"),
+                original_filename: row.get("original_filename"),
+                file_path: row.get("file_path"),
+                file_size: row.get("file_size"),
+                mime_type: row.get("mime_type"),
+                content: row.get("content"),
+                ocr_text: row.get("ocr_text"),
+                ocr_confidence: row.get("ocr_confidence"),
+                ocr_word_count: row.get("ocr_word_count"),
+                ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
+                ocr_status: row.get("ocr_status"),
+                ocr_error: row.get("ocr_error"),
+                ocr_completed_at: row.get("ocr_completed_at"),
+                tags: row.get("tags"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+                user_id: row.get("user_id"),
+            });
+        }
+
+        Ok(documents)
+    }
 }
\ No newline at end of file
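Dynamic UPDATEs like the one in update_source keep the `$n` placeholder numbering and the bind order aligned by hand, which is fragile: the counter has to start at zero so the first optional field lands on `$1`, or Postgres rejects the statement with an unbound parameter. sqlx's QueryBuilder does that bookkeeping automatically; a sketch of the same statement against the same sources schema (a trimmed UpdateSource is redeclared here so the snippet stands alone):

use sqlx::{PgPool, Postgres, QueryBuilder};
use uuid::Uuid;

// Trimmed-down stand-in for crate::models::UpdateSource, for illustration only.
struct UpdateSource {
    name: Option<String>,
    enabled: Option<bool>,
    config: Option<serde_json::Value>,
}

async fn update_source(
    pool: &PgPool,
    user_id: Uuid,
    source_id: Uuid,
    update: &UpdateSource,
) -> sqlx::Result<u64> {
    let mut qb: QueryBuilder<Postgres> =
        QueryBuilder::new("UPDATE sources SET updated_at = NOW()");

    // Each push_bind allocates the next $n, so numbering can never drift
    // from the bind order.
    if let Some(name) = &update.name {
        qb.push(", name = ").push_bind(name);
    }
    if let Some(enabled) = update.enabled {
        qb.push(", enabled = ").push_bind(enabled);
    }
    if let Some(config) = &update.config {
        qb.push(", config = ").push_bind(config);
    }
    qb.push(" WHERE id = ").push_bind(source_id);
    qb.push(" AND user_id = ").push_bind(user_id);

    let result = qb.build().execute(pool).await?;
    Ok(result.rows_affected())
}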
2ec6f76..5a7bff7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,6 +31,7 @@ use db::Database;
 pub struct AppState {
     pub db: Database,
     pub config: Config,
+    pub webdav_scheduler: Option<std::sync::Arc<webdav_scheduler::WebDAVScheduler>>,
 }
 
 /// Health check endpoint for monitoring
diff --git a/src/main.rs b/src/main.rs
index ad77b49..e9da6e0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -128,26 +128,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     }
 
-    let state = AppState { db, config: config.clone() };
+    let state = AppState {
+        db,
+        config: config.clone(),
+        webdav_scheduler: None, // Will be set after creating scheduler
+    };
     let state = Arc::new(state);
 
-    let app = Router::new()
-        .route("/api/health", get(readur::health_check))
-        .nest("/api/auth", readur::routes::auth::router())
-        .nest("/api/documents", readur::routes::documents::router())
-        .nest("/api/metrics", readur::routes::metrics::router())
-        .nest("/metrics", readur::routes::prometheus_metrics::router())
-        .nest("/api/notifications", readur::routes::notifications::router())
-        .nest("/api/queue", readur::routes::queue::router())
-        .nest("/api/search", readur::routes::search::router())
-        .nest("/api/settings", readur::routes::settings::router())
-        .nest("/api/users", readur::routes::users::router())
-        .nest("/api/webdav", readur::routes::webdav::router())
-        .merge(readur::swagger::create_swagger_router())
-        .fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
-        .layer(CorsLayer::permissive())
-        .with_state(state.clone());
-
     let watcher_config = config.clone();
     let watcher_db = state.db.clone();
     tokio::spawn(async move {
@@ -197,13 +184,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     });
 
+    // Create WebDAV scheduler and update AppState
+    let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(state.clone()));
+
+    // Update the state to include the scheduler
+    let updated_state = AppState {
+        db: state.db.clone(),
+        config: state.config.clone(),
+        webdav_scheduler: Some(webdav_scheduler.clone()),
+    };
+    let state = Arc::new(updated_state);
+
     // Start WebDAV background sync scheduler on background runtime
-    let webdav_scheduler = readur::webdav_scheduler::WebDAVScheduler::new(state.clone());
+    let scheduler_for_background = webdav_scheduler.clone();
     background_runtime.spawn(async move {
-        info!("Starting WebDAV background sync scheduler");
-        webdav_scheduler.start().await;
+        info!("Starting WebDAV background sync scheduler with 30-second startup delay");
+        // Wait 30 seconds before starting scheduler to allow server to fully initialize
+        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
+        info!("WebDAV background sync scheduler starting after startup delay");
+        scheduler_for_background.start().await;
     });
 
+    // Create the router with the updated state
+    let app = Router::new()
+        .route("/api/health", get(readur::health_check))
+        .nest("/api/auth", readur::routes::auth::router())
+        .nest("/api/documents", readur::routes::documents::router())
+        .nest("/api/metrics", readur::routes::metrics::router())
+        .nest("/metrics", readur::routes::prometheus_metrics::router())
+        .nest("/api/notifications", readur::routes::notifications::router())
+        .nest("/api/queue", readur::routes::queue::router())
+        .nest("/api/search", readur::routes::search::router())
+        .nest("/api/settings", readur::routes::settings::router())
+        .nest("/api/sources", readur::routes::sources::router())
+        .nest("/api/users", readur::routes::users::router())
+        .nest("/api/webdav", readur::routes::webdav::router())
+        .merge(readur::swagger::create_swagger_router())
+        .fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
+        .layer(CorsLayer::permissive())
+        .with_state(state.clone());
+
     let listener = tokio::net::TcpListener::bind(&config.server_address).await?;
     info!("Server starting on {}", config.server_address);
diff --git a/src/models.rs b/src/models.rs
index e148507..5fbbcbb 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -620,4 +620,163 @@ pub struct FileInfo {
     pub last_modified: Option<DateTime<Utc>>,
     pub etag: String,
     pub is_directory: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
+pub enum SourceType {
+    #[serde(rename = "webdav")]
+    WebDAV,
+    #[serde(rename = "local_folder")]
+    LocalFolder,
+    #[serde(rename = "s3")]
+    S3,
+}
+
+impl std::fmt::Display for SourceType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SourceType::WebDAV => write!(f, "webdav"),
+            SourceType::LocalFolder => write!(f, "local_folder"),
+            SourceType::S3 => write!(f, "s3"),
+        }
+    }
+}
+
+impl TryFrom<String> for SourceType {
+    type Error = String;
+
+    fn try_from(value: String) -> Result<Self, Self::Error> {
+        match value.as_str() {
+            "webdav" => Ok(SourceType::WebDAV),
+            "local_folder" => Ok(SourceType::LocalFolder),
+            "s3" => Ok(SourceType::S3),
+            _ => Err(format!("Invalid source type: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
+pub enum SourceStatus {
+    #[serde(rename = "idle")]
+    Idle,
+    #[serde(rename = "syncing")]
+    Syncing,
+    #[serde(rename = "error")]
+    Error,
+}
+
+impl std::fmt::Display for SourceStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SourceStatus::Idle => write!(f, "idle"),
+            SourceStatus::Syncing => write!(f, "syncing"),
+            SourceStatus::Error => write!(f, "error"),
+        }
+    }
+}
+
+impl TryFrom<String> for SourceStatus {
+    type Error = String;
+
+    fn try_from(value: String) -> Result<Self, <Self as TryFrom<String>>::Error> {
+        match value.as_str() {
+            "idle" => Ok(SourceStatus::Idle),
+            "syncing" => Ok(SourceStatus::Syncing),
+            "error" => Ok(SourceStatus::Error),
+            _ => Err(format!("Invalid source status: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, FromRow, ToSchema)]
+pub struct Source {
+    pub id: Uuid,
+    pub user_id: Uuid,
+    pub name: String,
+    #[sqlx(try_from = "String")]
+    pub source_type: SourceType,
+    pub enabled: bool,
+    pub config: serde_json::Value,
+    #[sqlx(try_from = "String")]
+    pub status: SourceStatus,
+    pub last_sync_at: Option<DateTime<Utc>>,
+    pub last_error: Option<String>,
+    pub last_error_at: Option<DateTime<Utc>>,
+    pub total_files_synced: i64,
+    pub total_files_pending: i64,
+    pub total_size_bytes: i64,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct SourceResponse {
+    pub id: Uuid,
+    pub name: String,
+    pub source_type: SourceType,
+    pub enabled: bool,
+    pub config: serde_json::Value,
+    pub status: SourceStatus,
+    pub last_sync_at: Option<DateTime<Utc>>,
+    pub last_error: Option<String>,
+    pub last_error_at: Option<DateTime<Utc>>,
+    pub total_files_synced: i64,
+    pub total_files_pending: i64,
+    pub total_size_bytes: i64,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct CreateSource {
+    pub name: String,
+    pub source_type: SourceType,
+    pub enabled: Option<bool>,
+    pub config: serde_json::Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct UpdateSource {
+    pub name: Option<String>,
+    pub enabled: Option<bool>,
+    pub config: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct SourceWithStats {
+    pub source: SourceResponse,
+    pub recent_documents: Vec<DocumentResponse>,
+    pub sync_progress: Option<f32>,
+}
+
+impl From<Source> for SourceResponse {
+    fn from(source: Source) -> Self {
+        Self {
+            id: source.id,
+            name: source.name,
+            source_type: source.source_type,
+            enabled: source.enabled,
+            config: source.config,
+            status: source.status,
+            last_sync_at: source.last_sync_at,
+            last_error: source.last_error,
+            last_error_at: source.last_error_at,
+            total_files_synced: source.total_files_synced,
+            total_files_pending: source.total_files_pending,
+            total_size_bytes: source.total_size_bytes,
+            created_at: source.created_at,
+            updated_at: source.updated_at,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct WebDAVSourceConfig {
+    pub server_url: String,
+    pub username: String,
+    pub password: String,
+    pub watch_folders: Vec<String>,
+    pub file_extensions: Vec<String>,
+    pub auto_sync: bool,
+    pub sync_interval_minutes: i32,
+}
\ No newline at end of file
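Because the database stores these enums as plain text, the Display and TryFrom impls above are effectively the serialization contract between src/db.rs and src/models.rs. A quick round-trip check (assumes SourceType from src/models.rs is in scope):

// Round-trips a SourceType through its string form, the same way the db
// layer stores `source_type.to_string()` and later reads it back with
// `row.get::<String, _>(..).try_into()`.
fn main() {
    let original = SourceType::WebDAV;
    let stored: String = original.to_string();            // "webdav"
    let loaded: SourceType = stored.try_into().unwrap();  // back to the enum
    assert_eq!(original, loaded);

    // Unknown strings surface as an Err instead of a panic.
    let bad: Result<SourceType, _> = "ftp".to_string().try_into();
    assert!(bad.is_err());
}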
diff --git a/src/routes/mod.rs b/src/routes/mod.rs
index 4442cf4..28dc768 100644
--- a/src/routes/mod.rs
+++ b/src/routes/mod.rs
@@ -6,5 +6,6 @@ pub mod prometheus_metrics;
 pub mod queue;
 pub mod search;
 pub mod settings;
+pub mod sources;
 pub mod users;
 pub mod webdav;
\ No newline at end of file
diff --git a/src/routes/sources.rs b/src/routes/sources.rs
new file mode 100644
index 0000000..4d9d498
--- /dev/null
+++ b/src/routes/sources.rs
@@ -0,0 +1,362 @@
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    response::Json,
+    routing::{delete, get, post, put},
+    Router,
+};
+use std::sync::Arc;
+use uuid::Uuid;
+
+use crate::{
+    auth::AuthUser,
+    models::{CreateSource, SourceResponse, SourceWithStats, UpdateSource},
+    AppState,
+};
+
+pub fn router() -> Router<Arc<AppState>> {
+    Router::new()
+        .route("/", get(list_sources).post(create_source))
+        .route("/{id}", get(get_source).put(update_source).delete(delete_source))
+        .route("/{id}/sync", post(trigger_sync))
+        .route("/{id}/test", post(test_connection))
+}
+
+#[utoipa::path(
+    get,
+    path = "/api/sources",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    responses(
+        (status = 200, description = "List of user sources", body = Vec<SourceResponse>),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn list_sources(
+    auth_user: AuthUser,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<Vec<SourceResponse>>, StatusCode> {
+    let sources = state
+        .db
+        .get_sources(auth_user.user.id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let responses: Vec<SourceResponse> = sources.into_iter().map(|s| s.into()).collect();
+    Ok(Json(responses))
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    request_body = CreateSource,
+    responses(
+        (status = 201, description = "Source created successfully", body = SourceResponse),
+        (status = 400, description = "Bad request - invalid source data"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn create_source(
+    auth_user: AuthUser,
+    State(state): State<Arc<AppState>>,
+    Json(source_data): Json<CreateSource>,
+) -> Result<Json<SourceResponse>, StatusCode> {
+    // Validate source configuration based on type
+    if let Err(_) = validate_source_config(&source_data) {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let source = state
+        .db
+        .create_source(auth_user.user.id, &source_data)
+        .await
+        .map_err(|_| StatusCode::BAD_REQUEST)?;
+
+    Ok(Json(source.into()))
+}
+
+#[utoipa::path(
+    get,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Source details with stats", body = SourceWithStats),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn get_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<SourceWithStats>, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Get recent documents for this source
+    let recent_documents = state
+        .db
+        .get_recent_documents_for_source(source_id, 10)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    // Calculate sync progress
+    let sync_progress = if source.total_files_pending > 0 {
+        Some(
+            (source.total_files_synced as f32
+                / (source.total_files_synced + source.total_files_pending) as f32)
+                * 100.0,
+        )
+    } else {
+        None
+    };
+
+    let response = SourceWithStats {
+        source: source.into(),
+        recent_documents: recent_documents.into_iter().map(|d| d.into()).collect(),
+        sync_progress,
+    };
+
+    Ok(Json(response))
+}
+
+#[utoipa::path(
+    put,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    request_body = UpdateSource,
+    responses(
+        (status = 200, description = "Source updated successfully", body = SourceResponse),
+        (status = 404, description = "Source not found"),
+        (status = 400, description = "Bad request - invalid update data"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn update_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+    Json(update_data): Json<UpdateSource>,
+) -> Result<Json<SourceResponse>, StatusCode> {
+    // Check if source exists
+    let existing = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Validate config if provided
+    if let Some(config) = &update_data.config {
+        if let Err(_) = validate_config_for_type(&existing.source_type, config) {
+            return Err(StatusCode::BAD_REQUEST);
+        }
+    }
+
+    let source = state
+        .db
+        .update_source(auth_user.user.id, source_id, &update_data)
+        .await
+        .map_err(|_| StatusCode::BAD_REQUEST)?;
+
+    Ok(Json(source.into()))
+}
+
+#[utoipa::path(
+    delete,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 204, description = "Source deleted successfully"),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn delete_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<StatusCode, StatusCode> {
+    let deleted = state
+        .db
+        .delete_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    if deleted {
+        Ok(StatusCode::NO_CONTENT)
+    } else {
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources/{id}/sync",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Sync triggered successfully"),
+        (status = 404, description = "Source not found"),
+        (status = 409, description = "Source is already syncing"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn trigger_sync(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<StatusCode, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Check if already syncing
+    if matches!(source.status, crate::models::SourceStatus::Syncing) {
+        return Err(StatusCode::CONFLICT);
+    }
+
+    // Update status to syncing
+    state
+        .db
+        .update_source_status(source_id, crate::models::SourceStatus::Syncing, None)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    // Trigger the appropriate sync based on source type
+    match source.source_type {
+        crate::models::SourceType::WebDAV => {
+            // Send a message to trigger WebDAV sync
+            if let Some(scheduler) = &state.webdav_scheduler {
+                scheduler.trigger_sync(source_id).await;
+            }
+        }
+        _ => {
+            // Other source types not implemented yet
+            state
+                .db
+                .update_source_status(
+                    source_id,
+                    crate::models::SourceStatus::Error,
+                    Some("Source type not implemented".to_string()),
+                )
+                .await
+                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+            return Err(StatusCode::NOT_IMPLEMENTED);
+        }
+    }
+
+    Ok(StatusCode::OK)
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources/{id}/test",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Connection test result", body = serde_json::Value),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn test_connection(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    match source.source_type {
+        crate::models::SourceType::WebDAV => {
+            // Test WebDAV connection
+            let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config)
+                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+            match crate::webdav_service::test_webdav_connection(
+                &config.server_url,
+                &config.username,
+                &config.password,
+            )
+            .await
+            {
+                Ok(success) => Ok(Json(serde_json::json!({
+                    "success": success,
+                    "message": if success { "Connection successful" } else { "Connection failed" }
+                }))),
+                Err(e) => Ok(Json(serde_json::json!({
+                    "success": false,
+                    "message": format!("Connection failed: {}", e)
+                }))),
+            }
+        }
+        _ => Ok(Json(serde_json::json!({
+            "success": false,
+            "message": "Source type not implemented"
+        }))),
+    }
+}
+
+fn validate_source_config(source: &CreateSource) -> Result<(), &'static str> {
+    validate_config_for_type(&source.source_type, &source.config)
+}
+
+fn validate_config_for_type(
+    source_type: &crate::models::SourceType,
+    config: &serde_json::Value,
+) -> Result<(), &'static str> {
+    match source_type {
+        crate::models::SourceType::WebDAV => {
+            let _: crate::models::WebDAVSourceConfig =
+                serde_json::from_value(config.clone()).map_err(|_| "Invalid WebDAV configuration")?;
+            Ok(())
+        }
+        _ => Ok(()), // Other types not implemented yet
+    }
+}
\ No newline at end of file
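validate_config_for_type accepts any JSON that deserializes into WebDAVSourceConfig, so all seven required fields must be present (extra keys are ignored by serde's default behavior). A small unit-test sketch of what passes and what gets rejected, assuming it lives in src/routes/sources.rs next to the handlers above:

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::SourceType;

    #[test]
    fn webdav_config_validation() {
        // Passes: every field WebDAVSourceConfig requires is present.
        let ok = serde_json::json!({
            "server_url": "https://dav.example.com/",
            "username": "alice",
            "password": "secret",
            "watch_folders": ["/Documents"],
            "file_extensions": ["pdf"],
            "auto_sync": false,
            "sync_interval_minutes": 60
        });
        assert!(validate_config_for_type(&SourceType::WebDAV, &ok).is_ok());

        // Fails: required fields are missing, so create_source answers 400.
        let bad = serde_json::json!({ "server_url": "https://dav.example.com/" });
        assert!(validate_config_for_type(&SourceType::WebDAV, &bad).is_err());
    }
}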
diff --git a/src/routes/webdav/webdav_sync.rs b/src/routes/webdav/webdav_sync.rs
index 67180f6..bd644ca 100644
--- a/src/routes/webdav/webdav_sync.rs
+++ b/src/routes/webdav/webdav_sync.rs
@@ -87,6 +87,14 @@ async fn perform_sync_internal(
     // Process each watch folder
     for folder_path in &config.watch_folders {
+        // Check if sync has been cancelled before processing each folder
+        if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+            if !sync_state.is_running {
+                info!("WebDAV sync cancelled, stopping folder processing");
+                return Err("Sync cancelled by user".into());
+            }
+        }
+
         info!("Syncing folder: {}", folder_path);
 
         // Update current folder in sync state
@@ -161,6 +169,17 @@ async fn perform_sync_internal(
     // Process files concurrently and collect results
     while let Some(result) = file_futures.next().await {
+        // Check if sync has been cancelled
+        if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+            if !sync_state.is_running {
+                info!("WebDAV sync cancelled during file processing, stopping");
+                // Cancel remaining futures
+                file_futures.clear();
+                sync_errors.push("Sync cancelled by user during file processing".to_string());
+                break;
+            }
+        }
+
         match result {
             Ok(processed) => {
                 if processed {
@@ -215,6 +234,14 @@ async fn process_single_file(
     // Acquire semaphore permit to limit concurrent downloads
     let _permit = semaphore.acquire().await.map_err(|e| format!("Semaphore error: {}", e))?;
 
+    // Check if sync has been cancelled before processing this file
+    if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+        if !sync_state.is_running {
+            info!("Sync cancelled, skipping file: {}", file_info.path);
+            return Err("Sync cancelled by user".to_string());
+        }
+    }
+
     info!("Processing file: {}", file_info.path);
 
     // Check if we've already processed this file
diff --git a/src/webdav_scheduler.rs b/src/webdav_scheduler.rs
index 0eb1810..bdb6f3a 100644
--- a/src/webdav_scheduler.rs
+++ b/src/webdav_scheduler.rs
@@ -30,6 +30,11 @@ impl WebDAVScheduler {
     pub async fn start(&self) {
         info!("Starting WebDAV background sync scheduler");
 
+        // First, check for any interrupted syncs that need to be resumed
+        if let Err(e) = self.resume_interrupted_syncs().await {
+            error!("Error resuming interrupted syncs: {}", e);
+        }
+
         let mut interval_timer = interval(self.check_interval);
 
         loop {
@@ -41,6 +46,90 @@ impl WebDAVScheduler {
         }
     }
 
+    async fn resume_interrupted_syncs(&self) -> Result<(), Box<dyn std::error::Error>> {
+        info!("Checking for interrupted WebDAV syncs to resume");
+
+        // Get all users with settings
+        let users_with_settings = self.db.get_all_user_settings().await?;
+
+        for user_settings in users_with_settings {
+            // Skip if WebDAV is not enabled
+            if !user_settings.webdav_enabled {
+                continue;
+            }
+
+            // Check if there's an interrupted sync for this user
+            if let Ok(Some(sync_state)) = self.db.get_webdav_sync_state(user_settings.user_id).await {
+                // Check if sync was interrupted (has errors containing "server restart" message)
+                let was_interrupted = sync_state.errors.iter().any(|e| e.contains("server restart"));
+
+                if was_interrupted && user_settings.webdav_auto_sync {
+                    info!("Found interrupted WebDAV sync for user {}, will resume", user_settings.user_id);
+
+                    // Clear the interruption error and resume sync
+                    let cleared_errors: Vec<String> = sync_state.errors.into_iter()
+                        .filter(|e| !e.contains("server restart"))
+                        .collect();
+
+                    let reset_state = crate::models::UpdateWebDAVSyncState {
+                        last_sync_at: sync_state.last_sync_at,
+                        sync_cursor: sync_state.sync_cursor,
+                        is_running: false,
+                        files_processed: sync_state.files_processed,
+                        files_remaining: 0,
+                        current_folder: None,
+                        errors: cleared_errors,
+                    };
+
+                    if let Err(e) = self.db.update_webdav_sync_state(user_settings.user_id, &reset_state).await {
+                        error!("Failed to reset interrupted sync state: {}", e);
+                        continue;
+                    }
+
+                    // Trigger a new sync for this user
+                    if let Ok(webdav_config) = self.build_webdav_config(&user_settings) {
+                        if let Ok(webdav_service) = WebDAVService::new(webdav_config.clone()) {
+                            let state_clone = self.state.clone();
+                            let user_id = user_settings.user_id;
+                            let enable_background_ocr = user_settings.enable_background_ocr;
+
+                            info!("Resuming interrupted WebDAV sync for user {}", user_id);
+
+                            tokio::spawn(async move {
+                                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
+                                    Ok(files_processed) => {
+                                        info!("Resumed WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
+
+                                        // Send notification
+                                        let notification = crate::models::CreateNotification {
+                                            notification_type: "success".to_string(),
+                                            title: "WebDAV Sync Resumed".to_string(),
+                                            message: format!("Resumed sync after server restart. Processed {} files", files_processed),
+                                            action_url: Some("/documents".to_string()),
+                                            metadata: Some(serde_json::json!({
+                                                "sync_type": "webdav_resume",
+                                                "files_processed": files_processed
+                                            })),
+                                        };
+
+                                        if let Err(e) = state_clone.db.create_notification(user_id, &notification).await {
+                                            error!("Failed to create resume notification: {}", e);
+                                        }
+                                    }
+                                    Err(e) => {
+                                        error!("Resumed WebDAV sync failed for user {}: {}", user_id, e);
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     async fn check_and_sync_users(&self) -> Result<(), Box<dyn std::error::Error>> {
         // Get all users with WebDAV auto-sync enabled
         let users_with_settings = self.db.get_all_user_settings().await?;
@@ -189,5 +278,11 @@ impl WebDAVScheduler {
             server_type: Some("nextcloud".to_string()),
         })
     }
+
+    pub async fn trigger_sync(&self, source_id: uuid::Uuid) {
+        info!("Triggering manual sync for source {}", source_id);
+        // TODO: Implement manual sync trigger for sources
+        // For now, this is a placeholder that the routes can call
+    }
 }
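trigger_sync above is left as a stub, so POST /api/sources/{id}/sync currently flips the source's status to 'syncing' without starting any work. One possible shape for the body, sketched with two assumptions called out in the comments: an unscoped get_source_by_id lookup (this PR only ships the user-scoped get_source) and reuse of the tracked sync path this file already uses for scheduled syncs:

// Sketch only; not part of this PR.
pub async fn trigger_sync(&self, source_id: uuid::Uuid) {
    info!("Triggering manual sync for source {}", source_id);
    let state = self.state.clone();
    tokio::spawn(async move {
        // Hypothetical unscoped lookup; the db layer in this PR only has the
        // user-scoped get_source(user_id, source_id).
        let source = match state.db.get_source_by_id(source_id).await {
            Ok(Some(source)) => source,
            _ => return,
        };
        let config: crate::models::WebDAVSourceConfig =
            match serde_json::from_value(source.config) {
                Ok(config) => config,
                Err(e) => {
                    error!("Source {} has an invalid WebDAV config: {}", source_id, e);
                    return;
                }
            };
        // From here the flow would mirror check_and_sync_users: build a
        // WebDAVService from `config`, run the tracked sync, then clear the
        // 'syncing' status that the route handler set.
        let _ = config;
        if let Err(e) = state
            .db
            .update_source_status(source_id, crate::models::SourceStatus::Idle, None)
            .await
        {
            error!("Failed to reset source {} status: {}", source_id, e);
        }
    });
}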
Processed {} files", files_processed), + action_url: Some("/documents".to_string()), + metadata: Some(serde_json::json!({ + "sync_type": "webdav_resume", + "files_processed": files_processed + })), + }; + + if let Err(e) = state_clone.db.create_notification(user_id, ¬ification).await { + error!("Failed to create resume notification: {}", e); + } + } + Err(e) => { + error!("Resumed WebDAV sync failed for user {}: {}", user_id, e); + } + } + }); + } + } + } + } + } + + Ok(()) + } + async fn check_and_sync_users(&self) -> Result<(), Box> { // Get all users with WebDAV auto-sync enabled let users_with_settings = self.db.get_all_user_settings().await?; @@ -189,5 +278,11 @@ impl WebDAVScheduler { server_type: Some("nextcloud".to_string()), }) } + + pub async fn trigger_sync(&self, source_id: uuid::Uuid) { + info!("Triggering manual sync for source {}", source_id); + // TODO: Implement manual sync trigger for sources + // For now, this is a placeholder that the routes can call + } } diff --git a/src/webdav_service.rs b/src/webdav_service.rs index 1abafba..bc47e30 100644 --- a/src/webdav_service.rs +++ b/src/webdav_service.rs @@ -420,4 +420,23 @@ impl WebDAVService { Ok(bytes.to_vec()) } +} + +pub async fn test_webdav_connection( + server_url: &str, + username: &str, + password: &str, +) -> Result { + let client = Client::new(); + + // Try to list the root directory to test connectivity + let response = client + .request(Method::from_bytes(b"PROPFIND")?, server_url) + .header("Depth", "0") + .basic_auth(username, Some(password)) + .timeout(Duration::from_secs(10)) + .send() + .await?; + + Ok(response.status().is_success()) } \ No newline at end of file