Back to simple TCP echo server impl and starting from first principles
This commit is contained in:
parent
7dcc1bc4d6
commit
9615a18a03
3 changed files with 87 additions and 128 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -8,3 +8,5 @@ Cargo.lock
|
|||
|
||||
# These are backup files generated by rustfmt
|
||||
**/*.rs.bk
|
||||
|
||||
.idea
|
||||
|
|
|
|||
|
|
@ -1,62 +1,56 @@
|
|||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::task;
|
||||
use tracing::{debug, info, error};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt::try_init().expect("Tracing was not setup");
|
||||
|
||||
let stream = TcpStream::connect("127.0.0.1:8080").await?;
|
||||
let stream = TcpStream::connect("0.0.0.0:1222").await?;
|
||||
|
||||
let (tx, mut rx) = channel::<String>(10);
|
||||
let (reader, writer) = tokio::io::split(stream);
|
||||
let mut buf_reader = BufReader::new(reader);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
|
||||
let (mut reader, mut writer) = tokio::io::split(stream);
|
||||
|
||||
let tx_clone = tx.clone();
|
||||
|
||||
task::spawn(async move {
|
||||
let mut reader = BufReader::new(&mut reader);
|
||||
let server_handle = task::spawn(async move {
|
||||
let mut buf = String::new();
|
||||
|
||||
loop {
|
||||
info!("Inside reading lines from server loop");
|
||||
let mut buf = String::new();
|
||||
if let Ok(n) = reader.read_line(&mut buf).await {
|
||||
if let Ok(n) = buf_reader.read_line(&mut buf).await {
|
||||
if n > 0 {
|
||||
println!("{}", buf.trim_end());
|
||||
info!("Receivng from server: {}", buf.trim_end());
|
||||
} else {
|
||||
break;
|
||||
info!("Server is finished sending, break");
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
error!("Cannot receive");
|
||||
return;
|
||||
}
|
||||
|
||||
buf.clear();
|
||||
}
|
||||
tx_clone.send("exit".to_string()).await.unwrap();
|
||||
});
|
||||
|
||||
loop {
|
||||
info!("Inside read from std::io loop");
|
||||
let mut buf = String::new();
|
||||
std::io::stdin().read_line(&mut buf)?;
|
||||
let std_handle = tokio::spawn(async move {
|
||||
let mut stdin_reader = BufReader::new(tokio::io::stdin()).lines();
|
||||
while let Ok(Some(line)) = stdin_reader.next_line().await {
|
||||
info!("Received line from stdin: {}", line);
|
||||
|
||||
let buf = buf.trim_end().to_string();
|
||||
info!("New line: {}", buf);
|
||||
if buf.to_lowercase() == "exit" {
|
||||
break;
|
||||
}
|
||||
debug!(?buf);
|
||||
if let Err(_) = writer.write_all(buf.as_bytes()).await {
|
||||
error!("Could not sent");
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(msg) = rx.recv().await {
|
||||
if msg.to_lowercase() == "exit" {
|
||||
if let Err(_) = writer.write_all(line.as_bytes()).await {
|
||||
error!("Error reading from std");
|
||||
break;
|
||||
}
|
||||
|
||||
let _ = writer.write_all(&[b'\n']).await;
|
||||
let _ = writer.flush().await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let _ = server_handle.await;
|
||||
let _ = std_handle.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,110 +1,73 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::broadcast::{self, Sender};
|
||||
use tracing::{debug, info, error};
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
|
||||
net::TcpListener,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
const IP: &str = "0.0.0.0";
|
||||
const PORT: u16 = 1222;
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt::try_init().expect("Tracing was not setup");
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:1222").await?;
|
||||
info!("Start listening on 0.0.0.0:1222");
|
||||
|
||||
let clients = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (tx, _) = broadcast::channel(10);
|
||||
|
||||
let listener = TcpListener::bind(format!("{IP}:{PORT}")).await?;
|
||||
info!("Listening on: {}", format!("{IP}:{PORT}"));
|
||||
// Infinite loop to always listen to new connections on this IP/PORT
|
||||
loop {
|
||||
let (mut socket, _addr) = listener.accept().await?;
|
||||
let clients = clients.clone();
|
||||
let tx = tx.clone();
|
||||
// let mut stream = stream?;
|
||||
|
||||
// Get the TCP stream out of the new connection, and the address from which
|
||||
// it is connected to
|
||||
let (mut stream, address) = listener.accept().await?;
|
||||
info!("New address connected: {}", address);
|
||||
// We spawn a new task, so every incoming connection can be put on a thread
|
||||
// and be worked on "in the background"
|
||||
// This allows us to handle multiple connections "at the same time"
|
||||
let _ = stream.write_all("You are connected!\n".as_bytes()).await;
|
||||
tokio::spawn(async move {
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
let mut buf = String::new();
|
||||
let mut reader = BufReader::new(&mut reader);
|
||||
// From the stream (TcpStream), we can extract the reading, and the writing part
|
||||
// So we can read and write to the connected client on this port
|
||||
let (reader, writer) = stream.split();
|
||||
|
||||
// Request the user's name
|
||||
if let Err(e) = writer
|
||||
.write_all(b"Welcome to budgetchat! What shall I call you?\n")
|
||||
.await
|
||||
{
|
||||
println!("Failed to send name request: {}", e);
|
||||
return;
|
||||
}
|
||||
// So we don't read "directly" on the reader. Therefore we use
|
||||
// BufReader, which performs large, infrequent reads on the underlying
|
||||
// AsyncRead instance (reader)
|
||||
let mut reader = BufReader::new(reader);
|
||||
|
||||
// Get the user's name
|
||||
match reader.read_line(&mut buf).await {
|
||||
Ok(_) => {
|
||||
let name = buf.trim().to_string();
|
||||
info!("Receiving name: {}", name);
|
||||
if !is_valid_name(&name) {
|
||||
if let Err(e) = writer
|
||||
.write_all(b"Invalid name. Connection closed.\n")
|
||||
.await
|
||||
{
|
||||
println!("Failed to send error message: {}", e);
|
||||
}
|
||||
// We do the same for the writing part to the stream
|
||||
// let mut writer = BufWriter::new(writer);
|
||||
let mut writer = BufWriter::new(writer);
|
||||
|
||||
// We need to store what we read from the stream in a local buffer/object
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
// We read exactly one line per loop. A line ends with \n.
|
||||
// So if the client doesn't frame their package with \n at the end,
|
||||
// we won't process until we find one.
|
||||
let _ = match reader.read_line(&mut line).await {
|
||||
Ok(n) if n == 0 => return,
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
error!("Error reading: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let (client_tx, _client_rx) = broadcast::channel(10);
|
||||
info!("New client message received: {}", line.trim_end());
|
||||
|
||||
{
|
||||
let mut clients = clients.lock().unwrap();
|
||||
announce_join(&name, &clients, &tx);
|
||||
clients.insert(name.clone(), client_tx);
|
||||
}
|
||||
|
||||
// Relay messages to other clients
|
||||
while let Ok(_) = reader.read_line(&mut buf).await {
|
||||
let message = buf.trim().to_string();
|
||||
|
||||
if message.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
relay_message(&name, &message, &tx);
|
||||
}
|
||||
|
||||
// Client disconnected, remove from clients and announce leave
|
||||
{
|
||||
let mut clients = clients.lock().unwrap();
|
||||
clients.remove(&name);
|
||||
announce_leave(&name, &clients, &tx);
|
||||
}
|
||||
if let Err(e) = writer.write_all(line.as_bytes()).await {
|
||||
error!("Error writing: {}", e);
|
||||
return;
|
||||
}
|
||||
Err(e) => println!("Failed to read name: {}", e),
|
||||
|
||||
let _ = writer.write_all(&[b'\n']).await;
|
||||
let _ = writer.flush().await;
|
||||
|
||||
line.clear();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn is_valid_name(name: &str) -> bool {
|
||||
!name.is_empty() && name.chars().all(|c| c.is_ascii_alphanumeric())
|
||||
}
|
||||
|
||||
fn announce_join(name: &str, clients: &HashMap<String, Sender<String>>, tx: &Sender<String>) {
|
||||
let message = format!("* {} has entered the room", name);
|
||||
for client_name in clients.keys() {
|
||||
if client_name != name {
|
||||
let _ = tx.send(message.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn announce_leave(name: &str, clients: &HashMap<String, Sender<String>>, tx: &Sender<String>) {
|
||||
let message = format!("* {} has left the room", name);
|
||||
for client_name in clients.keys() {
|
||||
if client_name != name {
|
||||
let _ = tx.send(message.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn relay_message(name: &str, message: &str, tx: &Sender<String>) {
|
||||
let message = format!("[{}] {}", name, message);
|
||||
let _ = tx.send(message);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue