diff --git a/Cargo.lock b/Cargo.lock index b4b9552..eb7a8e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,8 @@ dependencies = [ name = "amqp" version = "0.1.0" dependencies = [ + "amqp_core", + "amqp_dashboard", "amqp_transport", "anyhow", "tokio", @@ -37,10 +39,31 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "amqp_core" +version = "0.1.0" +dependencies = [ + "parking_lot", + "uuid", +] + +[[package]] +name = "amqp_dashboard" +version = "0.1.0" +dependencies = [ + "amqp_core", + "axum", + "serde", + "tokio", + "tower-http", + "tracing", +] + [[package]] name = "amqp_transport" version = "0.1.0" dependencies = [ + "amqp_core", "anyhow", "nom", "once_cell", @@ -49,7 +72,6 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-subscriber", "uuid", ] @@ -71,12 +93,68 @@ dependencies = [ "backtrace", ] +[[package]] +name = "async-trait" +version = "0.1.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dbbc81d15ddf33148615b778836b525dbae4e0731710294b2c484e80c4858f7" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-util 0.6.9", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca6c0b218388a7ed6a8d25e94f7dea5498daaa4fd8c711fb3ff166041b06fda" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -122,6 +200,61 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + +[[package]] +name = "futures-task" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" + +[[package]] +name = "futures-util" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "pin-utils", +] + [[package]] name = "getrandom" version = "0.2.4" @@ -155,12 +288,66 @@ dependencies = [ ] [[package]] -name = "instant" -version = "0.1.12" +name = "http" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" dependencies = [ - "cfg-if", + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]] +name = "httparse" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043f0e083e9901b6cc658a77d1eb86f4fc650bbb977a4337dd63192826aa85dd" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", ] [[package]] @@ -172,6 +359,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + [[package]] name = "jetscii" version = "0.5.1" @@ -208,12 +401,49 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + +[[package]] +name = "matchit" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9376a4f0340565ad675d11fc1419227faf5f60cd7ac9cb2e7185a471f30af833" + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -232,9 +462,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.14" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" dependencies = [ "libc", "log", @@ -299,27 +529,51 @@ checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" [[package]] name = "parking_lot" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ - "instant", "lock_api", "parking_lot_core", ] [[package]] name = "parking_lot_core" -version = "0.8.5" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" dependencies = [ "cfg-if", - "instant", "libc", "redox_syscall", "smallvec", - "winapi", + "windows-sys", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -328,6 +582,12 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -412,6 +672,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.25" @@ -424,12 +693,61 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" +[[package]] +name = "ryu" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "serde" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -454,6 +772,16 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "strong-xml" version = "0.6.3" @@ -489,6 +817,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "thiserror" version = "1.0.30" @@ -520,9 +854,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.16.1" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ "bytes", "libc", @@ -533,6 +867,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", + "socket2", "tokio-macros", "winapi", ] @@ -549,12 +884,94 @@ dependencies = [ ] [[package]] -name = "tracing" -version = "0.1.30" +name = "tokio-util" +version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util 0.7.0", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb284cac1883d54083a0edbdc9cabf931dfed87455f8c7266c01ece6394a43a" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "httpdate", + "mime", + "mime_guess", + "percent-encoding", + "pin-project-lite", + "tokio", + "tokio-util 0.7.0", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -599,13 +1016,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74786ce43333fcf51efe947aed9718fbe46d5c7328ec3f1029e818083966d9aa" dependencies = [ "ansi_term", + "lazy_static", + "matchers", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-xid" version = "0.2.2" @@ -630,6 +1066,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" @@ -658,6 +1104,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "xmlparser" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index fefdafd..9492d22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "./amqp_transport", "xtask"] +members = [".", "amqp_core", "amqp_dashboard", "amqp_transport","xtask"] [package] name = "amqp" @@ -10,7 +10,9 @@ edition = "2021" [dependencies] anyhow = "1.0.53" +amqp_core = { path = "./amqp_core" } +amqp_dashboard = { path = "./amqp_dashboard" } amqp_transport = { path = "./amqp_transport" } tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" -tracing-subscriber = "0.3.8" +tracing-subscriber = { version = "0.3.8", features = ["env-filter"] } diff --git a/amqp_core/Cargo.toml b/amqp_core/Cargo.toml new file mode 100644 index 0000000..cec72f4 --- /dev/null +++ b/amqp_core/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "amqp_core" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +parking_lot = "0.12.0" +uuid = "0.8.2" diff --git a/amqp_core/src/lib.rs b/amqp_core/src/lib.rs new file mode 100644 index 0000000..0a5f487 --- /dev/null +++ b/amqp_core/src/lib.rs @@ -0,0 +1,59 @@ +use parking_lot::Mutex; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct GlobalData { + inner: Arc>, +} + +impl Default for GlobalData { + fn default() -> Self { + Self { + inner: Arc::new(Mutex::new(GlobalDataInner { + connections: HashMap::new(), + })), + } + } +} + +impl GlobalData { + pub fn lock(&self) -> parking_lot::MutexGuard<'_, GlobalDataInner> { + self.inner.lock() + } +} + +#[derive(Debug)] +pub struct GlobalDataInner { + pub connections: HashMap, +} + +pub type ConnectionHandle = Arc>; + +#[derive(Debug)] +pub struct Connection { + pub id: Uuid, + pub peer_addr: SocketAddr, + pub global_data: GlobalData, +} + +impl Connection { + pub fn new_handle( + id: Uuid, + peer_addr: SocketAddr, + global_data: GlobalData, + ) -> ConnectionHandle { + Arc::new(Mutex::new(Self { + id, + peer_addr, + global_data, + })) + } + + pub fn close(&self) { + let mut global_data = self.global_data.lock(); + global_data.connections.remove(&self.id); + } +} diff --git a/amqp_dashboard/Cargo.toml b/amqp_dashboard/Cargo.toml new file mode 100644 index 0000000..cd2d40d --- /dev/null +++ b/amqp_dashboard/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "amqp_dashboard" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +amqp_core = { path = "../amqp_core" } +axum = "0.4.5" +serde = { version = "1.0.136", features = ["derive"] } +tokio = { version = "1.17.0", features = ["full"] } +tower-http = { version = "0.2.3", features = ["fs"] } +tracing = "0.1.31" diff --git a/amqp_dashboard/assets/index.html b/amqp_dashboard/assets/index.html new file mode 100644 index 0000000..17f3185 --- /dev/null +++ b/amqp_dashboard/assets/index.html @@ -0,0 +1,19 @@ + + + + + + + AMQP Data + + + + +

AMQP Data

+

Connections

+
+
+ + + + \ No newline at end of file diff --git a/amqp_dashboard/assets/script.js b/amqp_dashboard/assets/script.js new file mode 100644 index 0000000..7485f2c --- /dev/null +++ b/amqp_dashboard/assets/script.js @@ -0,0 +1,41 @@ +const renderTable = (colNames, rows) => { + const table = document.createElement("table"); + + const headerRow = document.createElement("tr"); + + colNames.forEach((name) => { + const th = document.createElement("th"); + th.innerText = name; + + headerRow.append(th); + }); + table.append(headerRow); + + rows.forEach((row) => { + const contentRow = document.createElement("tr"); + row.forEach((cell) => { + const td = document.createElement("td"); + td.innerText = cell; + contentRow.append(td); + }); + table.append(contentRow); + }) + + return table; +} + +const renderConnections = (connections) => { + const wrapper = document.getElementById("connection-wrapper"); + + const table = renderTable(['Connection ID', 'Client Address'], connections.map((conn) => + [conn.id, conn.peer_addr])); + wrapper.replaceChildren(table) +} + +const refresh = async () => { + const fetched = await fetch('http://localhost:3000/api/data'); + const data = await fetched.json(); + renderConnections(data.connections); +} + +setInterval(refresh, 1000); diff --git a/amqp_dashboard/assets/style.css b/amqp_dashboard/assets/style.css new file mode 100644 index 0000000..53fa93b --- /dev/null +++ b/amqp_dashboard/assets/style.css @@ -0,0 +1,10 @@ +html { + font-family: arial, sans-serif; + margin: 10px; +} + +table, th, td { + border: 1px solid black; + border-collapse: collapse; + padding: 10px; +} \ No newline at end of file diff --git a/amqp_dashboard/src/lib.rs b/amqp_dashboard/src/lib.rs new file mode 100644 index 0000000..6ef3867 --- /dev/null +++ b/amqp_dashboard/src/lib.rs @@ -0,0 +1,78 @@ +use amqp_core::GlobalData; +use axum::body::{boxed, Full}; +use axum::response::{Html, IntoResponse, Response}; +use axum::routing::get; +use axum::{Json, Router}; +use serde::Serialize; +use tracing::info; + +const INDEX_HTML: &str = include_str!("../assets/index.html"); +const SCRIPT_JS: &str = include_str!("../assets/script.js"); +const STYLE_CSS: &str = include_str!("../assets/style.css"); + +pub async fn dashboard(global_data: GlobalData) { + let app = Router::new() + .route("/", get(get_index_html)) + .route("/script.js", get(get_script_js)) + .route("/style.css", get(get_style_css)) + .route("/api/data", get(move || get_data(global_data))); + + let socket_addr = "0.0.0.0:3000".parse().unwrap(); + + info!(%socket_addr, "Starting up dashboard on address"); + + axum::Server::bind(&socket_addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +async fn get_index_html() -> impl IntoResponse { + info!("Requesting index.html"); + Html(INDEX_HTML) +} + +async fn get_script_js() -> Response { + Response::builder() + .header("content-type", "application/javascript") + .body(boxed(Full::from(SCRIPT_JS))) + .unwrap() +} + +async fn get_style_css() -> Response { + Response::builder() + .header("content-type", "text/css") + .body(boxed(Full::from(STYLE_CSS))) + .unwrap() +} + +#[derive(Serialize)] +struct Data { + connections: Vec, +} + +#[derive(Serialize)] +struct Connection { + id: String, + peer_addr: String, +} + +async fn get_data(global_data: GlobalData) -> impl IntoResponse { + let global_data = global_data.lock(); + + let connections = global_data + .connections + .values() + .map(|conn| { + let conn = conn.lock(); + Connection { + id: conn.id.to_string(), + peer_addr: conn.peer_addr.to_string(), + } + }) + .collect(); + + let data = Data { connections }; + + Json(data) +} diff --git a/amqp_transport/Cargo.toml b/amqp_transport/Cargo.toml index 7129bf7..743d328 100644 --- a/amqp_transport/Cargo.toml +++ b/amqp_transport/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +amqp_core = { path = "../amqp_core" } anyhow = { version = "1.0.53", features = ["backtrace"] } nom = "7.1.0" once_cell = "1.9.0" @@ -14,7 +15,6 @@ regex = "1.5.4" thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["full"] } tracing = "0.1.30" -tracing-subscriber = "0.3.8" uuid = "0.8.2" [features] diff --git a/amqp_transport/src/connection.rs b/amqp_transport/src/connection.rs index 316d6ec..11829ea 100644 --- a/amqp_transport/src/connection.rs +++ b/amqp_transport/src/connection.rs @@ -26,15 +26,17 @@ pub struct Connection { max_frame_size: usize, heartbeat_delay: u16, channel_max: u16, + connection_handle: amqp_core::ConnectionHandle, } impl Connection { - pub fn new(stream: TcpStream) -> Self { + pub fn new(stream: TcpStream, connection_handle: amqp_core::ConnectionHandle) -> Self { Self { stream, max_frame_size: FRAME_SIZE_MIN_MAX, heartbeat_delay: HEARTBEAT_DELAY, channel_max: CHANNEL_MAX, + connection_handle, } } @@ -43,6 +45,9 @@ impl Connection { Ok(()) => {} Err(err) => error!(%err, "Error during processing of connection"), } + + let connection_handle = self.connection_handle.lock(); + connection_handle.close(); } pub async fn process_connection(&mut self) -> Result<()> { diff --git a/amqp_transport/src/lib.rs b/amqp_transport/src/lib.rs index d7b1fbe..2c07ab6 100644 --- a/amqp_transport/src/lib.rs +++ b/amqp_transport/src/lib.rs @@ -9,25 +9,34 @@ mod sasl; mod tests; use crate::connection::Connection; +use amqp_core::GlobalData; use anyhow::Result; use tokio::net; use tracing::{info, info_span, Instrument}; use uuid::Uuid; -pub async fn do_thing_i_guess() -> Result<()> { +pub async fn do_thing_i_guess(global_data: GlobalData) -> Result<()> { info!("Binding TCP listener..."); let listener = net::TcpListener::bind(("127.0.0.1", 5672)).await?; info!(addr = ?listener.local_addr()?, "Successfully bound TCP listener"); loop { - let (stream, _) = listener.accept().await?; + let (stream, peer_addr) = listener.accept().await?; let id = Uuid::from_bytes(rand::random()); info!(local_addr = ?stream.local_addr(), %id, "Accepted new connection"); let span = info_span!("client-connection", %id); - let connection = Connection::new(stream); + let connection_handle = + amqp_core::Connection::new_handle(id, peer_addr, global_data.clone()); + + let mut global_data = global_data.lock(); + global_data + .connections + .insert(id, connection_handle.clone()); + + let connection = Connection::new(stream, connection_handle); tokio::spawn(connection.start_connection_processing().instrument(span)); } diff --git a/src/main.rs b/src/main.rs index ce0ec92..24a6203 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,29 +1,37 @@ use anyhow::Result; use std::env; -use tracing::Level; +use tracing::{info_span, Instrument}; #[tokio::main] async fn main() -> Result<()> { - let mut level = Level::DEBUG; + let mut dashboard = false; for arg in env::args().skip(1) { match arg.as_str() { - "--trace" => level = Level::TRACE, + "--dashboard" => dashboard = true, "ignore-this-clippy" => eprintln!("yes please"), _ => {} } } - setup_tracing(level); - amqp_transport::do_thing_i_guess().await + setup_tracing(); + + let global_data = amqp_core::GlobalData::default(); + + if dashboard { + let dashboard_span = info_span!("dashboard"); + tokio::spawn(amqp_dashboard::dashboard(global_data.clone()).instrument(dashboard_span)); + } + + amqp_transport::do_thing_i_guess(global_data).await } -fn setup_tracing(level: Level) { +fn setup_tracing() { tracing_subscriber::fmt() .with_level(true) .with_timer(tracing_subscriber::fmt::time::time()) .with_ansi(true) .with_thread_names(true) - .with_max_level(level) + .with_env_filter("hyper=info,debug") .init() }