diff --git a/3d-efficient-broadcast-part-one/Cargo.lock b/3d-efficient-broadcast-part-one/Cargo.lock index db5ca6e..71a9242 100644 --- a/3d-efficient-broadcast-part-one/Cargo.lock +++ b/3d-efficient-broadcast-part-one/Cargo.lock @@ -2,6 +2,24 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "cfg-if" version = "1.0.0" @@ -15,6 +33,7 @@ dependencies = [ "rand", "serde", "serde_json", + "tokio", ] [[package]] @@ -28,6 +47,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "itoa" version = "1.0.6" @@ -40,6 +68,76 @@ version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "mio" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.45.0", +] + +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.45.0", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -94,12 +192,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + [[package]] name = "ryu" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "serde" version = "1.0.160" @@ -131,6 +244,31 @@ dependencies = [ "serde", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "syn" version = "2.0.15" @@ -142,6 +280,36 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.8" @@ -153,3 +321,157 @@ name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/3d-efficient-broadcast-part-one/Cargo.toml b/3d-efficient-broadcast-part-one/Cargo.toml index bb6183d..8ccc2aa 100644 --- a/3d-efficient-broadcast-part-one/Cargo.toml +++ b/3d-efficient-broadcast-part-one/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" rand = "0.8.5" serde = {version = "1", features = ["derive"] } serde_json = "1" +tokio = { version = "1.28.1", features = ["full"] } diff --git a/3d-efficient-broadcast-part-one/src/main.rs b/3d-efficient-broadcast-part-one/src/main.rs index 95b6310..2363662 100644 --- a/3d-efficient-broadcast-part-one/src/main.rs +++ b/3d-efficient-broadcast-part-one/src/main.rs @@ -4,81 +4,121 @@ mod storage; use crate::message::{Body, Message}; use crate::node::Node; +use crate::storage::Storage; -use std::io::prelude::*; -use std::io::{BufReader, Write}; -use std::sync::{ - mpsc, - mpsc::{Receiver, Sender}, - Arc, Mutex, -}; +use std::io::Write; +use std::sync::Arc; use std::time::Duration; use std::{println, thread}; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::sync::{ + mpsc, + mpsc::{Receiver, Sender}, + Mutex, +}; -fn main() { - let (reader_tx, mut reader_rx) = mpsc::channel(); - let (writer_tx, mut writer_rx) = mpsc::channel(); - - let node = Arc::new(Mutex::new(Node::default())); - - let n1 = node.clone(); - let n2 = node.clone(); +#[tokio::main] +async fn main() { + let (reader_tx, mut reader_rx) = mpsc::channel(1000); + let (writer_tx, mut writer_rx) = mpsc::channel(1000); let reader_tx1: Sender = reader_tx.clone(); let writer_tx1: Sender = writer_tx.clone(); let writer_tx2: Sender = writer_tx.clone(); - let read = thread::spawn(move || { - read_from_stdin(reader_tx1); + let node = Node::default(); + let store = Arc::new(Mutex::new(Storage::default())); + + let node = init_node(node).await; + + let n1 = node.clone(); + let s1 = store.clone(); + + let read = tokio::spawn(async move { + read_from_stdin(reader_tx1).await; }); - let write = thread::spawn(move || { - write_to_stdout(&mut writer_rx); + let write = tokio::spawn(async move { + write_to_stdout(&mut writer_rx).await; }); - let gossip = thread::spawn(move || loop { - thread::sleep(Duration::from_millis(100)); - gossip_messages(n1.clone(), writer_tx2.clone()); + let gossip = tokio::spawn(async move { + loop { + thread::sleep(Duration::from_millis(25)); + gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; + } }); - let handle = thread::spawn(move || { - handle_messages(n2, &mut reader_rx, writer_tx1); + let handle = tokio::spawn(async move { + handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await; }); - let _ = handle.join(); - let _ = write.join(); - let _ = gossip.join(); - let _ = read.join(); + + let _ = tokio::try_join!(read, handle, write, gossip); } -fn read_from_stdin(reader_tx: Sender) { - let stdin = std::io::stdin(); - let mut reader = BufReader::new(stdin.lock()); +async fn init_node(node: Node) -> Node { + let stdin = tokio::io::stdin(); + let mut stdout = std::io::stdout(); + + let mut reader = BufReader::new(stdin); + let mut buf = String::new(); + + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + let node = node.init(message.clone()); + + match message.body { + Body::Init { + msg_id, node_id, .. + } => { + let response = Message { + src: node_id, + dest: message.src.clone(), + body: Body::InitOk { + in_reply_to: msg_id, + }, + }; + + let message = Message::format_message(response); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } + _ => (), + } + + node +} + +async fn read_from_stdin(reader_tx: Sender) { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); loop { let mut buf = String::new(); - reader.read_line(&mut buf).unwrap(); + reader.read_line(&mut buf).await.unwrap(); let message = Message::parse_message(buf.clone()); - reader_tx.send(message).unwrap(); + reader_tx.send(message).await.unwrap(); } } -fn write_to_stdout(writer_rx: &mut Receiver) { +async fn write_to_stdout(writer_rx: &mut Receiver) { let mut stdout = std::io::stdout(); loop { - let message = writer_rx.recv().unwrap(); + let message = writer_rx.recv().await.unwrap(); let message = Message::format_message(message); writeln!(stdout, "{}", message).unwrap(); stdout.flush().unwrap(); } } -fn gossip_messages(node: Arc>, writer: Sender) { - let node = node.lock().unwrap(); - for n in node.storage.get_neighbours() { - let messages = node.storage.get_messages_for_node(n.clone()); +async fn gossip_messages(node: Node, storage: Arc>, writer: Sender) { + for n in node.get_neighbours() { + let messages = storage.lock().await.get_messages_for_node(n.clone()); - if messages.len() == 0 { + if messages.len() < 1 || storage.lock().await.get_retries(n.clone()) < 2 { + storage.lock().await.increase_or_insert(n); continue; } @@ -90,39 +130,22 @@ fn gossip_messages(node: Arc>, writer: Sender) { }, }; - writer.send(message).unwrap(); + storage.lock().await.decrease_or_remove(n); + writer.send(message).await.unwrap(); } } -fn handle_messages(node: Arc>, input: &mut Receiver, writer: Sender) { - while let Ok(input) = input.recv() { +async fn handle_messages( + node: Node, + storage: Arc>, + input: &mut Receiver, + writer: Sender, +) { + while let Some(input) = input.recv().await { match input.body { - Body::Init { - msg_id, - ref node_id, - ref node_ids, - .. - } => { - let mut node = node.lock().unwrap(); - node.init(input.clone()); - node.storage.init_topology(node_id.clone(), &node_ids); - - let response = Message { - src: node_id.clone(), - dest: input.src, - body: Body::InitOk { - in_reply_to: msg_id, - }, - }; - - writer.send(response).unwrap(); - } Body::Broadcast { msg_id, message } => { - let id = node.lock().unwrap().get_id(); - node.lock() - .unwrap() - .storage - .add_message(message, id.clone()); + let id = node.id.clone(); + storage.lock().await.add_message(message, id.clone()); let response = Message { src: id, @@ -133,12 +156,12 @@ fn handle_messages(node: Arc>, input: &mut Receiver, writer }, }; - writer.send(response).unwrap(); + writer.send(response).await.unwrap(); } Body::Gossip { messages } => { - let id = node.lock().unwrap().get_id(); + let id = node.id.clone(); for m in messages.iter() { - node.lock().unwrap().storage.add_message(*m, id.clone()); + storage.lock().await.add_message(*m, id.clone()); } let response = Message { @@ -147,35 +170,30 @@ fn handle_messages(node: Arc>, input: &mut Receiver, writer body: Body::GossipOk { messages }, }; - writer.send(response).unwrap(); + writer.send(response).await.unwrap(); } Body::GossipOk { messages } => { - let id = node.lock().unwrap().get_id(); - node.lock() - .unwrap() - .storage - .add_to_sent_messages(messages, id.clone()); + storage + .lock() + .await + .add_to_sent_messages(messages, node.id.clone()); } Body::Read { msg_id } => { - let id = node.lock().unwrap().get_id(); - let response = Message { - src: id, + src: node.id.clone(), dest: input.src, body: Body::ReadOk { msg_id, in_reply_to: msg_id, - messages: node.lock().unwrap().storage.get_messages(), + messages: storage.lock().await.get_messages(), }, }; - writer.send(response).unwrap(); + writer.send(response).await.unwrap(); } Body::Topology { msg_id, .. } => { - let id = node.lock().unwrap().get_id(); - let response = Message { - src: id, + src: node.id.clone(), dest: input.src, body: Body::TopologyOk { msg_id, @@ -183,7 +201,7 @@ fn handle_messages(node: Arc>, input: &mut Receiver, writer }, }; - writer.send(response).unwrap(); + writer.send(response).await.unwrap(); } Body::Error { in_reply_to, diff --git a/3d-efficient-broadcast-part-one/src/node.rs b/3d-efficient-broadcast-part-one/src/node.rs index 39abb8a..0c85e22 100644 --- a/3d-efficient-broadcast-part-one/src/node.rs +++ b/3d-efficient-broadcast-part-one/src/node.rs @@ -1,29 +1,47 @@ +use rand::seq::SliceRandom; +use rand::thread_rng; use serde::{Deserialize, Serialize}; use crate::message::{Body, Message}; -use crate::storage::Storage; +use std::collections::HashSet; -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub(crate) struct Neighbours(pub(crate) HashSet); + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Node { pub(crate) id: String, pub(crate) availble_nodes: Vec, - pub(crate) storage: Storage, + pub(crate) neighbours: Neighbours, } impl Node { - pub(crate) fn init(&mut self, message: Message) { + pub(crate) fn init(&self, message: Message) -> Node { match message.body { Body::Init { node_id, node_ids, .. } => { - self.id = node_id.clone(); - self.availble_nodes = node_ids.clone(); + return Node { + id: node_id.clone(), + availble_nodes: node_ids.clone(), + neighbours: self.init_topology(node_ids), + } } _ => panic!("Invalid message type"), } } - pub(crate) fn get_id(&self) -> String { - self.id.clone() + fn init_topology(&self, nodes: Vec) -> Neighbours { + let mut neighbours = Neighbours::default(); + + let mut rng = thread_rng(); + let selections: Vec = nodes.choose_multiple(&mut rng, 9).cloned().collect(); + + neighbours.0.extend(selections); + neighbours + } + + pub(crate) fn get_neighbours(&self) -> HashSet { + self.neighbours.0.clone() } } diff --git a/3d-efficient-broadcast-part-one/src/storage.rs b/3d-efficient-broadcast-part-one/src/storage.rs index 1943a75..b534bd7 100644 --- a/3d-efficient-broadcast-part-one/src/storage.rs +++ b/3d-efficient-broadcast-part-one/src/storage.rs @@ -1,20 +1,15 @@ -use rand::seq::SliceRandom; -use rand::thread_rng; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; -#[derive(Serialize, Deserialize, Debug, Default)] -pub(crate) struct Neighbours(pub(crate) HashSet); - -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Messages(pub(crate) HashSet); -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Storage { pub(crate) messages: Messages, pub(crate) received_messages: HashMap, pub(crate) sent_messages: HashMap, - pub(crate) neighbours: Neighbours, + pub(crate) retry: HashMap, } impl Storage { @@ -38,6 +33,30 @@ impl Storage { self.messages.0.clone().into_iter().collect() } + pub(crate) fn get_retries(&self, node: String) -> u8 { + match self.retry.get(&node) { + Some(count) => *count, + None => 0, + } + } + + pub(crate) fn increase_or_insert(&mut self, node: String) { + let count = self.retry.entry(node).or_insert(0); + *count += 1; + } + + pub(crate) fn decrease_or_remove(&mut self, node: String) { + match self.retry.get_mut(&node) { + Some(count) => { + *count -= 1; + if *count == 0 { + self.retry.remove(&node); + } + } + None => (), + } + } + pub(crate) fn get_messages_for_node(&self, node: String) -> Vec { let received: Vec = self .received_messages @@ -75,22 +94,4 @@ impl Storage { .insert(node, Messages(messages.iter().cloned().collect())); } } - - pub(crate) fn init_topology(&mut self, node_id: String, nodes: &Vec) { - let i = nodes.iter().position(|x| *x == node_id).unwrap(); - - let left_neighbor = nodes[(i + nodes.len() - 1) % nodes.len()].clone(); - let right_neighbor = nodes[(i + 1) % nodes.len()].clone(); - - let mut rng = thread_rng(); - let selections: Vec = nodes.choose_multiple(&mut rng, 2).cloned().collect(); - - self.neighbours.0.extend(selections); - self.neighbours.0.insert(left_neighbor); - self.neighbours.0.insert(right_neighbor); - } - - pub(crate) fn get_neighbours(&self) -> HashSet { - self.neighbours.0.clone() - } }