Add working solution for 3c
This commit is contained in:
parent
5a2a738abf
commit
2a45e98264
7 changed files with 474 additions and 0 deletions
1
3c-fault-tolerant-broadcast/.gitignore
vendored
Normal file
1
3c-fault-tolerant-broadcast/.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
/target
|
||||||
89
3c-fault-tolerant-broadcast/Cargo.lock
generated
Normal file
89
3c-fault-tolerant-broadcast/Cargo.lock
generated
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
# This file is automatically @generated by Cargo.
|
||||||
|
# It is not intended for manual editing.
|
||||||
|
version = 3
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ch03c-fault-tolerant-broadcast"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itoa"
|
||||||
|
version = "1.0.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro2"
|
||||||
|
version = "1.0.56"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "quote"
|
||||||
|
version = "1.0.26"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ryu"
|
||||||
|
version = "1.0.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde"
|
||||||
|
version = "1.0.160"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
|
||||||
|
dependencies = [
|
||||||
|
"serde_derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_derive"
|
||||||
|
version = "1.0.160"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_json"
|
||||||
|
version = "1.0.96"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
|
||||||
|
dependencies = [
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "syn"
|
||||||
|
version = "2.0.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-ident"
|
||||||
|
version = "1.0.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
|
||||||
10
3c-fault-tolerant-broadcast/Cargo.toml
Normal file
10
3c-fault-tolerant-broadcast/Cargo.toml
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
[package]
|
||||||
|
name = "ch03c-fault-tolerant-broadcast"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = {version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
193
3c-fault-tolerant-broadcast/src/main.rs
Normal file
193
3c-fault-tolerant-broadcast/src/main.rs
Normal file
|
|
@ -0,0 +1,193 @@
|
||||||
|
mod message;
|
||||||
|
mod node;
|
||||||
|
mod storage;
|
||||||
|
|
||||||
|
use crate::message::{Body, Message};
|
||||||
|
use crate::node::Node;
|
||||||
|
|
||||||
|
use std::io::prelude::*;
|
||||||
|
use std::io::{BufReader, Write};
|
||||||
|
use std::sync::{
|
||||||
|
mpsc,
|
||||||
|
mpsc::{Receiver, Sender},
|
||||||
|
Arc, Mutex,
|
||||||
|
};
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let (reader_tx, mut reader_rx) = mpsc::channel();
|
||||||
|
let (writer_tx, mut writer_rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let node = Arc::new(Mutex::new(Node::default()));
|
||||||
|
|
||||||
|
let n1 = node.clone();
|
||||||
|
let n2 = node.clone();
|
||||||
|
|
||||||
|
let reader_tx1: Sender<Message> = reader_tx.clone();
|
||||||
|
let writer_tx1: Sender<Message> = writer_tx.clone();
|
||||||
|
let writer_tx2: Sender<Message> = writer_tx.clone();
|
||||||
|
|
||||||
|
let read = thread::spawn(move || {
|
||||||
|
read_from_stdin(reader_tx1);
|
||||||
|
});
|
||||||
|
|
||||||
|
let write = thread::spawn(move || {
|
||||||
|
write_to_stdout(&mut writer_rx);
|
||||||
|
});
|
||||||
|
|
||||||
|
let gossip = thread::spawn(move || loop {
|
||||||
|
thread::sleep(Duration::from_secs(1));
|
||||||
|
gossip_messages(n1.clone(), writer_tx2.clone());
|
||||||
|
});
|
||||||
|
|
||||||
|
let handle = thread::spawn(move || {
|
||||||
|
handle_messages(n2, &mut reader_rx, writer_tx1);
|
||||||
|
});
|
||||||
|
let _ = handle.join();
|
||||||
|
let _ = write.join();
|
||||||
|
let _ = gossip.join();
|
||||||
|
let _ = read.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_from_stdin(reader_tx: Sender<Message>) {
|
||||||
|
let stdin = std::io::stdin();
|
||||||
|
let mut reader = BufReader::new(stdin.lock());
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let mut buf = String::new();
|
||||||
|
reader.read_line(&mut buf).unwrap();
|
||||||
|
let message = Message::parse_message(buf.clone());
|
||||||
|
reader_tx.send(message).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
|
||||||
|
let mut stdout = std::io::stdout();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let message = writer_rx.recv().unwrap();
|
||||||
|
let message = Message::format_message(message);
|
||||||
|
writeln!(stdout, "{}", message).unwrap();
|
||||||
|
stdout.flush().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
|
||||||
|
let node = node.lock().unwrap();
|
||||||
|
if let Some(neighbours) = node.storage.get_neighbours(&node.get_id()) {
|
||||||
|
for n in neighbours {
|
||||||
|
let messages = node.storage.get_messages_for_node(n.clone());
|
||||||
|
let message = Message {
|
||||||
|
src: node.id.clone(),
|
||||||
|
dest: n.clone(),
|
||||||
|
body: Body::Gossip {
|
||||||
|
messages: messages.clone(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(message).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer: Sender<Message>) {
|
||||||
|
while let Ok(input) = input.recv() {
|
||||||
|
match input.body {
|
||||||
|
Body::Init { msg_id, .. } => {
|
||||||
|
node.lock().unwrap().init(input.clone());
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::InitOk {
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(response).unwrap();
|
||||||
|
}
|
||||||
|
Body::Broadcast { msg_id, message } => {
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
node.lock()
|
||||||
|
.unwrap()
|
||||||
|
.storage
|
||||||
|
.add_message(message, id.clone());
|
||||||
|
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::BroadcastOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(response).unwrap();
|
||||||
|
}
|
||||||
|
Body::Gossip { messages } => {
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
for m in messages.iter() {
|
||||||
|
node.lock().unwrap().storage.add_message(*m, id.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::GossipOk { messages },
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(response).unwrap();
|
||||||
|
}
|
||||||
|
Body::GossipOk { messages } => {
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
node.lock()
|
||||||
|
.unwrap()
|
||||||
|
.storage
|
||||||
|
.add_to_sent_messages(messages, id.clone());
|
||||||
|
}
|
||||||
|
Body::Read { msg_id } => {
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::ReadOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
messages: node.lock().unwrap().storage.get_messages(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(response).unwrap();
|
||||||
|
}
|
||||||
|
Body::Topology { msg_id, topology } => {
|
||||||
|
let id = node.lock().unwrap().get_id();
|
||||||
|
node.lock().unwrap().storage.init_topology(topology);
|
||||||
|
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::TopologyOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
writer.send(response).unwrap();
|
||||||
|
}
|
||||||
|
Body::Error {
|
||||||
|
in_reply_to,
|
||||||
|
code,
|
||||||
|
text,
|
||||||
|
} => {
|
||||||
|
eprintln!(
|
||||||
|
"Error received (in_reply_to: {}, code: {}, text: {})",
|
||||||
|
in_reply_to, code, text
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
println!("Error, nothing to read from receiver");
|
||||||
|
}
|
||||||
68
3c-fault-tolerant-broadcast/src/message.rs
Normal file
68
3c-fault-tolerant-broadcast/src/message.rs
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||||
|
pub struct Message {
|
||||||
|
pub src: String,
|
||||||
|
pub dest: String,
|
||||||
|
pub body: Body,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||||
|
#[serde(tag = "type")]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum Body {
|
||||||
|
Error {
|
||||||
|
in_reply_to: u64,
|
||||||
|
code: u64,
|
||||||
|
text: String,
|
||||||
|
},
|
||||||
|
Init {
|
||||||
|
msg_id: u64,
|
||||||
|
node_id: String,
|
||||||
|
node_ids: Vec<String>,
|
||||||
|
},
|
||||||
|
InitOk {
|
||||||
|
in_reply_to: u64,
|
||||||
|
},
|
||||||
|
Broadcast {
|
||||||
|
msg_id: u64,
|
||||||
|
message: u64,
|
||||||
|
},
|
||||||
|
BroadcastOk {
|
||||||
|
msg_id: u64,
|
||||||
|
in_reply_to: u64,
|
||||||
|
},
|
||||||
|
Read {
|
||||||
|
msg_id: u64,
|
||||||
|
},
|
||||||
|
ReadOk {
|
||||||
|
msg_id: u64,
|
||||||
|
in_reply_to: u64,
|
||||||
|
messages: Vec<u64>,
|
||||||
|
},
|
||||||
|
Topology {
|
||||||
|
msg_id: u64,
|
||||||
|
topology: HashMap<String, Vec<String>>,
|
||||||
|
},
|
||||||
|
TopologyOk {
|
||||||
|
msg_id: u64,
|
||||||
|
in_reply_to: u64,
|
||||||
|
},
|
||||||
|
Gossip {
|
||||||
|
messages: Vec<u64>,
|
||||||
|
},
|
||||||
|
GossipOk {
|
||||||
|
messages: Vec<u64>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
|
pub(crate) fn parse_message(message: String) -> Message {
|
||||||
|
serde_json::from_str(&message).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn format_message(message: Message) -> String {
|
||||||
|
serde_json::to_string(&message).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
29
3c-fault-tolerant-broadcast/src/node.rs
Normal file
29
3c-fault-tolerant-broadcast/src/node.rs
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::message::{Body, Message};
|
||||||
|
use crate::storage::Storage;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
|
pub(crate) struct Node {
|
||||||
|
pub(crate) id: String,
|
||||||
|
pub(crate) availble_nodes: Vec<String>,
|
||||||
|
pub(crate) storage: Storage,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Node {
|
||||||
|
pub(crate) fn init(&mut self, message: Message) {
|
||||||
|
match message.body {
|
||||||
|
Body::Init {
|
||||||
|
node_id, node_ids, ..
|
||||||
|
} => {
|
||||||
|
self.id = node_id;
|
||||||
|
self.availble_nodes = node_ids;
|
||||||
|
}
|
||||||
|
_ => panic!("Invalid message type"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_id(&self) -> String {
|
||||||
|
self.id.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
84
3c-fault-tolerant-broadcast/src/storage.rs
Normal file
84
3c-fault-tolerant-broadcast/src/storage.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
|
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
|
pub(crate) struct Messages(pub(crate) HashSet<u64>);
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
|
pub(crate) struct Storage {
|
||||||
|
pub(crate) messages: Messages,
|
||||||
|
pub(crate) received_messages: HashMap<String, Messages>,
|
||||||
|
pub(crate) sent_messages: HashMap<String, Messages>,
|
||||||
|
pub(crate) topology: Topology,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage {
|
||||||
|
pub(crate) fn add_message(&mut self, message: u64, node: String) {
|
||||||
|
self.messages.0.insert(message);
|
||||||
|
|
||||||
|
if self.received_messages.contains_key(&node) {
|
||||||
|
self.received_messages
|
||||||
|
.get_mut(&node)
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.insert(message);
|
||||||
|
} else {
|
||||||
|
let mut v = Messages::default();
|
||||||
|
v.0.insert(message);
|
||||||
|
self.received_messages.insert(node, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
|
||||||
|
self.messages.0.clone().into_iter().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_messages_for_node(&self, node: String) -> Vec<u64> {
|
||||||
|
let received: Vec<u64> = self
|
||||||
|
.received_messages
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| *key == &node)
|
||||||
|
.flat_map(|(_, Messages(value))| value)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let sent: Vec<u64> = self
|
||||||
|
.sent_messages
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| *key == &node)
|
||||||
|
.flat_map(|(_, Messages(value))| value)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.messages
|
||||||
|
.0
|
||||||
|
.iter()
|
||||||
|
.filter(|m| !received.contains(m) && !sent.contains(m))
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) {
|
||||||
|
if self.sent_messages.contains_key(&node) {
|
||||||
|
self.sent_messages
|
||||||
|
.get_mut(&node)
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.extend(messages);
|
||||||
|
} else {
|
||||||
|
self.sent_messages
|
||||||
|
.insert(node, Messages(messages.iter().cloned().collect()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
|
||||||
|
self.topology.0 = topology;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_neighbours(&self, node_id: &str) -> Option<Vec<String>> {
|
||||||
|
self.topology.0.get(node_id).cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue