use crate::{frame::ClientFrames, Connection, Shutdown}; use std::future::Future; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Semaphore}; use tokio::time::{self, Duration}; use tracing::{debug, error, info}; struct Listener { db_holder: DbDropGuard, listener: TcpListener, limit_connections: Arc, notify_shutdown: broadcast::Sender<()>, shutdown_complete_rx: mpsc::Receiver<()>, shutdown_complete_tx: mpsc::Sender<()>, } struct Handler { connection: Connection, shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, } const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1); let mut server = Listener { listener, limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), notify_shutdown, shutdown_complete_tx, shutdown_complete_rx, }; tokio::select! { res = server.run() => { if let Err(err) = res { error!(cause = %err, "failed to accept"); } } _ = shutdown => { info!("shutting down"); } } let Listener { mut shutdown_complete_rx, shutdown_complete_tx, notify_shutdown, .. } = server; drop(notify_shutdown); drop(shutdown_complete_tx); let _ = shutdown_complete_rx.recv().await; Ok(()) } impl Listener { async fn run(&mut self) -> crate::Result<()> { info!("accepting inbound connections"); loop { let permit = self .limit_connections .clone() .acquire_owned() .await .unwrap(); let socket = self.accept().await?; let mut handler = Handler { connection: Connection::new(socket), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), _shutdown_complete: self.shutdown_complete_tx.clone(), }; info!("Created new handler"); tokio::spawn(async move { if let Err(err) = handler.run().await { error!(cause = ?err, "connection error"); } drop(permit); }); } } async fn accept(&mut self) -> crate::Result { let mut backoff = 1; loop { match self.listener.accept().await { Ok((socket, _)) => return Ok(socket), Err(err) => { if backoff > 64 { return Err(err.into()); } } } time::sleep(Duration::from_secs(backoff)).await; backoff *= 2; } } } impl Handler { async fn run(&mut self) -> crate::Result<()> { while !self.shutdown.is_shutdown() { let maybe_frame = tokio::select! { res = self.connection.read_frame() => res?, _ = self.shutdown.recv() => { debug!("Shutdown"); return Ok(()); } }; let frame = match maybe_frame { Some(frame) => frame, None => return Ok(()), }; match frame { ClientFrames::Plate { plate, timestamp } => { info!("Plate: {plate}, timestamp: {timestamp}"); } ClientFrames::WantHeartbeat { interval } => { info!("Want heartbeat: {interval}"); } ClientFrames::IAmCamera { road, mile, limit } => { info!("Road: {road}, mile: {mile}, limit: {limit}"); } ClientFrames::IAmDispatcher { roads } => { info!("roads: {roads:?}"); } } } Ok(()) } }