This commit is contained in:
Bastian Gruber 2023-05-16 19:02:59 +02:00
parent d2fc1fa266
commit d06bac12b2
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
5 changed files with 348 additions and 340 deletions

View file

@ -0,0 +1,6 @@
hard_tabs = true
imports_granularity = "Crate"
reorder_impl_items = true
reorder_imports = true
group_imports = "StdExternalCrate"
reorder_modules = true

View file

@ -2,22 +2,22 @@ mod message;
mod node; mod node;
mod storage; mod storage;
use crate::message::{Body, Message}; use std::{io::Write, println, sync::Arc, thread, time::Duration};
use crate::node::Node;
use crate::storage::Storage;
use rand::prelude::*; use rand::{prelude::*, rngs::StdRng};
use rand::rngs::StdRng; use tokio::{
use std::io::Write; io::{AsyncBufReadExt, BufReader},
use std::sync::Arc; sync::{
use std::time::Duration; mpsc,
use std::{println, thread}; mpsc::{Receiver, Sender},
use tokio::io::AsyncBufReadExt; Mutex,
use tokio::io::BufReader; },
use tokio::sync::{ };
mpsc,
mpsc::{Receiver, Sender}, use crate::{
Mutex, message::{Body, Message},
node::Node,
storage::Storage,
}; };
const GOSSIP_DELAY: u64 = 500; const GOSSIP_DELAY: u64 = 500;
@ -26,194 +26,194 @@ const NETWORK_SIZE: usize = 25;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel(1000); let (reader_tx, mut reader_rx) = mpsc::channel(1000);
let (writer_tx, mut writer_rx) = mpsc::channel(1000); let (writer_tx, mut writer_rx) = mpsc::channel(1000);
let reader_tx1: Sender<Message> = reader_tx.clone(); let reader_tx1: Sender<Message> = reader_tx.clone();
let writer_tx1: Sender<Message> = writer_tx.clone(); let writer_tx1: Sender<Message> = writer_tx.clone();
let writer_tx2: Sender<Message> = writer_tx.clone(); let writer_tx2: Sender<Message> = writer_tx.clone();
let store = Arc::new(Mutex::new(Storage::default())); let store = Arc::new(Mutex::new(Storage::default()));
let node = Node::bootstrap().await; let node = Node::bootstrap().await;
let n1 = node.clone(); let n1 = node.clone();
let s1 = store.clone(); let s1 = store.clone();
let read = tokio::spawn(async move { let read = tokio::spawn(async move {
read_from_stdin(reader_tx1).await; read_from_stdin(reader_tx1).await;
}); });
let write = tokio::spawn(async move { let write = tokio::spawn(async move {
write_to_stdout(&mut writer_rx).await; write_to_stdout(&mut writer_rx).await;
}); });
let gossip = tokio::spawn(async move { let gossip = tokio::spawn(async move {
loop { loop {
thread::sleep(Duration::from_millis(GOSSIP_DELAY)); thread::sleep(Duration::from_millis(GOSSIP_DELAY));
gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await;
} }
}); });
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await; handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await;
}); });
let _ = tokio::try_join!(read, handle, write, gossip); let _ = tokio::try_join!(read, handle, write, gossip);
} }
async fn read_from_stdin(reader_tx: Sender<Message>) { async fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin); let mut reader = BufReader::new(stdin);
loop { loop {
let mut buf = String::new(); let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap(); reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone()); let message = Message::parse_message(buf.clone());
reader_tx.send(message).await.unwrap(); reader_tx.send(message).await.unwrap();
} }
} }
async fn write_to_stdout(writer_rx: &mut Receiver<Message>) { async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
let mut stdout = std::io::stdout(); let mut stdout = std::io::stdout();
loop { loop {
let message = writer_rx.recv().await.unwrap(); let message = writer_rx.recv().await.unwrap();
let message = Message::format_message(message); let message = Message::format_message(message);
writeln!(stdout, "{}", message).unwrap(); writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap(); stdout.flush().unwrap();
} }
} }
async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) { async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) {
let mut rng = StdRng::from_entropy(); let mut rng = StdRng::from_entropy();
let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE); let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE);
let selected_neighbours: Vec<String> = node let selected_neighbours: Vec<String> = node
.get_network() .get_network()
.choose_multiple(&mut rng, num_to_select) .choose_multiple(&mut rng, num_to_select)
.cloned() .cloned()
.collect(); .collect();
let mut tasks = vec![]; let mut tasks = vec![];
for n in selected_neighbours { for n in selected_neighbours {
let storage_clone = storage.clone(); let storage_clone = storage.clone();
let writer_clone = writer.clone(); let writer_clone = writer.clone();
let node_clone = node.clone(); let node_clone = node.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let messages = storage_clone let messages = storage_clone
.lock() .lock()
.await .await
.get_new_messages_for_neighbour(n.clone()); .get_new_messages_for_neighbour(n.clone());
if messages.is_empty() { if messages.is_empty() {
return; return;
} }
let message = Message { let message = Message {
src: node_clone.id.clone(), src: node_clone.id.clone(),
dest: n.clone(), dest: n.clone(),
body: Body::Gossip { body: Body::Gossip {
messages: messages.clone(), messages: messages.clone(),
}, },
}; };
writer_clone.send(message).await.unwrap(); writer_clone.send(message).await.unwrap();
}); });
tasks.push(task); tasks.push(task);
} }
// Wait for all the gossip tasks to complete // Wait for all the gossip tasks to complete
for task in tasks { for task in tasks {
task.await.unwrap(); task.await.unwrap();
} }
} }
async fn handle_messages( async fn handle_messages(
node: Node, node: Node,
storage: Arc<Mutex<Storage>>, storage: Arc<Mutex<Storage>>,
input: &mut Receiver<Message>, input: &mut Receiver<Message>,
writer: Sender<Message>, writer: Sender<Message>,
) { ) {
while let Some(input) = input.recv().await { while let Some(input) = input.recv().await {
match input.body { match input.body {
Body::Broadcast { msg_id, message } => { Body::Broadcast { msg_id, message } => {
let id = node.id.clone(); let id = node.id.clone();
storage.lock().await.add_message(message); storage.lock().await.add_message(message);
let response = Message { let response = Message {
src: id, src: id,
dest: input.src, dest: input.src,
body: Body::BroadcastOk { body: Body::BroadcastOk {
msg_id, msg_id,
in_reply_to: msg_id, in_reply_to: msg_id,
}, },
}; };
writer.send(response).await.unwrap(); writer.send(response).await.unwrap();
} }
Body::Gossip { messages } => { Body::Gossip { messages } => {
let id = node.id.clone(); let id = node.id.clone();
storage storage
.lock() .lock()
.await .await
.add_messages(messages.clone(), input.src.clone()); .add_messages(messages.clone(), input.src.clone());
let response = Message { let response = Message {
src: id, src: id,
dest: input.src, dest: input.src,
body: Body::GossipOk { messages }, body: Body::GossipOk { messages },
}; };
writer.send(response).await.unwrap(); writer.send(response).await.unwrap();
} }
Body::GossipOk { messages } => { Body::GossipOk { messages } => {
storage storage
.lock() .lock()
.await .await
.add_to_sent_messages(messages, input.src); .add_to_sent_messages(messages, input.src);
} }
Body::Read { msg_id } => { Body::Read { msg_id } => {
let response = Message { let response = Message {
src: node.id.clone(), src: node.id.clone(),
dest: input.src, dest: input.src,
body: Body::ReadOk { body: Body::ReadOk {
msg_id, msg_id,
in_reply_to: msg_id, in_reply_to: msg_id,
messages: storage.lock().await.get_messages(), messages: storage.lock().await.get_messages(),
}, },
}; };
writer.send(response).await.unwrap(); writer.send(response).await.unwrap();
} }
Body::Topology { msg_id, .. } => { Body::Topology { msg_id, .. } => {
let response = Message { let response = Message {
src: node.id.clone(), src: node.id.clone(),
dest: input.src, dest: input.src,
body: Body::TopologyOk { body: Body::TopologyOk {
msg_id, msg_id,
in_reply_to: msg_id, in_reply_to: msg_id,
}, },
}; };
writer.send(response).await.unwrap(); writer.send(response).await.unwrap();
} }
Body::Error { Body::Error {
in_reply_to, in_reply_to,
code, code,
text, text,
} => { } => {
eprintln!( eprintln!(
"Error received (in_reply_to: {}, code: {}, text: {})", "Error received (in_reply_to: {}, code: {}, text: {})",
in_reply_to, code, text in_reply_to, code, text
); );
} }
_ => (), _ => (),
} }
} }
println!("Error, nothing to read from receiver"); println!("Error, nothing to read from receiver");
} }

View file

@ -1,68 +1,69 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Message { pub struct Message {
pub src: String, pub src: String,
pub dest: String, pub dest: String,
pub body: Body, pub body: Body,
} }
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(tag = "type")] #[serde(tag = "type")]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum Body { pub enum Body {
Error { Error {
in_reply_to: u64, in_reply_to: u64,
code: u64, code: u64,
text: String, text: String,
}, },
Init { Init {
msg_id: u64, msg_id: u64,
node_id: String, node_id: String,
node_ids: Vec<String>, node_ids: Vec<String>,
}, },
InitOk { InitOk {
in_reply_to: u64, in_reply_to: u64,
}, },
Broadcast { Broadcast {
msg_id: u64, msg_id: u64,
message: u64, message: u64,
}, },
BroadcastOk { BroadcastOk {
msg_id: u64, msg_id: u64,
in_reply_to: u64, in_reply_to: u64,
}, },
Read { Read {
msg_id: u64, msg_id: u64,
}, },
ReadOk { ReadOk {
msg_id: u64, msg_id: u64,
in_reply_to: u64, in_reply_to: u64,
messages: Vec<u64>, messages: Vec<u64>,
}, },
Topology { Topology {
msg_id: u64, msg_id: u64,
topology: HashMap<String, Vec<String>>, topology: HashMap<String, Vec<String>>,
}, },
TopologyOk { TopologyOk {
msg_id: u64, msg_id: u64,
in_reply_to: u64, in_reply_to: u64,
}, },
Gossip { Gossip {
messages: Vec<u64>, messages: Vec<u64>,
}, },
GossipOk { GossipOk {
messages: Vec<u64>, messages: Vec<u64>,
}, },
} }
impl Message { impl Message {
pub(crate) fn parse_message(message: String) -> Message { pub(crate) fn parse_message(message: String) -> Message {
serde_json::from_str(&message).unwrap() serde_json::from_str(&message).unwrap()
} }
pub(crate) fn format_message(message: Message) -> String { pub(crate) fn format_message(message: Message) -> String {
serde_json::to_string(&message).unwrap() serde_json::to_string(&message).unwrap()
} }
} }

