Works already with previous submitted version

This commit is contained in:
Bastian Gruber 2023-05-15 16:46:44 +02:00
parent 2a45e98264
commit 69f2b288e2
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
7 changed files with 474 additions and 0 deletions

View file

@ -0,0 +1 @@
/target

View file

@ -0,0 +1,89 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ch03d-efficient-broadcast-part-one"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "itoa"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "proc-macro2"
version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "serde"
version = "1.0.160"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.160"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "syn"
version = "2.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a34fcf3e8b60f57e6a14301a2e916d323af98b0ea63c599441eec8558660c822"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"

View file

@ -0,0 +1,10 @@
[package]
name = "ch03d-efficient-broadcast-part-one"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = {version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -0,0 +1,193 @@
mod message;
mod node;
mod storage;
use crate::message::{Body, Message};
use crate::node::Node;
use std::io::prelude::*;
use std::io::{BufReader, Write};
use std::sync::{
mpsc,
mpsc::{Receiver, Sender},
Arc, Mutex,
};
use std::thread;
use std::time::Duration;
fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel();
let (writer_tx, mut writer_rx) = mpsc::channel();
let node = Arc::new(Mutex::new(Node::default()));
let n1 = node.clone();
let n2 = node.clone();
let reader_tx1: Sender<Message> = reader_tx.clone();
let writer_tx1: Sender<Message> = writer_tx.clone();
let writer_tx2: Sender<Message> = writer_tx.clone();
let read = thread::spawn(move || {
read_from_stdin(reader_tx1);
});
let write = thread::spawn(move || {
write_to_stdout(&mut writer_rx);
});
let gossip = thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
gossip_messages(n1.clone(), writer_tx2.clone());
});
let handle = thread::spawn(move || {
handle_messages(n2, &mut reader_rx, writer_tx1);
});
let _ = handle.join();
let _ = write.join();
let _ = gossip.join();
let _ = read.join();
}
fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = std::io::stdin();
let mut reader = BufReader::new(stdin.lock());
loop {
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
let message = Message::parse_message(buf.clone());
reader_tx.send(message).unwrap();
}
}
fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
let mut stdout = std::io::stdout();
loop {
let message = writer_rx.recv().unwrap();
let message = Message::format_message(message);
writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap();
}
}
fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
let node = node.lock().unwrap();
if let Some(neighbours) = node.storage.get_neighbours(&node.get_id()) {
for n in neighbours {
let messages = node.storage.get_messages_for_node(n.clone());
let message = Message {
src: node.id.clone(),
dest: n.clone(),
body: Body::Gossip {
messages: messages.clone(),
},
};
writer.send(message).unwrap();
}
}
}
fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer: Sender<Message>) {
while let Ok(input) = input.recv() {
match input.body {
Body::Init { msg_id, .. } => {
node.lock().unwrap().init(input.clone());
let id = node.lock().unwrap().get_id();
let response = Message {
src: id,
dest: input.src,
body: Body::InitOk {
in_reply_to: msg_id,
},
};
writer.send(response).unwrap();
}
Body::Broadcast { msg_id, message } => {
let id = node.lock().unwrap().get_id();
node.lock()
.unwrap()
.storage
.add_message(message, id.clone());
let response = Message {
src: id,
dest: input.src,
body: Body::BroadcastOk {
msg_id,
in_reply_to: msg_id,
},
};
writer.send(response).unwrap();
}
Body::Gossip { messages } => {
let id = node.lock().unwrap().get_id();
for m in messages.iter() {
node.lock().unwrap().storage.add_message(*m, id.clone());
}
let response = Message {
src: id,
dest: input.src,
body: Body::GossipOk { messages },
};
writer.send(response).unwrap();
}
Body::GossipOk { messages } => {
let id = node.lock().unwrap().get_id();
node.lock()
.unwrap()
.storage
.add_to_sent_messages(messages, id.clone());
}
Body::Read { msg_id } => {
let id = node.lock().unwrap().get_id();
let response = Message {
src: id,
dest: input.src,
body: Body::ReadOk {
msg_id,
in_reply_to: msg_id,
messages: node.lock().unwrap().storage.get_messages(),
},
};
writer.send(response).unwrap();
}
Body::Topology { msg_id, topology } => {
let id = node.lock().unwrap().get_id();
node.lock().unwrap().storage.init_topology(topology);
let response = Message {
src: id,
dest: input.src,
body: Body::TopologyOk {
msg_id,
in_reply_to: msg_id,
},
};
writer.send(response).unwrap();
}
Body::Error {
in_reply_to,
code,
text,
} => {
eprintln!(
"Error received (in_reply_to: {}, code: {}, text: {})",
in_reply_to, code, text
);
}
_ => (),
}
}
println!("Error, nothing to read from receiver");
}

View file

@ -0,0 +1,68 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Message {
pub src: String,
pub dest: String,
pub body: Body,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum Body {
Error {
in_reply_to: u64,
code: u64,
text: String,
},
Init {
msg_id: u64,
node_id: String,
node_ids: Vec<String>,
},
InitOk {
in_reply_to: u64,
},
Broadcast {
msg_id: u64,
message: u64,
},
BroadcastOk {
msg_id: u64,
in_reply_to: u64,
},
Read {
msg_id: u64,
},
ReadOk {
msg_id: u64,
in_reply_to: u64,
messages: Vec<u64>,
},
Topology {
msg_id: u64,
topology: HashMap<String, Vec<String>>,
},
TopologyOk {
msg_id: u64,
in_reply_to: u64,
},
Gossip {
messages: Vec<u64>,
},
GossipOk {
messages: Vec<u64>,
},
}
impl Message {
pub(crate) fn parse_message(message: String) -> Message {
serde_json::from_str(&message).unwrap()
}
pub(crate) fn format_message(message: Message) -> String {
serde_json::to_string(&message).unwrap()
}
}

View file

@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
use crate::message::{Body, Message};
use crate::storage::Storage;
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Node {
pub(crate) id: String,
pub(crate) availble_nodes: Vec<String>,
pub(crate) storage: Storage,
}
impl Node {
pub(crate) fn init(&mut self, message: Message) {
match message.body {
Body::Init {
node_id, node_ids, ..
} => {
self.id = node_id;
self.availble_nodes = node_ids;
}
_ => panic!("Invalid message type"),
}
}
pub(crate) fn get_id(&self) -> String {
self.id.clone()
}
}

View file

@ -0,0 +1,84 @@
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Messages(pub(crate) HashSet<u64>);
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Storage {
pub(crate) messages: Messages,
pub(crate) received_messages: HashMap<String, Messages>,
pub(crate) sent_messages: HashMap<String, Messages>,
pub(crate) topology: Topology,
}
impl Storage {
pub(crate) fn add_message(&mut self, message: u64, node: String) {
self.messages.0.insert(message);
if self.received_messages.contains_key(&node) {
self.received_messages
.get_mut(&node)
.unwrap()
.0
.insert(message);
} else {
let mut v = Messages::default();
v.0.insert(message);
self.received_messages.insert(node, v);
}
}
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
self.messages.0.clone().into_iter().collect()
}
pub(crate) fn get_messages_for_node(&self, node: String) -> Vec<u64> {
let received: Vec<u64> = self
.received_messages
.iter()
.filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value)
.cloned()
.collect();
let sent: Vec<u64> = self
.sent_messages
.iter()
.filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value)
.cloned()
.collect();
self.messages
.0
.iter()
.filter(|m| !received.contains(m) && !sent.contains(m))
.cloned()
.collect()
}
pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) {
if self.sent_messages.contains_key(&node) {
self.sent_messages
.get_mut(&node)
.unwrap()
.0
.extend(messages);
} else {
self.sent_messages
.insert(node, Messages(messages.iter().cloned().collect()));
}
}
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
self.topology.0 = topology;
}
pub(crate) fn get_neighbours(&self, node_id: &str) -> Option<Vec<String>> {
self.topology.0.get(node_id).cloned()
}
}