From 2d84e77b92d4807fdd15be005dcf8941a9788edd Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Sun, 14 May 2023 20:41:57 +0200 Subject: [PATCH] Trial and error, not working yet --- 03b-multi-node-broadcast/src/main.rs | 85 ++++++++++++++-------------- 1 file changed, 41 insertions(+), 44 deletions(-) diff --git a/03b-multi-node-broadcast/src/main.rs b/03b-multi-node-broadcast/src/main.rs index 16059c8..811a63d 100644 --- a/03b-multi-node-broadcast/src/main.rs +++ b/03b-multi-node-broadcast/src/main.rs @@ -7,57 +7,58 @@ use crate::node::Node; use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::AsyncBufReadExt; +use tokio::io::{AsyncWriteExt, BufReader, BufWriter}; use tokio::sync::Mutex; -use tokio::sync::{mpsc, mpsc::Receiver, mpsc::Sender}; +use tokio::sync::{broadcast, broadcast::Receiver, broadcast::Sender}; use tokio::time; #[tokio::main] async fn main() { - // let mut interval = time::interval(Duration::from_secs(5)); - - let (reader_tx, mut reader_rx) = mpsc::channel(100); - let (writer_tx, mut writer_rx) = mpsc::channel(100); + let mut interval = time::interval(Duration::from_millis(200)); + let (reader_tx, mut reader_rx) = broadcast::channel(100); + let (writer_tx, mut writer_rx) = broadcast::channel(100); let node = Arc::new(Mutex::new(Node::default())); - // let writer_tx = Arc::new(Mutex::new(writer_tx)); let n1 = node.clone(); let n2 = node.clone(); - let w1_tx = writer_tx.clone(); - let w2_tx = writer_tx.clone(); + let reader_tx1 = reader_tx.clone(); + let writer_tx1 = writer_tx.clone(); + let writer_tx2 = writer_tx.clone(); let read = tokio::spawn(async move { - read_from_stdin(reader_tx).await; + read_from_stdin(reader_tx1).await; }); let write = tokio::spawn(async move { write_to_stdout(&mut writer_rx).await; }); - // tokio::spawn(async move { - // loop { - // interval.tick().await; - // gossip_messages(n1.clone(), w1_tx.clone()).await; - // } - // }); - - let handle = tokio::spawn(async move { - handle_messages(n2, &mut reader_rx, writer_tx).await; + tokio::spawn(async move { + loop { + interval.tick().await; + gossip_messages(n1.clone(), writer_tx2.clone()).await; + } }); - let _ = tokio::join!(read, write, handle); + let handle = tokio::spawn(async move { + handle_messages(n2, &mut reader_rx, writer_tx1).await; + }); + + let _ = tokio::try_join!(read, handle, write); } async fn read_from_stdin(reader_tx: Sender) { let stdin = tokio::io::stdin(); - let mut reader = BufReader::new(stdin).lines(); - eprintln!("Reading from stdin"); - while let Ok(Some(line)) = reader.next_line().await { - eprintln!("Reading next line {line:?}"); - let message = Message::parse_message(line); - reader_tx.send(message).await.unwrap(); + let mut reader = BufReader::new(stdin); + + loop { + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + let message = Message::parse_message(buf.clone()); + reader_tx.send(message).unwrap(); } } @@ -65,21 +66,16 @@ async fn write_to_stdout(writer_rx: &mut Receiver) { let stdout = tokio::io::stdout(); let mut writer = BufWriter::new(stdout); - while let Some(message) = writer_rx.recv().await { + loop { + let message = writer_rx.recv().await.unwrap(); let message = Message::format_message(message); - if let Err(e) = writer.write_all(message.as_bytes()).await { - eprintln!("Failed to write to stdout: {}", e); - } - - if let Err(e) = writer.flush().await { - eprintln!("Failed to flush stdout: {}", e); - } + writer.write_all(&message.as_bytes()).await.unwrap(); + writer.flush().await.unwrap(); } } -async fn gossip_messages(node: Arc>, writer: Arc>>) { +async fn gossip_messages(node: Arc>, writer: Sender) { let mut node = node.lock().await; - let writer = writer.lock().await; for n in node.storage.get_neighbours(&node.get_id()) { let messages = node.storage.get_messages_for_node(n.clone()); @@ -91,7 +87,7 @@ async fn gossip_messages(node: Arc>, writer: Arc, writer: Sender, ) { - while let Some(input) = input.recv().await { + while let Ok(input) = input.recv().await { match input.body { Body::Init { msg_id, .. } => { node.lock().await.init(input.clone()); @@ -115,7 +111,7 @@ async fn handle_messages( }, }; - let _ = writer.send(response).await; + writer.send(response).unwrap(); } Body::Broadcast { msg_id, message } => { let id = node.lock().await.get_id(); @@ -130,12 +126,12 @@ async fn handle_messages( }, }; - let _ = writer.send(response).await; + writer.send(response).unwrap(); } Body::Read { msg_id } => { let id = node.lock().await.get_id(); - let output = Message { + let response = Message { src: id, dest: input.src, body: Body::ReadOk { @@ -145,13 +141,13 @@ async fn handle_messages( }, }; - let _ = writer.send(output).await; + writer.send(response).unwrap(); } Body::Topology { msg_id, topology } => { let id = node.lock().await.get_id(); node.lock().await.storage.init_topology(topology); - let output = Message { + let response = Message { src: id, dest: input.src, body: Body::TopologyOk { @@ -160,7 +156,7 @@ async fn handle_messages( }, }; - let _ = writer.send(output).await; + writer.send(response).unwrap(); } Body::Error { in_reply_to, @@ -175,4 +171,5 @@ async fn handle_messages( _ => (), } } + println!("Error, nothing to read from receiver"); }