Improve structure
This commit is contained in:
parent
00e78f998b
commit
b151429a41
5 changed files with 259 additions and 134 deletions
44
03a-single-node-broadcast/src/connection.rs
Normal file
44
03a-single-node-broadcast/src/connection.rs
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
use crate::message::Message;
|
||||
use std::io::{BufRead, Write};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Connection<'a> {
|
||||
reader: std::io::BufReader<std::io::StdinLock<'a>>,
|
||||
writer: std::io::Stdout,
|
||||
}
|
||||
|
||||
impl<'a> Connection<'a> {
|
||||
pub fn new(stdin: std::io::Stdin) -> Self {
|
||||
Connection {
|
||||
reader: std::io::BufReader::new(stdin.lock()),
|
||||
writer: std::io::stdout(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_one(&mut self) -> Option<Message> {
|
||||
let mut buf = String::new();
|
||||
let _ = self.reader.read_line(&mut buf);
|
||||
return Some(Message::parse_message(buf));
|
||||
}
|
||||
|
||||
pub fn read(&mut self) -> Option<Message> {
|
||||
let mut buffer = String::new();
|
||||
|
||||
match self.reader.read_line(&mut buffer) {
|
||||
Ok(bytes_read) => {
|
||||
if bytes_read > 0 {
|
||||
serde_json::from_str(&buffer).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write(&mut self, message: Message) {
|
||||
let message = Message::format_message(message);
|
||||
writeln!(self.writer, "{}", message).unwrap();
|
||||
self.writer.flush().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,129 +1,89 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::io::{self, BufRead, Write};
|
||||
use std::collections::HashMap;
|
||||
mod connection;
|
||||
mod message;
|
||||
mod node;
|
||||
mod storage;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct Message {
|
||||
src: String,
|
||||
dest: String,
|
||||
body: Body,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum Body {
|
||||
Error {
|
||||
in_reply_to: u64,
|
||||
code: u64,
|
||||
text: String,
|
||||
},
|
||||
Init {
|
||||
msg_id: u64,
|
||||
node_id: String,
|
||||
node_ids: Vec<String>,
|
||||
},
|
||||
InitOk { in_reply_to: u64 },
|
||||
Broadcast {
|
||||
msg_id: u64,
|
||||
message: u64,
|
||||
},
|
||||
BroadcastOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Read {
|
||||
msg_id: u64,
|
||||
},
|
||||
ReadOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
messages: Vec<u64>,
|
||||
},
|
||||
Topology {
|
||||
msg_id: u64,
|
||||
topology: Topology,
|
||||
},
|
||||
TopologyOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct Messages(Vec<u64>);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct Topology(HashMap<String, Vec<String>>);
|
||||
use crate::connection::Connection;
|
||||
use crate::message::{Body, Message};
|
||||
use crate::node::Node;
|
||||
|
||||
fn main() {
|
||||
let stdin = io::stdin();
|
||||
let mut stdout = io::stdout();
|
||||
let mut node_id = String::new();
|
||||
let stdin = std::io::stdin();
|
||||
let mut connection = Connection::new(stdin);
|
||||
|
||||
let mut messages = Messages(Vec::new());
|
||||
let mut node = init_node(&mut connection);
|
||||
|
||||
for line in stdin.lock().lines() {
|
||||
let input: Message = serde_json::from_str(&line.unwrap()).unwrap();
|
||||
while let Some(message) = connection.read() {
|
||||
handle_message(&mut node, &mut connection, message);
|
||||
}
|
||||
}
|
||||
|
||||
fn init_node(connection: &mut Connection) -> Node {
|
||||
let input = connection.read_one().expect("Didn't get input");
|
||||
|
||||
let node;
|
||||
match input.body {
|
||||
Body::Init {
|
||||
msg_id,
|
||||
node_id: id,
|
||||
..
|
||||
} => {
|
||||
node_id = id;
|
||||
let output = Message {
|
||||
src: node_id.clone(),
|
||||
Body::Init { msg_id, .. } => {
|
||||
node = Node::init(input.clone());
|
||||
|
||||
let response = Message {
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::InitOk {
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
let output_json = serde_json::to_string(&output).unwrap();
|
||||
writeln!(stdout, "{}", output_json).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
}
|
||||
Body::Broadcast { msg_id, message } => {
|
||||
messages.0.push(message);
|
||||
|
||||
let output = Message {
|
||||
src: node_id.clone(),
|
||||
connection.write(response);
|
||||
}
|
||||
_ => panic!("Node is not initalized yet"),
|
||||
}
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
fn handle_message(node: &mut Node, connection: &mut Connection, input: Message) {
|
||||
match input.body {
|
||||
Body::Broadcast { msg_id, message } => {
|
||||
node.storage.add_message(message);
|
||||
|
||||
let response = Message {
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::BroadcastOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
let output_json = serde_json::to_string(&output).unwrap();
|
||||
writeln!(stdout, "{}", output_json).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
|
||||
connection.write(response);
|
||||
}
|
||||
Body::Read { msg_id } => {
|
||||
let output = Message {
|
||||
src: node_id.clone(),
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::ReadOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
messages: messages.0.clone(),
|
||||
messages: node.storage.get_messages(),
|
||||
},
|
||||
};
|
||||
let output_json = serde_json::to_string(&output).unwrap();
|
||||
writeln!(stdout, "{}", output_json).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
|
||||
connection.write(output);
|
||||
}
|
||||
Body::Topology { msg_id, .. } => {
|
||||
Body::Topology { msg_id, topology } => {
|
||||
node.storage.init_topology(topology);
|
||||
|
||||
let output = Message {
|
||||
src: node_id.clone(),
|
||||
src: node.id.clone(),
|
||||
dest: input.src,
|
||||
body: Body::TopologyOk {
|
||||
msg_id,
|
||||
in_reply_to: msg_id,
|
||||
},
|
||||
};
|
||||
let output_json = serde_json::to_string(&output).unwrap();
|
||||
writeln!(stdout, "{}", output_json).unwrap();
|
||||
stdout.flush().unwrap();
|
||||
|
||||
connection.write(output);
|
||||
}
|
||||
Body::Error {
|
||||
in_reply_to,
|
||||
|
|
@ -138,4 +98,3 @@ fn main() {
|
|||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
62
03a-single-node-broadcast/src/message.rs
Normal file
62
03a-single-node-broadcast/src/message.rs
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
pub struct Message {
|
||||
pub src: String,
|
||||
pub dest: String,
|
||||
pub body: Body,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Body {
|
||||
Error {
|
||||
in_reply_to: u64,
|
||||
code: u64,
|
||||
text: String,
|
||||
},
|
||||
Init {
|
||||
msg_id: u64,
|
||||
node_id: String,
|
||||
node_ids: Vec<String>,
|
||||
},
|
||||
InitOk {
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Broadcast {
|
||||
msg_id: u64,
|
||||
message: u64,
|
||||
},
|
||||
BroadcastOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
},
|
||||
Read {
|
||||
msg_id: u64,
|
||||
},
|
||||
ReadOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
messages: Vec<u64>,
|
||||
},
|
||||
Topology {
|
||||
msg_id: u64,
|
||||
topology: HashMap<String, Vec<String>>,
|
||||
},
|
||||
TopologyOk {
|
||||
msg_id: u64,
|
||||
in_reply_to: u64,
|
||||
},
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub(crate) fn parse_message(message: String) -> Message {
|
||||
serde_json::from_str(&message).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn format_message(message: Message) -> String {
|
||||
serde_json::to_string(&message).unwrap()
|
||||
}
|
||||
}
|
||||
28
03a-single-node-broadcast/src/node.rs
Normal file
28
03a-single-node-broadcast/src/node.rs
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::message::{Body, Message};
|
||||
use crate::storage::Storage;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct Node {
|
||||
pub(crate) id: String,
|
||||
pub(crate) availble_nodes: Vec<String>,
|
||||
pub(crate) storage: Storage,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
pub(crate) fn init(message: Message) -> Node {
|
||||
match message.body {
|
||||
Body::Init {
|
||||
node_id, node_ids, ..
|
||||
} => {
|
||||
return Node {
|
||||
id: node_id,
|
||||
availble_nodes: node_ids,
|
||||
storage: Storage::new(),
|
||||
}
|
||||
}
|
||||
_ => panic!("Invalid message type"),
|
||||
}
|
||||
}
|
||||
}
|
||||
32
03a-single-node-broadcast/src/storage.rs
Normal file
32
03a-single-node-broadcast/src/storage.rs
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct Messages(pub(crate) Vec<u64>);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct Storage {
|
||||
pub(crate) messages: Messages,
|
||||
pub(crate) topology: Topology,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub(crate) fn new() -> Storage {
|
||||
Storage::default()
|
||||
}
|
||||
|
||||
pub(crate) fn add_message(&mut self, message: u64) {
|
||||
self.messages.0.push(message);
|
||||
}
|
||||
|
||||
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
|
||||
self.messages.0.to_owned()
|
||||
}
|
||||
|
||||
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
|
||||
self.topology.0 = topology;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue