feat(server): webdav integration nearly done

This commit is contained in:
perf3ct 2025-06-14 16:14:41 +00:00
parent cc04d5818b
commit 5b67232266
7 changed files with 1188 additions and 275 deletions

169
src/db.rs
View File

@ -1590,4 +1590,173 @@ impl Database {
recent_notifications, recent_notifications,
}) })
} }
// WebDAV sync state operations
pub async fn get_webdav_sync_state(&self, user_id: Uuid) -> Result<Option<crate::models::WebDAVSyncState>> {
let row = sqlx::query(
r#"SELECT id, user_id, last_sync_at, sync_cursor, is_running, files_processed,
files_remaining, current_folder, errors, created_at, updated_at
FROM webdav_sync_state WHERE user_id = $1"#
)
.bind(user_id)
.fetch_optional(&self.pool)
.await?;
match row {
Some(row) => Ok(Some(crate::models::WebDAVSyncState {
id: row.get("id"),
user_id: row.get("user_id"),
last_sync_at: row.get("last_sync_at"),
sync_cursor: row.get("sync_cursor"),
is_running: row.get("is_running"),
files_processed: row.get("files_processed"),
files_remaining: row.get("files_remaining"),
current_folder: row.get("current_folder"),
errors: row.get("errors"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})),
None => Ok(None),
}
}
    /// Upsert the per-user WebDAV sync state row.
    ///
    /// Inserts a new `webdav_sync_state` row for `user_id`, or — via
    /// `ON CONFLICT (user_id) DO UPDATE` — overwrites every mutable column of
    /// the existing row with the values in `state`. `updated_at` is always set
    /// server-side with `NOW()`, on both insert and update.
    pub async fn update_webdav_sync_state(&self, user_id: Uuid, state: &crate::models::UpdateWebDAVSyncState) -> Result<()> {
        sqlx::query(
            r#"INSERT INTO webdav_sync_state (user_id, last_sync_at, sync_cursor, is_running,
files_processed, files_remaining, current_folder, errors, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
ON CONFLICT (user_id) DO UPDATE SET
last_sync_at = EXCLUDED.last_sync_at,
sync_cursor = EXCLUDED.sync_cursor,
is_running = EXCLUDED.is_running,
files_processed = EXCLUDED.files_processed,
files_remaining = EXCLUDED.files_remaining,
current_folder = EXCLUDED.current_folder,
errors = EXCLUDED.errors,
updated_at = NOW()"#
        )
        // Bind order must match the $1..$8 placeholders above.
        .bind(user_id)
        .bind(state.last_sync_at)
        .bind(&state.sync_cursor)
        .bind(state.is_running)
        .bind(state.files_processed)
        .bind(state.files_remaining)
        .bind(&state.current_folder)
        .bind(&state.errors)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
// WebDAV file tracking operations
pub async fn get_webdav_file_by_path(&self, user_id: Uuid, webdav_path: &str) -> Result<Option<crate::models::WebDAVFile>> {
let row = sqlx::query(
r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size,
mime_type, document_id, sync_status, sync_error, created_at, updated_at
FROM webdav_files WHERE user_id = $1 AND webdav_path = $2"#
)
.bind(user_id)
.bind(webdav_path)
.fetch_optional(&self.pool)
.await?;
match row {
Some(row) => Ok(Some(crate::models::WebDAVFile {
id: row.get("id"),
user_id: row.get("user_id"),
webdav_path: row.get("webdav_path"),
etag: row.get("etag"),
last_modified: row.get("last_modified"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
document_id: row.get("document_id"),
sync_status: row.get("sync_status"),
sync_error: row.get("sync_error"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
})),
None => Ok(None),
}
}
    /// Insert or refresh the tracking row for a WebDAV file and return the
    /// stored record.
    ///
    /// Uniqueness is on `(user_id, webdav_path)`; on conflict every mutable
    /// column (etag, size, status, error, …) is replaced and `updated_at` is
    /// bumped server-side. `RETURNING` gives back the row as persisted, so the
    /// caller sees generated `id`/timestamps.
    pub async fn create_or_update_webdav_file(&self, file: &crate::models::CreateWebDAVFile) -> Result<crate::models::WebDAVFile> {
        let row = sqlx::query(
            r#"INSERT INTO webdav_files (user_id, webdav_path, etag, last_modified, file_size,
mime_type, document_id, sync_status, sync_error)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (user_id, webdav_path) DO UPDATE SET
etag = EXCLUDED.etag,
last_modified = EXCLUDED.last_modified,
file_size = EXCLUDED.file_size,
mime_type = EXCLUDED.mime_type,
document_id = EXCLUDED.document_id,
sync_status = EXCLUDED.sync_status,
sync_error = EXCLUDED.sync_error,
updated_at = NOW()
RETURNING id, user_id, webdav_path, etag, last_modified, file_size,
mime_type, document_id, sync_status, sync_error, created_at, updated_at"#
        )
        // Bind order must match the $1..$9 placeholders above.
        .bind(file.user_id)
        .bind(&file.webdav_path)
        .bind(&file.etag)
        .bind(file.last_modified)
        .bind(file.file_size)
        .bind(&file.mime_type)
        .bind(file.document_id)
        .bind(&file.sync_status)
        .bind(&file.sync_error)
        .fetch_one(&self.pool)
        .await?;

        // Hydrate the model from the RETURNING row.
        Ok(crate::models::WebDAVFile {
            id: row.get("id"),
            user_id: row.get("user_id"),
            webdav_path: row.get("webdav_path"),
            etag: row.get("etag"),
            last_modified: row.get("last_modified"),
            file_size: row.get("file_size"),
            mime_type: row.get("mime_type"),
            document_id: row.get("document_id"),
            sync_status: row.get("sync_status"),
            sync_error: row.get("sync_error"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
        })
    }
pub async fn get_pending_webdav_files(&self, user_id: Uuid, limit: i64) -> Result<Vec<crate::models::WebDAVFile>> {
let rows = sqlx::query(
r#"SELECT id, user_id, webdav_path, etag, last_modified, file_size,
mime_type, document_id, sync_status, sync_error, created_at, updated_at
FROM webdav_files
WHERE user_id = $1 AND sync_status = 'pending'
ORDER BY created_at ASC
LIMIT $2"#
)
.bind(user_id)
.bind(limit)
.fetch_all(&self.pool)
.await?;
let mut files = Vec::new();
for row in rows {
files.push(crate::models::WebDAVFile {
id: row.get("id"),
user_id: row.get("user_id"),
webdav_path: row.get("webdav_path"),
etag: row.get("etag"),
last_modified: row.get("last_modified"),
file_size: row.get("file_size"),
mime_type: row.get("mime_type"),
document_id: row.get("document_id"),
sync_status: row.get("sync_status"),
sync_error: row.get("sync_error"),
created_at: row.get("created_at"),
updated_at: row.get("updated_at"),
});
}
Ok(files)
}
} }

View File

@ -554,4 +554,59 @@ pub struct CreateNotification {
pub struct NotificationSummary { pub struct NotificationSummary {
pub unread_count: i64, pub unread_count: i64,
pub recent_notifications: Vec<Notification>, pub recent_notifications: Vec<Notification>,
}
/// Per-user WebDAV synchronization progress, persisted in the
/// `webdav_sync_state` table (one row per user, upserted on every update).
#[derive(Debug, Serialize, Deserialize)]
pub struct WebDAVSyncState {
    /// Primary key of the sync-state row.
    pub id: Uuid,
    /// Owner of this sync state.
    pub user_id: Uuid,
    /// When the last sync started; `None` if a sync has never run.
    pub last_sync_at: Option<DateTime<Utc>>,
    /// Opaque resume cursor; currently always written as `None` by the sync code.
    pub sync_cursor: Option<String>,
    /// True while a sync is in progress for this user.
    pub is_running: bool,
    /// Files imported so far in the current/most recent run.
    pub files_processed: i64,
    /// Files still queued in the folder currently being scanned.
    pub files_remaining: i64,
    /// Folder currently being synced, if a run is active.
    pub current_folder: Option<String>,
    /// Human-readable error messages accumulated during the run.
    pub errors: Vec<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
/// Payload for upserting a user's `webdav_sync_state` row
/// (see `Database::update_webdav_sync_state`); mirrors `WebDAVSyncState`
/// minus the server-managed `id`/timestamps.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateWebDAVSyncState {
    pub last_sync_at: Option<DateTime<Utc>>,
    /// Opaque resume cursor; currently always set to `None` by callers.
    pub sync_cursor: Option<String>,
    pub is_running: bool,
    pub files_processed: i64,
    pub files_remaining: i64,
    pub current_folder: Option<String>,
    pub errors: Vec<String>,
}
/// One remote file tracked in the `webdav_files` table; unique per
/// `(user_id, webdav_path)`.
#[derive(Debug, Serialize, Deserialize)]
pub struct WebDAVFile {
    pub id: Uuid,
    pub user_id: Uuid,
    /// Path of the file on the WebDAV server.
    pub webdav_path: String,
    /// Server-reported ETag; compared on each sync to skip unchanged files.
    pub etag: String,
    pub last_modified: Option<DateTime<Utc>>,
    pub file_size: i64,
    pub mime_type: String,
    /// Local document created from this file, once import succeeded.
    pub document_id: Option<Uuid>,
    /// Sync outcome; the sync code writes "completed" or "failed",
    /// and queries filter on "pending".
    pub sync_status: String,
    /// Error detail when `sync_status` is "failed".
    pub sync_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateWebDAVFile {
pub user_id: Uuid,
pub webdav_path: String,
pub etag: String,
pub last_modified: Option<DateTime<Utc>>,
pub file_size: i64,
pub mime_type: String,
pub document_id: Option<Uuid>,
pub sync_status: String,
pub sync_error: Option<String>,
} }

View File

@ -1,5 +1,4 @@
use std::sync::Arc; use std::sync::Arc;
use std::path::Path;
use axum::{ use axum::{
extract::State, extract::State,
@ -16,13 +15,14 @@ use crate::{
WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVSyncStatus, WebDAVConnectionResult, WebDAVCrawlEstimate, WebDAVSyncStatus,
WebDAVTestConnection, WebDAVTestConnection,
}, },
ocr_queue::OcrQueueService,
file_service::FileService,
AppState, AppState,
}; };
use crate::webdav_service::WebDAVConfig; use crate::webdav_service::WebDAVConfig;
use crate::webdav_service::WebDAVService; use crate::webdav_service::WebDAVService;
pub mod webdav_sync;
use webdav_sync::perform_webdav_sync_with_tracking;
pub fn router() -> Router<Arc<AppState>> { pub fn router() -> Router<Arc<AppState>> {
Router::new() Router::new()
.route("/test-connection", post(test_webdav_connection)) .route("/test-connection", post(test_webdav_connection))
@ -246,18 +246,41 @@ async fn get_webdav_sync_status(
} }
}; };
// For now, return basic status - in production you'd query the webdav_sync_state table // Get sync state from database
// TODO: Read actual sync state from database match state.db.get_webdav_sync_state(auth_user.user.id).await {
let status = WebDAVSyncStatus { Ok(Some(sync_state)) => {
is_running: false, Ok(Json(WebDAVSyncStatus {
last_sync: None, is_running: sync_state.is_running,
files_processed: 0, last_sync: sync_state.last_sync_at,
files_remaining: 0, files_processed: sync_state.files_processed,
current_folder: None, files_remaining: sync_state.files_remaining,
errors: Vec::new(), current_folder: sync_state.current_folder,
}; errors: sync_state.errors,
}))
Ok(Json(status)) }
Ok(None) => {
// No sync state yet
Ok(Json(WebDAVSyncStatus {
is_running: false,
last_sync: None,
files_processed: 0,
files_remaining: 0,
current_folder: None,
errors: Vec::new(),
}))
}
Err(e) => {
error!("Failed to get WebDAV sync state: {}", e);
Ok(Json(WebDAVSyncStatus {
is_running: false,
last_sync: None,
files_processed: 0,
files_remaining: 0,
current_folder: None,
errors: vec![format!("Error retrieving sync state: {}", e)],
}))
}
}
} }
#[utoipa::path( #[utoipa::path(
@ -303,7 +326,7 @@ async fn start_webdav_sync(
let enable_background_ocr = user_settings.enable_background_ocr; let enable_background_ocr = user_settings.enable_background_ocr;
tokio::spawn(async move { tokio::spawn(async move {
match perform_webdav_sync(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
Ok(files_processed) => { Ok(files_processed) => {
info!("WebDAV sync completed successfully for user {}: {} files processed", user_id, files_processed); info!("WebDAV sync completed successfully for user {}: {} files processed", user_id, files_processed);
@ -355,123 +378,3 @@ async fn start_webdav_sync(
}))) })))
} }
/// Legacy one-shot WebDAV sync for the routes module: walks every watch
/// folder, downloads files with supported extensions, creates document
/// records, and optionally enqueues them for OCR.
///
/// NOTE(review): this variant persists no sync state and has open TODOs for
/// ETag dedup and `webdav_files` bookkeeping — the tracking variant
/// (`perform_webdav_sync_with_tracking`, also in SOURCE) covers those.
///
/// Returns the number of documents successfully created.
async fn perform_webdav_sync(
    state: Arc<AppState>,
    user_id: uuid::Uuid,
    webdav_service: WebDAVService,
    config: WebDAVConfig,
    enable_background_ocr: bool,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
    info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len());

    let mut files_processed = 0;

    // Process each watch folder
    for folder_path in &config.watch_folders {
        info!("Syncing folder: {}", folder_path);

        // Discover files in the folder
        match webdav_service.discover_files_in_folder(folder_path).await {
            Ok(files) => {
                info!("Found {} files in folder {}", files.len(), folder_path);

                for file_info in files {
                    if file_info.is_directory {
                        continue; // Skip directories
                    }

                    // Check if file extension is supported
                    let file_extension = Path::new(&file_info.name)
                        .extension()
                        .and_then(|ext| ext.to_str())
                        .unwrap_or("")
                        .to_lowercase();

                    if !config.file_extensions.contains(&file_extension) {
                        continue; // Skip unsupported file types
                    }

                    // Check if we've already processed this file
                    // TODO: Check webdav_files table for existing files with same etag

                    // Download the file
                    match webdav_service.download_file(&file_info.path).await {
                        Ok(file_data) => {
                            info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

                            // Create file service and save file to disk first
                            let file_service = FileService::new(state.config.upload_path.clone());
                            let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await {
                                Ok(path) => path,
                                Err(e) => {
                                    // Save failure: log and move on to the next file.
                                    error!("Failed to save file {}: {}", file_info.name, e);
                                    continue;
                                }
                            };

                            // Create document record
                            let document = file_service.create_document(
                                &file_info.name,
                                &file_info.name, // original filename same as name
                                &saved_file_path,
                                file_info.size,
                                &file_info.mime_type,
                                user_id,
                            );

                            // Save document to database
                            match state.db.create_document(document).await {
                                Ok(saved_document) => {
                                    info!("Created document record: {} (ID: {})", file_info.name, saved_document.id);

                                    // Add to OCR queue if enabled
                                    if enable_background_ocr {
                                        // NOTE(review): opens a fresh PgPool per file; the
                                        // tracking variant connects once up front instead.
                                        match sqlx::PgPool::connect(&state.config.database_url).await {
                                            Ok(pool) => {
                                                let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);

                                                // Calculate priority based on file size
                                                let priority = match file_info.size {
                                                    0..=1048576 => 10,  // <= 1MB: highest priority
                                                    ..=5242880 => 8,    // 1-5MB: high priority
                                                    ..=10485760 => 6,   // 5-10MB: medium priority
                                                    ..=52428800 => 4,   // 10-50MB: low priority
                                                    _ => 2,             // > 50MB: lowest priority
                                                };

                                                if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await {
                                                    error!("Failed to enqueue document for OCR: {}", e);
                                                } else {
                                                    info!("Enqueued document {} for OCR processing", saved_document.id);
                                                }
                                            }
                                            Err(e) => {
                                                error!("Failed to connect to database for OCR queueing: {}", e);
                                            }
                                        }
                                    }

                                    // TODO: Record in webdav_files table for tracking
                                    files_processed += 1;
                                }
                                Err(e) => {
                                    error!("Failed to create document record for {}: {}", file_info.name, e);
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to download file {}: {}", file_info.path, e);
                        }
                    }
                }
            }
            Err(e) => {
                error!("Failed to discover files in folder {}: {}", folder_path, e);
            }
        }
    }

    info!("WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
    Ok(files_processed)
}

View File

@ -0,0 +1,287 @@
use std::sync::Arc;
use std::path::Path;
use tracing::{error, info, warn};
use chrono::Utc;
use crate::{
AppState,
models::{CreateWebDAVFile, UpdateWebDAVSyncState},
ocr_queue::OcrQueueService,
file_service::FileService,
webdav_service::{WebDAVConfig, WebDAVService},
};
pub async fn perform_webdav_sync_with_tracking(
state: Arc<AppState>,
user_id: uuid::Uuid,
webdav_service: WebDAVService,
config: WebDAVConfig,
enable_background_ocr: bool,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
info!("Performing WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len());
// Update sync state to running
let sync_state_update = UpdateWebDAVSyncState {
last_sync_at: Some(Utc::now()),
sync_cursor: None,
is_running: true,
files_processed: 0,
files_remaining: 0,
current_folder: None,
errors: Vec::new(),
};
if let Err(e) = state.db.update_webdav_sync_state(user_id, &sync_state_update).await {
error!("Failed to update sync state: {}", e);
}
let mut total_files_processed = 0;
let mut sync_errors = Vec::new();
// Process each watch folder
for folder_path in &config.watch_folders {
info!("Syncing folder: {}", folder_path);
// Update current folder in sync state
let folder_update = UpdateWebDAVSyncState {
last_sync_at: Some(Utc::now()),
sync_cursor: None,
is_running: true,
files_processed: total_files_processed as i64,
files_remaining: 0,
current_folder: Some(folder_path.clone()),
errors: sync_errors.clone(),
};
if let Err(e) = state.db.update_webdav_sync_state(user_id, &folder_update).await {
warn!("Failed to update sync folder state: {}", e);
}
// Discover files in the folder
match webdav_service.discover_files_in_folder(folder_path).await {
Ok(files) => {
info!("Found {} files in folder {}", files.len(), folder_path);
let mut folder_files_processed = 0;
let files_to_process = files.len();
for (idx, file_info) in files.into_iter().enumerate() {
if file_info.is_directory {
continue; // Skip directories
}
// Check if file extension is supported
let file_extension = Path::new(&file_info.name)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("")
.to_lowercase();
if !config.file_extensions.contains(&file_extension) {
continue; // Skip unsupported file types
}
// Check if we've already processed this file
match state.db.get_webdav_file_by_path(user_id, &file_info.path).await {
Ok(Some(existing_file)) => {
// Check if file has changed (compare ETags)
if existing_file.etag == file_info.etag {
info!("Skipping unchanged file: {} (ETag: {})", file_info.path, file_info.etag);
continue;
}
info!("File has changed: {} (old ETag: {}, new ETag: {})",
file_info.path, existing_file.etag, file_info.etag);
}
Ok(None) => {
info!("New file found: {}", file_info.path);
}
Err(e) => {
warn!("Error checking existing file {}: {}", file_info.path, e);
}
}
// Update progress
let progress_update = UpdateWebDAVSyncState {
last_sync_at: Some(Utc::now()),
sync_cursor: None,
is_running: true,
files_processed: (total_files_processed + folder_files_processed) as i64,
files_remaining: (files_to_process - idx - 1) as i64,
current_folder: Some(folder_path.clone()),
errors: sync_errors.clone(),
};
if let Err(e) = state.db.update_webdav_sync_state(user_id, &progress_update).await {
warn!("Failed to update sync progress: {}", e);
}
// Download the file
match webdav_service.download_file(&file_info.path).await {
Ok(file_data) => {
info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
// Create file service and save file to disk
let file_service = FileService::new(state.config.upload_path.clone());
let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await {
Ok(path) => path,
Err(e) => {
error!("Failed to save file {}: {}", file_info.name, e);
sync_errors.push(format!("Failed to save {}: {}", file_info.name, e));
// Record failed file in database
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(e.to_string()),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
continue;
}
};
// Create document record
let document = file_service.create_document(
&file_info.name,
&file_info.name, // original filename same as name
&saved_file_path,
file_info.size,
&file_info.mime_type,
user_id,
);
// Save document to database
match state.db.create_document(document).await {
Ok(saved_document) => {
info!("Created document record: {} (ID: {})", file_info.name, saved_document.id);
// Record successful file in WebDAV tracking
let webdav_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: Some(saved_document.id),
sync_status: "completed".to_string(),
sync_error: None,
};
if let Err(e) = state.db.create_or_update_webdav_file(&webdav_file).await {
error!("Failed to record WebDAV file: {}", e);
}
// Add to OCR queue if enabled
if enable_background_ocr {
match sqlx::PgPool::connect(&state.config.database_url).await {
Ok(pool) => {
let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);
// Calculate priority based on file size
let priority = match file_info.size {
0..=1048576 => 10, // <= 1MB: highest priority
..=5242880 => 8, // 1-5MB: high priority
..=10485760 => 6, // 5-10MB: medium priority
..=52428800 => 4, // 10-50MB: low priority
_ => 2, // > 50MB: lowest priority
};
if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await {
error!("Failed to enqueue document for OCR: {}", e);
} else {
info!("Enqueued document {} for OCR processing", saved_document.id);
}
}
Err(e) => {
error!("Failed to connect to database for OCR queueing: {}", e);
}
}
}
folder_files_processed += 1;
}
Err(e) => {
error!("Failed to create document record for {}: {}", file_info.name, e);
sync_errors.push(format!("Failed to create document {}: {}", file_info.name, e));
// Update WebDAV file status to failed
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(e.to_string()),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
}
}
}
Err(e) => {
error!("Failed to download file {}: {}", file_info.path, e);
sync_errors.push(format!("Failed to download {}: {}", file_info.path, e));
// Record download failure
let failed_file = CreateWebDAVFile {
user_id,
webdav_path: file_info.path.clone(),
etag: file_info.etag.clone(),
last_modified: file_info.last_modified,
file_size: file_info.size,
mime_type: file_info.mime_type.clone(),
document_id: None,
sync_status: "failed".to_string(),
sync_error: Some(format!("Download failed: {}", e)),
};
if let Err(db_err) = state.db.create_or_update_webdav_file(&failed_file).await {
error!("Failed to record failed file: {}", db_err);
}
}
}
}
total_files_processed += folder_files_processed;
}
Err(e) => {
error!("Failed to discover files in folder {}: {}", folder_path, e);
sync_errors.push(format!("Failed to list folder {}: {}", folder_path, e));
}
}
}
// Update final sync state
let final_state = UpdateWebDAVSyncState {
last_sync_at: Some(Utc::now()),
sync_cursor: None,
is_running: false,
files_processed: total_files_processed as i64,
files_remaining: 0,
current_folder: None,
errors: sync_errors,
};
if let Err(e) = state.db.update_webdav_sync_state(user_id, &final_state).await {
error!("Failed to update final sync state: {}", e);
}
info!("WebDAV sync completed for user {}: {} files processed", user_id, total_files_processed);
Ok(total_files_processed)
}

View File

@ -10,6 +10,7 @@ use crate::{
AppState, AppState,
}; };
use crate::webdav_service::{WebDAVConfig, WebDAVService}; use crate::webdav_service::{WebDAVConfig, WebDAVService};
use crate::routes::webdav::webdav_sync::perform_webdav_sync_with_tracking;
pub struct WebDAVScheduler { pub struct WebDAVScheduler {
db: Database, db: Database,
@ -66,7 +67,7 @@ impl WebDAVScheduler {
let enable_background_ocr = user_settings.enable_background_ocr; let enable_background_ocr = user_settings.enable_background_ocr;
tokio::spawn(async move { tokio::spawn(async move {
match perform_webdav_sync(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await { match perform_webdav_sync_with_tracking(state_clone.clone(), user_id, webdav_service, webdav_config, enable_background_ocr).await {
Ok(files_processed) => { Ok(files_processed) => {
info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed); info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
@ -121,9 +122,6 @@ impl WebDAVScheduler {
} }
async fn is_sync_due(&self, user_settings: &crate::models::Settings) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> { async fn is_sync_due(&self, user_settings: &crate::models::Settings) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
// TODO: Add a webdav_sync_state table to track last sync time
// For now, we'll use a simple time-based check
// Get the sync interval in minutes // Get the sync interval in minutes
let sync_interval_minutes = user_settings.webdav_sync_interval_minutes; let sync_interval_minutes = user_settings.webdav_sync_interval_minutes;
@ -132,8 +130,27 @@ impl WebDAVScheduler {
return Ok(false); return Ok(false);
} }
// TODO: Check actual last sync time from database // Check if a sync is already running
// For now, assume sync is always due (this will be refined when we add the webdav_sync_state table) if let Ok(Some(sync_state)) = self.db.get_webdav_sync_state(user_settings.user_id).await {
if sync_state.is_running {
info!("Sync already running for user {}", user_settings.user_id);
return Ok(false);
}
// Check last sync time
if let Some(last_sync) = sync_state.last_sync_at {
let elapsed = chrono::Utc::now() - last_sync;
let elapsed_minutes = elapsed.num_minutes();
if elapsed_minutes < sync_interval_minutes as i64 {
info!("Sync not due for user {} (last sync {} minutes ago, interval {} minutes)",
user_settings.user_id, elapsed_minutes, sync_interval_minutes);
return Ok(false);
}
}
}
// Sync is due
Ok(true) Ok(true)
} }
@ -164,124 +181,3 @@ impl WebDAVScheduler {
} }
} }
// Re-use the sync function from webdav routes
/// Legacy background WebDAV sync used by the scheduler: walks every watch
/// folder, downloads files with supported extensions, creates document
/// records, and optionally enqueues them for OCR.
///
/// NOTE(review): near-duplicate of the routes-module variant, with no ETag
/// dedup and no `webdav_files` bookkeeping — see the TODO below; the
/// tracking variant (`perform_webdav_sync_with_tracking`) covers both.
///
/// Returns the number of documents successfully created.
async fn perform_webdav_sync(
    state: Arc<AppState>,
    user_id: uuid::Uuid,
    webdav_service: WebDAVService,
    config: WebDAVConfig,
    enable_background_ocr: bool,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
    use std::path::Path;

    info!("Performing background WebDAV sync for user {} on {} folders", user_id, config.watch_folders.len());

    let mut files_processed = 0;

    // Process each watch folder
    for folder_path in &config.watch_folders {
        info!("Syncing folder: {}", folder_path);

        // Discover files in the folder
        match webdav_service.discover_files_in_folder(folder_path).await {
            Ok(files) => {
                info!("Found {} files in folder {}", files.len(), folder_path);

                for file_info in files {
                    if file_info.is_directory {
                        continue; // Skip directories
                    }

                    // Check if file extension is supported
                    let file_extension = Path::new(&file_info.name)
                        .extension()
                        .and_then(|ext| ext.to_str())
                        .unwrap_or("")
                        .to_lowercase();

                    if !config.file_extensions.contains(&file_extension) {
                        continue; // Skip unsupported file types
                    }

                    // TODO: Check if we've already processed this file using ETag

                    // Download the file
                    match webdav_service.download_file(&file_info.path).await {
                        Ok(file_data) => {
                            info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());

                            // Create file service and save file
                            let file_service = FileService::new(state.config.upload_path.clone());
                            let saved_file_path = match file_service.save_file(&file_info.name, &file_data).await {
                                Ok(path) => path,
                                Err(e) => {
                                    // Save failure: log and move on to the next file.
                                    error!("Failed to save file {}: {}", file_info.name, e);
                                    continue;
                                }
                            };

                            // Create document record
                            let document = file_service.create_document(
                                &file_info.name,
                                &file_info.name,
                                &saved_file_path,
                                file_info.size,
                                &file_info.mime_type,
                                user_id,
                            );

                            // Save document to database
                            match state.db.create_document(document).await {
                                Ok(saved_document) => {
                                    info!("Created document record: {} (ID: {})", file_info.name, saved_document.id);

                                    // Add to OCR queue if enabled
                                    if enable_background_ocr {
                                        // NOTE(review): opens a fresh PgPool per file; the
                                        // tracking variant connects once up front instead.
                                        match sqlx::PgPool::connect(&state.config.database_url).await {
                                            Ok(pool) => {
                                                let queue_service = OcrQueueService::new(state.db.clone(), pool, 1);

                                                // Calculate priority based on file size
                                                let priority = match file_info.size {
                                                    0..=1048576 => 10,  // <= 1MB: highest priority
                                                    ..=5242880 => 8,    // 1-5MB: high priority
                                                    ..=10485760 => 6,   // 5-10MB: medium priority
                                                    ..=52428800 => 4,   // 10-50MB: low priority
                                                    _ => 2,             // > 50MB: lowest priority
                                                };

                                                if let Err(e) = queue_service.enqueue_document(saved_document.id, priority, file_info.size).await {
                                                    error!("Failed to enqueue document for OCR: {}", e);
                                                } else {
                                                    info!("Enqueued document {} for OCR processing", saved_document.id);
                                                }
                                            }
                                            Err(e) => {
                                                error!("Failed to connect to database for OCR queueing: {}", e);
                                            }
                                        }
                                    }

                                    files_processed += 1;
                                }
                                Err(e) => {
                                    error!("Failed to create document record for {}: {}", file_info.name, e);
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to download file {}: {}", file_info.path, e);
                        }
                    }
                }
            }
            Err(e) => {
                error!("Failed to discover files in folder {}: {}", folder_path, e);
            }
        }
    }

    info!("Background WebDAV sync completed for user {}: {} files processed", user_id, files_processed);
    Ok(files_processed)
}

View File

@ -536,16 +536,4 @@ impl WebDAVService {
Ok(bytes.to_vec()) Ok(bytes.to_vec())
} }
    /// Report the WebDAV sync status for this service.
    ///
    /// NOTE(review): placeholder — always returns an idle/empty status; the
    /// inline comment says the real values should come from a database/cache
    /// (the `webdav_sync_state` table now provides them elsewhere).
    pub async fn get_sync_status(&self) -> Result<WebDAVSyncStatus> {
        // This would typically read from database/cache
        // For now, return a placeholder
        Ok(WebDAVSyncStatus {
            is_running: false,
            last_sync: None,
            files_processed: 0,
            files_remaining: 0,
            current_folder: None,
            errors: Vec::new(),
        })
    }
} }

View File

@ -0,0 +1,615 @@
use readur::webdav_service::{WebDAVService, WebDAVConfig, FileInfo, RetryConfig};
use readur::models::*;
use tokio;
use chrono::{DateTime, Utc};
use uuid::Uuid;
// Mock WebDAV responses for comprehensive testing
/// PROPFIND multistatus body as a Nextcloud server would return it:
/// one directory entry (`Documents/`) plus three regular files —
/// report.pdf, photo.png, and unsupported.docx — each with size,
/// mtime, content type, and a quoted ETag.
fn mock_nextcloud_propfind_response() -> String {
    r#"<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:s="http://sabredav.org/ns" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/admin/Documents/</d:href>
<d:propstat>
<d:prop>
<d:displayname>Documents</d:displayname>
<d:getlastmodified>Tue, 01 Jan 2024 12:00:00 GMT</d:getlastmodified>
<d:resourcetype>
<d:collection/>
</d:resourcetype>
<d:getetag>"abc123"</d:getetag>
</d:prop>
<d:status>HTTP/1.1 200 OK</d:status>
</d:propstat>
</d:response>
<d:response>
<d:href>/remote.php/dav/files/admin/Documents/report.pdf</d:href>
<d:propstat>
<d:prop>
<d:displayname>report.pdf</d:displayname>
<d:getcontentlength>2048000</d:getcontentlength>
<d:getlastmodified>Mon, 15 Jan 2024 14:30:00 GMT</d:getlastmodified>
<d:getcontenttype>application/pdf</d:getcontenttype>
<d:getetag>"pdf123"</d:getetag>
<d:resourcetype/>
</d:prop>
<d:status>HTTP/1.1 200 OK</d:status>
</d:propstat>
</d:response>
<d:response>
<d:href>/remote.php/dav/files/admin/Documents/photo.png</d:href>
<d:propstat>
<d:prop>
<d:displayname>photo.png</d:displayname>
<d:getcontentlength>768000</d:getcontentlength>
<d:getlastmodified>Wed, 10 Jan 2024 09:15:00 GMT</d:getlastmodified>
<d:getcontenttype>image/png</d:getcontenttype>
<d:getetag>"png456"</d:getetag>
<d:resourcetype/>
</d:prop>
<d:status>HTTP/1.1 200 OK</d:status>
</d:propstat>
</d:response>
<d:response>
<d:href>/remote.php/dav/files/admin/Documents/unsupported.docx</d:href>
<d:propstat>
<d:prop>
<d:displayname>unsupported.docx</d:displayname>
<d:getcontentlength>102400</d:getcontentlength>
<d:getlastmodified>Thu, 20 Jan 2024 16:45:00 GMT</d:getlastmodified>
<d:getcontenttype>application/vnd.openxmlformats-officedocument.wordprocessingml.document</d:getcontenttype>
<d:getetag>"docx789"</d:getetag>
<d:resourcetype/>
</d:prop>
<d:status>HTTP/1.1 200 OK</d:status>
</d:propstat>
</d:response>
</d:multistatus>"#.to_string()
}
/// PROPFIND multistatus body for a folder containing only its own
/// collection entry — i.e. an empty directory listing.
fn mock_empty_folder_response() -> String {
    r#"<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:">
<d:response>
<d:href>/webdav/EmptyFolder/</d:href>
<d:propstat>
<d:prop>
<d:displayname>EmptyFolder</d:displayname>
<d:getlastmodified>Fri, 01 Jan 2024 12:00:00 GMT</d:getlastmodified>
<d:resourcetype>
<d:collection/>
</d:resourcetype>
</d:prop>
<d:status>HTTP/1.1 200 OK</d:status>
</d:propstat>
</d:response>
</d:multistatus>"#.to_string()
}
/// Deliberately broken PROPFIND body — unclosed `<d:displayname>` tag and a
/// truncated document — for exercising the parser's error path.
fn mock_malformed_xml_response() -> String {
    r#"<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:">
<d:response>
<d:href>/webdav/test.pdf</d:href>
<d:propstat>
<d:prop>
<d:displayname>test.pdf
<!-- Missing closing tag -->
</d:prop>
</d:propstat>
</d:response>
<!-- Incomplete XML -->"#.to_string()
}
/// Service construction succeeds for both a complete config and one with an
/// empty server URL — URL validation is deferred to actual request time.
#[test]
fn test_webdav_config_validation() {
    // Test valid config
    let valid_config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string(), "/Photos".to_string()],
        file_extensions: vec!["pdf".to_string(), "png".to_string(), "jpg".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    assert!(WebDAVService::new(valid_config).is_ok());

    // Test config with empty server URL
    let invalid_config = WebDAVConfig {
        server_url: "".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };

    // Should still create service, validation happens during actual requests
    assert!(WebDAVService::new(invalid_config).is_ok());
}
/// Service construction succeeds for all three server flavours (Nextcloud,
/// ownCloud with a trailing-slash URL, and generic WebDAV with no
/// `server_type`), which exercises each URL-construction branch.
///
/// Fix: the Nextcloud case previously bound the service to an unused
/// `service` variable (compiler warning); it now asserts `is_ok()` like the
/// other two cases.
#[test]
fn test_webdav_url_construction_comprehensive() {
    // Test Nextcloud URL construction
    let nextcloud_config = WebDAVConfig {
        server_url: "https://nextcloud.example.com".to_string(),
        username: "admin".to_string(),
        password: "secret".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };
    // URL construction is tested implicitly during service creation
    assert!(WebDAVService::new(nextcloud_config).is_ok());

    // Test ownCloud URL construction
    let owncloud_config = WebDAVConfig {
        server_url: "https://cloud.example.com/".to_string(), // With trailing slash
        username: "user123".to_string(),
        password: "pass123".to_string(),
        watch_folders: vec!["/Shared".to_string()],
        file_extensions: vec!["jpg".to_string()],
        timeout_seconds: 60,
        server_type: Some("owncloud".to_string()),
    };
    assert!(WebDAVService::new(owncloud_config).is_ok());

    // Test generic WebDAV URL construction
    let generic_config = WebDAVConfig {
        server_url: "https://webdav.example.com".to_string(),
        username: "webdavuser".to_string(),
        password: "webdavpass".to_string(),
        watch_folders: vec!["/Files".to_string()],
        file_extensions: vec!["txt".to_string()],
        timeout_seconds: 45,
        server_type: None, // No server type = generic
    };
    assert!(WebDAVService::new(generic_config).is_ok());
}
#[test]
fn test_webdav_response_parsing_comprehensive() {
    // Parse the canned Nextcloud PROPFIND body and verify every file entry.
    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "admin".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string(), "png".to_string(), "jpg".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };
    let service = WebDAVService::new(config).unwrap();

