Trial async version
This commit is contained in:
parent
4768e74d2c
commit
1a3de81fb7
5 changed files with 215 additions and 145 deletions
|
|
@ -1,44 +0,0 @@
|
||||||
use crate::message::Message;
|
|
||||||
use std::io::{BufRead, Write};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Connection<'a> {
|
|
||||||
reader: std::io::BufReader<std::io::StdinLock<'a>>,
|
|
||||||
writer: std::io::Stdout,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Connection<'a> {
|
|
||||||
pub fn new(stdin: std::io::Stdin) -> Self {
|
|
||||||
Connection {
|
|
||||||
reader: std::io::BufReader::new(stdin.lock()),
|
|
||||||
writer: std::io::stdout(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_one(&mut self) -> Option<Message> {
|
|
||||||
let mut buf = String::new();
|
|
||||||
let _ = self.reader.read_line(&mut buf);
|
|
||||||
return Some(Message::parse_message(buf));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read(&mut self) -> Option<Message> {
|
|
||||||
let mut buffer = String::new();
|
|
||||||
|
|
||||||
match self.reader.read_line(&mut buffer) {
|
|
||||||
Ok(bytes_read) => {
|
|
||||||
if bytes_read > 0 {
|
|
||||||
serde_json::from_str(&buffer).ok()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write(&mut self, message: Message) {
|
|
||||||
let message = Message::format_message(message);
|
|
||||||
writeln!(self.writer, "{}", message).unwrap();
|
|
||||||
self.writer.flush().unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,115 +1,178 @@
|
||||||
mod connection;
|
|
||||||
mod message;
|
mod message;
|
||||||
mod node;
|
mod node;
|
||||||
mod storage;
|
mod storage;
|
||||||
|
|
||||||
use crate::connection::Connection;
|
|
||||||
use crate::message::{Body, Message};
|
use crate::message::{Body, Message};
|
||||||
use crate::node::Node;
|
use crate::node::Node;
|
||||||
|
|
||||||
fn main() {
|
use std::sync::Arc;
|
||||||
let stdin = std::io::stdin();
|
use std::time::Duration;
|
||||||
let mut connection = Connection::new(stdin);
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::{mpsc, mpsc::Receiver, mpsc::Sender};
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
let mut node = init_node(&mut connection);
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
// let mut interval = time::interval(Duration::from_secs(5));
|
||||||
|
|
||||||
while let Some(message) = connection.read() {
|
let (reader_tx, mut reader_rx) = mpsc::channel(100);
|
||||||
handle_message(&mut node, &mut connection, message);
|
let (writer_tx, mut writer_rx) = mpsc::channel(100);
|
||||||
|
|
||||||
|
let node = Arc::new(Mutex::new(Node::default()));
|
||||||
|
// let writer_tx = Arc::new(Mutex::new(writer_tx));
|
||||||
|
|
||||||
|
let n1 = node.clone();
|
||||||
|
let n2 = node.clone();
|
||||||
|
|
||||||
|
let w1_tx = writer_tx.clone();
|
||||||
|
let w2_tx = writer_tx.clone();
|
||||||
|
|
||||||
|
let read = tokio::spawn(async move {
|
||||||
|
read_from_stdin(reader_tx).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let write = tokio::spawn(async move {
|
||||||
|
write_to_stdout(&mut writer_rx).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// tokio::spawn(async move {
|
||||||
|
// loop {
|
||||||
|
// interval.tick().await;
|
||||||
|
// gossip_messages(n1.clone(), w1_tx.clone()).await;
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
handle_messages(n2, &mut reader_rx, writer_tx).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = tokio::join!(read, write, handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_from_stdin(reader_tx: Sender<Message>) {
|
||||||
|
let stdin = tokio::io::stdin();
|
||||||
|
let mut reader = BufReader::new(stdin).lines();
|
||||||
|
eprintln!("Reading from stdin");
|
||||||
|
while let Ok(Some(line)) = reader.next_line().await {
|
||||||
|
eprintln!("Reading next line {line:?}");
|
||||||
|
let message = Message::parse_message(line);
|
||||||
|
reader_tx.send(message).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_node(connection: &mut Connection) -> Node {
|
async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
|
||||||
let input = connection.read_one().expect("Didn't get input");
|
let stdout = tokio::io::stdout();
|
||||||
|
let mut writer = BufWriter::new(stdout);
|
||||||
|
|
||||||
let node;
|
while let Some(message) = writer_rx.recv().await {
|
||||||
match input.body {
|
let message = Message::format_message(message);
|
||||||
Body::Init { msg_id, .. } => {
|
if let Err(e) = writer.write_all(message.as_bytes()).await {
|
||||||
node = Node::init(input.clone());
|
eprintln!("Failed to write to stdout: {}", e);
|
||||||
|
|
||||||
let response = Message {
|
|
||||||
src: node.id.clone(),
|
|
||||||
dest: input.src,
|
|
||||||
body: Body::InitOk {
|
|
||||||
in_reply_to: msg_id,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
connection.write(response);
|
|
||||||
}
|
}
|
||||||
_ => panic!("Node is not initalized yet"),
|
|
||||||
}
|
|
||||||
|
|
||||||
node
|
if let Err(e) = writer.flush().await {
|
||||||
|
eprintln!("Failed to flush stdout: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_message(node: &mut Node, connection: &mut Connection, input: Message) {
|
async fn gossip_messages(node: Arc<Mutex<Node>>, writer: Arc<Mutex<Sender<Message>>>) {
|
||||||
match input.body {
|
let mut node = node.lock().await;
|
||||||
Body::Broadcast { msg_id, message } => {
|
let writer = writer.lock().await;
|
||||||
node.storage.add_message(message);
|
|
||||||
|
|
||||||
let response = Message {
|
for n in node.storage.get_neighbours(&node.get_id()) {
|
||||||
src: node.id.clone(),
|
let messages = node.storage.get_messages_for_node(n.clone());
|
||||||
dest: input.src,
|
let message = Message {
|
||||||
body: Body::BroadcastOk {
|
src: node.id.clone(),
|
||||||
msg_id,
|
dest: n.clone(),
|
||||||
in_reply_to: msg_id,
|
body: Body::Gossip {
|
||||||
},
|
messages: messages.clone(),
|
||||||
};
|
},
|
||||||
|
};
|
||||||
|
|
||||||
connection.write(response);
|
let _ = writer.send(message).await.unwrap();
|
||||||
|
node.storage
|
||||||
|
.add_to_sent_messages(messages.into_iter().collect(), n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let nodes = node.storage.get_neighbours(node.id.clone());
|
async fn handle_messages(
|
||||||
|
node: Arc<Mutex<Node>>,
|
||||||
for n in nodes {
|
input: &mut Receiver<Message>,
|
||||||
let output = Message {
|
writer: Sender<Message>,
|
||||||
src: node.id.clone(),
|
) {
|
||||||
dest: n,
|
while let Some(input) = input.recv().await {
|
||||||
body: Body::Broadcast {
|
match input.body {
|
||||||
msg_id,
|
Body::Init { msg_id, .. } => {
|
||||||
message: node.storage.get_messages().last().unwrap().clone(),
|
node.lock().await.init(input.clone());
|
||||||
|
let id = node.lock().await.get_id();
|
||||||
|
let response = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::InitOk {
|
||||||
|
in_reply_to: msg_id,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
connection.write(output);
|
let _ = writer.send(response).await;
|
||||||
}
|
}
|
||||||
}
|
Body::Broadcast { msg_id, message } => {
|
||||||
Body::Read { msg_id } => {
|
let id = node.lock().await.get_id();
|
||||||
let output = Message {
|
node.lock().await.storage.add_message(message, id.clone());
|
||||||
src: node.id.clone(),
|
|
||||||
dest: input.src,
|
|
||||||
body: Body::ReadOk {
|
|
||||||
msg_id,
|
|
||||||
in_reply_to: msg_id,
|
|
||||||
messages: node.storage.get_messages(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
connection.write(output);
|
let response = Message {
|
||||||
}
|
src: id,
|
||||||
Body::Topology { msg_id, topology } => {
|
dest: input.src,
|
||||||
node.storage.init_topology(topology);
|
body: Body::BroadcastOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
let output = Message {
|
let _ = writer.send(response).await;
|
||||||
src: node.id.clone(),
|
}
|
||||||
dest: input.src,
|
Body::Read { msg_id } => {
|
||||||
body: Body::TopologyOk {
|
let id = node.lock().await.get_id();
|
||||||
msg_id,
|
|
||||||
in_reply_to: msg_id,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
connection.write(output);
|
let output = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::ReadOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
messages: node.lock().await.storage.get_messages(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = writer.send(output).await;
|
||||||
|
}
|
||||||
|
Body::Topology { msg_id, topology } => {
|
||||||
|
let id = node.lock().await.get_id();
|
||||||
|
node.lock().await.storage.init_topology(topology);
|
||||||
|
|
||||||
|
let output = Message {
|
||||||
|
src: id,
|
||||||
|
dest: input.src,
|
||||||
|
body: Body::TopologyOk {
|
||||||
|
msg_id,
|
||||||
|
in_reply_to: msg_id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = writer.send(output).await;
|
||||||
|
}
|
||||||
|
Body::Error {
|
||||||
|
in_reply_to,
|
||||||
|
code,
|
||||||
|
text,
|
||||||
|
} => {
|
||||||
|
eprintln!(
|
||||||
|
"Error received (in_reply_to: {}, code: {}, text: {})",
|
||||||
|
in_reply_to, code, text
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
}
|
}
|
||||||
Body::Error {
|
|
||||||
in_reply_to,
|
|
||||||
code,
|
|
||||||
text,
|
|
||||||
} => {
|
|
||||||
eprintln!(
|
|
||||||
"Error received (in_reply_to: {}, code: {}, text: {})",
|
|
||||||
in_reply_to, code, text
|
|
||||||
);
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,9 @@ pub enum Body {
|
||||||
msg_id: u64,
|
msg_id: u64,
|
||||||
in_reply_to: u64,
|
in_reply_to: u64,
|
||||||
},
|
},
|
||||||
|
Gossip {
|
||||||
|
messages: Vec<u64>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Message {
|
impl Message {
|
||||||
|
|
|
||||||
|
|
@ -11,18 +11,19 @@ pub(crate) struct Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Node {
|
impl Node {
|
||||||
pub(crate) fn init(message: Message) -> Node {
|
pub(crate) fn init(&mut self, message: Message) {
|
||||||
match message.body {
|
match message.body {
|
||||||
Body::Init {
|
Body::Init {
|
||||||
node_id, node_ids, ..
|
node_id, node_ids, ..
|
||||||
} => {
|
} => {
|
||||||
return Node {
|
self.id = node_id;
|
||||||
id: node_id,
|
self.availble_nodes = node_ids;
|
||||||
availble_nodes: node_ids,
|
|
||||||
storage: Storage::new(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
_ => panic!("Invalid message type"),
|
_ => panic!("Invalid message type"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_id(&self) -> String {
|
||||||
|
self.id.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,36 +1,83 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
|
pub(crate) struct Topology(pub(crate) HashMap<String, Vec<String>>);
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
pub(crate) struct Messages(pub(crate) Vec<u64>);
|
pub(crate) struct Messages(pub(crate) HashSet<u64>);
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||||
pub(crate) struct Storage {
|
pub(crate) struct Storage {
|
||||||
pub(crate) messages: Messages,
|
pub(crate) messages: Messages,
|
||||||
|
pub(crate) received_messages: HashMap<String, Messages>,
|
||||||
|
pub(crate) sent_messages: HashMap<String, Messages>,
|
||||||
pub(crate) topology: Topology,
|
pub(crate) topology: Topology,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Storage {
|
impl Storage {
|
||||||
pub(crate) fn new() -> Storage {
|
pub(crate) fn add_message(&mut self, message: u64, node: String) {
|
||||||
Storage::default()
|
self.messages.0.insert(message);
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn add_message(&mut self, message: u64) {
|
if self.received_messages.contains_key(&node) {
|
||||||
self.messages.0.push(message);
|
self.received_messages
|
||||||
|
.get_mut(&node)
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.insert(message);
|
||||||
|
} else {
|
||||||
|
let mut v = Messages::default();
|
||||||
|
v.0.insert(message);
|
||||||
|
self.received_messages.insert(node, v);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
|
pub(crate) fn get_messages(&mut self) -> Vec<u64> {
|
||||||
self.messages.0.to_owned()
|
self.messages.0.clone().into_iter().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_messages_for_node(&self, node: String) -> Vec<u64> {
|
||||||
|
let received: Vec<u64> = self
|
||||||
|
.received_messages
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| *key == &node)
|
||||||
|
.flat_map(|(_, Messages(value))| value)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let sent: Vec<u64> = self
|
||||||
|
.sent_messages
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| *key == &node)
|
||||||
|
.flat_map(|(_, Messages(value))| value)
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.messages
|
||||||
|
.0
|
||||||
|
.iter()
|
||||||
|
.filter(|m| !received.contains(m) && !sent.contains(m))
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn add_to_sent_messages(&mut self, messages: HashSet<u64>, node: String) {
|
||||||
|
if self.sent_messages.contains_key(&node) {
|
||||||
|
self.sent_messages
|
||||||
|
.get_mut(&node)
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.extend(messages);
|
||||||
|
} else {
|
||||||
|
self.sent_messages.insert(node, Messages(messages));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
|
pub(crate) fn init_topology(&mut self, topology: HashMap<String, Vec<String>>) {
|
||||||
self.topology.0 = topology;
|
self.topology.0 = topology;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_neighbours(&mut self, node_id: String) -> Vec<String> {
|
pub(crate) fn get_neighbours(&self, node_id: &str) -> Vec<String> {
|
||||||
self.topology.0.get(&node_id).unwrap().to_owned()
|
self.topology.0.get(node_id).unwrap().to_owned()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue