Finish happy path of dispatching messages
This commit is contained in:
parent
cfdc4e0e53
commit
6e4c12a9b6
4 changed files with 42 additions and 8 deletions
|
|
@ -98,3 +98,20 @@ async fn test_camera_connection(
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn test_dipatcher_connection(
|
||||||
|
write: &mut WriteHalf<'_>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// 81 IAmDispatcher{
|
||||||
|
// 03 roads: [
|
||||||
|
// 00 42 66,
|
||||||
|
// 01 70 368,
|
||||||
|
// 13 88 5000
|
||||||
|
// ]
|
||||||
|
// }
|
||||||
|
let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88];
|
||||||
|
|
||||||
|
write.write_all(&i_am_dispatcher).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ use std::{
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
|
use crate::frame::ServerFrames;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
||||||
pub(crate) struct DispatcherId(pub(crate) SocketAddr);
|
pub(crate) struct DispatcherId(pub(crate) SocketAddr);
|
||||||
|
|
||||||
|
|
@ -37,6 +39,20 @@ pub(crate) struct Ticket {
|
||||||
pub(crate) speed: u16,
|
pub(crate) speed: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Ticket> for ServerFrames {
|
||||||
|
fn from(ticket: Ticket) -> Self {
|
||||||
|
ServerFrames::Ticket {
|
||||||
|
plate: ticket.plate,
|
||||||
|
road: ticket.road,
|
||||||
|
mile1: ticket.mile1,
|
||||||
|
timestamp1: ticket.timestamp1,
|
||||||
|
mile2: ticket.mile2,
|
||||||
|
timestamp2: ticket.timestamp2,
|
||||||
|
speed: ticket.speed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
||||||
pub(crate) struct Road(pub(crate) u16);
|
pub(crate) struct Road(pub(crate) u16);
|
||||||
|
|
||||||
|
|
@ -61,7 +77,7 @@ pub(crate) struct Db {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct State {
|
struct State {
|
||||||
cameras: HashMap<CameraId, Camera>,
|
cameras: HashMap<CameraId, Camera>,
|
||||||
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<Ticket>)>>,
|
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
|
||||||
plates: HashMap<(Plate, Road), Vec<(Mile, Timestamp)>>,
|
plates: HashMap<(Plate, Road), Vec<(Mile, Timestamp)>>,
|
||||||
ticketed_plates_by_day: HashSet<(Timestamp, String)>,
|
ticketed_plates_by_day: HashSet<(Timestamp, String)>,
|
||||||
open_tickets: HashMap<Road, Vec<Ticket>>,
|
open_tickets: HashMap<Road, Vec<Ticket>>,
|
||||||
|
|
@ -105,7 +121,7 @@ impl Db {
|
||||||
&self,
|
&self,
|
||||||
dispatcher_id: DispatcherId,
|
dispatcher_id: DispatcherId,
|
||||||
roads: Vec<u16>,
|
roads: Vec<u16>,
|
||||||
writer_stream: mpsc::Sender<Ticket>,
|
writer_stream: mpsc::Sender<ServerFrames>,
|
||||||
) {
|
) {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
|
@ -120,7 +136,7 @@ impl Db {
|
||||||
debug!(?state);
|
debug!(?state);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<Ticket>> {
|
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
let senders = state.dispatchers.get(&road);
|
let senders = state.dispatchers.get(&road);
|
||||||
if senders.is_none() {
|
if senders.is_none() {
|
||||||
|
|
|
||||||
|
|
@ -181,7 +181,7 @@ impl Handler {
|
||||||
match frame {
|
match frame {
|
||||||
ClientFrames::Plate { plate, timestamp } => {
|
ClientFrames::Plate { plate, timestamp } => {
|
||||||
info!("Receive new plate {plate} {timestamp}");
|
info!("Receive new plate {plate} {timestamp}");
|
||||||
db.insert_plate(
|
db.add_plate(
|
||||||
CameraId(self.connection.get_address()),
|
CameraId(self.connection.get_address()),
|
||||||
Plate {
|
Plate {
|
||||||
plate: plate.clone(),
|
plate: plate.clone(),
|
||||||
|
|
@ -196,7 +196,8 @@ impl Handler {
|
||||||
timestamp: Timestamp(timestamp),
|
timestamp: Timestamp(timestamp),
|
||||||
},
|
},
|
||||||
CameraId(self.connection.get_address()),
|
CameraId(self.connection.get_address()),
|
||||||
);
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
ClientFrames::WantHeartbeat { interval } => {
|
ClientFrames::WantHeartbeat { interval } => {
|
||||||
info!("Want heartbeat: {interval}");
|
info!("Want heartbeat: {interval}");
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::db::{CameraId, Db, Plate, Ticket};
|
use crate::db::{CameraId, Db, Plate, Ticket};
|
||||||
|
|
||||||
pub(crate) fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
|
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
|
||||||
let camera = db.get_camera(camera_id).unwrap();
|
let camera = db.get_camera(camera_id).unwrap();
|
||||||
let observed_plates = db
|
let observed_plates = db
|
||||||
.get_plates_by_road(plate.clone(), camera.road.clone())
|
.get_plates_by_road(plate.clone(), camera.road.clone())
|
||||||
|
|
@ -50,11 +50,11 @@ pub(crate) fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: Camera
|
||||||
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
||||||
|
|
||||||
if dispatcher.is_none() {
|
if dispatcher.is_none() {
|
||||||
db.add_open_ticket(ticket);
|
db.add_open_ticket(ticket.clone());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher.unwrap().send(ticket).await;
|
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
|
||||||
db.ticket_plate(day, plate_name.clone());
|
db.ticket_plate(day, plate_name.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue