Hit target latency with "small-network-topolgy"

This commit is contained in:
Bastian Gruber 2023-05-15 22:27:42 +02:00
parent 69f2b288e2
commit de23b881a7
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
5 changed files with 115 additions and 27 deletions

View file

@ -2,20 +2,50 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "ch03d-efficient-broadcast-part-one"
version = "0.1.0"
dependencies = [
"rand",
"serde",
"serde_json",
]
[[package]]
name = "getrandom"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "itoa"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "libc"
version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.56"
@ -34,6 +64,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "ryu"
version = "1.0.13"
@ -87,3 +147,9 @@ name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"

View file

@ -6,5 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8.5"
serde = {version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -12,8 +12,8 @@ use std::sync::{
mpsc::{Receiver, Sender},
Arc, Mutex,
};
use std::thread;
use std::time::Duration;
use std::{println, thread};
fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel();
@ -37,7 +37,7 @@ fn main() {
});
let gossip = thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
thread::sleep(Duration::from_millis(100));
gossip_messages(n1.clone(), writer_tx2.clone());
});
@ -75,9 +75,13 @@ fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
let node = node.lock().unwrap();
if let Some(neighbours) = node.storage.get_neighbours(&node.get_id()) {
for n in neighbours {
for n in node.storage.get_neighbours() {
let messages = node.storage.get_messages_for_node(n.clone());
if messages.len() == 0 {
continue;
}
let message = Message {
src: node.id.clone(),
dest: n.clone(),
@ -89,16 +93,22 @@ fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
writer.send(message).unwrap();
}
}
}
fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer: Sender<Message>) {
while let Ok(input) = input.recv() {
match input.body {
Body::Init { msg_id, .. } => {
node.lock().unwrap().init(input.clone());
let id = node.lock().unwrap().get_id();
Body::Init {
msg_id,
ref node_id,
ref node_ids,
..
} => {
let mut node = node.lock().unwrap();
node.init(input.clone());
node.storage.init_topology(node_id.clone(), &node_ids);
let response = Message {
src: id,
src: node_id.clone(),
dest: input.src,
body: Body::InitOk {
in_reply_to: msg_id,
@ -161,9 +171,8 @@ fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer
writer.send(response).unwrap();
}
Body::Topology { msg_id, topology } => {
Body::Topology { msg_id, .. } => {
let id = node.lock().unwrap().get_id();
node.lock().unwrap().storage.init_topology(topology);
let response = Message {
src: id,

View file

@ -16,8 +16,8 @@ impl Node {
Body::Init {
node_id, node_ids, ..
} => {
self.id = node_id;
self.availble_nodes = node_ids;
self.id = node_id.clone();
self.availble_nodes = node_ids.clone();
}
_ => panic!("Invalid message type"),
}

View file

@ -1,8 +1,10 @@
use rand::seq::SliceRandom;
use rand::thread_rng;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
pub(crate) struct Neighbours(pub(crate) HashSet<String>);
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Messages(pub(crate) HashSet<u64>);
@ -12,7 +14,7 @@ pub(crate) struct Storage {
pub(crate) messages: Messages,
pub(crate) received_messages: HashMap<String, Messages>,
pub(crate) sent_messages: HashMap<String, Messages>,
pub(crate) topology: Topology,
pub(crate) neighbours: Neighbours,
}
impl Storage {
@ -74,11 +76,21 @@ impl Storage {
}
}
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
self.topology.0 = topology;
pub(crate) fn init_topology(&mut self, node_id: String, nodes: &Vec<String>) {
let i = nodes.iter().position(|x| *x == node_id).unwrap();
let left_neighbor = nodes[(i + nodes.len() - 1) % nodes.len()].clone();
let right_neighbor = nodes[(i + 1) % nodes.len()].clone();
let mut rng = thread_rng();
let selections: Vec<String> = nodes.choose_multiple(&mut rng, 2).cloned().collect();
self.neighbours.0.extend(selections);
self.neighbours.0.insert(left_neighbor);
self.neighbours.0.insert(right_neighbor);
}
pub(crate) fn get_neighbours(&self, node_id: &str) -> Option<Vec<String>> {
self.topology.0.get(node_id).cloned()
pub(crate) fn get_neighbours(&self) -> HashSet<String> {
self.neighbours.0.clone()
}
}