Extend the basic server, debug with broadcast

This commit is contained in:
Bastian Gruber 2023-04-29 09:30:10 +02:00
parent f530a8aa32
commit 7e90fae353
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
7 changed files with 186 additions and 23 deletions

View file

@ -9,9 +9,10 @@ path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"
path = "bin/client.rs"
[dependencies]
anyhow = "1.0.70"
futures = "0.3.28"
tokio = { version = "1.14.0", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["codec"] }

View file

@ -22,7 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if n > 0 {
info!("Receivng from server: {}", buf.trim_end());
} else {
info!("Server is finished sending, break");
info!("Server is finished sending: {}", n);
return;
}
} else {

15
problem_03/bin/server.rs Normal file
View file

@ -0,0 +1,15 @@
// use problem_03::{server, DEFAULT_PORT};
// use tokio::net::TcpListener;
// use tokio::signal;
// #[tokio::main]
// pub async fn main() -> problem_03::Result<()> {
// tracing_subscriber::fmt::try_init()?;
// let listener = TcpListener::bind(&format!("0.0.0.0:{}", DEFAULT_PORT)).await?;
// server::run(listener, signal::ctrl_c()).await?;
// Ok(())
// }

View file

@ -0,0 +1,39 @@
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{debug, error, info};
#[derive(Debug)]
pub struct Connection {
stream: Framed<TcpStream, LinesCodec>,
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: Framed::new(socket, LinesCodec::new()),
}
}
pub async fn read_frame(&mut self) -> crate::Result<Option<String>> {
loop {
info!("Read next frame");
if let Some(Ok(frame)) = self.stream.next().await {
info!("Frame parsed");
return Ok(Some(frame));
} else {
return Err("connection reset by peer".into());
}
}
}
pub async fn write_frame(&mut self, response: String) -> crate::Result<()> {
debug!(?response);
if let Err(e) = self.stream.send(response.clone()).await {
error!("Could not write frame to stream");
return Err(e.to_string().into());
}
info!("Wrote to frame: {}", response);
Ok(())
}
}

12
problem_03/src/lib.rs Normal file
View file

@ -0,0 +1,12 @@
mod connection;
pub use connection::Connection;
pub mod server;
mod shutdown;
use shutdown::Shutdown;
pub const DEFAULT_PORT: u16 = 1222;
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,5 +1,12 @@
use futures::{SinkExt, StreamExt};
use futures::{stream::StreamExt, SinkExt};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use tokio::sync::{
broadcast,
broadcast::{Receiver, Sender},
};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{error, info};
@ -9,41 +16,100 @@ const PORT: u16 = 1222;
type Error = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, Error>;
type Username = String;
type Address = SocketAddr;
#[derive(Clone, Debug, Default)]
struct Users(Arc<Mutex<HashMap<Username, Address>>>);
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::try_init().expect("Tracing was not setup");
let listener = TcpListener::bind(format!("{IP}:{PORT}")).await?;
info!("Listening on: {}", format!("{IP}:{PORT}"));
let (tx, _) = broadcast::channel(256);
let db = Users::default();
// Infinite loop to always listen to new connections on this IP/PORT
loop {
// Get the TCP stream out of the new connection, and the address from which
// it is connected to
let (stream, address) = listener.accept().await?;
let mut framed = Framed::new(stream, LinesCodec::new());
info!("New address connected: {}", address);
let _ = framed.send("You are connected!".to_string()).await;
let (tx, mut rx) = (tx.clone(), tx.subscribe());
let db = db.clone();
// We spawn a new task, so every incoming connection can be put on a thread
// and be worked on "in the background"
// This allows us to handle multiple connections "at the same time"
tokio::spawn(async move {
let mut framed = Framed::new(stream, LinesCodec::new());
info!("New address connected: {address}");
let _ = framed
.send("Welcome to budgetchat! What shall I call you?".to_string())
.await;
let mut name = String::default();
// We read exactly one line per loop. A line ends with \n.
// So if the client doesn't frame their package with \n at the end,
// we won't process until we find one.
match framed.next().await {
Some(Ok(username)) => {
name = username.clone();
db.0.lock().unwrap().insert(username.clone(), address);
let message = compose_message(username.clone(), db.clone());
info!("Adding username: {username} to db");
let _ = framed.send(message).await;
info!("Send message to client");
let _ = tx.send(format!("* {} has entered the room", username));
}
Some(Err(e)) => {
error!("Error parsing message: {e}");
}
None => {
info!("No frame");
}
}
loop {
// We read exactly one line per loop. A line ends with \n.
// So if the client doesn't frame their package with \n at the end,
// we won't process until we find one.
match framed.next().await {
Some(n) => {
if let Err(e) = n {
error!("Error parsing message: {}", e);
} else {
let _ = framed.send(n.unwrap()).await;
tokio::select! {
n = framed.next() => {
match n {
Some(Ok(n)) => {
// broadcast message to all clients except the one who sent it
info!("Receiving new chat message: {n}");
let _ = tx.send(format!("[{}]: {}", name, n));
}
Some(Err(e)) => {
error!("Error receiving chat message: {e}");
}
None => {
// Connection dropped
// remove client from db etc.
// send leave message
info!("No next frame");
let _ = tx.send(format!("* {} has left the room", name));
break;
}
}
}
None => return,
};
message = rx.recv() => {
info!("Broadcast received: {:?}", message.clone().unwrap());
let _ = framed.send(message.unwrap()).await;
}
}
}
});
}
}
fn compose_message(name: String, db: Users) -> String {
format!(
"* The room contains: {}",
db.0.lock()
.unwrap()
.keys()
.filter(|n| n.as_str() != name)
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(", ")
)
}

View file

@ -0,0 +1,30 @@
use tokio::sync::broadcast;
#[derive(Debug)]
pub(crate) struct Shutdown {
shutdown: bool,
notify: broadcast::Receiver<()>,
}
impl Shutdown {
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
pub(crate) async fn recv(&mut self) {
if self.shutdown {
return;
}
let _ = self.notify.recv().await;
self.shutdown = true;
}
}