Ditch pure async

This commit is contained in:
Bastian Gruber 2023-05-22 14:05:42 +02:00
parent e7c9ad95e7
commit 52f92969b5
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
3 changed files with 40 additions and 76 deletions

View file

@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
@ -68,17 +67,8 @@ pub(crate) struct Timestamp(pub(crate) u32);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Mile(pub(crate) u16);
pub(crate) struct DbHolder {
db: Db,
}
#[derive(Clone)]
pub(crate) struct Db {
state: Arc<Mutex<State>>,
}
#[derive(Debug)]
struct State {
pub(crate) struct Db {
cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<Road, Vec<(DispatcherId, mpsc::Sender<ServerFrames>)>>,
plates: HashMap<(PlateName, Road), Vec<(Mile, Timestamp)>>,
@ -86,51 +76,34 @@ struct State {
open_tickets: HashMap<Road, Vec<Ticket>>,
}
impl DbHolder {
pub(crate) fn new() -> DbHolder {
DbHolder { db: Db::new() }
}
pub(crate) fn db(&self) -> Db {
self.db.clone()
}
}
impl Db {
pub(crate) fn new() -> Db {
let state = Arc::new(Mutex::new(State {
Db {
cameras: HashMap::new(),
dispatchers: HashMap::new(),
plates: HashMap::new(),
ticketed_plates_by_day: HashSet::new(),
open_tickets: HashMap::new(),
}));
Db { state }
}
}
pub(crate) fn get_camera(&self, camera_id: CameraId) -> Option<Camera> {
let state = self.state.lock().unwrap();
state.cameras.get(&camera_id).cloned()
self.cameras.get(&camera_id).cloned()
}
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap();
state.cameras.insert(camera_id, camera);
pub(crate) fn add_camera(&mut self, camera_id: CameraId, camera: Camera) {
self.cameras.insert(camera_id, camera);
}
pub(crate) fn add_dispatcher(
&self,
&mut self,
dispatcher_id: DispatcherId,
roads: Vec<u16>,
writer_stream: mpsc::Sender<ServerFrames>,
) {
info!("Adding new dispatcher for roads: {roads:?}");
let mut state = self.state.lock().unwrap();
for r in roads.iter() {
state
.dispatchers
self.dispatchers
.entry(Road(*r))
.or_insert(Vec::new())
.push((dispatcher_id.clone(), writer_stream.clone()));
@ -138,8 +111,7 @@ impl Db {
}
pub(crate) fn get_dispatcher_for_road(&self, road: Road) -> Option<mpsc::Sender<ServerFrames>> {
let state = self.state.lock().unwrap();
let senders = state.dispatchers.get(&road);
let senders = self.dispatchers.get(&road);
if senders.is_none() {
return None;
}
@ -147,28 +119,24 @@ impl Db {
senders.unwrap().first().map(|(_, s)| s.clone())
}
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
let mut state = self.state.lock().unwrap();
pub(crate) fn add_open_ticket(&mut self, ticket: Ticket) {
info!("Adding open ticket: {ticket:?}");
state
.open_tickets
self.open_tickets
.entry(Road(ticket.road))
.or_insert(Vec::new())
.push(ticket);
}
pub(crate) fn get_open_tickets(&self) -> Vec<Ticket> {
let state = self.state.lock().unwrap();
state.open_tickets.values().flatten().cloned().collect()
self.open_tickets.values().flatten().cloned().collect()
}
pub(crate) fn remove_open_ticket(&self, road: Road, ticket: Ticket) -> bool {
pub(crate) fn remove_open_ticket(&mut self, road: Road, ticket: Ticket) -> bool {
info!("Removing open ticket: {ticket:?}");
let mut state = self.state.lock().unwrap();
if let Some(tickets) = state.open_tickets.get_mut(&road) {
if let Some(tickets) = self.open_tickets.get_mut(&road) {
tickets.retain(|t| t.plate != ticket.plate);
if tickets.is_empty() {
state.open_tickets.remove(&road);
self.open_tickets.remove(&road);
}
return true;
}
@ -180,22 +148,20 @@ impl Db {
plate: Plate,
road: Road,
) -> Option<Vec<(Mile, Timestamp)>> {
let state = self.state.lock().unwrap();
state.plates.get(&(plate.plate, road)).cloned()
self.plates.get(&(plate.plate, road)).cloned()
}
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
pub(crate) fn add_plate(&mut self, camera_id: CameraId, plate: Plate) {
//TODO: Check if the same plate was already added for the road AND MILE
let camera = self.get_camera(camera_id).unwrap();
let mut state = self.state.lock().unwrap();
match state
match self
.plates
.get_mut(&(plate.plate.clone(), camera.road.clone()))
{
Some(v) => v.push((camera.mile, plate.timestamp)),
None => {
state.plates.insert(
self.plates.insert(
(plate.clone().plate, camera.road),
vec![(camera.mile, plate.timestamp)],
);
@ -203,22 +169,18 @@ impl Db {
}
}
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
pub(crate) fn ticket_plate(&mut self, day: u32, plate_name: PlateName) {
info!("Add {plate_name:?} for day:{day} ");
let mut state = self.state.lock().unwrap();
state
.ticketed_plates_by_day
self.ticketed_plates_by_day
.insert((Timestamp(day), plate_name.0));
}
pub(crate) fn is_plate_ticketed_for_day(&self, day: u32, plate_name: PlateName) -> bool {
let state = self.state.lock().unwrap();
info!(
"Current ticketed plates, by day: {:?}",
state.ticketed_plates_by_day
self.ticketed_plates_by_day
);
state
.ticketed_plates_by_day
self.ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name.0))
}
}

View file

@ -2,17 +2,14 @@ use std::{future::Future, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::{broadcast, mpsc, Semaphore},
sync::{broadcast, mpsc, Mutex, Semaphore},
time::{self, Duration},
};
use tracing::{error, info};
use crate::{
connection::ConnectionType,
db::{
Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, PlateName, Road,
Timestamp,
},
db::{Camera, CameraId, Db, DispatcherId, Limit, Mile, Plate, PlateName, Road, Timestamp},
frame::{ClientFrames, ServerFrames},
heartbeat::Heartbeat,
ticketing::{issue_possible_ticket, send_out_waiting_tickets},
@ -21,7 +18,7 @@ use crate::{
struct Listener {
listener: TcpListener,
db_holder: DbHolder,
db: Arc<Mutex<Db>>,
limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>,
@ -30,7 +27,7 @@ struct Listener {
struct Handler {
connection: Connection,
connection_type: Option<ConnectionType>,
db: Db,
db: Arc<Mutex<Db>>,
shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>,
}
@ -43,7 +40,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
let mut server = Listener {
listener,
db_holder: DbHolder::new(),
db: Arc::new(Mutex::new(Db::new())),
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown: notify_shutdown.clone(),
shutdown_complete_tx,
@ -90,7 +87,7 @@ impl Listener {
let mut handler = Handler {
connection: Connection::new(address, socket),
connection_type: None,
db: self.db_holder.db(),
db: self.db.clone(),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
@ -173,7 +170,7 @@ impl Handler {
async fn handle_client_frame(
&mut self,
mut db: Db,
db: Arc<Mutex<Db>>,
frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> {
@ -181,7 +178,7 @@ impl Handler {
ClientFrames::Plate { plate, timestamp } => {
info!("Receive new plate: {plate} at {timestamp}");
issue_possible_ticket(
&mut db,
db,
Plate {
plate: PlateName(plate.clone()),
timestamp: Timestamp(timestamp),
@ -205,7 +202,7 @@ impl Handler {
}
self.set_connection_type(ConnectionType::Camera);
db.add_camera(
db.lock().await.add_camera(
CameraId(self.connection.get_address()),
Camera {
road: Road(road),
@ -220,7 +217,7 @@ impl Handler {
}
self.set_connection_type(ConnectionType::Dispatcher);
db.add_dispatcher(
db.lock().await.add_dispatcher(
DispatcherId(self.connection.get_address()),
roads.to_vec(),
send_message.clone(),

View file

@ -1,8 +1,12 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use crate::db::{CameraId, Db, Plate, Road, Ticket};
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
pub(crate) async fn issue_possible_ticket(db: Arc<Mutex<Db>>, plate: Plate, camera_id: CameraId) {
let mut db = db.lock().await;
let camera = db.get_camera(camera_id.clone()).unwrap();
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
@ -72,7 +76,8 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
db.add_plate(camera_id, plate);
}
pub(crate) async fn send_out_waiting_tickets(db: Db) {
pub(crate) async fn send_out_waiting_tickets(db: Arc<Mutex<Db>>) {
let mut db = db.lock().await;
let tickets = db.get_open_tickets();
info!("Sending out waiting tickets: {tickets:?}");
for ticket in tickets {