Trial and error, not working yet

This commit is contained in:
Bastian Gruber 2023-05-14 20:41:57 +02:00
parent 1a3de81fb7
commit 2d84e77b92
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF

View file

@ -7,57 +7,58 @@ use crate::node::Node;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::{mpsc, mpsc::Receiver, mpsc::Sender}; use tokio::sync::{broadcast, broadcast::Receiver, broadcast::Sender};
use tokio::time; use tokio::time;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// let mut interval = time::interval(Duration::from_secs(5)); let mut interval = time::interval(Duration::from_millis(200));
let (reader_tx, mut reader_rx) = broadcast::channel(100);
let (reader_tx, mut reader_rx) = mpsc::channel(100); let (writer_tx, mut writer_rx) = broadcast::channel(100);
let (writer_tx, mut writer_rx) = mpsc::channel(100);
let node = Arc::new(Mutex::new(Node::default())); let node = Arc::new(Mutex::new(Node::default()));
// let writer_tx = Arc::new(Mutex::new(writer_tx));
let n1 = node.clone(); let n1 = node.clone();
let n2 = node.clone(); let n2 = node.clone();
let w1_tx = writer_tx.clone(); let reader_tx1 = reader_tx.clone();
let w2_tx = writer_tx.clone(); let writer_tx1 = writer_tx.clone();
let writer_tx2 = writer_tx.clone();
let read = tokio::spawn(async move { let read = tokio::spawn(async move {
read_from_stdin(reader_tx).await; read_from_stdin(reader_tx1).await;
}); });
let write = tokio::spawn(async move { let write = tokio::spawn(async move {
write_to_stdout(&mut writer_rx).await; write_to_stdout(&mut writer_rx).await;
}); });
// tokio::spawn(async move { tokio::spawn(async move {
// loop { loop {
// interval.tick().await; interval.tick().await;
// gossip_messages(n1.clone(), w1_tx.clone()).await; gossip_messages(n1.clone(), writer_tx2.clone()).await;
// } }
// });
let handle = tokio::spawn(async move {
handle_messages(n2, &mut reader_rx, writer_tx).await;
}); });
let _ = tokio::join!(read, write, handle); let handle = tokio::spawn(async move {
handle_messages(n2, &mut reader_rx, writer_tx1).await;
});
let _ = tokio::try_join!(read, handle, write);
} }
async fn read_from_stdin(reader_tx: Sender<Message>) { async fn read_from_stdin(reader_tx: Sender<Message>) {
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin).lines(); let mut reader = BufReader::new(stdin);
eprintln!("Reading from stdin");
while let Ok(Some(line)) = reader.next_line().await { loop {
eprintln!("Reading next line {line:?}"); let mut buf = String::new();
let message = Message::parse_message(line); reader.read_line(&mut buf).await.unwrap();
reader_tx.send(message).await.unwrap(); let message = Message::parse_message(buf.clone());
reader_tx.send(message).unwrap();
} }
} }
@ -65,21 +66,16 @@ async fn write_to_stdout(writer_rx: &mut Receiver<Message>) {
let stdout = tokio::io::stdout(); let stdout = tokio::io::stdout();
let mut writer = BufWriter::new(stdout); let mut writer = BufWriter::new(stdout);
while let Some(message) = writer_rx.recv().await { loop {
let message = writer_rx.recv().await.unwrap();
let message = Message::format_message(message); let message = Message::format_message(message);
if let Err(e) = writer.write_all(message.as_bytes()).await { writer.write_all(&message.as_bytes()).await.unwrap();
eprintln!("Failed to write to stdout: {}", e); writer.flush().await.unwrap();
}
if let Err(e) = writer.flush().await {
eprintln!("Failed to flush stdout: {}", e);
}
} }
} }
async fn gossip_messages(node: Arc<Mutex<Node>>, writer: Arc<Mutex<Sender<Message>>>) { async fn gossip_messages(node: Arc<Mutex<Node>>, writer: Sender<Message>) {
let mut node = node.lock().await; let mut node = node.lock().await;
let writer = writer.lock().await;
for n in node.storage.get_neighbours(&node.get_id()) { for n in node.storage.get_neighbours(&node.get_id()) {
let messages = node.storage.get_messages_for_node(n.clone()); let messages = node.storage.get_messages_for_node(n.clone());
@ -91,7 +87,7 @@ async fn gossip_messages(node: Arc<Mutex<Node>>, writer: Arc<Mutex<Sender<Messag
}, },
}; };
let _ = writer.send(message).await.unwrap(); writer.send(message).unwrap();
node.storage node.storage
.add_to_sent_messages(messages.into_iter().collect(), n); .add_to_sent_messages(messages.into_iter().collect(), n);
} }
@ -102,7 +98,7 @@ async fn handle_messages(
input: &mut Receiver<Message>, input: &mut Receiver<Message>,
writer: Sender<Message>, writer: Sender<Message>,
) { ) {
while let Some(input) = input.recv().await { while let Ok(input) = input.recv().await {
match input.body { match input.body {
Body::Init { msg_id, .. } => { Body::Init { msg_id, .. } => {
node.lock().await.init(input.clone()); node.lock().await.init(input.clone());
@ -115,7 +111,7 @@ async fn handle_messages(
}, },
}; };
let _ = writer.send(response).await; writer.send(response).unwrap();
} }
Body::Broadcast { msg_id, message } => { Body::Broadcast { msg_id, message } => {
let id = node.lock().await.get_id(); let id = node.lock().await.get_id();
@ -130,12 +126,12 @@ async fn handle_messages(
}, },
}; };
let _ = writer.send(response).await; writer.send(response).unwrap();
} }
Body::Read { msg_id } => { Body::Read { msg_id } => {
let id = node.lock().await.get_id(); let id = node.lock().await.get_id();
let output = Message { let response = Message {
src: id, src: id,
dest: input.src, dest: input.src,
body: Body::ReadOk { body: Body::ReadOk {
@ -145,13 +141,13 @@ async fn handle_messages(
}, },
}; };
let _ = writer.send(output).await; writer.send(response).unwrap();
} }
Body::Topology { msg_id, topology } => { Body::Topology { msg_id, topology } => {
let id = node.lock().await.get_id(); let id = node.lock().await.get_id();
node.lock().await.storage.init_topology(topology); node.lock().await.storage.init_topology(topology);
let output = Message { let response = Message {
src: id, src: id,
dest: input.src, dest: input.src,
body: Body::TopologyOk { body: Body::TopologyOk {
@ -160,7 +156,7 @@ async fn handle_messages(
}, },
}; };
let _ = writer.send(output).await; writer.send(response).unwrap();
} }
Body::Error { Body::Error {
in_reply_to, in_reply_to,
@ -175,4 +171,5 @@ async fn handle_messages(
_ => (), _ => (),
} }
} }
println!("Error, nothing to read from receiver");
} }