View file

@ -1,76 +1,76 @@
use crate::message::{Body, Message}; use std::{collections::HashSet, io::Write};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashSet; use tokio::io::{AsyncBufReadExt, BufReader};
use std::io::Write;
use tokio::io::AsyncBufReadExt; use crate::message::{Body, Message};
use tokio::io::BufReader;
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Network(pub(crate) HashSet<String>); pub(crate) struct Network(pub(crate) HashSet<String>);
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Node { pub(crate) struct Node {
pub(crate) id: String, pub(crate) id: String,
pub(crate) availble_nodes: Vec<String>, pub(crate) availble_nodes: Vec<String>,
pub(crate) network: Network, pub(crate) network: Network,
} }
impl Node { impl Node {
pub(crate) async fn bootstrap() -> Node { pub(crate) async fn bootstrap() -> Node {
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let mut stdout = std::io::stdout(); let mut stdout = std::io::stdout();
let mut reader = BufReader::new(stdin); let mut reader = BufReader::new(stdin);
let mut buf = String::new(); let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap(); reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone()); let message = Message::parse_message(buf.clone());
let node = Node::init(message.clone()); let node = Node::init(message.clone());
match message.body { match message.body {
Body::Init { Body::Init {
msg_id, node_id, .. msg_id, node_id, ..
} => { } => {
let response = Message { let response = Message {
src: node_id, src: node_id,
dest: message.src.clone(), dest: message.src.clone(),
body: Body::InitOk { body: Body::InitOk {
in_reply_to: msg_id, in_reply_to: msg_id,
}, },
}; };
let message = Message::format_message(response); let message = Message::format_message(response);
writeln!(stdout, "{}", message).unwrap(); writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap(); stdout.flush().unwrap();
} }
_ => (), _ => (),
} }
node node
} }
pub(crate) fn init(message: Message) -> Node { pub(crate) fn init(message: Message) -> Node {
match message.body { match message.body {
Body::Init { Body::Init {
node_id, node_ids, .. node_id, node_ids, ..
} => { } => {
return Node { return Node {
id: node_id.clone(), id: node_id.clone(),
availble_nodes: node_ids.clone(), availble_nodes: node_ids.clone(),
network: Node::init_network(node_ids), network: Node::init_network(node_ids),
} }
} }
_ => panic!("Invalid message type"), _ => panic!("Invalid message type"),
} }
} }
fn init_network(nodes: Vec<String>) -> Network { fn init_network(nodes: Vec<String>) -> Network {
let mut neighbours = Network::default(); let mut neighbours = Network::default();
neighbours.0.extend(nodes); neighbours.0.extend(nodes);
neighbours neighbours
} }
pub(crate) fn get_network(&self) -> Vec<String> { pub(crate) fn get_network(&self) -> Vec<String> {
self.network.0.clone().into_iter().collect::<Vec<_>>() self.network.0.clone().into_iter().collect::<Vec<_>>()
} }
} }

View file

@ -1,85 +1,86 @@
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Messages(pub(crate) HashSet<u64>); pub(crate) struct Messages(pub(crate) HashSet<u64>);
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Storage { pub(crate) struct Storage {
pub(crate) messages: Messages, pub(crate) messages: Messages,
pub(crate) received_gossip_messages: HashMap<String, Messages>, pub(crate) received_gossip_messages: HashMap<String, Messages>,
pub(crate) sent_messages: HashMap<String, Messages>, pub(crate) sent_messages: HashMap<String, Messages>,
pub(crate) retry: HashMap<String, u8>, pub(crate) retry: HashMap<String, u8>,
} }
impl Storage { impl Storage {
pub(crate) fn add_message(&mut self, message: u64) { pub(crate) fn add_message(&mut self, message: u64) {
if !self.messages.0.contains(&message) { if !self.messages.0.contains(&message) {
self.messages.0.insert(message); self.messages.0.insert(message);
} }
} }
pub(crate) fn add_messages(&mut self, messages: Vec<u64>, node: String) { pub(crate) fn add_messages(&mut self, messages: Vec<u64>, node: String) {
if self.received_gossip_messages.contains_key(&node) { if self.received_gossip_messages.contains_key(&node) {
self.received_gossip_messages self.received_gossip_messages
.get_mut(&node) .get_mut(&node)
.unwrap() .unwrap()
.0 .0
.extend(messages.iter()); .extend(messages.iter());
} else { } else {
let mut v = Messages::default(); let mut v = Messages::default();
v.0.extend(messages.iter()); v.0.extend(messages.iter());
self.received_gossip_messages.insert(node, v); self.received_gossip_messages.insert(node, v);
} }
for m in messages { for m in messages {
if !self.messages.0.contains(&m) { if !self.messages.0.contains(&m) {
self.messages.0.insert(m); self.messages.0.insert(m);
} }
} }
} }
pub(crate) fn get_messages(&mut self) -> Vec<u64> { pub(crate) fn get_messages(&mut self) -> Vec<u64> {
self.messages.0.iter().cloned().collect() self.messages.0.iter().cloned().collect()
} }
pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec<u64> { pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec<u64> {
let received_messages = self.messages.0.clone().into_iter().collect::<Vec<_>>(); let received_messages = self.messages.0.clone().into_iter().collect::<Vec<_>>();
let sent_to_node: Vec<u64> = self let sent_to_node: Vec<u64> = self
.sent_messages .sent_messages
.iter() .iter()
.filter(|(key, _)| *key == &node) .filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value) .flat_map(|(_, Messages(value))| value)
.cloned() .cloned()
.collect(); .collect();
let received_from_node: Vec<u64> = self let received_from_node: Vec<u64> = self
.received_gossip_messages .received_gossip_messages
.iter() .iter()
.filter(|(key, _)| *key == &node) .filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value) .flat_map(|(_, Messages(value))| value)
.cloned() .cloned()
.collect(); .collect();
let filtered_messages: Vec<u64> = received_messages let filtered_messages: Vec<u64> = received_messages
.into_iter() .into_iter()
.filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x)) .filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x))
.collect(); .collect();
filtered_messages filtered_messages
} }
pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) { pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) {
if self.sent_messages.contains_key(&node) { if self.sent_messages.contains_key(&node) {
self.sent_messages self.sent_messages
.get_mut(&node) .get_mut(&node)
.unwrap() .unwrap()
.0 .0
.extend(messages); .extend(messages);
} else { } else {
self.sent_messages self.sent_messages
.insert(node, Messages(messages.iter().cloned().collect())); .insert(node, Messages(messages.iter().cloned().collect()));
} }
} }
} }