Start ticketing function

This commit is contained in:
Bastian Gruber 2023-05-20 20:16:09 +02:00
parent 0fdc85a630
commit cfdc4e0e53
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
4 changed files with 189 additions and 18 deletions

View file

@ -1,5 +1,5 @@
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
net::SocketAddr, net::SocketAddr,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
@ -7,8 +7,6 @@ use std::{
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::debug; use tracing::debug;
use crate::frame::ServerFrames;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct DispatcherId(pub(crate) SocketAddr); pub(crate) struct DispatcherId(pub(crate) SocketAddr);
@ -18,18 +16,38 @@ pub(crate) struct CameraId(pub(crate) SocketAddr);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Plate { pub(crate) struct Plate {
pub(crate) plate: String, pub(crate) plate: String,
pub(crate) timestamp: u32, pub(crate) timestamp: Timestamp,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Camera { pub(crate) struct Camera {
pub(crate) road: u16, pub(crate) road: Road,
pub(crate) mile: u16, pub(crate) mile: Mile,
pub(crate) limit: u16, pub(crate) limit: Limit,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Road(u16); pub(crate) struct Ticket {
pub(crate) plate: String,
pub(crate) road: u16,
pub(crate) mile1: u16,
pub(crate) timestamp1: u32,
pub(crate) mile2: u16,
pub(crate) timestamp2: u32,
pub(crate) speed: u16,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Road(pub(crate) u16);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Limit(pub(crate) u16);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Timestamp(pub(crate) u32);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Mile(pub(crate) u16);
pub(crate) struct DbHolder { pub(crate) struct DbHolder {
db: Db, db: Db,
@ -43,8 +61,10 @@ pub(crate) struct Db {
#[derive(Debug)] #[derive(Debug)]
struct State { struct State {
cameras: HashMap<CameraId, Camera>, cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<Road, (DispatcherId, mpsc::Sender<ServerFrames>)>, dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<Ticket>)>>,
plates: HashMap<CameraId, Plate>, plates: HashMap<(Plate, Road), Vec<(Mile, Timestamp)>>,
ticketed_plates_by_day: HashSet<(Timestamp, String)>,
open_tickets: HashMap<Road, Vec<Ticket>>,
} }
impl DbHolder { impl DbHolder {
@ -63,11 +83,18 @@ impl Db {
cameras: HashMap::new(), cameras: HashMap::new(),
dispatchers: HashMap::new(), dispatchers: HashMap::new(),
plates: HashMap::new(), plates: HashMap::new(),
ticketed_plates_by_day: HashSet::new(),
open_tickets: HashMap::new(),
})); }));
Db { state } Db { state }
} }
pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> {
let state = self.state.lock().unwrap();
state.cameras.get(&camera_id).cloned()
}
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.cameras.insert(camera_id, camera); state.cameras.insert(camera_id, camera);
@ -78,22 +105,85 @@ impl Db {
&self, &self,
dispatcher_id: DispatcherId, dispatcher_id: DispatcherId,
roads: Vec<u16>, roads: Vec<u16>,
writer_stream: mpsc::Sender<ServerFrames>, writer_stream: mpsc::Sender<Ticket>,
) { ) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
for r in roads.iter() { for r in roads.iter() {
state state
.dispatchers .dispatchers
.insert(Road(*r), (dispatcher_id.clone(), writer_stream.clone())); .entry(Road(*r))
.or_insert(Vec::new())
.push((dispatcher_id.clone(), writer_stream.clone()));
} }
debug!(?state); debug!(?state);
} }
pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) { pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<Ticket>> {
let state = self.state.lock().unwrap();
let senders = state.dispatchers.get(&road);
if senders.is_none() {
return None;
}
senders.unwrap().first().map(|(_, s)| s.clone())
}
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.plates.insert(camera_id, plate); state
.open_tickets
.entry(Road(ticket.road))
.or_insert(Vec::new())
.push(ticket);
}
pub(crate) fn get_plates_by_road(
&self,
plate: Plate,
road: Road,
) -> Option<Vec<(Mile, Timestamp)>> {
let state = self.state.lock().unwrap();
state.plates.get(&(plate, road)).cloned()
}
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
let state = self.state.lock().unwrap();
let camera = self.get_camera(camera_id).unwrap();
match self
.state
.lock()
.unwrap()
.plates
.get_mut(&(plate.clone(), camera.road.clone()))
{
Some(v) => v.push((camera.mile, plate.timestamp)),
None => {
self.state.lock().unwrap().plates.insert(
(plate.clone(), camera.road),
vec![(camera.mile, plate.timestamp)],
);
}
}
debug!(?state); debug!(?state);
} }
pub(crate) fn ticket_plate(&self, day: u32, plate_name: String) {
let mut state = self.state.lock().unwrap();
state
.ticketed_plates_by_day
.insert((Timestamp(day), plate_name));
debug!(?state);
}
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: String) -> bool {
let state = self.state.lock().unwrap();
state
.ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name))
// debug!(?state);
}
} }

