Use frames instead of manually framing the messages
This commit is contained in:
parent
9615a18a03
commit
f530a8aa32
2 changed files with 18 additions and 40 deletions
|
|
@ -12,6 +12,8 @@ name = "client"
|
|||
path = "src/client.rs"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.28"
|
||||
tokio = { version = "1.14.0", features = ["full"] }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
tracing = "0.1.38"
|
||||
tracing-subscriber = "0.3.17"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
use tokio::{
|
||||
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
|
||||
net::TcpListener,
|
||||
};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::codec::{Framed, LinesCodec};
|
||||
use tracing::{error, info};
|
||||
|
||||
const IP: &str = "0.0.0.0";
|
||||
|
|
@ -20,53 +19,30 @@ async fn main() -> Result<()> {
|
|||
loop {
|
||||
// Get the TCP stream out of the new connection, and the address from which
|
||||
// it is connected to
|
||||
let (mut stream, address) = listener.accept().await?;
|
||||
|
||||
let (stream, address) = listener.accept().await?;
|
||||
let mut framed = Framed::new(stream, LinesCodec::new());
|
||||
info!("New address connected: {}", address);
|
||||
let _ = framed.send("You are connected!".to_string()).await;
|
||||
|
||||
// We spawn a new task, so every incoming connection can be put on a thread
|
||||
// and be worked on "in the background"
|
||||
// This allows us to handle multiple connections "at the same time"
|
||||
let _ = stream.write_all("You are connected!\n".as_bytes()).await;
|
||||
tokio::spawn(async move {
|
||||
// From the stream (TcpStream), we can extract the reading, and the writing part
|
||||
// So we can read and write to the connected client on this port
|
||||
let (reader, writer) = stream.split();
|
||||
|
||||
// So we don't read "directly" on the reader. Therefore we use
|
||||
// BufReader, which performs large, infrequent reads on the underlying
|
||||
// AsyncRead instance (reader)
|
||||
let mut reader = BufReader::new(reader);
|
||||
|
||||
// We do the same for the writing part to the stream
|
||||
// let mut writer = BufWriter::new(writer);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
|
||||
// We need to store what we read from the stream in a local buffer/object
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
// We read exactly one line per loop. A line ends with \n.
|
||||
// So if the client doesn't frame their package with \n at the end,
|
||||
// we won't process until we find one.
|
||||
let _ = match reader.read_line(&mut line).await {
|
||||
Ok(n) if n == 0 => return,
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
error!("Error reading: {}", e);
|
||||
return;
|
||||
match framed.next().await {
|
||||
Some(n) => {
|
||||
if let Err(e) = n {
|
||||
error!("Error parsing message: {}", e);
|
||||
} else {
|
||||
let _ = framed.send(n.unwrap()).await;
|
||||
}
|
||||
}
|
||||
None => return,
|
||||
};
|
||||
|
||||
info!("New client message received: {}", line.trim_end());
|
||||
|
||||
if let Err(e) = writer.write_all(line.as_bytes()).await {
|
||||
error!("Error writing: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = writer.write_all(&[b'\n']).await;
|
||||
let _ = writer.flush().await;
|
||||
|
||||
line.clear();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue