From 2a45e98264547ebbb4de7fab6eefc4eafeddc32f Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Mon, 15 May 2023 16:43:08 +0200 Subject: [PATCH] Add working solution for 3c --- 3c-fault-tolerant-broadcast/.gitignore | 1 + 3c-fault-tolerant-broadcast/Cargo.lock | 89 ++++++++++ 3c-fault-tolerant-broadcast/Cargo.toml | 10 ++ 3c-fault-tolerant-broadcast/src/main.rs | 193 +++++++++++++++++++++ 3c-fault-tolerant-broadcast/src/message.rs | 68 ++++++++ 3c-fault-tolerant-broadcast/src/node.rs | 29 ++++ 3c-fault-tolerant-broadcast/src/storage.rs | 84 +++++++++ 7 files changed, 474 insertions(+) create mode 100644 3c-fault-tolerant-broadcast/.gitignore create mode 100644 3c-fault-tolerant-broadcast/Cargo.lock create mode 100644 3c-fault-tolerant-broadcast/Cargo.toml create mode 100644 3c-fault-tolerant-broadcast/src/main.rs create mode 100644 3c-fault-tolerant-broadcast/src/message.rs create mode 100644 3c-fault-tolerant-broadcast/src/node.rs create mode 100644 3c-fault-tolerant-broadcast/src/storage.rs diff --git a/3c-fault-tolerant-broadcast/.gitignore b/3c-fault-tolerant-broadcast/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/3c-fault-tolerant-broadcast/.gitignore @@ -0,0 +1 @@ +/target diff --git a/3c-fault-tolerant-broadcast/Cargo.lock b/3c-fault-tolerant-broadcast/Cargo.lock new file mode 100644 index 0000000..16cffab --- /dev/null +++ b/3c-fault-tolerant-broadcast/Cargo.lock @@ -0,0 +1,89 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "ch03c-fault-tolerant-broadcast" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + +[[package]] +name = "proc-macro2" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" + +[[package]] +name = "serde" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.160" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" diff --git a/3c-fault-tolerant-broadcast/Cargo.toml b/3c-fault-tolerant-broadcast/Cargo.toml new file mode 100644 index 0000000..2e5490a --- /dev/null +++ b/3c-fault-tolerant-broadcast/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ch03c-fault-tolerant-broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = {version = "1", features = ["derive"] } +serde_json = "1" diff --git a/3c-fault-tolerant-broadcast/src/main.rs b/3c-fault-tolerant-broadcast/src/main.rs new file mode 100644 index 0000000..d714283 --- /dev/null +++ b/3c-fault-tolerant-broadcast/src/main.rs @@ -0,0 +1,193 @@ +mod message; +mod node; +mod storage; + +use crate::message::{Body, Message}; +use crate::node::Node; + +use std::io::prelude::*; +use std::io::{BufReader, Write}; +use std::sync::{ + mpsc, + mpsc::{Receiver, Sender}, + Arc, Mutex, +}; +use std::thread; +use std::time::Duration; + +fn main() { + let (reader_tx, mut reader_rx) = mpsc::channel(); + let (writer_tx, mut writer_rx) = mpsc::channel(); + + let node = Arc::new(Mutex::new(Node::default())); + + let n1 = node.clone(); + let n2 = node.clone(); + + let reader_tx1: Sender = reader_tx.clone(); + let writer_tx1: Sender = writer_tx.clone(); + let writer_tx2: Sender = writer_tx.clone(); + + let read = thread::spawn(move || { + read_from_stdin(reader_tx1); + }); + + let write = thread::spawn(move || { + write_to_stdout(&mut writer_rx); + }); + + let gossip = thread::spawn(move || loop { + thread::sleep(Duration::from_secs(1)); + gossip_messages(n1.clone(), writer_tx2.clone()); + }); + + let handle = thread::spawn(move || { + handle_messages(n2, &mut reader_rx, writer_tx1); + }); + let _ = handle.join(); + let _ = write.join(); + let _ = gossip.join(); + let _ = read.join(); +} + +fn read_from_stdin(reader_tx: Sender) { + let stdin = std::io::stdin(); + let mut reader = BufReader::new(stdin.lock()); + + loop { + let mut buf = String::new(); + reader.read_line(&mut buf).unwrap(); + let message = Message::parse_message(buf.clone()); + reader_tx.send(message).unwrap(); + } +} + +fn write_to_stdout(writer_rx: &mut Receiver) { + let mut stdout = std::io::stdout(); + + loop { + let message = writer_rx.recv().unwrap(); + let message = Message::format_message(message); + writeln!(stdout, "{}", message).unwrap(); + stdout.flush().unwrap(); + } +} + +fn gossip_messages(node: Arc>, writer: Sender) { + let node = node.lock().unwrap(); + if let Some(neighbours) = node.storage.get_neighbours(&node.get_id()) { + for n in neighbours { + let messages = node.storage.get_messages_for_node(n.clone()); + let message = Message { + src: node.id.clone(), + dest: n.clone(), + body: Body::Gossip { + messages: messages.clone(), + }, + }; + + writer.send(message).unwrap(); + } + } +} + +fn handle_messages(node: Arc>, input: &mut Receiver, writer: Sender) { + while let Ok(input) = input.recv() { + match input.body { + Body::Init { msg_id, .. } => { + node.lock().unwrap().init(input.clone()); + let id = node.lock().unwrap().get_id(); + let response = Message { + src: id, + dest: input.src, + body: Body::InitOk { + in_reply_to: msg_id, + }, + }; + + writer.send(response).unwrap(); + } + Body::Broadcast { msg_id, message } => { + let id = node.lock().unwrap().get_id(); + node.lock() + .unwrap() + .storage + .add_message(message, id.clone()); + + let response = Message { + src: id, + dest: input.src, + body: Body::BroadcastOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + writer.send(response).unwrap(); + } + Body::Gossip { messages } => { + let id = node.lock().unwrap().get_id(); + for m in messages.iter() { + node.lock().unwrap().storage.add_message(*m, id.clone()); + } + + let response = Message { + src: id, + dest: input.src, + body: Body::GossipOk { messages }, + }; + + writer.send(response).unwrap(); + } + Body::GossipOk { messages } => { + let id = node.lock().unwrap().get_id(); + node.lock() + .unwrap() + .storage + .add_to_sent_messages(messages, id.clone()); + } + Body::Read { msg_id } => { + let id = node.lock().unwrap().get_id(); + + let response = Message { + src: id, + dest: input.src, + body: Body::ReadOk { + msg_id, + in_reply_to: msg_id, + messages: node.lock().unwrap().storage.get_messages(), + }, + }; + + writer.send(response).unwrap(); + } + Body::Topology { msg_id, topology } => { + let id = node.lock().unwrap().get_id(); + node.lock().unwrap().storage.init_topology(topology); + + let response = Message { + src: id, + dest: input.src, + body: Body::TopologyOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + writer.send(response).unwrap(); + } + Body::Error { + in_reply_to, + code, + text, + } => { + eprintln!( + "Error received (in_reply_to: {}, code: {}, text: {})", + in_reply_to, code, text + ); + } + _ => (), + } + } + println!("Error, nothing to read from receiver"); +} diff --git a/3c-fault-tolerant-broadcast/src/message.rs b/3c-fault-tolerant-broadcast/src/message.rs new file mode 100644 index 0000000..452289b --- /dev/null +++ b/3c-fault-tolerant-broadcast/src/message.rs @@ -0,0 +1,68 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Message { + pub src: String, + pub dest: String, + pub body: Body, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum Body { + Error { + in_reply_to: u64, + code: u64, + text: String, + }, + Init { + msg_id: u64, + node_id: String, + node_ids: Vec, + }, + InitOk { + in_reply_to: u64, + }, + Broadcast { + msg_id: u64, + message: u64, + }, + BroadcastOk { + msg_id: u64, + in_reply_to: u64, + }, + Read { + msg_id: u64, + }, + ReadOk { + msg_id: u64, + in_reply_to: u64, + messages: Vec, + }, + Topology { + msg_id: u64, + topology: HashMap>, + }, + TopologyOk { + msg_id: u64, + in_reply_to: u64, + }, + Gossip { + messages: Vec, + }, + GossipOk { + messages: Vec, + }, +} + +impl Message { + pub(crate) fn parse_message(message: String) -> Message { + serde_json::from_str(&message).unwrap() + } + + pub(crate) fn format_message(message: Message) -> String { + serde_json::to_string(&message).unwrap() + } +} diff --git a/3c-fault-tolerant-broadcast/src/node.rs b/3c-fault-tolerant-broadcast/src/node.rs new file mode 100644 index 0000000..f48d4c2 --- /dev/null +++ b/3c-fault-tolerant-broadcast/src/node.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +use crate::message::{Body, Message}; +use crate::storage::Storage; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Node { + pub(crate) id: String, + pub(crate) availble_nodes: Vec, + pub(crate) storage: Storage, +} + +impl Node { + pub(crate) fn init(&mut self, message: Message) { + match message.body { + Body::Init { + node_id, node_ids, .. + } => { + self.id = node_id; + self.availble_nodes = node_ids; + } + _ => panic!("Invalid message type"), + } + } + + pub(crate) fn get_id(&self) -> String { + self.id.clone() + } +} diff --git a/3c-fault-tolerant-broadcast/src/storage.rs b/3c-fault-tolerant-broadcast/src/storage.rs new file mode 100644 index 0000000..4f2f43c --- /dev/null +++ b/3c-fault-tolerant-broadcast/src/storage.rs @@ -0,0 +1,84 @@ +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Topology(pub(crate) HashMap>); + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Messages(pub(crate) HashSet); + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Storage { + pub(crate) messages: Messages, + pub(crate) received_messages: HashMap, + pub(crate) sent_messages: HashMap, + pub(crate) topology: Topology, +} + +impl Storage { + pub(crate) fn add_message(&mut self, message: u64, node: String) { + self.messages.0.insert(message); + + if self.received_messages.contains_key(&node) { + self.received_messages + .get_mut(&node) + .unwrap() + .0 + .insert(message); + } else { + let mut v = Messages::default(); + v.0.insert(message); + self.received_messages.insert(node, v); + } + } + + pub(crate) fn get_messages(&mut self) -> Vec { + self.messages.0.clone().into_iter().collect() + } + + pub(crate) fn get_messages_for_node(&self, node: String) -> Vec { + let received: Vec = self + .received_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); + + let sent: Vec = self + .sent_messages + .iter() + .filter(|(key, _)| *key == &node) + .flat_map(|(_, Messages(value))| value) + .cloned() + .collect(); + + self.messages + .0 + .iter() + .filter(|m| !received.contains(m) && !sent.contains(m)) + .cloned() + .collect() + } + + pub(crate) fn add_to_sent_messages(&mut self, messages: Vec, node: String) { + if self.sent_messages.contains_key(&node) { + self.sent_messages + .get_mut(&node) + .unwrap() + .0 + .extend(messages); + } else { + self.sent_messages + .insert(node, Messages(messages.iter().cloned().collect())); + } + } + + pub(crate) fn init_topology(&mut self, topology: HashMap>) { + self.topology.0 = topology; + } + + pub(crate) fn get_neighbours(&self, node_id: &str) -> Option> { + self.topology.0.get(node_id).cloned() + } +}