From 9615a18a03ed676656eac0718f0db12c41160068 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Fri, 28 Apr 2023 10:13:32 +0200 Subject: [PATCH] Back to simple TCP echo server impl and starting from first principles --- .gitignore | 2 + problem_03/src/client.rs | 64 ++++++++--------- problem_03/src/server.rs | 149 +++++++++++++++------------------------ 3 files changed, 87 insertions(+), 128 deletions(-) diff --git a/.gitignore b/.gitignore index 088ba6b..52e4d31 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk + +.idea diff --git a/problem_03/src/client.rs b/problem_03/src/client.rs index d12edbd..c684eac 100644 --- a/problem_03/src/client.rs +++ b/problem_03/src/client.rs @@ -1,62 +1,56 @@ -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::TcpStream; -use tokio::sync::mpsc::channel; use tokio::task; -use tracing::{debug, info, error}; +use tracing::{error, info}; #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::try_init().expect("Tracing was not setup"); - let stream = TcpStream::connect("127.0.0.1:8080").await?; + let stream = TcpStream::connect("0.0.0.0:1222").await?; - let (tx, mut rx) = channel::(10); + let (reader, writer) = tokio::io::split(stream); + let mut buf_reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); - let (mut reader, mut writer) = tokio::io::split(stream); - - let tx_clone = tx.clone(); - - task::spawn(async move { - let mut reader = BufReader::new(&mut reader); + let server_handle = task::spawn(async move { + let mut buf = String::new(); loop { info!("Inside reading lines from server loop"); - let mut buf = String::new(); - if let Ok(n) = reader.read_line(&mut buf).await { + if let Ok(n) = buf_reader.read_line(&mut buf).await { if n > 0 { - println!("{}", buf.trim_end()); + info!("Receivng from server: {}", buf.trim_end()); } else { - break; + info!("Server is finished sending, break"); + return; } } else { - break; + error!("Cannot receive"); + return; } + + buf.clear(); } - tx_clone.send("exit".to_string()).await.unwrap(); }); - loop { - info!("Inside read from std::io loop"); - let mut buf = String::new(); - std::io::stdin().read_line(&mut buf)?; + let std_handle = tokio::spawn(async move { + let mut stdin_reader = BufReader::new(tokio::io::stdin()).lines(); + while let Ok(Some(line)) = stdin_reader.next_line().await { + info!("Received line from stdin: {}", line); - let buf = buf.trim_end().to_string(); - info!("New line: {}", buf); - if buf.to_lowercase() == "exit" { - break; - } - debug!(?buf); - if let Err(_) = writer.write_all(buf.as_bytes()).await { - error!("Could not sent"); - break; - } - - if let Some(msg) = rx.recv().await { - if msg.to_lowercase() == "exit" { + if let Err(_) = writer.write_all(line.as_bytes()).await { + error!("Error reading from std"); break; } + + let _ = writer.write_all(&[b'\n']).await; + let _ = writer.flush().await; } - } + }); + + let _ = server_handle.await; + let _ = std_handle.await; Ok(()) } diff --git a/problem_03/src/server.rs b/problem_03/src/server.rs index 4794008..5ea0b28 100644 --- a/problem_03/src/server.rs +++ b/problem_03/src/server.rs @@ -1,110 +1,73 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::TcpListener; -use tokio::sync::broadcast::{self, Sender}; -use tracing::{debug, info, error}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::TcpListener, +}; +use tracing::{error, info}; + +const IP: &str = "0.0.0.0"; +const PORT: u16 = 1222; + +type Error = Box; +type Result = std::result::Result; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<()> { tracing_subscriber::fmt::try_init().expect("Tracing was not setup"); - let listener = TcpListener::bind("0.0.0.0:1222").await?; - info!("Start listening on 0.0.0.0:1222"); - - let clients = Arc::new(Mutex::new(HashMap::new())); - let (tx, _) = broadcast::channel(10); - + let listener = TcpListener::bind(format!("{IP}:{PORT}")).await?; + info!("Listening on: {}", format!("{IP}:{PORT}")); + // Infinite loop to always listen to new connections on this IP/PORT loop { - let (mut socket, _addr) = listener.accept().await?; - let clients = clients.clone(); - let tx = tx.clone(); - // let mut stream = stream?; - + // Get the TCP stream out of the new connection, and the address from which + // it is connected to + let (mut stream, address) = listener.accept().await?; + info!("New address connected: {}", address); + // We spawn a new task, so every incoming connection can be put on a thread + // and be worked on "in the background" + // This allows us to handle multiple connections "at the same time" + let _ = stream.write_all("You are connected!\n".as_bytes()).await; tokio::spawn(async move { - let (mut reader, mut writer) = socket.split(); - let mut buf = String::new(); - let mut reader = BufReader::new(&mut reader); + // From the stream (TcpStream), we can extract the reading, and the writing part + // So we can read and write to the connected client on this port + let (reader, writer) = stream.split(); - // Request the user's name - if let Err(e) = writer - .write_all(b"Welcome to budgetchat! What shall I call you?\n") - .await - { - println!("Failed to send name request: {}", e); - return; - } + // So we don't read "directly" on the reader. Therefore we use + // BufReader, which performs large, infrequent reads on the underlying + // AsyncRead instance (reader) + let mut reader = BufReader::new(reader); - // Get the user's name - match reader.read_line(&mut buf).await { - Ok(_) => { - let name = buf.trim().to_string(); - info!("Receiving name: {}", name); - if !is_valid_name(&name) { - if let Err(e) = writer - .write_all(b"Invalid name. Connection closed.\n") - .await - { - println!("Failed to send error message: {}", e); - } + // We do the same for the writing part to the stream + // let mut writer = BufWriter::new(writer); + let mut writer = BufWriter::new(writer); + + // We need to store what we read from the stream in a local buffer/object + let mut line = String::new(); + + loop { + // We read exactly one line per loop. A line ends with \n. + // So if the client doesn't frame their package with \n at the end, + // we won't process until we find one. + let _ = match reader.read_line(&mut line).await { + Ok(n) if n == 0 => return, + Ok(n) => n, + Err(e) => { + error!("Error reading: {}", e); return; } + }; - let (client_tx, _client_rx) = broadcast::channel(10); + info!("New client message received: {}", line.trim_end()); - { - let mut clients = clients.lock().unwrap(); - announce_join(&name, &clients, &tx); - clients.insert(name.clone(), client_tx); - } - - // Relay messages to other clients - while let Ok(_) = reader.read_line(&mut buf).await { - let message = buf.trim().to_string(); - - if message.is_empty() { - break; - } - - relay_message(&name, &message, &tx); - } - - // Client disconnected, remove from clients and announce leave - { - let mut clients = clients.lock().unwrap(); - clients.remove(&name); - announce_leave(&name, &clients, &tx); - } + if let Err(e) = writer.write_all(line.as_bytes()).await { + error!("Error writing: {}", e); + return; } - Err(e) => println!("Failed to read name: {}", e), + + let _ = writer.write_all(&[b'\n']).await; + let _ = writer.flush().await; + + line.clear(); } }); } } - -fn is_valid_name(name: &str) -> bool { - !name.is_empty() && name.chars().all(|c| c.is_ascii_alphanumeric()) -} - -fn announce_join(name: &str, clients: &HashMap>, tx: &Sender) { - let message = format!("* {} has entered the room", name); - for client_name in clients.keys() { - if client_name != name { - let _ = tx.send(message.clone()); - } - } -} - -fn announce_leave(name: &str, clients: &HashMap>, tx: &Sender) { - let message = format!("* {} has left the room", name); - for client_name in clients.keys() { - if client_name != name { - let _ = tx.send(message.clone()); - } - } -} - -fn relay_message(name: &str, message: &str, tx: &Sender) { - let message = format!("[{}] {}", name, message); - let _ = tx.send(message); -}