From ea8ad2c262cb73c6debd607669f0db41799ee989 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Sun, 15 Jun 2025 17:51:04 +0000 Subject: [PATCH] feat(server/client): working s3 and local source types --- Cargo.lock | 972 +++++++++++++++++++++++++++-- Cargo.toml | 7 +- frontend/src/pages/SourcesPage.tsx | 52 +- src/db.rs | 111 ++++ src/lib.rs | 4 + src/local_folder_service.rs | 282 +++++++++ src/main.rs | 15 +- src/models.rs | 24 + src/s3_service.rs | 330 ++++++++++ src/s3_service_stub.rs | 37 ++ src/source_scheduler.rs | 319 ++++++++++ src/source_sync.rs | 383 ++++++++++++ 12 files changed, 2471 insertions(+), 65 deletions(-) create mode 100644 src/local_folder_service.rs create mode 100644 src/s3_service.rs create mode 100644 src/s3_service_stub.rs create mode 100644 src/source_scheduler.rs create mode 100644 src/source_sync.rs diff --git a/Cargo.lock b/Cargo.lock index 4c18977..d78865b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,435 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "aws-config" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "455e9fb7743c6f6267eb2830ccc08686fbb3d13c9a689369562fd4d4ef9ea462" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 1.3.1", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "687bc16bc431a8533fe0097c7f0182874767f920989d7260950172ae8e3c4465" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-lc-rs" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fcc8f365936c834db5514fc45aee5b1202d677e6b40e48468aaaa8183ca8c7" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61b1d86e7705efe1be1b569bab41d4fa1e14e220b60a160f78de2db687add079" +dependencies = [ + "bindgen 0.69.5", + "cc", + "cmake", + "dunce", + "fs_extra", +] + +[[package]] +name = "aws-runtime" +version = "1.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f6c68419d8ba16d9a7463671593c54f81ba58cab466e9b759418da606dcc2e2" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.92.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68db68f26c6337fb89c15916d5ac59c1b4224eb0111492a4f7b85c1058f9ca8" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 
1.3.1", + "http-body 0.4.6", + "lru", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.73.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ac1674cba7872061a29baaf02209fefe499ff034dfd91bd4cc59e4d7741489" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.74.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6a22f077f5fd3e3c0270d4e1a110346cddf6769e9433eb9e6daceb4ca3b149" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.74.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d440e1d368759bd10df0dbdddbfff6473d7cd73e9d9ef2363dc9995ac2d711" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.3.1", + "p256", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e190749ea56f8c42bf15dd76c65e14f8f765233e6df9b0506d9d934ebef867c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.63.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2f77a921dbd2c78ebe70726799787c1d110a2245dd65e39b20923dfdfb2deee" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338a3642c399c0a5d157648426110e199ca7fd1c689cc395676b81aa563700c4" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.62.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99335bec6cdc50a346fda1437f9fefe33abf8c99060739a546a16457f2862ca9" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + 
"percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http-client" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f491388e741b7ca73b24130ff464c1478acc34d5b331b7dd0a2ee4643595a15" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "h2 0.3.26", + "h2 0.4.10", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.6.0", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.7", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls 0.23.27", + "rustls-native-certs 0.8.1", + "rustls-pki-types", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.61.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a16e040799d29c17412943bdbf488fd75db04112d0c0d4b9290bacf5ae0014b9" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-observability" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9364d5989ac4dd918e5cc4c4bdcc61c9be17dcd2586ea7f69e348fc7c6cab393" +dependencies = [ + "aws-smithy-runtime-api", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14302f06d1d5b7d333fd819943075b13d27c7700b414f574c3c35859bfb55d5e" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-client", + "aws-smithy-observability", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "pin-project-lite", + "pin-utils", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd8531b6d8882fd8f48f82a9754e682e29dd44cff27154af51fa3eb730f59efb" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.3.1", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d498595448e43de7f4296b7b7a18a8a02c61ec9349128c80a368f7c3b4ab11a8" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.3.1", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db87b96cb1b16c024980f133968d52882ca0daaee3a086c6decc500f6c99728" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a322fec39e4df22777ed3ad8ea868ac2f94cd15e1a55f6ee8d8d6305057689a" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = 
"0.8.4" @@ -248,10 +677,10 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "itoa", "matchit", @@ -281,8 +710,8 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -308,6 +737,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.21.7" @@ -320,6 +755,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -361,6 +806,29 @@ dependencies = [ "which", ] +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.9.1", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.103", + "which", +] + [[package]] name = "bit_field" version = "0.10.2" @@ -429,18 +897,18 @@ dependencies = [ "futures-util", "hex", "home", - "http", + "http 1.3.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-named-pipe", - "hyper-rustls", + "hyper-rustls 0.27.7", "hyper-util", "hyperlocal", "log", "pin-project-lite", - "rustls", - "rustls-native-certs", - "rustls-pemfile", + "rustls 0.23.27", + "rustls-native-certs 0.8.1", + "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", "serde_derive", @@ -508,6 +976,16 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cbc" version = "0.1.2" @@ -635,6 +1113,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "color_quant" version = "1.1.0" @@ -712,6 +1199,19 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc-fast" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bf62af4cc77d8fe1c22dde4e721d87f2f54056139d8c412e1366b740305f56f" 
+dependencies = [ + "crc", + "digest", + "libc", + "rand 0.9.1", + "regex", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -761,6 +1261,28 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -806,6 +1328,16 @@ dependencies = [ "syn 2.0.103", ] +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "der" version = "0.7.10" @@ -878,6 +1410,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "dyn-clone" version = "1.0.19" @@ -893,6 +1431,18 @@ dependencies = [ "cipher", ] +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve", + "rfc6979", + "signature 1.6.4", +] + [[package]] name = "either" version = "1.15.0" @@ -902,6 +1452,26 @@ dependencies = [ "serde", ] +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff", + "generic-array", + "group", + "pkcs8 0.9.0", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1019,6 +1589,16 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "filetime" version = "0.2.25" @@ -1089,6 +1669,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -1255,6 +1841,36 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] + +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.9.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.10" @@ -1266,7 +1882,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.3.1", "indexmap 2.9.0", "slab", "tokio", @@ -1360,6 +1976,17 @@ dependencies = [ "windows-link", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.3.1" @@ -1371,6 +1998,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1378,7 +2016,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.3.1", ] [[package]] @@ -1389,8 +2027,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "pin-project-lite", ] @@ -1412,6 +2050,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -1421,9 +2083,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.10", + "http 1.3.1", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -1440,7 +2102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" dependencies = [ "hex", - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -1448,19 +2110,36 @@ dependencies = [ "winapi", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.3.1", + "hyper 1.6.0", "hyper-util", - "rustls", + "rustls 0.23.27", + "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", ] @@ -1472,7 +2151,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "native-tls", "tokio", @@ -1491,9 +2170,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.6.0", "ipnet", "libc", "percent-encoding", @@ -1514,7 +2193,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" dependencies = [ "hex", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -1914,7 +2593,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da627c72b2499a8106f4dd33143843015e4a631f445d561f3481f7fba35b6151" dependencies = [ - "bindgen", + "bindgen 0.64.0", "pkg-config", "vcpkg", ] @@ -2051,6 +2730,15 @@ dependencies = [ "weezl", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.4", +] + [[package]] name = "matchit" version = "0.8.4" @@ -2146,7 +2834,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 1.3.1", "httparse", "memchr", "mime", @@ -2464,6 +3152,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "outref" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" + [[package]] name = "overload" version = "0.1.1" @@ -2479,6 +3173,17 @@ dependencies = [ "ttf-parser", ] +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking" version = "2.2.1" @@ -2605,9 +3310,19 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der", - "pkcs8", - "spki", + "der 0.7.10", + "pkcs8 0.10.2", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -2616,8 +3331,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.10", + "spki 0.7.3", ] [[package]] @@ -2675,6 +3390,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55" +dependencies = [ + "proc-macro2", + "syn 2.0.103", +] + [[package]] name = "proc-macro2" 
version = "1.0.95" @@ -2908,6 +3633,10 @@ name = "readur" version = "0.1.0" dependencies = [ "anyhow", + "aws-config", + "aws-credential-types", + "aws-sdk-s3", + "aws-types", "axum", "base64ct", "bcrypt", @@ -3011,6 +3740,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -3028,12 +3763,12 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.10", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.6.0", + "hyper-rustls 0.27.7", "hyper-tls", "hyper-util", "js-sys", @@ -3059,6 +3794,17 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "rgb" version = "0.8.50" @@ -3091,10 +3837,10 @@ dependencies = [ "num-integer", "num-traits", "pkcs1", - "pkcs8", + "pkcs8 0.10.2", "rand_core 0.6.4", - "signature", - "spki", + "signature 2.2.0", + "spki 0.7.3", "subtle", "zeroize", ] @@ -3145,6 +3891,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.44" @@ -3171,20 +3926,45 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ + "aws-lc-rs", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.3", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.1" @@ -3197,6 +3977,15 @@ dependencies = [ "security-framework 3.2.0", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -3215,12 +4004,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + 
"ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -3283,6 +4083,30 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3319,6 +4143,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" + [[package]] name = "serde" version = "1.0.219" @@ -3470,6 +4300,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "signature" version = "2.2.0" @@ -3557,6 +4397,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -3564,7 +4414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.10", ] [[package]] @@ -3604,7 +4454,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rustls", + "rustls 0.23.27", "serde", "serde_json", "sha2", @@ -3954,7 +4804,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dd2bedded009c8252301743af66d77e3912357bacc0e44a51dba54a8679ea5" dependencies = [ - "bindgen", + "bindgen 0.64.0", "leptonica-sys", "pkg-config", "vcpkg", @@ -4154,13 +5004,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.27", "tokio", ] @@ -4263,8 +5123,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "http-range-header", "httpdate", @@ -4537,6 +5397,12 @@ version = "0.9.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "walkdir" version = "2.5.0" @@ -5113,6 +5979,12 @@ dependencies = [ "rustix 1.0.7", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 2e4338e..8bb064b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,12 +48,17 @@ hostname = "0.4" walkdir = "2" clap = { version = "4", features = ["derive"] } utoipa = { version = "5", features = ["axum_extras", "chrono", "uuid"] } +aws-config = { version = "1.0", optional = true } +aws-sdk-s3 = { version = "1.0", optional = true } +aws-credential-types = { version = "1.0", optional = true } +aws-types = { version = "1.0", optional = true } sha2 = "0.10" utoipa-swagger-ui = { version = "9", features = ["axum"] } [features] -default = ["ocr"] +default = ["ocr", "s3"] ocr = ["tesseract", "pdf-extract", "image", "imageproc", "raw-cpuid"] +s3 = ["aws-config", "aws-sdk-s3", "aws-credential-types", "aws-types"] [dev-dependencies] tempfile = "3" diff --git a/frontend/src/pages/SourcesPage.tsx b/frontend/src/pages/SourcesPage.tsx index c4795cc..23570c8 100644 --- a/frontend/src/pages/SourcesPage.tsx +++ b/frontend/src/pages/SourcesPage.tsx @@ -108,14 +108,26 @@ const SourcesPage: React.FC = () => { name: '', source_type: 'webdav' as 'webdav' | 'local_folder' | 's3', enabled: true, + // WebDAV fields server_url: '', username: '', password: '', + server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic', + // Local Folder fields + recursive: true, + follow_symlinks: false, + // S3 fields + bucket_name: '', + region: 'us-east-1', + access_key_id: '', + secret_access_key: '', + endpoint_url: '', + prefix: '', + // Common fields watch_folders: ['/Documents'], file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'], auto_sync: false, sync_interval_minutes: 60, - server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic', }); // Additional state for enhanced features @@ -153,14 +165,26 @@ const SourcesPage: React.FC = () => { name: '', source_type: 'webdav', enabled: true, + // WebDAV fields server_url: '', username: '', password: '', + server_type: 'generic', + // Local Folder fields + recursive: true, + follow_symlinks: false, + // S3 fields + bucket_name: '', + region: 'us-east-1', + access_key_id: '', + secret_access_key: '', + endpoint_url: '', + prefix: '', + // Common fields watch_folders: ['/Documents'], file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'], auto_sync: false, sync_interval_minutes: 60, - server_type: 'generic', }); setCrawlEstimate(null); setNewFolder(''); @@ -175,14 +199,26 @@ const SourcesPage: React.FC = () => { name: source.name, source_type: source.source_type, enabled: source.enabled, + // WebDAV fields server_url: config.server_url || '', username: config.username || '', password: config.password || '', + server_type: config.server_type || 'generic', + // Local Folder fields + recursive: config.recursive !== undefined ? 
diff --git a/frontend/src/pages/SourcesPage.tsx b/frontend/src/pages/SourcesPage.tsx
index c4795cc..23570c8 100644
--- a/frontend/src/pages/SourcesPage.tsx
+++ b/frontend/src/pages/SourcesPage.tsx
@@ -108,14 +108,26 @@ const SourcesPage: React.FC = () => {
     name: '',
     source_type: 'webdav' as 'webdav' | 'local_folder' | 's3',
     enabled: true,
+    // WebDAV fields
     server_url: '',
     username: '',
     password: '',
+    server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic',
+    // Local Folder fields
+    recursive: true,
+    follow_symlinks: false,
+    // S3 fields
+    bucket_name: '',
+    region: 'us-east-1',
+    access_key_id: '',
+    secret_access_key: '',
+    endpoint_url: '',
+    prefix: '',
+    // Common fields
     watch_folders: ['/Documents'],
     file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
     auto_sync: false,
     sync_interval_minutes: 60,
-    server_type: 'generic' as 'nextcloud' | 'owncloud' | 'generic',
   });
 
   // Additional state for enhanced features
@@ -153,14 +165,26 @@
       name: '',
       source_type: 'webdav',
       enabled: true,
+      // WebDAV fields
       server_url: '',
       username: '',
       password: '',
+      server_type: 'generic',
+      // Local Folder fields
+      recursive: true,
+      follow_symlinks: false,
+      // S3 fields
+      bucket_name: '',
+      region: 'us-east-1',
+      access_key_id: '',
+      secret_access_key: '',
+      endpoint_url: '',
+      prefix: '',
+      // Common fields
       watch_folders: ['/Documents'],
       file_extensions: ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
       auto_sync: false,
       sync_interval_minutes: 60,
-      server_type: 'generic',
     });
     setCrawlEstimate(null);
     setNewFolder('');
@@ -175,14 +199,26 @@
       name: source.name,
       source_type: source.source_type,
       enabled: source.enabled,
+      // WebDAV fields
       server_url: config.server_url || '',
       username: config.username || '',
       password: config.password || '',
+      server_type: config.server_type || 'generic',
+      // Local Folder fields
+      recursive: config.recursive !== undefined ? config.recursive : true,
+      follow_symlinks: config.follow_symlinks || false,
+      // S3 fields
+      bucket_name: config.bucket_name || '',
+      region: config.region || 'us-east-1',
+      access_key_id: config.access_key_id || '',
+      secret_access_key: config.secret_access_key || '',
+      endpoint_url: config.endpoint_url || '',
+      prefix: config.prefix || '',
+      // Common fields
       watch_folders: config.watch_folders || ['/Documents'],
       file_extensions: config.file_extensions || ['pdf', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'txt'],
       auto_sync: config.auto_sync || false,
       sync_interval_minutes: config.sync_interval_minutes || 60,
-      server_type: config.server_type || 'generic',
     });
     setCrawlEstimate(null);
     setNewFolder('');
@@ -841,24 +877,24 @@
[The JSX markup in this hunk was lost in extraction; only the visible text changes survive:]
-
+
-
+
         Local Folder
-        Coming Soon
+        Monitor local filesystem directories
-
+
         S3 Compatible
-        Coming Soon
+        AWS S3, MinIO, and other S3-compatible storage
diff --git a/src/db.rs b/src/db.rs
index e96deff..8fd2456 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -2104,4 +2104,115 @@ impl Database {
 
         Ok(documents)
     }
+
+    // Source management operations
+    pub async fn get_all_sources(&self) -> Result<Vec<crate::models::Source>> {
+        let rows = sqlx::query(
+            r#"SELECT id, user_id, name, source_type, enabled, config, status,
+                      last_sync_at, last_error, last_error_at, total_files_synced,
+                      total_files_pending, total_size_bytes, created_at, updated_at
+               FROM sources ORDER BY created_at DESC"#
+        )
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut sources = Vec::new();
+        for row in rows {
+            sources.push(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            });
+        }
+
+        Ok(sources)
+    }
+
+    pub async fn get_sources_for_sync(&self) -> Result<Vec<crate::models::Source>> {
+        let rows = sqlx::query(
+            r#"SELECT id, user_id, name, source_type, enabled, config, status,
+                      last_sync_at, last_error, last_error_at, total_files_synced,
+                      total_files_pending, total_size_bytes, created_at, updated_at
+               FROM sources
+               WHERE enabled = true AND status != 'syncing'
+               ORDER BY last_sync_at ASC NULLS FIRST"#
+        )
+        .fetch_all(&self.pool)
+        .await?;
+
+        let mut sources = Vec::new();
+        for row in rows {
+            sources.push(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            });
+        }
+
+        Ok(sources)
+    }
+
+    pub async fn get_source_by_id(&self, source_id: Uuid) -> Result<Option<crate::models::Source>> {
+        let row = sqlx::query(
+            r#"SELECT id, user_id, name, source_type, enabled, config, status,
+                      last_sync_at, last_error, last_error_at, total_files_synced,
+                      total_files_pending, total_size_bytes, created_at, updated_at
+               FROM sources WHERE id = $1"#
+        )
+        .bind(source_id)
+        .fetch_optional(&self.pool)
+        .await?;
+
+        if let Some(row) = row {
+            Ok(Some(crate::models::Source {
+                id: row.get("id"),
+                user_id: row.get("user_id"),
+                name: row.get("name"),
+                source_type: row.get::<String, _>("source_type").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source type: {}", e))?,
+                enabled: row.get("enabled"),
+                config: row.get("config"),
+                status: row.get::<String, _>("status").try_into()
+                    .map_err(|e| anyhow::anyhow!("Invalid source status: {}", e))?,
+                last_sync_at: row.get("last_sync_at"),
+                last_error: row.get("last_error"),
+                last_error_at: row.get("last_error_at"),
+                total_files_synced: row.get("total_files_synced"),
+                total_files_pending: row.get("total_files_pending"),
+                total_size_bytes: row.get("total_size_bytes"),
+                created_at: row.get("created_at"),
+                updated_at: row.get("updated_at"),
+            }))
+        } else {
+            Ok(None)
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 5a7bff7..60c0137 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ pub mod config;
 pub mod db;
 pub mod enhanced_ocr;
 pub mod file_service;
+pub mod local_folder_service;
 pub mod models;
 pub mod ocr;
 pub mod ocr_api;
@@ -13,7 +14,10 @@ pub mod ocr_health;
 pub mod ocr_queue;
 pub mod ocr_tests;
 pub mod routes;
+pub mod s3_service;
 pub mod seed;
+pub mod source_scheduler;
+pub mod source_sync;
 pub mod swagger;
 pub mod watcher;
 pub mod webdav_service;
diff --git a/src/local_folder_service.rs b/src/local_folder_service.rs
new file mode 100644
index 0000000..c5af902
--- /dev/null
+++ b/src/local_folder_service.rs
@@ -0,0 +1,282 @@
+use std::path::Path;
+use std::fs;
+use anyhow::{anyhow, Result};
+use chrono::{DateTime, Utc};
+use tracing::{debug, info, warn};
+use walkdir::WalkDir;
+use sha2::{Sha256, Digest};
+
+use crate::models::{FileInfo, LocalFolderSourceConfig};
+
+#[derive(Debug, Clone)]
+pub struct LocalFolderService {
+    config: LocalFolderSourceConfig,
+}
+
+impl LocalFolderService {
+    pub fn new(config: LocalFolderSourceConfig) -> Result<Self> {
+        // Validate that watch folders exist and are accessible
+        for folder in &config.watch_folders {
+            let path = Path::new(folder);
+            if !path.exists() {
+                return Err(anyhow!("Watch folder does not exist: {}", folder));
+            }
+            if !path.is_dir() {
+                return Err(anyhow!("Watch folder is not a directory: {}", folder));
+            }
+        }
+
+        Ok(Self { config })
+    }
+
+    /// Discover files in a specific folder
+    pub async fn discover_files_in_folder(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
+        let path = Path::new(folder_path);
+        if !path.exists() {
+            return Err(anyhow!("Folder does not exist: {}", folder_path));
+        }
+
+        let mut files: Vec<FileInfo> = Vec::new();
+
+        info!("Scanning local folder: {} (recursive: {})", folder_path, self.config.recursive);
+
+        // Use tokio::task::spawn_blocking for file system operations
+        let folder_path_clone = folder_path.to_string();
+        let config = self.config.clone();
+
+        let discovered_files = tokio::task::spawn_blocking(move || -> Result<Vec<FileInfo>> {
+            let mut files: Vec<FileInfo> = Vec::new();
+
+            let walker = if config.recursive {
+                WalkDir::new(&folder_path_clone)
+                    .follow_links(config.follow_symlinks)
+                    .into_iter()
+            } else {
+                WalkDir::new(&folder_path_clone)
+                    .max_depth(1)
+                    .follow_links(config.follow_symlinks)
+                    .into_iter()
+            };
+
+            for entry_result in walker {
+                match entry_result {
+                    Ok(entry) => {
+                        let path = entry.path();
+
+                        // Skip directories and the root folder itself
+                        if path.is_dir() {
+                            continue;
+                        }
+
+                        // Check file extension
+                        let extension = path.extension()
+                            .and_then(|ext| ext.to_str())
+                            .unwrap_or("")
+                            .to_lowercase();
+
+                        if !config.file_extensions.contains(&extension) {
+                            debug!("Skipping file with unsupported extension: {}", path.display());
+                            continue;
+                        }
+
+                        // Get file metadata
+                        match fs::metadata(path) {
+                            Ok(metadata) => {
+                                let modified_time = metadata.modified()
+                                    .ok()
+                                    .and_then(|time| {
+                                        let duration = time.duration_since(std::time::UNIX_EPOCH).ok()?;
+                                        DateTime::from_timestamp(duration.as_secs() as i64, 0)
+                                    });
+
+                                let file_name = path.file_name()
+                                    .and_then(|name| name.to_str())
+                                    .unwrap_or("unknown")
+                                    .to_string();
+
+                                // Generate a simple hash-based ETag from file path and modification time
+                                let etag = Self::generate_etag(path, &metadata);
+
+                                // Determine MIME type based on extension
+                                let mime_type = Self::get_mime_type(&extension);
+
+                                let file_info = FileInfo {
+                                    path: path.to_string_lossy().to_string(),
+                                    name: file_name,
+                                    size: metadata.len() as i64,
+                                    mime_type,
+                                    last_modified: modified_time,
+                                    etag,
+                                    is_directory: false,
+                                };
+
+                                files.push(file_info);
+                            }
+                            Err(e) => {
+                                warn!("Failed to get metadata for {}: {}", path.display(), e);
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Error walking directory: {}", e);
+                    }
+                }
+            }
+
+            Ok(files)
+        }).await??;
+
+        info!("Found {} files in local folder {}", discovered_files.len(), folder_path);
+        Ok(discovered_files)
+    }
+
+    /// Read file content for processing
+    pub async fn read_file(&self, file_path: &str) -> Result<Vec<u8>> {
+        let file_path = file_path.to_string();
+
+        tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
+            let content = fs::read(&file_path)
+                .map_err(|e| anyhow!("Failed to read file {}: {}", file_path, e))?;
+            Ok(content)
+        }).await?
+    }
+
+    /// Test if the service can access the configured folders
+    pub async fn test_connection(&self) -> Result<String> {
+        let mut accessible_folders = 0;
+        let mut total_files = 0;
+
+        for folder in &self.config.watch_folders {
+            match self.discover_files_in_folder(folder).await {
+                Ok(files) => {
+                    accessible_folders += 1;
+                    total_files += files.len();
+                    info!("Local folder {} is accessible with {} files", folder, files.len());
+                }
+                Err(e) => {
+                    return Err(anyhow!("Cannot access folder {}: {}", folder, e));
+                }
+            }
+        }
+
+        Ok(format!(
+            "Successfully accessed {} folders with {} total files",
+            accessible_folders, total_files
+        ))
+    }
+
+    /// Generate ETag for file based on path and modification time
+    fn generate_etag(path: &Path, metadata: &fs::Metadata) -> String {
+        let mut hasher = Sha256::new();
+        hasher.update(path.to_string_lossy().as_bytes());
+
+        if let Ok(modified) = metadata.modified() {
+            if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
+                hasher.update(duration.as_secs().to_be_bytes());
+            }
+        }
+
+        hasher.update(metadata.len().to_be_bytes());
+        let result = hasher.finalize();
+        format!("{:x}", result)[..16].to_string() // Use first 16 chars as ETag
+    }
+
+    /// Get MIME type based on file extension
+    fn get_mime_type(extension: &str) -> String {
+        match extension {
+            "pdf" => "application/pdf",
+            "txt" => "text/plain",
+            "png" => "image/png",
+            "jpg" | "jpeg" => "image/jpeg",
+            "tiff" | "tif" => "image/tiff",
+            "bmp" => "image/bmp",
+            "gif" => "image/gif",
+            "webp" => "image/webp",
+            "doc" => "application/msword",
+            "docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+            "xls" => "application/vnd.ms-excel",
+            "xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+            "ppt" => "application/vnd.ms-powerpoint",
+            "pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation",
+            _ => "application/octet-stream",
+        }.to_string()
+    }
+
+    pub fn get_config(&self) -> &LocalFolderSourceConfig {
+        &self.config
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+    use std::fs::File;
+    use std::io::Write;
+
+    #[tokio::test]
+    async fn test_local_folder_discovery() {
+        // Create a temporary directory with test files
+        let temp_dir = TempDir::new().unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap();
+
+        // Create test files
+        let mut pdf_file = File::create(temp_dir.path().join("test.pdf")).unwrap();
+        pdf_file.write_all(b"fake pdf content").unwrap();
+
+        let mut txt_file = File::create(temp_dir.path().join("test.txt")).unwrap();
+        txt_file.write_all(b"test content").unwrap();
+
+        // Create unsupported file
+        let mut bin_file = File::create(temp_dir.path().join("test.bin")).unwrap();
+        bin_file.write_all(b"binary content").unwrap();
+
+        // Create config
+        let config = LocalFolderSourceConfig {
+            watch_folders: vec![temp_path.to_string()],
+            file_extensions: vec!["pdf".to_string(), "txt".to_string()],
+            auto_sync: true,
+            sync_interval_minutes: 60,
+            recursive: false,
+            follow_symlinks: false,
+        };
+
+        let service = LocalFolderService::new(config).unwrap();
+        let files = service.discover_files_in_folder(temp_path).await.unwrap();
+
+        // Should find 2 files (pdf and txt), but not bin
+        assert_eq!(files.len(), 2);
+
+        let pdf_file = files.iter().find(|f| f.name == "test.pdf").unwrap();
+        assert_eq!(pdf_file.mime_type, "application/pdf");
+        assert_eq!(pdf_file.size, 16);
+
+        let txt_file = files.iter().find(|f| f.name == "test.txt").unwrap();
+        assert_eq!(txt_file.mime_type, "text/plain");
+        assert_eq!(txt_file.size, 12);
+    }
+
+    #[tokio::test]
+    async fn test_file_reading() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("test.txt");
+        let test_content = b"Hello, World!";
+
+        let mut file = File::create(&file_path).unwrap();
+        file.write_all(test_content).unwrap();
+
+        let config = LocalFolderSourceConfig {
+            watch_folders: vec![temp_dir.path().to_str().unwrap().to_string()],
+            file_extensions: vec!["txt".to_string()],
+            auto_sync: false,
+            sync_interval_minutes: 60,
+            recursive: false,
+            follow_symlinks: false,
+        };
+
+        let service = LocalFolderService::new(config).unwrap();
+        let content = service.read_file(file_path.to_str().unwrap()).await.unwrap();
+
+        assert_eq!(content, test_content);
+    }
+}
\ No newline at end of file
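
Note: local-folder change detection keys off this ETag: SHA-256 over the file's path, mtime (in seconds), and size, truncated to the first 16 hex characters, so a touch or resize produces a new ETag, while a same-size in-place edit that also restores the mtime would go unnoticed. A standalone sketch of the recipe, using a hypothetical helper with the same scheme as generate_etag above:

    use sha2::{Digest, Sha256};

    // Same recipe as LocalFolderService::generate_etag:
    // hash path bytes + mtime secs + size, keep the first 16 hex chars.
    fn etag(path: &str, mtime_secs: u64, size: u64) -> String {
        let mut hasher = Sha256::new();
        hasher.update(path.as_bytes());
        hasher.update(mtime_secs.to_be_bytes());
        hasher.update(size.to_be_bytes());
        format!("{:x}", hasher.finalize())[..16].to_string()
    }
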
diff --git a/src/main.rs b/src/main.rs
index 28300e0..9a062b8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -209,10 +209,13 @@
         }
     });
 
-    // Create WebDAV scheduler with background state
+    // Create universal source scheduler with background state (handles WebDAV, Local, S3)
+    let source_scheduler = Arc::new(readur::source_scheduler::SourceScheduler::new(background_state.clone()));
+
+    // Keep WebDAV scheduler for backward compatibility with existing WebDAV endpoints
     let webdav_scheduler = Arc::new(readur::webdav_scheduler::WebDAVScheduler::new(background_state.clone()));
 
-    // Update the web state to include the scheduler reference
+    // Update the web state to include scheduler references
     let updated_web_state = AppState {
         db: web_state.db.clone(),
         config: web_state.config.clone(),
@@ -220,13 +223,13 @@
     };
     let web_state = Arc::new(updated_web_state);
 
-    // Start WebDAV background sync scheduler on background runtime
-    let scheduler_for_background = webdav_scheduler.clone();
+    // Start universal source scheduler on background runtime
+    let scheduler_for_background = source_scheduler.clone();
     background_runtime.spawn(async move {
-        info!("Starting WebDAV background sync scheduler with 30-second startup delay");
+        info!("Starting universal source sync scheduler with 30-second startup delay");
         // Wait 30 seconds before starting scheduler to allow server to fully initialize
         tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
-        info!("WebDAV background sync scheduler starting after startup delay");
+        info!("Universal source sync scheduler starting after startup delay");
         scheduler_for_background.start().await;
     });
 
diff --git a/src/models.rs b/src/models.rs
index 7485482..0156234 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -780,4 +780,28 @@ pub struct WebDAVSourceConfig {
     pub auto_sync: bool,
     pub sync_interval_minutes: i32,
     pub server_type: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+pub struct LocalFolderSourceConfig {
+    pub watch_folders: Vec<String>,
+    pub file_extensions: Vec<String>,
+    pub auto_sync: bool,
+    pub sync_interval_minutes: i32,
+    pub recursive: bool,
+    pub follow_symlinks: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+pub struct S3SourceConfig {
+    pub bucket_name: String,
+    pub region: String,
+    pub access_key_id: String,
+    pub secret_access_key: String,
+    pub endpoint_url: Option<String>, // For S3-compatible services
+    pub prefix: Option<String>,       // Optional path prefix
+    pub watch_folders: Vec<String>,   // S3 prefixes to monitor
+    pub file_extensions: Vec<String>,
+    pub auto_sync: bool,
+    pub sync_interval_minutes: i32,
 }
\ No newline at end of file
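
Note: these config structs are what the sources.config JSON column stores; the scheduler below revives them with serde_json::from_value. A minimal round-trip sketch with illustrative values, assuming readur::models::S3SourceConfig is in scope:

    use serde_json::json;

    // Shape of a stored `sources.config` value for an S3 source (illustrative).
    let value = json!({
        "bucket_name": "my-bucket",
        "region": "us-east-1",
        "access_key_id": "AKIAEXAMPLE",
        "secret_access_key": "example-secret",
        "endpoint_url": "http://minio:9000",
        "prefix": null,
        "watch_folders": ["documents/"],
        "file_extensions": ["pdf", "txt"],
        "auto_sync": true,
        "sync_interval_minutes": 60
    });
    // Mirrors the scheduler's serde_json::from_value::<S3SourceConfig>(source.config.clone()).
    let config: S3SourceConfig = serde_json::from_value(value).unwrap();
    assert_eq!(config.bucket_name, "my-bucket");
    assert!(config.endpoint_url.is_some()); // custom endpoint => S3-compatible service
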
diff --git a/src/s3_service.rs b/src/s3_service.rs
new file mode 100644
index 0000000..54f62a2
--- /dev/null
+++ b/src/s3_service.rs
@@ -0,0 +1,330 @@
+use anyhow::{anyhow, Result};
+use chrono::{DateTime, Utc};
+use tracing::{debug, error, info, warn};
+
+#[cfg(feature = "s3")]
+use aws_sdk_s3::Client;
+#[cfg(feature = "s3")]
+use aws_config::{BehaviorVersion, load_from_env};
+#[cfg(feature = "s3")]
+use aws_credential_types::Credentials;
+#[cfg(feature = "s3")]
+use aws_types::region::Region as AwsRegion;
+
+use crate::models::{FileInfo, S3SourceConfig};
+
+#[derive(Debug, Clone)]
+pub struct S3Service {
+    #[cfg(feature = "s3")]
+    client: Client,
+    config: S3SourceConfig,
+}
+
+impl S3Service {
+    pub async fn new(config: S3SourceConfig) -> Result<Self> {
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in. Enable the 's3' feature to use S3 sources."));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            // Validate required fields
+            if config.bucket_name.is_empty() {
+                return Err(anyhow!("Bucket name is required"));
+            }
+            if config.access_key_id.is_empty() {
+                return Err(anyhow!("Access key ID is required"));
+            }
+            if config.secret_access_key.is_empty() {
+                return Err(anyhow!("Secret access key is required"));
+            }
+
+            // Create S3 client with custom configuration
+            let credentials = Credentials::new(
+                &config.access_key_id,
+                &config.secret_access_key,
+                None, // session token
+                None, // expiry
+                "readur-s3-source"
+            );
+
+            let region = if config.region.is_empty() {
+                "us-east-1".to_string()
+            } else {
+                config.region.clone()
+            };
+
+            let mut s3_config_builder = aws_sdk_s3::config::Builder::new()
+                .region(AwsRegion::new(region))
+                .credentials_provider(credentials);
+
+            // Set custom endpoint if provided (for S3-compatible services)
+            if let Some(endpoint_url) = &config.endpoint_url {
+                if !endpoint_url.is_empty() {
+                    s3_config_builder = s3_config_builder.endpoint_url(endpoint_url);
+                    info!("Using custom S3 endpoint: {}", endpoint_url);
+                }
+            }
+
+            let s3_config = s3_config_builder.build();
+            let client = Client::from_conf(s3_config);
+
+            Ok(Self {
+                #[cfg(feature = "s3")]
+                client,
+                config
+            })
+        }
+    }
+
+    /// Discover files in a specific S3 prefix (folder)
+    pub async fn discover_files_in_folder(&self, folder_path: &str) -> Result<Vec<FileInfo>> {
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            info!("Scanning S3 bucket: {} prefix: {}", self.config.bucket_name, folder_path);
+
+            let mut files = Vec::new();
+            let mut continuation_token: Option<String> = None;
+
+            loop {
+                let mut list_request = self.client
+                    .list_objects_v2()
+                    .bucket(&self.config.bucket_name)
+                    .prefix(folder_path);
+
+                if let Some(token) = &continuation_token {
+                    list_request = list_request.continuation_token(token);
+                }
+
+                match list_request.send().await {
+                    Ok(response) => {
+                        if let Some(contents) = response.contents {
+                            for object in contents {
+                                if let Some(key) = object.key {
+                                    // Skip "directories" (keys ending with /)
+                                    if key.ends_with('/') {
+                                        continue;
+                                    }
+
+                                    // Check file extension
+                                    let extension = std::path::Path::new(&key)
+                                        .extension()
+                                        .and_then(|ext| ext.to_str())
+                                        .unwrap_or("")
+                                        .to_lowercase();
+
+                                    if !self.config.file_extensions.contains(&extension) {
+                                        debug!("Skipping S3 object with unsupported extension: {}", key);
+                                        continue;
+                                    }
+
+                                    let file_name = std::path::Path::new(&key)
+                                        .file_name()
+                                        .and_then(|name| name.to_str())
+                                        .unwrap_or(&key)
+                                        .to_string();
+
+                                    let size = object.size.unwrap_or(0);
+                                    let last_modified = object.last_modified
+                                        .and_then(|dt| {
+                                            // Convert AWS DateTime to chrono DateTime
+                                            let timestamp = dt.secs();
+                                            DateTime::from_timestamp(timestamp, 0)
+                                        });
+
+                                    let etag = object.e_tag.unwrap_or_else(|| {
+                                        // Generate a fallback ETag if none provided
+                                        format!("fallback-{}", &key.chars().take(16).collect::<String>())
+                                    });
+
+                                    // Remove quotes from ETag if present
+                                    let etag = etag.trim_matches('"').to_string();
+
+                                    let mime_type = Self::get_mime_type(&extension);
+
+                                    let file_info = FileInfo {
+                                        path: key.clone(),
+                                        name: file_name,
+                                        size,
+                                        mime_type,
+                                        last_modified,
+                                        etag,
+                                        is_directory: false,
+                                    };
+
+                                    files.push(file_info);
+                                }
+                            }
+                        }
+
+                        // Check if there are more results
+                        if response.is_truncated == Some(true) {
+                            continuation_token = response.next_continuation_token;
+                        } else {
+                            break;
+                        }
+                    }
+                    Err(e) => {
+                        return Err(anyhow!("Failed to list S3 objects: {}", e));
+                    }
+                }
+            }
+
+            info!("Found {} files in S3 bucket {} prefix {}", files.len(), self.config.bucket_name, folder_path);
+            Ok(files)
+        }
+    }
+
+    /// Download file content from S3
+    pub async fn download_file(&self, object_key: &str) -> Result<Vec<u8>> {
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            info!("Downloading S3 object: {}/{}", self.config.bucket_name, object_key);
+
+            let response = self.client
+                .get_object()
+                .bucket(&self.config.bucket_name)
+                .key(object_key)
+                .send()
+                .await
+                .map_err(|e| anyhow!("Failed to download S3 object {}: {}", object_key, e))?;
+
+            let body = response.body.collect().await
+                .map_err(|e| anyhow!("Failed to read S3 object body: {}", e))?;
+
+            let bytes = body.into_bytes().to_vec();
+            info!("Downloaded S3 object {} ({} bytes)", object_key, bytes.len());
+
+            Ok(bytes)
+        }
+    }
+
+    /// Test S3 connection and access to bucket
+    pub async fn test_connection(&self) -> Result<String> {
+        #[cfg(not(feature = "s3"))]
+        {
+            return Err(anyhow!("S3 support not compiled in"));
+        }
+
+        #[cfg(feature = "s3")]
+        {
+            info!("Testing S3 connection to bucket: {}", self.config.bucket_name);
+
+            // Test bucket access by listing objects with a limit
+            let response = self.client
+                .list_objects_v2()
+                .bucket(&self.config.bucket_name)
+                .max_keys(1)
+                .send()
+                .await
+                .map_err(|e| anyhow!("Failed to access S3 bucket {}: {}", self.config.bucket_name, e))?;
+
+            // Test if we can get bucket region (additional validation)
+            let _head_bucket_response = self.client
+                .head_bucket()
+                .bucket(&self.config.bucket_name)
+                .send()
+                .await
+                .map_err(|e| anyhow!("Cannot access bucket {}: {}", self.config.bucket_name, e))?;
+
+            let object_count = response.key_count.unwrap_or(0);
+
+            Ok(format!(
+                "Successfully connected to S3 bucket '{}' (found {} objects)",
+                self.config.bucket_name, object_count
+            ))
+        }
+    }
+
+    /// Get estimated file count and size for all watch folders
+    pub async fn estimate_sync(&self) -> Result<(usize, i64)> {
+        let mut total_files = 0;
+        let mut total_size = 0i64;
+
+        for folder in &self.config.watch_folders {
+            match self.discover_files_in_folder(folder).await {
+                Ok(files) => {
+                    total_files += files.len();
+                    total_size += files.iter().map(|f| f.size).sum::<i64>();
+                }
+                Err(e) => {
+                    warn!("Failed to estimate folder {}: {}", folder, e);
+                }
+            }
+        }
+
+        Ok((total_files, total_size))
+    }
"image/tiff", + "bmp" => "image/bmp", + "gif" => "image/gif", + "webp" => "image/webp", + "doc" => "application/msword", + "docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "xls" => "application/vnd.ms-excel", + "xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "ppt" => "application/vnd.ms-powerpoint", + "pptx" => "application/vnd.openxmlformats-officedocument.presentationml.presentation", + _ => "application/octet-stream", + }.to_string() + } + + pub fn get_config(&self) -> &S3SourceConfig { + &self.config + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_s3_config_creation() { + let config = S3SourceConfig { + bucket_name: "test-bucket".to_string(), + region: "us-east-1".to_string(), + access_key_id: "test-key".to_string(), + secret_access_key: "test-secret".to_string(), + endpoint_url: None, + prefix: None, + watch_folders: vec!["documents/".to_string()], + file_extensions: vec!["pdf".to_string(), "txt".to_string()], + auto_sync: true, + sync_interval_minutes: 60, + }; + + // This will create the client but won't test actual S3 access + let service = S3Service::new(config).await; + #[cfg(feature = "s3")] + assert!(service.is_ok()); + #[cfg(not(feature = "s3"))] + assert!(service.is_err()); + } + + #[test] + fn test_mime_type_detection() { + assert_eq!(S3Service::get_mime_type("pdf"), "application/pdf"); + assert_eq!(S3Service::get_mime_type("jpg"), "image/jpeg"); + assert_eq!(S3Service::get_mime_type("txt"), "text/plain"); + assert_eq!(S3Service::get_mime_type("unknown"), "application/octet-stream"); + } +} \ No newline at end of file diff --git a/src/s3_service_stub.rs b/src/s3_service_stub.rs new file mode 100644 index 0000000..e9a682e --- /dev/null +++ b/src/s3_service_stub.rs @@ -0,0 +1,37 @@ +// Stub implementation when S3 feature is not enabled +use anyhow::{anyhow, Result}; +use tracing::warn; + +use crate::models::{FileInfo, S3SourceConfig}; + +#[derive(Debug, Clone)] +pub struct S3Service { + config: S3SourceConfig, +} + +impl S3Service { + pub async fn new(_config: S3SourceConfig) -> Result { + Err(anyhow!("S3 support not compiled in. 
+    pub async fn discover_files_in_folder(&self, _folder_path: &str) -> Result<Vec<FileInfo>> {
+        warn!("S3 support not compiled in");
+        Ok(Vec::new())
+    }
+
+    pub async fn download_file(&self, _object_key: &str) -> Result<Vec<u8>> {
+        Err(anyhow!("S3 support not compiled in"))
+    }
+
+    pub async fn test_connection(&self) -> Result<String> {
+        Err(anyhow!("S3 support not compiled in"))
+    }
+
+    pub async fn estimate_sync(&self) -> Result<(usize, i64)> {
+        Ok((0, 0))
+    }
+
+    pub fn get_config(&self) -> &S3SourceConfig {
+        &self.config
+    }
+}
\ No newline at end of file
diff --git a/src/source_scheduler.rs b/src/source_scheduler.rs
new file mode 100644
index 0000000..4ece078
--- /dev/null
+++ b/src/source_scheduler.rs
@@ -0,0 +1,319 @@
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::time::interval;
+use tracing::{error, info, warn};
+use chrono::Utc;
+
+use crate::{
+    AppState,
+    models::{SourceType, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
+    source_sync::SourceSyncService,
+};
+
+pub struct SourceScheduler {
+    state: Arc<AppState>,
+    sync_service: SourceSyncService,
+    check_interval: Duration,
+}
+
+impl SourceScheduler {
+    pub fn new(state: Arc<AppState>) -> Self {
+        let sync_service = SourceSyncService::new(state.clone());
+
+        Self {
+            state,
+            sync_service,
+            check_interval: Duration::from_secs(60), // Check every minute for due syncs
+        }
+    }
+
+    pub async fn start(&self) {
+        info!("Starting universal source sync scheduler");
+
+        // First, check for any interrupted syncs that need to be resumed
+        if let Err(e) = self.resume_interrupted_syncs().await {
+            error!("Error resuming interrupted syncs: {}", e);
+        }
+
+        let mut interval_timer = interval(self.check_interval);
+
+        loop {
+            interval_timer.tick().await;
+
+            if let Err(e) = self.check_and_sync_sources().await {
+                error!("Error in source sync scheduler: {}", e);
+            }
+        }
+    }
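+    // Wiring sketch (assumed; actual startup lives in main.rs): the scheduler is
+    // expected to be constructed once and spawned as a long-running task, e.g.
+    //   let scheduler = SourceScheduler::new(state.clone());
+    //   tokio::spawn(async move { scheduler.start().await });
+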
+    async fn resume_interrupted_syncs(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        info!("Checking for interrupted source syncs to resume");
+
+        // Get all enabled sources that might have been interrupted
+        let sources = self.state.db.get_sources_for_sync().await?;
+
+        for source in sources {
+            // A source still marked 'syncing' at startup was likely interrupted mid-sync.
+            // This is a simplified check - a dedicated interrupted-sync flag would be more precise.
+            if source.status.to_string() == "syncing" {
+                info!("Found potentially interrupted sync for source {}, will resume", source.name);
+
+                // Reset status and trigger a new sync
+                if let Err(e) = sqlx::query(
+                    r#"UPDATE sources SET status = 'idle', updated_at = NOW() WHERE id = $1"#
+                )
+                .bind(source.id)
+                .execute(self.state.db.get_pool())
+                .await {
+                    error!("Failed to reset interrupted source status: {}", e);
+                    continue;
+                }
+
+                // Check if auto-sync is enabled for this source
+                let should_resume = match source.source_type {
+                    SourceType::WebDAV => {
+                        if let Ok(config) = serde_json::from_value::<WebDAVSourceConfig>(source.config.clone()) {
+                            config.auto_sync
+                        } else {
+                            false
+                        }
+                    }
+                    SourceType::LocalFolder => {
+                        if let Ok(config) = serde_json::from_value::<LocalFolderSourceConfig>(source.config.clone()) {
+                            config.auto_sync
+                        } else {
+                            false
+                        }
+                    }
+                    SourceType::S3 => {
+                        if let Ok(config) = serde_json::from_value::<S3SourceConfig>(source.config.clone()) {
+                            config.auto_sync
+                        } else {
+                            false
+                        }
+                    }
+                };
+
+                if should_resume {
+                    info!("Resuming interrupted sync for source {}", source.name);
+
+                    let sync_service = self.sync_service.clone();
+                    let source_clone = source.clone();
+                    let state_clone = self.state.clone();
+
+                    tokio::spawn(async move {
+                        // Get the user's OCR setting - simplified; this could be stored in the source config
+                        let enable_background_ocr = true; // Default to true, could be made configurable per source
+
+                        match sync_service.sync_source(&source_clone, enable_background_ocr).await {
+                            Ok(files_processed) => {
+                                info!("Resumed sync completed for source {}: {} files processed",
+                                    source_clone.name, files_processed);
+
+                                // Create notification for the successful resume
+                                let notification = crate::models::CreateNotification {
+                                    notification_type: "success".to_string(),
+                                    title: "Source Sync Resumed".to_string(),
+                                    message: format!("Resumed sync for {} after server restart. Processed {} files",
+                                        source_clone.name, files_processed),
+                                    action_url: Some("/sources".to_string()),
+                                    metadata: Some(serde_json::json!({
+                                        "source_type": source_clone.source_type.to_string(),
+                                        "source_id": source_clone.id,
+                                        "files_processed": files_processed
+                                    })),
+                                };
+
+                                if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
+                                    error!("Failed to create resume notification: {}", e);
+                                }
+                            }
+                            Err(e) => {
+                                error!("Resumed sync failed for source {}: {}", source_clone.name, e);
+                            }
+                        }
+                    });
+                }
+            }
+        }
+
+        Ok(())
+    }
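+    // Note: the per-type auto_sync extraction above mirrors the match in
+    // is_sync_due(); a small shared helper (e.g. a hypothetical Source::auto_sync())
+    // would fold the two copies together.
+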
+    async fn check_and_sync_sources(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        // Get all sources that might need syncing
+        let sources = self.state.db.get_sources_for_sync().await?;
+
+        for source in sources {
+            // Check if sync is due for this source
+            if self.is_sync_due(&source).await? {
+                info!("Starting background sync for source: {} ({})", source.name, source.source_type);
+
+                let sync_service = self.sync_service.clone();
+                let source_clone = source.clone();
+                let state_clone = self.state.clone();
+
+                // Start sync in a background task
+                tokio::spawn(async move {
+                    // Get the user's OCR setting - simplified; this could be stored in the source config
+                    let enable_background_ocr = true; // Default to true, could be made configurable per source
+
+                    match sync_service.sync_source(&source_clone, enable_background_ocr).await {
+                        Ok(files_processed) => {
+                            info!("Background sync completed for source {}: {} files processed",
+                                source_clone.name, files_processed);
+
+                            // Update last sync time
+                            if let Err(e) = sqlx::query(
+                                r#"UPDATE sources
+                                   SET last_sync_at = NOW(),
+                                       total_files_synced = total_files_synced + $2,
+                                       updated_at = NOW()
+                                   WHERE id = $1"#
+                            )
+                            .bind(source_clone.id)
+                            .bind(files_processed as i64)
+                            .execute(state_clone.db.get_pool())
+                            .await {
+                                error!("Failed to update source sync time: {}", e);
+                            }
+
+                            // Send a notification if files were processed
+                            if files_processed > 0 {
+                                let notification = crate::models::CreateNotification {
+                                    notification_type: "success".to_string(),
+                                    title: "Source Sync Completed".to_string(),
+                                    message: format!("Successfully processed {} files from {}",
+                                        files_processed, source_clone.name),
+                                    action_url: Some("/documents".to_string()),
+                                    metadata: Some(serde_json::json!({
+                                        "source_type": source_clone.source_type.to_string(),
+                                        "source_id": source_clone.id,
+                                        "files_processed": files_processed
+                                    })),
+                                };
+
+                                if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
+                                    error!("Failed to create success notification: {}", e);
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            error!("Background sync failed for source {}: {}", source_clone.name, e);
+
+                            // Send an error notification
+                            let notification = crate::models::CreateNotification {
+                                notification_type: "error".to_string(),
+                                title: "Source Sync Failed".to_string(),
+                                message: format!("Sync failed for {}: {}", source_clone.name, e),
+                                action_url: Some("/sources".to_string()),
+                                metadata: Some(serde_json::json!({
+                                    "source_type": source_clone.source_type.to_string(),
+                                    "source_id": source_clone.id,
+                                    "error": e.to_string()
+                                })),
+                            };
+
+                            if let Err(e) = state_clone.db.create_notification(source_clone.user_id, &notification).await {
+                                error!("Failed to create error notification: {}", e);
+                            }
+                        }
+                    }
+                });
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn is_sync_due(&self, source: &crate::models::Source) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
+        // Get the sync interval from the source config
+        let sync_interval_minutes = match source.source_type {
+            SourceType::WebDAV => {
+                let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())?;
+                if !config.auto_sync { return Ok(false); }
+                config.sync_interval_minutes
+            }
+            SourceType::LocalFolder => {
+                let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())?;
+                if !config.auto_sync { return Ok(false); }
+                config.sync_interval_minutes
+            }
+            SourceType::S3 => {
+                let config: S3SourceConfig = serde_json::from_value(source.config.clone())?;
+                if !config.auto_sync { return Ok(false); }
+                config.sync_interval_minutes
+            }
+        };
+
+        if sync_interval_minutes <= 0 {
+            warn!("Invalid sync interval for source {}: {} minutes", source.name, sync_interval_minutes);
+            return Ok(false);
+        }
+
+        // Skip if a sync is already running
+        if source.status.to_string() == "syncing" {
+            info!("Sync already running for source {}", source.name);
+            return Ok(false);
+        }
+
+        // Check last sync time
+        if let Some(last_sync) = source.last_sync_at {
+            let elapsed = Utc::now() - last_sync;
+            let elapsed_minutes = elapsed.num_minutes();
+
+            if elapsed_minutes < sync_interval_minutes as i64 {
+                // Only log this occasionally to avoid spam
+                if elapsed_minutes % 10 == 0 {
+                    info!("Sync not due for source {} (last sync {} minutes ago, interval {} minutes)",
+                        source.name, elapsed_minutes, sync_interval_minutes);
+                }
+                return Ok(false);
+            }
+
+            info!("Sync is due for source {} (last sync {} minutes ago, interval {} minutes)",
+                source.name, elapsed_minutes, sync_interval_minutes);
+        } else {
+            info!("No previous sync found for source {}, sync is due", source.name);
+        }
+
+        // Sync is due
+        Ok(true)
+    }
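+    // Worked example for is_sync_due() above: with sync_interval_minutes = 60 and
+    // last_sync_at 45 minutes ago, elapsed_minutes = 45 < 60, so the source is
+    // skipped; it becomes due once 60 minutes have elapsed since the last sync.
+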
+    pub async fn trigger_sync(&self, source_id: uuid::Uuid) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        info!("Triggering manual sync for source {}", source_id);
+
+        if let Some(source) = self.state.db.get_source_by_id(source_id).await? {
+            let sync_service = self.sync_service.clone();
+            let state_clone = self.state.clone();
+
+            tokio::spawn(async move {
+                let enable_background_ocr = true; // Could be made configurable
+
+                match sync_service.sync_source(&source, enable_background_ocr).await {
+                    Ok(files_processed) => {
+                        info!("Manual sync completed for source {}: {} files processed",
+                            source.name, files_processed);
+
+                        // Update sync stats
+                        if let Err(e) = sqlx::query(
+                            r#"UPDATE sources
+                               SET last_sync_at = NOW(),
+                                   total_files_synced = total_files_synced + $2,
+                                   updated_at = NOW()
+                               WHERE id = $1"#
+                        )
+                        .bind(source.id)
+                        .bind(files_processed as i64)
+                        .execute(state_clone.db.get_pool())
+                        .await {
+                            error!("Failed to update source sync stats: {}", e);
+                        }
+                    }
+                    Err(e) => {
+                        error!("Manual sync failed for source {}: {}", source.name, e);
+                    }
+                }
+            });
+
+            Ok(())
+        } else {
+            Err("Source not found".into())
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/source_sync.rs b/src/source_sync.rs
new file mode 100644
index 0000000..66b0738
--- /dev/null
+++ b/src/source_sync.rs
@@ -0,0 +1,383 @@
+use std::sync::Arc;
+use std::path::Path;
+use anyhow::{anyhow, Result};
+use tokio::sync::Semaphore;
+use futures::stream::{FuturesUnordered, StreamExt};
+use sha2::{Sha256, Digest};
+use tracing::{error, info};
+use uuid::Uuid;
+
+use crate::{
+    AppState,
+    models::{FileInfo, Source, SourceType, SourceStatus, LocalFolderSourceConfig, S3SourceConfig, WebDAVSourceConfig},
+    file_service::FileService,
+    local_folder_service::LocalFolderService,
+    s3_service::S3Service,
+    webdav_service::{WebDAVService, WebDAVConfig},
+};
+
+#[derive(Clone)]
+pub struct SourceSyncService {
+    state: Arc<AppState>,
+}
+
+impl SourceSyncService {
+    pub fn new(state: Arc<AppState>) -> Self {
+        Self { state }
+    }
+
+    /// Perform sync for any source type
+    pub async fn sync_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
+        info!("Starting sync for source {} ({})", source.name, source.source_type);
+
+        // Update source status to syncing
+        if let Err(e) = self.update_source_status(source.id, SourceStatus::Syncing, None).await {
+            error!("Failed to update source status: {}", e);
+        }
+
+        let sync_result = match source.source_type {
+            SourceType::WebDAV => self.sync_webdav_source(source, enable_background_ocr).await,
+            SourceType::LocalFolder => self.sync_local_folder_source(source, enable_background_ocr).await,
+            SourceType::S3 => self.sync_s3_source(source, enable_background_ocr).await,
+        };
+
+        match &sync_result {
+            Ok(files_processed) => {
+                info!("Sync completed for source {}: {} files processed", source.name, files_processed);
+                if let Err(e) = self.update_source_status(source.id, SourceStatus::Idle, None).await {
+                    error!("Failed to update source status after successful sync: {}", e);
+                }
+            }
+            Err(e) => {
+                error!("Sync failed for source {}: {}", source.name, e);
+                let error_msg = format!("Sync failed: {}", e);
+                if let Err(e) = self.update_source_status(source.id, SourceStatus::Error, Some(&error_msg)).await {
+                    error!("Failed to update source status after error: {}", e);
+                }
+            }
+        }
+
+        sync_result
+    }
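+    // Status lifecycle driven by sync_source() above:
+    //   Idle -> Syncing -> Idle on success, or Error with the message persisted
+    //   via update_source_status() at the bottom of this file.
+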
+    async fn sync_webdav_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
+        let config: WebDAVSourceConfig = serde_json::from_value(source.config.clone())
+            .map_err(|e| anyhow!("Invalid WebDAV config: {}", e))?;
+
+        let webdav_config = WebDAVConfig {
+            server_url: config.server_url,
+            username: config.username,
+            password: config.password,
+            watch_folders: config.watch_folders,
+            file_extensions: config.file_extensions,
+            timeout_seconds: 30,
+            server_type: config.server_type,
+        };
+
+        let webdav_service = WebDAVService::new(webdav_config.clone())
+            .map_err(|e| anyhow!("Failed to create WebDAV service: {}", e))?;
+
+        self.perform_sync_internal(
+            source.user_id,
+            source.id,
+            &webdav_config.watch_folders,
+            &webdav_config.file_extensions,
+            enable_background_ocr,
+            |folder_path| {
+                let service = webdav_service.clone();
+                async move { service.discover_files_in_folder(&folder_path).await }
+            },
+            |file_path| {
+                let service = webdav_service.clone();
+                async move { service.download_file(&file_path).await }
+            }
+        ).await
+    }
+
+    async fn sync_local_folder_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
+        let config: LocalFolderSourceConfig = serde_json::from_value(source.config.clone())
+            .map_err(|e| anyhow!("Invalid LocalFolder config: {}", e))?;
+
+        let local_service = LocalFolderService::new(config.clone())
+            .map_err(|e| anyhow!("Failed to create LocalFolder service: {}", e))?;
+
+        self.perform_sync_internal(
+            source.user_id,
+            source.id,
+            &config.watch_folders,
+            &config.file_extensions,
+            enable_background_ocr,
+            |folder_path| {
+                let service = local_service.clone();
+                async move { service.discover_files_in_folder(&folder_path).await }
+            },
+            |file_path| {
+                let service = local_service.clone();
+                async move { service.read_file(&file_path).await }
+            }
+        ).await
+    }
+
+    async fn sync_s3_source(&self, source: &Source, enable_background_ocr: bool) -> Result<usize> {
+        let config: S3SourceConfig = serde_json::from_value(source.config.clone())
+            .map_err(|e| anyhow!("Invalid S3 config: {}", e))?;
+
+        let s3_service = S3Service::new(config.clone()).await
+            .map_err(|e| anyhow!("Failed to create S3 service: {}", e))?;
+
+        self.perform_sync_internal(
+            source.user_id,
+            source.id,
+            &config.watch_folders,
+            &config.file_extensions,
+            enable_background_ocr,
+            |folder_path| {
+                let service = s3_service.clone();
+                async move { service.discover_files_in_folder(&folder_path).await }
+            },
+            |file_path| {
+                let service = s3_service.clone();
+                async move { service.download_file(&file_path).await }
+            }
+        ).await
+    }
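+    // Design note: all three adapters above funnel into perform_sync_internal()
+    // below, supplying only a discover closure and a download closure over their
+    // service, in the shape
+    //   |folder| { let svc = service.clone(); async move { svc.discover_files_in_folder(&folder).await } }
+    //   |path|   { let svc = service.clone(); async move { svc.download_file(&path).await } }
+    // so the concurrency, dedup, and OCR queueing logic exists exactly once.
+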
+    async fn perform_sync_internal<F, D, Fut1, Fut2>(
+        &self,
+        user_id: Uuid,
+        _source_id: Uuid,
+        watch_folders: &[String],
+        file_extensions: &[String],
+        enable_background_ocr: bool,
+        discover_files: F,
+        download_file: D,
+    ) -> Result<usize>
+    where
+        F: Fn(String) -> Fut1,
+        D: Fn(String) -> Fut2 + Clone,
+        Fut1: std::future::Future<Output = Result<Vec<FileInfo>>>,
+        Fut2: std::future::Future<Output = Result<Vec<u8>>>,
+    {
+        let mut total_files_processed = 0;
+
+        for folder_path in watch_folders {
+            info!("Syncing folder: {}", folder_path);
+
+            // Discover files in the folder
+            match discover_files(folder_path.clone()).await {
+                Ok(files) => {
+                    info!("Found {} files in folder {}", files.len(), folder_path);
+
+                    // Filter files for processing
+                    let files_to_process: Vec<_> = files.into_iter()
+                        .filter(|file_info| {
+                            if file_info.is_directory {
+                                return false;
+                            }
+
+                            let file_extension = Path::new(&file_info.name)
+                                .extension()
+                                .and_then(|ext| ext.to_str())
+                                .unwrap_or("")
+                                .to_lowercase();
+
+                            file_extensions.contains(&file_extension)
+                        })
+                        .collect();
+
+                    info!("Processing {} files from folder {}", files_to_process.len(), folder_path);
+
+                    // Process files concurrently with a limit
+                    let concurrent_limit = 5;
+                    let semaphore = Arc::new(Semaphore::new(concurrent_limit));
+                    let mut folder_files_processed = 0;
+
+                    let mut file_futures = FuturesUnordered::new();
+
+                    for file_info in files_to_process.iter() {
+                        let state_clone = self.state.clone();
+                        let file_info_clone = file_info.clone();
+                        let semaphore_clone = semaphore.clone();
+                        let download_file_clone = download_file.clone();
+
+                        let future = async move {
+                            Self::process_single_file(
+                                state_clone,
+                                user_id,
+                                _source_id,
+                                &file_info_clone,
+                                enable_background_ocr,
+                                semaphore_clone,
+                                download_file_clone,
+                            ).await
+                        };
+
+                        file_futures.push(future);
+                    }
+
+                    // Drain the futures as they complete
+                    while let Some(result) = file_futures.next().await {
+                        match result {
+                            Ok(processed) => {
+                                if processed {
+                                    folder_files_processed += 1;
+                                    info!("Successfully processed file ({} completed in this folder)", folder_files_processed);
+                                }
+                            }
+                            Err(error) => {
+                                error!("File processing error: {}", error);
+                            }
+                        }
+                    }
+
+                    total_files_processed += folder_files_processed;
+                }
+                Err(e) => {
+                    error!("Failed to discover files in folder {}: {}", folder_path, e);
+                }
+            }
+        }
+
+        info!("Source sync completed: {} files processed", total_files_processed);
+        Ok(total_files_processed)
+    }
+
+    async fn process_single_file<D, Fut>(
+        state: Arc<AppState>,
+        user_id: Uuid,
+        _source_id: Uuid,
+        file_info: &FileInfo,
+        enable_background_ocr: bool,
+        semaphore: Arc<Semaphore>,
+        download_file: D,
+    ) -> Result<bool>
+    where
+        D: Fn(String) -> Fut,
+        Fut: std::future::Future<Output = Result<Vec<u8>>>,
+    {
+        let _permit = semaphore.acquire().await
+            .map_err(|e| anyhow!("Semaphore error: {}", e))?;
+
+        info!("Processing file: {}", file_info.path);
+
+        // Check if we've already processed this file by looking for documents with the
+        // same content. This is a simplified version - source-specific tracking tables
+        // would be more robust.
+
+        // Download the file
+        let file_data = download_file(file_info.path.clone()).await
+            .map_err(|e| anyhow!("Failed to download {}: {}", file_info.path, e))?;
+
+        info!("Downloaded file: {} ({} bytes)", file_info.name, file_data.len());
+
+        // Calculate file hash for deduplication
+        let file_hash = Self::calculate_file_hash(&file_data);
+
+        // Check for duplicate content among same-size documents
+        if let Ok(existing_docs) = state.db.get_documents_by_user_with_role(
+            user_id,
+            crate::models::UserRole::User,
+            1000,
+            0
+        ).await {
+            let matching_docs: Vec<_> = existing_docs.into_iter()
+                .filter(|doc| doc.file_size == file_data.len() as i64)
+                .collect();
+
+            for existing_doc in matching_docs {
+                if let Ok(existing_file_data) = tokio::fs::read(&existing_doc.file_path).await {
+                    let existing_hash = Self::calculate_file_hash(&existing_file_data);
+                    if file_hash == existing_hash {
+                        info!("File content already exists, skipping: {}", file_info.path);
+                        return Ok(false);
+                    }
+                }
+            }
+        }
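+        // Note (design): this dedup pass re-reads and hashes every same-size document
+        // from disk for each new file, and the candidate query is capped at 1000 rows,
+        // so it is best-effort; persisting the SHA-256 from calculate_file_hash()
+        // on the document row would turn this into a single indexed lookup.
+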
+        // Save file to disk
+        let file_service = FileService::new(state.config.upload_path.clone());
+        let saved_file_path = file_service.save_file(&file_info.name, &file_data).await
+            .map_err(|e| anyhow!("Failed to save {}: {}", file_info.name, e))?;
+
+        // Create document record
+        let document = file_service.create_document(
+            &file_info.name,
+            &file_info.name,
+            &saved_file_path,
+            file_data.len() as i64,
+            &file_info.mime_type,
+            user_id,
+        );
+
+        let created_document = state.db.create_document(document).await
+            .map_err(|e| anyhow!("Failed to create document {}: {}", file_info.name, e))?;
+
+        info!("Created document record for {}: {}", file_info.name, created_document.id);
+
+        // Queue for OCR if enabled
+        if enable_background_ocr {
+            info!("Background OCR enabled, queueing document {} for processing", created_document.id);
+
+            match state.db.pool.acquire().await {
+                Ok(_conn) => {
+                    let queue_service = crate::ocr_queue::OcrQueueService::new(
+                        state.db.clone(),
+                        state.db.pool.clone(),
+                        4
+                    );
+
+                    // Smaller files get higher OCR priority (10 highest, 2 lowest)
+                    let priority = if file_info.size <= 1024 * 1024 { 10 }      // <= 1 MB
+                        else if file_info.size <= 5 * 1024 * 1024 { 8 }         // <= 5 MB
+                        else if file_info.size <= 10 * 1024 * 1024 { 6 }        // <= 10 MB
+                        else if file_info.size <= 50 * 1024 * 1024 { 4 }        // <= 50 MB
+                        else { 2 };
+
+                    if let Err(e) = queue_service.enqueue_document(created_document.id, priority, file_info.size).await {
+                        error!("Failed to enqueue document for OCR: {}", e);
+                    } else {
+                        info!("Enqueued document {} for OCR processing", created_document.id);
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to connect to database for OCR queueing: {}", e);
+                }
+            }
+        }
+
+        Ok(true)
+    }
+
+    async fn update_source_status(&self, source_id: Uuid, status: SourceStatus, error_message: Option<&str>) -> Result<()> {
+        let query = if let Some(error) = error_message {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $2, last_error = $3, last_error_at = NOW(), updated_at = NOW()
+                   WHERE id = $1"#
+            )
+            .bind(source_id)
+            .bind(status.to_string())
+            .bind(error)
+        } else {
+            sqlx::query(
+                r#"UPDATE sources
+                   SET status = $2, last_error = NULL, last_error_at = NULL, updated_at = NOW()
+                   WHERE id = $1"#
+            )
+            .bind(source_id)
+            .bind(status.to_string())
+        };
+
+        query.execute(self.state.db.get_pool()).await
+            .map_err(|e| anyhow!("Failed to update source status: {}", e))?;
+
+        Ok(())
+    }
+
+    fn calculate_file_hash(data: &[u8]) -> String {
+        let mut hasher = Sha256::new();
+        hasher.update(data);
+        let result = hasher.finalize();
+        format!("{:x}", result)
+    }
+}
\ No newline at end of file