Compare commits

..

10 commits

Author SHA1 Message Date
2734c62139
PASS 2023-05-22 15:02:37 +02:00
641b48b367
Send error 2023-05-22 15:00:59 +02:00
c57c0de2db
Send error on not supported frame 2023-05-22 14:56:07 +02:00
3178c07f84 Solved: Big world test 2023-05-22 12:50:04 +00:00
52f92969b5
Ditch pure async 2023-05-22 14:05:42 +02:00
e7c9ad95e7
Debug 2023-05-22 10:53:24 +02:00
9e7b2af480
Move logic around to find bug 2023-05-22 10:40:33 +02:00
b70dbbb24b
debug 2023-05-22 10:22:24 +02:00
ef131fbf59
Debugging 2023-05-22 10:18:16 +02:00
f62a6e804a
Logging 2023-05-22 10:13:04 +02:00
4 changed files with 112 additions and 104 deletions

View file

@ -8,6 +8,7 @@ use tokio::{
use crate::frame::{self, ClientFrames, ServerFrames};
#[derive(PartialEq)]
pub(crate) enum ConnectionType {
Camera,
Dispatcher,

View file

@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
@ -68,17 +67,8 @@ pub(crate) struct Timestamp(pub(crate) u32);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Mile(pub(crate) u16);
pub(crate) struct DbHolder {
db: Db,
}
#[derive(Clone)]
pub(crate) struct Db {
state: Arc<Mutex<State>>,
}
#[derive(Debug)]
struct State {
pub(crate) struct Db {
cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>,
@ -86,51 +76,34 @@ struct State {
open_tickets: HashMap<Road, Vec<Ticket>>,
}
impl DbHolder {
pub(crate) fn new() -> DbHolder {
DbHolder { db: Db::new() }
}
pub(crate) fn db(&self) -> Db {
self.db.clone()
}
}
impl Db {
pub(crate) fn new() -> Db {
let state = Arc::new(Mutex::new(State {
Db {
cameras: HashMap::new(),
dispatchers: HashMap::new(),
plates: HashMap::new(),
ticketed_plates_by_day: HashSet::new(),
open_tickets: HashMap::new(),
}));
Db { state }
}
}
pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> {
let state = self.state.lock().unwrap();
state.cameras.get(&camera_id).cloned()
self.cameras.get(&camera_id).cloned()
}
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap();
state.cameras.insert(camera_id, camera);
pub(crate) fn add_camera(&mut self, camera_id: CameraId, camera: Camera) {
self.cameras.insert(camera_id, camera);
}
pub(crate) fn add_dispatcher(
&self,
&mut self,
dispatcher_id: DispatcherId,
roads: Vec<u16>,
writer_stream: mpsc::Sender<ServerFrames>,
) {
info!("Adding new dispatcher for raods: {roads:?}");
let mut state = self.state.lock().unwrap();
info!("Adding new dispatcher for roads: {roads:?}");
for r in roads.iter() {
state
.dispatchers
self.dispatchers
.entry(Road(*r))
.or_insert(Vec::new())
.push((dispatcher_id.clone(), writer_stream.clone()));
@ -138,8 +111,7 @@ impl Db {
}
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> {
let state = self.state.lock().unwrap();
let senders = state.dispatchers.get(&road);
let senders = self.dispatchers.get(&road);
if senders.is_none() {
return None;
}
@ -147,26 +119,24 @@ impl Db {
senders.unwrap().first().map(|(_, s)| s.clone())
}
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
let mut state = self.state.lock().unwrap();
state
.open_tickets
pub(crate) fn add_open_ticket(&mut self, ticket: Ticket) {
info!("Adding open ticket: {ticket:?}");
self.open_tickets
.entry(Road(ticket.road))
.or_insert(Vec::new())
.push(ticket);
}
pub(crate) fn get_open_tickets(&self) -> Vec<Ticket> {
let state = self.state.lock().unwrap();
state.open_tickets.values().flatten().cloned().collect()
self.open_tickets.values().flatten().cloned().collect()
}
pub(crate) fn remove_open_ticket(&self, road: Road, ticket: Ticket) -> bool {
let mut state = self.state.lock().unwrap();
if let Some(tickets) = state.open_tickets.get_mut(&road) {
pub(crate) fn remove_open_ticket(&mut self, road: Road, ticket: Ticket) -> bool {
info!("Removing open ticket: {ticket:?}");
if let Some(tickets) = self.open_tickets.get_mut(&road) {
tickets.retain(|t| t.plate != ticket.plate);
if tickets.is_empty() {
state.open_tickets.remove(&road);
self.open_tickets.remove(&road);
}
return true;
}
@ -178,22 +148,20 @@ impl Db {
plate: Plate,
road: Road,
) -> Option<Vec<(Mile, Timestamp)>> {
let state = self.state.lock().unwrap();
state.plates.get(&(plate.plate, road)).cloned()
self.plates.get(&(plate.plate, road)).cloned()
}
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
pub(crate) fn add_plate(&mut self, camera_id: CameraId, plate: Plate) {
//TODO: Check if the same plate was already added for the road AND MILE
let camera = self.get_camera(camera_id).unwrap();
let mut state = self.state.lock().unwrap();
match state
match self
.plates
.get_mut(&(plate.plate.clone(), camera.road.clone()))
{
Some(v) => v.push((camera.mile, plate.timestamp)),
None => {
state.plates.insert(
self.plates.insert(
(plate.clone().plate, camera.road),
vec![(camera.mile, plate.timestamp)],
);
@ -201,17 +169,18 @@ impl Db {
}
}
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
let mut state = self.state.lock().unwrap();
state
.ticketed_plates_by_day
pub(crate) fn ticket_plate(&mut self, day: u32, plate_name: PlateName) {
info!("Add {plate_name:?} for day:{day} ");
self.ticketed_plates_by_day
.insert((Timestamp(day), plate_name.0));
}
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool {
let state = self.state.lock().unwrap();
state
.ticketed_plates_by_day
info!(
"Current ticketed plates, by day: {:?}",
self.ticketed_plates_by_day
);
self.ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name.0))
}
}

View file

@ -2,17 +2,14 @@ use std::{future::Future, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::{broadcast, mpsc, Semaphore},
sync::{broadcast, mpsc, Mutex, Semaphore},
time::{self, Duration},
};
use tracing::error;
use tracing::{error, info};
use crate::{
connection::ConnectionType,
db::{
Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road,
Timestamp,
},
db::{Camera, CameraId, Db, DispatcherId, Limit, Mile, Plate, PlateName, Road, Timestamp},
frame::{ClientFrames, ServerFrames},
heartbeat::Heartbeat,
ticketing::{issue_possible_ticket, send_out_waiting_tickets},
@ -21,7 +18,7 @@ use crate::{
struct Listener {
listener: TcpListener,
db_holder: DbHolder,
db: Arc<Mutex<Db>>,
limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>,
@ -30,7 +27,7 @@ struct Listener {
struct Handler {
connection: Connection,
connection_type: Option<ConnectionType>,
db: Db,
db: Arc<Mutex<Db>>,
shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>,
}
@ -43,7 +40,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
let mut server = Listener {
listener,
db_holder: DbHolder::new(),
db: Arc::new(Mutex::new(Db::new())),
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown: notify_shutdown.clone(),
shutdown_complete_tx,
@ -90,7 +87,7 @@ impl Listener {
let mut handler = Handler {
connection: Connection::new(address, socket),
connection_type: None,
db: self.db_holder.db(),
db: self.db.clone(),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
@ -134,13 +131,17 @@ impl Handler {
while !self.shutdown.is_shutdown() {
tokio::select! {
res = self.connection.read_frame() => {
match res? {
Some(frame) => {
match res {
Ok(Some(frame)) => {
if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
error!("Error handling frame: {e:?}");
}
},
None => return Ok(()),
Ok(None) => return Ok(()),
Err(_) => {
let _ = self.connection.write_frame(ServerFrames::Error { msg: "Not supported frame sent".to_string() }).await;
return Ok(());
}
}
}
message = receive_message.recv() => {
@ -173,14 +174,18 @@ impl Handler {
async fn handle_client_frame(
&mut self,
mut db: Db,
db: Arc<Mutex<Db>>,
frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> {
match frame {
ClientFrames::Plate { plate, timestamp } => {
if self.connection_type.is_some()
&& self.connection_type == Some(ConnectionType::Camera)
{
info!("Receive new plate: {plate} at {timestamp}");
issue_possible_ticket(
&mut db,
db,
Plate {
plate: PlateName(plate.clone()),
timestamp: Timestamp(timestamp),
@ -188,6 +193,14 @@ impl Handler {
CameraId(self.connection.get_address()),
)
.await;
} else {
let _ = send_message
.send(ServerFrames::Error {
msg: "Not connected as camera".to_string(),
})
.await;
return Err("Already connected".into());
}
}
ClientFrames::WantHeartbeat { interval } => {
if interval > 0 {
@ -198,12 +211,18 @@ impl Handler {
}
}
ClientFrames::IAmCamera { road, mile, limit } => {
info!("Receive new camera: {road} at {mile} with limit {limit}");
if self.connection_type.is_some() {
let _ = send_message
.send(ServerFrames::Error {
msg: "Already connected as a connection type".to_string(),
})
.await;
return Err("Already connected".into());
}
self.set_connection_type(ConnectionType::Camera);
db.add_camera(
db.lock().await.add_camera(
CameraId(self.connection.get_address()),
Camera {
road: Road(road),
@ -214,11 +233,16 @@ impl Handler {
}
ClientFrames::IAmDispatcher { roads } => {
if self.connection_type.is_some() {
let _ = send_message
.send(ServerFrames::Error {
msg: "Already connected as a connection type".to_string(),
})
.await;
return Err("Already connected".into());
}
self.set_connection_type(ConnectionType::Dispatcher);
db.add_dispatcher(
db.lock().await.add_dispatcher(
DispatcherId(self.connection.get_address()),
roads.to_vec(),
send_message.clone(),

View file

@ -1,8 +1,12 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use crate::db::{CameraId, Db, Plate, Road, Ticket};
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
pub(crate) async fn issue_possible_ticket(db: Arc<Mutex<Db>>, plate: Plate, camera_id: CameraId) {
let mut db = db.lock().await;
let camera = db.get_camera(camera_id.clone()).unwrap();
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
@ -47,13 +51,24 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
let day_start = timestamp1 / 86400;
let day_end = timestamp2 / 86400;
for day in day_start..=day_end {
info!("Day {day} for {ticket:?}");
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
info!("Ticket already issued: {ticket:?}");
let spans_multiple_days = day_start != day_end;
if spans_multiple_days
&& (db.is_plate_ticketed_for_day(day_start, plate_name.clone())
|| db.is_plate_ticketed_for_day(day_end, plate_name.clone()))
{
continue;
}
if db.is_plate_ticketed_for_day(day_start, plate_name.clone()) {
continue;
}
for day in day_start..=day_end {
info!("Ticket for day {day} for {ticket:?}");
db.ticket_plate(day, plate_name.clone());
}
let dispatcher = db.get_dispatcher_for_road(road.clone());
if dispatcher.is_none() {
@ -64,15 +79,14 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
info!("Sending ticket: {ticket:?}");
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
db.ticket_plate(day, plate_name.clone());
}
}
}
db.add_plate(camera_id, plate);
}
pub(crate) async fn send_out_waiting_tickets(db: Db) {
pub(crate) async fn send_out_waiting_tickets(db: Arc<Mutex<Db>>) {
let mut db = db.lock().await;
let tickets = db.get_open_tickets();
info!("Sending out waiting tickets: {tickets:?}");
for ticket in tickets {