diff --git a/problem_03/.gitignore b/problem_03/.gitignore new file mode 100644 index 0000000..bef0727 --- /dev/null +++ b/problem_03/.gitignore @@ -0,0 +1,2 @@ +/target/ +.idea diff --git a/problem_03/Cargo.toml b/problem_03/Cargo.toml new file mode 100644 index 0000000..6a31808 --- /dev/null +++ b/problem_03/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "problem_03" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "server" +path = "src/server.rs" + +[[bin]] +name = "client" +path = "src/client.rs" + +[dependencies] +tokio = { version = "1.14.0", features = ["full"] } +tracing = "0.1.38" +tracing-subscriber = "0.3.17" diff --git a/problem_03/src/client.rs b/problem_03/src/client.rs new file mode 100644 index 0000000..d12edbd --- /dev/null +++ b/problem_03/src/client.rs @@ -0,0 +1,62 @@ +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::mpsc::channel; +use tokio::task; +use tracing::{debug, info, error}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::try_init().expect("Tracing was not setup"); + + let stream = TcpStream::connect("127.0.0.1:8080").await?; + + let (tx, mut rx) = channel::(10); + + let (mut reader, mut writer) = tokio::io::split(stream); + + let tx_clone = tx.clone(); + + task::spawn(async move { + let mut reader = BufReader::new(&mut reader); + + loop { + info!("Inside reading lines from server loop"); + let mut buf = String::new(); + if let Ok(n) = reader.read_line(&mut buf).await { + if n > 0 { + println!("{}", buf.trim_end()); + } else { + break; + } + } else { + break; + } + } + tx_clone.send("exit".to_string()).await.unwrap(); + }); + + loop { + info!("Inside read from std::io loop"); + let mut buf = String::new(); + std::io::stdin().read_line(&mut buf)?; + + let buf = buf.trim_end().to_string(); + info!("New line: {}", buf); + if buf.to_lowercase() == "exit" { + break; + } + debug!(?buf); + if let Err(_) = writer.write_all(buf.as_bytes()).await { + error!("Could not sent"); + break; + } + + if let Some(msg) = rx.recv().await { + if msg.to_lowercase() == "exit" { + break; + } + } + } + + Ok(()) +} diff --git a/problem_03/src/server.rs b/problem_03/src/server.rs new file mode 100644 index 0000000..4794008 --- /dev/null +++ b/problem_03/src/server.rs @@ -0,0 +1,110 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpListener; +use tokio::sync::broadcast::{self, Sender}; +use tracing::{debug, info, error}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::try_init().expect("Tracing was not setup"); + + let listener = TcpListener::bind("0.0.0.0:1222").await?; + info!("Start listening on 0.0.0.0:1222"); + + let clients = Arc::new(Mutex::new(HashMap::new())); + let (tx, _) = broadcast::channel(10); + + loop { + let (mut socket, _addr) = listener.accept().await?; + let clients = clients.clone(); + let tx = tx.clone(); + // let mut stream = stream?; + + tokio::spawn(async move { + let (mut reader, mut writer) = socket.split(); + let mut buf = String::new(); + let mut reader = BufReader::new(&mut reader); + + // Request the user's name + if let Err(e) = writer + .write_all(b"Welcome to budgetchat! What shall I call you?\n") + .await + { + println!("Failed to send name request: {}", e); + return; + } + + // Get the user's name + match reader.read_line(&mut buf).await { + Ok(_) => { + let name = buf.trim().to_string(); + info!("Receiving name: {}", name); + if !is_valid_name(&name) { + if let Err(e) = writer + .write_all(b"Invalid name. Connection closed.\n") + .await + { + println!("Failed to send error message: {}", e); + } + return; + } + + let (client_tx, _client_rx) = broadcast::channel(10); + + { + let mut clients = clients.lock().unwrap(); + announce_join(&name, &clients, &tx); + clients.insert(name.clone(), client_tx); + } + + // Relay messages to other clients + while let Ok(_) = reader.read_line(&mut buf).await { + let message = buf.trim().to_string(); + + if message.is_empty() { + break; + } + + relay_message(&name, &message, &tx); + } + + // Client disconnected, remove from clients and announce leave + { + let mut clients = clients.lock().unwrap(); + clients.remove(&name); + announce_leave(&name, &clients, &tx); + } + } + Err(e) => println!("Failed to read name: {}", e), + } + }); + } +} + +fn is_valid_name(name: &str) -> bool { + !name.is_empty() && name.chars().all(|c| c.is_ascii_alphanumeric()) +} + +fn announce_join(name: &str, clients: &HashMap>, tx: &Sender) { + let message = format!("* {} has entered the room", name); + for client_name in clients.keys() { + if client_name != name { + let _ = tx.send(message.clone()); + } + } +} + +fn announce_leave(name: &str, clients: &HashMap>, tx: &Sender) { + let message = format!("* {} has left the room", name); + for client_name in clients.keys() { + if client_name != name { + let _ = tx.send(message.clone()); + } + } +} + +fn relay_message(name: &str, message: &str, tx: &Sender) { + let message = format!("[{}] {}", name, message); + let _ = tx.send(message); +}