msg-ops below 60, max latency below 600

This commit is contained in:
Bastian Gruber 2023-05-16 18:04:32 +02:00
parent 3d1dd23b9b
commit d7b6314280
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
3 changed files with 97 additions and 83 deletions

View file

@ -6,6 +6,8 @@ use crate::message::{Body, Message};
use crate::node::Node; use crate::node::Node;
use crate::storage::Storage; use crate::storage::Storage;
use rand::prelude::*;
use rand::rngs::StdRng;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -18,6 +20,10 @@ use tokio::sync::{
Mutex, Mutex,
}; };
const GOSSIP_DELAY: u64 = 150;
const MIN_AMOUNT_NODES: usize = 4;
const NETWORK_SIZE: usize = 25;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel(1000); let (reader_tx, mut reader_rx) = mpsc::channel(1000);
@ -45,7 +51,7 @@ async fn main() {
let gossip = tokio::spawn(async move { let gossip = tokio::spawn(async move {
loop { loop {
thread::sleep(Duration::from_millis(25)); thread::sleep(Duration::from_millis(GOSSIP_DELAY));
gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await; gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await;
} }
}); });
@ -114,24 +120,50 @@ async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
} }
async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) { async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) {
for n in node.get_neighbours() { let mut rng = StdRng::from_entropy();
let messages = storage.lock().await.get_messages_for_node(n.clone());
if messages.len() < 1 || storage.lock().await.get_retries(n.clone()) < 2 { let num_to_select = rng.gen_range(MIN_AMOUNT_NODES..=NETWORK_SIZE);
storage.lock().await.increase_or_insert(n);
continue; let selected_neighbours: Vec<String> = node
.get_network()
.choose_multiple(&mut rng, num_to_select)
.cloned()
.collect();
let mut tasks = vec![];
for n in selected_neighbours {
let storage_clone = storage.clone();
let writer_clone = writer.clone();
let node_clone = node.clone();
let task = tokio::spawn(async move {
let messages = storage_clone
.lock()
.await
.get_new_messages_for_neighbour(n.clone());
if messages.is_empty() {
return;
} }
let message = Message { let message = Message {
src: node.id.clone(), src: node_clone.id.clone(),
dest: n.clone(), dest: n.clone(),
body: Body::Gossip { body: Body::Gossip {
messages: messages.clone(), messages: messages.clone(),
}, },
}; };
storage.lock().await.decrease_or_remove(n); writer_clone.send(message).await.unwrap();
writer.send(message).await.unwrap(); });
tasks.push(task);
}
// Wait for all the gossip tasks to complete
for task in tasks {
task.await.unwrap();
} }
} }
@ -145,7 +177,7 @@ async fn handle_messages(
match input.body { match input.body {
Body::Broadcast { msg_id, message } => { Body::Broadcast { msg_id, message } => {
let id = node.id.clone(); let id = node.id.clone();
storage.lock().await.add_message(message, id.clone()); storage.lock().await.add_message(message);
let response = Message { let response = Message {
src: id, src: id,
@ -160,9 +192,10 @@ async fn handle_messages(
} }
Body::Gossip { messages } => { Body::Gossip { messages } => {
let id = node.id.clone(); let id = node.id.clone();
for m in messages.iter() { storage
storage.lock().await.add_message(*m, id.clone()); .lock()
} .await
.add_messages(messages.clone(), input.src.clone());
let response = Message { let response = Message {
src: id, src: id,
@ -176,7 +209,7 @@ async fn handle_messages(
storage storage
.lock() .lock()
.await .await
.add_to_sent_messages(messages, node.id.clone()); .add_to_sent_messages(messages, input.src);
} }
Body::Read { msg_id } => { Body::Read { msg_id } => {
let response = Message { let response = Message {

View file

@ -1,18 +1,15 @@
use rand::seq::SliceRandom;
use rand::thread_rng;
use serde::{Deserialize, Serialize};
use crate::message::{Body, Message}; use crate::message::{Body, Message};
use serde::{Deserialize, Serialize};
use std::collections::HashSet; use std::collections::HashSet;
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Neighbours(pub(crate) HashSet<String>); pub(crate) struct Network(pub(crate) HashSet<String>);
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Node { pub(crate) struct Node {
pub(crate) id: String, pub(crate) id: String,
pub(crate) availble_nodes: Vec<String>, pub(crate) availble_nodes: Vec<String>,
pub(crate) neighbours: Neighbours, pub(crate) network: Network,
} }
impl Node { impl Node {
@ -24,24 +21,20 @@ impl Node {
return Node { return Node {
id: node_id.clone(), id: node_id.clone(),
availble_nodes: node_ids.clone(), availble_nodes: node_ids.clone(),
neighbours: self.init_topology(node_ids), network: self.init_network(node_ids),
} }
} }
_ => panic!("Invalid message type"), _ => panic!("Invalid message type"),
} }
} }
fn init_topology(&self, nodes: Vec<String>) -> Neighbours { fn init_network(&self, nodes: Vec<String>) -> Network {
let mut neighbours = Neighbours::default(); let mut neighbours = Network::default();
neighbours.0.extend(nodes);
let mut rng = thread_rng();
let selections: Vec<String> = nodes.choose_multiple(&mut rng, 9).cloned().collect();
neighbours.0.extend(selections);
neighbours neighbours
} }
pub(crate) fn get_neighbours(&self) -> HashSet<String> { pub(crate) fn get_network(&self) -> Vec<String> {
self.neighbours.0.clone() self.network.0.clone().into_iter().collect::<Vec<_>>()
} }
} }

View file

@ -7,66 +7,46 @@ pub(crate) struct Messages(pub(crate) HashSet<u64>);
#[derive(Serialize, Deserialize, Clone, Debug, Default)] #[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Storage { pub(crate) struct Storage {
pub(crate) messages: Messages, pub(crate) messages: Messages,
pub(crate) received_messages: HashMap<String, Messages>, pub(crate) received_gossip_messages: HashMap<String, Messages>,
pub(crate) sent_messages: HashMap<String, Messages>, pub(crate) sent_messages: HashMap<String, Messages>,
pub(crate) retry: HashMap<String, u8>, pub(crate) retry: HashMap<String, u8>,
} }
impl Storage { impl Storage {
pub(crate) fn add_message(&mut self, message: u64, node: String) { pub(crate) fn add_message(&mut self, message: u64) {
if !self.messages.0.contains(&message) {
self.messages.0.insert(message); self.messages.0.insert(message);
}
}
if self.received_messages.contains_key(&node) { pub(crate) fn add_messages(&mut self, messages: Vec<u64>, node: String) {
self.received_messages if self.received_gossip_messages.contains_key(&node) {
self.received_gossip_messages
.get_mut(&node) .get_mut(&node)
.unwrap() .unwrap()
.0 .0
.insert(message); .extend(messages.iter());
} else { } else {
let mut v = Messages::default(); let mut v = Messages::default();
v.0.insert(message); v.0.extend(messages.iter());
self.received_messages.insert(node, v); self.received_gossip_messages.insert(node, v);
}
for m in messages {
if !self.messages.0.contains(&m) {
self.messages.0.insert(m);
}
} }
} }
pub(crate) fn get_messages(&mut self) -> Vec<u64> { pub(crate) fn get_messages(&mut self) -> Vec<u64> {
self.messages.0.clone().into_iter().collect() self.messages.0.iter().cloned().collect()
} }
pub(crate) fn get_retries(&self, node: String) -> u8 { pub(crate) fn get_new_messages_for_neighbour(&self, node: String) -> Vec<u64> {
match self.retry.get(&node) { let received_messages = self.messages.0.clone().into_iter().collect::<Vec<_>>();
Some(count) => *count,
None => 0,
}
}
pub(crate) fn increase_or_insert(&mut self, node: String) { let sent_to_node: Vec<u64> = self
let count = self.retry.entry(node).or_insert(0);
*count += 1;
}
pub(crate) fn decrease_or_remove(&mut self, node: String) {
match self.retry.get_mut(&node) {
Some(count) => {
*count -= 1;
if *count == 0 {
self.retry.remove(&node);
}
}
None => (),
}
}
pub(crate) fn get_messages_for_node(&self, node: String) -> Vec<u64> {
let received: Vec<u64> = self
.received_messages
.iter()
.filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value)
.cloned()
.collect();
let sent: Vec<u64> = self
.sent_messages .sent_messages
.iter() .iter()
.filter(|(key, _)| *key == &node) .filter(|(key, _)| *key == &node)
@ -74,12 +54,20 @@ impl Storage {
.cloned() .cloned()
.collect(); .collect();
self.messages let received_from_node: Vec<u64> = self
.0 .received_gossip_messages
.iter() .iter()
.filter(|m| !received.contains(m) && !sent.contains(m)) .filter(|(key, _)| *key == &node)
.flat_map(|(_, Messages(value))| value)
.cloned() .cloned()
.collect() .collect();
let filtered_messages: Vec<u64> = received_messages
.into_iter()
.filter(|x| !sent_to_node.contains(x) && !received_from_node.contains(x))
.collect();
filtered_messages
} }
pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) { pub(crate) fn add_to_sent_messages(&mut self, messages: Vec<u64>, node: String) {