Max latency under 1.5seconds, msg-ops count below 17
This commit is contained in:
parent
d7b6314280
commit
1923670da8
7 changed files with 934 additions and 0 deletions
1
3e-efficient-broadcast-part-two/.gitignore
vendored
Normal file
1
3e-efficient-broadcast-part-two/.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
/target
|
||||
477
3e-efficient-broadcast-part-two/Cargo.lock
generated
Normal file
477
3e-efficient-broadcast-part-two/Cargo.lock
generated
Normal file
|
|
@ -0,0 +1,477 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "ch03e-efficient-broadcast-part-two"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"wasi",
|
||||
"windows-sys 0.45.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
|
||||
dependencies = [
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-sys 0.45.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.56"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.160"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.160"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.96"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
|
||||
dependencies = [
|
||||
"winapi-i686-pc-windows-gnu",
|
||||
"winapi-x86_64-pc-windows-gnu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-i686-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.45.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
||||
dependencies = [
|
||||
"windows-targets 0.42.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
||||
dependencies = [
|
||||
"windows-targets 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.42.2",
|
||||
"windows_aarch64_msvc 0.42.2",
|
||||
"windows_i686_gnu 0.42.2",
|
||||
"windows_i686_msvc 0.42.2",
|
||||
"windows_x86_64_gnu 0.42.2",
|
||||
"windows_x86_64_gnullvm 0.42.2",
|
||||
"windows_x86_64_msvc 0.42.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.48.0",
|
||||
"windows_aarch64_msvc 0.48.0",
|
||||
"windows_i686_gnu 0.48.0",
|
||||
"windows_i686_msvc 0.48.0",
|
||||
"windows_x86_64_gnu 0.48.0",
|
||||
"windows_x86_64_gnullvm 0.48.0",
|
||||
"windows_x86_64_msvc 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
||||
10
3e-efficient-broadcast-part-two/Cargo.toml
Normal file
10
3e-efficient-broadcast-part-two/Cargo.toml
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
[package]
|
||||
name = "ch03e-efficient-broadcast-part-two"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
rand = "0.8.5"
|
||||
serde = {version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1.28.1", features = ["full"] }
|
||||
253
3e-efficient-broadcast-part-two/src/main.rs
Normal file
253
3e-efficient-broadcast-part-two/src/main.rs
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
mod message;
|
||||
mod node;
|
||||
mod storage;
|
||||
|
||||
use crate::message::{Body, Message};
|
||||
use crate::node::Node;
|
||||
use crate::storage::Storage;
|
||||
|
||||
use rand::prelude::*;
|
||||
use rand::rngs::StdRng;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{println, thread};
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::{
|
||||
mpsc,
|
||||
mpsc::{Receiver, Sender},
|
||||
Mutex,
|
||||
};
|
||||
|
||||
const GOSSIP_DELAY: u64 = 500;
|
||||
const MIN_AMOUNT_NODES: usize = 1;
|
||||
const NETWORK_SIZE: usize = 25;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (reader_tx, mut reader_rx) = mpsc::channel(1000);
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel(1000);
|
||||
|
||||
let reader_tx1: Sender<Message> = reader_tx.clone();
|
||||
let writer_tx1: Sender<Message> = writer_tx.clone();
|
||||
let writer_tx2: Sender<Message> = writer_tx.clone();
|
||||
|
||||
let node = Node::default();
|
||||
let store = Arc::new(Mutex::new(Storage::default()));
|
||||
|
||||
let node = init_node(node).await;
|
||||
|
||||
let n1 = node.clone();
|
||||
let s1 = store.clone();
|
||||
|
||||
let read = tokio::spawn(async move {
|
||||
read_from_stdin(reader_tx1).await;
|
||||
});
|
||||
|
||||
let write = tokio::spawn(async move {
|
||||
write_to_stdout(&mut writer_rx).await;
|
||||
});
|
||||
|
||||
let gossip = tokio::spawn(async move {
|
||||
loop {
|
||||
thread::sleep(Duration::from_millis(GOSSIP_DELAY));
|
||||
gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await;
|
||||
}
|
||||
});
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await;
|
||||
});
|
||||
|
||||
let _ = tokio::try_join!(read, handle, write, gossip);
|
||||
}
|
||||
|
||||
async fn init_node(node: Node) -> Node {
|
||||
let stdin = tokio::io::stdin();
|
||||
let mut stdout = std::io::stdout();
|
||||
|
||||
let mut reader = BufReader::new(stdin);
|
||||
let mut buf = String::new();
|
||||
|
||||
reader.read_line(&mut buf).await.unwrap();
|
||||
let message = Message::parse_message(buf.clone());
|
||||
let node = node.init(message.clone());
|
||||
|
||||
match message.body {
|
||||
Body::Init {
|
||||
msg_id, node_id, ..
|
||||
} => {
|
||||
let response = Message {
|
||||
src: node_id,
|
||||
dest: message.src.clone(),
|
||||
body: Body::InitOk {
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
|
||||
let message = Message::format_message(response);
|
||||
writeln!(stdout, "{}", message).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
async fn read_from_stdin(reader_tx: Sender<Message>) {
|
||||
let stdin = tokio::io::stdin();
|
||||
let mut reader = BufReader::new(stdin);
|
||||
|
||||
loop {
|
||||
let mut buf = String::new();
|
||||
reader.read_line(&mut buf).await.unwrap();
|
||||
let message = Message::parse_message(buf.clone());
|
||||
reader_tx.send(message).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
|
||||
let mut stdout = std::io::stdout();
|
||||
|
||||
loop {
|
||||
let message = writer_rx.recv().await.unwrap();
|
||||
let message = Message::format_message(message);
|
||||
writeln!(stdout, "{}", message).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) {
|
||||
let mut rng = StdRng::from_entropy();
|
||||
|
||||
let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE);
|
||||
|
||||
let selected_neighbours: Vec<String> = node
|
||||
.get_network()
|
||||
.choose_multiple(&mut rng, num_to_select)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let mut tasks = vec![];
|
||||
|
||||
for n in selected_neighbours {
|
||||
let storage_clone = storage.clone();
|
||||
let writer_clone = writer.clone();
|
||||
let node_clone = node.clone();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
let messages = storage_clone
|
||||
.lock()
|
||||
.await
|
||||
.get_new_messages_for_neighbour(n.clone());
|
||||
|
||||
if messages.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let message = Message {
|
||||
src: node_clone.id.clone(),
|
||||
dest: n.clone(),
|
||||
body: Body::Gossip {
|
||||
messages: messages.clone(),
|
||||
},
|
||||
};
|
||||
|
||||
writer_clone.send(message).await.unwrap();
|
||||
});
|
||||
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
// Wait for all the gossip tasks to complete
|
||||
for task in tasks {
|
||||
task.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_messages(
|
||||
node: Node,
|
||||
storage: Arc<Mutex<Storage>>,
|
||||
input: &mut Receiver<Message>,
|
||||
writer: Sender<Message>,
|
||||
) {
|
||||
while let Some(input) = input.recv().await {
|
||||
match input.body {
|
||||
Body::Broadcast { msg_id, message } => {
|
||||
let id = node.id.clone();
|
||||
storage.lock().await.add_message(message);
|
||||
|
||||
let response = Message {
|
||||
src: id,
|
||||
dest: input.src,
|
||||
body: Body::BroadcastOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
|
||||
writer.send(response).await.unwrap();
|
||||
}
|
||||
Body::Gossip { messages } => {
|
||||
let id = node.id.clone();
|
||||
storage
|
||||
.lock()
|
||||
.await
|
||||
.add_messages(messages.clone(), input.src.clone());
|
||||
|
||||
let response = Message {
|
||||
src: id,
|
||||
dest: input.src,
|
||||
body: Body::GossipOk { messages },
|
||||
};
|
||||
|
||||
writer.send(response).await.unwrap();
|
||||
}
|
||||
Body::GossipOk { messages } => {
|
||||
storage
|
||||
.lock()
|
||||
.await
|
||||
.add_to_sent_messages(messages, input.src);
|
||||
}
|
||||
Body::Read { msg_id } => {
|
||||
let response = Message {
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::ReadOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
messages: storage.lock().await.get_messages(),
|
||||
},
|
||||
};
|
||||
|
||||
writer.send(response).await.unwrap();
|
||||
}
|
||||
Body::Topology { msg_id, .. } => {
|
||||
let response = Message {
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::TopologyOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
|
||||
writer.send(response).await.unwrap();
|
||||
}
|
||||
Body::Error {
|
||||
in_reply_to,
|
||||
code,
|
||||
text,
|
||||
} => {
|
||||
eprintln!(
|
||||
"Error received (in_reply_to: {}, code: {}, text: {})",
|
||||
in_reply_to, code, text
|
||||
);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
println!("Error, nothing to read from receiver");
|
||||
}
|
||||
68
3e-efficient-broadcast-part-two/src/message.rs
Normal file
68
3e-efficient-broadcast-part-two/src/message.rs
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
pub struct Message {
|
||||
pub src: String,
|
||||
pub dest: String,
|
||||
pub body: Body,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Body {
|
||||
Error {
|
||||
in_reply_to: u64,
|
||||
code: u64,
|
||||
text: String,
|
||||
},
|
||||
Init {
|
||||
msg_id: u64,
|
||||
node_id: String,
|
||||
node_ids: Vec<String>,
|
||||
},
|
||||
InitOk {
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Broadcast {
|
||||
msg_id: u64,
|
||||
message: u64,
|
||||
},
|
||||
BroadcastOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Read {
|
||||
msg_id: u64,
|
||||
},
|
||||
ReadOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
messages: Vec<u64>,
|
||||
},
|
||||
Topology {
|
||||
msg_id: u64,
|
||||
topology: HashMap<String, Vec<String>>,
|
||||
},
|
||||
TopologyOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Gossip {
|
||||
messages: Vec<u64>,
|
||||
},
|
||||
GossipOk {
|
||||
messages: Vec<u64>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub(crate) fn parse_message(message: String) -> Message {
|
||||
serde_json::from_str(&message).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn format_message(message: Message) -> String {
|
||||
serde_json::to_string(&message).unwrap()
|
||||
}
|
||||
}
|
||||
40
3e-efficient-broadcast-part-two/src/node.rs
Normal file
40
3e-efficient-broadcast-part-two/src/node.rs
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
use crate::message::{Body, Message};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
||||
pub(crate) struct Network(pub(crate) HashSet<String>);
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
||||
pub(crate) struct Node {
|
||||
pub(crate) id: String,
|
||||
pub(crate) availble_nodes: Vec<String>,
|
||||
pub(crate) network: Network,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub(crate) fn init(&self, message: Message) -> Node {
|
||||
match message.body {
|
||||
Body::Init {
|
||||
node_id, node_ids, ..
|
||||
} => {
|
||||
return Node {
|
||||
id: node_id.clone(),
|
||||
availble_nodes: node_ids.clone(),
|
||||
network: self.init_network(node_ids),
|
||||
}
|
||||
}
|
||||
_ => panic!("Invalid message type"),
|
||||
}
|
||||
}
|
||||
|
||||
fn init_network(&self, nodes: Vec<String>) -> Network {
|
||||
let mut neighbours = Network::default();
|
||||
neighbours.0.extend(nodes);
|
||||
neighbours
|
||||
}
|
||||
|
||||
pub(crate) fn get_network(&self) -> Vec<String> {
|
||||
self.network.0.clone().into_iter().collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
85
3e-efficient-broadcast-part-two/src/storage.rs
Normal file
85
3e-efficient-broadcast-part-two/src/storage.rs
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
||||
pub(crate) struct Messages(pub(crate) HashSet<u64>);
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
|
||||
pub(crate) struct Storage {
|
||||
pub(crate) messages: Messages,
|
||||
pub(crate) received_gossip_messages: HashMap<String, Messages>,
|
||||
pub(crate) sent_messages: HashMap<String, Messages>,
|
||||
pub(crate) retry: HashMap<String, u8>,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub(crate) fn add_message(&mut self, message: u64) {
|
||||
if !self.messages.0.contains(&message) {
|
||||
self.messages.0.insert(message);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn add_messages(&mut self, messages: Vec<u64>, node: String) {
|
||||
if self.received_gossip_messages.contains_key(&node) {
|
||||
self.received_gossip_messages
|
||||
.get_mut(&node)
|
||||
.unwrap()
|
||||
.0
|
||||
.extend(messages.iter());
|
||||
} else {
|
||||
let mut v = Messages::default();
|
||||
v.0.extend(messages.iter());
|
||||
self.received_gossip_messages.insert(node, v);
|
||||
}
|
||||
|
||||
for m in messages {
|
||||
if !self.messages.0.contains(&m) {
|
||||
self.messages.0.insert(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
|
||||
self.messages.0.iter().cloned().collect()
|
||||
}
|
||||
|
||||
pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec<u64> {
|
||||
let received_messages = self.messages.0.clone().into_iter().collect::<Vec<_>>();
|
||||
|
||||
let sent_to_node: Vec<u64> = self
|
||||
.sent_messages
|
||||
.iter()
|
||||
.filter(|(key, _)| *key == &node)
|
||||
.flat_map(|(_, Messages(value))| value)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let received_from_node: Vec<u64> = self
|
||||
.received_gossip_messages
|
||||
.iter()
|
||||
.filter(|(key, _)| *key == &node)
|
||||
.flat_map(|(_, Messages(value))| value)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let filtered_messages: Vec<u64> = received_messages
|
||||
.into_iter()
|
||||
.filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x))
|
||||
.collect();
|
||||
|
||||
filtered_messages
|
||||
}
|
||||
|
||||
pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) {
|
||||
if self.sent_messages.contains_key(&node) {
|
||||
self.sent_messages
|
||||
.get_mut(&node)
|
||||
.unwrap()
|
||||
.0
|
||||
.extend(messages);
|
||||
} else {
|
||||
self.sent_messages
|
||||
.insert(node, Messages(messages.iter().cloned().collect()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue