Fix deadlock

This commit is contained in:
Bastian Gruber 2023-05-21 20:27:51 +02:00
parent 86cfe13bac
commit f7c2292833
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
6 changed files with 82 additions and 57 deletions

View file

@ -13,7 +13,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut read, mut write) = stream.split(); let (mut read, mut write) = stream.split();
// test_all_different_messages(&mut write).await?; // test_all_different_messages(&mut write).await?;
// test_camera_connection(&mut write).await?; // test_camera1_connection(&mut write).await?;
// test_camera2_connection(&mut write).await?;
test_dipatcher_connection(&mut write).await?; test_dipatcher_connection(&mut write).await?;
let mut buf: [u8; 4] = [0; 4]; let mut buf: [u8; 4] = [0; 4];
@ -77,23 +78,43 @@ async fn test_all_different_messages(
} }
#[allow(dead_code)] #[allow(dead_code)]
async fn test_camera_connection( async fn test_camera1_connection(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// 80 IAmCamera{ // 80 IAmCamera{
// 00 42 road: 66, // 00 7b road: 123,
// 00 64 mile: 100, // 00 08 mile: 8,
// 00 3c limit: 60, // 00 3c limit: 60,
// } // }
let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; let i_am_camera = [0x80, 0x00, 0x7b, 0x00, 0x08, 0x00, 0x3c];
// 20 Plate { // 20 Plate {
// 07 52 45 30 35 42 4b 47 plate: "RE05BKG", // 04 55 4e 31 58 plate: "UN1X",
// 00 01 e2 40 timestamp: 123456 // 00 00 00 00 timestamp: 0
// } // }
let plate = [ let plate = [0x20, 0x04, 0x55, 0x4e, 0x31, 0x58, 0x00, 0x00, 0x00, 0x00];
0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40,
]; write.write_all(&i_am_camera).await?;
write.write_all(&plate).await?;
Ok(())
}
#[allow(dead_code)]
async fn test_camera2_connection(
write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
// 80 IAmCamera{
// 00 7b road: 123,
// 00 09 mile: 8,
// 00 3c limit: 60,
// }
let i_am_camera = [0x80, 0x00, 0x7b, 0x00, 0x09, 0x00, 0x3c];
// 20 Plate {
// 04 55 4e 31 58 plate: "UN1X",
// 00 00 00 2d timestamp: 45
// }
let plate = [0x20, 0x04, 0x55, 0x4e, 0x31, 0x58, 0x00, 0x00, 0x00, 0x2d];
write.write_all(&i_am_camera).await?; write.write_all(&i_am_camera).await?;
write.write_all(&plate).await?; write.write_all(&plate).await?;
@ -106,14 +127,12 @@ async fn test_dipatcher_connection(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// 81 IAmDispatcher{ // 81 IAmDispatcher{
// 03 roads: [ // 01 roads: [
// 00 42 66, // 00 7b 123,
// 01 70 368,
// 13 88 5000
// ] // ]
// } // }
// let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; // let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88];
let i_am_dispatcher = [0x81, 0x02, 0x00, 0x7b, 0x00, 0x01]; let i_am_dispatcher = [0x81, 0x01, 0x00, 0x7b];
write.write_all(&i_am_dispatcher).await?; write.write_all(&i_am_dispatcher).await?;

View file

@ -5,7 +5,6 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter}, io::{AsyncReadExt, AsyncWriteExt, BufWriter},
net::TcpStream, net::TcpStream,
}; };
use tracing::info; use tracing::info;
use crate::frame::{self, ClientFrames, ServerFrames}; use crate::frame::{self, ClientFrames, ServerFrames};

View file

@ -5,7 +5,7 @@ use std::{
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::info; use tracing::{debug, info};
use crate::frame::ServerFrames; use crate::frame::ServerFrames;
@ -17,10 +17,13 @@ pub(crate) struct CameraId(pub(crate) SocketAddr);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Plate { pub(crate) struct Plate {
pub(crate) plate: String, pub(crate) plate: PlateName,
pub(crate) timestamp: Timestamp, pub(crate) timestamp: Timestamp,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct PlateName(pub(crate) String);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Camera { pub(crate) struct Camera {
pub(crate) road: Road, pub(crate) road: Road,
@ -78,7 +81,7 @@ pub(crate) struct Db {
struct State { struct State {
cameras: HashMap<CameraId, Camera>, cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>, dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
plates: HashMap<(Plate, Road), Vec<(Mile, Timestamp)>>, plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>,
ticketed_plates_by_day: HashSet<(Timestamp, String)>, ticketed_plates_by_day: HashSet<(Timestamp, String)>,
open_tickets: HashMap<Road, Vec<Ticket>>, open_tickets: HashMap<Road, Vec<Ticket>>,
} }
@ -166,42 +169,42 @@ impl Db {
road: Road, road: Road,
) -> Option<Vec<(Mile, Timestamp)>> { ) -> Option<Vec<(Mile, Timestamp)>> {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
state.plates.get(&(plate, road)).cloned() debug!(?state);
state.plates.get(&(plate.plate, road)).cloned()
} }
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) { pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
//TODO: Check if the same plate was already added for the road AND MILE
info!("Add car: {plate:?}"); info!("Add car: {plate:?}");
let camera = self.get_camera(camera_id).unwrap(); let camera = self.get_camera(camera_id).unwrap();
let mut state = self.state.lock().unwrap();
match self match state
.state
.lock()
.unwrap()
.plates .plates
.get_mut(&(plate.clone(), camera.road.clone())) .get_mut(&(plate.plate.clone(), camera.road.clone()))
{ {
Some(v) => v.push((camera.mile, plate.timestamp)), Some(v) => v.push((camera.mile, plate.timestamp)),
None => { None => {
self.state.lock().unwrap().plates.insert( state.plates.insert(
(plate.clone(), camera.road), (plate.clone().plate, camera.road),
vec![(camera.mile, plate.timestamp)], vec![(camera.mile, plate.timestamp)],
); );
} }
} }
} }
pub(crate) fn ticket_plate(&self, day: u32, plate_name: String) { pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
info!("Add ticket for day: {day}:{plate_name}"); info!("Add ticket for day: {day}:{}", plate_name.0);
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state state
.ticketed_plates_by_day .ticketed_plates_by_day
.insert((Timestamp(day), plate_name)); .insert((Timestamp(day), plate_name.0));
} }
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: String) -> bool { pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
state state
.ticketed_plates_by_day .ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name)) .contains(&(Timestamp(day), plate_name.0))
} }
} }

