From f530a8aa32361a5c9a98ef1bee923dd398a3a8a4 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Fri, 28 Apr 2023 16:44:07 +0200 Subject: [PATCH] Use frames instead of manually framing the messages --- problem_03/Cargo.toml | 2 ++ problem_03/src/server.rs | 56 ++++++++++++---------------------------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/problem_03/Cargo.toml b/problem_03/Cargo.toml index 6a31808..3654eae 100644 --- a/problem_03/Cargo.toml +++ b/problem_03/Cargo.toml @@ -12,6 +12,8 @@ name = "client" path = "src/client.rs" [dependencies] +futures = "0.3.28" tokio = { version = "1.14.0", features = ["full"] } +tokio-util = { version = "0.7.4", features = ["codec"] } tracing = "0.1.38" tracing-subscriber = "0.3.17" diff --git a/problem_03/src/server.rs b/problem_03/src/server.rs index 5ea0b28..16d5aa6 100644 --- a/problem_03/src/server.rs +++ b/problem_03/src/server.rs @@ -1,7 +1,6 @@ -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::TcpListener, -}; +use futures::{SinkExt, StreamExt}; +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LinesCodec}; use tracing::{error, info}; const IP: &str = "0.0.0.0"; @@ -20,53 +19,30 @@ async fn main() -> Result<()> { loop { // Get the TCP stream out of the new connection, and the address from which // it is connected to - let (mut stream, address) = listener.accept().await?; + + let (stream, address) = listener.accept().await?; + let mut framed = Framed::new(stream, LinesCodec::new()); info!("New address connected: {}", address); + let _ = framed.send("You are connected!".to_string()).await; + // We spawn a new task, so every incoming connection can be put on a thread // and be worked on "in the background" // This allows us to handle multiple connections "at the same time" - let _ = stream.write_all("You are connected!\n".as_bytes()).await; tokio::spawn(async move { - // From the stream (TcpStream), we can extract the reading, and the writing part - // So we can read and write to the connected client on this port - let (reader, writer) = stream.split(); - - // So we don't read "directly" on the reader. Therefore we use - // BufReader, which performs large, infrequent reads on the underlying - // AsyncRead instance (reader) - let mut reader = BufReader::new(reader); - - // We do the same for the writing part to the stream - // let mut writer = BufWriter::new(writer); - let mut writer = BufWriter::new(writer); - - // We need to store what we read from the stream in a local buffer/object - let mut line = String::new(); - loop { // We read exactly one line per loop. A line ends with \n. // So if the client doesn't frame their package with \n at the end, // we won't process until we find one. - let _ = match reader.read_line(&mut line).await { - Ok(n) if n == 0 => return, - Ok(n) => n, - Err(e) => { - error!("Error reading: {}", e); - return; + match framed.next().await { + Some(n) => { + if let Err(e) = n { + error!("Error parsing message: {}", e); + } else { + let _ = framed.send(n.unwrap()).await; + } } + None => return, }; - - info!("New client message received: {}", line.trim_end()); - - if let Err(e) = writer.write_all(line.as_bytes()).await { - error!("Error writing: {}", e); - return; - } - - let _ = writer.write_all(&[b'\n']).await; - let _ = writer.flush().await; - - line.clear(); } }); }