From f7c2292833b9469760413181aab4a7a306a3f54f Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Sun, 21 May 2023 20:27:51 +0200 Subject: [PATCH] Fix deadlock --- problem_06/bin/client.rs | 49 +++++++++++++++++++++++++----------- problem_06/src/connection.rs | 1 - problem_06/src/db.rs | 35 ++++++++++++++------------ problem_06/src/server.rs | 23 ++++++++--------- problem_06/src/shutdown.rs | 3 ++- problem_06/src/ticketing.rs | 28 +++++++++++++-------- 6 files changed, 82 insertions(+), 57 deletions(-) diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs index 0a5d972..d2ceb5f 100644 --- a/problem_06/bin/client.rs +++ b/problem_06/bin/client.rs @@ -13,7 +13,8 @@ async fn main() -> Result<(), Box> { let (mut read, mut write) = stream.split(); // test_all_different_messages(&mut write).await?; - // test_camera_connection(&mut write).await?; + // test_camera1_connection(&mut write).await?; + // test_camera2_connection(&mut write).await?; test_dipatcher_connection(&mut write).await?; let mut buf: [u8; 4] = [0; 4]; @@ -77,23 +78,43 @@ async fn test_all_different_messages( } #[allow(dead_code)] -async fn test_camera_connection( +async fn test_camera1_connection( write: &mut WriteHalf<'_>, ) -> Result<(), Box> { // 80 IAmCamera{ - // 00 42 road: 66, - // 00 64 mile: 100, + // 00 7b road: 123, + // 00 08 mile: 8, // 00 3c limit: 60, // } - let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; + let i_am_camera = [0x80, 0x00, 0x7b, 0x00, 0x08, 0x00, 0x3c]; // 20 Plate { - // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", - // 00 01 e2 40 timestamp: 123456 + // 04 55 4e 31 58 plate: "UN1X", + // 00 00 00 00 timestamp: 0 // } - let plate = [ - 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, - ]; + let plate = [0x20, 0x04, 0x55, 0x4e, 0x31, 0x58, 0x00, 0x00, 0x00, 0x00]; + + write.write_all(&i_am_camera).await?; + write.write_all(&plate).await?; + + Ok(()) +} +#[allow(dead_code)] +async fn test_camera2_connection( + write: &mut WriteHalf<'_>, +) -> Result<(), Box> { + // 80 IAmCamera{ + // 00 7b road: 123, + // 00 09 mile: 8, + // 00 3c limit: 60, + // } + let i_am_camera = [0x80, 0x00, 0x7b, 0x00, 0x09, 0x00, 0x3c]; + + // 20 Plate { + // 04 55 4e 31 58 plate: "UN1X", + // 00 00 00 2d timestamp: 45 + // } + let plate = [0x20, 0x04, 0x55, 0x4e, 0x31, 0x58, 0x00, 0x00, 0x00, 0x2d]; write.write_all(&i_am_camera).await?; write.write_all(&plate).await?; @@ -106,14 +127,12 @@ async fn test_dipatcher_connection( write: &mut WriteHalf<'_>, ) -> Result<(), Box> { // 81 IAmDispatcher{ - // 03 roads: [ - // 00 42 66, - // 01 70 368, - // 13 88 5000 + // 01 roads: [ + // 00 7b 123, // ] // } // let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; - let i_am_dispatcher = [0x81, 0x02, 0x00, 0x7b, 0x00, 0x01]; + let i_am_dispatcher = [0x81, 0x01, 0x00, 0x7b]; write.write_all(&i_am_dispatcher).await?; diff --git a/problem_06/src/connection.rs b/problem_06/src/connection.rs index 19750c8..750e12a 100644 --- a/problem_06/src/connection.rs +++ b/problem_06/src/connection.rs @@ -5,7 +5,6 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufWriter}, net::TcpStream, }; - use tracing::info; use crate::frame::{self, ClientFrames, ServerFrames}; diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs index fabdf2c..bac630d 100644 --- a/problem_06/src/db.rs +++ b/problem_06/src/db.rs @@ -5,7 +5,7 @@ use std::{ }; use tokio::sync::mpsc; -use tracing::info; +use tracing::{debug, info}; use crate::frame::ServerFrames; @@ -17,10 +17,13 @@ pub(crate) struct CameraId(pub(crate) SocketAddr); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub(crate) struct Plate { - pub(crate) plate: String, + pub(crate) plate: PlateName, pub(crate) timestamp: Timestamp, } +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct PlateName(pub(crate) String); + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub(crate) struct Camera { pub(crate) road: Road, @@ -78,7 +81,7 @@ pub(crate) struct Db { struct State { cameras: HashMap, dispatchers: HashMap)>>, - plates: HashMap<(Plate, Road), Vec<(Mile, Timestamp)>>, + plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>, ticketed_plates_by_day: HashSet<(Timestamp, String)>, open_tickets: HashMap>, } @@ -166,42 +169,42 @@ impl Db { road: Road, ) -> Option> { let state = self.state.lock().unwrap(); - state.plates.get(&(plate, road)).cloned() + debug!(?state); + state.plates.get(&(plate.plate, road)).cloned() } pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) { + //TODO: Check if the same plate was already added for the road AND MILE info!("Add car: {plate:?}"); let camera = self.get_camera(camera_id).unwrap(); + let mut state = self.state.lock().unwrap(); - match self - .state - .lock() - .unwrap() + match state .plates - .get_mut(&(plate.clone(), camera.road.clone())) + .get_mut(&(plate.plate.clone(), camera.road.clone())) { Some(v) => v.push((camera.mile, plate.timestamp)), None => { - self.state.lock().unwrap().plates.insert( - (plate.clone(), camera.road), + state.plates.insert( + (plate.clone().plate, camera.road), vec![(camera.mile, plate.timestamp)], ); } } } - pub(crate) fn ticket_plate(&self, day: u32, plate_name: String) { - info!("Add ticket for day: {day}:{plate_name}"); + pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) { + info!("Add ticket for day: {day}:{}", plate_name.0); let mut state = self.state.lock().unwrap(); state .ticketed_plates_by_day - .insert((Timestamp(day), plate_name)); + .insert((Timestamp(day), plate_name.0)); } - pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: String) -> bool { + pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool { let state = self.state.lock().unwrap(); state .ticketed_plates_by_day - .contains(&(Timestamp(day), plate_name)) + .contains(&(Timestamp(day), plate_name.0)) } } diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 4a9d2eb..6e81c56 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -9,7 +9,10 @@ use tracing::{debug, error, info}; use crate::{ connection::ConnectionType, - db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp}, + db::{ + Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road, + Timestamp, + }, frame::{ClientFrames, ServerFrames}, heartbeat::Heartbeat, ticketing::{issue_possible_ticket, send_out_waiting_tickets}, @@ -42,7 +45,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< listener, db_holder: DbHolder::new(), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), - notify_shutdown, + notify_shutdown: notify_shutdown.clone(), shutdown_complete_tx, }; @@ -137,9 +140,11 @@ impl Handler { res = self.connection.read_frame() => { info!("Reading from frame: {res:?}"); match res? { - Some(frame) => { + Some(frame) => { info!("Received frame"); - let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; + if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await { + error!("Error handling frame: {e:?}"); + } }, None => return Ok(()), } @@ -184,18 +189,10 @@ impl Handler { match frame { ClientFrames::Plate { plate, timestamp } => { info!("Receive new plate {plate} {timestamp}"); - db.add_plate( - CameraId(self.connection.get_address()), - Plate { - plate: plate.clone(), - timestamp: Timestamp(timestamp), - }, - ); - issue_possible_ticket( &mut db, Plate { - plate, + plate: PlateName(plate.clone()), timestamp: Timestamp(timestamp), }, CameraId(self.connection.get_address()), diff --git a/problem_06/src/shutdown.rs b/problem_06/src/shutdown.rs index 3c199b1..3f74413 100644 --- a/problem_06/src/shutdown.rs +++ b/problem_06/src/shutdown.rs @@ -1,4 +1,5 @@ use tokio::sync::broadcast; +use tracing::debug; #[derive(Debug)] pub(crate) struct Shutdown { @@ -22,7 +23,7 @@ impl Shutdown { if self.shutdown { return; } - + debug!("waiting for shutdown"); let _ = self.notify.recv().await; self.shutdown = true; diff --git a/problem_06/src/ticketing.rs b/problem_06/src/ticketing.rs index e75b3d1..a05b80b 100644 --- a/problem_06/src/ticketing.rs +++ b/problem_06/src/ticketing.rs @@ -1,21 +1,25 @@ -use crate::db::{CameraId, Db, Plate, Road, Ticket}; - use tracing::debug; +use crate::db::{CameraId, Db, Plate, Road, Ticket}; + pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) { debug!("Issue possible ticket"); - let camera = db.get_camera(camera_id).unwrap(); - let observed_plates = db - .get_plates_by_road(plate.clone(), camera.road.clone()) - .unwrap(); + let camera = db.get_camera(camera_id.clone()).unwrap(); + let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone()); + + if observed_plates.is_none() { + debug!("No observed plates"); + db.add_plate(camera_id, plate); + return; + } debug!(?observed_plates, "Observed plates"); let mile = camera.mile; let limit = camera.limit; let road = camera.road; - let plate_name = plate.plate; - let timestamp = plate.timestamp; + let plate_name = plate.clone().plate; + let timestamp = plate.clone().timestamp; debug!( ?plate_name, @@ -25,7 +29,7 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: ?road, "Checking plate" ); - for (m, t) in observed_plates.iter() { + for (m, t) in observed_plates.unwrap().iter() { let distance = if mile > *m { mile.0 - m.0 } else { @@ -38,12 +42,12 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: (t.0 - timestamp.0, mile.0, timestamp.0, m.0, t.0) }; - let speed = distance * 3600 * 100 / time as u16; + let speed = (distance as u64 * 3600 * 100 / time as u64) as u16; debug!(?distance, ?time, ?speed, "Checking speed"); if speed > limit.0 * 100 { let ticket = Ticket { - plate: plate_name.clone(), + plate: plate_name.clone().0, road: road.0, mile1, timestamp1, @@ -75,6 +79,8 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: } } } + + db.add_plate(camera_id, plate); } pub(crate) async fn send_out_waiting_tickets(db: Db) {