This commit is contained in:
Bastian Gruber 2023-05-16 19:00:50 +02:00
parent 1923670da8
commit 877762521f
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
2 changed files with 40 additions and 38 deletions

View file

@ -33,10 +33,9 @@ async fn main() {
let writer_tx1: Sender<Message> = writer_tx.clone();
let writer_tx2: Sender<Message> = writer_tx.clone();
let node = Node::default();
let store = Arc::new(Mutex::new(Storage::default()));
let node = init_node(node).await;
let node = Node::bootstrap().await;
let n1 = node.clone();
let s1 = store.clone();
@ -63,39 +62,6 @@ async fn main() {
let _ = tokio::try_join!(read, handle, write, gossip);
}
async fn init_node(node: Node) -> Node {
let stdin = tokio::io::stdin();
let mut stdout = std::io::stdout();
let mut reader = BufReader::new(stdin);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone());
let node = node.init(message.clone());
match message.body {
Body::Init {
msg_id, node_id, ..
} => {
let response = Message {
src: node_id,
dest: message.src.clone(),
body: Body::InitOk {
in_reply_to: msg_id,
},
};
let message = Message::format_message(response);
writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap();
}
_ => (),
}
node
}
async fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin);

View file

@ -1,6 +1,9 @@
use crate::message::{Body, Message};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::Write;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub(crate) struct Network(pub(crate) HashSet<String>);
@ -13,7 +16,40 @@ pub(crate) struct Node {
}
impl Node {
pub(crate) fn init(&self, message: Message) -> Node {
pub(crate) async fn bootstrap() -> Node {
let stdin = tokio::io::stdin();
let mut stdout = std::io::stdout();
let mut reader = BufReader::new(stdin);
let mut buf = String::new();
reader.read_line(&mut buf).await.unwrap();
let message = Message::parse_message(buf.clone());
let node = Node::init(message.clone());
match message.body {
Body::Init {
msg_id, node_id, ..
} => {
let response = Message {
src: node_id,
dest: message.src.clone(),
body: Body::InitOk {
in_reply_to: msg_id,
},
};
let message = Message::format_message(response);
writeln!(stdout, "{}", message).unwrap();
stdout.flush().unwrap();
}
_ => (),
}
node
}
pub(crate) fn init(message: Message) -> Node {
match message.body {
Body::Init {
node_id, node_ids, ..
@ -21,14 +57,14 @@ impl Node {
return Node {
id: node_id.clone(),
availble_nodes: node_ids.clone(),
network: self.init_network(node_ids),
network: Node::init_network(node_ids),
}
}
_ => panic!("Invalid message type"),
}
}
fn init_network(&self, nodes: Vec<String>) -> Network {
fn init_network(nodes: Vec<String>) -> Network {
let mut neighbours = Network::default();
neighbours.0.extend(nodes);
neighbours