feat(server): create 'sources' concept and move WebDAV settings page to it

parent 691c5e6bb8
commit 59e5356a25
@@ -12,6 +12,7 @@ import DocumentsPage from './pages/DocumentsPage';
 import SearchPage from './pages/SearchPage';
 import DocumentDetailsPage from './pages/DocumentDetailsPage';
 import SettingsPage from './pages/SettingsPage';
+import SourcesPage from './pages/SourcesPage';
 import WatchFolderPage from './pages/WatchFolderPage';
 
 function App(): JSX.Element {
@@ -65,6 +66,7 @@ function App(): JSX.Element {
           <Route path="/documents" element={<DocumentsPage />} />
           <Route path="/documents/:id" element={<DocumentDetailsPage />} />
           <Route path="/search" element={<SearchPage />} />
+          <Route path="/sources" element={<SourcesPage />} />
           <Route path="/watch" element={<WatchFolderPage />} />
           <Route path="/settings" element={<SettingsPage />} />
           <Route path="/profile" element={<div>Profile Page - Coming Soon</div>} />
@@ -31,6 +31,7 @@ import {
   AccountCircle as AccountIcon,
   Logout as LogoutIcon,
   Description as DocumentIcon,
+  Storage as StorageIcon,
 } from '@mui/icons-material';
 import { useNavigate, useLocation } from 'react-router-dom';
 import { useAuth } from '../../contexts/AuthContext';
@@ -61,6 +62,7 @@ const navigationItems: NavigationItem[] = [
   { text: 'Upload', icon: UploadIcon, path: '/upload' },
   { text: 'Documents', icon: DocumentIcon, path: '/documents' },
   { text: 'Search', icon: SearchIcon, path: '/search' },
+  { text: 'Sources', icon: StorageIcon, path: '/sources' },
   { text: 'Watch Folder', icon: FolderIcon, path: '/watch' },
 ];

File diff suppressed because it is too large

@@ -0,0 +1,87 @@
+-- Create sources table to support multiple document sources per user
+CREATE TABLE IF NOT EXISTS sources (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
+    name TEXT NOT NULL,
+    source_type TEXT NOT NULL, -- 'webdav', 'local_folder', 's3', etc.
+    enabled BOOLEAN DEFAULT TRUE,
+
+    -- Configuration (JSON to allow flexibility for different source types)
+    config JSONB NOT NULL DEFAULT '{}',
+
+    -- Status tracking
+    status TEXT DEFAULT 'idle', -- 'idle', 'syncing', 'error'
+    last_sync_at TIMESTAMPTZ,
+    last_error TEXT,
+    last_error_at TIMESTAMPTZ,
+
+    -- Statistics
+    total_files_synced BIGINT DEFAULT 0,
+    total_files_pending BIGINT DEFAULT 0,
+    total_size_bytes BIGINT DEFAULT 0,
+
+    created_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_at TIMESTAMPTZ DEFAULT NOW(),
+
+    UNIQUE(user_id, name)
+);
+
+-- Create indexes for performance
+CREATE INDEX IF NOT EXISTS idx_sources_user_id ON sources(user_id);
+CREATE INDEX IF NOT EXISTS idx_sources_source_type ON sources(source_type);
+CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
+
+-- Update documents table to link to sources
+ALTER TABLE documents ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE SET NULL;
+CREATE INDEX IF NOT EXISTS idx_documents_source_id ON documents(source_id);
+
+-- Update webdav_files table to link to sources instead of users directly
+ALTER TABLE webdav_files ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE CASCADE;
+
+-- Migrate existing WebDAV settings to sources table
+INSERT INTO sources (user_id, name, source_type, enabled, config, created_at, updated_at)
+SELECT
+    s.user_id,
+    'WebDAV Server' as name,
+    'webdav' as source_type,
+    s.webdav_enabled as enabled,
+    jsonb_build_object(
+        'server_url', s.webdav_server_url,
+        'username', s.webdav_username,
+        'password', s.webdav_password,
+        'watch_folders', s.webdav_watch_folders,
+        'file_extensions', s.webdav_file_extensions,
+        'auto_sync', s.webdav_auto_sync,
+        'sync_interval_minutes', s.webdav_sync_interval_minutes
+    ) as config,
+    NOW() as created_at,
+    NOW() as updated_at
+FROM settings s
+WHERE s.webdav_enabled = TRUE
+  AND s.webdav_server_url IS NOT NULL
+  AND s.webdav_username IS NOT NULL;
+
+-- Update webdav_files to link to the newly created sources
+UPDATE webdav_files wf
+SET source_id = s.id
+FROM sources s
+WHERE wf.user_id = s.user_id
+  AND s.source_type = 'webdav';
+
+-- Create a function to update the updated_at timestamp
+CREATE OR REPLACE FUNCTION update_sources_updated_at()
+RETURNS TRIGGER AS $$
+BEGIN
+    NEW.updated_at = NOW();
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Create trigger to auto-update updated_at
+CREATE TRIGGER sources_updated_at_trigger
+    BEFORE UPDATE ON sources
+    FOR EACH ROW
+    EXECUTE FUNCTION update_sources_updated_at();
+
+-- Note: We're keeping the webdav fields in settings table for now to ensure backward compatibility
+-- They will be removed in a future migration after ensuring all code is updated
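
The migration stores each source's settings as free-form JSONB, so nothing at the schema level guarantees that a migrated row still matches what the Rust side expects. A quick post-migration check is to round-trip every webdav row's config through the typed model; a minimal sketch, assuming an sqlx PgPool and the WebDAVSourceConfig struct added in src/models.rs below:

    use sqlx::{PgPool, Row};

    // Minimal sketch: verify every migrated webdav source has a config that
    // deserializes into the typed WebDAVSourceConfig from src/models.rs.
    async fn check_migrated_configs(pool: &PgPool) -> anyhow::Result<()> {
        let rows = sqlx::query("SELECT id, config FROM sources WHERE source_type = 'webdav'")
            .fetch_all(pool)
            .await?;
        for row in rows {
            let id: uuid::Uuid = row.get("id");
            let config: serde_json::Value = row.get("config");
            // Fails loudly if jsonb_build_object produced keys the model does not expect.
            if let Err(e) = serde_json::from_value::<crate::models::WebDAVSourceConfig>(config) {
                eprintln!("source {} has an invalid config: {}", id, e);
            }
        }
        Ok(())
    }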

src/db.rs: 256 lines changed

@@ -1836,4 +1836,260 @@ impl Database {
 
         Ok(files)
     }
+
+    // Sources methods
+    pub async fn create_source(&self, user_id: Uuid, source: &crate::models::CreateSource) -> Result<crate::models::Source> {
+        let id = Uuid::new_v4();
+        let now = Utc::now();
+
+        let row = sqlx::query(
+            r#"INSERT INTO sources (id, user_id, name, source_type, enabled, config, status, created_at, updated_at)
+               VALUES ($1, $2, $3, $4, $5, $6, 'idle', $7, $8)
+               RETURNING *"#
+        )
+        .bind(id)
+        .bind(user_id)
+        .bind(&source.name)
+        .bind(source.source_type.to_string())
+        .bind(source.enabled.unwrap_or(true))
+        .bind(&source.config)
+        .bind(now)
+        .bind(now)
+        .fetch_one(&self.pool)
+        .await?;
+
+        Ok(crate::models::Source {
+            id: row.get("id"),
+            user_id: row.get("user_id"),
+            name: row.get("name"),
+            source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            enabled: row.get("enabled"),
+            config: row.get("config"),
+            status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            last_sync_at: row.get("last_sync_at"),
+            last_error: row.get("last_error"),
+            last_error_at: row.get("last_error_at"),
+            total_files_synced: row.get("total_files_synced"),
+            total_files_pending: row.get("total_files_pending"),
+            total_size_bytes: row.get("total_size_bytes"),
+            created_at: row.get("created_at"),
+            updated_at: row.get("updated_at"),
+        })
+    }
+
+    pub async fn get_source(&self, user_id: Uuid, source_id: Uuid) -> Result<Option<crate::models::Source>> {
+        let row = sqlx::query(
+            r#"SELECT * FROM sources WHERE id = $1 AND user_id = $2"#
+        )
+        .bind(source_id)
+        .bind(user_id)
+        .fetch_optional(&self.pool)
+        .await?;
+
+        match row {
+            Some(row) => Ok(Some(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            })),
+            None => Ok(None),
+        }
+    }
+
+    pub async fn get_sources(&self, user_id: Uuid) -> Result<Vec<crate::models::Source>> {
+        let rows = sqlx::query(
+            r#"SELECT * FROM sources WHERE user_id = $1 ORDER BY created_at DESC"#
+        )
+        .bind(user_id)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut sources = Vec::new();
+        for row in rows {
+            sources.push(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            });
+        }
+
+        Ok(sources)
+    }
+    pub async fn update_source(&self, user_id: Uuid, source_id: Uuid, update: &crate::models::UpdateSource) -> Result<crate::models::Source> {
+        let mut query = String::from("UPDATE sources SET updated_at = NOW()");
+        // Placeholder numbering must match the order of the .bind() calls below,
+        // which assign $1, $2, ... sequentially, so the counter starts at zero.
+        let mut bind_count = 0;
+
+        if update.name.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", name = ${}", bind_count));
+        }
+        if update.enabled.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", enabled = ${}", bind_count));
+        }
+        if update.config.is_some() {
+            bind_count += 1;
+            query.push_str(&format!(", config = ${}", bind_count));
+        }
+
+        bind_count += 1;
+        query.push_str(&format!(" WHERE id = ${}", bind_count));
+        bind_count += 1;
+        query.push_str(&format!(" AND user_id = ${} RETURNING *", bind_count));
+
+        let mut query_builder = sqlx::query(&query);
+
+        // Bind values in the same order the placeholders were appended
+        if let Some(name) = &update.name {
+            query_builder = query_builder.bind(name);
+        }
+        if let Some(enabled) = &update.enabled {
+            query_builder = query_builder.bind(enabled);
+        }
+        if let Some(config) = &update.config {
+            query_builder = query_builder.bind(config);
+        }
+        query_builder = query_builder.bind(source_id);
+        query_builder = query_builder.bind(user_id);
+
+        let row = query_builder.fetch_one(&self.pool).await?;
+
+        Ok(crate::models::Source {
+            id: row.get("id"),
+            user_id: row.get("user_id"),
+            name: row.get("name"),
+            source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            enabled: row.get("enabled"),
+            config: row.get("config"),
+            status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
+            last_sync_at: row.get("last_sync_at"),
+            last_error: row.get("last_error"),
+            last_error_at: row.get("last_error_at"),
+            total_files_synced: row.get("total_files_synced"),
+            total_files_pending: row.get("total_files_pending"),
+            total_size_bytes: row.get("total_size_bytes"),
+            created_at: row.get("created_at"),
+            updated_at: row.get("updated_at"),
+        })
+    }
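
A partial update is the easiest way to exercise the dynamic UPDATE above: touching only `enabled` emits `enabled = $1` and `WHERE id = $2 AND user_id = $3`, matching the three binds. A minimal sketch, assuming a connected Database handle:

    use uuid::Uuid;

    // Minimal sketch: toggle a source off without touching name or config.
    async fn disable_source(db: &Database, user_id: Uuid, source_id: Uuid) -> anyhow::Result<()> {
        let update = crate::models::UpdateSource {
            name: None,
            enabled: Some(false),
            config: None,
        };
        // Builds: UPDATE sources SET updated_at = NOW(), enabled = $1
        //         WHERE id = $2 AND user_id = $3 RETURNING *
        let source = db.update_source(user_id, source_id, &update).await?;
        assert!(!source.enabled);
        Ok(())
    }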
+    pub async fn delete_source(&self, user_id: Uuid, source_id: Uuid) -> Result<bool> {
+        let result = sqlx::query(
+            r#"DELETE FROM sources WHERE id = $1 AND user_id = $2"#
+        )
+        .bind(source_id)
+        .bind(user_id)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(result.rows_affected() > 0)
+    }
+
+    pub async fn update_source_status(&self, source_id: Uuid, status: crate::models::SourceStatus, error: Option<String>) -> Result<()> {
+        if let Some(error_msg) = error {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $1, last_error = $2, last_error_at = NOW(), updated_at = NOW()
+                   WHERE id = $3"#
+            )
+            .bind(status.to_string())
+            .bind(error_msg)
+            .bind(source_id)
+            .execute(&self.pool)
+            .await?;
+        } else {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $1, updated_at = NOW()
+                   WHERE id = $2"#
+            )
+            .bind(status.to_string())
+            .bind(source_id)
+            .execute(&self.pool)
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn update_source_sync_stats(&self, source_id: Uuid, files_synced: i64, files_pending: i64, size_bytes: i64) -> Result<()> {
+        sqlx::query(
+            r#"UPDATE sources
+               SET total_files_synced = $1, total_files_pending = $2, total_size_bytes = $3,
+                   last_sync_at = NOW(), updated_at = NOW()
+               WHERE id = $4"#
+        )
+        .bind(files_synced)
+        .bind(files_pending)
+        .bind(size_bytes)
+        .bind(source_id)
+        .execute(&self.pool)
+        .await?;
+
+        Ok(())
+    }
+
+    pub async fn get_recent_documents_for_source(&self, source_id: Uuid, limit: i64) -> Result<Vec<Document>> {
+        let rows = sqlx::query(
+            r#"SELECT * FROM documents
+               WHERE source_id = $1
+               ORDER BY created_at DESC
+               LIMIT $2"#
+        )
+        .bind(source_id)
+        .bind(limit)
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut documents = Vec::new();
+        for row in rows {
+            documents.push(Document {
+                id: row.get("id"),
+                filename: row.get("filename"),
+                original_filename: row.get("original_filename"),
+                file_path: row.get("file_path"),
+                file_size: row.get("file_size"),
+                mime_type: row.get("mime_type"),
+                content: row.get("content"),
+                ocr_text: row.get("ocr_text"),
+                ocr_confidence: row.get("ocr_confidence"),
+                ocr_word_count: row.get("ocr_word_count"),
+                ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
+                ocr_status: row.get("ocr_status"),
+                ocr_error: row.get("ocr_error"),
+                ocr_completed_at: row.get("ocr_completed_at"),
+                tags: row.get("tags"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+                user_id: row.get("user_id"),
+            });
+        }
+
+        Ok(documents)
+    }
 }
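
The same fifteen-field Source mapping is repeated in create_source, get_source, get_sources, and update_source. Since Source derives FromRow with #[sqlx(try_from = "String")] on its enum fields (see src/models.rs below), sqlx::query_as could collapse each of those into a one-liner; a sketch of what get_sources would look like under that refactor:

    // Hypothetical refactor: Source derives FromRow, so query_as can
    // replace the four hand-written row mappings above.
    pub async fn get_sources(&self, user_id: Uuid) -> Result<Vec<crate::models::Source>> {
        let sources = sqlx::query_as::<_, crate::models::Source>(
            r#"SELECT * FROM sources WHERE user_id = $1 ORDER BY created_at DESC"#,
        )
        .bind(user_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(sources)
    }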

@@ -31,6 +31,7 @@ use db::Database;
 pub struct AppState {
     pub db: Database,
     pub config: Config,
+    pub webdav_scheduler: Option<std::sync::Arc<webdav_scheduler::WebDAVScheduler>>,
 }
 
 /// Health check endpoint for monitoring

src/main.rs: 62 lines changed

@@ -128,26 +128,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     }
 
-    let state = AppState { db, config: config.clone() };
+    let state = AppState {
+        db,
+        config: config.clone(),
+        webdav_scheduler: None, // Will be set after creating scheduler
+    };
     let state = Arc::new(state);
 
-    let app = Router::new()
-        .route("/api/health", get(readur::health_check))
-        .nest("/api/auth", readur::routes::auth::router())
-        .nest("/api/documents", readur::routes::documents::router())
-        .nest("/api/metrics", readur::routes::metrics::router())
-        .nest("/metrics", readur::routes::prometheus_metrics::router())
-        .nest("/api/notifications", readur::routes::notifications::router())
-        .nest("/api/queue", readur::routes::queue::router())
-        .nest("/api/search", readur::routes::search::router())
-        .nest("/api/settings", readur::routes::settings::router())
-        .nest("/api/users", readur::routes::users::router())
-        .nest("/api/webdav", readur::routes::webdav::router())
-        .merge(readur::swagger::create_swagger_router())
-        .fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
-        .layer(CorsLayer::permissive())
-        .with_state(state.clone());
-
     let watcher_config = config.clone();
     let watcher_db = state.db.clone();
     tokio::spawn(async move {

@@ -197,13 +184,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     });
 
+    // Create WebDAV scheduler and update AppState
+    let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(state.clone()));
+
+    // Update the state to include the scheduler
+    let updated_state = AppState {
+        db: state.db.clone(),
+        config: state.config.clone(),
+        webdav_scheduler: Some(webdav_scheduler.clone()),
+    };
+    let state = Arc::new(updated_state);
+
     // Start WebDAV background sync scheduler on background runtime
-    let webdav_scheduler = readur::webdav_scheduler::WebDAVScheduler::new(state.clone());
+    let scheduler_for_background = webdav_scheduler.clone();
     background_runtime.spawn(async move {
-        info!("Starting WebDAV background sync scheduler");
-        webdav_scheduler.start().await;
+        info!("Starting WebDAV background sync scheduler with 30-second startup delay");
+        // Wait 30 seconds before starting scheduler to allow server to fully initialize
+        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
+        info!("WebDAV background sync scheduler starting after startup delay");
+        scheduler_for_background.start().await;
     });
 
+    // Create the router with the updated state
+    let app = Router::new()
+        .route("/api/health", get(readur::health_check))
+        .nest("/api/auth", readur::routes::auth::router())
+        .nest("/api/documents", readur::routes::documents::router())
+        .nest("/api/metrics", readur::routes::metrics::router())
+        .nest("/metrics", readur::routes::prometheus_metrics::router())
+        .nest("/api/notifications", readur::routes::notifications::router())
+        .nest("/api/queue", readur::routes::queue::router())
+        .nest("/api/search", readur::routes::search::router())
+        .nest("/api/settings", readur::routes::settings::router())
+        .nest("/api/sources", readur::routes::sources::router())
+        .nest("/api/users", readur::routes::users::router())
+        .nest("/api/webdav", readur::routes::webdav::router())
+        .merge(readur::swagger::create_swagger_router())
+        .fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
+        .layer(CorsLayer::permissive())
+        .with_state(state.clone());
+
     let listener = tokio::net::TcpListener::bind(&config.server_address).await?;
     info!("Server starting on {}", config.server_address);
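
Constructing AppState twice works, but the scheduler itself captures the first state, whose webdav_scheduler field is still None. One way to share a single state instance would be a set-once cell; a hypothetical sketch using tokio::sync::OnceCell, an alternative pattern rather than what this commit does:

    use std::sync::Arc;
    use tokio::sync::OnceCell;

    // Hypothetical stand-in for the real WebDAVScheduler.
    struct Scheduler;

    struct AppStateAlt {
        // db and config would live here as in the real AppState.
        webdav_scheduler: OnceCell<Arc<Scheduler>>,
    }

    #[tokio::main]
    async fn main() {
        let state = Arc::new(AppStateAlt { webdav_scheduler: OnceCell::new() });
        // The scheduler can now capture `state` and still be reachable from it.
        let scheduler = Arc::new(Scheduler);
        let _ = state.webdav_scheduler.set(scheduler); // Err only if already set
        assert!(state.webdav_scheduler.get().is_some());
    }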

src/models.rs: 159 lines changed

@@ -620,4 +620,163 @@ pub struct FileInfo {
     pub last_modified: Option<DateTime<Utc>>,
     pub etag: String,
     pub is_directory: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
+pub enum SourceType {
+    #[serde(rename = "webdav")]
+    WebDAV,
+    #[serde(rename = "local_folder")]
+    LocalFolder,
+    #[serde(rename = "s3")]
+    S3,
+}
+
+impl std::fmt::Display for SourceType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SourceType::WebDAV => write!(f, "webdav"),
+            SourceType::LocalFolder => write!(f, "local_folder"),
+            SourceType::S3 => write!(f, "s3"),
+        }
+    }
+}
+
+impl TryFrom<String> for SourceType {
+    type Error = String;
+
+    fn try_from(value: String) -> Result<Self, Self::Error> {
+        match value.as_str() {
+            "webdav" => Ok(SourceType::WebDAV),
+            "local_folder" => Ok(SourceType::LocalFolder),
+            "s3" => Ok(SourceType::S3),
+            _ => Err(format!("Invalid source type: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
+pub enum SourceStatus {
+    #[serde(rename = "idle")]
+    Idle,
+    #[serde(rename = "syncing")]
+    Syncing,
+    #[serde(rename = "error")]
+    Error,
+}
+
+impl std::fmt::Display for SourceStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SourceStatus::Idle => write!(f, "idle"),
+            SourceStatus::Syncing => write!(f, "syncing"),
+            SourceStatus::Error => write!(f, "error"),
+        }
+    }
+}
+
+impl TryFrom<String> for SourceStatus {
+    type Error = String;
+
+    // Written as Result<Self, String>: Self::Error would be ambiguous with the Error variant
+    fn try_from(value: String) -> Result<Self, String> {
+        match value.as_str() {
+            "idle" => Ok(SourceStatus::Idle),
+            "syncing" => Ok(SourceStatus::Syncing),
+            "error" => Ok(SourceStatus::Error),
+            _ => Err(format!("Invalid source status: {}", value)),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, FromRow, ToSchema)]
+pub struct Source {
+    pub id: Uuid,
+    pub user_id: Uuid,
+    pub name: String,
+    #[sqlx(try_from = "String")]
+    pub source_type: SourceType,
+    pub enabled: bool,
+    pub config: serde_json::Value,
+    #[sqlx(try_from = "String")]
+    pub status: SourceStatus,
+    pub last_sync_at: Option<DateTime<Utc>>,
+    pub last_error: Option<String>,
+    pub last_error_at: Option<DateTime<Utc>>,
+    pub total_files_synced: i64,
+    pub total_files_pending: i64,
+    pub total_size_bytes: i64,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct SourceResponse {
+    pub id: Uuid,
+    pub name: String,
+    pub source_type: SourceType,
+    pub enabled: bool,
+    pub config: serde_json::Value,
+    pub status: SourceStatus,
+    pub last_sync_at: Option<DateTime<Utc>>,
+    pub last_error: Option<String>,
+    pub last_error_at: Option<DateTime<Utc>>,
+    pub total_files_synced: i64,
+    pub total_files_pending: i64,
+    pub total_size_bytes: i64,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct CreateSource {
+    pub name: String,
+    pub source_type: SourceType,
+    pub enabled: Option<bool>,
+    pub config: serde_json::Value,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct UpdateSource {
+    pub name: Option<String>,
+    pub enabled: Option<bool>,
+    pub config: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct SourceWithStats {
+    pub source: SourceResponse,
+    pub recent_documents: Vec<DocumentResponse>,
+    pub sync_progress: Option<f32>,
+}
+
+impl From<Source> for SourceResponse {
+    fn from(source: Source) -> Self {
+        Self {
+            id: source.id,
+            name: source.name,
+            source_type: source.source_type,
+            enabled: source.enabled,
+            config: source.config,
+            status: source.status,
+            last_sync_at: source.last_sync_at,
+            last_error: source.last_error,
+            last_error_at: source.last_error_at,
+            total_files_synced: source.total_files_synced,
+            total_files_pending: source.total_files_pending,
+            total_size_bytes: source.total_size_bytes,
+            created_at: source.created_at,
+            updated_at: source.updated_at,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct WebDAVSourceConfig {
+    pub server_url: String,
+    pub username: String,
+    pub password: String,
+    pub watch_folders: Vec<String>,
+    pub file_extensions: Vec<String>,
+    pub auto_sync: bool,
+    pub sync_interval_minutes: i32,
 }
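
With the serde renames above, a WebDAV CreateSource body serializes as in this sketch; the server URL and credentials are placeholder values:

    use serde_json::json;

    // Hypothetical request body for POST /api/sources, matching CreateSource
    // and WebDAVSourceConfig as declared above.
    fn example_payload() -> serde_json::Value {
        json!({
            "name": "WebDAV Server",
            "source_type": "webdav",   // serde rename of SourceType::WebDAV
            "enabled": true,
            "config": {
                "server_url": "https://cloud.example.com/remote.php/dav",
                "username": "alice",
                "password": "app-password",
                "watch_folders": ["/Documents"],
                "file_extensions": ["pdf", "png"],
                "auto_sync": true,
                "sync_interval_minutes": 60
            }
        })
    }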

@@ -6,5 +6,6 @@ pub mod prometheus_metrics;
 pub mod queue;
 pub mod search;
 pub mod settings;
+pub mod sources;
 pub mod users;
 pub mod webdav;

@@ -0,0 +1,362 @@
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    response::Json,
+    routing::{delete, get, post, put},
+    Router,
+};
+use std::sync::Arc;
+use uuid::Uuid;
+
+use crate::{
+    auth::AuthUser,
+    models::{CreateSource, SourceResponse, SourceWithStats, UpdateSource},
+    AppState,
+};
+
+pub fn router() -> Router<Arc<AppState>> {
+    Router::new()
+        .route("/", get(list_sources).post(create_source))
+        .route("/{id}", get(get_source).put(update_source).delete(delete_source))
+        .route("/{id}/sync", post(trigger_sync))
+        .route("/{id}/test", post(test_connection))
+}
+
+#[utoipa::path(
+    get,
+    path = "/api/sources",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    responses(
+        (status = 200, description = "List of user sources", body = Vec<SourceResponse>),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn list_sources(
+    auth_user: AuthUser,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<Vec<SourceResponse>>, StatusCode> {
+    let sources = state
+        .db
+        .get_sources(auth_user.user.id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    let responses: Vec<SourceResponse> = sources.into_iter().map(|s| s.into()).collect();
+    Ok(Json(responses))
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    request_body = CreateSource,
+    responses(
+        (status = 201, description = "Source created successfully", body = SourceResponse),
+        (status = 400, description = "Bad request - invalid source data"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn create_source(
+    auth_user: AuthUser,
+    State(state): State<Arc<AppState>>,
+    Json(source_data): Json<CreateSource>,
+) -> Result<Json<SourceResponse>, StatusCode> {
+    // Validate source configuration based on type
+    if validate_source_config(&source_data).is_err() {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let source = state
+        .db
+        .create_source(auth_user.user.id, &source_data)
+        .await
+        .map_err(|_| StatusCode::BAD_REQUEST)?;
+
+    Ok(Json(source.into()))
+}
+
+#[utoipa::path(
+    get,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Source details with stats", body = SourceWithStats),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn get_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<SourceWithStats>, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Get recent documents for this source
+    let recent_documents = state
+        .db
+        .get_recent_documents_for_source(source_id, 10)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    // Calculate sync progress
+    let sync_progress = if source.total_files_pending > 0 {
+        Some(
+            (source.total_files_synced as f32
+                / (source.total_files_synced + source.total_files_pending) as f32)
+                * 100.0,
+        )
+    } else {
+        None
+    };
+
+    let response = SourceWithStats {
+        source: source.into(),
+        recent_documents: recent_documents.into_iter().map(|d| d.into()).collect(),
+        sync_progress,
+    };
+
+    Ok(Json(response))
+}
+
+#[utoipa::path(
+    put,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    request_body = UpdateSource,
+    responses(
+        (status = 200, description = "Source updated successfully", body = SourceResponse),
+        (status = 404, description = "Source not found"),
+        (status = 400, description = "Bad request - invalid update data"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn update_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+    Json(update_data): Json<UpdateSource>,
+) -> Result<Json<SourceResponse>, StatusCode> {
+    // Check if source exists
+    let existing = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Validate config if provided
+    if let Some(config) = &update_data.config {
+        if validate_config_for_type(&existing.source_type, config).is_err() {
+            return Err(StatusCode::BAD_REQUEST);
+        }
+    }
+
+    let source = state
+        .db
+        .update_source(auth_user.user.id, source_id, &update_data)
+        .await
+        .map_err(|_| StatusCode::BAD_REQUEST)?;
+
+    Ok(Json(source.into()))
+}
+
+#[utoipa::path(
+    delete,
+    path = "/api/sources/{id}",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 204, description = "Source deleted successfully"),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn delete_source(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<StatusCode, StatusCode> {
+    let deleted = state
+        .db
+        .delete_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    if deleted {
+        Ok(StatusCode::NO_CONTENT)
+    } else {
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources/{id}/sync",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Sync triggered successfully"),
+        (status = 404, description = "Source not found"),
+        (status = 409, description = "Source is already syncing"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn trigger_sync(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<StatusCode, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    // Check if already syncing
+    if matches!(source.status, crate::models::SourceStatus::Syncing) {
+        return Err(StatusCode::CONFLICT);
+    }
+
+    // Update status to syncing
+    state
+        .db
+        .update_source_status(source_id, crate::models::SourceStatus::Syncing, None)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+    // Trigger the appropriate sync based on source type
+    match source.source_type {
+        crate::models::SourceType::WebDAV => {
+            // Send a message to trigger WebDAV sync
+            if let Some(scheduler) = &state.webdav_scheduler {
+                scheduler.trigger_sync(source_id).await;
+            }
+        }
+        _ => {
+            // Other source types not implemented yet
+            state
+                .db
+                .update_source_status(
+                    source_id,
+                    crate::models::SourceStatus::Error,
+                    Some("Source type not implemented".to_string()),
+                )
+                .await
+                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+            return Err(StatusCode::NOT_IMPLEMENTED);
+        }
+    }
+
+    Ok(StatusCode::OK)
+}
+
+#[utoipa::path(
+    post,
+    path = "/api/sources/{id}/test",
+    tag = "sources",
+    security(
+        ("bearer_auth" = [])
+    ),
+    params(
+        ("id" = Uuid, Path, description = "Source ID")
+    ),
+    responses(
+        (status = 200, description = "Connection test result", body = serde_json::Value),
+        (status = 404, description = "Source not found"),
+        (status = 401, description = "Unauthorized")
+    )
+)]
+async fn test_connection(
+    auth_user: AuthUser,
+    Path(source_id): Path<Uuid>,
+    State(state): State<Arc<AppState>>,
+) -> Result<Json<serde_json::Value>, StatusCode> {
+    let source = state
+        .db
+        .get_source(auth_user.user.id, source_id)
+        .await
+        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .ok_or(StatusCode::NOT_FOUND)?;
+
+    match source.source_type {
+        crate::models::SourceType::WebDAV => {
+            // Test WebDAV connection
+            let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config)
+                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+
+            match crate::webdav_service::test_webdav_connection(
+                &config.server_url,
+                &config.username,
+                &config.password,
+            )
+            .await
+            {
+                Ok(success) => Ok(Json(serde_json::json!({
+                    "success": success,
+                    "message": if success { "Connection successful" } else { "Connection failed" }
+                }))),
+                Err(e) => Ok(Json(serde_json::json!({
+                    "success": false,
+                    "message": format!("Connection failed: {}", e)
+                }))),
+            }
+        }
+        _ => Ok(Json(serde_json::json!({
+            "success": false,
+            "message": "Source type not implemented"
+        }))),
+    }
+}
+
+fn validate_source_config(source: &CreateSource) -> Result<(), &'static str> {
+    validate_config_for_type(&source.source_type, &source.config)
+}
+
+fn validate_config_for_type(
+    source_type: &crate::models::SourceType,
+    config: &serde_json::Value,
+) -> Result<(), &'static str> {
+    match source_type {
+        crate::models::SourceType::WebDAV => {
+            let _: crate::models::WebDAVSourceConfig =
+                serde_json::from_value(config.clone()).map_err(|_| "Invalid WebDAV configuration")?;
+            Ok(())
+        }
+        _ => Ok(()), // Other types not implemented yet
+    }
+}
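
End to end, creating a source and starting a sync is two authenticated calls against the router above. A minimal client sketch, assuming reqwest (with its json feature), a server on a placeholder address, and a valid bearer token:

    use serde_json::json;

    // Minimal sketch of a client driving the new endpoints.
    async fn create_and_sync(token: &str) -> Result<(), reqwest::Error> {
        let client = reqwest::Client::new();

        // POST /api/sources with a CreateSource body.
        let source: serde_json::Value = client
            .post("http://localhost:8000/api/sources")
            .bearer_auth(token)
            .json(&json!({
                "name": "WebDAV Server",
                "source_type": "webdav",
                "config": {
                    "server_url": "https://cloud.example.com/remote.php/dav",
                    "username": "alice",
                    "password": "app-password",
                    "watch_folders": ["/Documents"],
                    "file_extensions": ["pdf"],
                    "auto_sync": true,
                    "sync_interval_minutes": 60
                }
            }))
            .send()
            .await?
            .json()
            .await?;

        // POST /api/sources/{id}/sync; a 409 means a sync is already running.
        let id = source["id"].as_str().unwrap_or_default();
        let resp = client
            .post(format!("http://localhost:8000/api/sources/{}/sync", id))
            .bearer_auth(token)
            .send()
            .await?;
        println!("sync trigger status: {}", resp.status());
        Ok(())
    }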
@@ -87,6 +87,14 @@ async fn perform_sync_internal(
 
     // Process each watch folder
     for folder_path in &config.watch_folders {
+        // Check if sync has been cancelled before processing each folder
+        if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+            if !sync_state.is_running {
+                info!("WebDAV sync cancelled, stopping folder processing");
+                return Err("Sync cancelled by user".into());
+            }
+        }
+
         info!("Syncing folder: {}", folder_path);
 
         // Update current folder in sync state

@@ -161,6 +169,17 @@ async fn perform_sync_internal(
 
     // Process files concurrently and collect results
     while let Some(result) = file_futures.next().await {
+        // Check if sync has been cancelled
+        if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+            if !sync_state.is_running {
+                info!("WebDAV sync cancelled during file processing, stopping");
+                // Cancel remaining futures
+                file_futures.clear();
+                sync_errors.push("Sync cancelled by user during file processing".to_string());
+                break;
+            }
+        }
+
         match result {
             Ok(processed) => {
                 if processed {

@@ -215,6 +234,14 @@ async fn process_single_file(
     // Acquire semaphore permit to limit concurrent downloads
     let _permit = semaphore.acquire().await.map_err(|e| format!("Semaphore error: {}", e))?;
 
+    // Check if sync has been cancelled before processing this file
+    if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
+        if !sync_state.is_running {
+            info!("Sync cancelled, skipping file: {}", file_info.path);
+            return Err("Sync cancelled by user".to_string());
+        }
+    }
+
     info!("Processing file: {}", file_info.path);
 
     // Check if we've already processed this file
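
These cancellation checks poll the webdav_sync_state row once per folder and once per file, which is simple but costs a database round trip per item. An in-process alternative is a cancellation token; a sketch of the pattern, assuming the tokio-util crate, and not what this commit does:

    use tokio_util::sync::CancellationToken;

    // Minimal sketch: cooperative cancellation without polling the database.
    async fn process_all(files: Vec<String>, cancel: CancellationToken) {
        for file in files {
            if cancel.is_cancelled() {
                println!("sync cancelled, skipping remaining files");
                break;
            }
            // ... download and index `file` here ...
            println!("processed {}", file);
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let child = cancel.child_token(); // handed to the sync task
        let task = tokio::spawn(process_all(vec!["a.pdf".into(), "b.pdf".into()], child));
        cancel.cancel(); // e.g. from the "stop sync" HTTP handler
        task.await.unwrap();
    }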
@@ -30,6 +30,11 @@ impl WebDAVScheduler {
     pub async fn start(&self) {
         info!("Starting WebDAV background sync scheduler");
 
+        // First, check for any interrupted syncs that need to be resumed
+        if let Err(e) = self.resume_interrupted_syncs().await {
+            error!("Error resuming interrupted syncs: {}", e);
+        }
+
         let mut interval_timer = interval(self.check_interval);
 
         loop {

@@ -41,6 +46,90 @@ impl WebDAVScheduler {
         }
     }
 
+    async fn resume_interrupted_syncs(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        info!("Checking for interrupted WebDAV syncs to resume");
+
+        // Get all users with settings
+        let users_with_settings = self.db.get_all_user_settings().await?;
+
+        for user_settings in users_with_settings {
+            // Skip if WebDAV is not enabled
+            if !user_settings.webdav_enabled {
+                continue;
+            }
+
+            // Check if there's an interrupted sync for this user
+            if let Ok(Some(sync_state)) = self.db.get_webdav_sync_state(user_settings.user_id).await {
+                // Check if sync was interrupted (has errors containing "server restart" message)
+                let was_interrupted = sync_state.errors.iter().any(|e| e.contains("server restart"));
+
+                if was_interrupted && user_settings.webdav_auto_sync {
+                    info!("Found interrupted WebDAV sync for user {}, will resume", user_settings.user_id);
+
+                    // Clear the interruption error and resume sync
+                    let cleared_errors: Vec<String> = sync_state.errors.into_iter()
+                        .filter(|e| !e.contains("server restart"))
+                        .collect();
+
+                    let reset_state = crate::models::UpdateWebDAVSyncState {
+                        last_sync_at: sync_state.last_sync_at,
+                        sync_cursor: sync_state.sync_cursor,
+                        is_running: false,
+                        files_processed: sync_state.files_processed,
+                        files_remaining: 0,
+                        current_folder: None,
+                        errors: cleared_errors,
+                    };
+
+                    if let Err(e) = self.db.update_webdav_sync_state(user_settings.user_id, &reset_state).await {
+                        error!("Failed to reset interrupted sync state: {}", e);
+                        continue;
+                    }
+
+                    // Trigger a new sync for this user
+                    if let Ok(webdav_config) = self.build_webdav_config(&user_settings) {
+                        if let Ok(webdav_service) = WebDAVService::new(webdav_config.clone()) {
+                            let state_clone = self.state.clone();
+                            let user_id = user_settings.user_id;
+                            let enable_background_ocr = user_settings.enable_background_ocr;
+
+                            info!("Resuming interrupted WebDAV sync for user {}", user_id);
+
+                            tokio::spawn(async move {
+                                match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
+                                    Ok(files_processed) => {
+                                        info!("Resumed WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
+
+                                        // Send notification
+                                        let notification = crate::models::CreateNotification {
+                                            notification_type: "success".to_string(),
+                                            title: "WebDAV Sync Resumed".to_string(),
+                                            message: format!("Resumed sync after server restart. Processed {} files", files_processed),
+                                            action_url: Some("/documents".to_string()),
+                                            metadata: Some(serde_json::json!({
+                                                "sync_type": "webdav_resume",
+                                                "files_processed": files_processed
+                                            })),
+                                        };
+
+                                        if let Err(e) = state_clone.db.create_notification(user_id, &notification).await {
+                                            error!("Failed to create resume notification: {}", e);
+                                        }
+                                    }
+                                    Err(e) => {
+                                        error!("Resumed WebDAV sync failed for user {}: {}", user_id, e);
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     async fn check_and_sync_users(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         // Get all users with WebDAV auto-sync enabled
         let users_with_settings = self.db.get_all_user_settings().await?;

@@ -189,5 +278,11 @@ impl WebDAVScheduler {
             server_type: Some("nextcloud".to_string()),
         })
     }
+
+    pub async fn trigger_sync(&self, source_id: uuid::Uuid) {
+        info!("Triggering manual sync for source {}", source_id);
+        // TODO: Implement manual sync trigger for sources
+        // For now, this is a placeholder that the routes can call
+    }
 }
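
trigger_sync is still a stub. One plausible wiring is an mpsc channel whose sender lives where the routes can reach it and whose receiver is drained by the scheduler loop; a hypothetical sketch, with all names illustrative rather than part of this commit:

    use tokio::sync::mpsc;
    use uuid::Uuid;

    // Hypothetical: the scheduler owns the receiver; handlers hold the sender.
    struct SyncRequests {
        tx: mpsc::UnboundedSender<Uuid>,
    }

    impl SyncRequests {
        fn trigger_sync(&self, source_id: Uuid) {
            // Ignore the error case where the scheduler has shut down.
            let _ = self.tx.send(source_id);
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<Uuid>();
        let requests = SyncRequests { tx };

        // Scheduler side: drain manual requests between interval ticks.
        let scheduler = tokio::spawn(async move {
            while let Some(source_id) = rx.recv().await {
                println!("manual sync requested for source {}", source_id);
                // ... look up the source and run the matching sync here ...
            }
        });

        requests.trigger_sync(Uuid::new_v4());
        drop(requests); // closing the channel lets the scheduler task end
        scheduler.await.unwrap();
    }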

@@ -420,4 +420,23 @@ impl WebDAVService {
 
         Ok(bytes.to_vec())
     }
+}
+
+pub async fn test_webdav_connection(
+    server_url: &str,
+    username: &str,
+    password: &str,
+) -> Result<bool> {
+    let client = Client::new();
+
+    // Try to list the root directory to test connectivity
+    let response = client
+        .request(Method::from_bytes(b"PROPFIND")?, server_url)
+        .header("Depth", "0")
+        .basic_auth(username, Some(password))
+        .timeout(Duration::from_secs(10))
+        .send()
+        .await?;
+
+    Ok(response.status().is_success())
 }
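
A Depth: 0 PROPFIND against the DAV root is about the cheapest authenticated request a WebDAV server will answer, so a 2xx status is a reasonable reachability signal, though it does not validate the configured watch folders. A usage sketch with placeholder credentials:

    // Minimal sketch of using the new helper, e.g. before persisting a source.
    async fn probe() -> anyhow::Result<()> {
        let reachable = crate::webdav_service::test_webdav_connection(
            "https://cloud.example.com/remote.php/dav",
            "alice",
            "app-password",
        )
        .await?;
        println!("WebDAV reachable: {}", reachable);
        Ok(())
    }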