    let body = mock_nextcloud_propfind_response();
    let parsed = service
        .parse_webdav_response(&body)
        .expect("Nextcloud PROPFIND response should parse");
    // Three file entries are expected; the directory entry is excluded.
    assert_eq!(parsed.len(), 3);

    // report.pdf
    let report = parsed.iter().find(|f| f.name == "report.pdf").expect("report.pdf parsed");
    assert!(!report.is_directory);
    assert_eq!(report.size, 2048000);
    assert_eq!(report.mime_type, "application/pdf");
    assert_eq!(report.etag, "\"pdf123\"");

    // photo.png
    let photo = parsed.iter().find(|f| f.name == "photo.png").expect("photo.png parsed");
    assert!(!photo.is_directory);
    assert_eq!(photo.size, 768000);
    assert_eq!(photo.mime_type, "image/png");
    assert_eq!(photo.etag, "\"png456\"");

    // unsupported.docx — still parsed even though its extension isn't watched.
    let docx = parsed.iter().find(|f| f.name == "unsupported.docx").expect("unsupported.docx parsed");
    assert!(!docx.is_directory);
    assert_eq!(docx.size, 102400);
    assert_eq!(docx.mime_type, "application/vnd.openxmlformats-officedocument.wordprocessingml.document");
    assert_eq!(docx.etag, "\"docx789\"");
}
#[test]
fn test_empty_folder_parsing() {
    // A PROPFIND body for a folder with no children must yield zero entries.
    let service = WebDAVService::new(WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/EmptyFolder".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("generic".to_string()),
    })
    .unwrap();

