diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 9ccb342..eca5345 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -48,3 +48,35 @@ jobs: - name: Run tests run: cargo test + + integration: + runs-on: ubuntu-latest + needs: rust + steps: + - name: Checkout code + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@e97e2d8cc328f1b50210efc529dca0028893a2d9 # v1 + with: + toolchain: stable + + - name: Cache cargo registry and build + uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: cargo-${{ runner.os }}-${{ hashFiles('Cargo.lock') }} + restore-keys: | + cargo-${{ runner.os }}- + + - name: Install uv + uses: astral-sh/setup-uv@6b9c6063abd6010835644d4c2e1bef4cf5cd0fca # v6 + + - name: Install OGx + run: uv tool install "ogx[starter]" --with "sentence-transformers>=5" --with "huggingface_hub<1.18" + + - name: Run integration tests + run: make integration-test diff --git a/Cargo.lock b/Cargo.lock index bbbccb1..0137072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,7 @@ version = 4 name = "agentic-core" version = "0.1.0" dependencies = [ + "async-trait", "bytes", "chrono", "criterion", @@ -42,6 +43,7 @@ dependencies = [ "reqwest", "serde_json", "tokio", + "tokio-stream", "tower-http", "tracing", "tracing-subscriber", @@ -133,6 +135,17 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -150,9 +163,9 @@ checksum = 
"1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "axum" @@ -220,9 +233,9 @@ checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" [[package]] name = "bitflags" -version = "2.11.1" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +checksum = "84d7ced0ae9557296835c32bf1b1e02b44c746701f898460fb000d7eaa84f00a" dependencies = [ "serde_core", ] @@ -238,9 +251,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "byteorder" @@ -262,9 +275,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.62" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "shlex", @@ -276,12 +289,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" -[[package]] -name = "cfg_aliases" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" - [[package]] name = 
"chrono" version = "0.4.44" @@ -537,9 +544,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" dependencies = [ "proc-macro2", "quote", @@ -554,9 +561,9 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" dependencies = [ "serde", ] @@ -768,24 +775,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", -] - -[[package]] -name = "getrandom" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" -dependencies = [ - "cfg-if", - "js-sys", - "libc", - "r-efi 5.3.0", - "wasip2", - "wasm-bindgen", ] [[package]] @@ -796,7 +787,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi 6.0.0", + "r-efi", "wasip2", "wasip3", ] @@ -885,9 +876,9 @@ dependencies = [ [[package]] name = "http" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" dependencies = [ "bytes", "itoa", @@ -930,9 +921,9 @@ checksum = 
"df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.9.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" dependencies = [ "atomic-waker", "bytes", @@ -949,22 +940,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.27.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" -dependencies = [ - "http", - "hyper", - "hyper-util", - "rustls", - "tokio", - "tokio-rustls", - "tower-service", - "webpki-roots 1.0.7", -] - [[package]] name = "hyper-tls" version = "0.6.0" @@ -1189,9 +1164,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.98" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" dependencies = [ "cfg-if", "futures-util", @@ -1235,7 +1210,7 @@ dependencies = [ "bitflags", "libc", "plain", - "redox_syscall 0.8.0", + "redox_syscall 0.8.1", ] [[package]] @@ -1272,15 +1247,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" - -[[package]] -name = "lru-slab" -version = "0.1.2" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +checksum = "113b30b4cd05f7c06868fdb2854f66a7b9fece9a48425351cd532e810d74024f" [[package]] name = "matchers" @@ -1309,9 +1278,9 @@ dependencies = 
[ [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "mime" @@ -1319,11 +1288,21 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "wasi", @@ -1367,7 +1346,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand 0.8.6", + "rand", "smallvec", "zeroize", ] @@ -1422,9 +1401,9 @@ checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" [[package]] name = "openssl" -version = "0.10.79" +version = "0.10.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542" +checksum = "a45fa2aa886c42762255da344f0a0d313e254066c46aad76f300c3d3da62d967" dependencies = [ "bitflags", "cfg-if", @@ -1453,9 +1432,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.115" +version = "0.9.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781" +checksum = 
"f28a22dc7140cda5f096e5e7724a6962ca81a7f8bfd2979f9b18c11af56318c4" dependencies = [ "cc", "libc", @@ -1611,61 +1590,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quinn" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" -dependencies = [ - "bytes", - "cfg_aliases", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash", - "rustls", - "socket2", - "thiserror", - "tokio", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-proto" -version = "0.11.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" -dependencies = [ - "bytes", - "getrandom 0.3.4", - "lru-slab", - "rand 0.9.4", - "ring", - "rustc-hash", - "rustls", - "rustls-pki-types", - "slab", - "thiserror", - "tinyvec", - "tracing", - "web-time", -] - -[[package]] -name = "quinn-udp" -version = "0.5.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" -dependencies = [ - "cfg_aliases", - "libc", - "once_cell", - "socket2", - "tracing", - "windows-sys 0.52.0", -] - [[package]] name = "quote" version = "1.0.45" @@ -1675,12 +1599,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r-efi" -version = "5.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" - [[package]] name = "r-efi" version = "6.0.0" @@ -1694,18 +1612,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - -[[package]] -name = "rand" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" -dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.5", + "rand_chacha", + "rand_core", ] [[package]] @@ -1715,17 +1623,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", -] - -[[package]] -name = "rand_chacha" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" -dependencies = [ - "ppv-lite86", - "rand_core 0.9.5", + "rand_core", ] [[package]] @@ -1737,15 +1635,6 @@ dependencies = [ "getrandom 0.2.17", ] -[[package]] -name = "rand_core" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" -dependencies = [ - "getrandom 0.3.4", -] - [[package]] name = "rayon" version = "1.12.0" @@ -1777,9 +1666,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7591fa2c6b601dfcfe5f043f65a1c39fcdf50efefcd7f1572e538c1f4b398d" +checksum = "5b44b894f2a6e36457d665d1e08c3866add6ed5e70050c1b4ba8a8ddedb02ce7" dependencies = [ "bitflags", ] @@ -1827,16 +1716,14 @@ dependencies = [ "http-body", "http-body-util", "hyper", - "hyper-rustls", "hyper-tls", "hyper-util", "js-sys", "log", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", - "quinn", - "rustls", "rustls-pki-types", "serde", "serde_json", @@ -1844,7 +1731,6 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", "tokio-util", "tower", "tower-http", @@ -1854,7 +1740,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 1.0.7", ] [[package]] @@ -1884,19 +1769,13 @@ dependencies = [ "num-traits", 
"pkcs1", "pkcs8", - "rand_core 0.6.4", + "rand_core", "signature", "spki", "subtle", "zeroize", ] -[[package]] -name = "rustc-hash" -version = "2.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" - [[package]] name = "rustix" version = "1.1.4" @@ -1930,7 +1809,6 @@ version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ - "web-time", "zeroize", ] @@ -2042,9 +1920,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", @@ -2109,9 +1987,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -2130,7 +2008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -2150,9 +2028,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys 0.61.2", @@ -2293,7 +2171,7 @@ dependencies = [ "memchr", "once_cell", 
"percent-encoding", - "rand 0.8.6", + "rand", "rsa", "serde", "sha1", @@ -2331,7 +2209,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand 0.8.6", + "rand", "serde", "serde_json", "sha2", @@ -2542,16 +2420,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" -dependencies = [ - "rustls", - "tokio", -] - [[package]] name = "tokio-stream" version = "0.1.18" @@ -2594,9 +2462,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.10" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags", "bytes", @@ -2692,9 +2560,15 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" -version = "1.20.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" +checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" + +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" [[package]] name = "unicode-bidi" @@ -2761,9 +2635,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -2840,9 +2714,9 
@@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" dependencies = [ "cfg-if", "once_cell", @@ -2853,9 +2727,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.71" +version = "0.4.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" +checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" dependencies = [ "js-sys", "wasm-bindgen", @@ -2863,9 +2737,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2873,9 +2747,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" dependencies = [ "bumpalo", "proc-macro2", @@ -2886,9 +2760,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" dependencies = [ "unicode-ident", ] @@ -2942,19 
+2816,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.98" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "web-time" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" dependencies = [ "js-sys", "wasm-bindgen", @@ -3329,18 +3193,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "3b065d4f0e55f82fae73202e189638116a87c55ab6b8e6c2721e13dd9d854ad1" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "0b631b19d36a892ab55420c92dbc83ccd79274f25be714855d3074aa71cab639" dependencies = [ "proc-macro2", "quote", @@ -3349,9 +3213,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Makefile b/Makefile index 9a2f13a..26c0f62 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help install lint format test build pre-commit clean +.PHONY: help install lint format test build pre-commit clean integration-test help: ## Show this help message @grep -E 
'^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' @@ -23,3 +23,6 @@ pre-commit: ## Run pre-commit hooks on all files clean: ## Remove Rust build artifacts cargo clean + +integration-test: ## Run integration tests (starts OGx, runs tests, tears down) + ./crates/agentic-server/tests/integration/run.sh diff --git a/crates/agentic-core/Cargo.toml b/crates/agentic-core/Cargo.toml index 281333d..c24301c 100644 --- a/crates/agentic-core/Cargo.toml +++ b/crates/agentic-core/Cargo.toml @@ -7,10 +7,11 @@ license.workspace = true repository.workspace = true [dependencies] +async-trait = "0.1" bytes.workspace = true futures.workspace = true http.workspace = true -reqwest = { workspace = true, features = ["default-tls", "stream"] } +reqwest = { workspace = true, features = ["default-tls", "json", "stream"] } serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/crates/agentic-core/src/error.rs b/crates/agentic-core/src/error.rs index 011c6eb..980ee67 100644 --- a/crates/agentic-core/src/error.rs +++ b/crates/agentic-core/src/error.rs @@ -19,4 +19,19 @@ pub enum Error { #[error("{0}")] Config(String), + + #[error("store request failed")] + Store(#[source] reqwest::Error), + + #[error("store returned {status}: {body}")] + StoreResponse { status: u16, body: String }, + + #[error("vLLM proxy request failed")] + Proxy(#[source] reqwest::Error), + + #[error("vLLM returned {status}: {body}")] + ProxyResponse { status: u16, body: String }, + + #[error("agentic loop exceeded {max_iterations} iterations")] + MaxIterations { max_iterations: u32 }, } diff --git a/crates/agentic-core/src/lib.rs b/crates/agentic-core/src/lib.rs index 20877b6..42d4f0c 100644 --- a/crates/agentic-core/src/lib.rs +++ b/crates/agentic-core/src/lib.rs @@ -5,6 +5,7 @@ pub mod readiness; pub mod storage; pub mod types; pub mod utils; +pub mod vector_search; pub use storage::{ ConversationData, 
ConversationStore, DbPool, InOutItem, ItemKind, ResponseData, ResponseMetadata, ResponseStore, diff --git a/crates/agentic-core/src/vector_search/mod.rs b/crates/agentic-core/src/vector_search/mod.rs new file mode 100644 index 0000000..267186b --- /dev/null +++ b/crates/agentic-core/src/vector_search/mod.rs @@ -0,0 +1,11 @@ +pub mod ogx; +pub mod types; + +use async_trait::async_trait; + +use types::SearchResult; + +#[async_trait] +pub trait VectorSearch: Send + Sync { + async fn search(&self, store_id: &str, query: &str) -> Result, crate::error::Error>; +} diff --git a/crates/agentic-core/src/vector_search/ogx.rs b/crates/agentic-core/src/vector_search/ogx.rs new file mode 100644 index 0000000..88b99a6 --- /dev/null +++ b/crates/agentic-core/src/vector_search/ogx.rs @@ -0,0 +1,49 @@ +use async_trait::async_trait; +use tracing::debug; + +use super::types::{SearchResponse, SearchResult}; +use crate::error::Error; + +pub struct OgxStore { + base_url: String, + client: reqwest::Client, +} + +impl OgxStore { + #[must_use] + pub fn new(base_url: &str, client: reqwest::Client) -> Self { + let base_url = base_url.trim_end_matches('/').to_owned(); + Self { base_url, client } + } +} + +#[async_trait] +impl super::VectorSearch for OgxStore { + async fn search(&self, store_id: &str, query: &str) -> Result, Error> { + let url = format!("{}/v1/vector_stores/{store_id}/search", self.base_url); + debug!(%url, %query, "searching vector store via OGx"); + + let resp = self + .client + .post(&url) + .json(&serde_json::json!({ + "query": query, + "max_num_results": 10 + })) + .send() + .await + .map_err(Error::Store)?; + + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(Error::StoreResponse { + status: status.as_u16(), + body, + }); + } + + let search_resp: SearchResponse = resp.json().await.map_err(Error::Store)?; + Ok(search_resp.data) + } +} diff --git a/crates/agentic-core/src/vector_search/types.rs 
b/crates/agentic-core/src/vector_search/types.rs new file mode 100644 index 0000000..c23e97f --- /dev/null +++ b/crates/agentic-core/src/vector_search/types.rs @@ -0,0 +1,79 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResponseRequest { + pub model: String, + #[serde(default)] + pub input: Vec, + #[serde(default)] + pub stream: bool, + #[serde(default)] + pub tools: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub previous_response_id: Option, + #[serde(flatten)] + pub rest: serde_json::Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ToolConfig { + pub r#type: String, + #[serde(default)] + pub vector_store_ids: Option>, + #[serde(flatten)] + pub rest: serde_json::Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResponseBody { + pub id: String, + #[serde(default)] + pub output: Vec, + #[serde(default)] + pub status: String, + #[serde(flatten)] + pub rest: serde_json::Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum VllmOutputItem { + #[serde(rename = "message")] + Message { + #[serde(flatten)] + fields: serde_json::Map, + }, + #[serde(rename = "function_call")] + FunctionCall { + id: String, + call_id: String, + name: String, + arguments: String, + #[serde(flatten)] + rest: serde_json::Map, + }, + #[serde(other)] + Other, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SearchResponse { + pub data: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SearchResult { + pub file_id: String, + pub filename: String, + pub score: f64, + #[serde(default)] + pub attributes: Option>, + #[serde(default)] + pub content: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContentChunk { + pub r#type: String, + pub text: String, +} diff --git a/crates/agentic-server/Cargo.toml b/crates/agentic-server/Cargo.toml index cc9bbf0..a522ad8 100644 --- 
a/crates/agentic-server/Cargo.toml +++ b/crates/agentic-server/Cargo.toml @@ -9,9 +9,10 @@ repository.workspace = true [dependencies] agentic-core.workspace = true axum.workspace = true +bytes.workspace = true clap.workspace = true http.workspace = true -reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] } +reqwest = { workspace = true, features = ["json", "stream"] } serde_json.workspace = true tokio.workspace = true tower-http.workspace = true @@ -19,12 +20,12 @@ tracing.workspace = true tracing-subscriber.workspace = true [dev-dependencies] -bytes.workspace = true criterion.workspace = true futures.workspace = true -reqwest = { workspace = true, features = ["json"] } +reqwest = { workspace = true, features = ["json", "multipart"] } serde_json.workspace = true tokio = { workspace = true, features = ["test-util"] } +tokio-stream = "0.1" [[bench]] name = "proxy_bench" diff --git a/crates/agentic-server/benches/proxy_bench.rs b/crates/agentic-server/benches/proxy_bench.rs index 7d924bc..32c41b2 100644 --- a/crates/agentic-server/benches/proxy_bench.rs +++ b/crates/agentic-server/benches/proxy_bench.rs @@ -1,4 +1,5 @@ use std::convert::Infallible; +use std::sync::Arc; use axum::body::Body; use axum::extract::Request; @@ -14,7 +15,9 @@ use tokio::runtime::Runtime; use agentic_core::config::Config; use agentic_core::proxy::ProxyState; +use agentic_core::vector_search::ogx::OgxStore; use agentic_server::app::{ServerConfig, build_router}; +use agentic_server::handler::AppState; fn bench_config(llm_url: &str) -> Config { Config { @@ -71,7 +74,14 @@ async fn spawn_llm() -> String { } async fn spawn_gateway(config: Config) -> String { - let state = ProxyState::new(config).unwrap(); + let proxy = ProxyState::new(config).unwrap(); + let client = reqwest::Client::new(); + let ogx_store = Arc::new(OgxStore::new("http://127.0.0.1:1", client)); + let state = Arc::new(AppState { + proxy, + max_iterations: 10, + vector_search: ogx_store, + }); let 
server_config = ServerConfig::from_env(); let router = build_router(state, &server_config); diff --git a/crates/agentic-server/src/app.rs b/crates/agentic-server/src/app.rs index 1c22a18..f028186 100644 --- a/crates/agentic-server/src/app.rs +++ b/crates/agentic-server/src/app.rs @@ -1,18 +1,17 @@ -use agentic_core::proxy::ProxyState; +use std::sync::Arc; + use axum::Router; use axum::routing::{get, post}; use http::HeaderValue; use tower_http::cors::{AllowOrigin, Any, CorsLayer}; -use crate::handler::{health, proxy_responses, ready}; +use crate::handler::{AppState, handle_responses, health, ready}; -/// Server-level configuration read from environment variables. pub struct ServerConfig { pub cors_allowed_origins: Vec, } impl ServerConfig { - /// Read `CORS_ALLOWED_ORIGINS` (comma-separated). Unset or empty = permissive. #[must_use] pub fn from_env() -> Self { let cors_allowed_origins = std::env::var("CORS_ALLOWED_ORIGINS") @@ -47,11 +46,11 @@ impl ServerConfig { } } -pub fn build_router(state: ProxyState, server_config: &ServerConfig) -> Router { +pub fn build_router(state: Arc, server_config: &ServerConfig) -> Router { Router::new() .route("/health", get(health)) .route("/ready", get(ready)) - .route("/v1/responses", post(proxy_responses)) + .route("/v1/responses", post(handle_responses)) .layer(server_config.cors_layer()) .with_state(state) } diff --git a/crates/agentic-server/src/handler.rs b/crates/agentic-server/src/handler.rs index 154cdd6..8b4c07f 100644 --- a/crates/agentic-server/src/handler.rs +++ b/crates/agentic-server/src/handler.rs @@ -1,22 +1,33 @@ -use agentic_core::proxy::{ProxyBody, ProxyRequest, ProxyResponse, ProxyState, error_response}; +use std::sync::Arc; + use axum::body::Body; use axum::extract::State; +use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; -use http::StatusCode; -use tracing::warn; +use bytes::Bytes; +use tracing::{debug, error, warn}; + +use agentic_core::proxy::ProxyState; +use 
agentic_core::vector_search::VectorSearch; +use agentic_core::vector_search::types::{ResponseBody, ResponseRequest, SearchResult, VllmOutputItem}; -const MAX_BODY_SIZE: usize = 10 * 1024 * 1024; +pub struct AppState { + pub proxy: ProxyState, + pub max_iterations: u32, + pub vector_search: Arc, +} -pub async fn health() -> impl IntoResponse { +#[allow(clippy::unused_async)] +pub async fn health() -> StatusCode { StatusCode::OK } -pub async fn ready(State(state): State) -> impl IntoResponse { - let base = state.config.llm_api_base.trim_end_matches('/'); +pub async fn ready(State(state): State>) -> impl IntoResponse { + let base = state.proxy.config.llm_api_base.trim_end_matches('/'); let url = format!("{base}/health"); let mut headers = reqwest::header::HeaderMap::new(); - if let Some(key) = state.config.openai_api_key.as_deref() { + if let Some(key) = state.proxy.config.openai_api_key.as_deref() { let trimmed = key.trim(); if !trimmed.is_empty() { if let Ok(v) = reqwest::header::HeaderValue::from_str(&format!("Bearer {trimmed}")) { @@ -47,33 +58,237 @@ pub async fn ready(State(state): State) -> impl IntoResponse { } } -fn convert_response(resp: ProxyResponse) -> Response { - let mut builder = Response::builder().status(resp.status); - for (name, value) in &resp.headers { - builder = builder.header(name, value); +pub async fn handle_responses(State(state): State>, headers: HeaderMap, body: Bytes) -> Response { + let request: ResponseRequest = match serde_json::from_slice(&body) { + Ok(r) => r, + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + serde_json::json!({"error": {"message": format!("invalid request body: {e}")}}).to_string(), + ) + .into_response(); + } + }; + + let has_file_search = request.tools.iter().any(|t| t.r#type == "file_search"); + + if has_file_search { + match agentic_loop(&state, &headers, request).await { + Ok(resp) => resp, + Err(e) => { + error!(error = %e, "agentic loop failed"); + json_error_response(StatusCode::BAD_GATEWAY, 
&format!("agentic loop error: {e}")) + } + } + } else { + proxy_to_vllm(&state, &headers, &body, request.stream).await + } +} + +async fn agentic_loop( + state: &AppState, + client_headers: &HeaderMap, + mut request: ResponseRequest, +) -> Result { + let vector_store_ids: Vec = request + .tools + .iter() + .filter(|t| t.r#type == "file_search") + .filter_map(|t| t.vector_store_ids.clone()) + .flatten() + .collect(); + + for iteration in 0..state.max_iterations { + debug!(iteration, "agentic loop iteration"); + + let mut loop_request = build_vllm_request(state, client_headers); + + let mut body = serde_json::to_value(&request) + .map_err(|e| agentic_core::error::Error::Config(format!("failed to serialize request: {e}")))?; + if let Some(obj) = body.as_object_mut() { + obj.insert("stream".to_owned(), serde_json::Value::Bool(false)); + if let Some(serde_json::Value::Array(tools)) = obj.get_mut("tools") { + let had_file_search = tools + .iter() + .any(|t| t.get("type").and_then(serde_json::Value::as_str) == Some("file_search")); + tools.retain(|t| t.get("type").and_then(serde_json::Value::as_str) != Some("file_search")); + if had_file_search { + tools.push(serde_json::json!({ + "type": "function", + "name": "file_search", + "description": "Search uploaded files for relevant content. 
Use this when the user asks about documents or needs information from files.", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The search query to find relevant content in files" + } + }, + "required": ["query"] + } + })); + } + } + } + + loop_request = loop_request.json(&body); + + let resp = loop_request.send().await.map_err(agentic_core::error::Error::Proxy)?; + + let status = resp.status(); + if !status.is_success() { + let resp_body = resp.text().await.unwrap_or_default(); + return Ok(( + StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY), + resp_body, + ) + .into_response()); + } + + let response_body: ResponseBody = resp.json().await.map_err(agentic_core::error::Error::Proxy)?; + + let tool_calls: Vec<_> = response_body + .output + .iter() + .filter_map(|item| match item { + VllmOutputItem::FunctionCall { + call_id, + name, + arguments, + .. + } if name == "file_search" => Some((call_id.clone(), arguments.clone())), + _ => None, + }) + .collect(); + + if tool_calls.is_empty() { + debug!(iteration, "no tool calls, returning final response"); + let final_json = serde_json::to_string(&response_body) + .unwrap_or_else(|_| r#"{"error":"serialization failed"}"#.to_owned()); + return Ok((StatusCode::OK, [("content-type", "application/json")], final_json).into_response()); + } + + for output_item in &response_body.output { + request + .input + .push(serde_json::to_value(output_item).unwrap_or_default()); + } + + for (call_id, arguments) in &tool_calls { + let query = extract_query(arguments); + debug!(%call_id, %query, "executing file_search tool call"); + + let results = execute_file_search(state, &vector_store_ids, &query).await; + + let tool_output = serde_json::json!({ + "type": "function_call_output", + "call_id": call_id, + "output": serde_json::to_string(&results).unwrap_or_default() + }); + request.input.push(tool_output); + } + + debug!( + iteration, + tool_calls = 
tool_calls.len(), + "fed tool results back, continuing loop" + ); } - match resp.body { - ProxyBody::Full(bytes) => builder.body(Body::from(bytes)).expect("valid response"), - ProxyBody::Stream(stream) => builder.body(Body::from_stream(stream)).expect("valid response"), + + warn!( + max_iterations = state.max_iterations, + "agentic loop reached max iterations" + ); + Err(agentic_core::error::Error::MaxIterations { + max_iterations: state.max_iterations, + }) +} + +async fn execute_file_search(state: &AppState, vector_store_ids: &[String], query: &str) -> Vec { + let mut all_results = Vec::new(); + for store_id in vector_store_ids { + match state.vector_search.search(store_id, query).await { + Ok(results) => all_results.extend(results), + Err(e) => { + warn!(%store_id, error = %e, "file_search failed for vector store"); + } + } } + all_results } -pub async fn proxy_responses(State(state): State, req: axum::extract::Request) -> Response { - let (parts, body) = req.into_parts(); +fn extract_query(arguments: &str) -> String { + serde_json::from_str::(arguments) + .ok() + .and_then(|v| v.get("query").and_then(serde_json::Value::as_str).map(String::from)) + .unwrap_or_default() +} - let Ok(body_bytes) = axum::body::to_bytes(body, MAX_BODY_SIZE).await else { - return convert_response(error_response( - StatusCode::PAYLOAD_TOO_LARGE, - "body_too_large", - "Request body too large", - )); - }; +async fn proxy_to_vllm(state: &AppState, client_headers: &HeaderMap, body: &Bytes, stream: bool) -> Response { + let req = build_vllm_request(state, client_headers).body(body.clone()); - let proxy_req = ProxyRequest { - headers: parts.headers, - body: body_bytes, - query: parts.uri.query().map(String::from), + let resp = match req.send().await { + Ok(r) => r, + Err(e) => { + error!(error = %e, "failed to connect to vLLM"); + return json_error_response(StatusCode::BAD_GATEWAY, &format!("vLLM connection failed: {e}")); + } }; - 
convert_response(agentic_core::proxy::proxy_request(proxy_req, &state).await) + let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); + + if stream { + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("text/event-stream") + .to_owned(); + + let stream = resp.bytes_stream(); + let body = Body::from_stream(stream); + + (status, [("content-type", content_type)], body).into_response() + } else { + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/json") + .to_owned(); + + match resp.bytes().await { + Ok(bytes) => (status, [("content-type", content_type)], bytes).into_response(), + Err(e) => json_error_response(StatusCode::BAD_GATEWAY, &format!("failed to read vLLM response: {e}")), + } + } +} + +fn build_vllm_request(state: &AppState, client_headers: &HeaderMap) -> reqwest::RequestBuilder { + let url = format!("{}/v1/responses", state.proxy.config.llm_api_base); + let mut req = state + .proxy + .non_stream_client + .post(&url) + .header("content-type", "application/json"); + + if let Some(auth) = client_headers.get(http::header::AUTHORIZATION) { + if let Ok(v) = auth.to_str() { + req = req.header("authorization", v); + } + } else if let Some(key) = &state.proxy.config.openai_api_key { + req = req.header("authorization", format!("Bearer {key}")); + } + + req +} + +fn json_error_response(status: StatusCode, message: &str) -> Response { + ( + status, + [("content-type", "application/json")], + serde_json::json!({"error": {"message": message}}).to_string(), + ) + .into_response() } diff --git a/crates/agentic-server/src/main.rs b/crates/agentic-server/src/main.rs index 216b510..f0e8a1a 100644 --- a/crates/agentic-server/src/main.rs +++ b/crates/agentic-server/src/main.rs @@ -21,6 +21,12 @@ struct CommonArgs { #[arg(long, default_value_t = 2.0, global = true)] llm_ready_interval_s: f64, + + #[arg(long, 
default_value = "http://localhost:8080", global = true)] + ogx_base_url: String, + + #[arg(long, default_value_t = 10, global = true)] + max_iterations: u32, } #[derive(Parser)] @@ -86,7 +92,14 @@ async fn main() -> Result<(), Error> { ) })?; let config = build_config(normalize_base_url(&base), &common); - server::run(config, &common.gateway_host, common.gateway_port).await + server::run( + config, + &common.gateway_host, + common.gateway_port, + &common.ogx_base_url, + common.max_iterations, + ) + .await } Some(Commands::Serve { model, port, llm_args }) => { if llm_api_base.is_some() { @@ -102,7 +115,15 @@ async fn main() -> Result<(), Error> { args.push(port.to_string()); args.extend(llm_args); - server::run_with_llm(config, &common.gateway_host, common.gateway_port, args).await + server::run_with_llm( + config, + &common.gateway_host, + common.gateway_port, + args, + &common.ogx_base_url, + common.max_iterations, + ) + .await } } } diff --git a/crates/agentic-server/src/server.rs b/crates/agentic-server/src/server.rs index e9ce317..b5bfcbd 100644 --- a/crates/agentic-server/src/server.rs +++ b/crates/agentic-server/src/server.rs @@ -1,14 +1,36 @@ +use std::sync::Arc; + use agentic_core::config::Config; use agentic_core::error::Error; use agentic_core::proxy::ProxyState; use agentic_core::readiness::wait_llm_ready; +use agentic_core::vector_search::ogx::OgxStore; use agentic_server::app::{ServerConfig, build_router}; +use agentic_server::handler::AppState; use tokio::net::TcpListener; use tracing::info; -async fn serve_gateway(config: Config, host: &str, port: u16) -> Result<(), Error> { +fn build_app_state(config: Config, ogx_base_url: &str, max_iterations: u32) -> Result, Error> { + let proxy = ProxyState::new(config)?; + let client = reqwest::Client::new(); + let ogx_store = Arc::new(OgxStore::new(ogx_base_url, client)); + + Ok(Arc::new(AppState { + proxy, + max_iterations, + vector_search: ogx_store, + })) +} + +async fn serve_gateway( + config: Config, + 
host: &str, + port: u16, + ogx_base_url: &str, + max_iterations: u32, +) -> Result<(), Error> { let addr = format!("{host}:{port}"); - let state = ProxyState::new(config)?; + let state = build_app_state(config, ogx_base_url, max_iterations)?; let server_config = ServerConfig::from_env(); let router = build_router(state, &server_config); let listener = TcpListener::bind(&addr).await?; @@ -22,10 +44,10 @@ async fn serve_gateway(config: Config, host: &str, port: u16) -> Result<(), Erro /// # Errors /// /// Returns an error if LLM readiness polling fails or the server cannot bind. -pub async fn run(config: Config, host: &str, port: u16) -> Result<(), Error> { +pub async fn run(config: Config, host: &str, port: u16, ogx_base_url: &str, max_iterations: u32) -> Result<(), Error> { wait_llm_ready(&config).await?; info!("LLM ready: {}", config.llm_api_base); - serve_gateway(config, host, port).await + serve_gateway(config, host, port, ogx_base_url, max_iterations).await } /// Spawn vLLM as a subprocess and run the gateway in the foreground. @@ -33,7 +55,14 @@ pub async fn run(config: Config, host: &str, port: u16) -> Result<(), Error> { /// # Errors /// /// Returns an error if vLLM fails to start or the gateway errors. 
-pub async fn run_with_llm(config: Config, host: &str, port: u16, llm_args: Vec) -> Result<(), Error> { +pub async fn run_with_llm( + config: Config, + host: &str, + port: u16, + llm_args: Vec, + ogx_base_url: &str, + max_iterations: u32, +) -> Result<(), Error> { let mut cmd = tokio::process::Command::new("python"); cmd.arg("-m").arg("vllm.entrypoints.openai.api_server"); cmd.args(&llm_args); @@ -61,7 +90,7 @@ pub async fn run_with_llm(config: Config, host: &str, port: u16, llm_args: Vec gateway, + gateway = serve_gateway(config, host, port, ogx_base_url, max_iterations) => gateway, status = child.wait() => { let status = status?; Err(Error::LlmProcessExited { diff --git a/crates/agentic-server/tests/agentic_loop_test.rs b/crates/agentic-server/tests/agentic_loop_test.rs new file mode 100644 index 0000000..fbbb5be --- /dev/null +++ b/crates/agentic-server/tests/agentic_loop_test.rs @@ -0,0 +1,175 @@ +#[allow(dead_code)] +mod common; + +use common::{spawn_ogx, spawn_vllm, spawn_vllm_with_tool_calls, start_gateway}; + +#[tokio::test] +async fn test_passthrough_no_tools() { + let (vllm_port, _h) = spawn_vllm().await; + let (ogx_port, _h2) = spawn_ogx().await; + let (gw_addr, _) = start_gateway(vllm_port, Some(ogx_port), None).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", "content": "hello"}] + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["id"], "resp_test"); +} + +#[tokio::test] +async fn test_single_file_search() { + let tool_call_response = serde_json::json!({ + "id": "resp_1", + "object": "response", + "status": "completed", + "output": [{ + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "file_search", + "arguments": "{\"query\": \"test query\"}", + "status": "completed" + }] + }); + + 
let final_response = serde_json::json!({ + "id": "resp_2", + "object": "response", + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Based on the search results..."}] + }] + }); + + let (vllm_port, _h) = spawn_vllm_with_tool_calls(vec![tool_call_response, final_response]).await; + let (ogx_port, _h2) = spawn_ogx().await; + let (gw_addr, _) = start_gateway(vllm_port, Some(ogx_port), None).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", "content": "search for something"}], + "tools": [{"type": "file_search", "vector_store_ids": ["vs_123"]}] + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["id"], "resp_2"); + assert_eq!(body["output"][0]["type"], "message"); +} + +#[tokio::test] +async fn test_multi_turn_tool_calls() { + let turn1 = serde_json::json!({ + "id": "resp_1", + "object": "response", + "status": "completed", + "output": [{ + "type": "function_call", + "id": "fc_1", + "call_id": "call_1", + "name": "file_search", + "arguments": "{\"query\": \"first query\"}", + "status": "completed" + }] + }); + + let turn2 = serde_json::json!({ + "id": "resp_2", + "object": "response", + "status": "completed", + "output": [{ + "type": "function_call", + "id": "fc_2", + "call_id": "call_2", + "name": "file_search", + "arguments": "{\"query\": \"second query\"}", + "status": "completed" + }] + }); + + let final_resp = serde_json::json!({ + "id": "resp_3", + "object": "response", + "status": "completed", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "final answer"}] + }] + }); + + let (vllm_port, _h) = spawn_vllm_with_tool_calls(vec![turn1, turn2, final_resp]).await; + let (ogx_port, _h2) = 
spawn_ogx().await; + let (gw_addr, _) = start_gateway(vllm_port, Some(ogx_port), None).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", "content": "multi-turn search"}], + "tools": [{"type": "file_search", "vector_store_ids": ["vs_123"]}] + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["id"], "resp_3"); +} + +#[tokio::test] +async fn test_max_iterations_reached() { + let tool_call = serde_json::json!({ + "id": "resp_loop", + "object": "response", + "status": "completed", + "output": [{ + "type": "function_call", + "id": "fc_loop", + "call_id": "call_loop", + "name": "file_search", + "arguments": "{\"query\": \"infinite loop\"}", + "status": "completed" + }] + }); + + let (vllm_port, _h) = spawn_vllm_with_tool_calls(vec![tool_call]).await; + let (ogx_port, _h2) = spawn_ogx().await; + let (gw_addr, _) = start_gateway(vllm_port, Some(ogx_port), None).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", "content": "search forever"}], + "tools": [{"type": "file_search", "vector_store_ids": ["vs_123"]}] + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 502); + let body: serde_json::Value = resp.json().await.unwrap(); + let msg = body["error"]["message"].as_str().unwrap_or(""); + assert!(msg.contains("exceeded"), "expected max iterations error, got: {msg}"); +} diff --git a/crates/agentic-server/tests/common/mod.rs b/crates/agentic-server/tests/common/mod.rs new file mode 100644 index 0000000..2cf0d75 --- /dev/null +++ b/crates/agentic-server/tests/common/mod.rs @@ -0,0 +1,231 @@ +use std::convert::Infallible; +use std::sync::Arc; +use 
std::sync::atomic::{AtomicUsize, Ordering}; + +use axum::Router; +use axum::body::Body; +use axum::extract::Request; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; +use bytes::Bytes; +use futures::stream; +use http::StatusCode; +use tokio::net::TcpListener; + +use agentic_core::config::Config; +use agentic_core::proxy::ProxyState; +use agentic_core::vector_search::ogx::OgxStore; +use agentic_server::handler::AppState; + +fn test_config(llm_port: u16, api_key: Option<&str>) -> Config { + Config { + llm_api_base: format!("http://127.0.0.1:{llm_port}"), + openai_api_key: api_key.map(String::from), + llm_ready_timeout_s: 5.0, + llm_ready_interval_s: 0.1, + } +} + +pub async fn start_gateway(vllm_port: u16, ogx_port: Option, api_key: Option<&str>) -> (String, u16) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let addr = format!("127.0.0.1:{port}"); + + let ogx_base = match ogx_port { + Some(p) => format!("http://127.0.0.1:{p}"), + None => "http://127.0.0.1:1".to_owned(), + }; + + let config = test_config(vllm_port, api_key); + let proxy = ProxyState::new(config).unwrap(); + let client = reqwest::Client::new(); + let ogx_store = Arc::new(OgxStore::new(&ogx_base, client)); + + let state = Arc::new(AppState { + proxy, + max_iterations: 10, + vector_search: ogx_store, + }); + + let server_config = agentic_server::app::ServerConfig::from_env(); + let router = agentic_server::app::build_router(state, &server_config); + + tokio::spawn(async move { + axum::serve(listener, router).await.unwrap(); + }); + + (addr, port) +} + +async fn health_handler() -> impl IntoResponse { + StatusCode::OK +} + +async fn responses_handler(req: Request) -> Response { + let headers = req.headers().clone(); + let body_bytes = axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024) + .await + .unwrap_or_default(); + + let body: serde_json::Value = 
serde_json::from_slice(&body_bytes).unwrap_or_default(); + + if body + .get("echo_auth") + .and_then(serde_json::Value::as_bool) + .unwrap_or(false) + { + let auth = headers.get("authorization").and_then(|v| v.to_str().ok()).unwrap_or(""); + let resp_body = serde_json::json!({"authorization": auth}); + return ( + StatusCode::OK, + [("content-type", "application/json")], + serde_json::to_string(&resp_body).unwrap(), + ) + .into_response(); + } + + if body.get("force_error").and_then(serde_json::Value::as_u64) == Some(429) { + return ( + StatusCode::TOO_MANY_REQUESTS, + [("content-type", "application/json")], + r#"{"error":{"message":"rate limited","code":"rate_limit"}}"#, + ) + .into_response(); + } + + if body.get("stream").and_then(serde_json::Value::as_bool).unwrap_or(false) { + let chunks: Vec> = vec![ + Ok(Bytes::from( + "data: {\"type\":\"response.output_text.delta\",\"delta\":\"hello\"}\n\n", + )), + Ok(Bytes::from("data: [DONE]\n\n")), + ]; + let body = Body::from_stream(stream::iter(chunks)); + return ( + StatusCode::OK, + [("content-type", "text/event-stream; charset=utf-8")], + body, + ) + .into_response(); + } + + let out = r#"{"id":"resp_test","object":"response","status":"completed","output":[]}"#; + (StatusCode::OK, [("content-type", "application/json")], out).into_response() +} + +pub async fn spawn_vllm() -> (u16, tokio::task::JoinHandle<()>) { + let app = Router::new() + .route("/health", get(health_handler)) + .route("/v1/responses", post(responses_handler)); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + (port, handle) +} + +pub async fn spawn_mid_stream_failure_vllm() -> (u16, tokio::task::JoinHandle<()>) { + async fn handler(_req: Request) -> Response { + let (tx, rx) = tokio::sync::mpsc::channel::>(2); + tokio::spawn(async move { + let _ = tx + .send(Ok(Bytes::from( + "data: 
{\"type\":\"response.output_text.delta\",\"delta\":\"hello\"}\n\n", + ))) + .await; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + drop(tx); + }); + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let body = Body::from_stream(stream); + ( + StatusCode::OK, + [("content-type", "text/event-stream; charset=utf-8")], + body, + ) + .into_response() + } + + let app = Router::new() + .route("/health", get(health_handler)) + .route("/v1/responses", post(handler)); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + (port, handle) +} + +pub async fn spawn_vllm_with_tool_calls(responses: Vec) -> (u16, tokio::task::JoinHandle<()>) { + let responses = Arc::new(responses); + let counter = Arc::new(AtomicUsize::new(0)); + + let app = Router::new().route("/health", get(health_handler)).route( + "/v1/responses", + post({ + let responses = Arc::clone(&responses); + let counter = Arc::clone(&counter); + move |_req: Request| { + let responses = Arc::clone(&responses); + let counter = Arc::clone(&counter); + async move { + let idx = counter.fetch_add(1, Ordering::SeqCst); + let resp = responses.get(idx).unwrap_or(responses.last().unwrap()); + ( + StatusCode::OK, + [("content-type", "application/json")], + serde_json::to_string(resp).unwrap(), + ) + .into_response() + } + } + }), + ); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + (port, handle) +} + +pub async fn spawn_ogx() -> (u16, tokio::task::JoinHandle<()>) { + async fn search_handler(_req: Request) -> Response { + let body = serde_json::json!({ + "object": "vector_store.search_results.page", + "search_query": ["test query"], + "data": [{ + "file_id": "file_abc", + 
"filename": "doc.txt", + "score": 0.95, + "attributes": {}, + "content": [{"type": "text", "text": "relevant content from doc"}] + }], + "has_more": false + }); + ( + StatusCode::OK, + [("content-type", "application/json")], + serde_json::to_string(&body).unwrap(), + ) + .into_response() + } + + let app = Router::new().route("/v1/vector_stores/{store_id}/search", post(search_handler)); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + (port, handle) +} diff --git a/crates/agentic-server/tests/cors_test.rs b/crates/agentic-server/tests/cors_test.rs index e10f05e..eb41fed 100644 --- a/crates/agentic-server/tests/cors_test.rs +++ b/crates/agentic-server/tests/cors_test.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use axum::Router; use axum::response::IntoResponse; use axum::routing::get; @@ -6,6 +8,8 @@ use tokio::net::TcpListener; use agentic_core::config::Config; use agentic_core::proxy::ProxyState; +use agentic_core::vector_search::ogx::OgxStore; +use agentic_server::handler::AppState; fn test_config(llm_url: &str) -> Config { Config { @@ -27,7 +31,14 @@ async fn spawn_mock_llm() -> (String, tokio::task::JoinHandle<()>) { } async fn spawn_gateway(config: Config) -> (String, tokio::task::JoinHandle<()>) { - let state = ProxyState::new(config).unwrap(); + let proxy = ProxyState::new(config).unwrap(); + let client = reqwest::Client::new(); + let ogx_store = Arc::new(OgxStore::new("http://127.0.0.1:1", client)); + let state = Arc::new(AppState { + proxy, + max_iterations: 10, + vector_search: ogx_store, + }); let server_config = agentic_server::app::ServerConfig::from_env(); let router = agentic_server::app::build_router(state, &server_config); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); diff --git a/crates/agentic-server/tests/health_test.rs b/crates/agentic-server/tests/health_test.rs 
index 180d65f..f0c92dd 100644 --- a/crates/agentic-server/tests/health_test.rs +++ b/crates/agentic-server/tests/health_test.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use axum::Router; use axum::response::IntoResponse; use axum::routing::get; @@ -6,6 +8,8 @@ use tokio::net::TcpListener; use agentic_core::config::Config; use agentic_core::proxy::ProxyState; +use agentic_core::vector_search::ogx::OgxStore; +use agentic_server::handler::AppState; fn test_config(llm_url: &str) -> Config { Config { @@ -34,7 +38,14 @@ async fn spawn_mock_llm() -> (String, tokio::task::JoinHandle<()>) { } async fn spawn_gateway(config: Config) -> (String, tokio::task::JoinHandle<()>) { - let state = ProxyState::new(config).unwrap(); + let proxy = ProxyState::new(config).unwrap(); + let client = reqwest::Client::new(); + let ogx_store = Arc::new(OgxStore::new("http://127.0.0.1:1", client)); + let state = Arc::new(AppState { + proxy, + max_iterations: 10, + vector_search: ogx_store, + }); let server_config = agentic_server::app::ServerConfig::from_env(); let router = agentic_server::app::build_router(state, &server_config); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -51,20 +62,18 @@ async fn test_health_returns_200() { let config = test_config(&llm_url); let (gw_url, _h2) = spawn_gateway(config).await; - let resp = reqwest::get(format!("{gw_url}/health")).await.unwrap(); + let client = reqwest::Client::new(); + let resp = client.get(format!("{gw_url}/health")).send().await.unwrap(); assert_eq!(resp.status(), 200); } #[tokio::test] async fn test_health_returns_200_even_when_llm_down() { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let dead_addr = listener.local_addr().unwrap(); - drop(listener); - - let config = test_config_no_key(&format!("http://{dead_addr}")); - let (gw_url, _h2) = spawn_gateway(config).await; + let config = test_config_no_key("http://127.0.0.1:1"); + let (gw_url, _h) = spawn_gateway(config).await; - let resp = 
reqwest::get(format!("{gw_url}/health")).await.unwrap(); + let client = reqwest::Client::new(); + let resp = client.get(format!("{gw_url}/health")).send().await.unwrap(); assert_eq!(resp.status(), 200); } @@ -74,19 +83,17 @@ async fn test_ready_returns_200_when_llm_healthy() { let config = test_config(&llm_url); let (gw_url, _h2) = spawn_gateway(config).await; - let resp = reqwest::get(format!("{gw_url}/ready")).await.unwrap(); + let client = reqwest::Client::new(); + let resp = client.get(format!("{gw_url}/ready")).send().await.unwrap(); assert_eq!(resp.status(), 200); } #[tokio::test] async fn test_ready_returns_503_when_llm_unreachable() { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let dead_addr = listener.local_addr().unwrap(); - drop(listener); - - let config = test_config_no_key(&format!("http://{dead_addr}")); - let (gw_url, _h2) = spawn_gateway(config).await; + let config = test_config_no_key("http://127.0.0.1:1"); + let (gw_url, _h) = spawn_gateway(config).await; - let resp = reqwest::get(format!("{gw_url}/ready")).await.unwrap(); + let client = reqwest::Client::new(); + let resp = client.get(format!("{gw_url}/ready")).send().await.unwrap(); assert_eq!(resp.status(), 503); } diff --git a/crates/agentic-server/tests/integration/ogx-config.yaml b/crates/agentic-server/tests/integration/ogx-config.yaml new file mode 100644 index 0000000..22cabf4 --- /dev/null +++ b/crates/agentic-server/tests/integration/ogx-config.yaml @@ -0,0 +1,55 @@ +version: 2 +distro_name: agentic-api-test + +apis: + - inference + - files + - vector_io + - file_processors + +providers: + inference: + - provider_id: sentence-transformers + provider_type: inline::sentence-transformers + config: {} + + files: + - provider_id: localfs + provider_type: inline::localfs + config: + storage_dir: /tmp/ogx-test/files + metadata_store: + table_name: files_metadata + backend: sql_default + + vector_io: + - provider_id: faiss + provider_type: inline::faiss + config: + 
persistence: + namespace: vector_io::faiss + backend: kv_default + + file_processors: + - provider_id: auto + provider_type: inline::auto + config: {} + +storage: + backends: + kv_default: + type: kv_sqlite + db_path: /tmp/ogx-test/kvstore.db + sql_default: + type: sql_sqlite + db_path: /tmp/ogx-test/sql_store.db + stores: + metadata: + namespace: registry + backend: kv_default + inference: + table_name: inference_store + backend: sql_default + vector_stores: + table_name: vector_store_metadata + backend: sql_default diff --git a/crates/agentic-server/tests/integration/run.sh b/crates/agentic-server/tests/integration/run.sh new file mode 100755 index 0000000..b588f73 --- /dev/null +++ b/crates/agentic-server/tests/integration/run.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +set -euo pipefail + +OGX_PORT="${OGX_PORT:-8321}" +OGX_PID="" + +cleanup() { + if [ -n "$OGX_PID" ] && kill -0 "$OGX_PID" 2>/dev/null; then + echo "Stopping OGx (pid $OGX_PID)..." + kill "$OGX_PID" 2>/dev/null || true + wait "$OGX_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT + +OGX_CMD="${OGX_CMD:-ogx}" +OGX_CONFIG="$(cd "$(dirname "$0")" && pwd)/ogx-config.yaml" + +echo "Starting OGx on port $OGX_PORT..." +HF_HUB_TRUST_REMOTE_CODE=1 $OGX_CMD run "$OGX_CONFIG" --port "$OGX_PORT" > /tmp/ogx-server.log 2>&1 & +OGX_PID=$! + +echo "Waiting for OGx to be ready..." +for i in $(seq 1 60); do + if curl -sf "http://localhost:$OGX_PORT/v1/health" > /dev/null 2>&1; then + echo "OGx is ready." + break + fi + if ! kill -0 "$OGX_PID" 2>/dev/null; then + echo "OGx process exited unexpectedly. Logs:" + cat /tmp/ogx-server.log + exit 1 + fi + sleep 1 +done + +if ! curl -sf "http://localhost:$OGX_PORT/v1/health" > /dev/null 2>&1; then + echo "OGx failed to start within 60s. Logs:" + cat /tmp/ogx-server.log + exit 1 +fi + +echo "Running integration tests..." +OGX_BASE_URL="http://localhost:$OGX_PORT" cargo test -p agentic-server --test integration_test -- --nocapture + +echo "Integration tests passed." 
diff --git a/crates/agentic-server/tests/integration_test.rs b/crates/agentic-server/tests/integration_test.rs new file mode 100644 index 0000000..6e09893 --- /dev/null +++ b/crates/agentic-server/tests/integration_test.rs @@ -0,0 +1,126 @@ +fn ogx_base_url() -> Option { + std::env::var("OGX_BASE_URL").ok() +} + +async fn find_embedding_model(client: &reqwest::Client, ogx_url: &str) -> (String, u64) { + let models_resp = client.get(format!("{ogx_url}/v1/models")).send().await.unwrap(); + let models: serde_json::Value = models_resp.json().await.unwrap(); + let embedding_model = models["data"] + .as_array() + .and_then(|arr| { + arr.iter() + .find(|m| m["custom_metadata"]["model_type"].as_str() == Some("embedding")) + }) + .expect("OGx should have at least one embedding model") + .clone(); + let model_id = embedding_model["id"].as_str().unwrap().to_owned(); + let dim = embedding_model["custom_metadata"]["embedding_dimension"] + .as_u64() + .unwrap(); + (model_id, dim) +} + +async fn create_vector_store(client: &reqwest::Client, ogx_url: &str, model_id: &str, dim: u64) -> String { + let vs_resp = client + .post(format!("{ogx_url}/v1/vector_stores")) + .json(&serde_json::json!({ + "name": "integration-test-docs", + "metadata": { "embedding_model": model_id, "embedding_dimension": dim } + })) + .send() + .await + .unwrap(); + assert!(vs_resp.status().is_success(), "Failed to create vector store"); + let vs: serde_json::Value = vs_resp.json().await.unwrap(); + vs["id"].as_str().unwrap().to_owned() +} + +async fn upload_and_attach(client: &reqwest::Client, ogx_url: &str, vs_id: &str) { + let file_content = "Rust enforces memory safety without a garbage collector through its ownership system with borrowing and lifetimes. 
The borrow checker ensures references do not outlive the data they point to."; + + let form = reqwest::multipart::Form::new().text("purpose", "assistants").part( + "file", + reqwest::multipart::Part::text(file_content.to_owned()) + .file_name("rust-memory-safety.txt") + .mime_str("text/plain") + .unwrap(), + ); + + let file_resp = client + .post(format!("{ogx_url}/v1/files")) + .multipart(form) + .send() + .await + .unwrap(); + assert!(file_resp.status().is_success(), "Failed to upload file"); + + let file: serde_json::Value = file_resp.json().await.unwrap(); + let file_id = file["id"].as_str().unwrap(); + eprintln!("Uploaded file: {file_id}"); + + let attach_resp = client + .post(format!("{ogx_url}/v1/vector_stores/{vs_id}/files")) + .json(&serde_json::json!({"file_id": file_id})) + .send() + .await + .unwrap(); + assert!(attach_resp.status().is_success(), "Failed to attach file"); + + let attach: serde_json::Value = attach_resp.json().await.unwrap(); + let status = attach["status"].as_str().unwrap_or("unknown"); + assert_eq!( + status, + "completed", + "File attachment failed: {}", + attach + .get("last_error") + .map_or("none".to_owned(), std::string::ToString::to_string) + ); +} + +#[tokio::test] +async fn test_vector_search_with_ogx() { + let Some(ogx_url) = ogx_base_url() else { + eprintln!("Skipping: OGX_BASE_URL not set"); + return; + }; + + let client = reqwest::Client::new(); + + let (model_id, dim) = find_embedding_model(&client, &ogx_url).await; + eprintln!("Using embedding model: {model_id} (dim={dim})"); + + let vs_id = create_vector_store(&client, &ogx_url, &model_id, dim).await; + eprintln!("Created vector store: {vs_id}"); + + upload_and_attach(&client, &ogx_url, &vs_id).await; + + let search_resp = client + .post(format!("{ogx_url}/v1/vector_stores/{vs_id}/search")) + .json(&serde_json::json!({ + "query": "memory safety ownership", + "max_num_results": 2 + })) + .send() + .await + .unwrap(); + assert!(search_resp.status().is_success(), "Search 
failed"); + + let results: serde_json::Value = search_resp.json().await.unwrap(); + let data = results["data"].as_array().expect("search should return data array"); + assert!(!data.is_empty(), "search should return at least one result"); + + let top_result = &data[0]; + let score = top_result["score"].as_f64().unwrap_or(0.0); + assert!(score > 0.0, "top result should have a positive score"); + + let content = top_result["content"] + .as_array() + .and_then(|c| c.first()) + .and_then(|c| c["text"].as_str()) + .unwrap_or(""); + assert!(!content.is_empty(), "top result should have content text"); + + eprintln!("Search returned {} results, top score: {score:.3}", data.len()); + eprintln!("Top result: {content}"); +} diff --git a/crates/agentic-server/tests/proxy_test.rs b/crates/agentic-server/tests/proxy_test.rs new file mode 100644 index 0000000..4d06b40 --- /dev/null +++ b/crates/agentic-server/tests/proxy_test.rs @@ -0,0 +1,142 @@ +#[allow(dead_code)] +mod common; + +use common::{spawn_mid_stream_failure_vllm, spawn_vllm, start_gateway}; + +#[tokio::test] +async fn test_non_stream_passthrough() { + let (vllm_port, _h) = spawn_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", "content": "hello"}] + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["id"], "resp_test"); +} + +#[tokio::test] +async fn test_stream_passthrough() { + let (vllm_port, _h) = spawn_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [{"role": "user", 
"content": "hello"}], + "stream": true + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + + let text = resp.text().await.unwrap(); + assert!(text.contains("data: [DONE]")); + assert!(text.contains("response.output_text.delta")); +} + +#[tokio::test] +async fn test_auth_injection() { + let (vllm_port, _h) = spawn_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({"model": "model-a", "input": [], "echo_auth": true})) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["authorization"], "Bearer env-vllm-key"); +} + +#[tokio::test] +async fn test_client_auth_precedence() { + let (vllm_port, _h) = spawn_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({"model": "model-a", "input": [], "echo_auth": true})) + .header("authorization", "Bearer client-token") + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["authorization"], "Bearer client-token"); +} + +#[tokio::test] +async fn test_vllm_http_error_passthrough() { + let (vllm_port, _h) = spawn_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({"model": "model-a", "input": [], "force_error": 429})) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 429); + let body: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(body["error"]["message"], "rate 
limited"); + assert_eq!(body["error"]["code"], "rate_limit"); +} + +#[tokio::test] +async fn test_mid_stream_failure_closes_cleanly() { + let (vllm_port, _h) = spawn_mid_stream_failure_vllm().await; + let (gw_addr, _) = start_gateway(vllm_port, None, Some("env-vllm-key")).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({ + "model": "model-a", + "input": [], + "stream": true + })) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 200); + let text = resp.text().await.unwrap_or_default(); + assert!(text.contains("response.output_text.delta")); +} + +#[tokio::test] +async fn test_connect_error_maps_to_502() { + let (gw_addr, _) = start_gateway(1, None, None).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://{gw_addr}/v1/responses")) + .json(&serde_json::json!({"model": "model-a", "input": []})) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), 502); +} diff --git a/docs/architecture/index.md b/docs/architecture/index.md new file mode 100644 index 0000000..b7a3e7e --- /dev/null +++ b/docs/architecture/index.md @@ -0,0 +1,72 @@ +# Architecture + +## Overview + +The vLLM Agentic API is a Rust gateway that sits between clients and vLLM, adding stateful capabilities on top of vLLM's stateless Responses API. The gateway is structured as a three-crate workspace. + +```mermaid +graph TD + Client -->|POST /v1/responses| Gateway[agentic-server :9000] + Gateway -->|proxy| vLLM[vLLM :8000] + Gateway -.->|vector search| OGx[OGx :8080] +``` + +## Crate Structure + +| Crate | Role | +|-------|------| +| `agentic-core` | Framework-agnostic core: inference caller, storage, vector search traits, OGx client | +| `agentic-server` | Axum HTTP server: routes, handler, CLI, agentic loop | +| `agentic-praxis` | Reserved for Praxis gateway adapter | + +### agentic-core + +Pure async Rust with no framework dependency. 
Contains: + +- **Proxy** (`proxy.rs`) — HTTP client that forwards requests to vLLM with auth injection, header filtering, and streaming support +- **Readiness** (`readiness.rs`) — Polls vLLM's `/health` endpoint until ready +- **Storage** (`storage/`) — SQLx-based CRUD for conversations and responses (SQLite, PostgreSQL, MySQL) +- **Vector search** (`vector_search/`) — `VectorSearch` trait and OGx implementation for `file_search` tool calls +- **Types** (`types/`) — Serde structs for the Responses API I/O types + +### agentic-server + +Axum-based HTTP server that wires everything together: + +- **Handler** (`handler.rs`) — Request routing: runs the agentic loop if `file_search` tools are present, otherwise proxies to vLLM +- **App** (`app.rs`) — Router with `/health`, `/ready`, `/v1/responses` routes and CORS +- **CLI** (`main.rs`) — Clap-based CLI with `--llm-api-base`, `--ogx-base-url`, `--max-iterations`, and a `serve` subcommand that spawns vLLM as a subprocess + +## Request Flow + +### Passthrough (no tools) + +``` +Client → Gateway → vLLM → Gateway → Client +``` + +The request body is forwarded to vLLM unchanged; headers are filtered, and the gateway's configured API key is injected only when the client sends no `Authorization` header. Streaming responses are proxied as SSE. + +### Agentic Loop (file_search) + +When the request includes `tools: [{type: "file_search", vector_store_ids: [...]}]`: + +1. Convert `file_search` to a `function` tool definition for vLLM +2. Send to vLLM (non-streaming, forced `stream: false`) +3. If vLLM returns `function_call` output items with `name: "file_search"`: + - Extract the query from the call arguments + - Search each vector store via OGx (`POST /v1/vector_stores/{id}/search`) + - Append the tool call output and search results to the input + - Go to step 2 +4. If there are no tool calls, return the final response to the client +5. 
If `max_iterations` is reached, return a 502 error + +## OGx Integration + +[OGx](https://github.com/meta-llama/llama-stack) provides the vector search backend via its OpenAI-compatible API: + +| Endpoint | Purpose | +|----------|---------| +| `POST /v1/vector_stores/{id}/search` | Execute vector search for `file_search` tool calls | + +The `OgxStore` struct implements the `VectorSearch` trait, so the handler depends on the trait, not on OGx directly.