View file

@ -9,7 +9,10 @@ use tracing::{debug, error, info};
use crate::{ use crate::{
connection::ConnectionType, connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp}, db::{
Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road,
Timestamp,
},
frame::{ClientFrames, ServerFrames}, frame::{ClientFrames, ServerFrames},
heartbeat::Heartbeat, heartbeat::Heartbeat,
ticketing::{issue_possible_ticket, send_out_waiting_tickets}, ticketing::{issue_possible_ticket, send_out_waiting_tickets},
@ -42,7 +45,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
listener, listener,
db_holder: DbHolder::new(), db_holder: DbHolder::new(),
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown, notify_shutdown: notify_shutdown.clone(),
shutdown_complete_tx, shutdown_complete_tx,
}; };
@ -137,9 +140,11 @@ impl Handler {
res = self.connection.read_frame() => { res = self.connection.read_frame() => {
info!("Reading from frame: {res:?}"); info!("Reading from frame: {res:?}");
match res? { match res? {
Some(frame) => { Some(frame) => {
info!("Received frame"); info!("Received frame");
let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
error!("Error handling frame: {e:?}");
}
}, },
None => return Ok(()), None => return Ok(()),
} }
@ -184,18 +189,10 @@ impl Handler {
match frame { match frame {
ClientFrames::Plate { plate, timestamp } => { ClientFrames::Plate { plate, timestamp } => {
info!("Receive new plate {plate} {timestamp}"); info!("Receive new plate {plate} {timestamp}");
db.add_plate(
CameraId(self.connection.get_address()),
Plate {
plate: plate.clone(),
timestamp: Timestamp(timestamp),
},
);
issue_possible_ticket( issue_possible_ticket(
&mut db, &mut db,
Plate { Plate {
plate, plate: PlateName(plate.clone()),
timestamp: Timestamp(timestamp), timestamp: Timestamp(timestamp),
}, },
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),

View file

@ -1,4 +1,5 @@
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tracing::debug;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Shutdown { pub(crate) struct Shutdown {
@ -22,7 +23,7 @@ impl Shutdown {
if self.shutdown { if self.shutdown {
return; return;
} }
debug!("waiting for shutdown");
let _ = self.notify.recv().await; let _ = self.notify.recv().await;
self.shutdown = true; self.shutdown = true;

View file

@ -1,21 +1,25 @@
use crate::db::{CameraId, Db, Plate, Road, Ticket};
use tracing::debug; use tracing::debug;
use crate::db::{CameraId, Db, Plate, Road, Ticket};
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) { pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
debug!("Issue possible ticket"); debug!("Issue possible ticket");
let camera = db.get_camera(camera_id).unwrap(); let camera = db.get_camera(camera_id.clone()).unwrap();
let observed_plates = db let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
.get_plates_by_road(plate.clone(), camera.road.clone())
.unwrap(); if observed_plates.is_none() {
debug!("No observed plates");
db.add_plate(camera_id, plate);
return;
}
debug!(?observed_plates, "Observed plates"); debug!(?observed_plates, "Observed plates");
let mile = camera.mile; let mile = camera.mile;
let limit = camera.limit; let limit = camera.limit;
let road = camera.road; let road = camera.road;
let plate_name = plate.plate; let plate_name = plate.clone().plate;
let timestamp = plate.timestamp; let timestamp = plate.clone().timestamp;
debug!( debug!(
?plate_name, ?plate_name,
@ -25,7 +29,7 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
?road, ?road,
"Checking plate" "Checking plate"
); );
for (m, t) in observed_plates.iter() { for (m, t) in observed_plates.unwrap().iter() {
let distance = if mile > *m { let distance = if mile > *m {
mile.0 - m.0 mile.0 - m.0
} else { } else {
@ -38,12 +42,12 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
(t.0 - timestamp.0, mile.0, timestamp.0, m.0, t.0) (t.0 - timestamp.0, mile.0, timestamp.0, m.0, t.0)
}; };
let speed = distance * 3600 * 100 / time as u16; let speed = (distance as u64 * 3600 * 100 / time as u64) as u16;
debug!(?distance, ?time, ?speed, "Checking speed"); debug!(?distance, ?time, ?speed, "Checking speed");
if speed > limit.0 * 100 { if speed > limit.0 * 100 {
let ticket = Ticket { let ticket = Ticket {
plate: plate_name.clone(), plate: plate_name.clone().0,
road: road.0, road: road.0,
mile1, mile1,
timestamp1, timestamp1,
@ -75,6 +79,8 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
} }
} }
} }
db.add_plate(camera_id, plate);
} }
pub(crate) async fn send_out_waiting_tickets(db: Db) { pub(crate) async fn send_out_waiting_tickets(db: Db) {