View file

@ -8,6 +8,8 @@ pub mod db;
pub mod server; pub mod server;
pub mod ticketing;
mod shutdown; mod shutdown;
use shutdown::Shutdown; use shutdown::Shutdown;

View file

@ -9,8 +9,9 @@ use tracing::{debug, error, info};
use crate::{ use crate::{
connection::ConnectionType, connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate}, db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp},
frame::{ClientFrames, ServerFrames}, frame::{ClientFrames, ServerFrames},
ticketing::issue_possible_ticket,
Connection, Shutdown, Connection, Shutdown,
}; };
@ -173,7 +174,7 @@ impl Handler {
async fn handle_client_frame( async fn handle_client_frame(
&mut self, &mut self,
db: Db, mut db: Db,
frame: ClientFrames, frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>, send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> { ) -> crate::Result<()> {
@ -182,7 +183,19 @@ impl Handler {
info!("Receive new plate {plate} {timestamp}"); info!("Receive new plate {plate} {timestamp}");
db.insert_plate( db.insert_plate(
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),
Plate { plate, timestamp }, Plate {
plate: plate.clone(),
timestamp: Timestamp(timestamp),
},
);
issue_possible_ticket(
&mut db,
Plate {
plate,
timestamp: Timestamp(timestamp),
},
CameraId(self.connection.get_address()),
); );
} }
ClientFrames::WantHeartbeat { interval } => { ClientFrames::WantHeartbeat { interval } => {
@ -197,7 +210,11 @@ impl Handler {
db.add_camera( db.add_camera(
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),
Camera { road, mile, limit }, Camera {
road: Road(road),
mile: Mile(mile),
limit: Limit(limit),
},
); );
} }
ClientFrames::IAmDispatcher { roads } => { ClientFrames::IAmDispatcher { roads } => {

View file

@ -0,0 +1,62 @@
use crate::db::{CameraId, Db, Plate, Ticket};
pub(crate) fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
let camera = db.get_camera(camera_id).unwrap();
let observed_plates = db
.get_plates_by_road(plate.clone(), camera.road.clone())
.unwrap();
let mile = camera.mile;
let limit = camera.limit;
let road = camera.road;
let plate_name = plate.plate;
let timestamp = plate.timestamp;
for (m, t) in observed_plates.iter() {
let distance = if mile > *m {
mile.0 - m.0
} else {
m.0 - mile.0
};
let (time, mile1, timestamp1, mile2, timestamp2) = if timestamp > *t {
(timestamp.0 - t.0, m.0, t.0, mile.0, timestamp.0)
} else {
(t.0 - timestamp.0, mile.0, timestamp.0, m.0, t.0)
};
let speed = distance * 3600 * 100 / time as u16;
if speed > limit.0 * 100 {
let ticket = Ticket {
plate: plate_name.clone(),
road: road.0,
mile1,
timestamp1,
mile2,
timestamp2,
speed,
};
let day_start = timestamp1 / 86400;
let day_end = timestamp2 / 86400;
for day in day_start..=day_end {
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
continue;
}
let dispatcher = db.get_dispatcher_for_road(road.clone());
if dispatcher.is_none() {
db.add_open_ticket(ticket);
continue;
}
dispatcher.unwrap().send(ticket).await;
db.ticket_plate(day, plate_name.clone());
}
}
}
}