Msg per node+oepration: 13, max latency: 500

This commit is contained in:
Bastian Gruber 2023-05-16 08:56:05 +02:00
parent de23b881a7
commit 3d1dd23b9b
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
5 changed files with 480 additions and 120 deletions

View file

@ -2,6 +2,24 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytes"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -15,6 +33,7 @@ dependencies = [
"rand",
"serde",
"serde_json",
"tokio",
]
[[package]]
@ -28,6 +47,15 @@ dependencies = [
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
[[package]]
name = "itoa"
version = "1.0.6"
@ -40,6 +68,76 @@ version = "0.2.144"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
[[package]]
name = "lock_api"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
[[package]]
name = "mio"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.45.0",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.45.0",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -94,12 +192,27 @@ dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
]
[[package]]
name = "ryu"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.160"
@ -131,6 +244,31 @@ dependencies = [
"serde",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "socket2"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "syn"
version = "2.0.15"
@ -142,6 +280,36 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
dependencies = [
"autocfg",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.8"
@ -153,3 +321,157 @@ name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm 0.48.0",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm 0.48.0",
"windows_x86_64_msvc 0.48.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"

View file

@ -9,3 +9,4 @@ edition = "2021"
rand = "0.8.5"
serde = {version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.28.1", features = ["full"] }

View file

@ -4,81 +4,121 @@ mod storage;
use crate::message::{Body, Message};
use crate::node::Node;
use crate::storage::Storage;
use std::io::prelude::*;
use std::io::{BufReader, Write};
use std::sync::{
mpsc,
mpsc::{Receiver, Sender},
Arc, Mutex,
};
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
use std::{println, thread};
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::sync::{
mpsc,
mpsc::{Receiver, Sender},
Mutex,
};
fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel();
let (writer_tx, mut writer_rx) = mpsc::channel();
let node = Arc::new(Mutex::new(Node::default()));
let n1 = node.clone();
let n2 = node.clone();
#[tokio::main]
async fn main() {
let (reader_tx, mut reader_rx) = mpsc::channel(1000);
let (writer_tx, mut writer_rx) = mpsc::channel(1000);
let reader_tx1: Sender<Message> = reader_tx.clone();
let writer_tx1: Sender<Message> = writer_tx.clone();
let writer_tx2: Sender<Message> = writer_tx.clone();
let read = thread::spawn(move || {
read_from_stdin(reader_tx1);
let node = Node::default();
let store = Arc::new(Mutex::new(Storage::default()));
let node = init_node(node).await;
let n1 = node.clone();
let s1 = store.clone();
let read = tokio::spawn(async move {
read_from_stdin(reader_tx1).await;
});
let write = thread::spawn(move || {
write_to_stdout(&mut writer_rx);
let write = tokio::spawn(async move {
write_to_stdout(&mut writer_rx).await;
});
let gossip = thread::spawn(move || loop {
thread::sleep(Duration::from_millis(100));
gossip_messages(n1.clone(), writer_tx2.clone());
let gossip = tokio::spawn(async move {
loop {
thread::sleep(Duration::from_millis(25));
gossip_messages(n1.clone(), s1.clone(), writer_tx2.clone()).await;
}
});
let handle = thread::spawn(move || {
handle_messages(n2, &mut reader_rx, writer_tx1);
let handle = tokio::spawn(async move {
handle_messages(node.clone(), store.clone(), &mut reader_rx, writer_tx1).await;
});
let _ = handle.join();
let _ = write.join();
let _ = gossip.join();
let _ = read.join();
let _ = tokio::try_join!(read, handle, write, gossip);
}
fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = std::io::stdin();
let mut reader = BufReader::new(stdin.lock());
async fn init_node(node: Node) -> Node {
let stdin = tokio::io::stdin();
let mut stdout = std::io::stdout();
let mut reader = BufReader::new(stdin);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone());
let node = node.init(message.clone());
match message.body {
Body::Init {
msg_id, node_id, ..
} => {
let response = Message {
src: node_id,
dest: message.src.clone(),
body: Body::InitOk {
in_reply_to: msg_id,
},
};
let message = Message::format_message(response);
writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap();
}
_ => (),
}
node
}
async fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin);
loop {
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone());
reader_tx.send(message).unwrap();
reader_tx.send(message).await.unwrap();
}
}
fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
let mut stdout = std::io::stdout();
loop {
let message = writer_rx.recv().unwrap();
let message = writer_rx.recv().await.unwrap();
let message = Message::format_message(message);
writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap();
}
}
fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
let node = node.lock().unwrap();
for n in node.storage.get_neighbours() {
let messages = node.storage.get_messages_for_node(n.clone());
async fn gossip_messages(node: Node, storage: Arc<Mutex<Storage>>, writer: Sender<Message>) {
for n in node.get_neighbours() {
let messages = storage.lock().await.get_messages_for_node(n.clone());
if messages.len() == 0 {
if messages.len() < 1 || storage.lock().await.get_retries(n.clone()) < 2 {
storage.lock().await.increase_or_insert(n);
continue;
}
@ -90,39 +130,22 @@ fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
},
};
writer.send(message).unwrap();
storage.lock().await.decrease_or_remove(n);
writer.send(message).await.unwrap();
}
}
fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer: Sender<Message>) {
while let Ok(input) = input.recv() {
async fn handle_messages(
node: Node,
storage: Arc<Mutex<Storage>>,
input: &mut Receiver<Message>,
writer: Sender<Message>,
) {
while let Some(input) = input.recv().await {
match input.body {
Body::Init {
msg_id,
ref node_id,
ref node_ids,
..
} => {
let mut node = node.lock().unwrap();
node.init(input.clone());
node.storage.init_topology(node_id.clone(), &node_ids);
let response = Message {
src: node_id.clone(),
dest: input.src,
body: Body::InitOk {
in_reply_to: msg_id,
},
};
writer.send(response).unwrap();
}
Body::Broadcast { msg_id, message } => {
let id = node.lock().unwrap().get_id();
node.lock()
.unwrap()
.storage
.add_message(message, id.clone());
let id = node.id.clone();
storage.lock().await.add_message(message, id.clone());
let response = Message {
src: id,
@ -133,12 +156,12 @@ fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer
},
};
writer.send(response).unwrap();
writer.send(response).await.unwrap();
}
Body::Gossip { messages } => {
let id = node.lock().unwrap().get_id();
let id = node.id.clone();
for m in messages.iter() {
node.lock().unwrap().storage.add_message(*m, id.clone());
storage.lock().await.add_message(*m, id.clone());
}
let response = Message {
@ -147,35 +170,30 @@ fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer
body: Body::GossipOk { messages },
};
writer.send(response).unwrap();
writer.send(response).await.unwrap();
}
Body::GossipOk { messages } => {
let id = node.lock().unwrap().get_id();
node.lock()
.unwrap()
.storage
.add_to_sent_messages(messages, id.clone());
storage
.lock()
.await
.add_to_sent_messages(messages, node.id.clone());
}
Body::Read { msg_id } => {
let id = node.lock().unwrap().get_id();
let response = Message {
src: id,
src: node.id.clone(),
dest: input.src,
body: Body::ReadOk {
msg_id,
in_reply_to: msg_id,
messages: node.lock().unwrap().storage.get_messages(),
messages: storage.lock().await.get_messages(),
},
};
writer.send(response).unwrap();
writer.send(response).await.unwrap();
}
Body::Topology { msg_id, .. } => {
let id = node.lock().unwrap().get_id();
let response = Message {
src: id,
src: node.id.clone(),
dest: input.src,
body: Body::TopologyOk {
msg_id,
@ -183,7 +201,7 @@ fn handle_messages(node: Arc<Mutex<Node>>, input: &mut Receiver<Message>, writer
},
};
writer.send(response).unwrap();
writer.send(response).await.unwrap();
}
Body::Error {
in_reply_to,

View file

@ -1,29 +1,47 @@
use rand::seq::SliceRandom;
use rand::thread_rng;
use serde::{Deserialize, Serialize};
use crate::message::{Body, Message};
use crate::storage::Storage;
use std::collections::HashSet;
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Neighbours(pub(crate) HashSet<String>);
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Node {
pub(crate) id: String,
pub(crate) availble_nodes: Vec<String>,
pub(crate) storage: Storage,
pub(crate) neighbours: Neighbours,
}
impl Node {
pub(crate) fn init(&mut self, message: Message) {
pub(crate) fn init(&self, message: Message) -> Node {
match message.body {
Body::Init {
node_id, node_ids, ..
} => {
self.id = node_id.clone();
self.availble_nodes = node_ids.clone();
return Node {
id: node_id.clone(),
availble_nodes: node_ids.clone(),
neighbours: self.init_topology(node_ids),
}
}
_ => panic!("Invalid message type"),
}
}
pub(crate) fn get_id(&self) -> String {
self.id.clone()
fn init_topology(&self, nodes: Vec<String>) -> Neighbours {
let mut neighbours = Neighbours::default();
let mut rng = thread_rng();
let selections: Vec<String> = nodes.choose_multiple(&mut rng, 9).cloned().collect();
neighbours.0.extend(selections);
neighbours
}
pub(crate) fn get_neighbours(&self) -> HashSet<String> {
self.neighbours.0.clone()
}
}

View file

@ -1,20 +1,15 @@
use rand::seq::SliceRandom;
use rand::thread_rng;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct Neighbours(pub(crate) HashSet<String>);
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Messages(pub(crate) HashSet<u64>);
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Storage {
pub(crate) messages: Messages,
pub(crate) received_messages: HashMap<String, Messages>,
pub(crate) sent_messages: HashMap<String, Messages>,
pub(crate) neighbours: Neighbours,
pub(crate) retry: HashMap<String, u8>,
}
impl Storage {
@ -38,6 +33,30 @@ impl Storage {
self.messages.0.clone().into_iter().collect()
}
pub(crate) fn get_retries(&self, node: String) -> u8 {
match self.retry.get(&node) {
Some(count) => *count,
None => 0,
}
}
pub(crate) fn increase_or_insert(&mut self, node: String) {
let count = self.retry.entry(node).or_insert(0);
*count += 1;
}
pub(crate) fn decrease_or_remove(&mut self, node: String) {
match self.retry.get_mut(&node) {
Some(count) => {
*count -= 1;
if *count == 0 {
self.retry.remove(&node);
}
}
None => (),
}
}
pub(crate) fn get_messages_for_node(&self, node: String) -> Vec<u64> {
let received: Vec<u64> = self
.received_messages
@ -75,22 +94,4 @@ impl Storage {
.insert(node, Messages(messages.iter().cloned().collect()));
}
}
pub(crate) fn init_topology(&mut self, node_id: String, nodes: &Vec<String>) {
let i = nodes.iter().position(|x| *x == node_id).unwrap();
let left_neighbor = nodes[(i + nodes.len() - 1) % nodes.len()].clone();
let right_neighbor = nodes[(i + 1) % nodes.len()].clone();
let mut rng = thread_rng();
let selections: Vec<String> = nodes.choose_multiple(&mut rng, 2).cloned().collect();
self.neighbours.0.extend(selections);
self.neighbours.0.insert(left_neighbor);
self.neighbours.0.insert(right_neighbor);
}
pub(crate) fn get_neighbours(&self) -> HashSet<String> {
self.neighbours.0.clone()
}
}