From 692671504969d476097e61bcb03b104547005d18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 19 Sep 2023 10:34:46 +0200 Subject: [PATCH 01/12] Post Service POC --- .gitmodules | 3 + Cargo.lock | 555 ++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 +- service/Cargo.toml | 29 ++ service/api | 1 + service/build.rs | 4 + service/src/client.rs | 116 ++++++++ service/src/main.rs | 175 ++++++++++++ service/src/service.rs | 120 ++++++++ service/src/test_server.rs | 143 ++++++++++ 10 files changed, 1137 insertions(+), 11 deletions(-) create mode 100644 .gitmodules create mode 100644 service/Cargo.toml create mode 160000 service/api create mode 100644 service/build.rs create mode 100644 service/src/client.rs create mode 100644 service/src/main.rs create mode 100644 service/src/service.rs create mode 100644 service/src/test_server.rs diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..3d4297c5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "service/api"] + path = service/api + url = https://github.com/spacemeshos/api.git diff --git a/Cargo.lock b/Cargo.lock index 48453e25..8561f1c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "arrayref" version = "0.3.7" @@ -130,6 +136,39 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "async-trait" +version = "0.1.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "atty" version = "0.2.14" @@ -147,6 +186,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -292,6 +376,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + [[package]] name = "cast" version = "0.3.0" @@ -428,20 +518,19 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.1" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c8d502cbaec4595d2e7d5f61e318f05417bd2b66fdc3809498f0d3fdf0bea27" +checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" dependencies = [ "clap_builder", "clap_derive", - "once_cell", ] [[package]] name = "clap_builder" -version = "4.4.1" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5891c7bc0edb3e1c2204fc5e94009affabeb1821c9e5fdc3959536c5c0bb984d" +checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" dependencies = [ "anstream", "anstyle", @@ -451,9 +540,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.4.0" +version = "4.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a" +checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" dependencies = [ "heck", "proc-macro2", @@ -521,7 +610,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.1", + "clap 4.4.4", "criterion-plot", "is-terminal", "itertools", @@ -802,6 +891,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "float-cmp" version = "0.9.0" @@ -963,6 +1058,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap 1.9.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -1008,12 +1122,82 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.4.9", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1113,7 +1297,7 @@ name = "initializer" version = "0.4.4" dependencies = [ "base64 0.21.3", - "clap 4.4.1", + "clap 4.4.4", "env_logger", "eyre", "post-rs", @@ -1223,6 +1407,12 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchit" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" + [[package]] name = "memchr" version = "2.6.2" @@ -1247,6 +1437,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1262,6 +1458,17 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "mockall" version = "0.11.4" @@ -1289,6 +1496,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "nix" version = "0.26.4" @@ -1491,6 +1704,42 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "percent-encoding" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" + +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.0.0", +] + +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1675,7 +1924,7 @@ dependencies = [ name = "profiler" version = "0.4.4" dependencies = [ - "clap 4.4.1", + "clap 4.4.4", "env_logger", "eyre", "hex", @@ -1708,6 +1957,60 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.29", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "prost-types" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" +dependencies = [ + "prost", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -1973,6 +2276,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -2101,6 +2410,24 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "service" +version = "0.1.0" +dependencies = [ + "async-stream", + "clap 4.4.4", + "env_logger", + "eyre", + "hex", + "log", + "post-rs", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", +] + [[package]] name = "shlex" version = "1.1.0" @@ -2122,6 +2449,26 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "socket2" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2197,6 +2544,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tap" version = "1.0.1" @@ -2305,6 +2658,69 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tokio" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "pin-project-lite", + "socket2 0.5.4", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "toml" version = "0.5.11" @@ -2331,6 +2747,116 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.3", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b477abbe1d18c0b08f56cd01d1bc288668c5b5cfd19b2ae1886bbf599c546f1" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "tracing-core" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + [[package]] name = "typenum" version = "1.16.0" @@ -2398,6 +2924,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 2f324ff9..1dcf3a14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "ffi", "scrypt-ocl", "initializer", "profiler"] +members = [".", "ffi", "scrypt-ocl", "initializer", "profiler", "service"] [package] name = "post-rs" diff --git a/service/Cargo.toml b/service/Cargo.toml new file mode 100644 index 00000000..6e2e005e --- /dev/null +++ b/service/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "service" +version = "0.1.0" +edition = "2021" + +[[bin]] # Bin to run the HelloWorld gRPC server +name = "test-server" +path = "src/test_server.rs" + +[dependencies] +post-rs = { path = "../" } +prost = "0.12.1" +tonic = "0.10.0" +tokio = { version = "1.0", features = [ + "rt-multi-thread", + "macros", + "sync", + "time", +] } +tokio-stream = "0.1" +async-stream = "0.3.5" +log = "0.4.20" +eyre = "0.6.8" +env_logger = "0.10.0" +clap = { version = "4.4.4", features = ["derive"] } +hex = "0.4.3" + +[build-dependencies] +tonic-build = "0.10.0" diff --git a/service/api b/service/api new file mode 160000 index 00000000..3cdeeff6 --- /dev/null +++ b/service/api @@ -0,0 +1 @@ +Subproject commit 3cdeeff627f1b31ec5becd46b6109784de749892 diff --git a/service/build.rs b/service/build.rs new file mode 100644 index 00000000..b4eb8a0a --- /dev/null +++ b/service/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::configure().compile(&["api/spacemesh/v1/post.proto"], &["api"])?; + Ok(()) +} diff --git a/service/src/client.rs b/service/src/client.rs new file mode 100644 index 00000000..be3f50a9 --- /dev/null +++ b/service/src/client.rs @@ -0,0 +1,116 @@ +pub(crate) use spacemesh_v1::post_service_client::PostServiceClient; +use spacemesh_v1::service_response; +use spacemesh_v1::ServiceResponse; +use spacemesh_v1::{GenProofResponse, GenProofStatus, Proof, ProofMetadata}; +use tokio::sync::{mpsc, oneshot}; +use tonic::transport::Channel; +use tonic::Request; + +use crate::service::Command; +use spacemesh_v1::node_request; + +pub mod spacemesh_v1 { + tonic::include_proto!("spacemesh.v1"); +} + +pub(crate) async fn run( + mut client: PostServiceClient, + cmds: mpsc::Sender, +) -> eyre::Result<()> { + let (tx, mut rx) = mpsc::channel::(1); + let outbound = async_stream::stream! { + while let Some(msg) = rx.recv().await { + yield msg; + } + }; + + let response = client.register(Request::new(outbound)).await?; + let mut inbound = response.into_inner(); + + while let Some(request) = inbound.message().await? { + log::debug!("Got request from node: {request:?}"); + match request.kind { + Some(node_request::Kind::GenProof(req)) => { + let (cmd_response, rx) = oneshot::channel(); + let Ok(challenge) = req.challenge.try_into() else { + log::error!("invalid challenge"); + tx.send(ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + }) + .await?; + continue; + }; + + // Forward the request to the service + cmds.send(Command::GenProof { + challenge, + response: cmd_response, + }) + .await?; + + // Process the response from the service + let resp = match rx.await? { + Ok(Some(resp)) => { + log::info!("proof generation finished"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + proof: Some(Proof { + nonce: resp.0.nonce, + indices: resp.0.indices.into_owned(), + pow: resp.0.pow, + }), + metadata: Some(ProofMetadata { + challenge: resp.1.challenge.to_vec(), + node_id: Some(spacemesh_v1::SmesherId { + id: resp.1.node_id.to_vec(), + }), + commitment_atx_id: Some(spacemesh_v1::ActivationId { + id: resp.1.commitment_atx_id.to_vec(), + }), + num_units: resp.1.num_units, + labels_per_unit: resp.1.labels_per_unit, + }), + status: GenProofStatus::Ok as i32, + })), + } + } + Ok(None) => { + log::info!("proof generation in progress"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Ok as i32, + ..Default::default() + })), + } + } + Err(e) => { + log::error!("failed to generate proof: {e:?}"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + } + } + }; + + tx.send(resp).await?; + } + None => { + log::warn!("Got a request with no kind"); + tx.send(ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + }) + .await? + } + } + } + + Ok(()) +} diff --git a/service/src/main.rs b/service/src/main.rs new file mode 100644 index 00000000..52736084 --- /dev/null +++ b/service/src/main.rs @@ -0,0 +1,175 @@ +use std::{path::PathBuf, time::Duration}; + +use clap::{Args, Parser, ValueEnum}; +use eyre::Context; +use post::pow::randomx::RandomXFlag; +use tokio::{sync::mpsc, time::sleep}; + +mod client; +mod service; + +/// Post Service +#[derive(Parser, Debug)] +#[command(version, about)] +struct Cli { + /// Directory of POST data + #[arg(short, long)] + dir: PathBuf, + + /// Node address to connect to + #[arg(short, long)] + address: String, + + #[command(flatten, next_help_heading = "POST configuration")] + post_config: PostConfig, + + #[command(flatten, next_help_heading = "POST settings")] + post_settings: PostSettings, +} + +#[derive(Args, Debug)] +/// POST configuration - network parameters +struct PostConfig { + /// K1 specifies the difficulty for a label to be a candidate for a proof. + #[arg(long, default_value = "26")] + k1: u32, + /// K2 is the number of labels below the required difficulty required for a proof. + #[arg(long, default_value = "37")] + k2: u32, + /// K3 is the size of the subset of proof indices that is validated. + #[arg(long, default_value = "37")] + k3: u32, + /// Difficulty for the nonce proof of work. Lower values increase difficulty of finding + /// `pow` for [Proof][crate::prove::Proof]. + #[arg( + long, + default_value = "000dfb23b0979b4b000000000000000000000000000000000000000000000000", + value_parser(parse_difficulty) + )] + pow_difficulty: [u8; 32], + + /// Scrypt parameters for initialization + #[command(flatten)] + scrypt: ScryptParams, +} + +#[derive(Args, Debug)] +struct ScryptParams { + /// Scrypt N parameter + #[arg(short, default_value_t = 8192)] + n: usize, + + /// Scrypt R parameter + #[arg(short, default_value_t = 1)] + r: usize, + + /// Scrypt P parameter + #[arg(short, default_value_t = 1)] + p: usize, +} +#[derive(Args, Debug)] +/// POST proof generation settings +struct PostSettings { + /// Number of threads to use. + /// '0' means use all available threads + #[arg(long, default_value_t = 1)] + threads: usize, + + /// Number of nonces to attempt in single pass over POS data. + /// + /// Each group of 16 nonces requires a separate PoW. Must be a multiple of 16. + /// + /// Higher value gives a better chance to find a proof within less passes over the POS data, + /// but also slows down the process. + #[arg(long, default_value_t = 128, value_parser(parse_nonces))] + nonces: usize, + + /// Modes of operation for RandomX. + /// + /// They are interchangeable as they give the same results but have different + /// purpose and memory requirements. + #[arg(long, default_value_t = RandomXMode::Fast)] + randomx_mode: RandomXMode, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, ValueEnum)] +enum RandomXMode { + /// Fast mode for proving. Requires 2080 MiB of memory. + Fast, + /// Light mode for verification. Requires only 256 MiB of memory, but runs significantly slower + Light, +} + +impl std::fmt::Display for RandomXMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.to_possible_value().unwrap().get_name().fmt(f) + } +} + +impl From for RandomXFlag { + fn from(val: RandomXMode) -> Self { + match val { + RandomXMode::Fast => RandomXFlag::get_recommended_flags() | RandomXFlag::FLAG_FULL_MEM, + RandomXMode::Light => RandomXFlag::get_recommended_flags(), + } + } +} + +fn parse_nonces(arg: &str) -> eyre::Result { + let nonces = arg.parse()?; + eyre::ensure!(nonces % 16 == 0, "nonces must be multiple of 16"); + eyre::ensure!(nonces / 16 <= 256, format!("max nonces is {}", 256 * 16)); + Ok(nonces) +} + +fn parse_difficulty(arg: &str) -> eyre::Result<[u8; 32]> { + hex::decode(arg)? + .as_slice() + .try_into() + .wrap_err("invalid difficulty length") +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let args = Cli::parse(); + + let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); + env_logger::init_from_env(env); + + let (tx, rx) = mpsc::channel(1); + + let service = service::PostService::new( + rx, + args.dir, + post::config::Config { + k1: args.post_config.k1, + k2: args.post_config.k2, + k3: args.post_config.k3, + pow_difficulty: args.post_config.pow_difficulty, + scrypt: post::ScryptParams::new( + args.post_config.scrypt.n.ilog2() as u8 - 1, + args.post_config.scrypt.r.ilog2() as u8, + args.post_config.scrypt.p.ilog2() as u8, + ), + }, + args.post_settings.nonces, + args.post_settings.threads, + args.post_settings.randomx_mode.into(), + )?; + tokio::spawn(service.run()); + + loop { + let client = loop { + match client::PostServiceClient::connect(args.address.clone()).await { + Ok(client) => break client, + Err(e) => { + log::error!("failed to connect to node: {e}"); + sleep(Duration::from_secs(1)).await; + } + } + }; + let res = client::run(client, tx.clone()).await; + log::info!("client exited: {res:?}"); + } + // Ok(()) +} diff --git a/service/src/service.rs b/service/src/service.rs new file mode 100644 index 00000000..20db2a2b --- /dev/null +++ b/service/src/service.rs @@ -0,0 +1,120 @@ +use std::path::PathBuf; + +use eyre::Context; +use post::{metadata::ProofMetadata, pow::randomx::RandomXFlag, prove::Proof}; +use tokio::sync::{mpsc, oneshot}; + +#[derive(Debug)] +pub(crate) enum Command { + GenProof { + challenge: [u8; 32], + response: oneshot::Sender, ProofMetadata)>>>, + }, +} + +#[derive(Debug)] +pub(crate) struct PostService { + id: [u8; 32], + datadir: PathBuf, + cfg: post::config::Config, + nonces: usize, + threads: usize, + pow_flags: RandomXFlag, + rx: mpsc::Receiver, + proof_generation: Option>>>, +} + +impl PostService { + pub(crate) fn new( + rx: mpsc::Receiver, + datadir: PathBuf, + cfg: post::config::Config, + nonces: usize, + threads: usize, + pow_flags: RandomXFlag, + ) -> eyre::Result { + let metadata = + post::metadata::load(&datadir).wrap_err("loading metadata. Is POST initialized?")?; + let id = metadata.node_id; + + Ok(Self { + id, + rx, + proof_generation: None, + datadir, + cfg, + nonces, + threads, + pow_flags, + }) + } + + pub(crate) async fn run(mut self) -> eyre::Result<()> { + log::info!("starting PostService"); + while let Some(cmd) = self.rx.recv().await { + log::info!("got {cmd:?}"); + match cmd { + Command::GenProof { + challenge, + response, + } => { + log::info!("got GenProof command"); + match &mut self.proof_generation { + Some(handle) => { + if handle.is_finished() { + log::info!("proof generation is finished"); + let result = handle.await?; + self.proof_generation = None; + match result { + Ok(proof) => { + let metadata = post::metadata::load(&self.datadir).unwrap(); + + _ = response.send(Ok(Some(( + proof, + ProofMetadata { + challenge, + node_id: metadata.node_id, + commitment_atx_id: metadata.commitment_atx_id, + num_units: metadata.num_units, + labels_per_unit: metadata.labels_per_unit, + }, + )))); + } + Err(e) => { + _ = response.send(Err(e)); + } + } + } else { + log::info!("proof generation in progress"); + _ = response.send(Ok(None)); + } + } + None => { + log::info!("starting proof generation"); + let pow_flags = self.pow_flags; + let cfg = self.cfg; + let datadir = self.datadir.clone(); + let node_id = self.id; + let nonces = self.nonces; + let threads = self.threads; + self.proof_generation = Some(tokio::task::spawn_blocking(move || { + post::prove::generate_proof( + &datadir, + &challenge, + cfg, + nonces, + threads, + pow_flags, + Some(node_id), + ) + })); + // in progress + _ = response.send(Ok(None)); + } + } + } + } + } + Ok(()) + } +} diff --git a/service/src/test_server.rs b/service/src/test_server.rs new file mode 100644 index 00000000..1d2568a9 --- /dev/null +++ b/service/src/test_server.rs @@ -0,0 +1,143 @@ +use std::pin::Pin; +use std::sync::Mutex; +use std::time::Duration; + +use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::time::sleep; +use tokio_stream::{Stream, StreamExt}; +use tonic::{transport::Server, Request, Response, Status}; + +use spacemesh_v1::post_service_server::{PostService, PostServiceServer}; +use spacemesh_v1::{NodeRequest, ServiceResponse}; + +use spacemesh_v1::node_request; +use spacemesh_v1::GenProofRequest; +pub mod spacemesh_v1 { + tonic::include_proto!("spacemesh.v1"); +} + +struct TestNodeRequest { + request: NodeRequest, + response: oneshot::Sender, +} + +#[derive(Debug)] +pub struct TestPostService { + registered: Mutex>>, +} + +impl TestPostService { + fn new() -> Self { + Self { + registered: Mutex::new(broadcast::channel(1).0), + } + } + fn wait_for_connection(&mut self) -> broadcast::Receiver> { + self.registered.lock().unwrap().subscribe() + } +} + +#[tonic::async_trait] +impl PostService for TestPostService { + type RegisterStream = Pin> + Send + 'static>>; + + async fn register( + &self, + request: Request>, + ) -> Result, Status> { + log::info!("Post Service connected: {:?}", request); + let mut stream = request.into_inner(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + self.registered + .lock() + .unwrap() + .send(tx) + .expect("nobody is interested in post service registered"); + + let output = async_stream::try_stream! { + while let Some(req) = rx.recv().await { + yield req.request; + if let Some(Ok(response)) = stream.next().await { + _ = req.response.send(response); + } else { + log::info!("stream closed"); + return; + } + } + }; + + Ok(Response::new(Box::pin(output) as Self::RegisterStream)) + } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); + env_logger::init_from_env(env); + + let addr = "[::1]:50051".parse()?; + + let mut test_node = TestPostService::new(); + + let mut reg = test_node.wait_for_connection(); + + let _handle = tokio::spawn( + Server::builder() + .add_service(PostServiceServer::new(test_node)) + .serve(addr), + ); + + loop { + // wait for the connection to be established + let tx = reg.recv().await?; + + loop { + let (resp_tx, resp_rx) = oneshot::channel(); + if let Err(e) = tx + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { + challenge: vec![0xCA; 32], + })), + }, + response: resp_tx, + }) + .await + { + log::error!("post service disconnected: {:?}", e); + break; + } + + let resp = resp_rx.await?; + match resp.kind { + Some(spacemesh_v1::service_response::Kind::GenProof(resp)) => { + log::debug!("Got GenProof response: {resp:?}"); + match resp.status() { + spacemesh_v1::GenProofStatus::Ok => { + if let Some(proof) = resp.proof { + log::info!("POST proof generation finished, proof: {:?}", proof); + break; + } + log::info!("POST proof generation in progress"); + } + spacemesh_v1::GenProofStatus::Unspecified => { + log::error!("unspecified status"); + } + spacemesh_v1::GenProofStatus::Error => { + log::error!("POST proof generation error"); + break; + } + } + } + _ => { + log::error!("Got unexpected response: {:?}", resp); + } + } + sleep(Duration::from_secs(5)).await; + } + } + + // _ = handle.await?; + // Ok(()) +} From df32089922e505fad5baba9626749545d62ee446 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Thu, 21 Sep 2023 13:02:25 +0200 Subject: [PATCH 02/12] Install protoc in CI --- .github/workflows/ci.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 64821c3a..5a29d560 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,12 @@ jobs: name: Check runs-on: ubuntu-latest steps: + - uses: arduino/setup-protoc@v2 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: actions/checkout@v3 + with: + submodules: true - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - run: cargo check --workspace --all-features @@ -42,6 +47,8 @@ jobs: steps: - uses: actions/checkout@v3 + with: + submodules: true - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.toolchain }} @@ -86,6 +93,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + with: + submodules: true - uses: dtolnay/rust-toolchain@stable with: components: rustfmt @@ -95,7 +104,12 @@ jobs: clippy: runs-on: ubuntu-latest steps: + - uses: arduino/setup-protoc@v2 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: actions/checkout@v3 + with: + submodules: true - uses: dtolnay/rust-toolchain@master with: components: clippy @@ -111,6 +125,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + with: + submodules: true - name: Install stable uses: dtolnay/rust-toolchain@stable with: @@ -177,6 +193,8 @@ jobs: name: Install opencl run: vcpkg install opencl - uses: actions/checkout@v3 + with: + submodules: true - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.toolchain }} From 7b863f35a532b4e34616c46b9afe56f5e29fbb70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Fri, 22 Sep 2023 11:27:42 +0200 Subject: [PATCH 03/12] Refactor and simplify post service code --- service/src/client.rs | 98 ++++++++++++++++----------- service/src/main.rs | 56 +++++++--------- service/src/service.rs | 148 +++++++++++++++++++++-------------------- 3 files changed, 158 insertions(+), 144 deletions(-) diff --git a/service/src/client.rs b/service/src/client.rs index be3f50a9..b4d1cc25 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -1,21 +1,60 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + pub(crate) use spacemesh_v1::post_service_client::PostServiceClient; -use spacemesh_v1::service_response; -use spacemesh_v1::ServiceResponse; -use spacemesh_v1::{GenProofResponse, GenProofStatus, Proof, ProofMetadata}; -use tokio::sync::{mpsc, oneshot}; +use spacemesh_v1::{node_request, service_response}; +use spacemesh_v1::{GenProofResponse, GenProofStatus, Proof, ProofMetadata, ServiceResponse}; +use tokio::sync::mpsc; +use tokio::time::sleep; use tonic::transport::Channel; use tonic::Request; -use crate::service::Command; -use spacemesh_v1::node_request; +use crate::service::ProofGenState; pub mod spacemesh_v1 { tonic::include_proto!("spacemesh.v1"); } -pub(crate) async fn run( +pub(crate) struct ServiceClient { + address: String, + reconnect_interval: Duration, + service: Arc>, +} + +impl ServiceClient { + pub(crate) fn new( + address: String, + reconnect_interval: Duration, + service: Arc>, + ) -> Self { + Self { + address, + reconnect_interval, + service, + } + } + + pub(crate) async fn run(self) -> eyre::Result<()> { + loop { + let client = loop { + match PostServiceClient::connect(self.address.clone()).await { + Ok(client) => break client, + Err(e) => { + log::info!("could not connect to the node: {e:?}"); + sleep(self.reconnect_interval).await; + } + } + }; + let res = run(client, self.service.clone()).await; + log::info!("client exited: {res:?}"); + } + } +} + +async fn run( mut client: PostServiceClient, - cmds: mpsc::Sender, + service: Arc>, ) -> eyre::Result<()> { let (tx, mut rx) = mpsc::channel::(1); let outbound = async_stream::stream! { @@ -31,53 +70,34 @@ pub(crate) async fn run( log::debug!("Got request from node: {request:?}"); match request.kind { Some(node_request::Kind::GenProof(req)) => { - let (cmd_response, rx) = oneshot::channel(); - let Ok(challenge) = req.challenge.try_into() else { - log::error!("invalid challenge"); - tx.send(ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Error as i32, - ..Default::default() - })), - }) - .await?; - continue; - }; - - // Forward the request to the service - cmds.send(Command::GenProof { - challenge, - response: cmd_response, - }) - .await?; + let result = service.lock().unwrap().gen_proof(req.challenge); - // Process the response from the service - let resp = match rx.await? { - Ok(Some(resp)) => { + let resp = match result { + Ok(ProofGenState::Finished { proof, metadata }) => { log::info!("proof generation finished"); ServiceResponse { kind: Some(service_response::Kind::GenProof(GenProofResponse { proof: Some(Proof { - nonce: resp.0.nonce, - indices: resp.0.indices.into_owned(), - pow: resp.0.pow, + nonce: proof.nonce, + indices: proof.indices.into_owned(), + pow: proof.pow, }), metadata: Some(ProofMetadata { - challenge: resp.1.challenge.to_vec(), + challenge: metadata.challenge.to_vec(), node_id: Some(spacemesh_v1::SmesherId { - id: resp.1.node_id.to_vec(), + id: metadata.node_id.to_vec(), }), commitment_atx_id: Some(spacemesh_v1::ActivationId { - id: resp.1.commitment_atx_id.to_vec(), + id: metadata.commitment_atx_id.to_vec(), }), - num_units: resp.1.num_units, - labels_per_unit: resp.1.labels_per_unit, + num_units: metadata.num_units, + labels_per_unit: metadata.labels_per_unit, }), status: GenProofStatus::Ok as i32, })), } } - Ok(None) => { + Ok(ProofGenState::InProgress) => { log::info!("proof generation in progress"); ServiceResponse { kind: Some(service_response::Kind::GenProof(GenProofResponse { diff --git a/service/src/main.rs b/service/src/main.rs index 52736084..4082a84b 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,12 +1,15 @@ -use std::{path::PathBuf, time::Duration}; +mod client; +mod service; + +use std::{ + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; use post::pow::randomx::RandomXFlag; -use tokio::{sync::mpsc, time::sleep}; - -mod client; -mod service; /// Post Service #[derive(Parser, Debug)] @@ -15,10 +18,12 @@ struct Cli { /// Directory of POST data #[arg(short, long)] dir: PathBuf, - /// Node address to connect to #[arg(short, long)] address: String, + /// Time to wait before reconnecting to the node + #[arg(long, default_value = "5", value_parser = |secs: &str| secs.parse().map(Duration::from_secs))] + reconnect_interval_s: Duration, #[command(flatten, next_help_heading = "POST configuration")] post_config: PostConfig, @@ -39,15 +44,13 @@ struct PostConfig { /// K3 is the size of the subset of proof indices that is validated. #[arg(long, default_value = "37")] k3: u32, - /// Difficulty for the nonce proof of work. Lower values increase difficulty of finding - /// `pow` for [Proof][crate::prove::Proof]. + /// Difficulty for the nonce proof of work (aka "k2pow"). #[arg( long, default_value = "000dfb23b0979b4b000000000000000000000000000000000000000000000000", value_parser(parse_difficulty) )] pow_difficulty: [u8; 32], - /// Scrypt parameters for initialization #[command(flatten)] scrypt: ScryptParams, @@ -58,15 +61,14 @@ struct ScryptParams { /// Scrypt N parameter #[arg(short, default_value_t = 8192)] n: usize, - /// Scrypt R parameter #[arg(short, default_value_t = 1)] r: usize, - /// Scrypt P parameter #[arg(short, default_value_t = 1)] p: usize, } + #[derive(Args, Debug)] /// POST proof generation settings struct PostSettings { @@ -74,7 +76,6 @@ struct PostSettings { /// '0' means use all available threads #[arg(long, default_value_t = 1)] threads: usize, - /// Number of nonces to attempt in single pass over POS data. /// /// Each group of 16 nonces requires a separate PoW. Must be a multiple of 16. @@ -83,7 +84,6 @@ struct PostSettings { /// but also slows down the process. #[arg(long, default_value_t = 128, value_parser(parse_nonces))] nonces: usize, - /// Modes of operation for RandomX. /// /// They are interchangeable as they give the same results but have different @@ -136,10 +136,7 @@ async fn main() -> eyre::Result<()> { let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); env_logger::init_from_env(env); - let (tx, rx) = mpsc::channel(1); - let service = service::PostService::new( - rx, args.dir, post::config::Config { k1: args.post_config.k1, @@ -155,21 +152,14 @@ async fn main() -> eyre::Result<()> { args.post_settings.nonces, args.post_settings.threads, args.post_settings.randomx_mode.into(), - )?; - tokio::spawn(service.run()); - - loop { - let client = loop { - match client::PostServiceClient::connect(args.address.clone()).await { - Ok(client) => break client, - Err(e) => { - log::error!("failed to connect to node: {e}"); - sleep(Duration::from_secs(1)).await; - } - } - }; - let res = client::run(client, tx.clone()).await; - log::info!("client exited: {res:?}"); - } - // Ok(()) + ) + .wrap_err("creating Post Service")?; + + let client = client::ServiceClient::new( + args.address, + args.reconnect_interval_s, + Arc::new(Mutex::new(service)), + ); + + client.run().await } diff --git a/service/src/service.rs b/service/src/service.rs index 20db2a2b..a71c59ba 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -2,16 +2,21 @@ use std::path::PathBuf; use eyre::Context; use post::{metadata::ProofMetadata, pow::randomx::RandomXFlag, prove::Proof}; -use tokio::sync::{mpsc, oneshot}; -#[derive(Debug)] -pub(crate) enum Command { - GenProof { - challenge: [u8; 32], - response: oneshot::Sender, ProofMetadata)>>>, +pub(crate) enum ProofGenState { + InProgress, + Finished { + proof: Proof<'static>, + metadata: ProofMetadata, }, } +#[derive(Debug)] +struct ProofGenProcess { + handle: std::thread::JoinHandle>>, + challenge: Vec, +} + #[derive(Debug)] pub(crate) struct PostService { id: [u8; 32], @@ -20,13 +25,11 @@ pub(crate) struct PostService { nonces: usize, threads: usize, pow_flags: RandomXFlag, - rx: mpsc::Receiver, - proof_generation: Option>>>, + proof_generation: Option, } impl PostService { pub(crate) fn new( - rx: mpsc::Receiver, datadir: PathBuf, cfg: post::config::Config, nonces: usize, @@ -39,7 +42,6 @@ impl PostService { Ok(Self { id, - rx, proof_generation: None, datadir, cfg, @@ -49,72 +51,74 @@ impl PostService { }) } - pub(crate) async fn run(mut self) -> eyre::Result<()> { - log::info!("starting PostService"); - while let Some(cmd) = self.rx.recv().await { - log::info!("got {cmd:?}"); - match cmd { - Command::GenProof { - challenge, - response, - } => { - log::info!("got GenProof command"); - match &mut self.proof_generation { - Some(handle) => { - if handle.is_finished() { - log::info!("proof generation is finished"); - let result = handle.await?; - self.proof_generation = None; - match result { - Ok(proof) => { - let metadata = post::metadata::load(&self.datadir).unwrap(); + pub(crate) fn gen_proof(&mut self, challenge: Vec) -> eyre::Result { + if let Some(process) = &mut self.proof_generation { + eyre::ensure!( + process.challenge == challenge, + "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", process.challenge, challenge, + ); + + if process.handle.is_finished() { + log::info!("proof generation is finished"); + let result = match self.proof_generation.take().unwrap().handle.join() { + Ok(result) => result, + Err(err) => { + std::panic::resume_unwind(err); + } + }; + + match result { + Ok(proof) => { + let metadata = post::metadata::load(&self.datadir) + .wrap_err("loading POST metadata")?; - _ = response.send(Ok(Some(( - proof, - ProofMetadata { - challenge, - node_id: metadata.node_id, - commitment_atx_id: metadata.commitment_atx_id, - num_units: metadata.num_units, - labels_per_unit: metadata.labels_per_unit, - }, - )))); - } - Err(e) => { - _ = response.send(Err(e)); - } - } - } else { - log::info!("proof generation in progress"); - _ = response.send(Ok(None)); - } - } - None => { - log::info!("starting proof generation"); - let pow_flags = self.pow_flags; - let cfg = self.cfg; - let datadir = self.datadir.clone(); - let node_id = self.id; - let nonces = self.nonces; - let threads = self.threads; - self.proof_generation = Some(tokio::task::spawn_blocking(move || { - post::prove::generate_proof( - &datadir, - &challenge, - cfg, - nonces, - threads, - pow_flags, - Some(node_id), - ) - })); - // in progress - _ = response.send(Ok(None)); - } + return Ok(ProofGenState::Finished { + proof, + metadata: ProofMetadata { + challenge: challenge + .try_into() + .map_err(|_| eyre::eyre!("invalid challenge format"))?, + node_id: metadata.node_id, + commitment_atx_id: metadata.commitment_atx_id, + num_units: metadata.num_units, + labels_per_unit: metadata.labels_per_unit, + }, + }); + } + Err(e) => { + return Err(e); } } + } else { + log::info!("proof generation in progress"); + return Ok(ProofGenState::InProgress); } } - Ok(()) + + log::info!("starting proof generation for challenge {:X?}", challenge); + let pow_flags = self.pow_flags; + let cfg = self.cfg; + let datadir = self.datadir.clone(); + let miner_id = Some(self.id); + let nonces = self.nonces; + let threads = self.threads; + self.proof_generation = Some(ProofGenProcess { + challenge: challenge.clone(), + handle: std::thread::spawn(move || { + post::prove::generate_proof( + &datadir, + &challenge + .try_into() + .map_err(|_| eyre::eyre!("invalid challenge format"))?, + cfg, + nonces, + threads, + pow_flags, + miner_id, + ) + }), + }); + + Ok(ProofGenState::InProgress) } } From e7e199d07d909822c91203ec54671ea9bd2727e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Fri, 22 Sep 2023 15:58:01 +0200 Subject: [PATCH 04/12] Support for mTLS --- Cargo.lock | 80 ++++++++++++++++++++++++++++++++++++++ initializer/src/main.rs | 1 + service/Cargo.toml | 4 +- service/src/client.rs | 35 ++++++++++++++--- service/src/main.rs | 80 +++++++++++++++++++++++++++++--------- service/src/service.rs | 5 ++- service/src/test_server.rs | 47 ++++++++++++++++++---- service/src/tls_config.rs | 18 +++++++++ 8 files changed, 235 insertions(+), 35 deletions(-) create mode 100644 service/src/tls_config.rs diff --git a/Cargo.lock b/Cargo.lock index 8561f1c4..76512f5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2181,6 +2181,21 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rstest" version = "0.17.0" @@ -2276,6 +2291,37 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustls" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.3", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -2345,6 +2391,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "semver" version = "1.0.18" @@ -2469,6 +2525,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2696,6 +2758,16 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -2766,7 +2838,9 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -2887,6 +2961,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/initializer/src/main.rs b/initializer/src/main.rs index 075848c5..f9fe615f 100644 --- a/initializer/src/main.rs +++ b/initializer/src/main.rs @@ -1,3 +1,4 @@ +//! Post Service use std::{ io::{Read, Seek}, path::PathBuf, diff --git a/service/Cargo.toml b/service/Cargo.toml index 6e2e005e..2432a45b 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -3,14 +3,14 @@ name = "service" version = "0.1.0" edition = "2021" -[[bin]] # Bin to run the HelloWorld gRPC server +[[bin]] name = "test-server" path = "src/test_server.rs" [dependencies] post-rs = { path = "../" } prost = "0.12.1" -tonic = "0.10.0" +tonic = { version = "0.10.0", features = ["tls"] } tokio = { version = "1.0", features = [ "rt-multi-thread", "macros", diff --git a/service/src/client.rs b/service/src/client.rs index b4d1cc25..1abf1b9c 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -1,3 +1,9 @@ +//! Post Service GRPC client +//! +//! This module implements a GRPC client for the Post Service. +//! It connects to the node and registers itself as a Post Service. +//! It then waits for requests from the node and forwards them to the Post Service. + use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -7,7 +13,11 @@ use spacemesh_v1::{node_request, service_response}; use spacemesh_v1::{GenProofResponse, GenProofStatus, Proof, ProofMetadata, ServiceResponse}; use tokio::sync::mpsc; use tokio::time::sleep; +use tonic::transport::Certificate; use tonic::transport::Channel; +use tonic::transport::ClientTlsConfig; +use tonic::transport::Endpoint; +use tonic::transport::Identity; use tonic::Request; use crate::service::ProofGenState; @@ -17,7 +27,7 @@ pub mod spacemesh_v1 { } pub(crate) struct ServiceClient { - address: String, + endpoint: Endpoint, reconnect_interval: Duration, service: Arc>, } @@ -26,19 +36,31 @@ impl ServiceClient { pub(crate) fn new( address: String, reconnect_interval: Duration, + cert: Option<(Certificate, Identity)>, service: Arc>, - ) -> Self { - Self { - address, + ) -> eyre::Result { + let endpoint = Channel::builder(address.parse()?); + let endpoint = match cert { + Some((cert, identity)) => endpoint.tls_config( + ClientTlsConfig::new() + .domain_name("localhost") + .ca_certificate(cert) + .identity(identity), + )?, + None => endpoint, + }; + + Ok(Self { + endpoint, reconnect_interval, service, - } + }) } pub(crate) async fn run(self) -> eyre::Result<()> { loop { let client = loop { - match PostServiceClient::connect(self.address.clone()).await { + match PostServiceClient::connect(self.endpoint.clone()).await { Ok(client) => break client, Err(e) => { log::info!("could not connect to the node: {e:?}"); @@ -48,6 +70,7 @@ impl ServiceClient { }; let res = run(client, self.service.clone()).await; log::info!("client exited: {res:?}"); + sleep(self.reconnect_interval).await; } } } diff --git a/service/src/main.rs b/service/src/main.rs index 4082a84b..dccea2a6 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,7 +1,9 @@ mod client; mod service; +mod tls_config; use std::{ + fs::read_to_string, path::PathBuf, sync::{Arc, Mutex}, time::Duration, @@ -10,18 +12,19 @@ use std::{ use clap::{Args, Parser, ValueEnum}; use eyre::Context; use post::pow::randomx::RandomXFlag; +use tonic::transport::{Certificate, Identity}; /// Post Service #[derive(Parser, Debug)] #[command(version, about)] struct Cli { - /// Directory of POST data + /// directory of POST data #[arg(short, long)] dir: PathBuf, - /// Node address to connect to + /// address to connect to #[arg(short, long)] address: String, - /// Time to wait before reconnecting to the node + /// time to wait before reconnecting to the node #[arg(long, default_value = "5", value_parser = |secs: &str| secs.parse().map(Duration::from_secs))] reconnect_interval_s: Duration, @@ -30,53 +33,73 @@ struct Cli { #[command(flatten, next_help_heading = "POST settings")] post_settings: PostSettings, + + #[command(flatten, next_help_heading = "TLS configuration")] + tls: Option, } #[derive(Args, Debug)] /// POST configuration - network parameters struct PostConfig { - /// K1 specifies the difficulty for a label to be a candidate for a proof. + /// K1 specifies the difficulty for a label to be a candidate for a proof #[arg(long, default_value = "26")] k1: u32, - /// K2 is the number of labels below the required difficulty required for a proof. + /// K2 is the number of labels below the required difficulty required for a proof #[arg(long, default_value = "37")] k2: u32, - /// K3 is the size of the subset of proof indices that is validated. + /// K3 is the size of the subset of proof indices that is validated #[arg(long, default_value = "37")] k3: u32, - /// Difficulty for the nonce proof of work (aka "k2pow"). + /// difficulty for the nonce proof of work (aka "k2pow") #[arg( long, default_value = "000dfb23b0979b4b000000000000000000000000000000000000000000000000", value_parser(parse_difficulty) )] pow_difficulty: [u8; 32], - /// Scrypt parameters for initialization + /// scrypt parameters for initialization #[command(flatten)] scrypt: ScryptParams, } +/// Scrypt parameters for initialization #[derive(Args, Debug)] struct ScryptParams { - /// Scrypt N parameter + /// scrypt N parameter #[arg(short, default_value_t = 8192)] n: usize, - /// Scrypt R parameter + /// scrypt R parameter #[arg(short, default_value_t = 1)] r: usize, - /// Scrypt P parameter + /// scrypt P parameter #[arg(short, default_value_t = 1)] p: usize, } +/// TLS configuration +/// +/// Either all fields must be specified or none +#[derive(Args, Debug, Clone)] +#[group(required = false)] +pub(crate) struct Tls { + /// server CA certificate + #[arg(long, required = false)] + ca_cert: PathBuf, + /// client certificate + #[arg(long, required = false)] + client_cert: PathBuf, + /// client key + #[arg(long, required = false)] + client_key: PathBuf, +} #[derive(Args, Debug)] /// POST proof generation settings struct PostSettings { - /// Number of threads to use. + /// number of threads to use /// '0' means use all available threads #[arg(long, default_value_t = 1)] threads: usize, - /// Number of nonces to attempt in single pass over POS data. + /// number of nonces to attempt in single pass over POS data /// /// Each group of 16 nonces requires a separate PoW. Must be a multiple of 16. /// @@ -84,14 +107,15 @@ struct PostSettings { /// but also slows down the process. #[arg(long, default_value_t = 128, value_parser(parse_nonces))] nonces: usize, - /// Modes of operation for RandomX. - /// - /// They are interchangeable as they give the same results but have different - /// purpose and memory requirements. + /// modes of operation for RandomX #[arg(long, default_value_t = RandomXMode::Fast)] randomx_mode: RandomXMode, } +/// RandomX modes of operation +/// +/// They are interchangeable as they give the same results but have different +/// purpose and memory requirements. #[derive(Debug, Copy, Clone, Eq, PartialEq, ValueEnum)] enum RandomXMode { /// Fast mode for proving. Requires 2080 MiB of memory. @@ -155,11 +179,31 @@ async fn main() -> eyre::Result<()> { ) .wrap_err("creating Post Service")?; + let cert = if let Some(tls) = args.tls { + log::info!( + "configuring TLS: server CA cert: {}, client cert: {}, client key: {}", + tls.ca_cert.display(), + tls.cert.display(), + tls.key.display(), + ); + let server_ca_cert = read_to_string(tls.ca_cert)?; + let cert = read_to_string(tls.cert)?; + let key = read_to_string(tls.key)?; + Some(( + Certificate::from_pem(server_ca_cert), + Identity::from_pem(cert, key), + )) + } else { + log::info!("not configuring TLS"); + None + }; + let client = client::ServiceClient::new( args.address, args.reconnect_interval_s, + cert, Arc::new(Mutex::new(service)), - ); + )?; client.run().await } diff --git a/service/src/service.rs b/service/src/service.rs index a71c59ba..7f278522 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -1,3 +1,5 @@ +//! Post Service + use std::path::PathBuf; use eyre::Context; @@ -38,10 +40,9 @@ impl PostService { ) -> eyre::Result { let metadata = post::metadata::load(&datadir).wrap_err("loading metadata. Is POST initialized?")?; - let id = metadata.node_id; Ok(Self { - id, + id: metadata.node_id, proof_generation: None, datadir, cfg, diff --git a/service/src/test_server.rs b/service/src/test_server.rs index 1d2568a9..ddbb1a0e 100644 --- a/service/src/test_server.rs +++ b/service/src/test_server.rs @@ -1,10 +1,16 @@ +mod tls_config; + +use std::fs::read_to_string; use std::pin::Pin; use std::sync::Mutex; use std::time::Duration; +use clap::Parser; +use eyre::Context; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::sleep; use tokio_stream::{Stream, StreamExt}; +use tonic::transport::{Certificate, Identity, ServerTlsConfig}; use tonic::{transport::Server, Request, Response, Status}; use spacemesh_v1::post_service_server::{PostService, PostServiceServer}; @@ -71,22 +77,49 @@ impl PostService for TestPostService { } } +/// Post Service test server +#[derive(Parser, Debug)] +#[command(version, about)] +struct Cli { + #[command(flatten, next_help_heading = "TLS configuration")] + tls: Option, +} + #[tokio::main] async fn main() -> eyre::Result<()> { + let args = Cli::parse(); + let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); env_logger::init_from_env(env); - let addr = "[::1]:50051".parse()?; + let server = Server::builder(); + let mut server = if let Some(tls) = args.tls { + log::info!( + "configuring TLS: CA cert: {}, cert: {}, key: {}", + tls.ca_cert.display(), + tls.cert.display(), + tls.key.display(), + ); + let ca_cert = read_to_string(tls.ca_cert)?; + let cert = read_to_string(tls.cert)?; + let key = read_to_string(tls.key)?; + + let tls = ServerTlsConfig::new() + .identity(Identity::from_pem(cert, key)) + .client_ca_root(Certificate::from_pem(ca_cert)); + + server.tls_config(tls).wrap_err("setting up mTLS")? + } else { + log::info!("not configuring TLS"); + server + }; let mut test_node = TestPostService::new(); - let mut reg = test_node.wait_for_connection(); - let _handle = tokio::spawn( - Server::builder() - .add_service(PostServiceServer::new(test_node)) - .serve(addr), - ); + let router = server.add_service(PostServiceServer::new(test_node)); + + let _handle = tokio::spawn(router.serve("[::1]:50051".parse()?)); loop { // wait for the connection to be established diff --git a/service/src/tls_config.rs b/service/src/tls_config.rs new file mode 100644 index 00000000..04cf788f --- /dev/null +++ b/service/src/tls_config.rs @@ -0,0 +1,18 @@ +use std::path::PathBuf; + +use clap::Args; + +/// TLS configuration +/// +/// Either all fields must be specified or none +#[derive(Args, Debug, Clone)] +#[group(required = false)] +pub(crate) struct Tls { + /// CA certificate + #[arg(long, required = false)] + pub(crate) ca_cert: PathBuf, + #[arg(long, required = false)] + pub(crate) cert: PathBuf, + #[arg(long, required = false)] + pub(crate) key: PathBuf, +} From 813ee08e244ef2be6f6a6c1001ec07dd404593b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Fri, 22 Sep 2023 16:23:36 +0200 Subject: [PATCH 05/12] Build service in CI --- .github/workflows/ci.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a29d560..cd1a2fc3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -244,6 +244,22 @@ jobs: target/release/profiler${{ matrix.os == 'windows-2019' && '.exe' || '' }} if-no-files-found: error + - uses: arduino/setup-protoc@v2 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Build service + run: cargo build -p service --release + env: + RUSTFLAGS: ${{ matrix.rustflags }} + SDKROOT: "/Library/Developer/CommandLineTools/SDKs/MacOSX12.3.sdk" + - name: Archive service artifacts + uses: actions/upload-artifact@v3 + with: + name: service-${{ matrix.artifact-name }}${{ steps.version.output.suffix }} + path: | + target/release/service${{ matrix.os == 'windows-2019' && '.exe' || '' }} + if-no-files-found: error + release: name: Publish release if: github.event_name == 'push' && github.ref_type == 'tag' From fcffc5dfede68da8b050d484e7afcdf3bf61b0a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Mon, 25 Sep 2023 09:17:36 +0200 Subject: [PATCH 06/12] Simplify post service client --- service/src/client.rs | 154 ++++++++++++++++++------------------- service/src/main.rs | 15 +--- service/src/service.rs | 18 ++--- service/src/test_server.rs | 2 +- 4 files changed, 87 insertions(+), 102 deletions(-) diff --git a/service/src/client.rs b/service/src/client.rs index 1abf1b9c..c8995cf3 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -4,8 +4,6 @@ //! It connects to the node and registers itself as a Post Service. //! It then waits for requests from the node and forwards them to the Post Service. -use std::sync::Arc; -use std::sync::Mutex; use std::time::Duration; pub(crate) use spacemesh_v1::post_service_client::PostServiceClient; @@ -29,7 +27,7 @@ pub mod spacemesh_v1 { pub(crate) struct ServiceClient { endpoint: Endpoint, reconnect_interval: Duration, - service: Arc>, + service: crate::service::PostService, } impl ServiceClient { @@ -37,7 +35,7 @@ impl ServiceClient { address: String, reconnect_interval: Duration, cert: Option<(Certificate, Identity)>, - service: Arc>, + service: crate::service::PostService, ) -> eyre::Result { let endpoint = Channel::builder(address.parse()?); let endpoint = match cert { @@ -57,103 +55,103 @@ impl ServiceClient { }) } - pub(crate) async fn run(self) -> eyre::Result<()> { + pub(crate) async fn run(mut self) -> eyre::Result<()> { loop { let client = loop { match PostServiceClient::connect(self.endpoint.clone()).await { Ok(client) => break client, Err(e) => { - log::info!("could not connect to the node: {e:?}"); + log::info!("could not connect to the node: {e}"); sleep(self.reconnect_interval).await; } } }; - let res = run(client, self.service.clone()).await; - log::info!("client exited: {res:?}"); + let res = self.register_and_serve(client).await; + log::info!("disconnected: {res:?}"); sleep(self.reconnect_interval).await; } } -} -async fn run( - mut client: PostServiceClient, - service: Arc>, -) -> eyre::Result<()> { - let (tx, mut rx) = mpsc::channel::(1); - let outbound = async_stream::stream! { - while let Some(msg) = rx.recv().await { - yield msg; - } - }; + async fn register_and_serve( + &mut self, + mut client: PostServiceClient, + ) -> eyre::Result<()> { + let (tx, mut rx) = mpsc::channel::(1); + let outbound = async_stream::stream! { + while let Some(msg) = rx.recv().await { + yield msg; + } + }; - let response = client.register(Request::new(outbound)).await?; - let mut inbound = response.into_inner(); + let response = client.register(Request::new(outbound)).await?; + let mut inbound = response.into_inner(); - while let Some(request) = inbound.message().await? { - log::debug!("Got request from node: {request:?}"); - match request.kind { - Some(node_request::Kind::GenProof(req)) => { - let result = service.lock().unwrap().gen_proof(req.challenge); + while let Some(request) = inbound.message().await? { + log::debug!("Got request from node: {request:?}"); + match request.kind { + Some(node_request::Kind::GenProof(req)) => { + let result = self.service.gen_proof(req.challenge); - let resp = match result { - Ok(ProofGenState::Finished { proof, metadata }) => { - log::info!("proof generation finished"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - proof: Some(Proof { - nonce: proof.nonce, - indices: proof.indices.into_owned(), - pow: proof.pow, - }), - metadata: Some(ProofMetadata { - challenge: metadata.challenge.to_vec(), - node_id: Some(spacemesh_v1::SmesherId { - id: metadata.node_id.to_vec(), + let resp = match result { + Ok(ProofGenState::Finished { proof, metadata }) => { + log::info!("proof generation finished"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + proof: Some(Proof { + nonce: proof.nonce, + indices: proof.indices.into_owned(), + pow: proof.pow, }), - commitment_atx_id: Some(spacemesh_v1::ActivationId { - id: metadata.commitment_atx_id.to_vec(), + metadata: Some(ProofMetadata { + challenge: metadata.challenge.to_vec(), + node_id: Some(spacemesh_v1::SmesherId { + id: metadata.node_id.to_vec(), + }), + commitment_atx_id: Some(spacemesh_v1::ActivationId { + id: metadata.commitment_atx_id.to_vec(), + }), + num_units: metadata.num_units, + labels_per_unit: metadata.labels_per_unit, }), - num_units: metadata.num_units, - labels_per_unit: metadata.labels_per_unit, - }), - status: GenProofStatus::Ok as i32, - })), + status: GenProofStatus::Ok as i32, + })), + } } - } - Ok(ProofGenState::InProgress) => { - log::info!("proof generation in progress"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Ok as i32, - ..Default::default() - })), + Ok(ProofGenState::InProgress) => { + log::info!("proof generation in progress"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Ok as i32, + ..Default::default() + })), + } } - } - Err(e) => { - log::error!("failed to generate proof: {e:?}"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Error as i32, - ..Default::default() - })), + Err(e) => { + log::error!("failed to generate proof: {e:?}"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + } } - } - }; + }; - tx.send(resp).await?; - } - None => { - log::warn!("Got a request with no kind"); - tx.send(ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Error as i32, - ..Default::default() - })), - }) - .await? + tx.send(resp).await?; + } + None => { + log::warn!("Got a request with no kind"); + tx.send(ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + }) + .await? + } } } - } - Ok(()) + Ok(()) + } } diff --git a/service/src/main.rs b/service/src/main.rs index dccea2a6..c351fe7e 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -2,12 +2,7 @@ mod client; mod service; mod tls_config; -use std::{ - fs::read_to_string, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{fs::read_to_string, path::PathBuf, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; @@ -198,12 +193,8 @@ async fn main() -> eyre::Result<()> { None }; - let client = client::ServiceClient::new( - args.address, - args.reconnect_interval_s, - cert, - Arc::new(Mutex::new(service)), - )?; + let client = + client::ServiceClient::new(args.address, args.reconnect_interval_s, cert, service)?; client.run().await } diff --git a/service/src/service.rs b/service/src/service.rs index 7f278522..1a1a9097 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -96,7 +96,11 @@ impl PostService { } } - log::info!("starting proof generation for challenge {:X?}", challenge); + let ch: [u8; 32] = challenge + .as_slice() + .try_into() + .map_err(|_| eyre::eyre!("invalid challenge format"))?; + log::info!("starting proof generation for challenge {ch:X?}"); let pow_flags = self.pow_flags; let cfg = self.cfg; let datadir = self.datadir.clone(); @@ -104,18 +108,10 @@ impl PostService { let nonces = self.nonces; let threads = self.threads; self.proof_generation = Some(ProofGenProcess { - challenge: challenge.clone(), + challenge, handle: std::thread::spawn(move || { post::prove::generate_proof( - &datadir, - &challenge - .try_into() - .map_err(|_| eyre::eyre!("invalid challenge format"))?, - cfg, - nonces, - threads, - pow_flags, - miner_id, + &datadir, &ch, cfg, nonces, threads, pow_flags, miner_id, ) }), }); diff --git a/service/src/test_server.rs b/service/src/test_server.rs index ddbb1a0e..9ee44c5d 100644 --- a/service/src/test_server.rs +++ b/service/src/test_server.rs @@ -138,7 +138,7 @@ async fn main() -> eyre::Result<()> { }) .await { - log::error!("post service disconnected: {:?}", e); + log::error!("post service disconnected: {e:?}"); break; } From 86b7206603aae388815352a1e54807db922a5f0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Mon, 25 Sep 2023 13:26:39 +0200 Subject: [PATCH 07/12] Tests for post service GRPC client --- Cargo.lock | 1 + service/Cargo.toml | 9 +- service/src/bin/test_server.rs | 110 +++++++++++++ service/src/client.rs | 18 ++- service/src/lib.rs | 4 + service/src/main.rs | 28 +--- service/src/service.rs | 10 +- service/src/test_server.rs | 141 +++------------- service/src/tls_config.rs | 8 +- service/tests/test_client.rs | 284 +++++++++++++++++++++++++++++++++ 10 files changed, 450 insertions(+), 163 deletions(-) create mode 100644 service/src/bin/test_server.rs create mode 100644 service/src/lib.rs create mode 100644 service/tests/test_client.rs diff --git a/Cargo.lock b/Cargo.lock index 76512f5d..696d4a4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2476,6 +2476,7 @@ dependencies = [ "eyre", "hex", "log", + "mockall", "post-rs", "prost", "tokio", diff --git a/service/Cargo.toml b/service/Cargo.toml index 2432a45b..2b0b0a54 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -3,9 +3,9 @@ name = "service" version = "0.1.0" edition = "2021" -[[bin]] -name = "test-server" -path = "src/test_server.rs" +[lib] +name = "post_service" +path = "src/lib.rs" [dependencies] post-rs = { path = "../" } @@ -17,13 +17,14 @@ tokio = { version = "1.0", features = [ "sync", "time", ] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["net"] } async-stream = "0.3.5" log = "0.4.20" eyre = "0.6.8" env_logger = "0.10.0" clap = { version = "4.4.4", features = ["derive"] } hex = "0.4.3" +mockall = "0.11.4" [build-dependencies] tonic-build = "0.10.0" diff --git a/service/src/bin/test_server.rs b/service/src/bin/test_server.rs new file mode 100644 index 00000000..b1e4b3e2 --- /dev/null +++ b/service/src/bin/test_server.rs @@ -0,0 +1,110 @@ +use std::fs::read_to_string; +use std::time::Duration; + +use clap::Parser; +use eyre::Context; +use tokio::sync::oneshot; +use tokio::time::sleep; +use tonic::transport::Server; +use tonic::transport::{Certificate, Identity, ServerTlsConfig}; + +use post_service::test_server::spacemesh_v1::post_service_server::PostServiceServer; +use post_service::test_server::spacemesh_v1::{ + node_request, service_response, GenProofRequest, GenProofStatus, NodeRequest, +}; +use post_service::test_server::{TestNodeRequest, TestPostService}; + +/// Post Service test server +#[derive(Parser, Debug)] +#[command(version, about)] +struct Cli { + #[command(flatten, next_help_heading = "TLS configuration")] + tls: Option, +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let args = Cli::parse(); + + let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); + env_logger::init_from_env(env); + + let server = Server::builder(); + let mut server = if let Some(tls) = args.tls { + log::info!( + "configuring TLS: CA cert: {}, cert: {}, key: {}", + tls.ca_cert.display(), + tls.cert.display(), + tls.key.display(), + ); + let ca_cert = read_to_string(tls.ca_cert)?; + let cert = read_to_string(tls.cert)?; + let key = read_to_string(tls.key)?; + + let tls = ServerTlsConfig::new() + .identity(Identity::from_pem(cert, key)) + .client_ca_root(Certificate::from_pem(ca_cert)); + + server.tls_config(tls).wrap_err("setting up mTLS")? + } else { + log::info!("not configuring TLS"); + server + }; + + let mut test_node = TestPostService::new(); + let mut reg = test_node.register_for_connections(); + + let router = server.add_service(PostServiceServer::new(test_node)); + + let _handle = tokio::spawn(router.serve("[::1]:50051".parse()?)); + + loop { + // wait for the connection to be established + let tx = reg.recv().await?; + + loop { + let (resp_tx, resp_rx) = oneshot::channel(); + if let Err(e) = tx + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { + challenge: vec![0xCA; 32], + })), + }, + response: resp_tx, + }) + .await + { + log::error!("post service disconnected: {e:?}"); + break; + } + + let resp = resp_rx.await?; + match resp.kind { + Some(service_response::Kind::GenProof(resp)) => { + log::debug!("Got GenProof response: {resp:?}"); + match resp.status() { + GenProofStatus::Ok => { + if let Some(proof) = resp.proof { + log::info!("POST proof generation finished, proof: {:?}", proof); + // break; + } + log::info!("POST proof generation in progress"); + } + GenProofStatus::Unspecified => { + log::error!("unspecified status"); + } + GenProofStatus::Error => { + log::error!("POST proof generation error"); + break; + } + } + } + _ => { + log::error!("Got unexpected response: {:?}", resp); + } + } + sleep(Duration::from_secs(5)).await; + } + } +} diff --git a/service/src/client.rs b/service/src/client.rs index c8995cf3..081c49ad 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -24,18 +24,23 @@ pub mod spacemesh_v1 { tonic::include_proto!("spacemesh.v1"); } -pub(crate) struct ServiceClient { +pub struct ServiceClient { endpoint: Endpoint, reconnect_interval: Duration, - service: crate::service::PostService, + service: S, } -impl ServiceClient { - pub(crate) fn new( +#[mockall::automock] +pub trait PostService { + fn gen_proof(&mut self, challenge: Vec) -> eyre::Result; +} + +impl ServiceClient { + pub fn new( address: String, reconnect_interval: Duration, cert: Option<(Certificate, Identity)>, - service: crate::service::PostService, + service: S, ) -> eyre::Result { let endpoint = Channel::builder(address.parse()?); let endpoint = match cert { @@ -55,9 +60,10 @@ impl ServiceClient { }) } - pub(crate) async fn run(mut self) -> eyre::Result<()> { + pub async fn run(mut self) -> eyre::Result<()> { loop { let client = loop { + log::debug!("connecting to the node on {}", self.endpoint.uri()); match PostServiceClient::connect(self.endpoint.clone()).await { Ok(client) => break client, Err(e) => { diff --git a/service/src/lib.rs b/service/src/lib.rs new file mode 100644 index 00000000..c61a2808 --- /dev/null +++ b/service/src/lib.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod service; +pub mod test_server; +pub mod tls_config; diff --git a/service/src/main.rs b/service/src/main.rs index c351fe7e..643c83aa 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,14 +1,12 @@ -mod client; -mod service; -mod tls_config; - use std::{fs::read_to_string, path::PathBuf, time::Duration}; use clap::{Args, Parser, ValueEnum}; use eyre::Context; -use post::pow::randomx::RandomXFlag; use tonic::transport::{Certificate, Identity}; +use post::pow::randomx::RandomXFlag; +use post_service::{client, tls_config::Tls}; + /// Post Service #[derive(Parser, Debug)] #[command(version, about)] @@ -30,7 +28,7 @@ struct Cli { post_settings: PostSettings, #[command(flatten, next_help_heading = "TLS configuration")] - tls: Option, + tls: Option, } #[derive(Args, Debug)] @@ -71,22 +69,6 @@ struct ScryptParams { p: usize, } -/// TLS configuration -/// -/// Either all fields must be specified or none -#[derive(Args, Debug, Clone)] -#[group(required = false)] -pub(crate) struct Tls { - /// server CA certificate - #[arg(long, required = false)] - ca_cert: PathBuf, - /// client certificate - #[arg(long, required = false)] - client_cert: PathBuf, - /// client key - #[arg(long, required = false)] - client_key: PathBuf, -} #[derive(Args, Debug)] /// POST proof generation settings struct PostSettings { @@ -155,7 +137,7 @@ async fn main() -> eyre::Result<()> { let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); env_logger::init_from_env(env); - let service = service::PostService::new( + let service = post_service::service::PostService::new( args.dir, post::config::Config { k1: args.post_config.k1, diff --git a/service/src/service.rs b/service/src/service.rs index 1a1a9097..09ef894c 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use eyre::Context; use post::{metadata::ProofMetadata, pow::randomx::RandomXFlag, prove::Proof}; -pub(crate) enum ProofGenState { +pub enum ProofGenState { InProgress, Finished { proof: Proof<'static>, @@ -20,7 +20,7 @@ struct ProofGenProcess { } #[derive(Debug)] -pub(crate) struct PostService { +pub struct PostService { id: [u8; 32], datadir: PathBuf, cfg: post::config::Config, @@ -31,7 +31,7 @@ pub(crate) struct PostService { } impl PostService { - pub(crate) fn new( + pub fn new( datadir: PathBuf, cfg: post::config::Config, nonces: usize, @@ -51,8 +51,10 @@ impl PostService { pow_flags, }) } +} - pub(crate) fn gen_proof(&mut self, challenge: Vec) -> eyre::Result { +impl crate::client::PostService for PostService { + fn gen_proof(&mut self, challenge: Vec) -> eyre::Result { if let Some(process) = &mut self.proof_generation { eyre::ensure!( process.challenge == challenge, diff --git a/service/src/test_server.rs b/service/src/test_server.rs index 9ee44c5d..5404022b 100644 --- a/service/src/test_server.rs +++ b/service/src/test_server.rs @@ -1,45 +1,42 @@ -mod tls_config; - -use std::fs::read_to_string; use std::pin::Pin; -use std::sync::Mutex; -use std::time::Duration; -use clap::Parser; -use eyre::Context; use tokio::sync::{broadcast, mpsc, oneshot}; -use tokio::time::sleep; use tokio_stream::{Stream, StreamExt}; -use tonic::transport::{Certificate, Identity, ServerTlsConfig}; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{Request, Response, Status}; -use spacemesh_v1::post_service_server::{PostService, PostServiceServer}; +use spacemesh_v1::post_service_server::PostService; use spacemesh_v1::{NodeRequest, ServiceResponse}; - -use spacemesh_v1::node_request; -use spacemesh_v1::GenProofRequest; pub mod spacemesh_v1 { tonic::include_proto!("spacemesh.v1"); } -struct TestNodeRequest { - request: NodeRequest, - response: oneshot::Sender, +#[derive(Debug)] +pub struct TestNodeRequest { + pub request: NodeRequest, + pub response: oneshot::Sender, } #[derive(Debug)] pub struct TestPostService { - registered: Mutex>>, + registered: broadcast::Sender>, } impl TestPostService { - fn new() -> Self { + pub fn new() -> Self { Self { - registered: Mutex::new(broadcast::channel(1).0), + registered: broadcast::channel(1).0, } } - fn wait_for_connection(&mut self) -> broadcast::Receiver> { - self.registered.lock().unwrap().subscribe() + pub fn register_for_connections( + &mut self, + ) -> broadcast::Receiver> { + self.registered.subscribe() + } +} + +impl Default for TestPostService { + fn default() -> Self { + Self::new() } } @@ -56,8 +53,6 @@ impl PostService for TestPostService { let (tx, mut rx) = tokio::sync::mpsc::channel(1); self.registered - .lock() - .unwrap() .send(tx) .expect("nobody is interested in post service registered"); @@ -76,101 +71,3 @@ impl PostService for TestPostService { Ok(Response::new(Box::pin(output) as Self::RegisterStream)) } } - -/// Post Service test server -#[derive(Parser, Debug)] -#[command(version, about)] -struct Cli { - #[command(flatten, next_help_heading = "TLS configuration")] - tls: Option, -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - let args = Cli::parse(); - - let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); - env_logger::init_from_env(env); - - let server = Server::builder(); - let mut server = if let Some(tls) = args.tls { - log::info!( - "configuring TLS: CA cert: {}, cert: {}, key: {}", - tls.ca_cert.display(), - tls.cert.display(), - tls.key.display(), - ); - let ca_cert = read_to_string(tls.ca_cert)?; - let cert = read_to_string(tls.cert)?; - let key = read_to_string(tls.key)?; - - let tls = ServerTlsConfig::new() - .identity(Identity::from_pem(cert, key)) - .client_ca_root(Certificate::from_pem(ca_cert)); - - server.tls_config(tls).wrap_err("setting up mTLS")? - } else { - log::info!("not configuring TLS"); - server - }; - - let mut test_node = TestPostService::new(); - let mut reg = test_node.wait_for_connection(); - - let router = server.add_service(PostServiceServer::new(test_node)); - - let _handle = tokio::spawn(router.serve("[::1]:50051".parse()?)); - - loop { - // wait for the connection to be established - let tx = reg.recv().await?; - - loop { - let (resp_tx, resp_rx) = oneshot::channel(); - if let Err(e) = tx - .send(TestNodeRequest { - request: NodeRequest { - kind: Some(node_request::Kind::GenProof(GenProofRequest { - challenge: vec![0xCA; 32], - })), - }, - response: resp_tx, - }) - .await - { - log::error!("post service disconnected: {e:?}"); - break; - } - - let resp = resp_rx.await?; - match resp.kind { - Some(spacemesh_v1::service_response::Kind::GenProof(resp)) => { - log::debug!("Got GenProof response: {resp:?}"); - match resp.status() { - spacemesh_v1::GenProofStatus::Ok => { - if let Some(proof) = resp.proof { - log::info!("POST proof generation finished, proof: {:?}", proof); - break; - } - log::info!("POST proof generation in progress"); - } - spacemesh_v1::GenProofStatus::Unspecified => { - log::error!("unspecified status"); - } - spacemesh_v1::GenProofStatus::Error => { - log::error!("POST proof generation error"); - break; - } - } - } - _ => { - log::error!("Got unexpected response: {:?}", resp); - } - } - sleep(Duration::from_secs(5)).await; - } - } - - // _ = handle.await?; - // Ok(()) -} diff --git a/service/src/tls_config.rs b/service/src/tls_config.rs index 04cf788f..d67ee1a6 100644 --- a/service/src/tls_config.rs +++ b/service/src/tls_config.rs @@ -7,12 +7,12 @@ use clap::Args; /// Either all fields must be specified or none #[derive(Args, Debug, Clone)] #[group(required = false)] -pub(crate) struct Tls { +pub struct Tls { /// CA certificate #[arg(long, required = false)] - pub(crate) ca_cert: PathBuf, + pub ca_cert: PathBuf, #[arg(long, required = false)] - pub(crate) cert: PathBuf, + pub cert: PathBuf, #[arg(long, required = false)] - pub(crate) key: PathBuf, + pub key: PathBuf, } diff --git a/service/tests/test_client.rs b/service/tests/test_client.rs new file mode 100644 index 00000000..c6229b2c --- /dev/null +++ b/service/tests/test_client.rs @@ -0,0 +1,284 @@ +use std::borrow::Cow; + +use tokio::{ + net::TcpListener, + sync::{broadcast, mpsc, oneshot}, +}; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::transport::{Error, Server}; + +use post::{metadata::ProofMetadata, prove::Proof}; +use post_service::{ + client::{MockPostService, PostService, ServiceClient}, + service::ProofGenState, + test_server::{ + spacemesh_v1::{ + node_request, post_service_server::PostServiceServer, service_response, + GenProofRequest, GenProofResponse, GenProofStatus, NodeRequest, + }, + TestNodeRequest, TestPostService, + }, +}; + +struct TestServer { + connected: broadcast::Receiver>, + handle: tokio::task::JoinHandle>, + addr: std::net::SocketAddr, +} + +impl Drop for TestServer { + fn drop(&mut self) { + self.handle.abort(); + } +} + +impl TestServer { + fn create_client(&self, service: S) -> ServiceClient + where + S: PostService, + { + ServiceClient::new( + format!("http://{}", self.addr), + std::time::Duration::from_secs(1), + None, + service, + ) + .unwrap() + } +} + +impl TestServer { + async fn new() -> Self { + let mut test_node = TestPostService::new(); + let reg = test_node.register_for_connections(); + + let listener = TcpListener::bind("[::1]:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let server = tokio::spawn( + Server::builder() + .add_service(PostServiceServer::new(test_node)) + .serve_with_incoming(TcpListenerStream::new(listener)), + ); + + TestServer { + connected: reg, + handle: server, + addr, + } + } +} + +#[tokio::test] +async fn test_registers() { + let mut test_server = TestServer::new().await; + let client = test_server.create_client(MockPostService::new()); + let client_handle = tokio::spawn(client.run()); + + // Check if client registered + test_server.connected.recv().await.unwrap(); + client_handle.abort(); + let _ = client_handle.await; +} + +#[tokio::test] +async fn test_gen_proof_in_progress() { + let mut test_server = TestServer::new().await; + + let mut service = MockPostService::new(); + service + .expect_gen_proof() + .returning(|_| Ok(ProofGenState::InProgress)); + + let client = test_server.create_client(service); + let client_handle = tokio::spawn(client.run()); + + let connected = test_server.connected.recv().await.unwrap(); + + let (resp_tx, resp_rx) = oneshot::channel(); + connected + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { + challenge: vec![0xCA; 32], + })), + }, + response: resp_tx, + }) + .await + .unwrap(); + + let response = resp_rx.await.unwrap(); + let _exp_status = GenProofStatus::Ok as i32; + assert!(matches!( + response.kind.unwrap(), + service_response::Kind::GenProof(GenProofResponse { + status: _exp_status, + proof: None, + metadata: None + }) + )); + + client_handle.abort(); + let _ = client_handle.await; +} + +#[tokio::test] +async fn test_gen_proof_failed() { + let mut test_server = TestServer::new().await; + + let mut service = MockPostService::new(); + service + .expect_gen_proof() + .returning(|_| Err(eyre::eyre!("failed to generate proof"))); + + let client = test_server.create_client(service); + let client_handle = tokio::spawn(client.run()); + + let connected = test_server.connected.recv().await.unwrap(); + + let (response, resp_rx) = oneshot::channel(); + connected + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { + challenge: vec![0xCA; 32], + })), + }, + response, + }) + .await + .unwrap(); + + let response = resp_rx.await.unwrap(); + let _exp_status = GenProofStatus::Error as i32; + assert!(matches!( + response.kind.unwrap(), + service_response::Kind::GenProof(GenProofResponse { + status: _exp_status, + proof: None, + metadata: None + }) + )); + + client_handle.abort(); + let _ = client_handle.await; +} + +#[tokio::test] +async fn test_gen_proof_finished() { + let mut test_server = TestServer::new().await; + + let mut service = MockPostService::new(); + + let challenge = &[0xCA; 32]; + let indices = &[0xAA; 32]; + let node_id = &[0xBB; 32]; + let commitment_atx_id = &[0xCC; 32]; + + service.expect_gen_proof().returning(move |c| { + assert_eq!(c.as_slice(), challenge); + Ok(ProofGenState::Finished { + proof: Proof { + nonce: 1, + indices: Cow::Owned(indices.to_vec()), + pow: 7, + pow_creator: None, + }, + metadata: ProofMetadata { + node_id: *node_id, + commitment_atx_id: *commitment_atx_id, + challenge: *challenge, + num_units: 4, + labels_per_unit: 256, + }, + }) + }); + + let client = test_server.create_client(service); + let client_handle = tokio::spawn(client.run()); + + let connected = test_server.connected.recv().await.unwrap(); + + let (response, resp_rx) = oneshot::channel(); + connected + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { + challenge: challenge.to_vec(), + })), + }, + response, + }) + .await + .unwrap(); + + let response = resp_rx.await.unwrap(); + let _exp_status = GenProofStatus::Ok as i32; + let _exp_proof = post_service::test_server::spacemesh_v1::Proof { + nonce: 1, + indices: indices.to_vec(), + pow: 7, + }; + let _exp_metadata = post_service::test_server::spacemesh_v1::ProofMetadata { + challenge: challenge.to_vec(), + node_id: Some(post_service::test_server::spacemesh_v1::SmesherId { + id: node_id.to_vec(), + }), + commitment_atx_id: Some(post_service::test_server::spacemesh_v1::ActivationId { + id: commitment_atx_id.to_vec(), + }), + num_units: 7, + labels_per_unit: 256, + }; + + assert!(matches!( + response.kind.unwrap(), + service_response::Kind::GenProof(GenProofResponse { + status: _exp_status, + proof: Some(_exp_proof), + metadata: Some(_exp_metadata), + }) + )); + + client_handle.abort(); + let _ = client_handle.await; +} + +#[tokio::test] +async fn test_broken_request_no_kind() { + let mut test_server = TestServer::new().await; + + let mut service = MockPostService::new(); + service + .expect_gen_proof() + .returning(|_| Err(eyre::eyre!("failed to generate proof"))); + + let client = test_server.create_client(service); + let client_handle = tokio::spawn(client.run()); + + let connected = test_server.connected.recv().await.unwrap(); + + let (response, resp_rx) = oneshot::channel(); + connected + .send(TestNodeRequest { + request: NodeRequest { kind: None }, + response, + }) + .await + .unwrap(); + + let response = resp_rx.await.unwrap(); + let _exp_status = GenProofStatus::Error as i32; + assert!(matches!( + response.kind.unwrap(), + service_response::Kind::GenProof(GenProofResponse { + status: _exp_status, + proof: None, + metadata: None + }) + )); + + client_handle.abort(); + let _ = client_handle.await; +} From 468028e1d388a41b6d66c3bb39291250a2bca1f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Mon, 25 Sep 2023 15:30:25 +0200 Subject: [PATCH 08/12] Tests for post service --- Cargo.lock | 1 + ffi/src/post_impl.rs | 4 +- service/Cargo.toml | 3 + service/src/service.rs | 44 ++++++++++- service/tests/test_service.rs | 144 ++++++++++++++++++++++++++++++++++ src/prove.rs | 55 ++++++++----- tests/generate_and_verify.rs | 19 ++++- 7 files changed, 247 insertions(+), 23 deletions(-) create mode 100644 service/tests/test_service.rs diff --git a/Cargo.lock b/Cargo.lock index 696d4a4c..1e053e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2479,6 +2479,7 @@ dependencies = [ "mockall", "post-rs", "prost", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/ffi/src/post_impl.rs b/ffi/src/post_impl.rs index 7e702df8..acb30d75 100644 --- a/ffi/src/post_impl.rs +++ b/ffi/src/post_impl.rs @@ -5,6 +5,7 @@ use std::{ ffi::{c_char, c_uchar, CStr}, mem::ManuallyDrop, path::Path, + sync::atomic::AtomicBool, }; pub use post::config::Config; @@ -144,8 +145,9 @@ fn _generate_proof( Some(miner_id.try_into()?) }; + let stop = AtomicBool::new(false); let proof = prove::generate_proof( - datadir, challenge, cfg, nonces, threads, pow_flags, miner_id, + datadir, challenge, cfg, nonces, threads, pow_flags, miner_id, stop, )?; Ok(Box::new(Proof::from(proof))) } diff --git a/service/Cargo.toml b/service/Cargo.toml index 2b0b0a54..bba051c9 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -28,3 +28,6 @@ mockall = "0.11.4" [build-dependencies] tonic-build = "0.10.0" + +[dev-dependencies] +tempfile = "3.8.0" diff --git a/service/src/service.rs b/service/src/service.rs index 09ef894c..39f667c6 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -1,6 +1,9 @@ //! Post Service -use std::path::PathBuf; +use std::{ + path::PathBuf, + sync::{atomic::AtomicBool, Arc}, +}; use eyre::Context; use post::{metadata::ProofMetadata, pow::randomx::RandomXFlag, prove::Proof}; @@ -28,6 +31,8 @@ pub struct PostService { threads: usize, pow_flags: RandomXFlag, proof_generation: Option, + + stop: Arc, } impl PostService { @@ -49,6 +54,7 @@ impl PostService { nonces, threads, pow_flags, + stop: Arc::new(AtomicBool::new(false)), }) } } @@ -109,11 +115,12 @@ impl crate::client::PostService for PostService { let miner_id = Some(self.id); let nonces = self.nonces; let threads = self.threads; + let stop = self.stop.clone(); self.proof_generation = Some(ProofGenProcess { challenge, handle: std::thread::spawn(move || { post::prove::generate_proof( - &datadir, &ch, cfg, nonces, threads, pow_flags, miner_id, + &datadir, &ch, cfg, nonces, threads, pow_flags, miner_id, stop, ) }), }); @@ -121,3 +128,36 @@ impl crate::client::PostService for PostService { Ok(ProofGenState::InProgress) } } + +impl Drop for PostService { + fn drop(&mut self) { + log::info!("shutting down post service"); + if let Some(process) = self.proof_generation.take() { + log::debug!("killing proof generation process"); + self.stop.store(true, std::sync::atomic::Ordering::Relaxed); + let _ = process.handle.join().unwrap(); + log::debug!("proof generation process exited"); + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn needs_post_data() { + assert!(super::PostService::new( + std::path::PathBuf::from(""), + post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: post::ScryptParams::new(0, 0, 0), + }, + 16, + 1, + post::pow::randomx::RandomXFlag::get_recommended_flags(), + ) + .is_err()); + } +} diff --git a/service/tests/test_service.rs b/service/tests/test_service.rs new file mode 100644 index 00000000..cfc067d8 --- /dev/null +++ b/service/tests/test_service.rs @@ -0,0 +1,144 @@ +use std::{thread::sleep, time::Duration}; + +use post::{ + initialize::{CpuInitializer, Initialize}, + pow::randomx::{PoW, RandomXFlag}, + verification::{Verifier, VerifyingParams}, + ScryptParams, +}; +use post_service::{client::PostService, service::ProofGenState}; + +#[test] +fn test_generate_and_verify() { + // Initialize some data + let labels_per_unit = 256; + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: ScryptParams::new(0, 0, 0), + }; + + CpuInitializer::new(cfg.scrypt) + .initialize( + datadir.path(), + &[0xBE; 32], + &[0xCE; 32], + labels_per_unit, + 4, + labels_per_unit, + None, + ) + .unwrap(); + + let pow_flags = RandomXFlag::get_recommended_flags(); + + // Generate a proof + let mut service = + post_service::service::PostService::new(datadir.into_path(), cfg, 16, 1, pow_flags) + .unwrap(); + + let (proof, metadata) = loop { + if let ProofGenState::Finished { proof, metadata } = + service.gen_proof(vec![0xCA; 32]).unwrap() + { + break (proof, metadata); + } + sleep(Duration::from_millis(10)); + }; + + // Verify the proof + let verifier = Verifier::new(Box::new(PoW::new(pow_flags).unwrap())); + verifier + .verify( + &proof, + &metadata, + VerifyingParams::new(&metadata, &cfg).unwrap(), + ) + .expect("proof should be valid"); +} + +#[test] +fn reject_invalid_challenge() { + // Initialize some data + let labels_per_unit = 256; + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: ScryptParams::new(0, 0, 0), + }; + + CpuInitializer::new(cfg.scrypt) + .initialize( + datadir.path(), + &[0xBE; 32], + &[0xCE; 32], + labels_per_unit, + 4, + labels_per_unit, + None, + ) + .unwrap(); + + // Generate a proof + let mut service = post_service::service::PostService::new( + datadir.into_path(), + cfg, + 16, + 1, + RandomXFlag::get_recommended_flags(), + ) + .unwrap(); + assert!(service.gen_proof(vec![0xCA; 5]).is_err()); +} + +#[test] +fn cannot_run_parallel_proof_gens() { + // Initialize some data + let labels_per_unit = 256; + let datadir = tempfile::tempdir().unwrap(); + + let cfg = post::config::Config { + k1: 8, + k2: 4, + k3: 4, + pow_difficulty: [0xFF; 32], + scrypt: ScryptParams::new(0, 0, 0), + }; + + CpuInitializer::new(cfg.scrypt) + .initialize( + datadir.path(), + &[0xBE; 32], + &[0xCE; 32], + labels_per_unit, + 4, + labels_per_unit, + None, + ) + .unwrap(); + + // Generate a proof + let mut service = post_service::service::PostService::new( + datadir.into_path(), + cfg, + 16, + 1, + RandomXFlag::get_recommended_flags(), + ) + .unwrap(); + + let result = service.gen_proof(vec![0xAA; 32]); + assert!(matches!(result, Ok(ProofGenState::InProgress))); + // Try to generate another proof with a different challenge + assert!(service.gen_proof(vec![0xBB; 5]).is_err()); + // Try again with the same challenge + assert!(matches!(result, Ok(ProofGenState::InProgress))); +} diff --git a/src/prove.rs b/src/prove.rs index 9e73b046..aa2afcb3 100644 --- a/src/prove.rs +++ b/src/prove.rs @@ -14,7 +14,14 @@ use eyre::Context; use primitive_types::U256; use randomx_rs::RandomXFlag; use rayon::prelude::{ParallelBridge, ParallelIterator}; -use std::{borrow::Cow, collections::HashMap, ops::Range, path::Path, sync::Mutex, time::Instant}; +use std::{ + borrow::{Borrow, Cow}, + collections::HashMap, + ops::Range, + path::Path, + sync::{atomic::AtomicBool, Mutex}, + time::Instant, +}; use crate::{ cipher::AesCipher, @@ -262,7 +269,7 @@ impl Prover for Prover8_56 { } /// Generate a proof that data is still held, given the challenge. -pub fn generate_proof( +pub fn generate_proof( datadir: &Path, challenge: &[u8; 32], cfg: Config, @@ -270,7 +277,12 @@ pub fn generate_proof( threads: usize, pow_flags: RandomXFlag, miner_id: Option<[u8; 32]>, -) -> eyre::Result> { + stop: Stopper, +) -> eyre::Result> +where + Stopper: Borrow, +{ + let stop = stop.borrow(); let metadata = metadata::load(datadir).wrap_err("loading metadata")?; let params = ProvingParams::new(&metadata, &cfg)?; log::info!("generating proof with PoW flags: {pow_flags:?} and params: {params:?}"); @@ -286,6 +298,10 @@ pub fn generate_proof( let total_time = Instant::now(); loop { + if stop.load(std::sync::atomic::Ordering::Relaxed) { + eyre::bail!("proof generation was stopped"); + } + let indexes = Mutex::new(HashMap::>::new()); let pow_time = Instant::now(); @@ -307,21 +323,24 @@ pub fn generate_proof( let data_reader = read_data(datadir, 1024 * 1024, metadata.max_file_size)?; log::info!("Started reading POST data"); let result = pool.install(|| { - data_reader.par_bridge().find_map_any(|batch| { - prover.prove( - &batch.data, - batch.pos / BLOCK_SIZE as u64, - |nonce, index| { - let mut indexes = indexes.lock().unwrap(); - let vec = indexes.entry(nonce).or_default(); - vec.push(index); - if vec.len() >= cfg.k2 as usize { - return Some(std::mem::take(vec)); - } - None - }, - ) - }) + data_reader + .par_bridge() + .take_any_while(|_| !stop.load(std::sync::atomic::Ordering::Relaxed)) + .find_map_any(|batch| { + prover.prove( + &batch.data, + batch.pos / BLOCK_SIZE as u64, + |nonce, index| { + let mut indexes = indexes.lock().unwrap(); + let vec = indexes.entry(nonce).or_default(); + vec.push(index); + if vec.len() >= cfg.k2 as usize { + return Some(std::mem::take(vec)); + } + None + }, + ) + }) }); let read_mins = read_time.elapsed().as_secs() / 60; diff --git a/tests/generate_and_verify.rs b/tests/generate_and_verify.rs index 5dc3a729..d314bf61 100644 --- a/tests/generate_and_verify.rs +++ b/tests/generate_and_verify.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicBool; + use post::{ initialize::{CpuInitializer, Initialize}, metadata::ProofMetadata, @@ -39,7 +41,18 @@ fn test_generate_and_verify() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, miner_id).unwrap(); + let stop = AtomicBool::new(false); + let proof = generate_proof( + datadir.path(), + challenge, + cfg, + 32, + 1, + pow_flags, + miner_id, + stop, + ) + .unwrap(); // Verify the proof let metadata = ProofMetadata { @@ -101,7 +114,9 @@ fn test_generate_and_verify_difficulty_msb_not_zero() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof - let proof = generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, None).unwrap(); + let stop = AtomicBool::new(false); + let proof = + generate_proof(datadir.path(), challenge, cfg, 32, 1, pow_flags, None, stop).unwrap(); // Verify the proof let metadata = ProofMetadata { From 08e23fc54467648fdeefc76f325f6764553268cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 26 Sep 2023 11:51:59 +0200 Subject: [PATCH 09/12] Post service verifies the proof after generation --- service/src/client.rs | 133 ++++++++++++++++++++------------- service/src/service.rs | 33 ++++++--- service/tests/test_client.rs | 134 +++++++++++++++++----------------- service/tests/test_service.rs | 18 ++--- src/prove.rs | 1 + 5 files changed, 181 insertions(+), 138 deletions(-) diff --git a/service/src/client.rs b/service/src/client.rs index 081c49ad..5bc10642 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -8,7 +8,9 @@ use std::time::Duration; pub(crate) use spacemesh_v1::post_service_client::PostServiceClient; use spacemesh_v1::{node_request, service_response}; -use spacemesh_v1::{GenProofResponse, GenProofStatus, Proof, ProofMetadata, ServiceResponse}; +use spacemesh_v1::{ + GenProofRequest, GenProofResponse, GenProofStatus, Proof, ProofMetadata, ServiceResponse, +}; use tokio::sync::mpsc; use tokio::time::sleep; use tonic::transport::Certificate; @@ -31,8 +33,29 @@ pub struct ServiceClient { } #[mockall::automock] +#[allow(clippy::needless_lifetimes)] pub trait PostService { - fn gen_proof(&mut self, challenge: Vec) -> eyre::Result; + fn gen_proof(&self, challenge: Vec) -> eyre::Result; + + fn verify_proof<'a>( + &self, + proof: &post::prove::Proof<'a>, + metadata: &post::metadata::ProofMetadata, + ) -> eyre::Result<()>; +} + +impl PostService for std::sync::Arc { + fn gen_proof(&self, challenge: Vec) -> eyre::Result { + self.as_ref().gen_proof(challenge) + } + + fn verify_proof( + &self, + proof: &post::prove::Proof, + metadata: &post::metadata::ProofMetadata, + ) -> eyre::Result<()> { + self.as_ref().verify_proof(proof, metadata) + } } impl ServiceClient { @@ -96,53 +119,7 @@ impl ServiceClient { log::debug!("Got request from node: {request:?}"); match request.kind { Some(node_request::Kind::GenProof(req)) => { - let result = self.service.gen_proof(req.challenge); - - let resp = match result { - Ok(ProofGenState::Finished { proof, metadata }) => { - log::info!("proof generation finished"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - proof: Some(Proof { - nonce: proof.nonce, - indices: proof.indices.into_owned(), - pow: proof.pow, - }), - metadata: Some(ProofMetadata { - challenge: metadata.challenge.to_vec(), - node_id: Some(spacemesh_v1::SmesherId { - id: metadata.node_id.to_vec(), - }), - commitment_atx_id: Some(spacemesh_v1::ActivationId { - id: metadata.commitment_atx_id.to_vec(), - }), - num_units: metadata.num_units, - labels_per_unit: metadata.labels_per_unit, - }), - status: GenProofStatus::Ok as i32, - })), - } - } - Ok(ProofGenState::InProgress) => { - log::info!("proof generation in progress"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Ok as i32, - ..Default::default() - })), - } - } - Err(e) => { - log::error!("failed to generate proof: {e:?}"); - ServiceResponse { - kind: Some(service_response::Kind::GenProof(GenProofResponse { - status: GenProofStatus::Error as i32, - ..Default::default() - })), - } - } - }; - + let resp = self.generate_and_verify_proof(req); tx.send(resp).await?; } None => { @@ -160,4 +137,62 @@ impl ServiceClient { Ok(()) } + + fn generate_and_verify_proof(&self, request: GenProofRequest) -> ServiceResponse { + let result = self.service.gen_proof(request.challenge); + + match result { + Ok(ProofGenState::Finished { proof, metadata }) => { + log::info!("proof generation finished"); + if let Err(err) = self.service.verify_proof(&proof, &metadata) { + log::error!("generated proof is not valid: {err:?}"); + return ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + }; + } + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + proof: Some(Proof { + nonce: proof.nonce, + indices: proof.indices.into_owned(), + pow: proof.pow, + }), + metadata: Some(ProofMetadata { + challenge: metadata.challenge.to_vec(), + node_id: Some(spacemesh_v1::SmesherId { + id: metadata.node_id.to_vec(), + }), + commitment_atx_id: Some(spacemesh_v1::ActivationId { + id: metadata.commitment_atx_id.to_vec(), + }), + num_units: metadata.num_units, + labels_per_unit: metadata.labels_per_unit, + }), + status: GenProofStatus::Ok as i32, + })), + } + } + Ok(ProofGenState::InProgress) => { + log::info!("proof generation in progress"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Ok as i32, + ..Default::default() + })), + } + } + Err(e) => { + log::error!("failed to generate proof: {e:?}"); + ServiceResponse { + kind: Some(service_response::Kind::GenProof(GenProofResponse { + status: GenProofStatus::Error as i32, + ..Default::default() + })), + } + } + } + } } diff --git a/service/src/service.rs b/service/src/service.rs index 39f667c6..921cbb56 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -2,11 +2,16 @@ use std::{ path::PathBuf, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, Arc, Mutex}, }; use eyre::Context; -use post::{metadata::ProofMetadata, pow::randomx::RandomXFlag, prove::Proof}; +use post::{ + metadata::ProofMetadata, + pow::randomx::{PoW, RandomXFlag}, + prove::Proof, + verification::{Verifier, VerifyingParams}, +}; pub enum ProofGenState { InProgress, @@ -22,7 +27,6 @@ struct ProofGenProcess { challenge: Vec, } -#[derive(Debug)] pub struct PostService { id: [u8; 32], datadir: PathBuf, @@ -30,8 +34,9 @@ pub struct PostService { nonces: usize, threads: usize, pow_flags: RandomXFlag, - proof_generation: Option, + proof_generation: Mutex>, + verifier: Verifier, stop: Arc, } @@ -48,20 +53,22 @@ impl PostService { Ok(Self { id: metadata.node_id, - proof_generation: None, + proof_generation: Mutex::new(None), datadir, cfg, nonces, threads, pow_flags, + verifier: Verifier::new(Box::new(PoW::new(RandomXFlag::get_recommended_flags())?)), stop: Arc::new(AtomicBool::new(false)), }) } } impl crate::client::PostService for PostService { - fn gen_proof(&mut self, challenge: Vec) -> eyre::Result { - if let Some(process) = &mut self.proof_generation { + fn gen_proof(&self, challenge: Vec) -> eyre::Result { + let mut proof_gen = self.proof_generation.lock().unwrap(); + if let Some(process) = proof_gen.as_mut() { eyre::ensure!( process.challenge == challenge, "proof generation is in progress for a different challenge (current: {:X?}, requested: {:X?})", process.challenge, challenge, @@ -69,7 +76,7 @@ impl crate::client::PostService for PostService { if process.handle.is_finished() { log::info!("proof generation is finished"); - let result = match self.proof_generation.take().unwrap().handle.join() { + let result = match proof_gen.take().unwrap().handle.join() { Ok(result) => result, Err(err) => { std::panic::resume_unwind(err); @@ -116,7 +123,7 @@ impl crate::client::PostService for PostService { let nonces = self.nonces; let threads = self.threads; let stop = self.stop.clone(); - self.proof_generation = Some(ProofGenProcess { + *proof_gen = Some(ProofGenProcess { challenge, handle: std::thread::spawn(move || { post::prove::generate_proof( @@ -127,12 +134,18 @@ impl crate::client::PostService for PostService { Ok(ProofGenState::InProgress) } + + fn verify_proof(&self, proof: &Proof, metadata: &ProofMetadata) -> eyre::Result<()> { + self.verifier + .verify(proof, metadata, VerifyingParams::new(metadata, &self.cfg)?) + .wrap_err("verifying proof") + } } impl Drop for PostService { fn drop(&mut self) { log::info!("shutting down post service"); - if let Some(process) = self.proof_generation.take() { + if let Some(process) = self.proof_generation.lock().unwrap().take() { log::debug!("killing proof generation process"); self.stop.store(true, std::sync::atomic::Ordering::Relaxed); let _ = process.handle.join().unwrap(); diff --git a/service/tests/test_client.rs b/service/tests/test_client.rs index c6229b2c..b6eb2f0f 100644 --- a/service/tests/test_client.rs +++ b/service/tests/test_client.rs @@ -1,4 +1,4 @@ -use std::borrow::Cow; +use std::{borrow::Cow, sync::Arc}; use tokio::{ net::TcpListener, @@ -14,7 +14,7 @@ use post_service::{ test_server::{ spacemesh_v1::{ node_request, post_service_server::PostServiceServer, service_response, - GenProofRequest, GenProofResponse, GenProofStatus, NodeRequest, + GenProofRequest, GenProofResponse, GenProofStatus, NodeRequest, ServiceResponse, }, TestNodeRequest, TestPostService, }, @@ -32,21 +32,6 @@ impl Drop for TestServer { } } -impl TestServer { - fn create_client(&self, service: S) -> ServiceClient - where - S: PostService, - { - ServiceClient::new( - format!("http://{}", self.addr), - std::time::Duration::from_secs(1), - None, - service, - ) - .unwrap() - } -} - impl TestServer { async fn new() -> Self { let mut test_node = TestPostService::new(); @@ -67,12 +52,42 @@ impl TestServer { addr, } } + + fn create_client(&self, service: S) -> ServiceClient + where + S: PostService, + { + ServiceClient::new( + format!("http://{}", self.addr), + std::time::Duration::from_secs(1), + None, + service, + ) + .unwrap() + } + + async fn generate_proof( + connected: &mpsc::Sender, + challenge: Vec, + ) -> ServiceResponse { + let (response, resp_rx) = oneshot::channel(); + connected + .send(TestNodeRequest { + request: NodeRequest { + kind: Some(node_request::Kind::GenProof(GenProofRequest { challenge })), + }, + response, + }) + .await + .unwrap(); + resp_rx.await.unwrap() + } } #[tokio::test] async fn test_registers() { let mut test_server = TestServer::new().await; - let client = test_server.create_client(MockPostService::new()); + let client = test_server.create_client(Arc::new(MockPostService::new())); let client_handle = tokio::spawn(client.run()); // Check if client registered @@ -89,26 +104,13 @@ async fn test_gen_proof_in_progress() { service .expect_gen_proof() .returning(|_| Ok(ProofGenState::InProgress)); - - let client = test_server.create_client(service); + let service = Arc::new(service); + let client = test_server.create_client(service.clone()); let client_handle = tokio::spawn(client.run()); let connected = test_server.connected.recv().await.unwrap(); + let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; - let (resp_tx, resp_rx) = oneshot::channel(); - connected - .send(TestNodeRequest { - request: NodeRequest { - kind: Some(node_request::Kind::GenProof(GenProofRequest { - challenge: vec![0xCA; 32], - })), - }, - response: resp_tx, - }) - .await - .unwrap(); - - let response = resp_rx.await.unwrap(); let _exp_status = GenProofStatus::Ok as i32; assert!(matches!( response.kind.unwrap(), @@ -132,25 +134,13 @@ async fn test_gen_proof_failed() { .expect_gen_proof() .returning(|_| Err(eyre::eyre!("failed to generate proof"))); - let client = test_server.create_client(service); + let service = Arc::new(service); + let client = test_server.create_client(service.clone()); let client_handle = tokio::spawn(client.run()); let connected = test_server.connected.recv().await.unwrap(); + let response = TestServer::generate_proof(&connected, vec![0xCA; 32]).await; - let (response, resp_rx) = oneshot::channel(); - connected - .send(TestNodeRequest { - request: NodeRequest { - kind: Some(node_request::Kind::GenProof(GenProofRequest { - challenge: vec![0xCA; 32], - })), - }, - response, - }) - .await - .unwrap(); - - let response = resp_rx.await.unwrap(); let _exp_status = GenProofStatus::Error as i32; assert!(matches!( response.kind.unwrap(), @@ -169,13 +159,12 @@ async fn test_gen_proof_failed() { async fn test_gen_proof_finished() { let mut test_server = TestServer::new().await; - let mut service = MockPostService::new(); - let challenge = &[0xCA; 32]; let indices = &[0xAA; 32]; let node_id = &[0xBB; 32]; let commitment_atx_id = &[0xCC; 32]; + let mut service = MockPostService::new(); service.expect_gen_proof().returning(move |c| { assert_eq!(c.as_slice(), challenge); Ok(ProofGenState::Finished { @@ -194,26 +183,24 @@ async fn test_gen_proof_finished() { }, }) }); + // First try passes + service + .expect_verify_proof() + .once() + .returning(|_, _| Ok(())); + // Second try fails + service + .expect_verify_proof() + .once() + .returning(|_, _| Err(eyre::eyre!("invalid proof"))); - let client = test_server.create_client(service); + let service = Arc::new(service); + let client = test_server.create_client(service.clone()); let client_handle = tokio::spawn(client.run()); let connected = test_server.connected.recv().await.unwrap(); - let (response, resp_rx) = oneshot::channel(); - connected - .send(TestNodeRequest { - request: NodeRequest { - kind: Some(node_request::Kind::GenProof(GenProofRequest { - challenge: challenge.to_vec(), - })), - }, - response, - }) - .await - .unwrap(); - - let response = resp_rx.await.unwrap(); + let response = TestServer::generate_proof(&connected, challenge.to_vec()).await; let _exp_status = GenProofStatus::Ok as i32; let _exp_proof = post_service::test_server::spacemesh_v1::Proof { nonce: 1, @@ -241,6 +228,18 @@ async fn test_gen_proof_finished() { }) )); + // Second try should fail at verification + let response = TestServer::generate_proof(&connected, challenge.to_vec()).await; + let _exp_status = GenProofStatus::Error as i32; + assert!(matches!( + response.kind.unwrap(), + service_response::Kind::GenProof(GenProofResponse { + status: _exp_status, + proof: None, + metadata: None + }) + )); + client_handle.abort(); let _ = client_handle.await; } @@ -254,7 +253,8 @@ async fn test_broken_request_no_kind() { .expect_gen_proof() .returning(|_| Err(eyre::eyre!("failed to generate proof"))); - let client = test_server.create_client(service); + let service = Arc::new(service); + let client = test_server.create_client(service.clone()); let client_handle = tokio::spawn(client.run()); let connected = test_server.connected.recv().await.unwrap(); diff --git a/service/tests/test_service.rs b/service/tests/test_service.rs index cfc067d8..718295c0 100644 --- a/service/tests/test_service.rs +++ b/service/tests/test_service.rs @@ -2,8 +2,7 @@ use std::{thread::sleep, time::Duration}; use post::{ initialize::{CpuInitializer, Initialize}, - pow::randomx::{PoW, RandomXFlag}, - verification::{Verifier, VerifyingParams}, + pow::randomx::RandomXFlag, ScryptParams, }; use post_service::{client::PostService, service::ProofGenState}; @@ -37,7 +36,7 @@ fn test_generate_and_verify() { let pow_flags = RandomXFlag::get_recommended_flags(); // Generate a proof - let mut service = + let service = post_service::service::PostService::new(datadir.into_path(), cfg, 16, 1, pow_flags) .unwrap(); @@ -51,13 +50,8 @@ fn test_generate_and_verify() { }; // Verify the proof - let verifier = Verifier::new(Box::new(PoW::new(pow_flags).unwrap())); - verifier - .verify( - &proof, - &metadata, - VerifyingParams::new(&metadata, &cfg).unwrap(), - ) + service + .verify_proof(&proof, &metadata) .expect("proof should be valid"); } @@ -88,7 +82,7 @@ fn reject_invalid_challenge() { .unwrap(); // Generate a proof - let mut service = post_service::service::PostService::new( + let service = post_service::service::PostService::new( datadir.into_path(), cfg, 16, @@ -126,7 +120,7 @@ fn cannot_run_parallel_proof_gens() { .unwrap(); // Generate a proof - let mut service = post_service::service::PostService::new( + let service = post_service::service::PostService::new( datadir.into_path(), cfg, 16, diff --git a/src/prove.rs b/src/prove.rs index aa2afcb3..5a502abb 100644 --- a/src/prove.rs +++ b/src/prove.rs @@ -269,6 +269,7 @@ impl Prover for Prover8_56 { } /// Generate a proof that data is still held, given the challenge. +#[allow(clippy::too_many_arguments)] pub fn generate_proof( datadir: &Path, challenge: &[u8; 32], From f7fef27f83d0b5953a02ab3998b481f1569324e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 26 Sep 2023 12:23:11 +0200 Subject: [PATCH 10/12] Don't require POST to be initialized when service starts --- service/src/service.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/service/src/service.rs b/service/src/service.rs index 921cbb56..802097a1 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -13,6 +13,7 @@ use post::{ verification::{Verifier, VerifyingParams}, }; +#[derive(Debug)] pub enum ProofGenState { InProgress, Finished { @@ -28,7 +29,6 @@ struct ProofGenProcess { } pub struct PostService { - id: [u8; 32], datadir: PathBuf, cfg: post::config::Config, nonces: usize, @@ -48,11 +48,7 @@ impl PostService { threads: usize, pow_flags: RandomXFlag, ) -> eyre::Result { - let metadata = - post::metadata::load(&datadir).wrap_err("loading metadata. Is POST initialized?")?; - Ok(Self { - id: metadata.node_id, proof_generation: Mutex::new(None), datadir, cfg, @@ -111,6 +107,13 @@ impl crate::client::PostService for PostService { } } + let metadata = post::metadata::load(&self.datadir).wrap_err_with(|| { + format!( + "loading metadata from {}. Is POST initialized?", + self.datadir.as_path().display() + ) + })?; + let ch: [u8; 32] = challenge .as_slice() .try_into() @@ -119,7 +122,6 @@ impl crate::client::PostService for PostService { let pow_flags = self.pow_flags; let cfg = self.cfg; let datadir = self.datadir.clone(); - let miner_id = Some(self.id); let nonces = self.nonces; let threads = self.threads; let stop = self.stop.clone(); @@ -127,7 +129,14 @@ impl crate::client::PostService for PostService { challenge, handle: std::thread::spawn(move || { post::prove::generate_proof( - &datadir, &ch, cfg, nonces, threads, pow_flags, miner_id, stop, + &datadir, + &ch, + cfg, + nonces, + threads, + pow_flags, + Some(metadata.node_id), + stop, ) }), }); @@ -156,9 +165,11 @@ impl Drop for PostService { #[cfg(test)] mod tests { + use crate::client::PostService; + #[test] fn needs_post_data() { - assert!(super::PostService::new( + let service = super::PostService::new( std::path::PathBuf::from(""), post::config::Config { k1: 8, @@ -171,6 +182,8 @@ mod tests { 1, post::pow::randomx::RandomXFlag::get_recommended_flags(), ) - .is_err()); + .unwrap(); + + assert!(service.gen_proof(vec![0xCA; 32]).is_err()); } } From 66903145d1057fb67871f74e412cb99c15601a31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 3 Oct 2023 13:57:19 +0200 Subject: [PATCH 11/12] Configurable server's domain for TLS + cleanup --- service/src/client.rs | 8 ++++---- service/src/main.rs | 12 ++++++++---- service/src/tls_config.rs | 3 +++ src/prove.rs | 19 +++++++++---------- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/service/src/client.rs b/service/src/client.rs index 5bc10642..92f2429b 100644 --- a/service/src/client.rs +++ b/service/src/client.rs @@ -62,14 +62,14 @@ impl ServiceClient { pub fn new( address: String, reconnect_interval: Duration, - cert: Option<(Certificate, Identity)>, + tls: Option<(String, Certificate, Identity)>, service: S, ) -> eyre::Result { let endpoint = Channel::builder(address.parse()?); - let endpoint = match cert { - Some((cert, identity)) => endpoint.tls_config( + let endpoint = match tls { + Some((domain, cert, identity)) => endpoint.tls_config( ClientTlsConfig::new() - .domain_name("localhost") + .domain_name(domain) .ca_certificate(cert) .identity(identity), )?, diff --git a/service/src/main.rs b/service/src/main.rs index 643c83aa..8df2e12a 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -137,6 +137,9 @@ async fn main() -> eyre::Result<()> { let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); env_logger::init_from_env(env); + log::info!("POST network parameters: {:?}", args.post_config); + log::info!("POST proving settings: {:?}", args.post_settings); + let service = post_service::service::PostService::new( args.dir, post::config::Config { @@ -156,9 +159,10 @@ async fn main() -> eyre::Result<()> { ) .wrap_err("creating Post Service")?; - let cert = if let Some(tls) = args.tls { + let tls = if let Some(tls) = args.tls { log::info!( - "configuring TLS: server CA cert: {}, client cert: {}, client key: {}", + "configuring TLS: server: (CA cert: {}, domain: {}), client: (cert: {}, key: {})", + tls.domain, tls.ca_cert.display(), tls.cert.display(), tls.key.display(), @@ -167,6 +171,7 @@ async fn main() -> eyre::Result<()> { let cert = read_to_string(tls.cert)?; let key = read_to_string(tls.key)?; Some(( + tls.domain, Certificate::from_pem(server_ca_cert), Identity::from_pem(cert, key), )) @@ -175,8 +180,7 @@ async fn main() -> eyre::Result<()> { None }; - let client = - client::ServiceClient::new(args.address, args.reconnect_interval_s, cert, service)?; + let client = client::ServiceClient::new(args.address, args.reconnect_interval_s, tls, service)?; client.run().await } diff --git a/service/src/tls_config.rs b/service/src/tls_config.rs index d67ee1a6..a686ae48 100644 --- a/service/src/tls_config.rs +++ b/service/src/tls_config.rs @@ -15,4 +15,7 @@ pub struct Tls { pub cert: PathBuf, #[arg(long, required = false)] pub key: PathBuf, + /// domain name to verify the certificate of server against + #[arg(long, default_value = "localhost")] + pub domain: String, } diff --git a/src/prove.rs b/src/prove.rs index 5a502abb..0597aa52 100644 --- a/src/prove.rs +++ b/src/prove.rs @@ -8,20 +8,19 @@ //! ## k2 proof of work //! TODO: explain +use std::borrow::{Borrow, Cow}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, +}; +use std::{collections::HashMap, ops::Range, path::Path, time::Instant}; + use aes::cipher::block_padding::NoPadding; use aes::cipher::BlockEncrypt; use eyre::Context; use primitive_types::U256; use randomx_rs::RandomXFlag; use rayon::prelude::{ParallelBridge, ParallelIterator}; -use std::{ - borrow::{Borrow, Cow}, - collections::HashMap, - ops::Range, - path::Path, - sync::{atomic::AtomicBool, Mutex}, - time::Instant, -}; use crate::{ cipher::AesCipher, @@ -299,7 +298,7 @@ where let total_time = Instant::now(); loop { - if stop.load(std::sync::atomic::Ordering::Relaxed) { + if stop.load(Ordering::Relaxed) { eyre::bail!("proof generation was stopped"); } @@ -326,7 +325,7 @@ where let result = pool.install(|| { data_reader .par_bridge() - .take_any_while(|_| !stop.load(std::sync::atomic::Ordering::Relaxed)) + .take_any_while(|_| !stop.load(Ordering::Relaxed)) .find_map_any(|batch| { prover.prove( &batch.data, From 27177e3400de4dc0ab6cf2ae3bd6ddf9fa7e6054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Mon, 9 Oct 2023 13:57:47 +0200 Subject: [PATCH 12/12] Install protoc in CI for coverage job --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2f4cc78..b1d25dad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -124,6 +124,9 @@ jobs: coverage: runs-on: ubuntu-latest steps: + - uses: arduino/setup-protoc@v2 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Install opencl run: sudo apt-get install -y libpocl2 mesa-opencl-icd ocl-icd-opencl-dev - uses: actions/checkout@v3