    let body = mock_empty_folder_response();
    let parsed = service
        .parse_webdav_response(&body)
        .expect("empty-folder response should parse");
    assert!(parsed.is_empty());
}
#[test]
fn test_malformed_xml_handling() {
    // Regression guard: feeding truncated/unbalanced XML to the parser must
    // not panic. Whether the parser returns Ok (best-effort extraction) or
    // Err is deliberately unspecified, so no assertion is made on the Result.
    //
    // Fix: the previous `assert!(result.is_ok() || result.is_err())` was a
    // tautology — it can never fail, so it asserted nothing. Reaching the end
    // of this function without panicking is the actual property under test.
    let config = WebDAVConfig {
        server_url: "https://cloud.example.com".to_string(),
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        timeout_seconds: 30,
        server_type: Some("nextcloud".to_string()),
    };
    let service = WebDAVService::new(config).unwrap();
    let response = mock_malformed_xml_response();
    let _ = service.parse_webdav_response(&response);
}
#[test]
fn test_retry_config_custom_values() {
    // A hand-built retry policy keeps exactly the values it was given.
    let retry = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 500,
        max_delay_ms: 15000,
        backoff_multiplier: 1.5,
        timeout_seconds: 90,
    };
    assert_eq!(retry.max_retries, 5);
    assert_eq!(retry.initial_delay_ms, 500);
    assert_eq!(retry.max_delay_ms, 15000);
    assert_eq!(retry.backoff_multiplier, 1.5);
    assert_eq!(retry.timeout_seconds, 90);

    // The custom policy is accepted by the retry-aware constructor.
    let config = WebDAVConfig {
        username: "testuser".to_string(),
        password: "testpass".to_string(),
        server_url: "https://cloud.example.com".to_string(),
        watch_folders: vec!["/Documents".to_string()],
        file_extensions: vec!["pdf".to_string()],
        server_type: Some("nextcloud".to_string()),
        timeout_seconds: 30,
    };
    assert!(WebDAVService::new_with_retry(config, retry).is_ok());
}
#[test]
fn test_file_extension_matching() {
    // Extensions the sync pipeline accepts for import/OCR.
    const SUPPORTED: [&str; 7] = ["pdf", "png", "jpg", "jpeg", "tiff", "bmp", "txt"];

    // Mirrors the production check: lowercase the extension, then test membership.
    let is_supported = |filename: &str| -> bool {
        std::path::Path::new(filename)
            .extension()
            .and_then(|ext| ext.to_str())
            .map_or(false, |ext| SUPPORTED.contains(&ext.to_lowercase().as_str()))
    };

    // Matching is case-insensitive, hence the mixed-case entries.
    let accepted = [
        "document.pdf", "image.PNG", "photo.jpg", "photo.JPEG",
        "scan.tiff", "bitmap.bmp", "readme.txt",
    ];
    // Unsupported extensions, no extension at all, and a dotfile.
    let rejected = [
        "spreadsheet.xlsx", "presentation.pptx", "archive.zip",
        "script.sh", "no_extension", ".hidden",
    ];

    for name in accepted {
        assert!(is_supported(name), "'{}' should match a supported extension", name);
    }
    for name in rejected {
        assert!(!is_supported(name), "'{}' should not match any supported extension", name);
    }
}
#[test]
fn test_webdav_sync_state_model() {
    // An in-flight sync state round-trips the field values it was built with.
    let state = WebDAVSyncState {
        id: Uuid::new_v4(),
        user_id: Uuid::new_v4(),
        is_running: true,
        files_processed: 42,
        files_remaining: 58,
        current_folder: Some("/Documents".to_string()),
        errors: vec!["Error 1".to_string(), "Error 2".to_string()],
        sync_cursor: Some("cursor123".to_string()),
        last_sync_at: Some(Utc::now()),
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    assert!(state.is_running);
    assert_eq!(state.files_processed, 42);
    assert_eq!(state.files_remaining, 58);
    assert_eq!(state.current_folder.as_deref(), Some("/Documents"));
    assert_eq!(state.errors.len(), 2);
}
#[test]
fn test_webdav_file_model() {
    // A fully-synced file record keeps the values it was built with.
    let linked_document = Uuid::new_v4();
    let file = WebDAVFile {
        id: Uuid::new_v4(),
        user_id: Uuid::new_v4(),
        document_id: Some(linked_document),
        webdav_path: "/Documents/report.pdf".to_string(),
        etag: "\"abc123\"".to_string(),
        mime_type: "application/pdf".to_string(),
        file_size: 2048000,
        last_modified: Some(Utc::now()),
        sync_status: "completed".to_string(),
        sync_error: None,
        created_at: Utc::now(),
        updated_at: Utc::now(),
    };

    assert_eq!(file.webdav_path, "/Documents/report.pdf");
    assert_eq!(file.etag, "\"abc123\"");
    assert_eq!(file.file_size, 2048000);
    assert_eq!(file.sync_status, "completed");
    assert!(file.sync_error.is_none());
}
#[test]
fn test_create_webdav_file_model() {
    // A pending-sync insert payload preserves its inputs.
    let owner = Uuid::new_v4();
    let payload = CreateWebDAVFile {
        user_id: owner,
        webdav_path: "/Photos/vacation.jpg".to_string(),
        etag: "\"photo123\"".to_string(),
        mime_type: "image/jpeg".to_string(),
        file_size: 1536000,
        last_modified: Some(Utc::now()),
        document_id: None,
        sync_status: "pending".to_string(),
        sync_error: None,
    };

    assert_eq!(payload.user_id, owner);
    assert_eq!(payload.webdav_path, "/Photos/vacation.jpg");
    assert_eq!(payload.file_size, 1536000);
    assert_eq!(payload.sync_status, "pending");
}
#[test]
fn test_update_webdav_sync_state_model() {
    // A "sync finished" update: not running, everything processed, no errors.
    let update = UpdateWebDAVSyncState {
        is_running: false,
        files_processed: 100,
        files_remaining: 0,
        current_folder: None,
        errors: Vec::new(),
        sync_cursor: Some("new_cursor".to_string()),
        last_sync_at: Some(Utc::now()),
    };

    assert!(!update.is_running);
    assert_eq!(update.files_processed, 100);
    assert_eq!(update.files_remaining, 0);
    assert!(update.current_folder.is_none());
    assert!(update.errors.is_empty());
}
#[test]
fn test_ocr_priority_calculation_comprehensive() {
    // Size-based OCR priority tiers: smaller files get processed first.
    // Each tuple is (file size in bytes, expected priority), probing both
    // sides of every tier boundary.
    let boundary_cases = [
        (0, 10),             // 0 bytes
        (1, 10),             // 1 byte
        (1_048_576, 10),     // exactly 1MB
        (1_048_577, 8),      // 1MB + 1 byte
        (5_242_880, 8),      // exactly 5MB
        (5_242_881, 6),      // 5MB + 1 byte
        (10_485_760, 6),     // exactly 10MB
        (10_485_761, 4),     // 10MB + 1 byte
        (52_428_800, 4),     // exactly 50MB
        (52_428_801, 2),     // 50MB + 1 byte
        (104_857_600, 2),    // 100MB
        (1_073_741_824, 2),  // 1GB
    ];

    for (size, expected) in boundary_cases {
        let priority = if size <= 1_048_576 {
            10 // <= 1MB
        } else if size <= 5_242_880 {
            8 // 1-5MB
        } else if size <= 10_485_760 {
            6 // 5-10MB
        } else if size <= 52_428_800 {
            4 // 10-50MB
        } else {
            2 // > 50MB
        };
        assert_eq!(priority, expected,
            "Priority calculation failed for file size {} bytes", size);
    }
}
#[test]
fn test_sync_status_serialization() {
    // The live-progress status must serialize to JSON for the API layer.
    let status = WebDAVSyncStatus {
        is_running: true,
        files_processed: 25,
        files_remaining: 75,
        current_folder: Some("/Documents/Reports".to_string()),
        errors: vec!["Connection timeout".to_string()],
        last_sync: Some(Utc::now()),
    };

    let json = serde_json::to_string(&status).expect("sync status serializes to JSON");
    // Spot-check that each field landed in the output with its value.
    for fragment in [
        "\"is_running\":true",
        "\"files_processed\":25",
        "\"files_remaining\":75",
        "\"current_folder\":\"/Documents/Reports\"",
    ] {
        assert!(json.contains(fragment), "serialized status missing {}", fragment);
    }
}
#[test]
fn test_crawl_estimate_calculation() {
    // Per-folder stats roll up into one crawl estimate.
    let documents = WebDAVFolderInfo {
        path: "/Documents".to_string(),
        total_files: 100,
        supported_files: 80,
        estimated_time_hours: 0.044, // ~2.6 minutes
        total_size_mb: 150.0,
    };
    let photos = WebDAVFolderInfo {
        path: "/Photos".to_string(),
        total_files: 200,
        supported_files: 150,
        estimated_time_hours: 0.083, // ~5 minutes
        total_size_mb: 500.0,
    };
    let estimate = WebDAVCrawlEstimate {
        folders: vec![documents, photos],
        total_files: 300,
        total_supported_files: 230,
        total_estimated_time_hours: 0.127, // ~7.6 minutes
        total_size_mb: 650.0,
    };

    assert_eq!(estimate.folders.len(), 2);
    assert_eq!(estimate.total_files, 300);
    assert_eq!(estimate.total_supported_files, 230);
    // Floating-point total compared with a tolerance, not exact equality.
    assert!((estimate.total_estimated_time_hours - 0.127).abs() < 1e-3);
    assert_eq!(estimate.total_size_mb, 650.0);
}
#[test]
fn test_connection_result_variants() {
    // A successful probe carries server identification.
    let connected = WebDAVConnectionResult {
        success: true,
        server_type: Some("nextcloud".to_string()),
        server_version: Some("28.0.1".to_string()),
        message: "Connected successfully to Nextcloud 28.0.1".to_string(),
    };
    assert!(connected.success);
    assert!(connected.server_version.is_some());
    assert_eq!(connected.server_type.as_deref(), Some("nextcloud"));

    // A failed probe carries no server details, only a diagnostic message.
    let rejected = WebDAVConnectionResult {
        success: false,
        server_type: None,
        server_version: None,
        message: "Authentication failed: 401 Unauthorized".to_string(),
    };
    assert!(!rejected.success);
    assert!(rejected.server_version.is_none());
    assert!(rejected.server_type.is_none());
    assert!(rejected.message.contains("401"));
}
#[test]
fn test_notification_creation_for_webdav() {
    // A sync-started notification carrying structured WebDAV metadata.
    let notification = CreateNotification {
        notification_type: "info".to_string(),
        title: "WebDAV Sync Started".to_string(),
        message: "Synchronizing files from Nextcloud server".to_string(),
        action_url: Some("/sync-status".to_string()),
        metadata: Some(serde_json::json!({
            "sync_type": "webdav",
            "folders": ["/Documents", "/Photos"],
            "estimated_files": 150
        })),
    };

    assert_eq!(notification.notification_type, "info");
    assert_eq!(notification.title, "WebDAV Sync Started");
    assert!(notification.action_url.is_some());

    // The JSON payload must keep its structure and values.
    let metadata = notification.metadata.expect("metadata was set above");
    assert_eq!(metadata["sync_type"], "webdav");
    assert!(metadata["folders"].is_array());
    assert_eq!(metadata["estimated_files"], 150);
}
#[test]
fn test_special_characters_in_paths() {
    // Paths with spaces, non-ASCII scripts, and URL-special characters must
    // still yield a usable file name.
    let tricky_paths = [
        "/Documents/File with spaces.pdf",
        "/Documents/Ñoño/archivo.pdf",
        "/Documents/测试文件.pdf",
        "/Documents/файл.pdf",
        "/Documents/50%.pdf",
        "/Documents/file&name.pdf",
        "/Documents/file#1.pdf",
    ];

    for path in tricky_paths {
        let name = std::path::Path::new(path)
            .file_name()
            .and_then(|n| n.to_str())
            .expect("every test path ends in a valid UTF-8 file name")
            .to_string();
        let info = FileInfo {
            path: path.to_string(),
            name,
            size: 1024,
            mime_type: "application/pdf".to_string(),
            last_modified: Some(Utc::now()),
            etag: "\"test123\"".to_string(),
            is_directory: false,
        };
        assert!(!info.name.is_empty());
        assert!(info.name.ends_with(".pdf"));
    }
}
#[test]
fn test_backoff_delay_calculation() {
    // Exponential backoff schedule from the default policy: delays grow by
    // the multiplier each round until they saturate at max_delay_ms.
    let retry = RetryConfig::default();

    let mut next = retry.initial_delay_ms;
    let observed: Vec<u64> = (0..5)
        .map(|_| {
            let current = next;
            next = ((next as f64 * retry.backoff_multiplier) as u64).min(retry.max_delay_ms);
            current
        })
        .collect();
    // Defaults: 1s start, doubling each attempt.
    assert_eq!(observed, [1000, 2000, 4000, 8000, 16000]);

    // After enough additional rounds the delay pins to the ceiling.
    for _ in 0..10 {
        next = ((next as f64 * retry.backoff_multiplier) as u64).min(retry.max_delay_ms);
    }
    assert_eq!(next, retry.max_delay_ms);
}