From 4b30354214bd18030a55f6276c9ac4bd72a6f5b9 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Thu, 18 May 2023 09:44:54 +0200 Subject: [PATCH] Add test cases, reorganized server code, be able to add data --- problem_06/bin/client.rs | 66 ++++++++++++---- problem_06/src/connection.rs | 14 +++- problem_06/src/db.rs | 53 ++++++++++--- problem_06/src/lib.rs | 2 - problem_06/src/server.rs | 144 +++++++++++++++++++---------------- problem_06/src/ticketing.rs | 4 - 6 files changed, 183 insertions(+), 100 deletions(-) delete mode 100644 problem_06/src/ticketing.rs diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs index 817817a..d897c80 100644 --- a/problem_06/bin/client.rs +++ b/problem_06/bin/client.rs @@ -1,6 +1,6 @@ use problem_06::{DEFAULT_IP, DEFAULT_PORT}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; +use tokio::net::{tcp::WriteHalf, TcpStream}; use tracing::{debug, error, info}; #[tokio::main] @@ -10,8 +10,32 @@ async fn main() -> Result<(), Box> { let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; let (mut read, mut write) = stream.split(); + // test_all_different_messages(&mut write).await?; + test_camera_connection(&mut write).await?; + let mut buf: [u8; 4] = [0; 4]; + if let Ok(n) = read.read_exact(&mut buf).await { + info!("Stream incoming..."); + + if n == 0 { + info!("End of stream"); + return Ok(()); + } + + let message = i32::from_be_bytes(buf); + debug!(?message); + return Ok(()); + } + + error!("Cannot read from socket"); + Err("Could not read from socket".into()) +} + +#[allow(dead_code)] +async fn test_all_different_messages( + write: &mut WriteHalf<'_>, +) -> Result<(), Box> { // 20 Plate { // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", // 00 01 e2 40 timestamp: 123456 @@ -46,19 +70,29 @@ async fn main() -> Result<(), Box> { write.write_all(&i_am_camera).await?; write.write_all(&i_am_dispatcher).await?; - if let Ok(n) = read.read_exact(&mut buf).await { - info!("Stream incoming..."); - - if n == 0 { - info!("End of stream"); - return Ok(()); - } - - let message = i32::from_be_bytes(buf); - debug!(?message); - return Ok(()); - } - - error!("Cannot read from socket"); - Err("Could not read from socket".into()) + Ok(()) +} + +async fn test_camera_connection( + write: &mut WriteHalf<'_>, +) -> Result<(), Box> { + // 80 IAmCamera{ + // 00 42 road: 66, + // 00 64 mile: 100, + // 00 3c limit: 60, + // } + let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; + + // 20 Plate { + // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", + // 00 01 e2 40 timestamp: 123456 + // } + let plate = [ + 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, + ]; + + write.write_all(&i_am_camera).await?; + write.write_all(&plate).await?; + + Ok(()) } diff --git a/problem_06/src/connection.rs b/problem_06/src/connection.rs index b5e7bdd..b201ea7 100644 --- a/problem_06/src/connection.rs +++ b/problem_06/src/connection.rs @@ -2,24 +2,36 @@ use crate::frame::{self, ClientFrames, ServerFrames}; use bytes::{Buf, BytesMut}; use std::io::Cursor; +use std::net::SocketAddr; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; use tracing::{debug, info}; +pub(crate) enum ConnectionType { + Camera, + Dispatcher, +} + #[derive(Debug)] pub struct Connection { + pub address: SocketAddr, buffer: BytesMut, pub(crate) stream: BufWriter, } impl Connection { - pub fn new(socket: TcpStream) -> Connection { + pub fn new(address: SocketAddr, socket: TcpStream) -> Connection { Connection { + address, buffer: BytesMut::with_capacity(4 * 1024), stream: BufWriter::new(socket), } } + pub fn get_address(&self) -> SocketAddr { + self.address.clone() + } + pub async fn read_frame(&mut self) -> crate::Result> { loop { info!("Loop read_frame"); diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs index 715305f..02e8e74 100644 --- a/problem_06/src/db.rs +++ b/problem_06/src/db.rs @@ -1,26 +1,46 @@ use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tracing::debug; use crate::frame::ServerFrames; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct DispatcherId(pub(crate) SocketAddr); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct CameraId(pub(crate) SocketAddr); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct Plate { + pub(crate) plate: String, + pub(crate) timestamp: u32, +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct Camera { + pub(crate) road: u16, + pub(crate) mile: u16, + pub(crate) limit: u16, +} + pub(crate) struct DbHolder { /// The `Db` instance that will be shut down when this `DbHolder` struct /// is dropped. db: Db, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub(crate) struct Db { state: Arc>, } #[derive(Debug)] struct State { - // cameras: HashMap<(u32, u32), u32>, - dispatchers: HashMap, broadcast::Sender>, - plates: HashMap, + cameras: HashMap, + dispatchers: HashMap, mpsc::Sender)>, + plates: HashMap, } impl DbHolder { @@ -40,7 +60,7 @@ impl DbHolder { impl Db { pub(crate) fn new() -> Db { let state = Arc::new(Mutex::new(State { - // cameras: HashMap::new(), + cameras: HashMap::new(), dispatchers: HashMap::new(), plates: HashMap::new(), })); @@ -48,19 +68,28 @@ impl Db { Db { state } } - // pub(crate) fn new_camera(&self, road: u32, mile: u32, limit: u32) {} + pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { + let mut state = self.state.lock().unwrap(); + state.cameras.insert(camera_id, camera); + debug!(?state); + } pub(crate) fn add_dispatcher( &self, + dispatcher_id: DispatcherId, roads: Vec, - writer_stream: broadcast::Sender, + writer_stream: mpsc::Sender, ) { let mut state = self.state.lock().unwrap(); - state.dispatchers.insert(roads, writer_stream); + state + .dispatchers + .insert(dispatcher_id, (roads, writer_stream)); + debug!(?state); } - pub(crate) fn insert_plate(&self, plate: String, timestamp: u32) { + pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) { let mut state = self.state.lock().unwrap(); - state.plates.insert(plate, timestamp); + state.plates.insert(camera_id, plate); + debug!(?state); } } diff --git a/problem_06/src/lib.rs b/problem_06/src/lib.rs index 9cb67cc..ee65b73 100644 --- a/problem_06/src/lib.rs +++ b/problem_06/src/lib.rs @@ -8,8 +8,6 @@ pub mod db; pub mod server; -pub mod ticketing; - mod shutdown; use shutdown::Shutdown; diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 7ec9add..08a1008 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -1,7 +1,8 @@ use crate::{ - db::{Db, DbHolder}, + connection::ConnectionType, + db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate}, frame::{ClientFrames, ServerFrames}, - ticketing, Connection, Shutdown, + Connection, Shutdown, }; use std::future::Future; @@ -21,9 +22,7 @@ struct Listener { struct Handler { connection: Connection, - receive_ticket: broadcast::Receiver, - send_ticket: broadcast::Sender, - connection_type: Option, + connection_type: Option, db: Db, shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, @@ -72,11 +71,6 @@ impl Listener { async fn run(&mut self) -> crate::Result<()> { info!("accepting inbound connections"); - let (send_ticket, _): ( - broadcast::Sender, - broadcast::Receiver, - ) = broadcast::channel(4096); - loop { let permit = self .limit_connections @@ -86,11 +80,10 @@ impl Listener { .unwrap(); let socket = self.accept().await?; + let address = socket.peer_addr()?; let mut handler = Handler { - connection: Connection::new(socket), - send_ticket: send_ticket.clone(), - receive_ticket: send_ticket.subscribe(), + connection: Connection::new(address, socket), connection_type: None, db: self.db_holder.db(), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), @@ -130,74 +123,95 @@ impl Listener { impl Handler { async fn run(&mut self) -> crate::Result<()> { + let (send_message, mut receive_message): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(1024); + while !self.shutdown.is_shutdown() { - let maybe_frame = tokio::select! { - res = self.connection.read_frame() => res?, + tokio::select! { + res = self.connection.read_frame() => { + match res? { + Some(frame) => { + info!("Received frame"); + let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; + }, + None => return Ok(()), + } + + } + message = receive_message.recv() => { + match message { + Some(message) => { + let _ = self.connection.write_frame(message).await; + }, + None => (), + } + } _ = self.shutdown.recv() => { debug!("Shutdown"); return Ok(()); } - message = self.receive_ticket.recv() => { - match message { - Ok(message) => { - match message { - ServerFrames::Ticket { - .. - } => { - let _ = self.connection.write_frame(message).await; - } - _ => () - } - }, - Err(_) => return Ok(()), - } - None - } }; - - let frame = match maybe_frame { - Some(frame) => frame, - None => return Ok(()), - }; - - match frame { - ClientFrames::Plate { plate, timestamp } => { - info!("Insert {plate} and {timestamp}"); - self.db.insert_plate(plate, timestamp); - } - ClientFrames::WantHeartbeat { interval } => { - info!("Want heartbeat: {interval}"); - } - ClientFrames::IAmCamera { road, mile, limit } => { - if self.connection_type.is_some() { - return Err("Already connected".into()); - } - self.set_connection_type(ticketing::ClientType::Camera(road, mile)); - info!("Road: {road}, mile: {mile}, limit: {limit}"); - } - ClientFrames::IAmDispatcher { roads } => { - if self.connection_type.is_some() { - return Err("Already connected".into()); - } - - self.set_connection_type(ticketing::ClientType::Dispatcher); - self.db - .add_dispatcher(roads.to_vec(), self.send_ticket.clone()); - } - } } Ok(()) } - fn set_connection_type(&mut self, connection_type: ticketing::ClientType) { + fn set_connection_type(&mut self, connection_type: ConnectionType) { match connection_type { - ticketing::ClientType::Camera(_, _) => { + ConnectionType::Camera => { self.connection_type = Some(connection_type); } - ticketing::ClientType::Dispatcher => { + ConnectionType::Dispatcher => { self.connection_type = Some(connection_type); } } } + + async fn handle_client_frame( + &mut self, + db: Db, + frame: ClientFrames, + send_message: mpsc::Sender, + ) -> crate::Result<()> { + match frame { + ClientFrames::Plate { plate, timestamp } => { + info!("Receive new plate {plate} {timestamp}"); + db.insert_plate( + CameraId(self.connection.get_address()), + Plate { plate, timestamp }, + ); + } + ClientFrames::WantHeartbeat { interval } => { + info!("Want heartbeat: {interval}"); + } + ClientFrames::IAmCamera { road, mile, limit } => { + if self.connection_type.is_some() { + return Err("Already connected".into()); + } + self.set_connection_type(ConnectionType::Camera); + info!("Set connection type to camera"); + + db.add_camera( + CameraId(self.connection.get_address()), + Camera { road, mile, limit }, + ); + } + ClientFrames::IAmDispatcher { roads } => { + if self.connection_type.is_some() { + return Err("Already connected".into()); + } + + self.set_connection_type(ConnectionType::Dispatcher); + db.add_dispatcher( + DispatcherId(self.connection.get_address()), + roads.to_vec(), + send_message.clone(), + ); + } + } + + Ok(()) + } } diff --git a/problem_06/src/ticketing.rs b/problem_06/src/ticketing.rs deleted file mode 100644 index 44784ec..0000000 --- a/problem_06/src/ticketing.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub(crate) enum ClientType { - Camera(u16, u16), - Dispatcher, -}