diff --git a/Cargo.lock b/Cargo.lock index 26f07400..0f279b90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,7 +156,7 @@ dependencies = [ "arrow-schema", "arrow-select", "atoi", - "base64", + "base64 0.22.1", "chrono", "half", "lexical-core", @@ -210,6 +210,18 @@ dependencies = [ "num", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -232,6 +244,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -277,7 +295,7 @@ dependencies = [ "http", "http-body", "http-body-util", - "itoa", + "itoa 1.0.17", "matchit", "memchr", "mime", @@ -311,12 +329,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-url" +version = "1.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "bincode" version = "1.3.3" @@ -386,6 +419,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -395,6 +437,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.19.1" @@ -404,6 +459,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.0" @@ -469,7 +530,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8144c22e24bbcf26ade86cb6501a0916c46b7e4787abdb0045a467eb1645a1d" dependencies = [ "ambient-authority", - "rand", + "rand 0.8.5", ] [[package]] @@ -637,6 +698,15 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-random" version = "0.1.18" @@ -657,6 +727,16 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -742,7 +822,7 @@ dependencies = [ "rustc-hash 2.1.1", "serde", "serde_derive", - "sha2", + "sha2 0.10.9", "smallvec", "target-lexicon", "wasmtime-internal-math", @@ -874,7 +954,7 @@ dependencies = [ "bitflags 2.10.0", "crossterm_winapi", "document-features", - "parking_lot", + "parking_lot 0.12.5", "rustix 1.1.3", "winapi", ] @@ -904,6 +984,25 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f9d052967f590a76e62eb387bd0bbb1b000182c3cefe5364db6b7211651bc0" +dependencies = [ + "byteorder", + "digest 0.9.0", + "rand_core 0.5.1", + "subtle", + "zeroize", +] + +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "debugid" version = "0.8.0" @@ -920,6 +1019,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" dependencies = [ "powerfmt", + "serde_core", +] + +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", ] [[package]] @@ -928,7 +1037,7 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", + "block-buffer 0.10.4", "crypto-common", ] @@ -985,6 +1094,27 @@ dependencies = [ "shared_child", ] +[[package]] +name = "ed25519" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2 0.9.9", + "zeroize", +] + [[package]] name = "either" version = "1.15.0" @@ -1053,12 +1183,42 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1145,14 +1305,15 @@ dependencies = [ "arrow-ipc", "arrow-schema", "async-trait", - "base64", + "base64 0.22.1", "bincode", "clap", "crossbeam-channel", "log", "lru", + "nats", "num_cpus", - "parking_lot", + "parking_lot 0.12.5", "pest", "pest_derive", "proctitle", @@ -1227,6 +1388,16 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -1280,6 +1451,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1288,7 +1470,7 @@ checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.1+wasi-snapshot-preview1", ] [[package]] @@ -1409,7 +1591,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" dependencies = [ "bytes", - "itoa", + "itoa 1.0.17", ] [[package]] @@ -1468,7 +1650,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa", + "itoa 1.0.17", "pin-project-lite", "pin-utils", "smallvec", @@ -1649,7 +1831,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1955a75fa080c677d3972822ec4bad316169ab1cfc6c257a942c2265dbe5fe" dependencies = [ "bitmaps", - "rand_core", + "rand_core 0.6.4", "rand_xoshiro", "sized-chunks", "typenum", @@ -1678,6 +1860,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "io-extras" version = "0.18.4" @@ -1735,6 +1926,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.17" @@ -1781,6 +1978,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "lazy_static" version = "1.5.0" @@ -2051,7 +2254,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", - "wasi", + "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.61.2", ] @@ -2061,6 +2264,41 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nats" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "603b57313fd7ff9ddf81b833923ee872264bec6426bff89b3c8c90c0de2267cb" +dependencies = [ + "base64 0.13.1", + "base64-url", + "blocking", + "crossbeam-channel", + "fastrand 1.9.0", + "itoa 0.4.8", + "json", + "lazy_static", + "libc", + "log", + "memchr", + "nkeys", + "nuid", + "once_cell", + "parking_lot 0.11.2", + "regex", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "time", + "url", + "webpki", + "winapi", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -2082,6 +2320,20 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1a98f0a974ff737974b57ba1c71d2e0fe7ec18e5a828d4b8e02683171349dfa" +dependencies = [ + "byteorder", + "data-encoding", + "ed25519-dalek", + "log", + "rand 0.7.3", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -2101,6 +2353,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38" +dependencies = [ + "lazy_static", + "rand 0.8.5", +] + [[package]] name = "num" version = "0.4.3" @@ -2237,6 +2499,18 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "openssl-sys" version = "0.9.111" @@ -2259,6 +2533,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -2266,7 +2557,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.12", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -2277,7 +2582,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -2334,7 +2639,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "602113b5b5e8621770cfd490cfd90b9f84ab29bd2b0e49ad83eb6d186cef2365" dependencies = [ "pest", - "sha2", + "sha2 0.10.9", ] [[package]] @@ -2389,6 +2694,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand 2.3.0", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -2581,6 +2897,19 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2588,8 +2917,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2599,7 +2938,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2611,13 +2959,22 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", +] + [[package]] name = "rand_xoshiro" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2673,6 +3030,15 @@ dependencies = [ "sasl2-sys", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -2737,6 +3103,21 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rocksdb" version = "0.21.0" @@ -2810,6 +3191,40 @@ dependencies = [ "rustix 1.1.3", ] +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64 0.13.1", + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64 0.13.1", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -2856,12 +3271,54 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.10.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.27" @@ -2908,13 +3365,33 @@ version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" dependencies = [ - "itoa", + "itoa 1.0.17", "memchr", "serde", "serde_core", "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_spanned" version = "1.0.4" @@ -2931,12 +3408,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ "indexmap 2.12.1", - "itoa", + "itoa 1.0.17", "ryu", "serde", "unsafe-libyaml", ] +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2945,7 +3435,7 @@ checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -3005,6 +3495,24 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eaebd4be561a7d8148803baa108092f85090189c4b8c3ffb81602b15b5c1771" +dependencies = [ + "getrandom 0.1.16", + "signature", + "subtle-encoding", + "zeroize", +] + +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" + [[package]] name = "sized-chunks" version = "0.6.5" @@ -3050,6 +3558,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3068,6 +3582,21 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + +[[package]] +name = "subtle-encoding" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dcb1ed7b8330c5eed5441052651dd7a12c75e2ed88f2ec024ae1fa3a5e59945" +dependencies = [ + "zeroize", +] + [[package]] name = "syn" version = "2.0.113" @@ -3124,7 +3653,7 @@ version = "3.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" dependencies = [ - "fastrand", + "fastrand 2.3.0", "getrandom 0.3.4", "once_cell", "rustix 1.1.3", @@ -3196,7 +3725,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", - "itoa", + "itoa 1.0.17", "num-conv", "powerfmt", "serde", @@ -3248,7 +3777,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.5", "pin-project-lite", "signal-hook-registry", "socket2 0.6.1", @@ -3351,7 +3880,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.22.1", "bytes", "h2", "http", @@ -3397,7 +3926,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3573,6 +4102,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.5.7" @@ -3635,6 +4170,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -3840,14 +4381,14 @@ version = "41.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f52a985f5b5dae53147fc596f3a313c334e2c24fd1ba708634e1382f6ecd727" dependencies = [ - "base64", + "base64 0.22.1", "directories-next", "log", "postcard", "rustix 1.1.3", "serde", "serde_derive", - "sha2", + "sha2 0.10.9", "toml", "wasmtime-environ", "windows-sys 0.61.2", @@ -4085,6 +4626,26 @@ dependencies = [ "wast 243.0.0", ] +[[package]] +name = "web-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "wiggle" version = "41.0.3" @@ -4525,6 +5086,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerotrie" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 4b855aa9..c75b5c06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ async-trait = "0.1" num_cpus = "1.0" protocol = { path = "./protocol" } rdkafka = { version = "0.38", features = ["cmake-build", "ssl", "gssapi"] } +nats = "0.18" crossbeam-channel = "0.5" pest = "2.7" pest_derive = "2.7" diff --git a/src/runtime/input/input_provider.rs b/src/runtime/input/input_provider.rs index 3f6606cd..91b0609e 100644 --- a/src/runtime/input/input_provider.rs +++ b/src/runtime/input/input_provider.rs @@ -105,6 +105,41 @@ impl InputProvider { runtime, ))) } + InputConfig::Nats { + url, + subject, + queue_group, + extra, + runtime: _, + } => { + use crate::runtime::input::InputRunner; + use crate::runtime::input::protocol::nats::{NatsConfig, NatsProtocol}; + + if url.is_empty() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid nats url in input config (group #{}): empty (subject: {})", + group_idx + 1, + subject + ), + )) as Box); + } + + let nats_config = NatsConfig::new( + url.clone(), + subject.clone(), + queue_group.clone(), + extra.clone(), + ); + let runtime = input_config.input_runtime_config(); + Ok(Box::new(InputRunner::new( + NatsProtocol::new(nats_config), + group_idx, + input_idx, + runtime, + ))) + } } } } diff --git a/src/runtime/input/protocol/mod.rs b/src/runtime/input/protocol/mod.rs index b9574391..ded9e6e8 100644 --- a/src/runtime/input/protocol/mod.rs +++ b/src/runtime/input/protocol/mod.rs @@ -11,3 +11,4 @@ // limitations under the License. pub mod kafka; +pub mod nats; diff --git a/src/runtime/input/protocol/nats/config.rs b/src/runtime/input/protocol/nats/config.rs new file mode 100644 index 00000000..ec4f27b0 --- /dev/null +++ b/src/runtime/input/protocol/nats/config.rs @@ -0,0 +1,42 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +/// NatsConfig - NATS consumer configuration +#[derive(Debug, Clone)] +pub struct NatsConfig { + /// NATS server URL(s), comma-separated + pub url: String, + /// Subject to subscribe to + pub subject: String, + /// Optional queue group for load balancing + pub queue_group: Option, + /// Extra options (e.g. token, user, pass) + pub properties: HashMap, +} + +impl NatsConfig { + pub fn new( + url: String, + subject: String, + queue_group: Option, + properties: HashMap, + ) -> Self { + Self { + url, + subject, + queue_group, + properties, + } + } +} diff --git a/src/runtime/input/protocol/nats/mod.rs b/src/runtime/input/protocol/nats/mod.rs new file mode 100644 index 00000000..7c17282b --- /dev/null +++ b/src/runtime/input/protocol/nats/mod.rs @@ -0,0 +1,18 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod nats_protocol; +pub mod options; + +pub use config::NatsConfig; +pub use nats_protocol::NatsProtocol; diff --git a/src/runtime/input/protocol/nats/nats_protocol.rs b/src/runtime/input/protocol/nats/nats_protocol.rs new file mode 100644 index 00000000..bb6f825e --- /dev/null +++ b/src/runtime/input/protocol/nats/nats_protocol.rs @@ -0,0 +1,83 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::config::NatsConfig; +use super::options; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::input::input_protocol::InputProtocol; +use std::sync::OnceLock; +use std::time::Duration; + +pub struct NatsProtocol { + config: NatsConfig, + subscription: OnceLock, +} + +impl NatsProtocol { + pub fn new(config: NatsConfig) -> Self { + Self { + config, + subscription: OnceLock::new(), + } + } +} + +impl InputProtocol for NatsProtocol { + fn name(&self) -> String { + format!("nats-{}", self.config.subject) + } + + fn init(&self) -> Result<(), Box> { + let nc = options::nats_connect(&self.config.url, &self.config.properties) + .map_err(|e| Box::new(std::io::Error::other(e)) as Box)?; + + let sub = if let Some(q) = &self.config.queue_group { + nc.queue_subscribe(&self.config.subject, q).map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })? + } else { + nc.subscribe(&self.config.subject).map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })? + }; + + self.subscription.set(sub).map_err(|_| { + Box::new(std::io::Error::other("NATS subscription already init")) + as Box + })?; + Ok(()) + } + + fn poll( + &self, + timeout: Duration, + ) -> Result, Box> { + let sub = self.subscription.get().ok_or_else(|| { + Box::new(std::io::Error::other("NATS subscription not init")) + as Box + })?; + + match sub.next_timeout(timeout) { + Ok(msg) => { + let payload = msg.data.to_vec(); + Ok(Some(BufferOrEvent::new_buffer( + payload, + Some(self.config.subject.clone()), + false, + false, + ))) + } + Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => Ok(None), + Err(e) => Err(Box::new(e) as Box), + } + } +} diff --git a/src/runtime/input/protocol/nats/options.rs b/src/runtime/input/protocol/nats/options.rs new file mode 100644 index 00000000..b6bb7c6b --- /dev/null +++ b/src/runtime/input/protocol/nats/options.rs @@ -0,0 +1,95 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Build `nats::Options` from URL and properties map. +//! Used by both NATS input and output protocols so that auth and other options +//! are not silently ignored. + +use std::collections::HashMap; +use std::path::Path; + +/// Get property by key, case-insensitive. Prefer exact match, then lowercase. +fn get_prop<'a>(props: &'a HashMap, key: &str) -> Option<&'a str> { + props.get(key).map(String::as_str).or_else(|| { + let key_lower = key.to_lowercase(); + props + .iter() + .find(|(k, _)| k.to_lowercase() == key_lower) + .map(|(_, v)| v.as_str()) + }) +} + +/// Build NATS connection options from properties. +/// +/// Supported properties (case-insensitive): +/// - `token`: auth token +/// - `user` / `username`: username (with `password` or `pass`) +/// - `password` / `pass`: password +/// - `name`: client connection name +/// - `tls_required`: "true" / "false" +/// - `client_cert`: path to client cert (with `client_key`) +/// - `client_key`: path to client key +/// - `credentials` / `creds`: path to .creds file +/// - `root_certificate` / `tls_ca`: path to root CA PEM +pub fn build_nats_options(properties: &HashMap) -> nats::Options { + let opts = if let Some(token) = get_prop(properties, "token") { + nats::Options::with_token(token) + } else if let (Some(u), Some(p)) = ( + get_prop(properties, "user").or_else(|| get_prop(properties, "username")), + get_prop(properties, "password").or_else(|| get_prop(properties, "pass")), + ) { + nats::Options::with_user_pass(u, p) + } else if let Some(creds) = + get_prop(properties, "credentials").or_else(|| get_prop(properties, "creds")) + { + nats::Options::with_credentials(Path::new(creds)) + } else { + nats::Options::new() + }; + + let opts = if let Some(name) = get_prop(properties, "name") { + opts.with_name(name) + } else { + opts + }; + + let opts = if let Some(s) = get_prop(properties, "tls_required") { + opts.tls_required(s.eq_ignore_ascii_case("true") || s.eq_ignore_ascii_case("1")) + } else { + opts + }; + + let opts = if let (Some(cert), Some(key)) = ( + get_prop(properties, "client_cert"), + get_prop(properties, "client_key"), + ) { + opts.client_cert(Path::new(cert), Path::new(key)) + } else { + opts + }; + + if let Some(ca) = + get_prop(properties, "root_certificate").or_else(|| get_prop(properties, "tls_ca")) + { + opts.add_root_certificate(Path::new(ca)) + } else { + opts + } +} + +/// Connect to NATS using URL and properties (auth, name, TLS, etc.). +pub fn nats_connect( + url: &str, + properties: &HashMap, +) -> std::io::Result { + build_nats_options(properties).connect(url) +} diff --git a/src/runtime/output/output_provider.rs b/src/runtime/output/output_provider.rs index c6d01fef..a5cd1f9a 100644 --- a/src/runtime/output/output_provider.rs +++ b/src/runtime/output/output_provider.rs @@ -88,6 +88,33 @@ impl OutputProvider { let runtime = output_config.output_runtime_config(); Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) } + OutputConfig::Nats { + url, + subject, + extra, + runtime: _, + } => { + use crate::runtime::output::output_runner::OutputRunner; + use crate::runtime::output::protocol::nats::{ + NatsOutputProtocol, NatsProducerConfig, + }; + + if url.is_empty() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid nats url in output config: empty (subject: {})", + subject + ), + )) as Box); + } + + let nats_config = + NatsProducerConfig::new(url.clone(), subject.clone(), extra.clone()); + let protocol = NatsOutputProtocol::new(nats_config); + let runtime = output_config.output_runtime_config(); + Ok(Box::new(OutputRunner::new(protocol, output_idx, runtime))) + } } } } diff --git a/src/runtime/output/protocol/mod.rs b/src/runtime/output/protocol/mod.rs index 20e4d1c5..53a74859 100644 --- a/src/runtime/output/protocol/mod.rs +++ b/src/runtime/output/protocol/mod.rs @@ -15,3 +15,4 @@ // Provides implementations of various output protocols pub mod kafka; +pub mod nats; diff --git a/src/runtime/output/protocol/nats/mod.rs b/src/runtime/output/protocol/nats/mod.rs new file mode 100644 index 00000000..c431b8d2 --- /dev/null +++ b/src/runtime/output/protocol/nats/mod.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod nats_protocol; +pub mod producer_config; + +pub use nats_protocol::NatsOutputProtocol; +pub use producer_config::NatsProducerConfig; diff --git a/src/runtime/output/protocol/nats/nats_protocol.rs b/src/runtime/output/protocol/nats/nats_protocol.rs new file mode 100644 index 00000000..87e6bddf --- /dev/null +++ b/src/runtime/output/protocol/nats/nats_protocol.rs @@ -0,0 +1,68 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::producer_config::NatsProducerConfig; +use crate::runtime::buffer_and_event::BufferOrEvent; +use crate::runtime::input::protocol::nats::options; +use crate::runtime::output::output_protocol::OutputProtocol; +use std::sync::Mutex; + +pub struct NatsOutputProtocol { + config: NatsProducerConfig, + connection: Mutex>, +} + +impl NatsOutputProtocol { + pub fn new(config: NatsProducerConfig) -> Self { + Self { + config, + connection: Mutex::new(None), + } + } +} + +impl OutputProtocol for NatsOutputProtocol { + fn name(&self) -> String { + format!("nats-{}", self.config.subject) + } + + fn init(&self) -> Result<(), Box> { + let nc = options::nats_connect(&self.config.url, &self.config.properties) + .map_err(|e| Box::new(std::io::Error::other(e)) as Box)?; + *self.connection.lock().unwrap() = Some(nc); + Ok(()) + } + + fn send(&self, data: BufferOrEvent) -> Result<(), Box> { + if let Some(payload) = data.into_buffer() { + let lock = self.connection.lock().unwrap(); + let nc = lock.as_ref().ok_or_else(|| { + Box::new(std::io::Error::other("NATS connection not initialized")) + as Box + })?; + nc.publish(&self.config.subject, &payload).map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })?; + } + Ok(()) + } + + fn flush(&self) -> Result<(), Box> { + let lock = self.connection.lock().unwrap(); + if let Some(nc) = lock.as_ref() { + nc.flush().map_err(|e| { + Box::new(std::io::Error::other(e)) as Box + })?; + } + Ok(()) + } +} diff --git a/src/runtime/output/protocol/nats/producer_config.rs b/src/runtime/output/protocol/nats/producer_config.rs new file mode 100644 index 00000000..e40c7bd5 --- /dev/null +++ b/src/runtime/output/protocol/nats/producer_config.rs @@ -0,0 +1,31 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +/// NatsProducerConfig - NATS producer configuration +#[derive(Debug, Clone)] +pub struct NatsProducerConfig { + pub url: String, + pub subject: String, + pub properties: HashMap, +} + +impl NatsProducerConfig { + pub fn new(url: String, subject: String, properties: HashMap) -> Self { + Self { + url, + subject, + properties, + } + } +} diff --git a/src/runtime/task/processor_config.rs b/src/runtime/task/processor_config.rs index fe515647..8f250ec9 100644 --- a/src/runtime/task/processor_config.rs +++ b/src/runtime/task/processor_config.rs @@ -144,6 +144,16 @@ pub enum InputConfig { #[serde(default)] runtime: InputRuntimeConfig, }, + Nats { + url: String, + subject: String, + #[serde(default)] + queue_group: Option, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: InputRuntimeConfig, + }, } impl InputConfig { @@ -172,12 +182,14 @@ impl InputConfig { pub fn input_type(&self) -> &'static str { match self { InputConfig::Kafka { .. } => "kafka", + InputConfig::Nats { .. } => "nats", } } pub fn input_runtime_config(&self) -> InputRuntimeConfig { match self { InputConfig::Kafka { runtime, .. } => runtime.clone(), + InputConfig::Nats { runtime, .. } => runtime.clone(), } } } @@ -520,6 +532,14 @@ pub enum OutputConfig { #[serde(default)] runtime: OutputRuntimeConfig, }, + Nats { + url: String, + subject: String, + #[serde(flatten)] + extra: HashMap, + #[serde(default)] + runtime: OutputRuntimeConfig, + }, } impl OutputConfig { @@ -547,12 +567,14 @@ impl OutputConfig { pub fn output_type(&self) -> &'static str { match self { OutputConfig::Kafka { .. } => "kafka", + OutputConfig::Nats { .. } => "nats", } } pub fn output_runtime_config(&self) -> OutputRuntimeConfig { match self { OutputConfig::Kafka { runtime, .. } => runtime.clone(), + OutputConfig::Nats { runtime, .. } => runtime.clone(), } } }