protohackers/problem_06/src/server.rs

153 lines
4.1 KiB
Rust
Raw Normal View History

2023-05-06 05:41:24 +00:00
use crate::{frame::Frame, Connection, Shutdown};
use std::future::Future;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, Semaphore};
use tokio::time::{self, Duration};
use tracing::{debug, error, info};
struct Listener {
listener: TcpListener,
limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
}
struct Handler {
connection: Connection,
shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>,
}
2023-05-07 06:32:07 +00:00
const MAX_CONNECTIONS: usize = 1500;
2023-05-06 05:41:24 +00:00
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
let mut server = Listener {
listener,
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown,
shutdown_complete_tx,
shutdown_complete_rx,
};
tokio::select! {
res = server.run() => {
if let Err(err) = res {
error!(cause = %err, "failed to accept");
}
}
_ = shutdown => {
info!("shutting down");
}
}
let Listener {
mut shutdown_complete_rx,
shutdown_complete_tx,
notify_shutdown,
..
} = server;
drop(notify_shutdown);
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
Ok(())
}
impl Listener {
async fn run(&mut self) -> crate::Result<()> {
info!("accepting inbound connections");
loop {
let permit = self
.limit_connections
.clone()
.acquire_owned()
.await
.unwrap();
let socket = self.accept().await?;
let mut handler = Handler {
connection: Connection::new(socket),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
info!("Created new handler");
tokio::spawn(async move {
if let Err(err) = handler.run().await {
error!(cause = ?err, "connection error");
}
drop(permit);
});
}
}
async fn accept(&mut self) -> crate::Result<TcpStream> {
let mut backoff = 1;
loop {
match self.listener.accept().await {
Ok((socket, _)) => return Ok(socket),
Err(err) => {
if backoff > 64 {
return Err(err.into());
}
}
}
time::sleep(Duration::from_secs(backoff)).await;
backoff *= 2;
}
}
}
impl Handler {
async fn run(&mut self) -> crate::Result<()> {
while !self.shutdown.is_shutdown() {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
_ = self.shutdown.recv() => {
debug!("Shutdown");
return Ok(());
}
};
let frame = match maybe_frame {
Some(frame) => frame,
None => return Ok(()),
};
2023-05-07 20:41:32 +00:00
match frame {
Frame::Error { msg } => {
info!("Error message: {msg}")
}
Frame::Plate { plate, timestamp } => {
info!("Plate: {plate}, timestamp: {timestamp}");
}
Frame::WantHeartbeat { interval } => {
info!("Want heartbeat: {interval}");
}
Frame::IAmCamera { road, mile, limit } => {
info!("Road: {road}, mile: {mile}, limit: {limit}");
}
Frame::IAmDispatcher { roads } => {
info!("roads: {roads:?}");
}
}
2023-05-06 05:41:24 +00:00
}
Ok(())
}
}