diff --git a/3d-efficient-broadcast-part-one/Cargo.lock b/3d-efficient-broadcast-part-one/Cargo.lock index e81885d..db5ca6e 100644 --- a/3d-efficient-broadcast-part-one/Cargo.lock +++ b/3d-efficient-broadcast-part-one/Cargo.lock @@ -2,20 +2,50 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "ch03d-efficient-broadcast-part-one" version = "0.1.0" dependencies = [ + "rand", "serde", "serde_json", ] +[[package]] +name = "getrandom" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "itoa" version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +[[package]] +name = "libc" +version = "0.2.144" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.56" @@ -34,6 +64,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "ryu" version = "1.0.13" @@ -87,3 +147,9 @@ name = "unicode-ident" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" diff --git a/3d-efficient-broadcast-part-one/Cargo.toml b/3d-efficient-broadcast-part-one/Cargo.toml index a0a5707..bb6183d 100644 --- a/3d-efficient-broadcast-part-one/Cargo.toml +++ b/3d-efficient-broadcast-part-one/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +rand = "0.8.5" serde = {version = "1", features = ["derive"] } serde_json = "1" diff --git a/3d-efficient-broadcast-part-one/src/main.rs b/3d-efficient-broadcast-part-one/src/main.rs index d714283..95b6310 100644 --- a/3d-efficient-broadcast-part-one/src/main.rs +++ b/3d-efficient-broadcast-part-one/src/main.rs @@ -12,8 +12,8 @@ use std::sync::{ mpsc::{Receiver, Sender}, Arc, Mutex, }; -use std::thread; use std::time::Duration; +use std::{println, thread}; fn main() { let (reader_tx, mut reader_rx) = mpsc::channel(); @@ -37,7 +37,7 @@ fn main() { }); let gossip = thread::spawn(move || loop { - thread::sleep(Duration::from_secs(1)); + thread::sleep(Duration::from_millis(100)); gossip_messages(n1.clone(), writer_tx2.clone()); }); @@ -75,30 +75,40 @@ fn write_to_stdout(writer_rx: &mut Receiver) { fn gossip_messages(node: Arc>, writer: Sender) { let node = node.lock().unwrap(); - if let Some(neighbours) = node.storage.get_neighbours(&node.get_id()) { - for n in neighbours { - let messages = node.storage.get_messages_for_node(n.clone()); - let message = Message { - src: node.id.clone(), - dest: n.clone(), - body: Body::Gossip { - messages: messages.clone(), - }, - }; + for n in node.storage.get_neighbours() { + let messages = node.storage.get_messages_for_node(n.clone()); - writer.send(message).unwrap(); + if messages.len() == 0 { + continue; } + + let message = Message { + src: node.id.clone(), + dest: n.clone(), + body: Body::Gossip { + messages: messages.clone(), + }, + }; + + writer.send(message).unwrap(); } } fn handle_messages(node: Arc>, input: &mut Receiver, writer: Sender) { while let Ok(input) = input.recv() { match input.body { - Body::Init { msg_id, .. } => { - node.lock().unwrap().init(input.clone()); - let id = node.lock().unwrap().get_id(); + Body::Init { + msg_id, + ref node_id, + ref node_ids, + .. + } => { + let mut node = node.lock().unwrap(); + node.init(input.clone()); + node.storage.init_topology(node_id.clone(), &node_ids); + let response = Message { - src: id, + src: node_id.clone(), dest: input.src, body: Body::InitOk { in_reply_to: msg_id, @@ -161,9 +171,8 @@ fn handle_messages(node: Arc>, input: &mut Receiver, writer writer.send(response).unwrap(); } - Body::Topology { msg_id, topology } => { + Body::Topology { msg_id, .. } => { let id = node.lock().unwrap().get_id(); - node.lock().unwrap().storage.init_topology(topology); let response = Message { src: id, diff --git a/3d-efficient-broadcast-part-one/src/node.rs b/3d-efficient-broadcast-part-one/src/node.rs index f48d4c2..39abb8a 100644 --- a/3d-efficient-broadcast-part-one/src/node.rs +++ b/3d-efficient-broadcast-part-one/src/node.rs @@ -16,8 +16,8 @@ impl Node { Body::Init { node_id, node_ids, .. } => { - self.id = node_id; - self.availble_nodes = node_ids; + self.id = node_id.clone(); + self.availble_nodes = node_ids.clone(); } _ => panic!("Invalid message type"), } diff --git a/3d-efficient-broadcast-part-one/src/storage.rs b/3d-efficient-broadcast-part-one/src/storage.rs index 4f2f43c..1943a75 100644 --- a/3d-efficient-broadcast-part-one/src/storage.rs +++ b/3d-efficient-broadcast-part-one/src/storage.rs @@ -1,8 +1,10 @@ +use rand::seq::SliceRandom; +use rand::thread_rng; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; #[derive(Serialize, Deserialize, Debug, Default)] -pub(crate) struct Topology(pub(crate) HashMap>); +pub(crate) struct Neighbours(pub(crate) HashSet); #[derive(Serialize, Deserialize, Debug, Default)] pub(crate) struct Messages(pub(crate) HashSet); @@ -12,7 +14,7 @@ pub(crate) struct Storage { pub(crate) messages: Messages, pub(crate) received_messages: HashMap, pub(crate) sent_messages: HashMap, - pub(crate) topology: Topology, + pub(crate) neighbours: Neighbours, } impl Storage { @@ -74,11 +76,21 @@ impl Storage { } } - pub(crate) fn init_topology(&mut self, topology: HashMap>) { - self.topology.0 = topology; + pub(crate) fn init_topology(&mut self, node_id: String, nodes: &Vec) { + let i = nodes.iter().position(|x| *x == node_id).unwrap(); + + let left_neighbor = nodes[(i + nodes.len() - 1) % nodes.len()].clone(); + let right_neighbor = nodes[(i + 1) % nodes.len()].clone(); + + let mut rng = thread_rng(); + let selections: Vec = nodes.choose_multiple(&mut rng, 2).cloned().collect(); + + self.neighbours.0.extend(selections); + self.neighbours.0.insert(left_neighbor); + self.neighbours.0.insert(right_neighbor); } - pub(crate) fn get_neighbours(&self, node_id: &str) -> Option> { - self.topology.0.get(node_id).cloned() + pub(crate) fn get_neighbours(&self) -> HashSet { + self.neighbours.0.clone() } }