feat(server): create 'sources' concept and move WebDAV settings page to it

perf3ct 2025-06-15 16:12:18 +00:00
parent 8ebffe4aa3
commit 317590f9c3
13 changed files with 2130 additions and 21 deletions

View File

@ -12,6 +12,7 @@ import DocumentsPage from './pages/DocumentsPage';
import SearchPage from './pages/SearchPage';
import DocumentDetailsPage from './pages/DocumentDetailsPage';
import SettingsPage from './pages/SettingsPage';
import SourcesPage from './pages/SourcesPage';
import WatchFolderPage from './pages/WatchFolderPage';
function App(): JSX.Element {
@ -65,6 +66,7 @@ function App(): JSX.Element {
<Route path="/documents" element={<DocumentsPage />} />
<Route path="/documents/:id" element={<DocumentDetailsPage />} />
<Route path="/search" element={<SearchPage />} />
<Route path="/sources" element={<SourcesPage />} />
<Route path="/watch" element={<WatchFolderPage />} />
<Route path="/settings" element={<SettingsPage />} />
<Route path="/profile" element={<div>Profile Page - Coming Soon</div>} />

View File

@ -31,6 +31,7 @@ import {
AccountCircle as AccountIcon,
Logout as LogoutIcon,
Description as DocumentIcon,
Storage as StorageIcon,
} from '@mui/icons-material';
import { useNavigate, useLocation } from 'react-router-dom';
import { useAuth } from '../../contexts/AuthContext';
@ -61,6 +62,7 @@ const navigationItems: NavigationItem[] = [
{ text: 'Upload', icon: UploadIcon, path: '/upload' },
{ text: 'Documents', icon: DocumentIcon, path: '/documents' },
{ text: 'Search', icon: SearchIcon, path: '/search' },
{ text: 'Sources', icon: StorageIcon, path: '/sources' },
{ text: 'Watch Folder', icon: FolderIcon, path: '/watch' },
];

File diff suppressed because it is too large.

View File

@ -0,0 +1,87 @@
-- Create sources table to support multiple document sources per user
CREATE TABLE IF NOT EXISTS sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
source_type TEXT NOT NULL, -- 'webdav', 'local_folder', 's3', etc.
enabled BOOLEAN DEFAULT TRUE,
-- Configuration (JSON to allow flexibility for different source types)
config JSONB NOT NULL DEFAULT '{}',
-- Status tracking
status TEXT DEFAULT 'idle', -- 'idle', 'syncing', 'error'
last_sync_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ,
-- Statistics
total_files_synced BIGINT DEFAULT 0,
total_files_pending BIGINT DEFAULT 0,
total_size_bytes BIGINT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(user_id, name)
);
-- Create indexes for performance
CREATE INDEX IF NOT EXISTS idx_sources_user_id ON sources(user_id);
CREATE INDEX IF NOT EXISTS idx_sources_source_type ON sources(source_type);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
-- Update documents table to link to sources
ALTER TABLE documents ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_documents_source_id ON documents(source_id);
-- Update webdav_files table to link to sources instead of users directly
ALTER TABLE webdav_files ADD COLUMN IF NOT EXISTS source_id UUID REFERENCES sources(id) ON DELETE CASCADE;
-- Migrate existing WebDAV settings to sources table
INSERT INTO sources (user_id, name, source_type, enabled, config, created_at, updated_at)
SELECT
s.user_id,
'WebDAV Server' as name,
'webdav' as source_type,
s.webdav_enabled as enabled,
jsonb_build_object(
'server_url', s.webdav_server_url,
'username', s.webdav_username,
'password', s.webdav_password,
'watch_folders', s.webdav_watch_folders,
'file_extensions', s.webdav_file_extensions,
'auto_sync', s.webdav_auto_sync,
'sync_interval_minutes', s.webdav_sync_interval_minutes
) as config,
NOW() as created_at,
NOW() as updated_at
FROM settings s
WHERE s.webdav_enabled = TRUE
AND s.webdav_server_url IS NOT NULL
AND s.webdav_username IS NOT NULL;
-- Update webdav_files to link to the newly created sources
UPDATE webdav_files wf
SET source_id = s.id
FROM sources s
WHERE wf.user_id = s.user_id
AND s.source_type = 'webdav';
-- Create a function to update the updated_at timestamp
CREATE OR REPLACE FUNCTION update_sources_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger to auto-update updated_at
CREATE TRIGGER sources_updated_at_trigger
BEFORE UPDATE ON sources
FOR EACH ROW
EXECUTE FUNCTION update_sources_updated_at();
-- Note: We're keeping the WebDAV fields in the settings table for now to preserve backward compatibility
-- They will be removed in a future migration once all code has been updated to read from sources
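The keys written by jsonb_build_object above mirror the WebDAVSourceConfig struct introduced in src/models.rs later in this commit, so a migrated row's config can be deserialized without any further mapping. A minimal sketch, assuming the config column has already been fetched as a serde_json::Value:

// Hypothetical helper: typed view over a migrated sources.config value.
fn migrated_webdav_config(config: serde_json::Value) -> anyhow::Result<crate::models::WebDAVSourceConfig> {
    // Fails if any key written by the migration is missing or has the wrong type
    Ok(serde_json::from_value(config)?)
}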

src/db.rs (+256 lines)
View File

@ -1836,4 +1836,260 @@ impl Database {
Ok(files)
}
// Sources methods
pub async fn create_source(&self, user_id: Uuid, source: &crate::models::CreateSource) -> Result<crate::models::Source> {
let id = Uuid::new_v4();
let now = Utc::now();
let row = sqlx::query(
r#"INSERT INTO sources (id, user_id, name, source_type, enabled, config, status, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, 'idle', $7, $8)
RETURNING *"#
)
.bind(id)
.bind(user_id)
.bind(&source.name)
.bind(source.source_type.to_string())
.bind(source.enabled.unwrap_or(true))
.bind(&source.config)
.bind(now)
.bind(now)
.fetch_one(&self.pool)
.await?;
Ok(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})
}
pub async fn get_source(&self, user_id: Uuid, source_id: Uuid) -> Result<Option<crate::models::Source>> {
let row = sqlx::query(
r#"SELECT * FROM sources WHERE id = $1 AND user_id = $2"#
)
.bind(source_id)
.bind(user_id)
.fetch_optional(&self.pool)
.await?;
match row {
Some(row) => Ok(Some(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})),
None => Ok(None),
}
}
pub async fn get_sources(&self, user_id: Uuid) -> Result<Vec<crate::models::Source>> {
let rows = sqlx::query(
r#"SELECT * FROM sources WHERE user_id = $1 ORDER BY created_at DESC"#
)
.bind(user_id)
.fetch_all(&self.pool)
.await?;
let mut sources = Vec::new();
for row in rows {
sources.push(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
});
}
Ok(sources)
}
pub async fn update_source(&self, user_id: Uuid, source_id: Uuid, update: &crate::models::UpdateSource) -> Result<crate::models::Source> {
let mut query = String::from("UPDATE sources SET updated_at = NOW()");
let mut bind_count = 0; // placeholders start at $1, so increment before each use
if update.name.is_some() {
bind_count += 1;
query.push_str(&format!(", name = ${}", bind_count));
}
if update.enabled.is_some() {
bind_count += 1;
query.push_str(&format!(", enabled = ${}", bind_count));
}
if update.config.is_some() {
bind_count += 1;
query.push_str(&format!(", config = ${}", bind_count));
}
bind_count += 1;
query.push_str(&format!(" WHERE id = ${}", bind_count));
bind_count += 1;
query.push_str(&format!(" AND user_id = ${} RETURNING *", bind_count));
let mut query_builder = sqlx::query(&query);
// Bind values in order
if let Some(name) = &update.name {
query_builder = query_builder.bind(name);
}
if let Some(enabled) = &update.enabled {
query_builder = query_builder.bind(enabled);
}
if let Some(config) = &update.config {
query_builder = query_builder.bind(config);
}
query_builder = query_builder.bind(source_id);
query_builder = query_builder.bind(user_id);
let row = query_builder.fetch_one(&self.pool).await?;
Ok(crate::models::Source {
id: row.get("id"),
user_id: row.get("user_id"),
name: row.get("name"),
source_type: row.get::<String, _>("source_type").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
enabled: row.get("enabled"),
config: row.get("config"),
status: row.get::<String, _>("status").try_into().map_err(|e: String| anyhow::anyhow!(e))?,
last_sync_at: row.get("last_sync_at"),
last_error: row.get("last_error"),
last_error_at: row.get("last_error_at"),
total_files_synced: row.get("total_files_synced"),
total_files_pending: row.get("total_files_pending"),
total_size_bytes: row.get("total_size_bytes"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})
}
pub async fn delete_source(&self, user_id: Uuid, source_id: Uuid) -> Result<bool> {
let result = sqlx::query(
r#"DELETE FROM sources WHERE id = $1 AND user_id = $2"#
)
.bind(source_id)
.bind(user_id)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn update_source_status(&self, source_id: Uuid, status: crate::models::SourceStatus, error: Option<String>) -> Result<()> {
if let Some(error_msg) = error {
sqlx::query(
r#"UPDATE sources
SET status = $1, last_error = $2, last_error_at = NOW(), updated_at = NOW()
WHERE id = $3"#
)
.bind(status.to_string())
.bind(error_msg)
.bind(source_id)
.execute(&self.pool)
.await?;
} else {
sqlx::query(
r#"UPDATE sources
SET status = $1, updated_at = NOW()
WHERE id = $2"#
)
.bind(status.to_string())
.bind(source_id)
.execute(&self.pool)
.await?;
}
Ok(())
}
pub async fn update_source_sync_stats(&self, source_id: Uuid, files_synced: i64, files_pending: i64, size_bytes: i64) -> Result<()> {
sqlx::query(
r#"UPDATE sources
SET total_files_synced = $1, total_files_pending = $2, total_size_bytes = $3,
last_sync_at = NOW(), updated_at = NOW()
WHERE id = $4"#
)
.bind(files_synced)
.bind(files_pending)
.bind(size_bytes)
.bind(source_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn get_recent_documents_for_source(&self, source_id: Uuid, limit: i64) -> Result<Vec<Document>> {
let rows = sqlx::query(
r#"SELECT * FROM documents
WHERE source_id = $1
ORDER BY created_at DESC
LIMIT $2"#
)
.bind(source_id)
.bind(limit)
.fetch_all(&self.pool)
.await?;
let mut documents = Vec::new();
for row in rows {
documents.push(Document {
id: row.get("id"),
filename: row.get("filename"),
original_filename: row.get("original_filename"),
file_path: row.get("file_path"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
content: row.get("content"),
ocr_text: row.get("ocr_text"),
ocr_confidence: row.get("ocr_confidence"),
ocr_word_count: row.get("ocr_word_count"),
ocr_processing_time_ms: row.get("ocr_processing_time_ms"),
ocr_status: row.get("ocr_status"),
ocr_error: row.get("ocr_error"),
ocr_completed_at: row.get("ocr_completed_at"),
tags: row.get("tags"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
user_id: row.get("user_id"),
});
}
Ok(documents)
}
}
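A sketch of how a sync worker might drive these methods around a single run. Only the Database methods are from this commit; the surrounding function, the placeholder counters, and the elided sync work are assumptions:

// Hypothetical sync loop using the new source methods.
async fn run_source_sync(db: &crate::db::Database, source_id: uuid::Uuid) -> anyhow::Result<()> {
    // Mark the source as syncing before any work starts
    db.update_source_status(source_id, crate::models::SourceStatus::Syncing, None).await?;
    // ... perform the actual sync here (not part of this commit) ...
    let (synced, pending, bytes) = (42i64, 0i64, 1_048_576i64); // placeholder results
    // Record counters and last_sync_at, then return the source to idle
    db.update_source_sync_stats(source_id, synced, pending, bytes).await?;
    db.update_source_status(source_id, crate::models::SourceStatus::Idle, None).await?;
    Ok(())
}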

View File

@ -31,6 +31,7 @@ use db::Database;
pub struct AppState {
pub db: Database,
pub config: Config,
pub webdav_scheduler: Option<std::sync::Arc<webdav_scheduler::WebDAVScheduler>>,
}
/// Health check endpoint for monitoring

View File

@ -128,26 +128,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
let state = AppState { db, config: config.clone() };
let state = AppState {
db,
config: config.clone(),
webdav_scheduler: None, // Will be set after creating scheduler
};
let state = Arc::new(state);
let app = Router::new()
.route("/api/health", get(readur::health_check))
.nest("/api/auth", readur::routes::auth::router())
.nest("/api/documents", readur::routes::documents::router())
.nest("/api/metrics", readur::routes::metrics::router())
.nest("/metrics", readur::routes::prometheus_metrics::router())
.nest("/api/notifications", readur::routes::notifications::router())
.nest("/api/queue", readur::routes::queue::router())
.nest("/api/search", readur::routes::search::router())
.nest("/api/settings", readur::routes::settings::router())
.nest("/api/users", readur::routes::users::router())
.nest("/api/webdav", readur::routes::webdav::router())
.merge(readur::swagger::create_swagger_router())
.fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
.layer(CorsLayer::permissive())
.with_state(state.clone());
let watcher_config = config.clone();
let watcher_db = state.db.clone();
tokio::spawn(async move {
@ -197,13 +184,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Create WebDAV scheduler and update AppState
let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(state.clone()));
// Update the state to include the scheduler
let updated_state = AppState {
db: state.db.clone(),
config: state.config.clone(),
webdav_scheduler: Some(webdav_scheduler.clone()),
};
let state = Arc::new(updated_state);
// Start WebDAV background sync scheduler on background runtime
let webdav_scheduler = readur::webdav_scheduler::WebDAVScheduler::new(state.clone());
let scheduler_for_background = webdav_scheduler.clone();
background_runtime.spawn(async move {
info!("Starting WebDAV background sync scheduler");
webdav_scheduler.start().await;
info!("Starting WebDAV background sync scheduler with 30-second startup delay");
// Wait 30 seconds before starting scheduler to allow server to fully initialize
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
info!("WebDAV background sync scheduler starting after startup delay");
scheduler_for_background.start().await;
});
// Create the router with the updated state
let app = Router::new()
.route("/api/health", get(readur::health_check))
.nest("/api/auth", readur::routes::auth::router())
.nest("/api/documents", readur::routes::documents::router())
.nest("/api/metrics", readur::routes::metrics::router())
.nest("/metrics", readur::routes::prometheus_metrics::router())
.nest("/api/notifications", readur::routes::notifications::router())
.nest("/api/queue", readur::routes::queue::router())
.nest("/api/search", readur::routes::search::router())
.nest("/api/settings", readur::routes::settings::router())
.nest("/api/sources", readur::routes::sources::router())
.nest("/api/users", readur::routes::users::router())
.nest("/api/webdav", readur::routes::webdav::router())
.merge(readur::swagger::create_swagger_router())
.fallback_service(ServeDir::new("frontend/dist").fallback(ServeFile::new("frontend/dist/index.html")))
.layer(CorsLayer::permissive())
.with_state(state.clone());
let listener = tokio::net::TcpListener::bind(&config.server_address).await?;
info!("Server starting on {}", config.server_address);

View File

@ -620,4 +620,163 @@ pub struct FileInfo {
pub last_modified: Option<DateTime<Utc>>,
pub etag: String,
pub is_directory: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub enum SourceType {
#[serde(rename = "webdav")]
WebDAV,
#[serde(rename = "local_folder")]
LocalFolder,
#[serde(rename = "s3")]
S3,
}
impl std::fmt::Display for SourceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SourceType::WebDAV => write!(f, "webdav"),
SourceType::LocalFolder => write!(f, "local_folder"),
SourceType::S3 => write!(f, "s3"),
}
}
}
impl TryFrom<String> for SourceType {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"webdav" => Ok(SourceType::WebDAV),
"local_folder" => Ok(SourceType::LocalFolder),
"s3" => Ok(SourceType::S3),
_ => Err(format!("Invalid source type: {}", value)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema)]
pub enum SourceStatus {
#[serde(rename = "idle")]
Idle,
#[serde(rename = "syncing")]
Syncing,
#[serde(rename = "error")]
Error,
}
impl std::fmt::Display for SourceStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SourceStatus::Idle => write!(f, "idle"),
SourceStatus::Syncing => write!(f, "syncing"),
SourceStatus::Error => write!(f, "error"),
}
}
}
impl TryFrom<String> for SourceStatus {
type Error = String;
fn try_from(value: String) -> Result<Self, <SourceStatus as TryFrom<String>>::Error> {
match value.as_str() {
"idle" => Ok(SourceStatus::Idle),
"syncing" => Ok(SourceStatus::Syncing),
"error" => Ok(SourceStatus::Error),
_ => Err(format!("Invalid source status: {}", value)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow, ToSchema)]
pub struct Source {
pub id: Uuid,
pub user_id: Uuid,
pub name: String,
#[sqlx(try_from = "String")]
pub source_type: SourceType,
pub enabled: bool,
pub config: serde_json::Value,
#[sqlx(try_from = "String")]
pub status: SourceStatus,
pub last_sync_at: Option<DateTime<Utc>>,
pub last_error: Option<String>,
pub last_error_at: Option<DateTime<Utc>>,
pub total_files_synced: i64,
pub total_files_pending: i64,
pub total_size_bytes: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct SourceResponse {
pub id: Uuid,
pub name: String,
pub source_type: SourceType,
pub enabled: bool,
pub config: serde_json::Value,
pub status: SourceStatus,
pub last_sync_at: Option<DateTime<Utc>>,
pub last_error: Option<String>,
pub last_error_at: Option<DateTime<Utc>>,
pub total_files_synced: i64,
pub total_files_pending: i64,
pub total_size_bytes: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct CreateSource {
pub name: String,
pub source_type: SourceType,
pub enabled: Option<bool>,
pub config: serde_json::Value,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct UpdateSource {
pub name: Option<String>,
pub enabled: Option<bool>,
pub config: Option<serde_json::Value>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct SourceWithStats {
pub source: SourceResponse,
pub recent_documents: Vec<DocumentResponse>,
pub sync_progress: Option<f32>,
}
impl From<Source> for SourceResponse {
fn from(source: Source) -> Self {
Self {
id: source.id,
name: source.name,
source_type: source.source_type,
enabled: source.enabled,
config: source.config,
status: source.status,
last_sync_at: source.last_sync_at,
last_error: source.last_error,
last_error_at: source.last_error_at,
total_files_synced: source.total_files_synced,
total_files_pending: source.total_files_pending,
total_size_bytes: source.total_size_bytes,
created_at: source.created_at,
updated_at: source.updated_at,
}
}
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct WebDAVSourceConfig {
pub server_url: String,
pub username: String,
pub password: String,
pub watch_folders: Vec<String>,
pub file_extensions: Vec<String>,
pub auto_sync: bool,
pub sync_interval_minutes: i32,
}
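For illustration, a hedged sketch of the payload a client would serialize and send to POST /api/sources for a WebDAV source, built from the structs above; the name, URL, and credentials are placeholders:

// Hypothetical example of constructing a CreateSource for a WebDAV source.
fn example_create_source_payload() -> crate::models::CreateSource {
    crate::models::CreateSource {
        name: "My Nextcloud".to_string(),
        source_type: crate::models::SourceType::WebDAV, // serialized as "webdav"
        enabled: Some(true),
        // Keys mirror WebDAVSourceConfig; values here are placeholders
        config: serde_json::json!({
            "server_url": "https://cloud.example.com/remote.php/dav",
            "username": "alice",
            "password": "app-password",
            "watch_folders": ["/Documents"],
            "file_extensions": ["pdf", "png", "jpg"],
            "auto_sync": true,
            "sync_interval_minutes": 60
        }),
    }
}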

View File

@ -6,5 +6,6 @@ pub mod prometheus_metrics;
pub mod queue;
pub mod search;
pub mod settings;
pub mod sources;
pub mod users;
pub mod webdav;

src/routes/sources.rs (new file, +362 lines)
View File

@ -0,0 +1,362 @@
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::{delete, get, post, put},
Router,
};
use std::sync::Arc;
use uuid::Uuid;
use crate::{
auth::AuthUser,
models::{CreateSource, SourceResponse, SourceWithStats, UpdateSource},
AppState,
};
pub fn router() -> Router<Arc<AppState>> {
Router::new()
.route("/", get(list_sources).post(create_source))
.route("/{id}", get(get_source).put(update_source).delete(delete_source))
.route("/{id}/sync", post(trigger_sync))
.route("/{id}/test", post(test_connection))
}
#[utoipa::path(
get,
path = "/api/sources",
tag = "sources",
security(
("bearer_auth" = [])
),
responses(
(status = 200, description = "List of user sources", body = Vec<SourceResponse>),
(status = 401, description = "Unauthorized")
)
)]
async fn list_sources(
auth_user: AuthUser,
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<SourceResponse>>, StatusCode> {
let sources = state
.db
.get_sources(auth_user.user.id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let responses: Vec<SourceResponse> = sources.into_iter().map(|s| s.into()).collect();
Ok(Json(responses))
}
#[utoipa::path(
post,
path = "/api/sources",
tag = "sources",
security(
("bearer_auth" = [])
),
request_body = CreateSource,
responses(
(status = 201, description = "Source created successfully", body = SourceResponse),
(status = 400, description = "Bad request - invalid source data"),
(status = 401, description = "Unauthorized")
)
)]
async fn create_source(
auth_user: AuthUser,
State(state): State<Arc<AppState>>,
Json(source_data): Json<CreateSource>,
) -> Result<Json<SourceResponse>, StatusCode> {
// Validate source configuration based on type
if let Err(_) = validate_source_config(&source_data) {
return Err(StatusCode::BAD_REQUEST);
}
let source = state
.db
.create_source(auth_user.user.id, &source_data)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
Ok(Json(source.into()))
}
#[utoipa::path(
get,
path = "/api/sources/{id}",
tag = "sources",
security(
("bearer_auth" = [])
),
params(
("id" = Uuid, Path, description = "Source ID")
),
responses(
(status = 200, description = "Source details with stats", body = SourceWithStats),
(status = 404, description = "Source not found"),
(status = 401, description = "Unauthorized")
)
)]
async fn get_source(
auth_user: AuthUser,
Path(source_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
) -> Result<Json<SourceWithStats>, StatusCode> {
let source = state
.db
.get_source(auth_user.user.id, source_id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
// Get recent documents for this source
let recent_documents = state
.db
.get_recent_documents_for_source(source_id, 10)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Calculate sync progress
let sync_progress = if source.total_files_pending > 0 {
Some(
(source.total_files_synced as f32
/ (source.total_files_synced + source.total_files_pending) as f32)
* 100.0,
)
} else {
None
};
let response = SourceWithStats {
source: source.into(),
recent_documents: recent_documents.into_iter().map(|d| d.into()).collect(),
sync_progress,
};
Ok(Json(response))
}
#[utoipa::path(
put,
path = "/api/sources/{id}",
tag = "sources",
security(
("bearer_auth" = [])
),
params(
("id" = Uuid, Path, description = "Source ID")
),
request_body = UpdateSource,
responses(
(status = 200, description = "Source updated successfully", body = SourceResponse),
(status = 404, description = "Source not found"),
(status = 400, description = "Bad request - invalid update data"),
(status = 401, description = "Unauthorized")
)
)]
async fn update_source(
auth_user: AuthUser,
Path(source_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
Json(update_data): Json<UpdateSource>,
) -> Result<Json<SourceResponse>, StatusCode> {
// Check if source exists
let existing = state
.db
.get_source(auth_user.user.id, source_id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
// Validate config if provided
if let Some(config) = &update_data.config {
if let Err(_) = validate_config_for_type(&existing.source_type, config) {
return Err(StatusCode::BAD_REQUEST);
}
}
let source = state
.db
.update_source(auth_user.user.id, source_id, &update_data)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
Ok(Json(source.into()))
}
#[utoipa::path(
delete,
path = "/api/sources/{id}",
tag = "sources",
security(
("bearer_auth" = [])
),
params(
("id" = Uuid, Path, description = "Source ID")
),
responses(
(status = 204, description = "Source deleted successfully"),
(status = 404, description = "Source not found"),
(status = 401, description = "Unauthorized")
)
)]
async fn delete_source(
auth_user: AuthUser,
Path(source_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
) -> Result<StatusCode, StatusCode> {
let deleted = state
.db
.delete_source(auth_user.user.id, source_id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if deleted {
Ok(StatusCode::NO_CONTENT)
} else {
Err(StatusCode::NOT_FOUND)
}
}
#[utoipa::path(
post,
path = "/api/sources/{id}/sync",
tag = "sources",
security(
("bearer_auth" = [])
),
params(
("id" = Uuid, Path, description = "Source ID")
),
responses(
(status = 200, description = "Sync triggered successfully"),
(status = 404, description = "Source not found"),
(status = 409, description = "Source is already syncing"),
(status = 401, description = "Unauthorized")
)
)]
async fn trigger_sync(
auth_user: AuthUser,
Path(source_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
) -> Result<StatusCode, StatusCode> {
let source = state
.db
.get_source(auth_user.user.id, source_id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
// Check if already syncing
if matches!(source.status, crate::models::SourceStatus::Syncing) {
return Err(StatusCode::CONFLICT);
}
// Update status to syncing
state
.db
.update_source_status(source_id, crate::models::SourceStatus::Syncing, None)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Trigger the appropriate sync based on source type
match source.source_type {
crate::models::SourceType::WebDAV => {
// Send a message to trigger WebDAV sync
if let Some(scheduler) = &state.webdav_scheduler {
scheduler.trigger_sync(source_id).await;
}
}
_ => {
// Other source types not implemented yet
state
.db
.update_source_status(
source_id,
crate::models::SourceStatus::Error,
Some("Source type not implemented".to_string()),
)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
return Err(StatusCode::NOT_IMPLEMENTED);
}
}
Ok(StatusCode::OK)
}
#[utoipa::path(
post,
path = "/api/sources/{id}/test",
tag = "sources",
security(
("bearer_auth" = [])
),
params(
("id" = Uuid, Path, description = "Source ID")
),
responses(
(status = 200, description = "Connection test result", body = serde_json::Value),
(status = 404, description = "Source not found"),
(status = 401, description = "Unauthorized")
)
)]
async fn test_connection(
auth_user: AuthUser,
Path(source_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let source = state
.db
.get_source(auth_user.user.id, source_id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
match source.source_type {
crate::models::SourceType::WebDAV => {
// Test WebDAV connection
let config: crate::models::WebDAVSourceConfig = serde_json::from_value(source.config)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match crate::webdav_service::test_webdav_connection(
&config.server_url,
&config.username,
&config.password,
)
.await
{
Ok(success) => Ok(Json(serde_json::json!({
"success": success,
"message": if success { "Connection successful" } else { "Connection failed" }
}))),
Err(e) => Ok(Json(serde_json::json!({
"success": false,
"message": format!("Connection failed: {}", e)
}))),
}
}
_ => Ok(Json(serde_json::json!({
"success": false,
"message": "Source type not implemented"
}))),
}
}
fn validate_source_config(source: &CreateSource) -> Result<(), &'static str> {
validate_config_for_type(&source.source_type, &source.config)
}
fn validate_config_for_type(
source_type: &crate::models::SourceType,
config: &serde_json::Value,
) -> Result<(), &'static str> {
match source_type {
crate::models::SourceType::WebDAV => {
let _: crate::models::WebDAVSourceConfig =
serde_json::from_value(config.clone()).map_err(|_| "Invalid WebDAV configuration")?;
Ok(())
}
_ => Ok(()), // Other types not implemented yet
}
}
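A small test-style sketch of the validation path, assuming it lives alongside these helpers; the incomplete config literal is hypothetical:

#[cfg(test)]
mod validation_tests {
    use super::*;

    #[test]
    fn webdav_config_missing_fields_is_rejected() {
        let source = CreateSource {
            name: "Incomplete".to_string(),
            source_type: crate::models::SourceType::WebDAV,
            enabled: None,
            // Missing username, password, etc., so WebDAVSourceConfig deserialization fails
            config: serde_json::json!({ "server_url": "https://example.com" }),
        };
        assert!(validate_source_config(&source).is_err());
    }
}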

View File

@ -87,6 +87,14 @@ async fn perform_sync_internal(
// Process each watch folder
for folder_path in &config.watch_folders {
// Check if sync has been cancelled before processing each folder
if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
if !sync_state.is_running {
info!("WebDAV sync cancelled, stopping folder processing");
return Err("Sync cancelled by user".into());
}
}
info!("Syncing folder: {}", folder_path);
// Update current folder in sync state
@ -161,6 +169,17 @@ async fn perform_sync_internal(
// Process files concurrently and collect results
while let Some(result) = file_futures.next().await {
// Check if sync has been cancelled
if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
if !sync_state.is_running {
info!("WebDAV sync cancelled during file processing, stopping");
// Cancel remaining futures
file_futures.clear();
sync_errors.push("Sync cancelled by user during file processing".to_string());
break;
}
}
match result {
Ok(processed) => {
if processed {
@ -215,6 +234,14 @@ async fn process_single_file(
// Acquire semaphore permit to limit concurrent downloads
let _permit = semaphore.acquire().await.map_err(|e| format!("Semaphore error: {}", e))?;
// Check if sync has been cancelled before processing this file
if let Ok(Some(sync_state)) = state.db.get_webdav_sync_state(user_id).await {
if !sync_state.is_running {
info!("Sync cancelled, skipping file: {}", file_info.path);
return Err("Sync cancelled by user".to_string());
}
}
info!("Processing file: {}", file_info.path);
// Check if we've already processed this file

View File

@ -30,6 +30,11 @@ impl WebDAVScheduler {
pub async fn start(&self) {
info!("Starting WebDAV background sync scheduler");
// First, check for any interrupted syncs that need to be resumed
if let Err(e) = self.resume_interrupted_syncs().await {
error!("Error resuming interrupted syncs: {}", e);
}
let mut interval_timer = interval(self.check_interval);
loop {
@ -41,6 +46,90 @@ impl WebDAVScheduler {
}
}
async fn resume_interrupted_syncs(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Checking for interrupted WebDAV syncs to resume");
// Get all users with settings
let users_with_settings = self.db.get_all_user_settings().await?;
for user_settings in users_with_settings {
// Skip if WebDAV is not enabled
if !user_settings.webdav_enabled {
continue;
}
// Check if there's an interrupted sync for this user
if let Ok(Some(sync_state)) = self.db.get_webdav_sync_state(user_settings.user_id).await {
// Check if sync was interrupted (has errors containing "server restart" message)
let was_interrupted = sync_state.errors.iter().any(|e| e.contains("server restart"));
if was_interrupted && user_settings.webdav_auto_sync {
info!("Found interrupted WebDAV sync for user {}, will resume", user_settings.user_id);
// Clear the interruption error and resume sync
let cleared_errors: Vec<String> = sync_state.errors.into_iter()
.filter(|e| !e.contains("server restart"))
.collect();
let reset_state = crate::models::UpdateWebDAVSyncState {
last_sync_at: sync_state.last_sync_at,
sync_cursor: sync_state.sync_cursor,
is_running: false,
files_processed: sync_state.files_processed,
files_remaining: 0,
current_folder: None,
errors: cleared_errors,
};
if let Err(e) = self.db.update_webdav_sync_state(user_settings.user_id, &reset_state).await {
error!("Failed to reset interrupted sync state: {}", e);
continue;
}
// Trigger a new sync for this user
if let Ok(webdav_config) = self.build_webdav_config(&user_settings) {
if let Ok(webdav_service) = WebDAVService::new(webdav_config.clone()) {
let state_clone = self.state.clone();
let user_id = user_settings.user_id;
let enable_background_ocr = user_settings.enable_background_ocr;
info!("Resuming interrupted WebDAV sync for user {}", user_id);
tokio::spawn(async move {
match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
Ok(files_processed) => {
info!("Resumed WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
// Send notification
let notification = crate::models::CreateNotification {
notification_type: "success".to_string(),
title: "WebDAV Sync Resumed".to_string(),
message: format!("Resumed sync after server restart. Processed {} files", files_processed),
action_url: Some("/documents".to_string()),
metadata: Some(serde_json::json!({
"sync_type": "webdav_resume",
"files_processed": files_processed
})),
};
if let Err(e) = state_clone.db.create_notification(user_id, &notification).await {
error!("Failed to create resume notification: {}", e);
}
}
Err(e) => {
error!("Resumed WebDAV sync failed for user {}: {}", user_id, e);
}
}
});
}
}
}
}
}
Ok(())
}
async fn check_and_sync_users(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Get all users with WebDAV auto-sync enabled
let users_with_settings = self.db.get_all_user_settings().await?;
@ -189,5 +278,11 @@ impl WebDAVScheduler {
server_type: Some("nextcloud".to_string()),
})
}
pub async fn trigger_sync(&self, source_id: uuid::Uuid) {
info!("Triggering manual sync for source {}", source_id);
// TODO: Implement manual sync trigger for sources
// For now, this is a placeholder that the routes can call
}
}

View File

@ -420,4 +420,23 @@ impl WebDAVService {
Ok(bytes.to_vec())
}
}
pub async fn test_webdav_connection(
server_url: &str,
username: &str,
password: &str,
) -> Result<bool> {
let client = Client::new();
// Try to list the root directory to test connectivity
let response = client
.request(Method::from_bytes(b"PROPFIND")?, server_url)
.header("Depth", "0")
.basic_auth(username, Some(password))
.timeout(Duration::from_secs(10))
.send()
.await?;
Ok(response.status().is_success())
}
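A hedged usage sketch of the standalone connection test, assumed to be called from the same module; the URL and credentials are placeholders:

// Hypothetical caller of test_webdav_connection.
async fn check_example_server() -> anyhow::Result<()> {
    let ok = test_webdav_connection(
        "https://cloud.example.com/remote.php/dav",
        "alice",
        "app-password",
    )
    .await?;
    println!("WebDAV reachable: {}", ok);
    Ok(())
}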