use crate::{ db::{Db, DbHolder}, frame::{ClientFrames, ServerFrames}, ticketing, Connection, Shutdown, }; use std::future::Future; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Semaphore}; use tokio::time::{self, Duration}; use tracing::{debug, error, info}; struct Listener { listener: TcpListener, db_holder: DbHolder, limit_connections: Arc, notify_shutdown: broadcast::Sender<()>, shutdown_complete_tx: mpsc::Sender<()>, } struct Handler { connection: Connection, receive_ticket: broadcast::Receiver, send_ticket: broadcast::Sender, connection_type: Option, db: Db, shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, } const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); let mut server = Listener { listener, db_holder: DbHolder::new(), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), notify_shutdown, shutdown_complete_tx, }; tokio::select! { res = server.run() => { if let Err(err) = res { error!(cause = %err, "failed to accept"); } } _ = shutdown => { info!("shutting down"); } } let Listener { shutdown_complete_tx, notify_shutdown, .. } = server; drop(notify_shutdown); drop(shutdown_complete_tx); let _ = shutdown_complete_rx.recv().await; Ok(()) } impl Listener { async fn run(&mut self) -> crate::Result<()> { info!("accepting inbound connections"); let (send_ticket, _): ( broadcast::Sender, broadcast::Receiver, ) = broadcast::channel(4096); loop { let permit = self .limit_connections .clone() .acquire_owned() .await .unwrap(); let socket = self.accept().await?; let mut handler = Handler { connection: Connection::new(socket), send_ticket: send_ticket.clone(), receive_ticket: send_ticket.subscribe(), connection_type: None, db: self.db_holder.db(), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), _shutdown_complete: self.shutdown_complete_tx.clone(), }; info!("Created new handler"); tokio::spawn(async move { if let Err(err) = handler.run().await { error!(cause = ?err, "connection error"); } drop(permit); }); } } async fn accept(&mut self) -> crate::Result { let mut backoff = 1; loop { match self.listener.accept().await { Ok((socket, _)) => return Ok(socket), Err(err) => { if backoff > 64 { return Err(err.into()); } } } time::sleep(Duration::from_secs(backoff)).await; backoff *= 2; } } } impl Handler { async fn run(&mut self) -> crate::Result<()> { while !self.shutdown.is_shutdown() { let maybe_frame = tokio::select! { res = self.connection.read_frame() => res?, _ = self.shutdown.recv() => { debug!("Shutdown"); return Ok(()); } message = self.receive_ticket.recv() => { match message { Ok(message) => { match message { ServerFrames::Ticket { .. } => { let _ = self.connection.write_frame(message).await; } _ => () } }, Err(_) => return Ok(()), } None } }; let frame = match maybe_frame { Some(frame) => frame, None => return Ok(()), }; match frame { ClientFrames::Plate { plate, timestamp } => { info!("Insert {plate} and {timestamp}"); self.db.insert_plate(plate, timestamp); } ClientFrames::WantHeartbeat { interval } => { info!("Want heartbeat: {interval}"); } ClientFrames::IAmCamera { road, mile, limit } => { if self.connection_type.is_some() { return Err("Already connected".into()); } self.set_connection_type(ticketing::ClientType::Camera(road, mile)); info!("Road: {road}, mile: {mile}, limit: {limit}"); } ClientFrames::IAmDispatcher { roads } => { if self.connection_type.is_some() { return Err("Already connected".into()); } self.set_connection_type(ticketing::ClientType::Dispatcher); self.db .add_dispatcher(roads.to_vec(), self.send_ticket.clone()); } } } Ok(()) } fn set_connection_type(&mut self, connection_type: ticketing::ClientType) { match connection_type { ticketing::ClientType::Camera(_, _) => { self.connection_type = Some(connection_type); } ticketing::ClientType::Dispatcher => { self.connection_type = Some(connection_type); } } } }