From 1923670da807f368b1f5636f0075533da69129d5 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 16 May 2023 18:31:38 +0200 Subject: [PATCH] Max latency under 1.5seconds, msg-ops count below 17 --- 3e-efficient-broadcast-part-two/.gitignore | 1 + 3e-efficient-broadcast-part-two/Cargo.lock | 477 ++++++++++++++++++ 3e-efficient-broadcast-part-two/Cargo.toml | 10 + 3e-efficient-broadcast-part-two/src/main.rs | 253 ++++++++++ .../src/message.rs | 68 +++ 3e-efficient-broadcast-part-two/src/node.rs | 40 ++ .../src/storage.rs | 85 ++++ 7 files changed, 934 insertions(+) create mode 100644 3e-efficient-broadcast-part-two/.gitignore create mode 100644 3e-efficient-broadcast-part-two/Cargo.lock create mode 100644 3e-efficient-broadcast-part-two/Cargo.toml create mode 100644 3e-efficient-broadcast-part-two/src/main.rs create mode 100644 3e-efficient-broadcast-part-two/src/message.rs create mode 100644 3e-efficient-broadcast-part-two/src/node.rs create mode 100644 3e-efficient-broadcast-part-two/src/storage.rs diff --git a/3e-efficient-broadcast-part-two/.gitignore b/3e-efficient-broadcast-part-two/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/3e-efficient-broadcast-part-two/.gitignore @@ -0,0 +1 @@ +/target diff --git a/3e-efficient-broadcast-part-two/Cargo.lock b/3e-efficient-broadcast-part-two/Cargo.lock new file mode 100644 index 0000000..2d3ef89 --- /dev/null +++ b/3e-efficient-broadcast-part-two/Cargo.lock @@ -0,0 +1,477 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "ch03e-efficient-broadcast-part-two" +version = "0.1.0" +dependencies = [ + "rand", + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "getrandom" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + +[[package]] +name = "libc" +version = "0.2.144" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" + +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "mio" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.45.0", +] + +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.45.0", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] +name = "proc-macro2" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + +[[package]] +name = "ryu" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "2.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/3e-efficient-broadcast-part-two/Cargo.toml b/3e-efficient-broadcast-part-two/Cargo.toml new file mode 100644 index 0000000..dc566a8 --- /dev/null +++ b/3e-efficient-broadcast-part-two/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ch03e-efficient-broadcast-part-two" +version = "0.1.0" +edition = "2021" + +[dependencies] +rand = "0.8.5" +serde = {version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1.28.1", features = ["full"] } diff --git a/3e-efficient-broadcast-part-two/src/main.rs b/3e-efficient-broadcast-part-two/src/main.rs new file mode 100644 index 0000000..360f062 --- /dev/null +++ b/3e-efficient-broadcast-part-two/src/main.rs @@ -0,0 +1,253 @@ +mod message; +mod node; +mod storage; + +use crate::message::{Body, Message}; +use crate::node::Node; +use crate::storage::Storage; + +use rand::prelude::*; +use rand::rngs::StdRng; +use std::io::Write; +use std::sync::Arc; +use std::time::Duration; +use std::{println, thread}; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::sync::{ + mpsc, + mpsc::{Receiver, Sender}, + Mutex, +}; + +const GOSSIP_DELAY: u64 = 500; +const MIN_AMOUNT_NODES: usize = 1; +const NETWORK_SIZE: usize = 25; + +#[tokio::main] +async fn main() { + let (reader_tx, mut reader_rx) = mpsc::channel(1000); + let (writer_tx, mut writer_rx) = mpsc::channel(1000); + + let reader_tx1: Sender = reader_tx.clone(); + let writer_tx1: Sender = writer_tx.clone(); + let writer_tx2: Sender = writer_tx.clone(); + + let node = Node::default(); + let store = Arc::new(Mutex::new(Storage::default())); + + let node = init_node(node).await; + + let n1 = node.clone(); + let s1 = store.clone(); + + let read = tokio::spawn(async move { + read_from_stdin(reader_tx1).await; + }); + + let write = tokio::spawn(async move { + write_to_stdout(&mut writer_rx).await; + }); + + let gossip = tokio::spawn(async move { + loop { + thread::sleep(Duration::from_millis(GOSSIP_DELAY)); + gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; + } + }); + + let handle = tokio::spawn(async move { + handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await; + }); + + let _ = tokio::try_join!(read, handle, write, gossip); +} + +async fn init_node(node: Node) -> Node { + let stdin = tokio::io::stdin(); + let mut stdout = std::io::stdout(); + + let mut reader = BufReader::new(stdin); + let mut buf = String::new(); + + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + let node = node.init(message.clone()); + + match message.body { + Body::Init { + msg_id, node_id, .. + } => { + let response = Message { + src: node_id, + dest: message.src.clone(), + body: Body::InitOk { + in_reply_to: msg_id, + }, + }; + + let message = Message::format_message(response); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } + _ => (), + } + + node +} + +async fn read_from_stdin(reader_tx: Sender) { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); + + loop { + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + reader_tx.send(message).await.unwrap(); + } +} + +async fn write_to_stdout(writer_rx: &mut Receiver) { + let mut stdout = std::io::stdout(); + + loop { + let message = writer_rx.recv().await.unwrap(); + let message = Message::format_message(message); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } +} + +async fn gossip_messages(node: Node, storage: Arc>, writer: Sender) { + let mut rng = StdRng::from_entropy(); + + let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE); + + let selected_neighbours: Vec = node + .get_network() + .choose_multiple(&mut rng, num_to_select) + .cloned() + .collect(); + + let mut tasks = vec![]; + + for n in selected_neighbours { + let storage_clone = storage.clone(); + let writer_clone = writer.clone(); + let node_clone = node.clone(); + + let task = tokio::spawn(async move { + let messages = storage_clone + .lock() + .await + .get_new_messages_for_neighbour(n.clone()); + + if messages.is_empty() { + return; + } + + let message = Message { + src: node_clone.id.clone(), + dest: n.clone(), + body: Body::Gossip { + messages: messages.clone(), + }, + }; + + writer_clone.send(message).await.unwrap(); + }); + + tasks.push(task); + } + + // Wait for all the gossip tasks to complete + for task in tasks { + task.await.unwrap(); + } +} + +async fn handle_messages( + node: Node, + storage: Arc>, + input: &mut Receiver, + writer: Sender, +) { + while let Some(input) = input.recv().await { + match input.body { + Body::Broadcast { msg_id, message } => { + let id = node.id.clone(); + storage.lock().await.add_message(message); + + let response = Message { + src: id, + dest: input.src, + body: Body::BroadcastOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + writer.send(response).await.unwrap(); + } + Body::Gossip { messages } => { + let id = node.id.clone(); + storage + .lock() + .await + .add_messages(messages.clone(), input.src.clone()); + + let response = Message { + src: id, + dest: input.src, + body: Body::GossipOk { messages }, + }; + + writer.send(response).await.unwrap(); + } + Body::GossipOk { messages } => { + storage + .lock() + .await + .add_to_sent_messages(messages, input.src); + } + Body::Read { msg_id } => { + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::ReadOk { + msg_id, + in_reply_to: msg_id, + messages: storage.lock().await.get_messages(), + }, + }; + + writer.send(response).await.unwrap(); + } + Body::Topology { msg_id, .. } => { + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::TopologyOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + writer.send(response).await.unwrap(); + } + Body::Error { + in_reply_to, + code, + text, + } => { + eprintln!( + "Error received (in_reply_to: {}, code: {}, text: {})", + in_reply_to, code, text + ); + } + _ => (), + } + } + println!("Error, nothing to read from receiver"); +} diff --git a/3e-efficient-broadcast-part-two/src/message.rs b/3e-efficient-broadcast-part-two/src/message.rs new file mode 100644 index 0000000..452289b --- /dev/null +++ b/3e-efficient-broadcast-part-two/src/message.rs @@ -0,0 +1,68 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Message { + pub src: String, + pub dest: String, + pub body: Body, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum Body { + Error { + in_reply_to: u64, + code: u64, + text: String, + }, + Init { + msg_id: u64, + node_id: String, + node_ids: Vec, + }, + InitOk { + in_reply_to: u64, + }, + Broadcast { + msg_id: u64, + message: u64, + }, + BroadcastOk { + msg_id: u64, + in_reply_to: u64, + }, + Read { + msg_id: u64, + }, + ReadOk { + msg_id: u64, + in_reply_to: u64, + messages: Vec, + }, + Topology { + msg_id: u64, + topology: HashMap>, + }, + TopologyOk { + msg_id: u64, + in_reply_to: u64, + }, + Gossip { + messages: Vec, + }, + GossipOk { + messages: Vec, + }, +} + +impl Message { + pub(crate) fn parse_message(message: String) -> Message { + serde_json::from_str(&message).unwrap() + } + + pub(crate) fn format_message(message: Message) -> String { + serde_json::to_string(&message).unwrap() + } +} diff --git a/3e-efficient-broadcast-part-two/src/node.rs b/3e-efficient-broadcast-part-two/src/node.rs new file mode 100644 index 0000000..bd8adcb --- /dev/null +++ b/3e-efficient-broadcast-part-two/src/node.rs @@ -0,0 +1,40 @@ +use crate::message::{Body, Message}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub(crate) struct Network(pub(crate) HashSet); + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub(crate) struct Node { + pub(crate) id: String, + pub(crate) availble_nodes: Vec, + pub(crate) network: Network, +} + +impl Node { + pub(crate) fn init(&self, message: Message) -> Node { + match message.body { + Body::Init { + node_id, node_ids, .. + } => { + return Node { + id: node_id.clone(), + availble_nodes: node_ids.clone(), + network: self.init_network(node_ids), + } + } + _ => panic!("Invalid message type"), + } + } + + fn init_network(&self, nodes: Vec) -> Network { + let mut neighbours = Network::default(); + neighbours.0.extend(nodes); + neighbours + } + + pub(crate) fn get_network(&self) -> Vec { + self.network.0.clone().into_iter().collect::>() + } +} diff --git a/3e-efficient-broadcast-part-two/src/storage.rs b/3e-efficient-broadcast-part-two/src/storage.rs new file mode 100644 index 0000000..27739d5 --- /dev/null +++ b/3e-efficient-broadcast-part-two/src/storage.rs @@ -0,0 +1,85 @@ +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub(crate) struct Messages(pub(crate) HashSet); + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub(crate) struct Storage { + pub(crate) messages: Messages, + pub(crate) received_gossip_messages: HashMap, + pub(crate) sent_messages: HashMap, + pub(crate) retry: HashMap, +} + +impl Storage { + pub(crate) fn add_message(&mut self, message: u64) { + if !self.messages.0.contains(&message) { + self.messages.0.insert(message); + } + } + + pub(crate) fn add_messages(&mut self, messages: Vec, node: String) { + if self.received_gossip_messages.contains_key(&node) { + self.received_gossip_messages + .get_mut(&node) + .unwrap() + .0 + .extend(messages.iter()); + } else { + let mut v = Messages::default(); + v.0.extend(messages.iter()); + self.received_gossip_messages.insert(node, v); + } + + for m in messages { + if !self.messages.0.contains(&m) { + self.messages.0.insert(m); + } + } + } + + pub(crate) fn get_messages(&mut self) -> Vec { + self.messages.0.iter().cloned().collect() + } + + pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec { + let received_messages = self.messages.0.clone().into_iter().collect::>(); + + let sent_to_node: Vec = self + .sent_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); + + let received_from_node: Vec = self + .received_gossip_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); + + let filtered_messages: Vec = received_messages + .into_iter() + .filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x)) + .collect(); + + filtered_messages + } + + pub(crate) fn add_to_sent_messages(&mut self, messages: Vec, node: String) { + if self.sent_messages.contains_key(&node) { + self.sent_messages + .get_mut(&node) + .unwrap() + .0 + .extend(messages); + } else { + self.sent_messages + .insert(node, Messages(messages.iter().cloned().collect())); + } + } +}