From b151429a41287f1e1c268a167a64a2b608e66221 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Sat, 13 May 2023 06:21:48 +0200 Subject: [PATCH] Improve structure --- 03a-single-node-broadcast/src/connection.rs | 44 ++++ 03a-single-node-broadcast/src/main.rs | 227 ++++++++------------ 03a-single-node-broadcast/src/message.rs | 62 ++++++ 03a-single-node-broadcast/src/node.rs | 28 +++ 03a-single-node-broadcast/src/storage.rs | 32 +++ 5 files changed, 259 insertions(+), 134 deletions(-) create mode 100644 03a-single-node-broadcast/src/connection.rs create mode 100644 03a-single-node-broadcast/src/message.rs create mode 100644 03a-single-node-broadcast/src/node.rs create mode 100644 03a-single-node-broadcast/src/storage.rs diff --git a/03a-single-node-broadcast/src/connection.rs b/03a-single-node-broadcast/src/connection.rs new file mode 100644 index 0000000..983d24c --- /dev/null +++ b/03a-single-node-broadcast/src/connection.rs @@ -0,0 +1,44 @@ +use crate::message::Message; +use std::io::{BufRead, Write}; + +#[derive(Debug)] +pub struct Connection<'a> { + reader: std::io::BufReader>, + writer: std::io::Stdout, +} + +impl<'a> Connection<'a> { + pub fn new(stdin: std::io::Stdin) -> Self { + Connection { + reader: std::io::BufReader::new(stdin.lock()), + writer: std::io::stdout(), + } + } + + pub fn read_one(&mut self) -> Option { + let mut buf = String::new(); + let _ = self.reader.read_line(&mut buf); + return Some(Message::parse_message(buf)); + } + + pub fn read(&mut self) -> Option { + let mut buffer = String::new(); + + match self.reader.read_line(&mut buffer) { + Ok(bytes_read) => { + if bytes_read > 0 { + serde_json::from_str(&buffer).ok() + } else { + None + } + } + Err(_) => None, + } + } + + pub fn write(&mut self, message: Message) { + let message = Message::format_message(message); + writeln!(self.writer, "{}", message).unwrap(); + self.writer.flush().unwrap(); + } +} diff --git a/03a-single-node-broadcast/src/main.rs b/03a-single-node-broadcast/src/main.rs index e8e567c..749b0f0 100644 --- a/03a-single-node-broadcast/src/main.rs +++ b/03a-single-node-broadcast/src/main.rs @@ -1,141 +1,100 @@ -use serde::{Deserialize, Serialize}; -use std::io::{self, BufRead, Write}; -use std::collections::HashMap; +mod connection; +mod message; +mod node; +mod storage; -#[derive(Serialize, Deserialize, Debug)] -struct Message { - src: String, - dest: String, - body: Body, -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "type")] -#[serde(rename_all = "snake_case")] -enum Body { - Error { - in_reply_to: u64, - code: u64, - text: String, - }, - Init { - msg_id: u64, - node_id: String, - node_ids: Vec, - }, - InitOk { in_reply_to: u64 }, - Broadcast { - msg_id: u64, - message: u64, - }, - BroadcastOk { - msg_id: u64, - in_reply_to: u64, - }, - Read { - msg_id: u64, - }, - ReadOk { - msg_id: u64, - in_reply_to: u64, - messages: Vec, - }, - Topology { - msg_id: u64, - topology: Topology, - }, - TopologyOk { - msg_id: u64, - in_reply_to: u64, - } -} - -#[derive(Serialize, Deserialize, Debug)] -struct Messages(Vec); - -#[derive(Serialize, Deserialize, Debug)] -struct Topology(HashMap>); +use crate::connection::Connection; +use crate::message::{Body, Message}; +use crate::node::Node; fn main() { - let stdin = io::stdin(); - let mut stdout = io::stdout(); - let mut node_id = String::new(); + let stdin = std::io::stdin(); + let mut connection = Connection::new(stdin); - let mut messages = Messages(Vec::new()); + let mut node = init_node(&mut connection); - for line in stdin.lock().lines() { - let input: Message = serde_json::from_str(&line.unwrap()).unwrap(); - match input.body { - Body::Init { - msg_id, - node_id: id, - .. - } => { - node_id = id; - let output = Message { - src: node_id.clone(), - dest: input.src, - body: Body::InitOk { - in_reply_to: msg_id, - }, - }; - let output_json = serde_json::to_string(&output).unwrap(); - writeln!(stdout, "{}", output_json).unwrap(); - stdout.flush().unwrap(); - } - Body::Broadcast { msg_id, message } => { - messages.0.push(message); - - let output = Message { - src: node_id.clone(), - dest: input.src, - body: Body::BroadcastOk { - msg_id, - in_reply_to: msg_id, - }, - }; - let output_json = serde_json::to_string(&output).unwrap(); - writeln!(stdout, "{}", output_json).unwrap(); - stdout.flush().unwrap(); - } - Body::Read { msg_id } => { - let output = Message { - src: node_id.clone(), - dest: input.src, - body: Body::ReadOk { - msg_id, - in_reply_to: msg_id, - messages: messages.0.clone(), - }, - }; - let output_json = serde_json::to_string(&output).unwrap(); - writeln!(stdout, "{}", output_json).unwrap(); - stdout.flush().unwrap(); - } - Body::Topology { msg_id, .. } => { - let output = Message { - src: node_id.clone(), - dest: input.src, - body: Body::TopologyOk { - msg_id, - in_reply_to: msg_id, - }, - }; - let output_json = serde_json::to_string(&output).unwrap(); - writeln!(stdout, "{}", output_json).unwrap(); - stdout.flush().unwrap(); - } - Body::Error { - in_reply_to, - code, - text, - } => { - eprintln!( - "Error received (in_reply_to: {}, code: {}, text: {})", - in_reply_to, code, text - ); - } - _ => (), - } + while let Some(message) = connection.read() { + handle_message(&mut node, &mut connection, message); + } +} + +fn init_node(connection: &mut Connection) -> Node { + let input = connection.read_one().expect("Didn't get input"); + + let node; + match input.body { + Body::Init { msg_id, .. } => { + node = Node::init(input.clone()); + + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::InitOk { + in_reply_to: msg_id, + }, + }; + + connection.write(response); + } + _ => panic!("Node is not initalized yet"), + } + + node +} + +fn handle_message(node: &mut Node, connection: &mut Connection, input: Message) { + match input.body { + Body::Broadcast { msg_id, message } => { + node.storage.add_message(message); + + let response = Message { + src: node.id.clone(), + dest: input.src, + body: Body::BroadcastOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + connection.write(response); + } + Body::Read { msg_id } => { + let output = Message { + src: node.id.clone(), + dest: input.src, + body: Body::ReadOk { + msg_id, + in_reply_to: msg_id, + messages: node.storage.get_messages(), + }, + }; + + connection.write(output); + } + Body::Topology { msg_id, topology } => { + node.storage.init_topology(topology); + + let output = Message { + src: node.id.clone(), + dest: input.src, + body: Body::TopologyOk { + msg_id, + in_reply_to: msg_id, + }, + }; + + connection.write(output); + } + Body::Error { + in_reply_to, + code, + text, + } => { + eprintln!( + "Error received (in_reply_to: {}, code: {}, text: {})", + in_reply_to, code, text + ); + } + _ => (), } } diff --git a/03a-single-node-broadcast/src/message.rs b/03a-single-node-broadcast/src/message.rs new file mode 100644 index 0000000..356fa6f --- /dev/null +++ b/03a-single-node-broadcast/src/message.rs @@ -0,0 +1,62 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Message { + pub src: String, + pub dest: String, + pub body: Body, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum Body { + Error { + in_reply_to: u64, + code: u64, + text: String, + }, + Init { + msg_id: u64, + node_id: String, + node_ids: Vec, + }, + InitOk { + in_reply_to: u64, + }, + Broadcast { + msg_id: u64, + message: u64, + }, + BroadcastOk { + msg_id: u64, + in_reply_to: u64, + }, + Read { + msg_id: u64, + }, + ReadOk { + msg_id: u64, + in_reply_to: u64, + messages: Vec, + }, + Topology { + msg_id: u64, + topology: HashMap>, + }, + TopologyOk { + msg_id: u64, + in_reply_to: u64, + }, +} + +impl Message { + pub(crate) fn parse_message(message: String) -> Message { + serde_json::from_str(&message).unwrap() + } + + pub(crate) fn format_message(message: Message) -> String { + serde_json::to_string(&message).unwrap() + } +} diff --git a/03a-single-node-broadcast/src/node.rs b/03a-single-node-broadcast/src/node.rs new file mode 100644 index 0000000..03ae7c1 --- /dev/null +++ b/03a-single-node-broadcast/src/node.rs @@ -0,0 +1,28 @@ +use serde::{Deserialize, Serialize}; + +use crate::message::{Body, Message}; +use crate::storage::Storage; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Node { + pub(crate) id: String, + pub(crate) availble_nodes: Vec, + pub(crate) storage: Storage, +} + +impl Node { + pub(crate) fn init(message: Message) -> Node { + match message.body { + Body::Init { + node_id, node_ids, .. + } => { + return Node { + id: node_id, + availble_nodes: node_ids, + storage: Storage::new(), + } + } + _ => panic!("Invalid message type"), + } + } +} diff --git a/03a-single-node-broadcast/src/storage.rs b/03a-single-node-broadcast/src/storage.rs new file mode 100644 index 0000000..d51ad21 --- /dev/null +++ b/03a-single-node-broadcast/src/storage.rs @@ -0,0 +1,32 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Topology(pub(crate) HashMap>); + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Messages(pub(crate) Vec); + +#[derive(Serialize, Deserialize, Debug, Default)] +pub(crate) struct Storage { + pub(crate) messages: Messages, + pub(crate) topology: Topology, +} + +impl Storage { + pub(crate) fn new() -> Storage { + Storage::default() + } + + pub(crate) fn add_message(&mut self, message: u64) { + self.messages.0.push(message); + } + + pub(crate) fn get_messages(&mut self) -> Vec { + self.messages.0.to_owned() + } + + pub(crate) fn init_topology(&mut self, topology: HashMap>) { + self.topology.0 = topology; + } +}