Compare commits

..

10 commits

Author SHA1 Message Date
2734c62139
PASS 2023-05-22 15:02:37 +02:00
641b48b367
Send error 2023-05-22 15:00:59 +02:00
c57c0de2db
Send error on not supported frame 2023-05-22 14:56:07 +02:00
3178c07f84 Solved: Big world test 2023-05-22 12:50:04 +00:00
52f92969b5
Ditch pure async 2023-05-22 14:05:42 +02:00
e7c9ad95e7
Debug 2023-05-22 10:53:24 +02:00
9e7b2af480
Move logic around to find bug 2023-05-22 10:40:33 +02:00
b70dbbb24b
debug 2023-05-22 10:22:24 +02:00
ef131fbf59
Debugging 2023-05-22 10:18:16 +02:00
f62a6e804a
Logging 2023-05-22 10:13:04 +02:00
4 changed files with 112 additions and 104 deletions

View file

@ -8,6 +8,7 @@ use tokio::{
use crate::frame::{self, ClientFrames, ServerFrames}; use crate::frame::{self, ClientFrames, ServerFrames};
#[derive(PartialEq)]
pub(crate) enum ConnectionType { pub(crate) enum ConnectionType {
Camera, Camera,
Dispatcher, Dispatcher,

View file

@ -1,7 +1,6 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
net::SocketAddr, net::SocketAddr,
sync::{Arc, Mutex},
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -68,17 +67,8 @@ pub(crate) struct Timestamp(pub(crate) u32);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Mile(pub(crate) u16); pub(crate) struct Mile(pub(crate) u16);
pub(crate) struct DbHolder {
db: Db,
}
#[derive(Clone)]
pub(crate) struct Db {
state: Arc<Mutex<State>>,
}
#[derive(Debug)] #[derive(Debug)]
struct State { pub(crate) struct Db {
cameras: HashMap<CameraId, Camera>, cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>, dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>, plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>,
@ -86,51 +76,34 @@ struct State {
open_tickets: HashMap<Road, Vec<Ticket>>, open_tickets: HashMap<Road, Vec<Ticket>>,
} }
impl DbHolder {
pub(crate) fn new() -> DbHolder {
DbHolder { db: Db::new() }
}
pub(crate) fn db(&self) -> Db {
self.db.clone()
}
}
impl Db { impl Db {
pub(crate) fn new() -> Db { pub(crate) fn new() -> Db {
let state = Arc::new(Mutex::new(State { Db {
cameras: HashMap::new(), cameras: HashMap::new(),
dispatchers: HashMap::new(), dispatchers: HashMap::new(),
plates: HashMap::new(), plates: HashMap::new(),
ticketed_plates_by_day: HashSet::new(), ticketed_plates_by_day: HashSet::new(),
open_tickets: HashMap::new(), open_tickets: HashMap::new(),
})); }
Db { state }
} }
pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> { pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> {
let state = self.state.lock().unwrap(); self.cameras.get(&camera_id).cloned()
state.cameras.get(&camera_id).cloned()
} }
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { pub(crate) fn add_camera(&mut self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap(); self.cameras.insert(camera_id, camera);
state.cameras.insert(camera_id, camera);
} }
pub(crate) fn add_dispatcher( pub(crate) fn add_dispatcher(
&self, &mut self,
dispatcher_id: DispatcherId, dispatcher_id: DispatcherId,
roads: Vec<u16>, roads: Vec<u16>,
writer_stream: mpsc::Sender<ServerFrames>, writer_stream: mpsc::Sender<ServerFrames>,
) { ) {
info!("Adding new dispatcher for raods: {roads:?}"); info!("Adding new dispatcher for roads: {roads:?}");
let mut state = self.state.lock().unwrap();
for r in roads.iter() { for r in roads.iter() {
state self.dispatchers
.dispatchers
.entry(Road(*r)) .entry(Road(*r))
.or_insert(Vec::new()) .or_insert(Vec::new())
.push((dispatcher_id.clone(), writer_stream.clone())); .push((dispatcher_id.clone(), writer_stream.clone()));
@ -138,8 +111,7 @@ impl Db {
} }
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> { pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> {
let state = self.state.lock().unwrap(); let senders = self.dispatchers.get(&road);
let senders = state.dispatchers.get(&road);
if senders.is_none() { if senders.is_none() {
return None; return None;
} }
@ -147,26 +119,24 @@ impl Db {
senders.unwrap().first().map(|(_, s)| s.clone()) senders.unwrap().first().map(|(_, s)| s.clone())
} }
pub(crate) fn add_open_ticket(&self, ticket: Ticket) { pub(crate) fn add_open_ticket(&mut self, ticket: Ticket) {
let mut state = self.state.lock().unwrap(); info!("Adding open ticket: {ticket:?}");
state self.open_tickets
.open_tickets
.entry(Road(ticket.road)) .entry(Road(ticket.road))
.or_insert(Vec::new()) .or_insert(Vec::new())
.push(ticket); .push(ticket);
} }
pub(crate) fn get_open_tickets(&self) -> Vec<Ticket> { pub(crate) fn get_open_tickets(&self) -> Vec<Ticket> {
let state = self.state.lock().unwrap(); self.open_tickets.values().flatten().cloned().collect()
state.open_tickets.values().flatten().cloned().collect()
} }
pub(crate) fn remove_open_ticket(&self, road: Road, ticket: Ticket) -> bool { pub(crate) fn remove_open_ticket(&mut self, road: Road, ticket: Ticket) -> bool {
let mut state = self.state.lock().unwrap(); info!("Removing open ticket: {ticket:?}");
if let Some(tickets) = state.open_tickets.get_mut(&road) { if let Some(tickets) = self.open_tickets.get_mut(&road) {
tickets.retain(|t| t.plate != ticket.plate); tickets.retain(|t| t.plate != ticket.plate);
if tickets.is_empty() { if tickets.is_empty() {
state.open_tickets.remove(&road); self.open_tickets.remove(&road);
} }
return true; return true;
} }
@ -178,22 +148,20 @@ impl Db {
plate: Plate, plate: Plate,
road: Road, road: Road,
) -> Option<Vec<(Mile, Timestamp)>> { ) -> Option<Vec<(Mile, Timestamp)>> {
let state = self.state.lock().unwrap(); self.plates.get(&(plate.plate, road)).cloned()
state.plates.get(&(plate.plate, road)).cloned()
} }
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) { pub(crate) fn add_plate(&mut self, camera_id: CameraId, plate: Plate) {
//TODO: Check if the same plate was already added for the road AND MILE //TODO: Check if the same plate was already added for the road AND MILE
let camera = self.get_camera(camera_id).unwrap(); let camera = self.get_camera(camera_id).unwrap();
let mut state = self.state.lock().unwrap();
match state match self
.plates .plates
.get_mut(&(plate.plate.clone(), camera.road.clone())) .get_mut(&(plate.plate.clone(), camera.road.clone()))
{ {
Some(v) => v.push((camera.mile, plate.timestamp)), Some(v) => v.push((camera.mile, plate.timestamp)),
None => { None => {
state.plates.insert( self.plates.insert(
(plate.clone().plate, camera.road), (plate.clone().plate, camera.road),
vec![(camera.mile, plate.timestamp)], vec![(camera.mile, plate.timestamp)],
); );
@ -201,17 +169,18 @@ impl Db {
} }
} }
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) { pub(crate) fn ticket_plate(&mut self, day: u32, plate_name: PlateName) {
let mut state = self.state.lock().unwrap(); info!("Add {plate_name:?} for day:{day} ");
state self.ticketed_plates_by_day
.ticketed_plates_by_day
.insert((Timestamp(day), plate_name.0)); .insert((Timestamp(day), plate_name.0));
} }
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool { pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool {
let state = self.state.lock().unwrap(); info!(
state "Current ticketed plates, by day: {:?}",
.ticketed_plates_by_day self.ticketed_plates_by_day
);
self.ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name.0)) .contains(&(Timestamp(day), plate_name.0))
} }
} }

View file

@ -2,17 +2,14 @@ use std::{future::Future, sync::Arc};
use tokio::{ use tokio::{
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
sync::{broadcast, mpsc, Semaphore}, sync::{broadcast, mpsc, Mutex, Semaphore},
time::{self, Duration}, time::{self, Duration},
}; };
use tracing::error; use tracing::{error, info};
use crate::{ use crate::{
connection::ConnectionType, connection::ConnectionType,
db::{ db::{Camera, CameraId, Db, DispatcherId, Limit, Mile, Plate, PlateName, Road, Timestamp},
Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road,
Timestamp,
},
frame::{ClientFrames, ServerFrames}, frame::{ClientFrames, ServerFrames},
heartbeat::Heartbeat, heartbeat::Heartbeat,
ticketing::{issue_possible_ticket, send_out_waiting_tickets}, ticketing::{issue_possible_ticket, send_out_waiting_tickets},
@ -21,7 +18,7 @@ use crate::{
struct Listener { struct Listener {
listener: TcpListener, listener: TcpListener,
db_holder: DbHolder, db: Arc<Mutex<Db>>,
limit_connections: Arc<Semaphore>, limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>, notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>, shutdown_complete_tx: mpsc::Sender<()>,
@ -30,7 +27,7 @@ struct Listener {
struct Handler { struct Handler {
connection: Connection, connection: Connection,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
db: Db, db: Arc<Mutex<Db>>,
shutdown: Shutdown, shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>, _shutdown_complete: mpsc::Sender<()>,
} }
@ -43,7 +40,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
let mut server = Listener { let mut server = Listener {
listener, listener,
db_holder: DbHolder::new(), db: Arc::new(Mutex::new(Db::new())),
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown: notify_shutdown.clone(), notify_shutdown: notify_shutdown.clone(),
shutdown_complete_tx, shutdown_complete_tx,
@ -90,7 +87,7 @@ impl Listener {
let mut handler = Handler { let mut handler = Handler {
connection: Connection::new(address, socket), connection: Connection::new(address, socket),
connection_type: None, connection_type: None,
db: self.db_holder.db(), db: self.db.clone(),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()), shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
_shutdown_complete: self.shutdown_complete_tx.clone(), _shutdown_complete: self.shutdown_complete_tx.clone(),
}; };
@ -134,13 +131,17 @@ impl Handler {
while !self.shutdown.is_shutdown() { while !self.shutdown.is_shutdown() {
tokio::select! { tokio::select! {
res = self.connection.read_frame() => { res = self.connection.read_frame() => {
match res? { match res {
Some(frame) => { Ok(Some(frame)) => {
if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await { if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
error!("Error handling frame: {e:?}"); error!("Error handling frame: {e:?}");
} }
}, },
None => return Ok(()), Ok(None) => return Ok(()),
Err(_) => {
let _ = self.connection.write_frame(ServerFrames::Error { msg: "Not supported frame sent".to_string() }).await;
return Ok(());
}
} }
} }
message = receive_message.recv() => { message = receive_message.recv() => {
@ -173,21 +174,33 @@ impl Handler {
async fn handle_client_frame( async fn handle_client_frame(
&mut self, &mut self,
mut db: Db, db: Arc<Mutex<Db>>,
frame: ClientFrames, frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>, send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> { ) -> crate::Result<()> {
match frame { match frame {
ClientFrames::Plate { plate, timestamp } => { ClientFrames::Plate { plate, timestamp } => {
issue_possible_ticket( if self.connection_type.is_some()
&mut db, && self.connection_type == Some(ConnectionType::Camera)
Plate { {
plate: PlateName(plate.clone()), info!("Receive new plate: {plate} at {timestamp}");
timestamp: Timestamp(timestamp), issue_possible_ticket(
}, db,
CameraId(self.connection.get_address()), Plate {
) plate: PlateName(plate.clone()),
.await; timestamp: Timestamp(timestamp),
},
CameraId(self.connection.get_address()),
)
.await;
} else {
let _ = send_message
.send(ServerFrames::Error {
msg: "Not connected as camera".to_string(),
})
.await;
return Err("Already connected".into());
}
} }
ClientFrames::WantHeartbeat { interval } => { ClientFrames::WantHeartbeat { interval } => {
if interval > 0 { if interval > 0 {
@ -198,12 +211,18 @@ impl Handler {
} }
} }
ClientFrames::IAmCamera { road, mile, limit } => { ClientFrames::IAmCamera { road, mile, limit } => {
info!("Receive new camera: {road} at {mile} with limit {limit}");
if self.connection_type.is_some() { if self.connection_type.is_some() {
let _ = send_message
.send(ServerFrames::Error {
msg: "Already connected as a connection type".to_string(),
})
.await;
return Err("Already connected".into()); return Err("Already connected".into());
} }
self.set_connection_type(ConnectionType::Camera); self.set_connection_type(ConnectionType::Camera);
db.add_camera( db.lock().await.add_camera(
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),
Camera { Camera {
road: Road(road), road: Road(road),
@ -214,11 +233,16 @@ impl Handler {
} }
ClientFrames::IAmDispatcher { roads } => { ClientFrames::IAmDispatcher { roads } => {
if self.connection_type.is_some() { if self.connection_type.is_some() {
let _ = send_message
.send(ServerFrames::Error {
msg: "Already connected as a connection type".to_string(),
})
.await;
return Err("Already connected".into()); return Err("Already connected".into());
} }
self.set_connection_type(ConnectionType::Dispatcher); self.set_connection_type(ConnectionType::Dispatcher);
db.add_dispatcher( db.lock().await.add_dispatcher(
DispatcherId(self.connection.get_address()), DispatcherId(self.connection.get_address()),
roads.to_vec(), roads.to_vec(),
send_message.clone(), send_message.clone(),

View file

@ -1,8 +1,12 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info; use tracing::info;
use crate::db::{CameraId, Db, Plate, Road, Ticket}; use crate::db::{CameraId, Db, Plate, Road, Ticket};
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) { pub(crate) async fn issue_possible_ticket(db: Arc<Mutex<Db>>, plate: Plate, camera_id: CameraId) {
let mut db = db.lock().await;
let camera = db.get_camera(camera_id.clone()).unwrap(); let camera = db.get_camera(camera_id.clone()).unwrap();
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone()); let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
@ -47,32 +51,42 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
let day_start = timestamp1 / 86400; let day_start = timestamp1 / 86400;
let day_end = timestamp2 / 86400; let day_end = timestamp2 / 86400;
let spans_multiple_days = day_start != day_end;
if spans_multiple_days
&& (db.is_plate_ticketed_for_day(day_start, plate_name.clone())
|| db.is_plate_ticketed_for_day(day_end, plate_name.clone()))
{
continue;
}
if db.is_plate_ticketed_for_day(day_start, plate_name.clone()) {
continue;
}
for day in day_start..=day_end { for day in day_start..=day_end {
info!("Day {day} for {ticket:?}"); info!("Ticket for day {day} for {ticket:?}");
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
info!("Ticket already issued: {ticket:?}");
continue;
}
let dispatcher = db.get_dispatcher_for_road(road.clone());
if dispatcher.is_none() {
info!("No dispatcher yet for this road: {ticket:?}");
db.add_open_ticket(ticket.clone());
continue;
}
info!("Sending ticket: {ticket:?}");
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
db.ticket_plate(day, plate_name.clone()); db.ticket_plate(day, plate_name.clone());
} }
let dispatcher = db.get_dispatcher_for_road(road.clone());
if dispatcher.is_none() {
info!("No dispatcher yet for this road: {ticket:?}");
db.add_open_ticket(ticket.clone());
continue;
}
info!("Sending ticket: {ticket:?}");
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
} }
} }
db.add_plate(camera_id, plate); db.add_plate(camera_id, plate);
} }
pub(crate) async fn send_out_waiting_tickets(db: Db) { pub(crate) async fn send_out_waiting_tickets(db: Arc<Mutex<Db>>) {
let mut db = db.lock().await;
let tickets = db.get_open_tickets(); let tickets = db.get_open_tickets();
info!("Sending out waiting tickets: {tickets:?}"); info!("Sending out waiting tickets: {tickets:?}");
for ticket in tickets { for ticket in tickets {