diff --git a/problem_06/src/connection.rs b/problem_06/src/connection.rs index 6b26c3a..b5e7bdd 100644 --- a/problem_06/src/connection.rs +++ b/problem_06/src/connection.rs @@ -8,15 +8,15 @@ use tracing::{debug, info}; #[derive(Debug)] pub struct Connection { - stream: BufWriter, buffer: BytesMut, + pub(crate) stream: BufWriter, } impl Connection { pub fn new(socket: TcpStream) -> Connection { Connection { - stream: BufWriter::new(socket), buffer: BytesMut::with_capacity(4 * 1024), + stream: BufWriter::new(socket), } } @@ -61,7 +61,9 @@ impl Connection { } } - pub async fn write_frame(&mut self, frame: &ServerFrames) -> tokio::io::Result<()> { - unimplemented!() + pub async fn write_frame(&mut self, frame: ServerFrames) -> tokio::io::Result<()> { + let _ = self.stream.write_all(&frame.convert_to_bytes()).await; + self.stream.flush().await?; + Ok(()) } } diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs new file mode 100644 index 0000000..715305f --- /dev/null +++ b/problem_06/src/db.rs @@ -0,0 +1,66 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use crate::frame::ServerFrames; + +#[derive(Debug)] +pub(crate) struct DbHolder { + /// The `Db` instance that will be shut down when this `DbHolder` struct + /// is dropped. + db: Db, +} + +#[derive(Debug, Clone)] +pub(crate) struct Db { + state: Arc>, +} + +#[derive(Debug)] +struct State { + // cameras: HashMap<(u32, u32), u32>, + dispatchers: HashMap, broadcast::Sender>, + plates: HashMap, +} + +impl DbHolder { + /// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped + /// the `Db`'s purge task will be shut down. + pub(crate) fn new() -> DbHolder { + DbHolder { db: Db::new() } + } + + /// Get the shared database. Internally, this is an + /// `Arc`, so a clone only increments the ref count. + pub(crate) fn db(&self) -> Db { + self.db.clone() + } +} + +impl Db { + pub(crate) fn new() -> Db { + let state = Arc::new(Mutex::new(State { + // cameras: HashMap::new(), + dispatchers: HashMap::new(), + plates: HashMap::new(), + })); + + Db { state } + } + + // pub(crate) fn new_camera(&self, road: u32, mile: u32, limit: u32) {} + + pub(crate) fn add_dispatcher( + &self, + roads: Vec, + writer_stream: broadcast::Sender, + ) { + let mut state = self.state.lock().unwrap(); + state.dispatchers.insert(roads, writer_stream); + } + + pub(crate) fn insert_plate(&self, plate: String, timestamp: u32) { + let mut state = self.state.lock().unwrap(); + state.plates.insert(plate, timestamp); + } +} diff --git a/problem_06/src/frame.rs b/problem_06/src/frame.rs index 5654471..86cbd1e 100644 --- a/problem_06/src/frame.rs +++ b/problem_06/src/frame.rs @@ -1,4 +1,4 @@ -use bytes::Buf; +use bytes::{Buf, BufMut, BytesMut}; use std::fmt; use std::io::Cursor; use std::num::TryFromIntError; @@ -22,9 +22,9 @@ pub enum ServerFrames { plate: String, road: u16, mile1: u16, - timestamp1: u16, + timestamp1: u32, mile2: u16, - timestamp2: u16, + timestamp2: u32, speed: u16, }, Heartbeat, @@ -144,6 +144,52 @@ impl ClientFrames { } } +impl ServerFrames { + pub(crate) fn convert_to_bytes(&self) -> BytesMut { + match self { + ServerFrames::Error { msg } => { + let mut buf = BytesMut::with_capacity(1 + 1 + msg.len()); + + buf.put_u8(0x10); + buf.put_u8(msg.len() as u8); + buf.put_slice(msg.as_bytes()); + + return buf; + } + ServerFrames::Ticket { + plate, + road, + mile1, + timestamp1, + mile2, + timestamp2, + speed, + } => { + let mut buf = BytesMut::with_capacity(1 + 1 + plate.len() + 2 + 2 + 4 + 2 + 4 + 2); + + buf.put_u8(0x21); + buf.put_u8(plate.len() as u8); + buf.put_slice(plate.as_bytes()); + buf.put_u16(*road); + buf.put_u16(*mile1); + buf.put_u32(*timestamp1); + buf.put_u16(*mile2); + buf.put_u32(*timestamp2); + buf.put_u16(*speed); + + return buf; + } + ServerFrames::Heartbeat => { + let mut buf = BytesMut::new(); + + buf.put_u8(0x41); + + return buf; + } + } + } +} + fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> { if src.remaining() < len { return Err(Error::Incomplete); diff --git a/problem_06/src/lib.rs b/problem_06/src/lib.rs index 4ba6a3a..9cb67cc 100644 --- a/problem_06/src/lib.rs +++ b/problem_06/src/lib.rs @@ -4,8 +4,12 @@ pub use connection::Connection; pub mod frame; pub use frame::ClientFrames; +pub mod db; + pub mod server; +pub mod ticketing; + mod shutdown; use shutdown::Shutdown; diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 835174b..7ec9add 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -1,4 +1,8 @@ -use crate::{frame::ClientFrames, Connection, Shutdown}; +use crate::{ + db::{Db, DbHolder}, + frame::{ClientFrames, ServerFrames}, + ticketing, Connection, Shutdown, +}; use std::future::Future; use std::sync::Arc; @@ -8,16 +12,19 @@ use tokio::time::{self, Duration}; use tracing::{debug, error, info}; struct Listener { - db_holder: DbDropGuard, listener: TcpListener, + db_holder: DbHolder, limit_connections: Arc, notify_shutdown: broadcast::Sender<()>, - shutdown_complete_rx: mpsc::Receiver<()>, shutdown_complete_tx: mpsc::Sender<()>, } struct Handler { connection: Connection, + receive_ticket: broadcast::Receiver, + send_ticket: broadcast::Sender, + connection_type: Option, + db: Db, shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, } @@ -26,14 +33,14 @@ const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); - let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1); + let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); let mut server = Listener { listener, + db_holder: DbHolder::new(), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), notify_shutdown, shutdown_complete_tx, - shutdown_complete_rx, }; tokio::select! { @@ -48,7 +55,6 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< } let Listener { - mut shutdown_complete_rx, shutdown_complete_tx, notify_shutdown, .. @@ -66,6 +72,11 @@ impl Listener { async fn run(&mut self) -> crate::Result<()> { info!("accepting inbound connections"); + let (send_ticket, _): ( + broadcast::Sender, + broadcast::Receiver, + ) = broadcast::channel(4096); + loop { let permit = self .limit_connections @@ -78,6 +89,10 @@ impl Listener { let mut handler = Handler { connection: Connection::new(socket), + send_ticket: send_ticket.clone(), + receive_ticket: send_ticket.subscribe(), + connection_type: None, + db: self.db_holder.db(), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), _shutdown_complete: self.shutdown_complete_tx.clone(), }; @@ -122,6 +137,22 @@ impl Handler { debug!("Shutdown"); return Ok(()); } + message = self.receive_ticket.recv() => { + match message { + Ok(message) => { + match message { + ServerFrames::Ticket { + .. + } => { + let _ = self.connection.write_frame(message).await; + } + _ => () + } + }, + Err(_) => return Ok(()), + } + None + } }; let frame = match maybe_frame { @@ -131,20 +162,42 @@ impl Handler { match frame { ClientFrames::Plate { plate, timestamp } => { - info!("Plate: {plate}, timestamp: {timestamp}"); + info!("Insert {plate} and {timestamp}"); + self.db.insert_plate(plate, timestamp); } ClientFrames::WantHeartbeat { interval } => { info!("Want heartbeat: {interval}"); } ClientFrames::IAmCamera { road, mile, limit } => { + if self.connection_type.is_some() { + return Err("Already connected".into()); + } + self.set_connection_type(ticketing::ClientType::Camera(road, mile)); info!("Road: {road}, mile: {mile}, limit: {limit}"); } ClientFrames::IAmDispatcher { roads } => { - info!("roads: {roads:?}"); + if self.connection_type.is_some() { + return Err("Already connected".into()); + } + + self.set_connection_type(ticketing::ClientType::Dispatcher); + self.db + .add_dispatcher(roads.to_vec(), self.send_ticket.clone()); } } } Ok(()) } + + fn set_connection_type(&mut self, connection_type: ticketing::ClientType) { + match connection_type { + ticketing::ClientType::Camera(_, _) => { + self.connection_type = Some(connection_type); + } + ticketing::ClientType::Dispatcher => { + self.connection_type = Some(connection_type); + } + } + } } diff --git a/problem_06/src/ticketing.rs b/problem_06/src/ticketing.rs new file mode 100644 index 0000000..44784ec --- /dev/null +++ b/problem_06/src/ticketing.rs @@ -0,0 +1,4 @@ +pub(crate) enum ClientType { + Camera(u16, u16), + Dispatcher, +}