diff --git a/.claude/agents/react-minimal-ux-expert.md b/.claude/agents/react-minimal-ux-expert.md index c0b97be..344b598 100644 --- a/.claude/agents/react-minimal-ux-expert.md +++ b/.claude/agents/react-minimal-ux-expert.md @@ -1,5 +1,5 @@ --- -name: react-minimal-ux-expert +name: react-expert description: Use this agent when you need to create, review, or improve React components and applications with a focus on 2026 minimalistic design principles and intuitive user experience. Examples: Context: User wants to create a modern login form component. user: 'I need to build a login form component for my React app' assistant: 'I'll use the react-minimal-ux-expert agent to create a minimalistic, user-friendly login form that follows 2026 design principles' The user needs a React component with modern UX design, perfect for the react-minimal-ux-expert agent. Context: User has built a dashboard but wants UX improvements. user: 'My React dashboard feels cluttered and hard to navigate' assistant: 'Let me use the react-minimal-ux-expert agent to analyze your dashboard and suggest minimalistic design improvements for better user experience' The user needs UX analysis and minimalistic design guidance for their React application. color: purple --- diff --git a/.claude/agents/rust-react-test-fixer.md b/.claude/agents/rust-react-test-fixer.md index 5277866..ea69052 100644 --- a/.claude/agents/rust-react-test-fixer.md +++ b/.claude/agents/rust-react-test-fixer.md @@ -1,5 +1,5 @@ --- -name: rust-react-test-fixer +name: test-fixer description: Use this agent when you need to diagnose and fix failing tests in a Rust/React codebase that has unit tests, integration tests, and E2E tests. Examples: Context: User has a Rust/React project with failing tests across multiple test suites. user: 'My tests are failing and I can't figure out why. Can you help me get them all passing?' 
assistant: 'I'll use the rust-react-test-fixer agent to systematically diagnose and fix the failing tests across all three test categories.' The user needs comprehensive test debugging across unit, integration, and E2E tests, which is exactly what this agent specializes in. Context: User reports that integration tests are failing after making server changes. user: 'I made some changes to my Rust server and now my integration tests are broken' assistant: 'Let me use the rust-react-test-fixer agent to analyze the integration test failures and determine what needs to be fixed.' This is a perfect case for the test-fixer agent as it involves debugging specific test category failures. color: pink --- diff --git a/frontend/src/components/SyncProgressDisplay.tsx b/frontend/src/components/SyncProgressDisplay.tsx index 2643b1b..93a9212 100644 --- a/frontend/src/components/SyncProgressDisplay.tsx +++ b/frontend/src/components/SyncProgressDisplay.tsx @@ -73,6 +73,7 @@ export const SyncProgressDisplay: React.FC = ({ onConnectionStatusChange: handleConnectionStatusChange, }); + const formatBytes = (bytes: number): string => { if (bytes === 0) return '0 B'; const k = 1024; diff --git a/frontend/src/components/__tests__/SyncProgressDisplay.test.tsx b/frontend/src/components/__tests__/SyncProgressDisplay.test.tsx index e4ba89c..9099954 100644 --- a/frontend/src/components/__tests__/SyncProgressDisplay.test.tsx +++ b/frontend/src/components/__tests__/SyncProgressDisplay.test.tsx @@ -1,5 +1,15 @@ import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; import { screen, fireEvent, waitFor, act } from '@testing-library/react'; + +// Create a mock for the hook FIRST, before any component imports +const mockUseSyncProgressWebSocket = vi.fn(); +vi.mock('../../hooks/useSyncProgressWebSocket', () => ({ + useSyncProgressWebSocket: mockUseSyncProgressWebSocket, +})); + +// Use the automatic mocking with __mocks__ directory +vi.mock('../../services/api'); + import 
SyncProgressDisplay from '../SyncProgressDisplay'; import { renderWithProviders } from '../../test/test-utils'; // Define SyncProgressInfo type locally for tests @@ -23,11 +33,24 @@ interface SyncProgressInfo { is_active: boolean; } -// Mock the API module using the __mocks__ version +// Use the automatic mocking with __mocks__ directory vi.mock('../../services/api'); -// Import the mock helpers -import { getMockSyncProgressWebSocket, resetMockSyncProgressWebSocket, MockSyncProgressWebSocket, sourcesService } from '../../services/__mocks__/api'; +// Import mock helpers directly from the mock file +import { getMockSyncProgressWebSocket, resetMockSyncProgressWebSocket, MockSyncProgressWebSocket } from '../../services/__mocks__/api'; + +// Import the mocked services +import { sourcesService } from '../../services/api'; + +// Define ConnectionStatus type locally +type ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'error' | 'failed'; + +// Create a mock for the hook +const mockUseSyncProgressWebSocket = vi.fn(); +vi.mock('../../hooks/useSyncProgressWebSocket', () => ({ + useSyncProgressWebSocket: mockUseSyncProgressWebSocket, + ConnectionStatus: {} as any, +})); // Create mock progress data factory const createMockProgressInfo = (overrides: Partial = {}): SyncProgressInfo => ({ @@ -51,33 +74,61 @@ const createMockProgressInfo = (overrides: Partial = {}): Sync ...overrides, }); -// Helper function to simulate progress updates -const simulateProgressUpdate = (progressData: SyncProgressInfo) => { +// Helper functions to simulate hook state changes +let currentMockState = { + progressInfo: null as SyncProgressInfo | null, + connectionStatus: 'connecting' as ConnectionStatus, + isConnected: false, + reconnect: vi.fn(), + disconnect: vi.fn(), +}; + +const mockHookState = (overrides: Partial) => { + currentMockState = { + ...currentMockState, + ...overrides, + }; + mockUseSyncProgressWebSocket.mockReturnValue(currentMockState); +}; + +// 
Helper functions to simulate WebSocket events that update the hook state +const simulateProgressUpdate = (progressInfo: SyncProgressInfo) => { + mockHookState({ + progressInfo, + connectionStatus: 'connected', + isConnected: true + }); + + // Also trigger the mock WebSocket's progress event for completeness const mockWS = getMockSyncProgressWebSocket(); if (mockWS) { - act(() => { - mockWS.simulateProgress(progressData); - }); + mockWS.simulateProgress(progressInfo); } }; -// Helper function to simulate heartbeat updates -const simulateHeartbeatUpdate = (data: any) => { +const simulateConnectionStatusChange = (status: ConnectionStatus) => { + mockHookState({ + connectionStatus: status, + isConnected: status === 'connected' + }); + + // Also trigger the mock WebSocket's connection status event const mockWS = getMockSyncProgressWebSocket(); if (mockWS) { - act(() => { - mockWS.simulateHeartbeat(data); - }); + mockWS.simulateConnectionStatus(status); } }; -// Helper function to simulate connection status changes -const simulateConnectionStatusChange = (status: string) => { +const simulateHeartbeatUpdate = (heartbeatData: { source_id: string; is_active: boolean; timestamp: number }) => { + // If heartbeat indicates sync is not active, clear progress info + if (!heartbeatData.is_active) { + mockHookState({ progressInfo: null }); + } + + // Also trigger the mock WebSocket's heartbeat event const mockWS = getMockSyncProgressWebSocket(); if (mockWS) { - act(() => { - mockWS.simulateConnectionStatus(status); - }); + mockWS.simulateHeartbeat(heartbeatData); } }; @@ -98,6 +149,16 @@ describe('SyncProgressDisplay Component', () => { // Reset the mock WebSocket instance resetMockSyncProgressWebSocket(); + // Initialize the mock hook with default state + currentMockState = { + progressInfo: null, + connectionStatus: 'disconnected', + isConnected: false, + reconnect: vi.fn(), + disconnect: vi.fn(), + }; + mockUseSyncProgressWebSocket.mockReturnValue(currentMockState); + // Mock 
localStorage for token access Object.defineProperty(global, 'localStorage', { value: { @@ -128,28 +189,47 @@ describe('SyncProgressDisplay Component', () => { describe('Visibility and Rendering', () => { test('should not render when isVisible is false', () => { renderComponent({ isVisible: false }); - expect(screen.queryByText('Test WebDAV Source - Sync Progress')).not.toBeInTheDocument(); + expect(screen.queryByText((content, element) => { + return element?.textContent === 'Test WebDAV Source - Sync Progress'; + })).not.toBeInTheDocument(); }); - test('should render when isVisible is true', () => { + test('should render when isVisible is true', async () => { + // Start with connecting status so component will be visible + mockHookState({ connectionStatus: 'connecting', isConnected: false }); + renderComponent({ isVisible: true }); - expect(screen.getByText('Test WebDAV Source - Sync Progress')).toBeInTheDocument(); + + // Component should be visible immediately with connecting status + await waitFor(() => { + expect(screen.getByText(/Test WebDAV Source/)).toBeInTheDocument(); + expect(screen.getByText(/Sync Progress/)).toBeInTheDocument(); + }); }); test('should show connecting status initially', async () => { + // Set connecting status for this test + mockHookState({ connectionStatus: 'connecting', isConnected: false }); + renderComponent(); - // The hook starts in disconnected state, then moves to connecting + // Wait for the component to be visible and show connecting status await waitFor(() => { - simulateConnectionStatusChange('connecting'); + expect(screen.getByText('Connecting...')).toBeInTheDocument(); }); - - expect(screen.getByText('Connecting...')).toBeInTheDocument(); }); - test('should render with custom source name', () => { + test('should render with custom source name', async () => { + // Set connecting status so component will be visible + mockHookState({ connectionStatus: 'connecting', isConnected: false }); + renderComponent({ sourceName: 'My 
Custom Source' }); - expect(screen.getByText('My Custom Source - Sync Progress')).toBeInTheDocument(); + + // Wait for the component to be visible with custom source name + await waitFor(() => { + expect(screen.getByText(/My Custom Source/)).toBeInTheDocument(); + expect(screen.getByText(/Sync Progress/)).toBeInTheDocument(); + }); }); }); diff --git a/frontend/src/pages/SettingsPage.tsx b/frontend/src/pages/SettingsPage.tsx index f6a99d9..531706b 100644 --- a/frontend/src/pages/SettingsPage.tsx +++ b/frontend/src/pages/SettingsPage.tsx @@ -34,14 +34,18 @@ import { Chip, LinearProgress, CircularProgress, + Tooltip, + Divider, } from '@mui/material'; import Grid from '@mui/material/GridLegacy'; import { Edit as EditIcon, Delete as DeleteIcon, Add as AddIcon, CloudSync as CloudSyncIcon, Folder as FolderIcon, Assessment as AssessmentIcon, PlayArrow as PlayArrowIcon, - Pause as PauseIcon, Stop as StopIcon } from '@mui/icons-material'; + Pause as PauseIcon, Stop as StopIcon, CheckCircle as CheckCircleIcon, + Error as ErrorIcon, Visibility as VisibilityIcon, CreateNewFolder as CreateNewFolderIcon, + RemoveCircle as RemoveCircleIcon, Warning as WarningIcon } from '@mui/icons-material'; import { useAuth } from '../contexts/AuthContext'; -import api, { queueService, ErrorHelper, ErrorCodes } from '../services/api'; +import api, { queueService, ErrorHelper, ErrorCodes, userWatchService, UserWatchDirectoryResponse } from '../services/api'; import OcrLanguageSelector from '../components/OcrLanguageSelector'; import LanguageSelector from '../components/LanguageSelector'; @@ -262,6 +266,21 @@ const SettingsPage: React.FC = () => { const [serverConfig, setServerConfig] = useState(null); const [configLoading, setConfigLoading] = useState(false); + // Watch Directory State + const [userWatchDirectories, setUserWatchDirectories] = useState>(new Map()); + const [watchDirLoading, setWatchDirLoading] = useState>(new Map()); + const [confirmDialog, setConfirmDialog] = useState<{ + open: 
boolean; + title: string; + message: string; + onConfirm: () => void; + }>({ + open: false, + title: '', + message: '', + onConfirm: () => {}, + }); + useEffect(() => { fetchSettings(); @@ -270,6 +289,13 @@ const SettingsPage: React.FC = () => { fetchServerConfiguration(); }, []); + // Fetch watch directory information after users are loaded + useEffect(() => { + if (users.length > 0) { + fetchUserWatchDirectories(); + } + }, [users]); + const fetchSettings = async (): Promise => { try { const response = await api.get('/settings'); @@ -560,6 +586,255 @@ const SettingsPage: React.FC = () => { } }; + // Watch Directory Functions + const fetchUserWatchDirectories = async (): Promise => { + try { + const watchDirMap = new Map(); + + // Fetch watch directory info for each user + await Promise.all( + users.map(async (user) => { + try { + const response = await userWatchService.getUserWatchDirectory(user.id); + watchDirMap.set(user.id, response.data); + } catch (error: any) { + // If watch directory doesn't exist or user doesn't have one, that's okay + if (error.response?.status === 404) { + watchDirMap.set(user.id, { + user_id: user.id, + username: user.username, + watch_directory_path: `./user_watch/${user.username}`, + exists: false, + enabled: false, + }); + } else { + console.error(`Error fetching watch directory for user ${user.username}:`, error); + } + } + }) + ); + + setUserWatchDirectories(watchDirMap); + } catch (error: any) { + console.error('Error fetching user watch directories:', error); + // Don't show error message as this might not be available for all users + } + }; + + const setUserWatchDirLoading = (userId: string, loading: boolean): void => { + setWatchDirLoading(prev => { + const newMap = new Map(prev); + if (loading) { + newMap.set(userId, true); + } else { + newMap.delete(userId); + } + return newMap; + }); + }; + + const handleCreateWatchDirectory = async (userId: string): Promise => { + setUserWatchDirLoading(userId, true); + try { + const 
response = await userWatchService.createUserWatchDirectory(userId); + if (response.data.success) { + showSnackbar('Watch directory created successfully', 'success'); + // Refresh the watch directory info for this user + try { + const updatedResponse = await userWatchService.getUserWatchDirectory(userId); + setUserWatchDirectories(prev => { + const newMap = new Map(prev); + newMap.set(userId, updatedResponse.data); + return newMap; + }); + } catch (fetchError) { + console.error('Error refreshing watch directory info:', fetchError); + } + } else { + showSnackbar(response.data.message || 'Failed to create watch directory', 'error'); + } + } catch (error: any) { + console.error('Error creating watch directory:', error); + + const errorInfo = ErrorHelper.formatErrorForDisplay(error, true); + if (error.response?.status === 403) { + showSnackbar('Admin access required to create watch directories', 'error'); + } else if (error.response?.status === 409) { + showSnackbar('Watch directory already exists for this user', 'warning'); + } else { + showSnackbar(errorInfo.message || 'Failed to create watch directory', 'error'); + } + } finally { + setUserWatchDirLoading(userId, false); + } + }; + + const handleViewWatchDirectory = (directoryPath: string): void => { + // For now, just show the path in a snackbar + // In a real implementation, this could open a file explorer or navigate to a directory view + showSnackbar(`Watch directory: ${directoryPath}`, 'info'); + }; + + const handleRemoveWatchDirectory = (userId: string, username: string): void => { + setConfirmDialog({ + open: true, + title: 'Remove Watch Directory', + message: `Are you sure you want to remove the watch directory for user "${username}"? 
This action cannot be undone and will stop monitoring their directory for new files.`, + onConfirm: () => confirmRemoveWatchDirectory(userId), + }); + }; + + const confirmRemoveWatchDirectory = async (userId: string): Promise => { + setUserWatchDirLoading(userId, true); + try { + const response = await userWatchService.deleteUserWatchDirectory(userId); + if (response.data.success) { + showSnackbar('Watch directory removed successfully', 'success'); + // Update the watch directory info to reflect removal + setUserWatchDirectories(prev => { + const newMap = new Map(prev); + const current = newMap.get(userId); + if (current) { + newMap.set(userId, { + ...current, + exists: false, + enabled: false, + }); + } + return newMap; + }); + } else { + showSnackbar(response.data.message || 'Failed to remove watch directory', 'error'); + } + } catch (error: any) { + console.error('Error removing watch directory:', error); + + const errorInfo = ErrorHelper.formatErrorForDisplay(error, true); + if (error.response?.status === 403) { + showSnackbar('Admin access required to remove watch directories', 'error'); + } else if (error.response?.status === 404) { + showSnackbar('Watch directory not found or already removed', 'warning'); + // Update state to reflect that it doesn't exist + setUserWatchDirectories(prev => { + const newMap = new Map(prev); + const current = newMap.get(userId); + if (current) { + newMap.set(userId, { + ...current, + exists: false, + enabled: false, + }); + } + return newMap; + }); + } else { + showSnackbar(errorInfo.message || 'Failed to remove watch directory', 'error'); + } + } finally { + setUserWatchDirLoading(userId, false); + setConfirmDialog(prev => ({ ...prev, open: false })); + } + }; + + const handleCloseConfirmDialog = (): void => { + setConfirmDialog(prev => ({ ...prev, open: false })); + }; + + // Helper function to render watch directory status + const renderWatchDirectoryStatus = (userId: string, username: string) => { + const watchDirInfo = 
userWatchDirectories.get(userId); + const isLoading = watchDirLoading.get(userId) || false; + + if (isLoading) { + return ( + + + + Loading... + + + ); + } + + if (!watchDirInfo) { + return ( + + Unknown + + ); + } + + const getStatusIcon = () => { + if (watchDirInfo.exists && watchDirInfo.enabled) { + return ; + } else if (watchDirInfo.exists && !watchDirInfo.enabled) { + return ; + } else { + return ; + } + }; + + const getStatusText = () => { + if (watchDirInfo.exists && watchDirInfo.enabled) { + return 'Active'; + } else if (watchDirInfo.exists && !watchDirInfo.enabled) { + return 'Disabled'; + } else { + return 'Not Created'; + } + }; + + const getStatusColor = (): "success" | "warning" | "error" => { + if (watchDirInfo.exists && watchDirInfo.enabled) { + return 'success'; + } else if (watchDirInfo.exists && !watchDirInfo.enabled) { + return 'warning'; + } else { + return 'error'; + } + }; + + return ( + + + + {getStatusIcon()} + + + + + {watchDirInfo.watch_directory_path} + + {/* Show truncated path on mobile */} + + .../user_watch/{username} + + + ); + }; + return ( @@ -1174,35 +1449,139 @@ const SettingsPage: React.FC = () => { - - + +
Username - Email - Created At + Email + Created At + Watch Directory Actions {users.map((user) => ( - {user.username} - {user.email} - {new Date(user.created_at).toLocaleDateString()} + + + + {user.username} + + {/* Show email on mobile */} + + {user.email} + + {/* Show created date on mobile */} + + Created: {new Date(user.created_at).toLocaleDateString()} + + + + + {user.email} + + + {new Date(user.created_at).toLocaleDateString()} + + + {renderWatchDirectoryStatus(user.id, user.username)} + - handleOpenUserDialog('edit', user)} - disabled={loading} - > - - - handleDeleteUser(user.id)} - disabled={loading || user.id === currentUser?.id} - > - - + + {/* Watch Directory Actions */} + {(() => { + const watchDirInfo = userWatchDirectories.get(user.id); + const isWatchDirLoading = watchDirLoading.get(user.id) || false; + + if (!watchDirInfo || !watchDirInfo.exists) { + // Show Create Directory button + return ( + + handleCreateWatchDirectory(user.id)} + disabled={loading || isWatchDirLoading} + color="primary" + size="small" + > + {isWatchDirLoading ? ( + + ) : ( + + )} + + + ); + } else { + // Show View and Remove buttons + return ( + <> + + handleViewWatchDirectory(watchDirInfo.watch_directory_path)} + disabled={loading || isWatchDirLoading} + color="info" + size="small" + > + + + + + handleRemoveWatchDirectory(user.id, user.username)} + disabled={loading || isWatchDirLoading} + color="error" + size="small" + > + {isWatchDirLoading ? 
( + + ) : ( + + )} + + + + ); + } + })()} + + + + {/* User Management Actions */} + + handleOpenUserDialog('edit', user)} + disabled={loading} + size="small" + > + + + + + handleDeleteUser(user.id)} + disabled={loading || user.id === currentUser?.id} + color="error" + size="small" + > + + + + ))} @@ -1440,6 +1819,40 @@ const SettingsPage: React.FC = () => { + {/* Confirmation Dialog for Watch Directory Actions */} + + + + {confirmDialog.title} + + + + {confirmDialog.message} + + + + + + + + { const theme = useTheme(); + const { user } = useAuth(); + + // Queue statistics state const [queueStats, setQueueStats] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const [lastRefresh, setLastRefresh] = useState(null); const [requeuingFailed, setRequeuingFailed] = useState(false); + + // User watch directory state + const [userWatchInfo, setUserWatchInfo] = useState(null); + const [userWatchLoading, setUserWatchLoading] = useState(false); + const [userWatchError, setUserWatchError] = useState(null); + const [creatingDirectory, setCreatingDirectory] = useState(false); + const [successMessage, setSuccessMessage] = useState(null); // Mock configuration data (would typically come from API) const watchConfig: WatchConfig = { @@ -62,9 +78,28 @@ const WatchFolderPage: React.FC = () => { useEffect(() => { fetchQueueStats(); + if (user) { + fetchUserWatchDirectory(); + } const interval = setInterval(fetchQueueStats, 30000); // Refresh every 30 seconds return () => clearInterval(interval); - }, []); + }, [user]); + + const fetchUserWatchDirectory = async (): Promise => { + if (!user) return; + + try { + setUserWatchLoading(true); + setUserWatchError(null); + const response = await userWatchService.getUserWatchDirectory(user.id); + setUserWatchInfo(response.data); + } catch (err) { + console.error('Error fetching user watch directory:', err); + setUserWatchError('Failed to fetch user watch directory information'); + } finally { 
+ setUserWatchLoading(false); + } + }; const fetchQueueStats = async (): Promise => { try { @@ -81,6 +116,33 @@ const WatchFolderPage: React.FC = () => { } }; + const createUserWatchDirectory = async (): Promise => { + if (!user) return; + + try { + setCreatingDirectory(true); + setUserWatchError(null); + setSuccessMessage(null); + + const response = await userWatchService.createUserWatchDirectory(user.id); + + if (response.data.success) { + setSuccessMessage(response.data.message); + // Refresh user watch directory info + await fetchUserWatchDirectory(); + } else { + setUserWatchError(response.data.message || 'Failed to create watch directory'); + } + } catch (err) { + console.error('Error creating user watch directory:', err); + setUserWatchError('Failed to create user watch directory'); + } finally { + setCreatingDirectory(false); + // Clear success message after 5 seconds + setTimeout(() => setSuccessMessage(null), 5000); + } + }; + const requeueFailedJobs = async (): Promise => { try { setRequeuingFailed(true); @@ -152,11 +214,16 @@ const WatchFolderPage: React.FC = () => { {queueStats && queueStats.failed_count > 0 && ( @@ -178,13 +245,161 @@ const WatchFolderPage: React.FC = () => { )} - {/* Watch Folder Configuration */} + {successMessage && ( + + {successMessage} + + )} + + {/* User Watch Directory - Only show for authenticated users */} + {user && ( + + + + + Personal Watch Directory + {user.role === 'Admin' && ( + } + label="Admin" + size="small" + color="primary" + variant="outlined" + sx={{ ml: 1 }} + /> + )} + + + {userWatchError && ( + + {userWatchError} + + )} + + {userWatchLoading ? ( + + + + + + ) : userWatchInfo ? ( + + + + + Your Personal Watch Directory + + + + {userWatchInfo.watch_directory_path} + + + + + + + + Directory Status + + : } + label={userWatchInfo.exists ? 'Directory Exists' : 'Directory Missing'} + color={userWatchInfo.exists ? 
'success' : 'error'} + variant="filled" + size="small" + /> + + + + Watch Status + + : } + label={userWatchInfo.enabled ? 'Enabled' : 'Disabled'} + color={userWatchInfo.enabled ? 'success' : 'warning'} + variant="filled" + size="small" + /> + + + + + {!userWatchInfo.exists && ( + + + + Your personal watch directory doesn't exist yet. Create it to start uploading files to your own dedicated folder. + + + + + )} + + ) : ( + + Unable to load personal watch directory information. Please try refreshing the page. + + )} + + + )} + + {/* Divider between Personal and Global sections */} + {user && ( + + + + System Configuration + + + + )} + + {/* Global Watch Folder Configuration */} - Watch Folder Configuration + Global Watch Folder Configuration + {user?.role === 'Admin' && ( + + )} + {user?.role !== 'Admin' && ( + + This is the system-wide watch folder configuration. All users can view this information. + + )} diff --git a/frontend/src/services/__mocks__/api.ts b/frontend/src/services/__mocks__/api.ts index 8417c11..e5e085d 100644 --- a/frontend/src/services/__mocks__/api.ts +++ b/frontend/src/services/__mocks__/api.ts @@ -67,24 +67,38 @@ const createMockWebSocket = () => { // Create the main mock instance let currentMockWebSocket = createMockWebSocket(); -// Mock the global WebSocket -global.WebSocket = vi.fn(() => currentMockWebSocket) as any; +// Mock the global WebSocket constructor +global.WebSocket = vi.fn().mockImplementation(() => currentMockWebSocket) as any; (global.WebSocket as any).CONNECTING = WEBSOCKET_CONNECTING; (global.WebSocket as any).OPEN = WEBSOCKET_OPEN; (global.WebSocket as any).CLOSING = WEBSOCKET_CLOSING; (global.WebSocket as any).CLOSED = WEBSOCKET_CLOSED; -// Mock SyncProgressWebSocket class +// Mock SyncProgressWebSocket class - also export as the main class name export class MockSyncProgressWebSocket { private listeners: { [key: string]: ((data: any) => void)[] } = {}; + private ws: any = null; // Mock WebSocket instance constructor(private 
sourceId: string) { + console.log('[MOCK] MockSyncProgressWebSocket created for source:', sourceId); // Store reference to current instance for test access currentMockSyncProgressWebSocket = this; + + // Create a mock WebSocket instance to match the real class structure + this.ws = { + close: vi.fn(), + readyState: WEBSOCKET_OPEN, + onopen: null, + onmessage: null, + onerror: null, + onclose: null, + }; } connect(): Promise { - // Simulate successful connection + // Immediately emit connecting status + this.emit('connectionStatus', 'connecting'); + // Simulate successful connection after a short delay setTimeout(() => { this.emit('connectionStatus', 'connected'); }, 10); @@ -111,15 +125,21 @@ export class MockSyncProgressWebSocket { } close(): void { + // Mock the same behavior as the real class + if (this.ws) { + this.ws.close(1000, 'Client requested closure'); + this.ws = null; + } this.listeners = {}; + this.emit('connectionStatus', 'disconnected'); } getReadyState(): number { - return WEBSOCKET_OPEN; + return this.ws?.readyState ?? 
WEBSOCKET_CLOSED; } isConnected(): boolean { - return true; + return this.ws?.readyState === WEBSOCKET_OPEN; } // Test helper methods @@ -149,6 +169,7 @@ export const sourcesService = { triggerDeepScan: vi.fn(), stopSync: vi.fn(), getSyncStatus: vi.fn(), + getSyncProgressStream: vi.fn(), createSyncProgressWebSocket: vi.fn((sourceId: string) => { return new MockSyncProgressWebSocket(sourceId); }), @@ -174,6 +195,30 @@ export const resetMockSyncProgressWebSocket = () => { return currentMockSyncProgressWebSocket; }; +// Export the mock class as the main SyncProgressWebSocket class for vitest to use +export const SyncProgressWebSocket = MockSyncProgressWebSocket; + +// Export the SyncProgressInfo type from the mock +export interface SyncProgressInfo { + source_id: string + phase: string + phase_description: string + elapsed_time_secs: number + directories_found: number + directories_processed: number + files_found: number + files_processed: number + bytes_processed: number + processing_rate_files_per_sec: number + files_progress_percent: number + estimated_time_remaining_secs?: number + current_directory: string + current_file?: string + errors: number + warnings: number + is_active: boolean +} + // Re-export types that components might need export interface Document { id: string diff --git a/frontend/src/services/__tests__/sync-progress-simple.test.ts b/frontend/src/services/__tests__/sync-progress-simple.test.ts index 22fe9d0..5bd545d 100644 --- a/frontend/src/services/__tests__/sync-progress-simple.test.ts +++ b/frontend/src/services/__tests__/sync-progress-simple.test.ts @@ -116,10 +116,10 @@ describe('Sync Progress API Methods Type Safety', () => { const { sourcesService } = await import('../api'); expect(typeof sourcesService.getSyncStatus).toBe('function'); - expect(typeof sourcesService.getSyncProgressStream).toBe('function'); expect(typeof sourcesService.triggerSync).toBe('function'); expect(typeof sourcesService.stopSync).toBe('function'); expect(typeof 
sourcesService.triggerDeepScan).toBe('function'); + expect(typeof sourcesService.createSyncProgressWebSocket).toBe('function'); }); test('should accept proper parameter types', () => { diff --git a/frontend/src/services/__tests__/websocket-sync-progress.test.ts b/frontend/src/services/__tests__/websocket-sync-progress.test.ts index d17e341..5bfabb9 100644 --- a/frontend/src/services/__tests__/websocket-sync-progress.test.ts +++ b/frontend/src/services/__tests__/websocket-sync-progress.test.ts @@ -1,13 +1,12 @@ import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest'; // Mock WebSocket globally -const mockWebSocket = vi.fn(); const mockWebSocketInstances: any[] = []; -mockWebSocket.mockImplementation((url: string) => { +const createMockWebSocketInstance = (url: string) => { const instance = { url, - readyState: WebSocket.CONNECTING, + readyState: 0, // CONNECTING send: vi.fn(), close: vi.fn(), addEventListener: vi.fn(), @@ -24,16 +23,18 @@ mockWebSocket.mockImplementation((url: string) => { mockWebSocketInstances.push(instance); - // Simulate connection opening after a short delay - setTimeout(() => { - instance.readyState = WebSocket.OPEN; + // Simulate connection opening after the current call stack finishes + queueMicrotask(() => { + instance.readyState = 1; // OPEN if (instance.onopen) { instance.onopen(new Event('open')); } - }, 10); + }); return instance; -}); +}; + +const mockWebSocket = vi.fn().mockImplementation(createMockWebSocketInstance); // Replace global WebSocket Object.defineProperty(global, 'WebSocket', { @@ -41,6 +42,12 @@ Object.defineProperty(global, 'WebSocket', { writable: true, }); +// Set WebSocket constants on the global object +Object.defineProperty(global.WebSocket, 'CONNECTING', { value: 0 }); +Object.defineProperty(global.WebSocket, 'OPEN', { value: 1 }); +Object.defineProperty(global.WebSocket, 'CLOSING', { value: 2 }); +Object.defineProperty(global.WebSocket, 'CLOSED', { value: 3 }); + // Mock localStorage const 
mockLocalStorage = { getItem: vi.fn(), @@ -161,6 +168,10 @@ describe('WebSocket Sync Progress Service', () => { vi.clearAllMocks(); mockWebSocketInstances.length = 0; + // Reset the mock function + mockWebSocket.mockClear(); + mockWebSocket.mockImplementation(createMockWebSocketInstance); + sourceId = 'test-source-123'; mockOnMessage = vi.fn(); mockOnError = vi.fn(); @@ -315,17 +326,22 @@ describe('WebSocket Sync Progress Service', () => { // Simulate unexpected disconnection (not code 1000) if (wsInstance.onclose) { + // IMPORTANT: Update the mock WebSocket's readyState to CLOSED + wsInstance.readyState = 3; // WebSocket.CLOSED + wsInstance.onclose({ code: 1006, // Abnormal closure reason: 'Connection lost' }); + + // Advance time to trigger reconnection, ensuring WebSocket stays closed + vi.advanceTimersByTime(500); + wsInstance.readyState = 3; // Make sure it stays closed (factory might reset it) + vi.advanceTimersByTime(1500); // Total 2000ms } expect(mockOnConnectionChange).toHaveBeenCalledWith('disconnected'); - // Fast-forward time to trigger reconnection - vi.advanceTimersByTime(1000); - // Should attempt to reconnect expect(mockWebSocket).toHaveBeenCalledTimes(2); @@ -365,17 +381,27 @@ describe('WebSocket Sync Progress Service', () => { // Simulate multiple disconnections for (let i = 0; i < 6; i++) { - const wsInstance = mockWebSocketInstances[mockWebSocketInstances.length - 1]; + // Get the most recent WebSocket instance + let wsInstance = mockWebSocketInstances[mockWebSocketInstances.length - 1]; - if (wsInstance.onclose) { + if (wsInstance && wsInstance.onclose) { + // IMPORTANT: Update the mock WebSocket's readyState to CLOSED + wsInstance.readyState = 3; // WebSocket.CLOSED + wsInstance.onclose({ code: 1006, reason: 'Connection lost' }); } - // Fast-forward to trigger reconnection - vi.advanceTimersByTime(10000); + // Fast-forward to trigger reconnection with exponential backoff + const delay = 1000 * Math.pow(2, i); // Exponential backoff + 
vi.advanceTimersByTime(delay + 100); // Add a bit extra to ensure timing + + // Make sure the WebSocket stays closed after advancing time + if (wsInstance) { + wsInstance.readyState = 3; // WebSocket.CLOSED + } } // Should stop reconnecting after max attempts @@ -388,7 +414,7 @@ describe('WebSocket Sync Progress Service', () => { service.connect(); const wsInstance = mockWebSocketInstances[0]; - wsInstance.readyState = WebSocket.OPEN; + wsInstance.readyState = 1; // OPEN service.sendPing(); @@ -399,7 +425,7 @@ describe('WebSocket Sync Progress Service', () => { service.connect(); const wsInstance = mockWebSocketInstances[0]; - wsInstance.readyState = WebSocket.CLOSED; + wsInstance.readyState = 3; // CLOSED service.sendPing(); @@ -417,24 +443,24 @@ describe('WebSocket Sync Progress Service', () => { }); test('should return correct connection state', () => { - expect(service.getConnectionState()).toBe(WebSocket.CLOSED); + expect(service.getConnectionState()).toBe(3); // CLOSED service.connect(); const wsInstance = mockWebSocketInstances[0]; - wsInstance.readyState = WebSocket.CONNECTING; + wsInstance.readyState = 0; // CONNECTING - expect(service.getConnectionState()).toBe(WebSocket.CONNECTING); + expect(service.getConnectionState()).toBe(0); // CONNECTING - wsInstance.readyState = WebSocket.OPEN; - expect(service.getConnectionState()).toBe(WebSocket.OPEN); + wsInstance.readyState = 1; // OPEN + expect(service.getConnectionState()).toBe(1); // OPEN }); test('should not create multiple connections when already connected', () => { service.connect(); - const wsInstance = mockWebSocketInstances[0]; - wsInstance.readyState = WebSocket.OPEN; + const wsInstance = mockWebSocketInstances[0]; + wsInstance.readyState = 1; // OPEN // Try to connect again service.connect(); @@ -453,19 +479,25 @@ describe('WebSocket Sync Progress Service', () => { // First reconnection const wsInstance1 = mockWebSocketInstances[0]; if (wsInstance1.onclose) { + // IMPORTANT: Update the mock 
WebSocket's readyState to CLOSED + wsInstance1.readyState = 3; // WebSocket.CLOSED + wsInstance1.onclose({ code: 1006, reason: 'Connection lost' }); } - vi.advanceTimersByTime(1000); // 1s delay + vi.advanceTimersByTime(2000); // 1s delay * 2^0 = 1s, add extra time expect(mockWebSocket).toHaveBeenCalledTimes(initialCallCount + 1); // Second reconnection const wsInstance2 = mockWebSocketInstances[1]; if (wsInstance2.onclose) { + // IMPORTANT: Update the mock WebSocket's readyState to CLOSED + wsInstance2.readyState = 3; // WebSocket.CLOSED + wsInstance2.onclose({ code: 1006, reason: 'Connection lost' }); } - vi.advanceTimersByTime(2000); // 2s delay (exponential backoff) + vi.advanceTimersByTime(4000); // 1s * 2^1 = 2s, add extra time expect(mockWebSocket).toHaveBeenCalledTimes(initialCallCount + 2); vi.useRealTimers(); @@ -473,18 +505,33 @@ describe('WebSocket Sync Progress Service', () => { }); describe('WebSocket Message Types', () => { - test('should handle progress messages with all fields', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockWebSocketInstances.length = 0; + + // Reset the mock function + mockWebSocket.mockClear(); + mockWebSocket.mockImplementation(createMockWebSocketInstance); + + mockLocalStorage.getItem.mockReturnValue('mock-jwt-token'); + }); + + test('should handle progress messages with all fields', async () => { const mockOnMessage = vi.fn(); - const service = new WebSocketSyncProgressService( + const testService = new WebSocketSyncProgressService( 'test-source', mockOnMessage, vi.fn(), vi.fn() ); - service.connect(); + testService.connect(); - const wsInstance = mockWebSocketInstances[0]; + // Wait for connection to complete so onmessage handler is set up + await new Promise(resolve => setTimeout(resolve, 10)); + + // Find the most recent WebSocket instance + const wsInstance = mockWebSocketInstances[mockWebSocketInstances.length - 1]; const progressMessage = { type: 'progress', data: { @@ -522,18 +569,22 @@ describe('WebSocket 
Message Types', () => { expect(receivedData.data.current_file).toBe('important-document.pdf'); }); - test('should handle error messages', () => { + test('should handle error messages', async () => { + const mockOnMessage = vi.fn(); - const service = new WebSocketSyncProgressService( + const testService = new WebSocketSyncProgressService( 'test-source', mockOnMessage, vi.fn(), vi.fn() ); - service.connect(); + testService.connect(); - const wsInstance = mockWebSocketInstances[0]; + // Wait for connection to complete so onmessage handler is set up + await new Promise(resolve => setTimeout(resolve, 10)); + + const wsInstance = mockWebSocketInstances[mockWebSocketInstances.length - 1]; const errorMessage = { type: 'error', data: { @@ -550,18 +601,22 @@ describe('WebSocket Message Types', () => { expect(mockOnMessage).toHaveBeenCalledWith(errorMessage); }); - test('should handle different sync phases', () => { + test('should handle different sync phases', async () => { + const mockOnMessage = vi.fn(); - const service = new WebSocketSyncProgressService( + const testService = new WebSocketSyncProgressService( 'test-source', mockOnMessage, vi.fn(), vi.fn() ); - service.connect(); + testService.connect(); - const wsInstance = mockWebSocketInstances[0]; + // Wait for connection to complete so onmessage handler is set up + await new Promise(resolve => setTimeout(resolve, 10)); + + const wsInstance = mockWebSocketInstances[mockWebSocketInstances.length - 1]; const phases = [ 'initializing', 'evaluating', diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index e0e8961..dedd7ff 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -662,6 +662,35 @@ export class SyncProgressWebSocket { } } +// User Watch Directory Types +export interface UserWatchDirectoryResponse { + user_id: string; + username: string; + watch_directory_path: string; + exists: boolean; + enabled: boolean; +} + +export interface UserWatchDirectoryOperationResponse 
{ + success: boolean; + message: string; + watch_directory_path?: string; +} + +export const userWatchService = { + getUserWatchDirectory: (userId: string) => { + return api.get(`/users/${userId}/watch-directory`) + }, + + createUserWatchDirectory: (userId: string) => { + return api.post(`/users/${userId}/watch-directory`) + }, + + deleteUserWatchDirectory: (userId: string) => { + return api.delete(`/users/${userId}/watch-directory`) + }, +} + export const sourcesService = { triggerSync: (sourceId: string) => { return api.post(`/sources/${sourceId}/sync`) diff --git a/src/config.rs b/src/config.rs index 2c17566..5cd3ed2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,6 +8,8 @@ pub struct Config { pub jwt_secret: String, pub upload_path: String, pub watch_folder: String, + pub user_watch_base_dir: String, + pub enable_per_user_watch: bool, pub allowed_file_types: Vec, pub watch_interval_seconds: Option, pub file_stability_check_ms: Option, @@ -162,6 +164,33 @@ impl Config { default_folder } }, + user_watch_base_dir: match env::var("USER_WATCH_BASE_DIR") { + Ok(dir) => { + println!("✅ USER_WATCH_BASE_DIR: {} (loaded from env)", dir); + dir + } + Err(_) => { + let default_dir = "./user_watch".to_string(); + println!("⚠️ USER_WATCH_BASE_DIR: {} (using default - env var not set)", default_dir); + default_dir + } + }, + enable_per_user_watch: match env::var("ENABLE_PER_USER_WATCH") { + Ok(val) => match val.to_lowercase().as_str() { + "true" | "1" | "yes" | "on" => { + println!("✅ ENABLE_PER_USER_WATCH: true (loaded from env)"); + true + } + _ => { + println!("✅ ENABLE_PER_USER_WATCH: false (loaded from env)"); + false + } + }, + Err(_) => { + println!("⚠️ ENABLE_PER_USER_WATCH: false (using default - env var not set)"); + false + } + }, allowed_file_types: { let file_types_str = match env::var("ALLOWED_FILE_TYPES") { Ok(types) => { @@ -433,6 +462,10 @@ impl Config { println!("🌐 Server will bind to: {}", config.server_address); println!("📁 Upload directory: {}", 
config.upload_path); println!("👁️ Watch directory: {}", config.watch_folder); + println!("👥 Per-user watch enabled: {}", config.enable_per_user_watch); + if config.enable_per_user_watch { + println!("📂 User watch base directory: {}", config.user_watch_base_dir); + } println!("📄 Allowed file types: {:?}", config.allowed_file_types); println!("🧠 OCR language: {}", config.ocr_language); println!("⚙️ Concurrent OCR jobs: {}", config.concurrent_ocr_jobs); @@ -485,9 +518,13 @@ impl Config { let upload_path = Path::new(&self.upload_path); let watch_path = Path::new(&self.watch_folder); + let user_watch_path = Path::new(&self.user_watch_base_dir); println!("📁 Checking upload directory: {}", self.upload_path); println!("👁️ Checking watch directory: {}", self.watch_folder); + if self.enable_per_user_watch { + println!("👥 Checking user watch base directory: {}", self.user_watch_base_dir); + } // Check if paths exist and are accessible if !upload_path.exists() { @@ -512,6 +549,19 @@ impl Config { println!("✅ Watch directory exists and is accessible"); } + if self.enable_per_user_watch { + if !user_watch_path.exists() { + println!("⚠️ User watch base directory does not exist yet: {}", self.user_watch_base_dir); + } else if !user_watch_path.is_dir() { + println!("❌ User watch base path exists but is not a directory: {}", self.user_watch_base_dir); + return Err(anyhow::anyhow!( + "User watch base directory '{}' exists but is not a directory", self.user_watch_base_dir + )); + } else { + println!("✅ User watch base directory exists and is accessible"); + } + } + // Normalize paths to handle relative paths and symlinks let upload_canonical = upload_path.canonicalize() .unwrap_or_else(|_| { @@ -523,9 +573,21 @@ impl Config { println!("⚠️ Could not canonicalize watch path, using as-is"); watch_path.to_path_buf() }); + let user_watch_canonical = if self.enable_per_user_watch { + Some(user_watch_path.canonicalize() + .unwrap_or_else(|_| { + println!("⚠️ Could not canonicalize user watch 
path, using as-is"); + user_watch_path.to_path_buf() + })) + } else { + None + }; println!("📍 Canonical upload path: {}", upload_canonical.display()); println!("📍 Canonical watch path: {}", watch_canonical.display()); + if let Some(ref user_watch) = user_watch_canonical { + println!("📍 Canonical user watch path: {}", user_watch.display()); + } // Check if paths are the same if upload_canonical == watch_canonical { @@ -572,6 +634,61 @@ impl Config { )); } + // Additional validation for user watch directory if enabled + if let Some(ref user_watch) = user_watch_canonical { + // Check if user watch is same as upload or watch + if user_watch == &upload_canonical { + println!("❌ CRITICAL ERROR: User watch base directory is same as upload directory!"); + return Err(anyhow::anyhow!( + "❌ Configuration Error: USER_WATCH_BASE_DIR cannot be the same as UPLOAD_PATH.\n\ + Current config:\n\ + - UPLOAD_PATH: {}\n\ + - USER_WATCH_BASE_DIR: {}\n\ + Please set them to different directories.", + self.upload_path, self.user_watch_base_dir + )); + } + + if user_watch == &watch_canonical { + println!("❌ CRITICAL ERROR: User watch base directory is same as global watch directory!"); + return Err(anyhow::anyhow!( + "❌ Configuration Error: USER_WATCH_BASE_DIR cannot be the same as WATCH_FOLDER.\n\ + Current config:\n\ + - WATCH_FOLDER: {}\n\ + - USER_WATCH_BASE_DIR: {}\n\ + Please set them to different directories.", + self.watch_folder, self.user_watch_base_dir + )); + } + + // Check if user watch is inside upload or vice versa + if user_watch.starts_with(&upload_canonical) { + println!("❌ CRITICAL ERROR: User watch base directory is inside upload directory!"); + return Err(anyhow::anyhow!( + "❌ Configuration Error: USER_WATCH_BASE_DIR cannot be inside UPLOAD_PATH.\n\ + This would cause recursion issues.\n\ + Current config:\n\ + - UPLOAD_PATH: {}\n\ + - USER_WATCH_BASE_DIR: {}\n\ + Please move the user watch directory outside the upload directory.", + self.upload_path, 
self.user_watch_base_dir + )); + } + + if upload_canonical.starts_with(user_watch) { + println!("❌ CRITICAL ERROR: Upload directory is inside user watch base directory!"); + return Err(anyhow::anyhow!( + "❌ Configuration Error: UPLOAD_PATH cannot be inside USER_WATCH_BASE_DIR.\n\ + This would cause recursion issues.\n\ + Current config:\n\ + - UPLOAD_PATH: {}\n\ + - USER_WATCH_BASE_DIR: {}\n\ + Please move the upload directory outside the user watch directory.", + self.upload_path, self.user_watch_base_dir + )); + } + } + println!("✅ Directory path validation passed - no conflicts detected"); Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 3859422..461d175 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ pub struct AppState { pub queue_service: std::sync::Arc, pub oidc_client: Option>, pub sync_progress_tracker: std::sync::Arc, + pub user_watch_service: Option>, } /// Health check endpoint for monitoring diff --git a/src/main.rs b/src/main.rs index ceaf744..80b7878 100644 --- a/src/main.rs +++ b/src/main.rs @@ -338,6 +338,16 @@ async fn main() -> anyhow::Result<()> { // Create shared progress tracker let sync_progress_tracker = Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()); + // Initialize user watch service if per-user watch is enabled + let user_watch_service = if config.enable_per_user_watch { + let service = readur::services::user_watch_service::UserWatchService::new(&config.user_watch_base_dir); + println!("✅ User watch service initialized: {}", config.user_watch_base_dir); + Some(Arc::new(service)) + } else { + println!("ℹ️ Per-user watch directories are disabled"); + None + }; + // Create web-facing state with shared queue service let web_state = AppState { db: web_db, @@ -347,6 +357,7 @@ async fn main() -> anyhow::Result<()> { queue_service: shared_queue_service.clone(), oidc_client: oidc_client.clone(), sync_progress_tracker: sync_progress_tracker.clone(), + user_watch_service: user_watch_service.clone(), }; 
let web_state = Arc::new(web_state); @@ -359,6 +370,7 @@ async fn main() -> anyhow::Result<()> { queue_service: shared_queue_service.clone(), oidc_client: oidc_client.clone(), sync_progress_tracker: sync_progress_tracker.clone(), + user_watch_service: user_watch_service.clone(), }; let background_state = Arc::new(background_state); @@ -441,6 +453,7 @@ async fn main() -> anyhow::Result<()> { queue_service: shared_queue_service.clone(), oidc_client: oidc_client.clone(), sync_progress_tracker: sync_progress_tracker.clone(), + user_watch_service: user_watch_service.clone(), }; let web_state = Arc::new(updated_web_state); diff --git a/src/routes/users.rs b/src/routes/users.rs index 5a900a9..b92102f 100644 --- a/src/routes/users.rs +++ b/src/routes/users.rs @@ -2,11 +2,13 @@ use axum::{ extract::{Path, State}, http::StatusCode, response::Json, - routing::get, + routing::{get, post, delete}, Router, }; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use uuid::Uuid; +use utoipa::ToSchema; use crate::{ auth::AuthUser, @@ -15,6 +17,27 @@ use crate::{ AppState, }; +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct UserWatchDirectoryResponse { + pub user_id: Uuid, + pub username: String, + pub watch_directory_path: String, + pub exists: bool, + pub enabled: bool, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct CreateUserWatchDirectoryRequest { + pub ensure_created: Option, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct UserWatchDirectoryOperationResponse { + pub success: bool, + pub message: String, + pub watch_directory_path: Option, +} + fn require_admin(auth_user: &AuthUser) -> Result<(), UserError> { if auth_user.user.role != UserRole::Admin { Err(UserError::permission_denied("Admin access required")) @@ -23,10 +46,20 @@ fn require_admin(auth_user: &AuthUser) -> Result<(), UserError> { } } +fn can_access_user_data(auth_user: &AuthUser, target_user_id: Uuid) -> Result<(), UserError> { + // Admin can access 
any user's data, users can only access their own + if auth_user.user.role == UserRole::Admin || auth_user.user.id == target_user_id { + Ok(()) + } else { + Err(UserError::permission_denied("Cannot access other user's data")) + } +} + pub fn router() -> Router> { Router::new() .route("/", get(list_users).post(create_user)) .route("/{id}", get(get_user).put(update_user).delete(delete_user)) + .route("/{id}/watch-directory", get(get_user_watch_directory).post(create_user_watch_directory).delete(delete_user_watch_directory)) } #[utoipa::path( @@ -224,4 +257,221 @@ async fn delete_user( })?; Ok(StatusCode::NO_CONTENT) +} + +#[utoipa::path( + get, + path = "/api/users/{id}/watch-directory", + tag = "users", + security( + ("bearer_auth" = []) + ), + params( + ("id" = Uuid, Path, description = "User ID") + ), + responses( + (status = 200, description = "User watch directory information", body = UserWatchDirectoryResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin access required or not your user account"), + (status = 404, description = "User not found"), + (status = 501, description = "Per-user watch directories are disabled"), + (status = 500, description = "Internal server error") + ) +)] +async fn get_user_watch_directory( + auth_user: AuthUser, + State(state): State>, + Path(id): Path, +) -> Result, UserError> { + can_access_user_data(&auth_user, id)?; + + // Check if per-user watch is enabled + if !state.config.enable_per_user_watch { + return Err(UserError::internal_server_error("Per-user watch directories are not enabled".to_string())); + } + + // Get the user + let user = state + .db + .get_user_by_id(id) + .await + .map_err(|e| UserError::internal_server_error(format!("Failed to fetch user: {}", e)))? 
+ .ok_or_else(|| UserError::not_found_by_id(id))?; + + // Get the user watch service + let user_watch_service = state + .user_watch_service + .as_ref() + .ok_or_else(|| UserError::internal_server_error("User watch service not initialized".to_string()))?; + + // Get the watch directory path + let watch_directory_path = match user_watch_service.get_user_directory(user.id).await { + Some(path) => path.to_string_lossy().to_string(), + None => { + // Try to construct the path manually if not cached + let base_dir = std::path::Path::new(&state.config.user_watch_base_dir); + base_dir.join(&user.username).to_string_lossy().to_string() + } + }; + + // Check if directory exists + let exists = tokio::fs::metadata(&watch_directory_path).await.is_ok(); + + let response = UserWatchDirectoryResponse { + user_id: user.id, + username: user.username, + watch_directory_path, + exists, + enabled: state.config.enable_per_user_watch, + }; + + Ok(Json(response)) +} + +#[utoipa::path( + post, + path = "/api/users/{id}/watch-directory", + tag = "users", + security( + ("bearer_auth" = []) + ), + params( + ("id" = Uuid, Path, description = "User ID") + ), + request_body = CreateUserWatchDirectoryRequest, + responses( + (status = 200, description = "User watch directory created successfully", body = UserWatchDirectoryOperationResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin access required or not your user account"), + (status = 404, description = "User not found"), + (status = 501, description = "Per-user watch directories are disabled"), + (status = 500, description = "Internal server error") + ) +)] +async fn create_user_watch_directory( + auth_user: AuthUser, + State(state): State>, + Path(id): Path, + Json(request): Json, +) -> Result, UserError> { + can_access_user_data(&auth_user, id)?; + + // Check if per-user watch is enabled + if !state.config.enable_per_user_watch { + return Err(UserError::internal_server_error("Per-user 
watch directories are not enabled".to_string())); + } + + // Get the user + let user = state + .db + .get_user_by_id(id) + .await + .map_err(|e| UserError::internal_server_error(format!("Failed to fetch user: {}", e)))? + .ok_or_else(|| UserError::not_found_by_id(id))?; + + // Get the user watch service + let user_watch_service = state + .user_watch_service + .as_ref() + .ok_or_else(|| UserError::internal_server_error("User watch service not initialized".to_string()))?; + + // Create or ensure the directory exists + let ensure_created = request.ensure_created.unwrap_or(true); + + let result = if ensure_created { + user_watch_service.ensure_user_directory(&user).await + } else { + match user_watch_service.get_user_directory(user.id).await { + Some(path) => Ok(path), + None => { + let base_dir = std::path::Path::new(&state.config.user_watch_base_dir); + Ok(base_dir.join(&user.username)) + } + } + }; + + match result { + Ok(watch_directory_path) => { + let response = UserWatchDirectoryOperationResponse { + success: true, + message: format!("Watch directory ready for user '{}'", user.username), + watch_directory_path: Some(watch_directory_path.to_string_lossy().to_string()), + }; + Ok(Json(response)) + } + Err(e) => { + let response = UserWatchDirectoryOperationResponse { + success: false, + message: format!("Failed to create watch directory: {}", e), + watch_directory_path: None, + }; + Ok(Json(response)) + } + } +} + +#[utoipa::path( + delete, + path = "/api/users/{id}/watch-directory", + tag = "users", + security( + ("bearer_auth" = []) + ), + params( + ("id" = Uuid, Path, description = "User ID") + ), + responses( + (status = 200, description = "User watch directory removed successfully", body = UserWatchDirectoryOperationResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin access required"), + (status = 404, description = "User not found"), + (status = 501, description = "Per-user watch directories are 
disabled"), + (status = 500, description = "Internal server error") + ) +)] +async fn delete_user_watch_directory( + auth_user: AuthUser, + State(state): State>, + Path(id): Path, +) -> Result, UserError> { + require_admin(&auth_user)?; // Only admins can delete watch directories + + // Check if per-user watch is enabled + if !state.config.enable_per_user_watch { + return Err(UserError::internal_server_error("Per-user watch directories are not enabled".to_string())); + } + + // Get the user + let user = state + .db + .get_user_by_id(id) + .await + .map_err(|e| UserError::internal_server_error(format!("Failed to fetch user: {}", e)))? + .ok_or_else(|| UserError::not_found_by_id(id))?; + + // Get the user watch service + let user_watch_service = state + .user_watch_service + .as_ref() + .ok_or_else(|| UserError::internal_server_error("User watch service not initialized".to_string()))?; + + // Remove the user's watch directory + match user_watch_service.remove_user_directory(&user).await { + Ok(_) => { + let response = UserWatchDirectoryOperationResponse { + success: true, + message: format!("Watch directory removed for user '{}'", user.username), + watch_directory_path: None, + }; + Ok(Json(response)) + } + Err(e) => { + let response = UserWatchDirectoryOperationResponse { + success: false, + message: format!("Failed to remove watch directory: {}", e), + watch_directory_path: None, + }; + Ok(Json(response)) + } + } } \ No newline at end of file diff --git a/src/scheduling/mod.rs b/src/scheduling/mod.rs index aa1045e..e8c441a 100644 --- a/src/scheduling/mod.rs +++ b/src/scheduling/mod.rs @@ -1,4 +1,5 @@ pub mod source_scheduler; pub mod source_sync; +pub mod user_watch_manager; pub mod webdav_scheduler; pub mod watcher; \ No newline at end of file diff --git a/src/scheduling/user_watch_manager.rs b/src/scheduling/user_watch_manager.rs new file mode 100644 index 0000000..f3652f7 --- /dev/null +++ b/src/scheduling/user_watch_manager.rs @@ -0,0 +1,412 @@ +use 
anyhow::Result; +use std::path::Path; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::collections::HashMap; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use crate::{ + db::Database, + models::User, + services::user_watch_service::UserWatchService, +}; + +/// Manager that coordinates between the file watcher and user management +/// +/// This manager handles: +/// - Mapping file paths to users based on directory structure +/// - Discovering existing users and setting up their watch directories +/// - Handling user lifecycle events (creation/deletion) +/// - Providing efficient user lookup by file path +/// - Caching user information for performance +#[derive(Clone)] +pub struct UserWatchManager { + /// Database for user operations + db: Database, + /// Service for managing user watch directories + user_watch_service: UserWatchService, + /// Cache of username to user mappings for fast lookup + /// Uses RwLock for concurrent read access with exclusive write access + user_cache: Arc>>, + /// Cache of user directory paths to user IDs for reverse lookup + path_to_user_cache: Arc>>, +} + +impl UserWatchManager { + /// Create a new UserWatchManager + /// + /// # Arguments + /// * `db` - Database instance for user operations + /// * `user_watch_service` - Service for managing user watch directories + /// + /// # Returns + /// * New UserWatchManager instance + pub fn new(db: Database, user_watch_service: UserWatchService) -> Self { + Self { + db, + user_watch_service, + user_cache: Arc::new(RwLock::new(HashMap::new())), + path_to_user_cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Initialize the manager by discovering users and setting up their directories + /// + /// # Returns + /// * Result indicating success or failure + pub async fn initialize(&self) -> Result<()> { + info!("Initializing UserWatchManager"); + + // Initialize the user watch service first + self.user_watch_service.initialize().await?; + + // Discover and cache all 
users + self.discover_and_cache_users().await?; + + info!("UserWatchManager initialized successfully"); + Ok(()) + } + + /// Discover all users from database and cache them + async fn discover_and_cache_users(&self) -> Result<()> { + info!("Discovering and caching users"); + + // Get all users from database + let users = self.db.get_all_users().await + .map_err(|e| anyhow::anyhow!("Failed to get users from database: {}", e))?; + + let mut user_cache = self.user_cache.write().await; + let mut path_cache = self.path_to_user_cache.write().await; + + for user in users { + debug!("Caching user: {} ({})", user.username, user.id); + + // Ensure user directory exists + if let Err(e) = self.user_watch_service.ensure_user_directory(&user).await { + warn!("Failed to ensure directory for user {}: {}", user.username, e); + continue; + } + + // Get user directory path for reverse lookup cache + let user_dir = self.user_watch_service.get_user_directory_by_username(&user.username); + let dir_key = user_dir.to_string_lossy().to_string(); + + // Update caches + user_cache.insert(user.username.clone(), user.clone()); + path_cache.insert(dir_key, user.id); + } + + info!("Cached {} users and their watch directories", user_cache.len()); + Ok(()) + } + + /// Get user by username, checking cache first, then database + /// + /// # Arguments + /// * `username` - Username to look up + /// + /// # Returns + /// * Option if found + pub async fn get_user_by_username(&self, username: &str) -> Result> { + // Check cache first (read lock) + { + let cache = self.user_cache.read().await; + if let Some(user) = cache.get(username) { + debug!("Found user {} in cache", username); + return Ok(Some(user.clone())); + } + } + + debug!("User {} not in cache, checking database", username); + + // Not in cache, check database (release lock before DB operation) + let user = self.db.get_user_by_username(username).await?; + + if let Some(ref user) = user { + // Prepare directory before acquiring locks + let 
ensure_dir_result = self.user_watch_service.ensure_user_directory(user).await; + let user_dir = self.user_watch_service.get_user_directory_by_username(username); + let dir_key = user_dir.to_string_lossy().to_string(); + + // Update caches with short-lived locks + { + let mut cache = self.user_cache.write().await; + cache.insert(username.to_string(), user.clone()); + } + + if ensure_dir_result.is_ok() { + let mut path_cache = self.path_to_user_cache.write().await; + path_cache.insert(dir_key, user.id); + } else { + warn!("Failed to ensure directory for user {}: {:?}", username, ensure_dir_result); + } + + info!("Cached new user from database: {}", username); + } + + Ok(user) + } + + /// Get user by file path within user watch directories + /// + /// # Arguments + /// * `file_path` - Path to a file within a user watch directory + /// + /// # Returns + /// * Option if the file belongs to a user's watch directory + pub async fn get_user_by_file_path(&self, file_path: &Path) -> Result> { + // Extract username from path + let username = match self.user_watch_service.extract_username_from_path(file_path) { + Some(username) => username, + None => { + debug!("Could not extract username from path: {}", file_path.display()); + return Ok(None); + } + }; + + debug!("Extracted username '{}' from path: {}", username, file_path.display()); + + // Look up user by username + self.get_user_by_username(&username).await + } + + /// Check if a file path is within user watch directories + /// + /// # Arguments + /// * `file_path` - Path to check + /// + /// # Returns + /// * bool indicating whether the path is within user watch directories + pub fn is_user_watch_path(&self, file_path: &Path) -> bool { + self.user_watch_service.is_within_user_watch(file_path) + } + + /// Handle user creation by setting up their watch directory + /// + /// # Arguments + /// * `user` - Newly created user + /// + /// # Returns + /// * Result indicating success or failure + pub async fn 
handle_user_created(&self, user: &User) -> Result<()> { + info!("Setting up watch directory for new user: {}", user.username); + + // Ensure user directory exists + self.user_watch_service.ensure_user_directory(user).await?; + + // Update caches + let mut user_cache = self.user_cache.write().await; + let mut path_cache = self.path_to_user_cache.write().await; + + let user_dir = self.user_watch_service.get_user_directory_by_username(&user.username); + let dir_key = user_dir.to_string_lossy().to_string(); + + user_cache.insert(user.username.clone(), user.clone()); + path_cache.insert(dir_key, user.id); + + info!("Successfully set up watch directory for user: {}", user.username); + Ok(()) + } + + /// Handle user deletion by cleaning up their watch directory + /// + /// # Arguments + /// * `user` - User being deleted + /// + /// # Returns + /// * Result indicating success or failure + pub async fn handle_user_deleted(&self, user: &User) -> Result<()> { + info!("Cleaning up watch directory for deleted user: {}", user.username); + + // Remove user directory + self.user_watch_service.remove_user_directory(user).await?; + + // Remove from caches + let mut user_cache = self.user_cache.write().await; + let mut path_cache = self.path_to_user_cache.write().await; + + user_cache.remove(&user.username); + + // Remove from path cache (need to find the entry by user ID) + let user_dir = self.user_watch_service.get_user_directory_by_username(&user.username); + let dir_key = user_dir.to_string_lossy().to_string(); + path_cache.remove(&dir_key); + + info!("Successfully cleaned up watch directory for user: {}", user.username); + Ok(()) + } + + /// Handle username change by moving watch directory and updating caches + /// + /// # Arguments + /// * `old_username` - Previous username + /// * `updated_user` - User with updated information + /// + /// # Returns + /// * Result indicating success or failure + pub async fn handle_username_changed(&self, old_username: &str, updated_user: 
&User) -> Result<()> { + info!("Handling username change from '{}' to '{}'", old_username, updated_user.username); + + let old_dir = self.user_watch_service.get_user_directory_by_username(old_username); + let new_dir = self.user_watch_service.get_user_directory_by_username(&updated_user.username); + + // Move directory if it exists + if old_dir.exists() { + info!("Moving user watch directory from '{}' to '{}'", old_dir.display(), new_dir.display()); + tokio::fs::rename(&old_dir, &new_dir).await + .map_err(|e| anyhow::anyhow!( + "Failed to move user watch directory from '{}' to '{}': {}", + old_dir.display(), new_dir.display(), e + ))?; + } else { + // If old directory doesn't exist, create new one + self.user_watch_service.ensure_user_directory(updated_user).await?; + } + + // Update caches + let mut user_cache = self.user_cache.write().await; + let mut path_cache = self.path_to_user_cache.write().await; + + // Remove old entries + user_cache.remove(old_username); + let old_dir_key = old_dir.to_string_lossy().to_string(); + path_cache.remove(&old_dir_key); + + // Add new entries + user_cache.insert(updated_user.username.clone(), updated_user.clone()); + let new_dir_key = new_dir.to_string_lossy().to_string(); + path_cache.insert(new_dir_key, updated_user.id); + + info!("Successfully handled username change to '{}'", updated_user.username); + Ok(()) + } + + /// Get all users that have watch directories set up + /// + /// # Returns + /// * Vec of users with watch directories + pub async fn get_all_watch_users(&self) -> Vec { + let cache = self.user_cache.read().await; + cache.values().cloned().collect() + } + + /// Get statistics about the user watch manager + /// + /// # Returns + /// * (cached_users, service_stats) tuple + pub async fn get_statistics(&self) -> Result<(usize, (usize, usize))> { + let cached_users = { + let cache = self.user_cache.read().await; + cache.len() + }; + + let service_stats = self.user_watch_service.get_statistics().await?; + + 
Ok((cached_users, service_stats)) + } + + /// Clear all caches (useful for testing or cache invalidation) + pub async fn clear_caches(&self) { + let mut user_cache = self.user_cache.write().await; + let mut path_cache = self.path_to_user_cache.write().await; + + user_cache.clear(); + path_cache.clear(); + + self.user_watch_service.clear_cache().await; + + debug!("All UserWatchManager caches cleared"); + } + + /// Refresh user cache by reloading from database + /// + /// # Returns + /// * Result indicating success or failure + pub async fn refresh_user_cache(&self) -> Result<()> { + info!("Refreshing user cache from database"); + + // Clear existing cache + self.clear_caches().await; + + // Reload from database + self.discover_and_cache_users().await?; + + info!("User cache refreshed successfully"); + Ok(()) + } + + /// Get the underlying UserWatchService (for direct access if needed) + pub fn get_user_watch_service(&self) -> &UserWatchService { + &self.user_watch_service + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + use uuid::Uuid; + use crate::models::{UserRole, AuthProvider}; + use chrono::Utc; + + fn create_test_user(username: &str) -> User { + User { + id: Uuid::new_v4(), + username: username.to_string(), + email: format!("{}@example.com", username), + password_hash: Some("test_hash".to_string()), + role: UserRole::User, + created_at: Utc::now(), + updated_at: Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: AuthProvider::Local, + } + } + + // Note: These tests would need a mock database implementation + // For now, they serve as documentation of the intended API + + #[tokio::test] + async fn test_user_watch_manager_creation() { + let temp_dir = TempDir::new().unwrap(); + let user_watch_service = UserWatchService::new(temp_dir.path()); + + // Would need mock database here + // let db = create_mock_database(); + // let manager = UserWatchManager::new(db, user_watch_service); + // 
assert!(manager.initialize().await.is_ok()); + } + + #[tokio::test] + async fn test_extract_username_from_path() { + let temp_dir = TempDir::new().unwrap(); + let user_watch_service = UserWatchService::new(temp_dir.path()); + user_watch_service.initialize().await.unwrap(); + + let user = create_test_user("testuser"); + let user_dir = user_watch_service.ensure_user_directory(&user).await.unwrap(); + let test_file = user_dir.join("document.pdf"); + + let username = user_watch_service.extract_username_from_path(&test_file); + assert_eq!(username, Some("testuser".to_string())); + } + + #[tokio::test] + async fn test_is_user_watch_path() { + let temp_dir = TempDir::new().unwrap(); + let user_watch_service = UserWatchService::new(temp_dir.path()); + user_watch_service.initialize().await.unwrap(); + + let user = create_test_user("testuser"); + let user_dir = user_watch_service.ensure_user_directory(&user).await.unwrap(); + let test_file = user_dir.join("document.pdf"); + + assert!(user_watch_service.is_within_user_watch(&test_file)); + + let outside_file = temp_dir.path().parent().unwrap().join("outside.pdf"); + assert!(!user_watch_service.is_within_user_watch(&outside_file)); + } +} \ No newline at end of file diff --git a/src/scheduling/watcher.rs b/src/scheduling/watcher.rs index b8fcf7e..603f71b 100644 --- a/src/scheduling/watcher.rs +++ b/src/scheduling/watcher.rs @@ -12,7 +12,8 @@ use chrono::{DateTime, Utc}; use crate::{ config::Config, db::Database, - services::file_service::FileService, + services::{file_service::FileService, user_watch_service::UserWatchService}, + scheduling::user_watch_manager::UserWatchManager, ingestion::document_ingestion::{DocumentIngestionService, IngestionResult, DeduplicationPolicy}, ocr::queue::OcrQueueService, models::FileIngestionInfo, @@ -22,6 +23,10 @@ pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { info!("Starting hybrid folder watcher on: {}", config.watch_folder); info!("Upload path configured as: 
{}", config.upload_path); + if config.enable_per_user_watch { + info!("Per-user watch directories enabled. Base directory: {}", config.user_watch_base_dir); + } + // Debug: Check if paths resolve correctly let watch_canonical = std::path::Path::new(&config.watch_folder).canonicalize() .unwrap_or_else(|_| std::path::PathBuf::from(&config.watch_folder)); @@ -35,6 +40,21 @@ pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { let file_service = FileService::new(config.upload_path.clone()); let queue_service = OcrQueueService::new(db.clone(), db.get_pool().clone(), 1); + // Initialize user watch components if enabled + let user_watch_manager = if config.enable_per_user_watch { + let user_watch_service = UserWatchService::new(&config.user_watch_base_dir); + let manager = UserWatchManager::new(db.clone(), user_watch_service); + + if let Err(e) = manager.initialize().await { + error!("Failed to initialize user watch manager: {}", e); + return Err(e); + } + + Some(manager) + } else { + None + }; + // Determine watch strategy based on filesystem type let watch_path = Path::new(&config.watch_folder); let watch_strategy = determine_watch_strategy(watch_path).await?; @@ -43,10 +63,10 @@ pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { match watch_strategy { WatchStrategy::NotifyBased => { - start_notify_watcher(config, db, file_service, queue_service).await + start_notify_watcher(config, db, file_service, queue_service, user_watch_manager).await } WatchStrategy::PollingBased => { - start_polling_watcher(config, db, file_service, queue_service).await + start_polling_watcher(config, db, file_service, queue_service, user_watch_manager).await } WatchStrategy::Hybrid => { // Start both methods concurrently @@ -54,14 +74,15 @@ pub async fn start_folder_watcher(config: Config, db: Database) -> Result<()> { let db_clone = db.clone(); let file_service_clone = file_service.clone(); let queue_service_clone = queue_service.clone(); 
+ let user_watch_manager_clone = user_watch_manager.clone(); let notify_handle = tokio::spawn(async move { - if let Err(e) = start_notify_watcher(config_clone, db_clone, file_service_clone, queue_service_clone).await { + if let Err(e) = start_notify_watcher(config_clone, db_clone, file_service_clone, queue_service_clone, user_watch_manager_clone).await { warn!("Notify watcher failed, continuing with polling: {}", e); } }); - let polling_result = start_polling_watcher(config, db, file_service, queue_service).await; + let polling_result = start_polling_watcher(config, db, file_service, queue_service, user_watch_manager).await; // Cancel notify watcher if polling completes notify_handle.abort(); @@ -108,6 +129,7 @@ async fn start_notify_watcher( db: Database, file_service: FileService, queue_service: OcrQueueService, + user_watch_manager: Option, ) -> Result<()> { let (tx, mut rx) = mpsc::channel(100); @@ -120,15 +142,26 @@ async fn start_notify_watcher( notify::Config::default(), )?; + // Watch the global watch folder watcher.watch(Path::new(&config.watch_folder), RecursiveMode::Recursive)?; + info!("Started notify-based watcher on global folder: {}", config.watch_folder); - info!("Started notify-based watcher on: {}", config.watch_folder); + // Also watch user watch directories if enabled + if config.enable_per_user_watch { + let user_watch_path = Path::new(&config.user_watch_base_dir); + if user_watch_path.exists() { + watcher.watch(user_watch_path, RecursiveMode::Recursive)?; + info!("Started notify-based watcher on user watch folder: {}", config.user_watch_base_dir); + } else { + info!("User watch base directory does not exist yet: {}", config.user_watch_base_dir); + } + } while let Some(res) = rx.recv().await { match res { Ok(event) => { for path in event.paths { - if let Err(e) = process_file(&path, &db, &file_service, &queue_service, &config).await { + if let Err(e) = process_file(&path, &db, &file_service, &queue_service, &config, &user_watch_manager).await { 
error!("Failed to process file {:?}: {}", path, e); } } @@ -145,24 +178,41 @@ async fn start_polling_watcher( db: Database, file_service: FileService, queue_service: OcrQueueService, + user_watch_manager: Option, ) -> Result<()> { info!("Started polling-based watcher on: {}", config.watch_folder); let mut known_files: HashSet<(PathBuf, SystemTime)> = HashSet::new(); let mut interval = interval(Duration::from_secs(config.watch_interval_seconds.unwrap_or(30))); - // Initial scan - info!("Starting initial scan of watch directory: {}", config.watch_folder); - scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await?; + // Initial scan of global watch directory + info!("Starting initial scan of global watch directory: {}", config.watch_folder); + scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config, &user_watch_manager).await?; + + // Initial scan of user watch directories if enabled + if config.enable_per_user_watch { + info!("Starting initial scan of user watch directories: {}", config.user_watch_base_dir); + scan_directory(&config.user_watch_base_dir, &mut known_files, &db, &file_service, &queue_service, &config, &user_watch_manager).await?; + } + info!("Initial scan completed. 
Found {} files to track", known_files.len()); loop { interval.tick().await; - if let Err(e) = scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config).await { - error!("Error during directory scan: {}", e); + // Scan global watch directory + if let Err(e) = scan_directory(&config.watch_folder, &mut known_files, &db, &file_service, &queue_service, &config, &user_watch_manager).await { + error!("Error during global watch directory scan: {}", e); // Continue polling even if one scan fails } + + // Scan user watch directories if enabled + if config.enable_per_user_watch { + if let Err(e) = scan_directory(&config.user_watch_base_dir, &mut known_files, &db, &file_service, &queue_service, &config, &user_watch_manager).await { + error!("Error during user watch directory scan: {}", e); + // Continue polling even if one scan fails + } + } } } @@ -173,6 +223,7 @@ async fn scan_directory( file_service: &FileService, queue_service: &OcrQueueService, config: &Config, + user_watch_manager: &Option, ) -> Result<()> { let mut current_files: HashSet<(PathBuf, SystemTime)> = HashSet::new(); @@ -196,7 +247,7 @@ async fn scan_directory( // Wait a bit to ensure file is fully written if is_file_stable(&path).await { debug!("Found new/modified file: {:?}", path); - if let Err(e) = process_file(&path, db, file_service, queue_service, config).await { + if let Err(e) = process_file(&path, db, file_service, queue_service, config, user_watch_manager).await { error!("Failed to process file {:?}: {}", path, e); } } @@ -236,6 +287,7 @@ async fn process_file( file_service: &FileService, queue_service: &OcrQueueService, config: &Config, + user_watch_manager: &Option, ) -> Result<()> { if !path.is_file() { return Ok(()); @@ -276,6 +328,31 @@ async fn process_file( } } + // Skip files that are not in either global watch directory or user watch directories + let global_watch_canonical = std::path::Path::new(&config.watch_folder) + .canonicalize() + 
.unwrap_or_else(|_| std::path::PathBuf::from(&config.watch_folder)); + let user_watch_canonical = if config.enable_per_user_watch { + Some(std::path::Path::new(&config.user_watch_base_dir) + .canonicalize() + .unwrap_or_else(|_| std::path::PathBuf::from(&config.user_watch_base_dir))) + } else { + None + }; + + if let Ok(file_canonical) = path.canonicalize() { + let in_global_watch = file_canonical.starts_with(&global_watch_canonical); + let in_user_watch = user_watch_canonical + .as_ref() + .map(|user_watch| file_canonical.starts_with(user_watch)) + .unwrap_or(false); + + if !in_global_watch && !in_user_watch { + debug!("Skipping file outside of watch directories: {}", filename); + return Ok(()); + } + } + // Check file age if configured if let Some(max_age_hours) = config.max_file_age_hours { if let Ok(metadata) = tokio::fs::metadata(path).await { @@ -317,10 +394,38 @@ async fn process_file( return Ok(()); } - // Fetch admin user ID from database for watch folder documents - let admin_user = db.get_user_by_username("admin").await? - .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?; - let admin_user_id = admin_user.id; + // Determine which user this file belongs to + let target_user_id = if let Some(ref manager) = user_watch_manager { + // Check if file is in user watch directory + if manager.is_user_watch_path(path) { + // Extract user from file path + match manager.get_user_by_file_path(path).await? { + Some(user) => { + info!("File {} belongs to user: {} ({})", filename, user.username, user.id); + user.id + } + None => { + warn!("File {} is in user watch directory but no user found - assigning to admin", filename); + // Fallback to admin + let admin_user = db.get_user_by_username("admin").await? + .ok_or_else(|| anyhow::anyhow!("Admin user not found. 
Please ensure the admin user is created."))?; + admin_user.id + } + } + } else { + // File is in global watch directory, assign to admin + debug!("File {} is in global watch directory - assigning to admin", filename); + let admin_user = db.get_user_by_username("admin").await? + .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?; + admin_user.id + } + } else { + // Per-user watch is disabled, always use admin + debug!("Per-user watch disabled - assigning file {} to admin", filename); + let admin_user = db.get_user_by_username("admin").await? + .ok_or_else(|| anyhow::anyhow!("Admin user not found. Please ensure the admin user is created."))?; + admin_user.id + }; // Validate PDF files before processing if mime_type == "application/pdf" { @@ -349,7 +454,7 @@ async fn process_file( let ingestion_service = DocumentIngestionService::new(db.clone(), file_service.clone()); let result = ingestion_service - .ingest_from_file_info(&file_info, file_data, admin_user_id, DeduplicationPolicy::Skip, "watch_folder", None) + .ingest_from_file_info(&file_info, file_data, target_user_id, DeduplicationPolicy::Skip, "watch_folder", None) .await .map_err(|e| anyhow::anyhow!(e))?; diff --git a/src/services/mod.rs b/src/services/mod.rs index 31147ac..b1733bf 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -4,4 +4,5 @@ pub mod ocr_retry_service; pub mod s3_service; pub mod s3_service_stub; pub mod sync_progress_tracker; +pub mod user_watch_service; pub mod webdav; \ No newline at end of file diff --git a/src/services/user_watch_service.rs b/src/services/user_watch_service.rs new file mode 100644 index 0000000..8e4f972 --- /dev/null +++ b/src/services/user_watch_service.rs @@ -0,0 +1,468 @@ +use anyhow::Result; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::collections::HashMap; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use crate::models::User; + +/// Service for managing 
per-user watch directories +/// +/// This service handles: +/// - Creating user-specific watch directories +/// - Managing directory permissions and ownership +/// - Handling cleanup on user deletion +/// - Providing thread-safe access to user directory paths +/// - Graceful error handling for filesystem operations +#[derive(Clone)] +pub struct UserWatchService { + /// Base directory where user watch folders are created + base_dir: PathBuf, + /// Cache of user ID to watch directory path mappings + /// Uses Arc for concurrent read access with exclusive write access + user_directories: Arc>>, +} + +impl UserWatchService { + /// Create a new UserWatchService + /// + /// # Arguments + /// * `base_dir` - Base directory path where user watch directories will be created + /// + /// # Returns + /// * New UserWatchService instance + pub fn new>(base_dir: P) -> Self { + Self { + base_dir: base_dir.as_ref().to_path_buf(), + user_directories: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Validate username for security (prevent path traversal attacks) + /// + /// # Arguments + /// * `username` - Username to validate + /// + /// # Returns + /// * Result indicating if username is valid + fn validate_username(username: &str) -> Result<()> { + if username.is_empty() || username.len() > 64 { + return Err(anyhow::anyhow!("Username must be between 1 and 64 characters")); + } + + // Check for path traversal attempts and invalid characters + if username.contains("..") || + username.starts_with('.') || + username.contains('/') || + username.contains('\\') || + username.contains('\0') { + return Err(anyhow::anyhow!("Username contains invalid characters")); + } + + // Only allow alphanumeric characters, underscore, and dash + if !username.chars().all(|c| c.is_alphanumeric() || c == '_' || c == '-') { + return Err(anyhow::anyhow!("Username can only contain alphanumeric characters, underscore, and dash")); + } + + // Additional security checks + if username == "." 
|| username == ".." { + return Err(anyhow::anyhow!("Username cannot be '.' or '..'")); + } + + Ok(()) + } + + /// Initialize the service by creating the base directory and discovering existing user directories + /// + /// # Returns + /// * Result indicating success or failure + pub async fn initialize(&self) -> Result<()> { + info!("Initializing UserWatchService with base directory: {}", self.base_dir.display()); + + // Create base directory if it doesn't exist + if !self.base_dir.exists() { + info!("Creating user watch base directory: {}", self.base_dir.display()); + tokio::fs::create_dir_all(&self.base_dir).await + .map_err(|e| anyhow::anyhow!( + "Failed to create user watch base directory '{}': {}", + self.base_dir.display(), e + ))?; + } else if !self.base_dir.is_dir() { + return Err(anyhow::anyhow!( + "User watch base path '{}' exists but is not a directory", + self.base_dir.display() + )); + } + + // Discover existing user directories + self.discover_existing_directories().await?; + + info!("UserWatchService initialized successfully"); + Ok(()) + } + + /// Discover existing user directories in the base directory + /// This is used during initialization to populate the cache with existing directories + async fn discover_existing_directories(&self) -> Result<()> { + debug!("Discovering existing user watch directories"); + + let mut entries = tokio::fs::read_dir(&self.base_dir).await + .map_err(|e| anyhow::anyhow!( + "Failed to read user watch base directory '{}': {}", + self.base_dir.display(), e + ))?; + + let mut discovered_count = 0; + while let Some(entry) = entries.next_entry().await.map_err(|e| { + anyhow::anyhow!("Error reading directory entry: {}", e) + })? 
{ + let path = entry.path(); + if path.is_dir() { + if let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) { + debug!("Found existing user watch directory: {}", dir_name); + // Note: We don't store these in the cache yet since we don't have user IDs + // The cache will be populated when users are looked up during operation + discovered_count += 1; + } + } + } + + info!("Discovered {} existing user watch directories", discovered_count); + Ok(()) + } + + /// Create or ensure a user's watch directory exists + /// + /// # Arguments + /// * `user` - User for whom to create the watch directory + /// + /// # Returns + /// * PathBuf to the user's watch directory + pub async fn ensure_user_directory(&self, user: &User) -> Result { + // Validate username for security + Self::validate_username(&user.username)?; + // Check cache first (read lock) + { + let cache = self.user_directories.read().await; + if let Some(path) = cache.get(&user.id) { + if path.exists() { + debug!("User watch directory found in cache: {}", path.display()); + return Ok(path.clone()); + } else { + warn!("Cached user watch directory no longer exists: {}", path.display()); + } + } + } + + // Not in cache or doesn't exist, create it (write lock) + let mut cache = self.user_directories.write().await; + + // Double-check in case another thread created it while we were waiting for the write lock + if let Some(path) = cache.get(&user.id) { + if path.exists() { + debug!("User watch directory created by another thread: {}", path.display()); + return Ok(path.clone()); + } + } + + let user_dir = self.base_dir.join(&user.username); + + // Use atomic directory creation to avoid race conditions + match tokio::fs::create_dir_all(&user_dir).await { + Ok(_) => { + info!("Created user watch directory for {}: {}", user.username, user_dir.display()); + + // Set appropriate permissions (readable/writable by owner, readable by group) + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let permissions = 
std::fs::Permissions::from_mode(0o755); + if let Err(e) = std::fs::set_permissions(&user_dir, permissions) { + warn!("Failed to set permissions on user watch directory '{}': {}", + user_dir.display(), e); + // Don't fail the operation for permission issues + } + } + } + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + // Directory already exists, check if it's actually a directory + if !user_dir.is_dir() { + return Err(anyhow::anyhow!( + "User watch path '{}' exists but is not a directory", + user_dir.display() + )); + } + debug!("User watch directory already exists for {}: {}", user.username, user_dir.display()); + } + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to create user watch directory for '{}' at '{}': {}", + user.username, user_dir.display(), e + )); + } + } + + // Update cache + cache.insert(user.id, user_dir.clone()); + + Ok(user_dir) + } + + /// Get the watch directory path for a user (from cache if available) + /// + /// # Arguments + /// * `user_id` - ID of the user + /// + /// # Returns + /// * Option to the user's watch directory if it exists + pub async fn get_user_directory(&self, user_id: Uuid) -> Option { + let cache = self.user_directories.read().await; + cache.get(&user_id).filter(|path| path.exists()).cloned() + } + + /// Get the watch directory path for a user by username + /// This method constructs the path based on the username without checking the cache + /// + /// # Arguments + /// * `username` - Username of the user + /// + /// # Returns + /// * PathBuf to where the user's watch directory should be + pub fn get_user_directory_by_username(&self, username: &str) -> PathBuf { + self.base_dir.join(username) + } + + /// Extract username from a file path within the user watch directory structure + /// + /// # Arguments + /// * `file_path` - Path to a file within a user watch directory + /// + /// # Returns + /// * Option containing the username if the path is within a user directory + pub fn 
extract_username_from_path(&self, file_path: &Path) -> Option { + // Normalize the file path - use canonical path for security + let file_canonical = match file_path.canonicalize() { + Ok(path) => path, + Err(_) => { + debug!("Failed to canonicalize file path: {}", file_path.display()); + return None; + } + }; + + let base_canonical = match self.base_dir.canonicalize() { + Ok(path) => path, + Err(_) => { + debug!("Failed to canonicalize base directory: {}", self.base_dir.display()); + return None; + } + }; + + // Check if the file is within the user watch base directory + if !file_canonical.starts_with(&base_canonical) { + debug!("File path {} is not within user watch base directory {}", + file_canonical.display(), base_canonical.display()); + return None; + } + + // Extract the relative path from base directory + let relative_path = file_canonical.strip_prefix(&base_canonical).ok()?; + let components: Vec<_> = relative_path.components().collect(); + + if components.is_empty() { + debug!("No path components found after stripping base directory"); + return None; + } + + // First component should be the username + let username = components[0].as_os_str().to_str()?; + + // Validate the extracted username for security + if let Err(e) = Self::validate_username(username) { + warn!("Invalid username '{}' extracted from path {}: {}", + username, file_path.display(), e); + return None; + } + + debug!("Extracted username '{}' from path {}", username, file_path.display()); + Some(username.to_string()) + } + + /// Remove a user's watch directory and clean up cache + /// + /// # Arguments + /// * `user` - User whose watch directory should be removed + /// + /// # Returns + /// * Result indicating success or failure + pub async fn remove_user_directory(&self, user: &User) -> Result<()> { + info!("Removing user watch directory for {}", user.username); + + let user_dir = self.base_dir.join(&user.username); + + if user_dir.exists() { + // Remove directory and all contents + 
tokio::fs::remove_dir_all(&user_dir).await + .map_err(|e| anyhow::anyhow!( + "Failed to remove user watch directory for '{}' at '{}': {}", + user.username, user_dir.display(), e + ))?; + + info!("Successfully removed user watch directory for {}", user.username); + } else { + debug!("User watch directory for {} did not exist", user.username); + } + + // Remove from cache + let mut cache = self.user_directories.write().await; + cache.remove(&user.id); + + Ok(()) + } + + /// Check if a path is within the user watch directory structure + /// + /// # Arguments + /// * `path` - Path to check + /// + /// # Returns + /// * bool indicating whether the path is within user watch directories + pub fn is_within_user_watch(&self, path: &Path) -> bool { + let file_canonical = path.canonicalize().ok().unwrap_or_else(|| path.to_path_buf()); + let base_canonical = self.base_dir.canonicalize().ok().unwrap_or_else(|| self.base_dir.clone()); + + file_canonical.starts_with(&base_canonical) + } + + /// Get statistics about user watch directories + /// + /// # Returns + /// * (cached_directories, total_directories) tuple + pub async fn get_statistics(&self) -> Result<(usize, usize)> { + let cached_count = { + let cache = self.user_directories.read().await; + cache.len() + }; + + let mut total_count = 0; + if self.base_dir.exists() { + let mut entries = tokio::fs::read_dir(&self.base_dir).await + .map_err(|e| anyhow::anyhow!( + "Failed to read user watch base directory: {}", e + ))?; + + while let Some(entry) = entries.next_entry().await + .map_err(|e| anyhow::anyhow!("Error reading directory entry: {}", e))? 
{ + if entry.path().is_dir() { + total_count += 1; + } + } + } + + Ok((cached_count, total_count)) + } + + /// Clear the directory cache (useful for testing or cache invalidation) + pub async fn clear_cache(&self) { + let mut cache = self.user_directories.write().await; + cache.clear(); + debug!("User watch directory cache cleared"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + use uuid::Uuid; + use crate::models::{UserRole, AuthProvider}; + use chrono::Utc; + + fn create_test_user(username: &str) -> User { + User { + id: Uuid::new_v4(), + username: username.to_string(), + email: format!("{}@example.com", username), + password_hash: Some("test_hash".to_string()), + role: UserRole::User, + created_at: Utc::now(), + updated_at: Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: AuthProvider::Local, + } + } + + #[tokio::test] + async fn test_user_watch_service_initialization() { + let temp_dir = TempDir::new().unwrap(); + let service = UserWatchService::new(temp_dir.path()); + + assert!(service.initialize().await.is_ok()); + assert!(temp_dir.path().exists()); + } + + #[tokio::test] + async fn test_ensure_user_directory() { + let temp_dir = TempDir::new().unwrap(); + let service = UserWatchService::new(temp_dir.path()); + service.initialize().await.unwrap(); + + let user = create_test_user("testuser"); + let user_dir = service.ensure_user_directory(&user).await.unwrap(); + + assert!(user_dir.exists()); + assert!(user_dir.is_dir()); + assert_eq!(user_dir.file_name().unwrap(), "testuser"); + } + + #[tokio::test] + async fn test_extract_username_from_path() { + let temp_dir = TempDir::new().unwrap(); + let service = UserWatchService::new(temp_dir.path()); + service.initialize().await.unwrap(); + + // Create user directory + let user = create_test_user("testuser"); + let user_dir = service.ensure_user_directory(&user).await.unwrap(); + + // Create a test file + let test_file = 
user_dir.join("test.pdf"); + tokio::fs::write(&test_file, b"test content").await.unwrap(); + + let username = service.extract_username_from_path(&test_file); + assert_eq!(username, Some("testuser".to_string())); + } + + #[tokio::test] + async fn test_remove_user_directory() { + let temp_dir = TempDir::new().unwrap(); + let service = UserWatchService::new(temp_dir.path()); + service.initialize().await.unwrap(); + + let user = create_test_user("testuser"); + let user_dir = service.ensure_user_directory(&user).await.unwrap(); + assert!(user_dir.exists()); + + service.remove_user_directory(&user).await.unwrap(); + assert!(!user_dir.exists()); + } + + #[tokio::test] + async fn test_is_within_user_watch() { + let temp_dir = TempDir::new().unwrap(); + let service = UserWatchService::new(temp_dir.path()); + service.initialize().await.unwrap(); + + let user = create_test_user("testuser"); + let user_dir = service.ensure_user_directory(&user).await.unwrap(); + let test_file = user_dir.join("test.pdf"); + + assert!(service.is_within_user_watch(&test_file)); + + let outside_file = temp_dir.path().parent().unwrap().join("outside.pdf"); + assert!(!service.is_within_user_watch(&outside_file)); + } +} \ No newline at end of file diff --git a/src/services/webdav/config.rs b/src/services/webdav/config.rs index 446eb29..94a2d50 100644 --- a/src/services/webdav/config.rs +++ b/src/services/webdav/config.rs @@ -165,6 +165,11 @@ impl WebDAVConfig { return Err(anyhow::anyhow!("Invalid URL format: contains multiple protocols")); } + // Reject relative URLs (paths that start with /) + if url_without_protocol.starts_with('/') { + return Err(anyhow::anyhow!("Server URL cannot be a relative path. 
Please provide a full server URL like 'https://server.example.com' or 'server.example.com'")); + } + Ok(()) } diff --git a/src/test_utils.rs b/src/test_utils.rs index 605596a..89d012d 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -274,6 +274,12 @@ impl TestContext { let config = config_builder.build(database_url); let queue_service = Arc::new(crate::ocr::queue::OcrQueueService::new(db.clone(), db.pool.clone(), 2)); + let user_watch_service = if config.enable_per_user_watch { + Some(Arc::new(crate::services::user_watch_service::UserWatchService::new(&config.user_watch_base_dir))) + } else { + None + }; + let state = Arc::new(AppState { db, config, @@ -282,6 +288,7 @@ impl TestContext { queue_service, oidc_client: None, sync_progress_tracker: Arc::new(crate::services::sync_progress_tracker::SyncProgressTracker::new()), + user_watch_service, }); let app = Router::new() @@ -786,6 +793,8 @@ impl TestConfigBuilder { jwt_secret: self.jwt_secret, upload_path: self.upload_path, watch_folder: self.watch_folder, + user_watch_base_dir: "./test-user-watch".to_string(), + enable_per_user_watch: false, allowed_file_types: vec!["pdf".to_string(), "txt".to_string(), "png".to_string()], watch_interval_seconds: Some(30), file_stability_check_ms: Some(500), diff --git a/tests/integration_per_user_watch_directories_tests.rs b/tests/integration_per_user_watch_directories_tests.rs new file mode 100644 index 0000000..58f6a36 --- /dev/null +++ b/tests/integration_per_user_watch_directories_tests.rs @@ -0,0 +1,460 @@ +use anyhow::Result; +use axum::{ + body::Body, + http::{Method, Request, StatusCode}, + Router, +}; +use serde_json::{json, Value}; +use std::path::PathBuf; +use std::sync::Arc; +use tempfile::TempDir; +use tower::ServiceExt; +use uuid::Uuid; + +use readur::{ + config::Config, + db::Database, + models::{CreateUser, UserRole}, + services::user_watch_service::UserWatchService, + AppState, +}; + +/// Helper to create test configuration with per-user watch enabled 
+async fn create_test_config() -> Result<(Config, TempDir, TempDir)> { + let temp_upload_dir = TempDir::new()?; + let temp_watch_dir = TempDir::new()?; + let temp_user_watch_dir = TempDir::new()?; + + let config = Config { + database_url: std::env::var("TEST_DATABASE_URL") + .unwrap_or_else(|_| "postgresql://readur:readur@localhost/readur_test".to_string()), + server_address: "127.0.0.1:0".to_string(), + jwt_secret: "test_secret".to_string(), + upload_path: temp_upload_dir.path().to_string_lossy().to_string(), + watch_folder: temp_watch_dir.path().to_string_lossy().to_string(), + user_watch_base_dir: temp_user_watch_dir.path().to_string_lossy().to_string(), + enable_per_user_watch: true, + allowed_file_types: vec!["pdf".to_string(), "txt".to_string(), "png".to_string()], + watch_interval_seconds: Some(10), + file_stability_check_ms: Some(1000), + max_file_age_hours: None, + ocr_language: "eng".to_string(), + concurrent_ocr_jobs: 1, + ocr_timeout_seconds: 30, + max_file_size_mb: 10, + memory_limit_mb: 512, + cpu_priority: "normal".to_string(), + oidc_enabled: false, + oidc_client_id: None, + oidc_client_secret: None, + oidc_issuer_url: None, + oidc_redirect_uri: None, + }; + + Ok((config, temp_upload_dir, temp_user_watch_dir)) +} + +/// Helper to create test app state +async fn create_test_app_state(config: Config) -> Result<Arc<AppState>> { + let db = Database::new(&config.database_url).await?; + let queue_service = Arc::new(readur::ocr::queue::OcrQueueService::new( + db.clone(), + db.get_pool().clone(), + 1, + )); + + let user_watch_service = if config.enable_per_user_watch { + Some(Arc::new(UserWatchService::new(&config.user_watch_base_dir))) + } else { + None + }; + + Ok(Arc::new(AppState { + db, + config, + webdav_scheduler: None, + source_scheduler: None, + queue_service, + oidc_client: None, + sync_progress_tracker: Arc::new(readur::services::sync_progress_tracker::SyncProgressTracker::new()), + user_watch_service, + })) +} + +/// Helper to create test user and get auth 
token +async fn create_test_user_and_login( + app: &Router, + username: &str, + email: &str, + role: UserRole, +) -> Result<(String, Uuid)> { + // Create user + let create_user_req = CreateUser { + username: username.to_string(), + email: email.to_string(), + password: "test_password".to_string(), + role: Some(role), + }; + + let create_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/api/users") + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_string(&create_user_req)?))?, + ) + .await?; + + assert_eq!(create_response.status(), StatusCode::OK); + + let create_body = axum::body::to_bytes(create_response.into_body(), usize::MAX).await?; + let user_response: Value = serde_json::from_slice(&create_body)?; + let user_id = Uuid::parse_str(user_response["id"].as_str().unwrap())?; + + // Login to get token + let login_req = json!({ + "username": username, + "password": "test_password" + }); + + let login_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri("/api/auth/login") + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_string(&login_req)?))?, + ) + .await?; + + assert_eq!(login_response.status(), StatusCode::OK); + + let login_body = axum::body::to_bytes(login_response.into_body(), usize::MAX).await?; + let login_response: Value = serde_json::from_slice(&login_body)?; + let token = login_response["token"].as_str().unwrap().to_string(); + + Ok((token, user_id)) +} + +#[tokio::test] +async fn test_per_user_watch_directory_lifecycle() -> Result<()> { + let (config, _temp_upload, temp_user_watch) = create_test_config().await?; + let state = create_test_app_state(config).await?; + + let app = Router::new() + .nest("/api/users", readur::routes::users::router()) + .nest("/api/auth", readur::routes::auth::router()) + .with_state(state.clone()); + + // Create admin user and regular user + let (admin_token, admin_id) = 
create_test_user_and_login(&app, "admin", "admin@test.com", UserRole::Admin).await?; + let (user_token, user_id) = create_test_user_and_login(&app, "testuser", "test@test.com", UserRole::User).await?; + + // Test 1: Get user watch directory info (should not exist initially) + let get_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", admin_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(get_response.status(), StatusCode::OK); + + let get_body = axum::body::to_bytes(get_response.into_body(), usize::MAX).await?; + let watch_info: Value = serde_json::from_slice(&get_body)?; + + assert_eq!(watch_info["username"], "testuser"); + assert_eq!(watch_info["exists"], false); + assert_eq!(watch_info["enabled"], true); + assert!(watch_info["watch_directory_path"].as_str().unwrap().contains("testuser")); + + // Test 2: Create user watch directory + let create_req = json!({ + "ensure_created": true + }); + + let create_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", admin_token)) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_string(&create_req)?))?, + ) + .await?; + + assert_eq!(create_response.status(), StatusCode::OK); + + let create_body = axum::body::to_bytes(create_response.into_body(), usize::MAX).await?; + let create_result: Value = serde_json::from_slice(&create_body)?; + + assert_eq!(create_result["success"], true); + assert!(create_result["message"].as_str().unwrap().contains("testuser")); + assert!(create_result["watch_directory_path"].is_string()); + + // Verify directory was created on filesystem + let expected_path = temp_user_watch.path().join("testuser"); + assert!(expected_path.exists()); + assert!(expected_path.is_dir()); + + // Test 3: 
Get user watch directory info again (should exist now) + let get_response2 = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", admin_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(get_response2.status(), StatusCode::OK); + + let get_body2 = axum::body::to_bytes(get_response2.into_body(), usize::MAX).await?; + let watch_info2: Value = serde_json::from_slice(&get_body2)?; + + assert_eq!(watch_info2["exists"], true); + + // Test 4: Regular user can access their own watch directory + let user_get_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", user_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(user_get_response.status(), StatusCode::OK); + + // Test 5: Regular user cannot access another user's watch directory + let forbidden_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(&format!("/api/users/{}/watch-directory", admin_id)) + .header("Authorization", format!("Bearer {}", user_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(forbidden_response.status(), StatusCode::FORBIDDEN); + + // Test 6: Delete user watch directory (admin only) + let delete_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::DELETE) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", admin_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(delete_response.status(), StatusCode::OK); + + let delete_body = axum::body::to_bytes(delete_response.into_body(), usize::MAX).await?; + let delete_result: Value = serde_json::from_slice(&delete_body)?; + + assert_eq!(delete_result["success"], true); + + // Verify directory was removed from filesystem + 
assert!(!expected_path.exists()); + + // Test 7: Regular user cannot delete watch directories + let user_delete_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::DELETE) + .uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", user_token)) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(user_delete_response.status(), StatusCode::FORBIDDEN); + + Ok(()) +} + +#[tokio::test] +async fn test_user_watch_service_security() -> Result<()> { + let (config, _temp_upload, temp_user_watch) = create_test_config().await?; + + let user_watch_service = UserWatchService::new(&config.user_watch_base_dir); + + // Create test user + let test_user = readur::models::User { + id: Uuid::new_v4(), + username: "testuser".to_string(), + email: "test@test.com".to_string(), + password_hash: Some("hash".to_string()), + role: UserRole::User, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: readur::models::user::AuthProvider::Local, + }; + + // Test 1: Normal username works + let result = user_watch_service.ensure_user_directory(&test_user).await; + assert!(result.is_ok()); + + let user_dir = temp_user_watch.path().join("testuser"); + assert!(user_dir.exists()); + + // Test 2: Security - usernames with path traversal attempts should be rejected + let malicious_user = readur::models::User { + id: Uuid::new_v4(), + username: "../malicious".to_string(), + email: "mal@test.com".to_string(), + password_hash: Some("hash".to_string()), + role: UserRole::User, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: readur::models::user::AuthProvider::Local, + }; + + let malicious_result = user_watch_service.ensure_user_directory(&malicious_user).await; + assert!(malicious_result.is_err()); + + // Verify no malicious directory was 
created outside the base directory + let malicious_dir = temp_user_watch.path().parent().unwrap().join("malicious"); + assert!(!malicious_dir.exists()); + + // Test 3: Security - usernames with null bytes should be rejected + let null_user = readur::models::User { + id: Uuid::new_v4(), + username: "test\0user".to_string(), + email: "null@test.com".to_string(), + password_hash: Some("hash".to_string()), + role: UserRole::User, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: readur::models::user::AuthProvider::Local, + }; + + let null_result = user_watch_service.ensure_user_directory(&null_user).await; + assert!(null_result.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn test_user_watch_directory_file_processing_simulation() -> Result<()> { + let (config, _temp_upload, temp_user_watch) = create_test_config().await?; + let state = create_test_app_state(config.clone()).await?; + + // Create user watch manager to test file path mapping + let user_watch_service = state.user_watch_service.as_ref().unwrap(); + let user_watch_manager = readur::scheduling::user_watch_manager::UserWatchManager::new(state.db.clone(), (**user_watch_service).clone()); + + // Create test user + let test_user = readur::models::User { + id: Uuid::new_v4(), + username: "filetest".to_string(), + email: "filetest@test.com".to_string(), + password_hash: Some("hash".to_string()), + role: UserRole::User, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + oidc_subject: None, + oidc_issuer: None, + oidc_email: None, + auth_provider: readur::models::user::AuthProvider::Local, + }; + + // Insert user into database + let created_user = state.db.create_user(readur::models::CreateUser { + username: test_user.username.clone(), + email: test_user.email.clone(), + password: "test_password".to_string(), + role: Some(UserRole::User), + }).await?; + + // Create user watch directory + let 
user_watch_service = state.user_watch_service.as_ref().unwrap(); + let user_dir_path = user_watch_service.ensure_user_directory(&created_user).await?; + + // Test file path to user mapping + let test_file_path = user_dir_path.join("test_document.pdf"); + std::fs::File::create(&test_file_path)?; + + // Wait a moment for caching + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Test that the user watch manager can map file paths to users + let mapped_user_result = user_watch_manager.get_user_by_file_path(&test_file_path).await?; + let mapped_user_id = mapped_user_result.as_ref().map(|user| user.id); + + // The user should be discoverable via file path + assert!(mapped_user_id.is_some()); + if let Some(user_id) = mapped_user_id { + assert_eq!(user_id, created_user.id); + } + + // Test invalid path (should not map to any user) + let invalid_path = PathBuf::from("/invalid/path/document.pdf"); + let invalid_mapping_result = user_watch_manager.get_user_by_file_path(&invalid_path).await?; + assert!(invalid_mapping_result.is_none()); + + Ok(()) +} + +#[tokio::test] +async fn test_per_user_watch_disabled() -> Result<()> { + // Create config with per-user watch disabled + let (mut config, _temp_upload, _temp_user_watch) = create_test_config().await?; + config.enable_per_user_watch = false; + + let state = create_test_app_state(config).await?; + + let app = Router::new() + .nest("/api/users", readur::routes::users::router()) + .nest("/api/auth", readur::routes::auth::router()) + .with_state(state.clone()); + + // Create admin user and regular user + let (admin_token, _admin_id) = create_test_user_and_login(&app, "admin", "admin@test.com", UserRole::Admin).await?; + let (_user_token, user_id) = create_test_user_and_login(&app, "testuser", "test@test.com", UserRole::User).await?; + + // Try to get user watch directory info when feature is disabled + let get_response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + 
.uri(&format!("/api/users/{}/watch-directory", user_id)) + .header("Authorization", format!("Bearer {}", admin_token)) + .body(Body::empty())?, + ) + .await?; + + // Should return internal server error when feature is disabled + assert_eq!(get_response.status(), StatusCode::INTERNAL_SERVER_ERROR); + + Ok(()) +} \ No newline at end of file