Compare commits
No commits in common. "2734c621391c9d573cd76f3f493671594ee77c17" and "34c6343d632315cf968eddfcd4e206bc84a866ae" have entirely different histories.
2734c62139
...
34c6343d63
4 changed files with 104 additions and 112 deletions
|
|
@ -8,7 +8,6 @@ use tokio::{
|
|||
|
||||
use crate::frame::{self, ClientFrames, ServerFrames};
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub(crate) enum ConnectionType {
|
||||
Camera,
|
||||
Dispatcher,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
|
@ -67,8 +68,17 @@ pub(crate) struct Timestamp(pub(crate) u32);
|
|||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
||||
pub(crate) struct Mile(pub(crate) u16);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DbHolder {
|
||||
db: Db,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Db {
|
||||
state: Arc<Mutex<State>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct State {
|
||||
cameras: HashMap<CameraId, Camera>,
|
||||
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
|
||||
plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>,
|
||||
|
|
@ -76,34 +86,51 @@ pub(crate) struct Db {
|
|||
open_tickets: HashMap<Road, Vec<Ticket>>,
|
||||
}
|
||||
|
||||
impl DbHolder {
|
||||
pub(crate) fn new() -> DbHolder {
|
||||
DbHolder { db: Db::new() }
|
||||
}
|
||||
|
||||
pub(crate) fn db(&self) -> Db {
|
||||
self.db.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Db {
|
||||
pub(crate) fn new() -> Db {
|
||||
Db {
|
||||
let state = Arc::new(Mutex::new(State {
|
||||
cameras: HashMap::new(),
|
||||
dispatchers: HashMap::new(),
|
||||
plates: HashMap::new(),
|
||||
ticketed_plates_by_day: HashSet::new(),
|
||||
open_tickets: HashMap::new(),
|
||||
}
|
||||
}));
|
||||
|
||||
Db { state }
|
||||
}
|
||||
|
||||
pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> {
|
||||
self.cameras.get(&camera_id).cloned()
|
||||
let state = self.state.lock().unwrap();
|
||||
state.cameras.get(&camera_id).cloned()
|
||||
}
|
||||
|
||||
pub(crate) fn add_camera(&mut self, camera_id: CameraId, camera: Camera) {
|
||||
self.cameras.insert(camera_id, camera);
|
||||
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.cameras.insert(camera_id, camera);
|
||||
}
|
||||
|
||||
pub(crate) fn add_dispatcher(
|
||||
&mut self,
|
||||
&self,
|
||||
dispatcher_id: DispatcherId,
|
||||
roads: Vec<u16>,
|
||||
writer_stream: mpsc::Sender<ServerFrames>,
|
||||
) {
|
||||
info!("Adding new dispatcher for roads: {roads:?}");
|
||||
info!("Adding new dispatcher for raods: {roads:?}");
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
for r in roads.iter() {
|
||||
self.dispatchers
|
||||
state
|
||||
.dispatchers
|
||||
.entry(Road(*r))
|
||||
.or_insert(Vec::new())
|
||||
.push((dispatcher_id.clone(), writer_stream.clone()));
|
||||
|
|
@ -111,7 +138,8 @@ impl Db {
|
|||
}
|
||||
|
||||
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> {
|
||||
let senders = self.dispatchers.get(&road);
|
||||
let state = self.state.lock().unwrap();
|
||||
let senders = state.dispatchers.get(&road);
|
||||
if senders.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
|
@ -119,24 +147,26 @@ impl Db {
|
|||
senders.unwrap().first().map(|(_, s)| s.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn add_open_ticket(&mut self, ticket: Ticket) {
|
||||
info!("Adding open ticket: {ticket:?}");
|
||||
self.open_tickets
|
||||
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state
|
||||
.open_tickets
|
||||
.entry(Road(ticket.road))
|
||||
.or_insert(Vec::new())
|
||||
.push(ticket);
|
||||
}
|
||||
|
||||
pub(crate) fn get_open_tickets(&self) -> Vec<Ticket> {
|
||||
self.open_tickets.values().flatten().cloned().collect()
|
||||
let state = self.state.lock().unwrap();
|
||||
state.open_tickets.values().flatten().cloned().collect()
|
||||
}
|
||||
|
||||
pub(crate) fn remove_open_ticket(&mut self, road: Road, ticket: Ticket) -> bool {
|
||||
info!("Removing open ticket: {ticket:?}");
|
||||
if let Some(tickets) = self.open_tickets.get_mut(&road) {
|
||||
pub(crate) fn remove_open_ticket(&self, road: Road, ticket: Ticket) -> bool {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if let Some(tickets) = state.open_tickets.get_mut(&road) {
|
||||
tickets.retain(|t| t.plate != ticket.plate);
|
||||
if tickets.is_empty() {
|
||||
self.open_tickets.remove(&road);
|
||||
state.open_tickets.remove(&road);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
@ -148,20 +178,22 @@ impl Db {
|
|||
plate: Plate,
|
||||
road: Road,
|
||||
) -> Option<Vec<(Mile, Timestamp)>> {
|
||||
self.plates.get(&(plate.plate, road)).cloned()
|
||||
let state = self.state.lock().unwrap();
|
||||
state.plates.get(&(plate.plate, road)).cloned()
|
||||
}
|
||||
|
||||
pub(crate) fn add_plate(&mut self, camera_id: CameraId, plate: Plate) {
|
||||
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
|
||||
//TODO: Check if the same plate was already added for the road AND MILE
|
||||
let camera = self.get_camera(camera_id).unwrap();
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
match self
|
||||
match state
|
||||
.plates
|
||||
.get_mut(&(plate.plate.clone(), camera.road.clone()))
|
||||
{
|
||||
Some(v) => v.push((camera.mile, plate.timestamp)),
|
||||
None => {
|
||||
self.plates.insert(
|
||||
state.plates.insert(
|
||||
(plate.clone().plate, camera.road),
|
||||
vec![(camera.mile, plate.timestamp)],
|
||||
);
|
||||
|
|
@ -169,18 +201,17 @@ impl Db {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn ticket_plate(&mut self, day: u32, plate_name: PlateName) {
|
||||
info!("Add {plate_name:?} for day:{day} ");
|
||||
self.ticketed_plates_by_day
|
||||
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state
|
||||
.ticketed_plates_by_day
|
||||
.insert((Timestamp(day), plate_name.0));
|
||||
}
|
||||
|
||||
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool {
|
||||
info!(
|
||||
"Current ticketed plates, by day: {:?}",
|
||||
self.ticketed_plates_by_day
|
||||
);
|
||||
self.ticketed_plates_by_day
|
||||
let state = self.state.lock().unwrap();
|
||||
state
|
||||
.ticketed_plates_by_day
|
||||
.contains(&(Timestamp(day), plate_name.0))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,14 +2,17 @@ use std::{future::Future, sync::Arc};
|
|||
|
||||
use tokio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::{broadcast, mpsc, Mutex, Semaphore},
|
||||
sync::{broadcast, mpsc, Semaphore},
|
||||
time::{self, Duration},
|
||||
};
|
||||
use tracing::{error, info};
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
connection::ConnectionType,
|
||||
db::{Camera, CameraId, Db, DispatcherId, Limit, Mile, Plate, PlateName, Road, Timestamp},
|
||||
db::{
|
||||
Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road,
|
||||
Timestamp,
|
||||
},
|
||||
frame::{ClientFrames, ServerFrames},
|
||||
heartbeat::Heartbeat,
|
||||
ticketing::{issue_possible_ticket, send_out_waiting_tickets},
|
||||
|
|
@ -18,7 +21,7 @@ use crate::{
|
|||
|
||||
struct Listener {
|
||||
listener: TcpListener,
|
||||
db: Arc<Mutex<Db>>,
|
||||
db_holder: DbHolder,
|
||||
limit_connections: Arc<Semaphore>,
|
||||
notify_shutdown: broadcast::Sender<()>,
|
||||
shutdown_complete_tx: mpsc::Sender<()>,
|
||||
|
|
@ -27,7 +30,7 @@ struct Listener {
|
|||
struct Handler {
|
||||
connection: Connection,
|
||||
connection_type: Option<ConnectionType>,
|
||||
db: Arc<Mutex<Db>>,
|
||||
db: Db,
|
||||
shutdown: Shutdown,
|
||||
_shutdown_complete: mpsc::Sender<()>,
|
||||
}
|
||||
|
|
@ -40,7 +43,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
|
|||
|
||||
let mut server = Listener {
|
||||
listener,
|
||||
db: Arc::new(Mutex::new(Db::new())),
|
||||
db_holder: DbHolder::new(),
|
||||
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
|
||||
notify_shutdown: notify_shutdown.clone(),
|
||||
shutdown_complete_tx,
|
||||
|
|
@ -87,7 +90,7 @@ impl Listener {
|
|||
let mut handler = Handler {
|
||||
connection: Connection::new(address, socket),
|
||||
connection_type: None,
|
||||
db: self.db.clone(),
|
||||
db: self.db_holder.db(),
|
||||
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
|
||||
_shutdown_complete: self.shutdown_complete_tx.clone(),
|
||||
};
|
||||
|
|
@ -131,17 +134,13 @@ impl Handler {
|
|||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
res = self.connection.read_frame() => {
|
||||
match res {
|
||||
Ok(Some(frame)) => {
|
||||
match res? {
|
||||
Some(frame) => {
|
||||
if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
|
||||
error!("Error handling frame: {e:?}");
|
||||
}
|
||||
},
|
||||
Ok(None) => return Ok(()),
|
||||
Err(_) => {
|
||||
let _ = self.connection.write_frame(ServerFrames::Error { msg: "Not supported frame sent".to_string() }).await;
|
||||
return Ok(());
|
||||
}
|
||||
None => return Ok(()),
|
||||
}
|
||||
}
|
||||
message = receive_message.recv() => {
|
||||
|
|
@ -174,33 +173,21 @@ impl Handler {
|
|||
|
||||
async fn handle_client_frame(
|
||||
&mut self,
|
||||
db: Arc<Mutex<Db>>,
|
||||
mut db: Db,
|
||||
frame: ClientFrames,
|
||||
send_message: mpsc::Sender<ServerFrames>,
|
||||
) -> crate::Result<()> {
|
||||
match frame {
|
||||
ClientFrames::Plate { plate, timestamp } => {
|
||||
if self.connection_type.is_some()
|
||||
&& self.connection_type == Some(ConnectionType::Camera)
|
||||
{
|
||||
info!("Receive new plate: {plate} at {timestamp}");
|
||||
issue_possible_ticket(
|
||||
db,
|
||||
Plate {
|
||||
plate: PlateName(plate.clone()),
|
||||
timestamp: Timestamp(timestamp),
|
||||
},
|
||||
CameraId(self.connection.get_address()),
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let _ = send_message
|
||||
.send(ServerFrames::Error {
|
||||
msg: "Not connected as camera".to_string(),
|
||||
})
|
||||
.await;
|
||||
return Err("Already connected".into());
|
||||
}
|
||||
issue_possible_ticket(
|
||||
&mut db,
|
||||
Plate {
|
||||
plate: PlateName(plate.clone()),
|
||||
timestamp: Timestamp(timestamp),
|
||||
},
|
||||
CameraId(self.connection.get_address()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientFrames::WantHeartbeat { interval } => {
|
||||
if interval > 0 {
|
||||
|
|
@ -211,18 +198,12 @@ impl Handler {
|
|||
}
|
||||
}
|
||||
ClientFrames::IAmCamera { road, mile, limit } => {
|
||||
info!("Receive new camera: {road} at {mile} with limit {limit}");
|
||||
if self.connection_type.is_some() {
|
||||
let _ = send_message
|
||||
.send(ServerFrames::Error {
|
||||
msg: "Already connected as a connection type".to_string(),
|
||||
})
|
||||
.await;
|
||||
return Err("Already connected".into());
|
||||
}
|
||||
self.set_connection_type(ConnectionType::Camera);
|
||||
|
||||
db.lock().await.add_camera(
|
||||
db.add_camera(
|
||||
CameraId(self.connection.get_address()),
|
||||
Camera {
|
||||
road: Road(road),
|
||||
|
|
@ -233,16 +214,11 @@ impl Handler {
|
|||
}
|
||||
ClientFrames::IAmDispatcher { roads } => {
|
||||
if self.connection_type.is_some() {
|
||||
let _ = send_message
|
||||
.send(ServerFrames::Error {
|
||||
msg: "Already connected as a connection type".to_string(),
|
||||
})
|
||||
.await;
|
||||
return Err("Already connected".into());
|
||||
}
|
||||
|
||||
self.set_connection_type(ConnectionType::Dispatcher);
|
||||
db.lock().await.add_dispatcher(
|
||||
db.add_dispatcher(
|
||||
DispatcherId(self.connection.get_address()),
|
||||
roads.to_vec(),
|
||||
send_message.clone(),
|
||||
|
|
|
|||
|
|
@ -1,12 +1,8 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::info;
|
||||
|
||||
use crate::db::{CameraId, Db, Plate, Road, Ticket};
|
||||
|
||||
pub(crate) async fn issue_possible_ticket(db: Arc<Mutex<Db>>, plate: Plate, camera_id: CameraId) {
|
||||
let mut db = db.lock().await;
|
||||
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
|
||||
let camera = db.get_camera(camera_id.clone()).unwrap();
|
||||
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
|
||||
|
||||
|
|
@ -51,42 +47,32 @@ pub(crate) async fn issue_possible_ticket(db: Arc<Mutex<Db>>, plate: Plate, came
|
|||
let day_start = timestamp1 / 86400;
|
||||
let day_end = timestamp2 / 86400;
|
||||
|
||||
let spans_multiple_days = day_start != day_end;
|
||||
|
||||
if spans_multiple_days
|
||||
&& (db.is_plate_ticketed_for_day(day_start, plate_name.clone())
|
||||
|| db.is_plate_ticketed_for_day(day_end, plate_name.clone()))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if db.is_plate_ticketed_for_day(day_start, plate_name.clone()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for day in day_start..=day_end {
|
||||
info!("Ticket for day {day} for {ticket:?}");
|
||||
info!("Day {day} for {ticket:?}");
|
||||
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
|
||||
info!("Ticket already issued: {ticket:?}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
||||
|
||||
if dispatcher.is_none() {
|
||||
info!("No dispatcher yet for this road: {ticket:?}");
|
||||
db.add_open_ticket(ticket.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Sending ticket: {ticket:?}");
|
||||
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
|
||||
db.ticket_plate(day, plate_name.clone());
|
||||
}
|
||||
|
||||
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
||||
|
||||
if dispatcher.is_none() {
|
||||
info!("No dispatcher yet for this road: {ticket:?}");
|
||||
db.add_open_ticket(ticket.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("Sending ticket: {ticket:?}");
|
||||
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
|
||||
}
|
||||
}
|
||||
|
||||
db.add_plate(camera_id, plate);
|
||||
}
|
||||
|
||||
pub(crate) async fn send_out_waiting_tickets(db: Arc<Mutex<Db>>) {
|
||||
let mut db = db.lock().await;
|
||||
pub(crate) async fn send_out_waiting_tickets(db: Db) {
|
||||
let tickets = db.get_open_tickets();
|
||||
info!("Sending out waiting tickets: {tickets:?}");
|
||||
for ticket in tickets {
|
||||
|
|
|
|||
Loading…
Reference in a new issue