From 52f92969b559e3082b5c3531f875f4fb7f6b3ac3 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Mon, 22 May 2023 14:05:42 +0200 Subject: [PATCH] Ditch pure async --- problem_06/src/db.rs | 84 ++++++++++--------------------------- problem_06/src/server.rs | 23 +++++----- problem_06/src/ticketing.rs | 9 +++- 3 files changed, 40 insertions(+), 76 deletions(-) diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs index 358f6dd..f943892 100644 --- a/problem_06/src/db.rs +++ b/problem_06/src/db.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, net::SocketAddr, - sync::{Arc, Mutex}, }; use tokio::sync::mpsc; @@ -68,17 +67,8 @@ pub(crate) struct Timestamp(pub(crate) u32); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub(crate) struct Mile(pub(crate) u16); -pub(crate) struct DbHolder { - db: Db, -} - -#[derive(Clone)] -pub(crate) struct Db { - state: Arc>, -} - #[derive(Debug)] -struct State { +pub(crate) struct Db { cameras: HashMap, dispatchers: HashMap)>>, plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>, @@ -86,51 +76,34 @@ struct State { open_tickets: HashMap>, } -impl DbHolder { - pub(crate) fn new() -> DbHolder { - DbHolder { db: Db::new() } - } - - pub(crate) fn db(&self) -> Db { - self.db.clone() - } -} - impl Db { pub(crate) fn new() -> Db { - let state = Arc::new(Mutex::new(State { + Db { cameras: HashMap::new(), dispatchers: HashMap::new(), plates: HashMap::new(), ticketed_plates_by_day: HashSet::new(), open_tickets: HashMap::new(), - })); - - Db { state } + } } pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option { - let state = self.state.lock().unwrap(); - state.cameras.get(&camera_id).cloned() + self.cameras.get(&camera_id).cloned() } - pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { - let mut state = self.state.lock().unwrap(); - state.cameras.insert(camera_id, camera); + pub(crate) fn add_camera(&mut self, camera_id: CameraId, camera: Camera) { + self.cameras.insert(camera_id, camera); } pub(crate) fn add_dispatcher( - &self, + &mut self, dispatcher_id: DispatcherId, roads: Vec, writer_stream: mpsc::Sender, ) { info!("Adding new dispatcher for roads: {roads:?}"); - let mut state = self.state.lock().unwrap(); - for r in roads.iter() { - state - .dispatchers + self.dispatchers .entry(Road(*r)) .or_insert(Vec::new()) .push((dispatcher_id.clone(), writer_stream.clone())); @@ -138,8 +111,7 @@ impl Db { } pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option> { - let state = self.state.lock().unwrap(); - let senders = state.dispatchers.get(&road); + let senders = self.dispatchers.get(&road); if senders.is_none() { return None; } @@ -147,28 +119,24 @@ impl Db { senders.unwrap().first().map(|(_, s)| s.clone()) } - pub(crate) fn add_open_ticket(&self, ticket: Ticket) { - let mut state = self.state.lock().unwrap(); + pub(crate) fn add_open_ticket(&mut self, ticket: Ticket) { info!("Adding open ticket: {ticket:?}"); - state - .open_tickets + self.open_tickets .entry(Road(ticket.road)) .or_insert(Vec::new()) .push(ticket); } pub(crate) fn get_open_tickets(&self) -> Vec { - let state = self.state.lock().unwrap(); - state.open_tickets.values().flatten().cloned().collect() + self.open_tickets.values().flatten().cloned().collect() } - pub(crate) fn remove_open_ticket(&self, road: Road, ticket: Ticket) -> bool { + pub(crate) fn remove_open_ticket(&mut self, road: Road, ticket: Ticket) -> bool { info!("Removing open ticket: {ticket:?}"); - let mut state = self.state.lock().unwrap(); - if let Some(tickets) = state.open_tickets.get_mut(&road) { + if let Some(tickets) = self.open_tickets.get_mut(&road) { tickets.retain(|t| t.plate != ticket.plate); if tickets.is_empty() { - state.open_tickets.remove(&road); + self.open_tickets.remove(&road); } return true; } @@ -180,22 +148,20 @@ impl Db { plate: Plate, road: Road, ) -> Option> { - let state = self.state.lock().unwrap(); - state.plates.get(&(plate.plate, road)).cloned() + self.plates.get(&(plate.plate, road)).cloned() } - pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) { + pub(crate) fn add_plate(&mut self, camera_id: CameraId, plate: Plate) { //TODO: Check if the same plate was already added for the road AND MILE let camera = self.get_camera(camera_id).unwrap(); - let mut state = self.state.lock().unwrap(); - match state + match self .plates .get_mut(&(plate.plate.clone(), camera.road.clone())) { Some(v) => v.push((camera.mile, plate.timestamp)), None => { - state.plates.insert( + self.plates.insert( (plate.clone().plate, camera.road), vec![(camera.mile, plate.timestamp)], ); @@ -203,22 +169,18 @@ impl Db { } } - pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) { + pub(crate) fn ticket_plate(&mut self, day: u32, plate_name: PlateName) { info!("Add {plate_name:?} for day:{day} "); - let mut state = self.state.lock().unwrap(); - state - .ticketed_plates_by_day + self.ticketed_plates_by_day .insert((Timestamp(day), plate_name.0)); } pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool { - let state = self.state.lock().unwrap(); info!( "Current ticketed plates, by day: {:?}", - state.ticketed_plates_by_day + self.ticketed_plates_by_day ); - state - .ticketed_plates_by_day + self.ticketed_plates_by_day .contains(&(Timestamp(day), plate_name.0)) } } diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 7dbcd60..c086557 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -2,17 +2,14 @@ use std::{future::Future, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, - sync::{broadcast, mpsc, Semaphore}, + sync::{broadcast, mpsc, Mutex, Semaphore}, time::{self, Duration}, }; use tracing::{error, info}; use crate::{ connection::ConnectionType, - db::{ - Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road, - Timestamp, - }, + db::{Camera, CameraId, Db, DispatcherId, Limit, Mile, Plate, PlateName, Road, Timestamp}, frame::{ClientFrames, ServerFrames}, heartbeat::Heartbeat, ticketing::{issue_possible_ticket, send_out_waiting_tickets}, @@ -21,7 +18,7 @@ use crate::{ struct Listener { listener: TcpListener, - db_holder: DbHolder, + db: Arc>, limit_connections: Arc, notify_shutdown: broadcast::Sender<()>, shutdown_complete_tx: mpsc::Sender<()>, @@ -30,7 +27,7 @@ struct Listener { struct Handler { connection: Connection, connection_type: Option, - db: Db, + db: Arc>, shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, } @@ -43,7 +40,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< let mut server = Listener { listener, - db_holder: DbHolder::new(), + db: Arc::new(Mutex::new(Db::new())), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), notify_shutdown: notify_shutdown.clone(), shutdown_complete_tx, @@ -90,7 +87,7 @@ impl Listener { let mut handler = Handler { connection: Connection::new(address, socket), connection_type: None, - db: self.db_holder.db(), + db: self.db.clone(), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), _shutdown_complete: self.shutdown_complete_tx.clone(), }; @@ -173,7 +170,7 @@ impl Handler { async fn handle_client_frame( &mut self, - mut db: Db, + db: Arc>, frame: ClientFrames, send_message: mpsc::Sender, ) -> crate::Result<()> { @@ -181,7 +178,7 @@ impl Handler { ClientFrames::Plate { plate, timestamp } => { info!("Receive new plate: {plate} at {timestamp}"); issue_possible_ticket( - &mut db, + db, Plate { plate: PlateName(plate.clone()), timestamp: Timestamp(timestamp), @@ -205,7 +202,7 @@ impl Handler { } self.set_connection_type(ConnectionType::Camera); - db.add_camera( + db.lock().await.add_camera( CameraId(self.connection.get_address()), Camera { road: Road(road), @@ -220,7 +217,7 @@ impl Handler { } self.set_connection_type(ConnectionType::Dispatcher); - db.add_dispatcher( + db.lock().await.add_dispatcher( DispatcherId(self.connection.get_address()), roads.to_vec(), send_message.clone(), diff --git a/problem_06/src/ticketing.rs b/problem_06/src/ticketing.rs index 9abafe3..ab6ac65 100644 --- a/problem_06/src/ticketing.rs +++ b/problem_06/src/ticketing.rs @@ -1,8 +1,12 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; use tracing::info; use crate::db::{CameraId, Db, Plate, Road, Ticket}; -pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) { +pub(crate) async fn issue_possible_ticket(db: Arc>, plate: Plate, camera_id: CameraId) { + let mut db = db.lock().await; let camera = db.get_camera(camera_id.clone()).unwrap(); let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone()); @@ -72,7 +76,8 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: db.add_plate(camera_id, plate); } -pub(crate) async fn send_out_waiting_tickets(db: Db) { +pub(crate) async fn send_out_waiting_tickets(db: Arc>) { + let mut db = db.lock().await; let tickets = db.get_open_tickets(); info!("Sending out waiting tickets: {tickets:?}"); for ticket in tickets {