diff --git a/3d-efficient-broadcast-part-one/src/main.rs b/3d-efficient-broadcast-part-one/src/main.rs index 2363662..f64475a 100644 --- a/3d-efficient-broadcast-part-one/src/main.rs +++ b/3d-efficient-broadcast-part-one/src/main.rs @@ -6,6 +6,8 @@ use crate::message::{Body, Message}; use crate::node::Node; use crate::storage::Storage; +use rand::prelude::*; +use rand::rngs::StdRng; use std::io::Write; use std::sync::Arc; use std::time::Duration; @@ -18,6 +20,10 @@ use tokio::sync::{ Mutex, }; +const GOSSIP_DELAY: u64 = 150; +const MIN_AMOUNT_NODES: usize = 4; +const NETWORK_SIZE: usize = 25; + #[tokio::main] async fn main() { let (reader_tx, mut reader_rx) = mpsc::channel(1000); @@ -45,7 +51,7 @@ async fn main() { let gossip = tokio::spawn(async move { loop { - thread::sleep(Duration::from_millis(25)); + thread::sleep(Duration::from_millis(GOSSIP_DELAY)); gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; } }); @@ -114,24 +120,50 @@ async fn write_to_stdout(writer_rx: &mut Receiver) { } async fn gossip_messages(node: Node, storage: Arc>, writer: Sender) { - for n in node.get_neighbours() { - let messages = storage.lock().await.get_messages_for_node(n.clone()); + let mut rng = StdRng::from_entropy(); - if messages.len() < 1 || storage.lock().await.get_retries(n.clone()) < 2 { - storage.lock().await.increase_or_insert(n); - continue; - } + let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE); - let message = Message { - src: node.id.clone(), - dest: n.clone(), - body: Body::Gossip { - messages: messages.clone(), - }, - }; + let selected_neighbours: Vec = node + .get_network() + .choose_multiple(&mut rng, num_to_select) + .cloned() + .collect(); - storage.lock().await.decrease_or_remove(n); - writer.send(message).await.unwrap(); + let mut tasks = vec![]; + + for n in selected_neighbours { + let storage_clone = storage.clone(); + let writer_clone = writer.clone(); + let node_clone = node.clone(); + + let task = tokio::spawn(async move { + let messages = storage_clone + .lock() + .await + .get_new_messages_for_neighbour(n.clone()); + + if messages.is_empty() { + return; + } + + let message = Message { + src: node_clone.id.clone(), + dest: n.clone(), + body: Body::Gossip { + messages: messages.clone(), + }, + }; + + writer_clone.send(message).await.unwrap(); + }); + + tasks.push(task); + } + + // Wait for all the gossip tasks to complete + for task in tasks { + task.await.unwrap(); } } @@ -145,7 +177,7 @@ async fn handle_messages( match input.body { Body::Broadcast { msg_id, message } => { let id = node.id.clone(); - storage.lock().await.add_message(message, id.clone()); + storage.lock().await.add_message(message); let response = Message { src: id, @@ -160,9 +192,10 @@ async fn handle_messages( } Body::Gossip { messages } => { let id = node.id.clone(); - for m in messages.iter() { - storage.lock().await.add_message(*m, id.clone()); - } + storage + .lock() + .await + .add_messages(messages.clone(), input.src.clone()); let response = Message { src: id, @@ -176,7 +209,7 @@ async fn handle_messages( storage .lock() .await - .add_to_sent_messages(messages, node.id.clone()); + .add_to_sent_messages(messages, input.src); } Body::Read { msg_id } => { let response = Message { diff --git a/3d-efficient-broadcast-part-one/src/node.rs b/3d-efficient-broadcast-part-one/src/node.rs index 0c85e22..bd8adcb 100644 --- a/3d-efficient-broadcast-part-one/src/node.rs +++ b/3d-efficient-broadcast-part-one/src/node.rs @@ -1,18 +1,15 @@ -use rand::seq::SliceRandom; -use rand::thread_rng; -use serde::{Deserialize, Serialize}; - use crate::message::{Body, Message}; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; #[derive(Serialize, Deserialize, Clone, Debug, Default)] -pub(crate) struct Neighbours(pub(crate) HashSet); +pub(crate) struct Network(pub(crate) HashSet); #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Node { pub(crate) id: String, pub(crate) availble_nodes: Vec, - pub(crate) neighbours: Neighbours, + pub(crate) network: Network, } impl Node { @@ -24,24 +21,20 @@ impl Node { return Node { id: node_id.clone(), availble_nodes: node_ids.clone(), - neighbours: self.init_topology(node_ids), + network: self.init_network(node_ids), } } _ => panic!("Invalid message type"), } } - fn init_topology(&self, nodes: Vec) -> Neighbours { - let mut neighbours = Neighbours::default(); - - let mut rng = thread_rng(); - let selections: Vec = nodes.choose_multiple(&mut rng, 9).cloned().collect(); - - neighbours.0.extend(selections); + fn init_network(&self, nodes: Vec) -> Network { + let mut neighbours = Network::default(); + neighbours.0.extend(nodes); neighbours } - pub(crate) fn get_neighbours(&self) -> HashSet { - self.neighbours.0.clone() + pub(crate) fn get_network(&self) -> Vec { + self.network.0.clone().into_iter().collect::>() } } diff --git a/3d-efficient-broadcast-part-one/src/storage.rs b/3d-efficient-broadcast-part-one/src/storage.rs index b534bd7..27739d5 100644 --- a/3d-efficient-broadcast-part-one/src/storage.rs +++ b/3d-efficient-broadcast-part-one/src/storage.rs @@ -7,66 +7,46 @@ pub(crate) struct Messages(pub(crate) HashSet); #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Storage { pub(crate) messages: Messages, - pub(crate) received_messages: HashMap, + pub(crate) received_gossip_messages: HashMap, pub(crate) sent_messages: HashMap, pub(crate) retry: HashMap, } impl Storage { - pub(crate) fn add_message(&mut self, message: u64, node: String) { - self.messages.0.insert(message); + pub(crate) fn add_message(&mut self, message: u64) { + if !self.messages.0.contains(&message) { + self.messages.0.insert(message); + } + } - if self.received_messages.contains_key(&node) { - self.received_messages + pub(crate) fn add_messages(&mut self, messages: Vec, node: String) { + if self.received_gossip_messages.contains_key(&node) { + self.received_gossip_messages .get_mut(&node) .unwrap() .0 - .insert(message); + .extend(messages.iter()); } else { let mut v = Messages::default(); - v.0.insert(message); - self.received_messages.insert(node, v); + v.0.extend(messages.iter()); + self.received_gossip_messages.insert(node, v); + } + + for m in messages { + if !self.messages.0.contains(&m) { + self.messages.0.insert(m); + } } } pub(crate) fn get_messages(&mut self) -> Vec { - self.messages.0.clone().into_iter().collect() + self.messages.0.iter().cloned().collect() } - pub(crate) fn get_retries(&self, node: String) -> u8 { - match self.retry.get(&node) { - Some(count) => *count, - None => 0, - } - } + pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec { + let received_messages = self.messages.0.clone().into_iter().collect::>(); - pub(crate) fn increase_or_insert(&mut self, node: String) { - let count = self.retry.entry(node).or_insert(0); - *count += 1; - } - - pub(crate) fn decrease_or_remove(&mut self, node: String) { - match self.retry.get_mut(&node) { - Some(count) => { - *count -= 1; - if *count == 0 { - self.retry.remove(&node); - } - } - None => (), - } - } - - pub(crate) fn get_messages_for_node(&self, node: String) -> Vec { - let received: Vec = self - .received_messages - .iter() - .filter(|(key, _)| *key == &node) - .flat_map(|(_, Messages(value))| value) - .cloned() - .collect(); - - let sent: Vec = self + let sent_to_node: Vec = self .sent_messages .iter() .filter(|(key, _)| *key == &node) @@ -74,12 +54,20 @@ impl Storage { .cloned() .collect(); - self.messages - .0 + let received_from_node: Vec = self + .received_gossip_messages .iter() - .filter(|m| !received.contains(m) && !sent.contains(m)) + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) .cloned() - .collect() + .collect(); + + let filtered_messages: Vec = received_messages + .into_iter() + .filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x)) + .collect(); + + filtered_messages } pub(crate) fn add_to_sent_messages(&mut self, messages: Vec, node: String) {