From d06bac12b24756c65302f0aa6d502d3c00546058 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 16 May 2023 19:02:59 +0200 Subject: [PATCH] rustmft --- 3e-efficient-broadcast-part-two/rustfmt.toml | 6 + 3e-efficient-broadcast-part-two/src/main.rs | 330 +++++++++--------- .../src/message.rs | 107 +++--- 3e-efficient-broadcast-part-two/src/node.rs | 114 +++--- .../src/storage.rs | 131 +++---- 5 files changed, 348 insertions(+), 340 deletions(-) create mode 100644 3e-efficient-broadcast-part-two/rustfmt.toml diff --git a/3e-efficient-broadcast-part-two/rustfmt.toml b/3e-efficient-broadcast-part-two/rustfmt.toml new file mode 100644 index 0000000..abf4214 --- /dev/null +++ b/3e-efficient-broadcast-part-two/rustfmt.toml @@ -0,0 +1,6 @@ +hard_tabs = true +imports_granularity = "Crate" +reorder_impl_items = true +reorder_imports = true +group_imports = "StdExternalCrate" +reorder_modules = true \ No newline at end of file diff --git a/3e-efficient-broadcast-part-two/src/main.rs b/3e-efficient-broadcast-part-two/src/main.rs index f3bfd50..04d8248 100644 --- a/3e-efficient-broadcast-part-two/src/main.rs +++ b/3e-efficient-broadcast-part-two/src/main.rs @@ -2,22 +2,22 @@ mod message; mod node; mod storage; -use crate::message::{Body, Message}; -use crate::node::Node; -use crate::storage::Storage; +use std::{io::Write, println, sync::Arc, thread, time::Duration}; -use rand::prelude::*; -use rand::rngs::StdRng; -use std::io::Write; -use std::sync::Arc; -use std::time::Duration; -use std::{println, thread}; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; -use tokio::sync::{ - mpsc, - mpsc::{Receiver, Sender}, - Mutex, +use rand::{prelude::*, rngs::StdRng}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::{ + mpsc, + mpsc::{Receiver, Sender}, + Mutex, + }, +}; + +use crate::{ + message::{Body, Message}, + node::Node, + storage::Storage, }; const GOSSIP_DELAY: u64 = 500; @@ -26,194 +26,194 @@ const NETWORK_SIZE: usize = 25; #[tokio::main] async fn main() { - let (reader_tx, mut reader_rx) = mpsc::channel(1000); - let (writer_tx, mut writer_rx) = mpsc::channel(1000); + let (reader_tx, mut reader_rx) = mpsc::channel(1000); + let (writer_tx, mut writer_rx) = mpsc::channel(1000); - let reader_tx1: Sender = reader_tx.clone(); - let writer_tx1: Sender = writer_tx.clone(); - let writer_tx2: Sender = writer_tx.clone(); + let reader_tx1: Sender = reader_tx.clone(); + let writer_tx1: Sender = writer_tx.clone(); + let writer_tx2: Sender = writer_tx.clone(); - let store = Arc::new(Mutex::new(Storage::default())); + let store = Arc::new(Mutex::new(Storage::default())); - let node = Node::bootstrap().await; + let node = Node::bootstrap().await; - let n1 = node.clone(); - let s1 = store.clone(); + let n1 = node.clone(); + let s1 = store.clone(); - let read = tokio::spawn(async move { - read_from_stdin(reader_tx1).await; - }); + let read = tokio::spawn(async move { + read_from_stdin(reader_tx1).await; + }); - let write = tokio::spawn(async move { - write_to_stdout(&mut writer_rx).await; - }); + let write = tokio::spawn(async move { + write_to_stdout(&mut writer_rx).await; + }); - let gossip = tokio::spawn(async move { - loop { - thread::sleep(Duration::from_millis(GOSSIP_DELAY)); - gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; - } - }); + let gossip = tokio::spawn(async move { + loop { + thread::sleep(Duration::from_millis(GOSSIP_DELAY)); + gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; + } + }); - let handle = tokio::spawn(async move { - handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await; - }); + let handle = tokio::spawn(async move { + handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await; + }); - let _ = tokio::try_join!(read, handle, write, gossip); + let _ = tokio::try_join!(read, handle, write, gossip); } async fn read_from_stdin(reader_tx: Sender) { - let stdin = tokio::io::stdin(); - let mut reader = BufReader::new(stdin); + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); - loop { - let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - let message = Message::parse_message(buf.clone()); - reader_tx.send(message).await.unwrap(); - } + loop { + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + reader_tx.send(message).await.unwrap(); + } } async fn write_to_stdout(writer_rx: &mut Receiver) { - let mut stdout = std::io::stdout(); + let mut stdout = std::io::stdout(); - loop { - let message = writer_rx.recv().await.unwrap(); - let message = Message::format_message(message); - writeln!(stdout, "{}", message).unwrap(); - stdout.flush().unwrap(); - } + loop { + let message = writer_rx.recv().await.unwrap(); + let message = Message::format_message(message); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } } async fn gossip_messages(node: Node, storage: Arc>, writer: Sender) { - let mut rng = StdRng::from_entropy(); + let mut rng = StdRng::from_entropy(); - let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE); + let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE); - let selected_neighbours: Vec = node - .get_network() - .choose_multiple(&mut rng, num_to_select) - .cloned() - .collect(); + let selected_neighbours: Vec = node + .get_network() + .choose_multiple(&mut rng, num_to_select) + .cloned() + .collect(); - let mut tasks = vec![]; + let mut tasks = vec![]; - for n in selected_neighbours { - let storage_clone = storage.clone(); - let writer_clone = writer.clone(); - let node_clone = node.clone(); + for n in selected_neighbours { + let storage_clone = storage.clone(); + let writer_clone = writer.clone(); + let node_clone = node.clone(); - let task = tokio::spawn(async move { - let messages = storage_clone - .lock() - .await - .get_new_messages_for_neighbour(n.clone()); + let task = tokio::spawn(async move { + let messages = storage_clone + .lock() + .await + .get_new_messages_for_neighbour(n.clone()); - if messages.is_empty() { - return; - } + if messages.is_empty() { + return; + } - let message = Message { - src: node_clone.id.clone(), - dest: n.clone(), - body: Body::Gossip { - messages: messages.clone(), - }, - }; + let message = Message { + src: node_clone.id.clone(), + dest: n.clone(), + body: Body::Gossip { + messages: messages.clone(), + }, + }; - writer_clone.send(message).await.unwrap(); - }); + writer_clone.send(message).await.unwrap(); + }); - tasks.push(task); - } + tasks.push(task); + } - // Wait for all the gossip tasks to complete - for task in tasks { - task.await.unwrap(); - } + // Wait for all the gossip tasks to complete + for task in tasks { + task.await.unwrap(); + } } async fn handle_messages( - node: Node, - storage: Arc>, - input: &mut Receiver, - writer: Sender, + node: Node, + storage: Arc>, + input: &mut Receiver, + writer: Sender, ) { - while let Some(input) = input.recv().await { - match input.body { - Body::Broadcast { msg_id, message } => { - let id = node.id.clone(); - storage.lock().await.add_message(message); + while let Some(input) = input.recv().await { + match input.body { + Body::Broadcast { msg_id, message } => { + let id = node.id.clone(); + storage.lock().await.add_message(message); - let response = Message { - src: id, - dest: input.src, - body: Body::BroadcastOk { - msg_id, - in_reply_to: msg_id, - }, - }; + let response = Message { + src: id, + dest: input.src, + body: Body::BroadcastOk { + msg_id, + in_reply_to: msg_id, + }, + }; - writer.send(response).await.unwrap(); - } - Body::Gossip { messages } => { - let id = node.id.clone(); - storage - .lock() - .await - .add_messages(messages.clone(), input.src.clone()); + writer.send(response).await.unwrap(); + } + Body::Gossip { messages } => { + let id = node.id.clone(); + storage + .lock() + .await + .add_messages(messages.clone(), input.src.clone()); - let response = Message { - src: id, - dest: input.src, - body: Body::GossipOk { messages }, - }; + let response = Message { + src: id, + dest: input.src, + body: Body::GossipOk { messages }, + }; - writer.send(response).await.unwrap(); - } - Body::GossipOk { messages } => { - storage - .lock() - .await - .add_to_sent_messages(messages, input.src); - } - Body::Read { msg_id } => { - let response = Message { - src: node.id.clone(), - dest: input.src, - body: Body::ReadOk { - msg_id, - in_reply_to: msg_id, - messages: storage.lock().await.get_messages(), - }, - }; + writer.send(response).await.unwrap(); + } + Body::GossipOk { messages } => { + storage + .lock() + .await + .add_to_sent_messages(messages, input.src); + } + Body::Read { msg_id } => { + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::ReadOk { + msg_id, + in_reply_to: msg_id, + messages: storage.lock().await.get_messages(), + }, + }; - writer.send(response).await.unwrap(); - } - Body::Topology { msg_id, .. } => { - let response = Message { - src: node.id.clone(), - dest: input.src, - body: Body::TopologyOk { - msg_id, - in_reply_to: msg_id, - }, - }; + writer.send(response).await.unwrap(); + } + Body::Topology { msg_id, .. } => { + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::TopologyOk { + msg_id, + in_reply_to: msg_id, + }, + }; - writer.send(response).await.unwrap(); - } - Body::Error { - in_reply_to, - code, - text, - } => { - eprintln!( - "Error received (in_reply_to: {}, code: {}, text: {})", - in_reply_to, code, text - ); - } - _ => (), - } - } - println!("Error, nothing to read from receiver"); + writer.send(response).await.unwrap(); + } + Body::Error { + in_reply_to, + code, + text, + } => { + eprintln!( + "Error received (in_reply_to: {}, code: {}, text: {})", + in_reply_to, code, text + ); + } + _ => (), + } + } + println!("Error, nothing to read from receiver"); } diff --git a/3e-efficient-broadcast-part-two/src/message.rs b/3e-efficient-broadcast-part-two/src/message.rs index 452289b..5887a20 100644 --- a/3e-efficient-broadcast-part-two/src/message.rs +++ b/3e-efficient-broadcast-part-two/src/message.rs @@ -1,68 +1,69 @@ -use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use serde::{Deserialize, Serialize}; + #[derive(Clone, Serialize, Deserialize, Debug)] pub struct Message { - pub src: String, - pub dest: String, - pub body: Body, + pub src: String, + pub dest: String, + pub body: Body, } #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum Body { - Error { - in_reply_to: u64, - code: u64, - text: String, - }, - Init { - msg_id: u64, - node_id: String, - node_ids: Vec, - }, - InitOk { - in_reply_to: u64, - }, - Broadcast { - msg_id: u64, - message: u64, - }, - BroadcastOk { - msg_id: u64, - in_reply_to: u64, - }, - Read { - msg_id: u64, - }, - ReadOk { - msg_id: u64, - in_reply_to: u64, - messages: Vec, - }, - Topology { - msg_id: u64, - topology: HashMap>, - }, - TopologyOk { - msg_id: u64, - in_reply_to: u64, - }, - Gossip { - messages: Vec, - }, - GossipOk { - messages: Vec, - }, + Error { + in_reply_to: u64, + code: u64, + text: String, + }, + Init { + msg_id: u64, + node_id: String, + node_ids: Vec, + }, + InitOk { + in_reply_to: u64, + }, + Broadcast { + msg_id: u64, + message: u64, + }, + BroadcastOk { + msg_id: u64, + in_reply_to: u64, + }, + Read { + msg_id: u64, + }, + ReadOk { + msg_id: u64, + in_reply_to: u64, + messages: Vec, + }, + Topology { + msg_id: u64, + topology: HashMap>, + }, + TopologyOk { + msg_id: u64, + in_reply_to: u64, + }, + Gossip { + messages: Vec, + }, + GossipOk { + messages: Vec, + }, } impl Message { - pub(crate) fn parse_message(message: String) -> Message { - serde_json::from_str(&message).unwrap() - } + pub(crate) fn parse_message(message: String) -> Message { + serde_json::from_str(&message).unwrap() + } - pub(crate) fn format_message(message: Message) -> String { - serde_json::to_string(&message).unwrap() - } + pub(crate) fn format_message(message: Message) -> String { + serde_json::to_string(&message).unwrap() + } } diff --git a/3e-efficient-broadcast-part-two/src/node.rs b/3e-efficient-broadcast-part-two/src/node.rs index 77542fb..ddc5b11 100644 --- a/3e-efficient-broadcast-part-two/src/node.rs +++ b/3e-efficient-broadcast-part-two/src/node.rs @@ -1,76 +1,76 @@ -use crate::message::{Body, Message}; +use std::{collections::HashSet, io::Write}; + use serde::{Deserialize, Serialize}; -use std::collections::HashSet; -use std::io::Write; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; +use tokio::io::{AsyncBufReadExt, BufReader}; + +use crate::message::{Body, Message}; #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Network(pub(crate) HashSet); #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Node { - pub(crate) id: String, - pub(crate) availble_nodes: Vec, - pub(crate) network: Network, + pub(crate) id: String, + pub(crate) availble_nodes: Vec, + pub(crate) network: Network, } impl Node { - pub(crate) async fn bootstrap() -> Node { - let stdin = tokio::io::stdin(); - let mut stdout = std::io::stdout(); + pub(crate) async fn bootstrap() -> Node { + let stdin = tokio::io::stdin(); + let mut stdout = std::io::stdout(); - let mut reader = BufReader::new(stdin); - let mut buf = String::new(); + let mut reader = BufReader::new(stdin); + let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - let message = Message::parse_message(buf.clone()); - let node = Node::init(message.clone()); + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + let node = Node::init(message.clone()); - match message.body { - Body::Init { - msg_id, node_id, .. - } => { - let response = Message { - src: node_id, - dest: message.src.clone(), - body: Body::InitOk { - in_reply_to: msg_id, - }, - }; + match message.body { + Body::Init { + msg_id, node_id, .. + } => { + let response = Message { + src: node_id, + dest: message.src.clone(), + body: Body::InitOk { + in_reply_to: msg_id, + }, + }; - let message = Message::format_message(response); - writeln!(stdout, "{}", message).unwrap(); - stdout.flush().unwrap(); - } - _ => (), - } + let message = Message::format_message(response); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } + _ => (), + } - node - } + node + } - pub(crate) fn init(message: Message) -> Node { - match message.body { - Body::Init { - node_id, node_ids, .. - } => { - return Node { - id: node_id.clone(), - availble_nodes: node_ids.clone(), - network: Node::init_network(node_ids), - } - } - _ => panic!("Invalid message type"), - } - } + pub(crate) fn init(message: Message) -> Node { + match message.body { + Body::Init { + node_id, node_ids, .. + } => { + return Node { + id: node_id.clone(), + availble_nodes: node_ids.clone(), + network: Node::init_network(node_ids), + } + } + _ => panic!("Invalid message type"), + } + } - fn init_network(nodes: Vec) -> Network { - let mut neighbours = Network::default(); - neighbours.0.extend(nodes); - neighbours - } + fn init_network(nodes: Vec) -> Network { + let mut neighbours = Network::default(); + neighbours.0.extend(nodes); + neighbours + } - pub(crate) fn get_network(&self) -> Vec { - self.network.0.clone().into_iter().collect::>() - } + pub(crate) fn get_network(&self) -> Vec { + self.network.0.clone().into_iter().collect::>() + } } diff --git a/3e-efficient-broadcast-part-two/src/storage.rs b/3e-efficient-broadcast-part-two/src/storage.rs index 27739d5..7a1360f 100644 --- a/3e-efficient-broadcast-part-two/src/storage.rs +++ b/3e-efficient-broadcast-part-two/src/storage.rs @@ -1,85 +1,86 @@ -use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; +use serde::{Deserialize, Serialize}; + #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Messages(pub(crate) HashSet); #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub(crate) struct Storage { - pub(crate) messages: Messages, - pub(crate) received_gossip_messages: HashMap, - pub(crate) sent_messages: HashMap, - pub(crate) retry: HashMap, + pub(crate) messages: Messages, + pub(crate) received_gossip_messages: HashMap, + pub(crate) sent_messages: HashMap, + pub(crate) retry: HashMap, } impl Storage { - pub(crate) fn add_message(&mut self, message: u64) { - if !self.messages.0.contains(&message) { - self.messages.0.insert(message); - } - } + pub(crate) fn add_message(&mut self, message: u64) { + if !self.messages.0.contains(&message) { + self.messages.0.insert(message); + } + } - pub(crate) fn add_messages(&mut self, messages: Vec, node: String) { - if self.received_gossip_messages.contains_key(&node) { - self.received_gossip_messages - .get_mut(&node) - .unwrap() - .0 - .extend(messages.iter()); - } else { - let mut v = Messages::default(); - v.0.extend(messages.iter()); - self.received_gossip_messages.insert(node, v); - } + pub(crate) fn add_messages(&mut self, messages: Vec, node: String) { + if self.received_gossip_messages.contains_key(&node) { + self.received_gossip_messages + .get_mut(&node) + .unwrap() + .0 + .extend(messages.iter()); + } else { + let mut v = Messages::default(); + v.0.extend(messages.iter()); + self.received_gossip_messages.insert(node, v); + } - for m in messages { - if !self.messages.0.contains(&m) { - self.messages.0.insert(m); - } - } - } + for m in messages { + if !self.messages.0.contains(&m) { + self.messages.0.insert(m); + } + } + } - pub(crate) fn get_messages(&mut self) -> Vec { - self.messages.0.iter().cloned().collect() - } + pub(crate) fn get_messages(&mut self) -> Vec { + self.messages.0.iter().cloned().collect() + } - pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec { - let received_messages = self.messages.0.clone().into_iter().collect::>(); + pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec { + let received_messages = self.messages.0.clone().into_iter().collect::>(); - let sent_to_node: Vec = self - .sent_messages - .iter() - .filter(|(key, _)| *key == &node) - .flat_map(|(_, Messages(value))| value) - .cloned() - .collect(); + let sent_to_node: Vec = self + .sent_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); - let received_from_node: Vec = self - .received_gossip_messages - .iter() - .filter(|(key, _)| *key == &node) - .flat_map(|(_, Messages(value))| value) - .cloned() - .collect(); + let received_from_node: Vec = self + .received_gossip_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); - let filtered_messages: Vec = received_messages - .into_iter() - .filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x)) - .collect(); + let filtered_messages: Vec = received_messages + .into_iter() + .filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x)) + .collect(); - filtered_messages - } + filtered_messages + } - pub(crate) fn add_to_sent_messages(&mut self, messages: Vec, node: String) { - if self.sent_messages.contains_key(&node) { - self.sent_messages - .get_mut(&node) - .unwrap() - .0 - .extend(messages); - } else { - self.sent_messages - .insert(node, Messages(messages.iter().cloned().collect())); - } - } + pub(crate) fn add_to_sent_messages(&mut self, messages: Vec, node: String) { + if self.sent_messages.contains_key(&node) { + self.sent_messages + .get_mut(&node) + .unwrap() + .0 + .extend(messages); + } else { + self.sent_messages + .insert(node, Messages(messages.iter().cloned().collect())); + } + } }