From 0fdc85a630b54f7bc8d204a7b1f654642966bea7 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Fri, 19 May 2023 15:01:12 +0200 Subject: [PATCH] cargo fmt, restructure db for dispatchers --- problem_06/bin/client.rs | 138 +++++------ problem_06/bin/server.rs | 14 +- problem_06/rustfmt.toml | 6 + problem_06/src/connection.rs | 120 +++++----- problem_06/src/db.rs | 114 ++++----- problem_06/src/frame.rs | 448 +++++++++++++++++------------------ problem_06/src/server.rs | 352 +++++++++++++-------------- problem_06/src/shutdown.rs | 36 +-- 8 files changed, 620 insertions(+), 608 deletions(-) create mode 100644 problem_06/rustfmt.toml diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs index d897c80..9e29ee1 100644 --- a/problem_06/bin/client.rs +++ b/problem_06/bin/client.rs @@ -1,98 +1,100 @@ use problem_06::{DEFAULT_IP, DEFAULT_PORT}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{tcp::WriteHalf, TcpStream}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{tcp::WriteHalf, TcpStream}, +}; use tracing::{debug, error, info}; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::fmt::init(); + tracing_subscriber::fmt::init(); - let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; - let (mut read, mut write) = stream.split(); + let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; + let (mut read, mut write) = stream.split(); - // test_all_different_messages(&mut write).await?; - test_camera_connection(&mut write).await?; + // test_all_different_messages(&mut write).await?; + test_camera_connection(&mut write).await?; - let mut buf: [u8; 4] = [0; 4]; + let mut buf: [u8; 4] = [0; 4]; - if let Ok(n) = read.read_exact(&mut buf).await { - info!("Stream incoming..."); + if let Ok(n) = read.read_exact(&mut buf).await { + info!("Stream incoming..."); - if n == 0 { - info!("End of stream"); - return Ok(()); - } + if n == 0 { + info!("End of stream"); + return Ok(()); + } - let message = i32::from_be_bytes(buf); - debug!(?message); - return Ok(()); - } + let message = i32::from_be_bytes(buf); + debug!(?message); + return Ok(()); + } - error!("Cannot read from socket"); - Err("Could not read from socket".into()) + error!("Cannot read from socket"); + Err("Could not read from socket".into()) } #[allow(dead_code)] async fn test_all_different_messages( - write: &mut WriteHalf<'_>, + write: &mut WriteHalf<'_>, ) -> Result<(), Box> { - // 20 Plate { - // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", - // 00 01 e2 40 timestamp: 123456 - // } - let plate = [ - 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, - ]; + // 20 Plate { + // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", + // 00 01 e2 40 timestamp: 123456 + // } + let plate = [ + 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, + ]; - // 40 WantHeartbeat{ - // 00 00 00 0a interval: 10 - // } - let want_heartbeat = [0x40, 0x00, 0x00, 0x00, 0x0a]; + // 40 WantHeartbeat{ + // 00 00 00 0a interval: 10 + // } + let want_heartbeat = [0x40, 0x00, 0x00, 0x00, 0x0a]; - // 80 IAmCamera{ - // 00 42 road: 66, - // 00 64 mile: 100, - // 00 3c limit: 60, - // } - let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; + // 80 IAmCamera{ + // 00 42 road: 66, + // 00 64 mile: 100, + // 00 3c limit: 60, + // } + let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; - // 81 IAmDispatcher{ - // 03 roads: [ - // 00 42 66, - // 01 70 368, - // 13 88 5000 - // ] - // } - let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; + // 81 IAmDispatcher{ + // 03 roads: [ + // 00 42 66, + // 01 70 368, + // 13 88 5000 + // ] + // } + let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; - write.write_all(&plate).await?; - write.write_all(&want_heartbeat).await?; - write.write_all(&i_am_camera).await?; - write.write_all(&i_am_dispatcher).await?; + write.write_all(&plate).await?; + write.write_all(&want_heartbeat).await?; + write.write_all(&i_am_camera).await?; + write.write_all(&i_am_dispatcher).await?; - Ok(()) + Ok(()) } async fn test_camera_connection( - write: &mut WriteHalf<'_>, + write: &mut WriteHalf<'_>, ) -> Result<(), Box> { - // 80 IAmCamera{ - // 00 42 road: 66, - // 00 64 mile: 100, - // 00 3c limit: 60, - // } - let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; + // 80 IAmCamera{ + // 00 42 road: 66, + // 00 64 mile: 100, + // 00 3c limit: 60, + // } + let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; - // 20 Plate { - // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", - // 00 01 e2 40 timestamp: 123456 - // } - let plate = [ - 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, - ]; + // 20 Plate { + // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", + // 00 01 e2 40 timestamp: 123456 + // } + let plate = [ + 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, + ]; - write.write_all(&i_am_camera).await?; - write.write_all(&plate).await?; + write.write_all(&i_am_camera).await?; + write.write_all(&plate).await?; - Ok(()) + Ok(()) } diff --git a/problem_06/bin/server.rs b/problem_06/bin/server.rs index 3f53949..a3fc428 100644 --- a/problem_06/bin/server.rs +++ b/problem_06/bin/server.rs @@ -1,16 +1,14 @@ use problem_06::{server, DEFAULT_IP, DEFAULT_PORT}; - -use tokio::net::TcpListener; -use tokio::signal; +use tokio::{net::TcpListener, signal}; #[tokio::main] pub async fn main() -> problem_06::Result<()> { - tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); + tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); - // Bind a TCP listener - let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; + // Bind a TCP listener + let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; - let _ = server::run(listener, signal::ctrl_c()).await; + let _ = server::run(listener, signal::ctrl_c()).await; - Ok(()) + Ok(()) } diff --git a/problem_06/rustfmt.toml b/problem_06/rustfmt.toml new file mode 100644 index 0000000..abf4214 --- /dev/null +++ b/problem_06/rustfmt.toml @@ -0,0 +1,6 @@ +hard_tabs = true +imports_granularity = "Crate" +reorder_impl_items = true +reorder_imports = true +group_imports = "StdExternalCrate" +reorder_modules = true \ No newline at end of file diff --git a/problem_06/src/connection.rs b/problem_06/src/connection.rs index b201ea7..fd7d9d9 100644 --- a/problem_06/src/connection.rs +++ b/problem_06/src/connection.rs @@ -1,81 +1,83 @@ -use crate::frame::{self, ClientFrames, ServerFrames}; +use std::{io::Cursor, net::SocketAddr}; use bytes::{Buf, BytesMut}; -use std::io::Cursor; -use std::net::SocketAddr; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; -use tokio::net::TcpStream; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + net::TcpStream, +}; use tracing::{debug, info}; +use crate::frame::{self, ClientFrames, ServerFrames}; + pub(crate) enum ConnectionType { - Camera, - Dispatcher, + Camera, + Dispatcher, } #[derive(Debug)] pub struct Connection { - pub address: SocketAddr, - buffer: BytesMut, - pub(crate) stream: BufWriter, + pub address: SocketAddr, + buffer: BytesMut, + pub(crate) stream: BufWriter, } impl Connection { - pub fn new(address: SocketAddr, socket: TcpStream) -> Connection { - Connection { - address, - buffer: BytesMut::with_capacity(4 * 1024), - stream: BufWriter::new(socket), - } - } + pub fn new(address: SocketAddr, socket: TcpStream) -> Connection { + Connection { + address, + buffer: BytesMut::with_capacity(4 * 1024), + stream: BufWriter::new(socket), + } + } - pub fn get_address(&self) -> SocketAddr { - self.address.clone() - } + pub fn get_address(&self) -> SocketAddr { + self.address.clone() + } - pub async fn read_frame(&mut self) -> crate::Result> { - loop { - info!("Loop read_frame"); - if let Some(frame) = self.parse_frame()? { - info!("Frame parsed"); - return Ok(Some(frame)); - } + pub async fn read_frame(&mut self) -> crate::Result> { + loop { + info!("Loop read_frame"); + if let Some(frame) = self.parse_frame()? { + info!("Frame parsed"); + return Ok(Some(frame)); + } - if 0 == self.stream.read_buf(&mut self.buffer).await? { - if self.buffer.is_empty() { - return Ok(None); - } else { - return Err("connection reset by peer".into()); - } - } - } - } + if 0 == self.stream.read_buf(&mut self.buffer).await? { + if self.buffer.is_empty() { + return Ok(None); + } else { + return Err("connection reset by peer".into()); + } + } + } + } - fn parse_frame(&mut self) -> crate::Result> { - use frame::Error::Incomplete; + fn parse_frame(&mut self) -> crate::Result> { + use frame::Error::Incomplete; - let mut buf = Cursor::new(&self.buffer[..]); - debug!(?buf); + let mut buf = Cursor::new(&self.buffer[..]); + debug!(?buf); - match ClientFrames::check(&mut buf) { - Ok(_) => { - info!("Frame::check succesful"); - let len = buf.position() as usize; - debug!(?len); - buf.set_position(0); + match ClientFrames::check(&mut buf) { + Ok(_) => { + info!("Frame::check succesful"); + let len = buf.position() as usize; + debug!(?len); + buf.set_position(0); - let frame = ClientFrames::parse(&mut buf)?; - self.buffer.advance(len); + let frame = ClientFrames::parse(&mut buf)?; + self.buffer.advance(len); - Ok(Some(frame)) - } - Err(Incomplete) => Ok(None), - Err(e) => Err(e.into()), - } - } + Ok(Some(frame)) + } + Err(Incomplete) => Ok(None), + Err(e) => Err(e.into()), + } + } - pub async fn write_frame(&mut self, frame: ServerFrames) -> tokio::io::Result<()> { - let _ = self.stream.write_all(&frame.convert_to_bytes()).await; - self.stream.flush().await?; - Ok(()) - } + pub async fn write_frame(&mut self, frame: ServerFrames) -> tokio::io::Result<()> { + let _ = self.stream.write_all(&frame.convert_to_bytes()).await; + self.stream.flush().await?; + Ok(()) + } } diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs index 02e8e74..06d12a2 100644 --- a/problem_06/src/db.rs +++ b/problem_06/src/db.rs @@ -1,6 +1,9 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + use tokio::sync::mpsc; use tracing::debug; @@ -14,82 +17,83 @@ pub(crate) struct CameraId(pub(crate) SocketAddr); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub(crate) struct Plate { - pub(crate) plate: String, - pub(crate) timestamp: u32, + pub(crate) plate: String, + pub(crate) timestamp: u32, } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] pub(crate) struct Camera { - pub(crate) road: u16, - pub(crate) mile: u16, - pub(crate) limit: u16, + pub(crate) road: u16, + pub(crate) mile: u16, + pub(crate) limit: u16, } +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] +pub(crate) struct Road(u16); + pub(crate) struct DbHolder { - /// The `Db` instance that will be shut down when this `DbHolder` struct - /// is dropped. - db: Db, + db: Db, } #[derive(Clone)] pub(crate) struct Db { - state: Arc>, + state: Arc>, } #[derive(Debug)] struct State { - cameras: HashMap, - dispatchers: HashMap, mpsc::Sender)>, - plates: HashMap, + cameras: HashMap, + dispatchers: HashMap)>, + plates: HashMap, } impl DbHolder { - /// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped - /// the `Db`'s purge task will be shut down. - pub(crate) fn new() -> DbHolder { - DbHolder { db: Db::new() } - } + pub(crate) fn new() -> DbHolder { + DbHolder { db: Db::new() } + } - /// Get the shared database. Internally, this is an - /// `Arc`, so a clone only increments the ref count. - pub(crate) fn db(&self) -> Db { - self.db.clone() - } + pub(crate) fn db(&self) -> Db { + self.db.clone() + } } impl Db { - pub(crate) fn new() -> Db { - let state = Arc::new(Mutex::new(State { - cameras: HashMap::new(), - dispatchers: HashMap::new(), - plates: HashMap::new(), - })); + pub(crate) fn new() -> Db { + let state = Arc::new(Mutex::new(State { + cameras: HashMap::new(), + dispatchers: HashMap::new(), + plates: HashMap::new(), + })); - Db { state } - } + Db { state } + } - pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { - let mut state = self.state.lock().unwrap(); - state.cameras.insert(camera_id, camera); - debug!(?state); - } + pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { + let mut state = self.state.lock().unwrap(); + state.cameras.insert(camera_id, camera); + debug!(?state); + } - pub(crate) fn add_dispatcher( - &self, - dispatcher_id: DispatcherId, - roads: Vec, - writer_stream: mpsc::Sender, - ) { - let mut state = self.state.lock().unwrap(); - state - .dispatchers - .insert(dispatcher_id, (roads, writer_stream)); - debug!(?state); - } + pub(crate) fn add_dispatcher( + &self, + dispatcher_id: DispatcherId, + roads: Vec, + writer_stream: mpsc::Sender, + ) { + let mut state = self.state.lock().unwrap(); - pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) { - let mut state = self.state.lock().unwrap(); - state.plates.insert(camera_id, plate); - debug!(?state); - } + for r in roads.iter() { + state + .dispatchers + .insert(Road(*r), (dispatcher_id.clone(), writer_stream.clone())); + } + + debug!(?state); + } + + pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) { + let mut state = self.state.lock().unwrap(); + state.plates.insert(camera_id, plate); + debug!(?state); + } } diff --git a/problem_06/src/frame.rs b/problem_06/src/frame.rs index 86cbd1e..4679e55 100644 --- a/problem_06/src/frame.rs +++ b/problem_06/src/frame.rs @@ -1,304 +1,302 @@ +use std::{fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error}; + use bytes::{Buf, BufMut, BytesMut}; -use std::fmt; -use std::io::Cursor; -use std::num::TryFromIntError; -use std::string::FromUtf8Error; use tracing::{debug, error}; #[derive(Clone, Debug)] pub enum ClientFrames { - Plate { plate: String, timestamp: u32 }, - WantHeartbeat { interval: u32 }, - IAmCamera { road: u16, mile: u16, limit: u16 }, - IAmDispatcher { roads: Vec }, + Plate { plate: String, timestamp: u32 }, + WantHeartbeat { interval: u32 }, + IAmCamera { road: u16, mile: u16, limit: u16 }, + IAmDispatcher { roads: Vec }, } #[derive(Clone, Debug)] pub enum ServerFrames { - Error { - msg: String, - }, - Ticket { - plate: String, - road: u16, - mile1: u16, - timestamp1: u32, - mile2: u16, - timestamp2: u32, - speed: u16, - }, - Heartbeat, + Error { + msg: String, + }, + Ticket { + plate: String, + road: u16, + mile1: u16, + timestamp1: u32, + mile2: u16, + timestamp2: u32, + speed: u16, + }, + Heartbeat, } #[derive(Debug)] pub enum Error { - Incomplete, - Other(crate::Error), + Incomplete, + Other(crate::Error), } impl ClientFrames { - pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { - match get_u8(src)? { - // Error: msg: str (Server -> Client) - // 0x10 => { - // let n = get_length(src)?; - // skip(src, n as usize) - // } - // Plate: plate: str, timestamp: u32 - 0x20 => { - // Read length character of the plate string - let n = get_length(src)?; - // Skip the string to get to the timestamp - skip(src, n)?; - // check if valid timestamp - get_u32(src)?; - Ok(()) - } - // Ticket (just Server -> Client) - // 0x21 => { - // Ok(()) - // } - // Want Heartbeat: interval: u32 - 0x40 => { - get_u32(src)?; - Ok(()) - } - // Heartbeat (just Server -> Client) - // 0x41 => { - // Ok(()) - // } - // IAmCamera: road: u16, mile: u16, limit: u16 - 0x80 => { - // road - get_u16(src)?; - // mile - get_u16(src)?; - // limit - get_u16(src)?; - Ok(()) - } - // IAmDispatcher: numroads: u8, roads: [u16] - 0x81 => { - // numroads - let amount = get_u8(src)? * 2; - // roads - skip(src, amount as usize)?; - Ok(()) - } - actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), - } - } + pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { + match get_u8(src)? { + // Error: msg: str (Server -> Client) + // 0x10 => { + // let n = get_length(src)?; + // skip(src, n as usize) + // } + // Plate: plate: str, timestamp: u32 + 0x20 => { + // Read length character of the plate string + let n = get_length(src)?; + // Skip the string to get to the timestamp + skip(src, n)?; + // check if valid timestamp + get_u32(src)?; + Ok(()) + } + // Ticket (just Server -> Client) + // 0x21 => { + // Ok(()) + // } + // Want Heartbeat: interval: u32 + 0x40 => { + get_u32(src)?; + Ok(()) + } + // Heartbeat (just Server -> Client) + // 0x41 => { + // Ok(()) + // } + // IAmCamera: road: u16, mile: u16, limit: u16 + 0x80 => { + // road + get_u16(src)?; + // mile + get_u16(src)?; + // limit + get_u16(src)?; + Ok(()) + } + // IAmDispatcher: numroads: u8, roads: [u16] + 0x81 => { + // numroads + let amount = get_u8(src)? * 2; + // roads + skip(src, amount as usize)?; + Ok(()) + } + actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), + } + } - pub fn parse(src: &mut Cursor<&[u8]>) -> Result { - match get_u8(src)? { - // Error: msg: str (Server -> Client) - // 0x10 => { - // let n = get_length(src)?; - // let msg = get_str(src, n)?.to_string(); - // Ok(Frame::Error { msg }) - // } - // Plate: plate: str, timestamp: u32 - 0x20 => { - // Read length character of the plate string - let n = get_length(src)?; - // Skip the string to get to the timestamp - let plate = get_str(src, n)?.to_string(); - // check if valid timestamp - let timestamp = get_u32(src)?; - Ok(ClientFrames::Plate { plate, timestamp }) - } - // Ticket (just Server -> Client) - // 0x21 => { - // Ok(()) - // } - // Want Heartbeat: interval: u32 - 0x40 => { - let interval = get_u32(src)?; - Ok(ClientFrames::WantHeartbeat { interval }) - } - // Heartbeat (just Server -> Client) - // 0x41 => { - // Ok(()) - // } - // IAmCamera: road: u16, mile: u16, limit: u16 - 0x80 => { - // road - let road = get_u16(src)?; - // mile - let mile = get_u16(src)?; - // limit - let limit = get_u16(src)?; - Ok(ClientFrames::IAmCamera { road, mile, limit }) - } - // IAmDispatcher: numroads: u8, roads: [u16] - 0x81 => { - // numroads - let numroads = get_u8(src)?; - // roads - let roads = get_u16_vec(src, numroads as usize)?; + pub fn parse(src: &mut Cursor<&[u8]>) -> Result { + match get_u8(src)? { + // Error: msg: str (Server -> Client) + // 0x10 => { + // let n = get_length(src)?; + // let msg = get_str(src, n)?.to_string(); + // Ok(Frame::Error { msg }) + // } + // Plate: plate: str, timestamp: u32 + 0x20 => { + // Read length character of the plate string + let n = get_length(src)?; + // Skip the string to get to the timestamp + let plate = get_str(src, n)?.to_string(); + // check if valid timestamp + let timestamp = get_u32(src)?; + Ok(ClientFrames::Plate { plate, timestamp }) + } + // Ticket (just Server -> Client) + // 0x21 => { + // Ok(()) + // } + // Want Heartbeat: interval: u32 + 0x40 => { + let interval = get_u32(src)?; + Ok(ClientFrames::WantHeartbeat { interval }) + } + // Heartbeat (just Server -> Client) + // 0x41 => { + // Ok(()) + // } + // IAmCamera: road: u16, mile: u16, limit: u16 + 0x80 => { + // road + let road = get_u16(src)?; + // mile + let mile = get_u16(src)?; + // limit + let limit = get_u16(src)?; + Ok(ClientFrames::IAmCamera { road, mile, limit }) + } + // IAmDispatcher: numroads: u8, roads: [u16] + 0x81 => { + // numroads + let numroads = get_u8(src)?; + // roads + let roads = get_u16_vec(src, numroads as usize)?; - Ok(ClientFrames::IAmDispatcher { roads }) - } - actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), - } - } + Ok(ClientFrames::IAmDispatcher { roads }) + } + actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), + } + } } impl ServerFrames { - pub(crate) fn convert_to_bytes(&self) -> BytesMut { - match self { - ServerFrames::Error { msg } => { - let mut buf = BytesMut::with_capacity(1 + 1 + msg.len()); + pub(crate) fn convert_to_bytes(&self) -> BytesMut { + match self { + ServerFrames::Error { msg } => { + let mut buf = BytesMut::with_capacity(1 + 1 + msg.len()); - buf.put_u8(0x10); - buf.put_u8(msg.len() as u8); - buf.put_slice(msg.as_bytes()); + buf.put_u8(0x10); + buf.put_u8(msg.len() as u8); + buf.put_slice(msg.as_bytes()); - return buf; - } - ServerFrames::Ticket { - plate, - road, - mile1, - timestamp1, - mile2, - timestamp2, - speed, - } => { - let mut buf = BytesMut::with_capacity(1 + 1 + plate.len() + 2 + 2 + 4 + 2 + 4 + 2); + return buf; + } + ServerFrames::Ticket { + plate, + road, + mile1, + timestamp1, + mile2, + timestamp2, + speed, + } => { + let mut buf = BytesMut::with_capacity(1 + 1 + plate.len() + 2 + 2 + 4 + 2 + 4 + 2); - buf.put_u8(0x21); - buf.put_u8(plate.len() as u8); - buf.put_slice(plate.as_bytes()); - buf.put_u16(*road); - buf.put_u16(*mile1); - buf.put_u32(*timestamp1); - buf.put_u16(*mile2); - buf.put_u32(*timestamp2); - buf.put_u16(*speed); + buf.put_u8(0x21); + buf.put_u8(plate.len() as u8); + buf.put_slice(plate.as_bytes()); + buf.put_u16(*road); + buf.put_u16(*mile1); + buf.put_u32(*timestamp1); + buf.put_u16(*mile2); + buf.put_u32(*timestamp2); + buf.put_u16(*speed); - return buf; - } - ServerFrames::Heartbeat => { - let mut buf = BytesMut::new(); + return buf; + } + ServerFrames::Heartbeat => { + let mut buf = BytesMut::new(); - buf.put_u8(0x41); + buf.put_u8(0x41); - return buf; - } - } - } + return buf; + } + } + } } fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> { - if src.remaining() < len { - return Err(Error::Incomplete); - } + if src.remaining() < len { + return Err(Error::Incomplete); + } - let position = src.position() as usize; - let slice = &src.get_ref()[position..position + len]; + let position = src.position() as usize; + let slice = &src.get_ref()[position..position + len]; - let message = - std::str::from_utf8(slice).map_err(|_| "protocol error; invalid frame format".into()); + let message = + std::str::from_utf8(slice).map_err(|_| "protocol error; invalid frame format".into()); - src.advance(len); + src.advance(len); - message + message } fn get_u16_vec<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result, Error> { - if src.remaining() < len { - return Err(Error::Incomplete); - } + if src.remaining() < len { + return Err(Error::Incomplete); + } - let mut roads = Vec::new(); + let mut roads = Vec::new(); - for _ in 0..len { - let road = src.get_u16(); - debug!(?road); - roads.push(road); - } + for _ in 0..len { + let road = src.get_u16(); + debug!(?road); + roads.push(road); + } - Ok(roads) + Ok(roads) } fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { - if src.remaining() < n { - return Err(Error::Incomplete); - } + if src.remaining() < n { + return Err(Error::Incomplete); + } - src.advance(n); - Ok(()) + src.advance(n); + Ok(()) } fn get_u8(src: &mut Cursor<&[u8]>) -> Result { - if !src.has_remaining() { - error!("Incomplete frame"); - return Err(Error::Incomplete); - } + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } - Ok(src.get_u8()) + Ok(src.get_u8()) } fn get_u16(src: &mut Cursor<&[u8]>) -> Result { - if !src.has_remaining() { - error!("Incomplete frame"); - return Err(Error::Incomplete); - } + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } - Ok(src.get_u16()) + Ok(src.get_u16()) } fn get_u32(src: &mut Cursor<&[u8]>) -> Result { - if !src.has_remaining() { - error!("Incomplete frame"); - return Err(Error::Incomplete); - } + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } - Ok(src.get_u32()) + Ok(src.get_u32()) } // Same as get_u8, but the current cursor points to the byte of the length of a message string. fn get_length(src: &mut Cursor<&[u8]>) -> Result { - if !src.has_remaining() { - error!("Incomplete frame"); - return Err(Error::Incomplete); - } + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } - Ok(src.get_u8() as usize) + Ok(src.get_u8() as usize) } impl From for Error { - fn from(src: String) -> Error { - Error::Other(src.into()) - } + fn from(src: String) -> Error { + Error::Other(src.into()) + } } impl From<&str> for Error { - fn from(src: &str) -> Error { - src.to_string().into() - } + fn from(src: &str) -> Error { + src.to_string().into() + } } impl From for Error { - fn from(_src: FromUtf8Error) -> Error { - "protocol error; invalid frame format".into() - } + fn from(_src: FromUtf8Error) -> Error { + "protocol error; invalid frame format".into() + } } impl From for Error { - fn from(_src: TryFromIntError) -> Error { - "protocol error; invalid frame format".into() - } + fn from(_src: TryFromIntError) -> Error { + "protocol error; invalid frame format".into() + } } impl std::error::Error for Error {} impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Incomplete => "stream ended early".fmt(fmt), - Error::Other(err) => err.fmt(fmt), - } - } + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Incomplete => "stream ended early".fmt(fmt), + Error::Other(err) => err.fmt(fmt), + } + } } diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 08a1008..7c6eb0b 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -1,217 +1,219 @@ -use crate::{ - connection::ConnectionType, - db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate}, - frame::{ClientFrames, ServerFrames}, - Connection, Shutdown, -}; +use std::{future::Future, sync::Arc}; -use std::future::Future; -use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{broadcast, mpsc, Semaphore}; -use tokio::time::{self, Duration}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{broadcast, mpsc, Semaphore}, + time::{self, Duration}, +}; use tracing::{debug, error, info}; +use crate::{ + connection::ConnectionType, + db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate}, + frame::{ClientFrames, ServerFrames}, + Connection, Shutdown, +}; + struct Listener { - listener: TcpListener, - db_holder: DbHolder, - limit_connections: Arc, - notify_shutdown: broadcast::Sender<()>, - shutdown_complete_tx: mpsc::Sender<()>, + listener: TcpListener, + db_holder: DbHolder, + limit_connections: Arc, + notify_shutdown: broadcast::Sender<()>, + shutdown_complete_tx: mpsc::Sender<()>, } struct Handler { - connection: Connection, - connection_type: Option, - db: Db, - shutdown: Shutdown, - _shutdown_complete: mpsc::Sender<()>, + connection: Connection, + connection_type: Option, + db: Db, + shutdown: Shutdown, + _shutdown_complete: mpsc::Sender<()>, } const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { - let (notify_shutdown, _) = broadcast::channel(1); - let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); + let (notify_shutdown, _) = broadcast::channel(1); + let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); - let mut server = Listener { - listener, - db_holder: DbHolder::new(), - limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), - notify_shutdown, - shutdown_complete_tx, - }; + let mut server = Listener { + listener, + db_holder: DbHolder::new(), + limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), + notify_shutdown, + shutdown_complete_tx, + }; - tokio::select! { - res = server.run() => { - if let Err(err) = res { - error!(cause = %err, "failed to accept"); - } - } - _ = shutdown => { - info!("shutting down"); - } - } + tokio::select! { + res = server.run() => { + if let Err(err) = res { + error!(cause = %err, "failed to accept"); + } + } + _ = shutdown => { + info!("shutting down"); + } + } - let Listener { - shutdown_complete_tx, - notify_shutdown, - .. - } = server; + let Listener { + shutdown_complete_tx, + notify_shutdown, + .. + } = server; - drop(notify_shutdown); - drop(shutdown_complete_tx); + drop(notify_shutdown); + drop(shutdown_complete_tx); - let _ = shutdown_complete_rx.recv().await; + let _ = shutdown_complete_rx.recv().await; - Ok(()) + Ok(()) } impl Listener { - async fn run(&mut self) -> crate::Result<()> { - info!("accepting inbound connections"); + async fn run(&mut self) -> crate::Result<()> { + info!("accepting inbound connections"); - loop { - let permit = self - .limit_connections - .clone() - .acquire_owned() - .await - .unwrap(); + loop { + let permit = self + .limit_connections + .clone() + .acquire_owned() + .await + .unwrap(); - let socket = self.accept().await?; - let address = socket.peer_addr()?; + let socket = self.accept().await?; + let address = socket.peer_addr()?; - let mut handler = Handler { - connection: Connection::new(address, socket), - connection_type: None, - db: self.db_holder.db(), - shutdown: Shutdown::new(self.notify_shutdown.subscribe()), - _shutdown_complete: self.shutdown_complete_tx.clone(), - }; + let mut handler = Handler { + connection: Connection::new(address, socket), + connection_type: None, + db: self.db_holder.db(), + shutdown: Shutdown::new(self.notify_shutdown.subscribe()), + _shutdown_complete: self.shutdown_complete_tx.clone(), + }; - info!("Created new handler"); + info!("Created new handler"); - tokio::spawn(async move { - if let Err(err) = handler.run().await { - error!(cause = ?err, "connection error"); - } - drop(permit); - }); - } - } + tokio::spawn(async move { + if let Err(err) = handler.run().await { + error!(cause = ?err, "connection error"); + } + drop(permit); + }); + } + } - async fn accept(&mut self) -> crate::Result { - let mut backoff = 1; + async fn accept(&mut self) -> crate::Result { + let mut backoff = 1; - loop { - match self.listener.accept().await { - Ok((socket, _)) => return Ok(socket), - Err(err) => { - if backoff > 64 { - return Err(err.into()); - } - } - } + loop { + match self.listener.accept().await { + Ok((socket, _)) => return Ok(socket), + Err(err) => { + if backoff > 64 { + return Err(err.into()); + } + } + } - time::sleep(Duration::from_secs(backoff)).await; + time::sleep(Duration::from_secs(backoff)).await; - backoff *= 2; - } - } + backoff *= 2; + } + } } impl Handler { - async fn run(&mut self) -> crate::Result<()> { - let (send_message, mut receive_message): ( - mpsc::Sender, - mpsc::Receiver, - ) = mpsc::channel(1024); + async fn run(&mut self) -> crate::Result<()> { + let (send_message, mut receive_message): ( + mpsc::Sender, + mpsc::Receiver, + ) = mpsc::channel(1024); - while !self.shutdown.is_shutdown() { - tokio::select! { - res = self.connection.read_frame() => { - match res? { - Some(frame) => { - info!("Received frame"); - let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; - }, - None => return Ok(()), - } + while !self.shutdown.is_shutdown() { + tokio::select! { + res = self.connection.read_frame() => { + match res? { + Some(frame) => { + info!("Received frame"); + let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; + }, + None => return Ok(()), + } - } - message = receive_message.recv() => { - match message { - Some(message) => { - let _ = self.connection.write_frame(message).await; - }, - None => (), - } - } - _ = self.shutdown.recv() => { - debug!("Shutdown"); - return Ok(()); - } - }; - } + } + message = receive_message.recv() => { + match message { + Some(message) => { + let _ = self.connection.write_frame(message).await; + }, + None => (), + } + } + _ = self.shutdown.recv() => { + debug!("Shutdown"); + return Ok(()); + } + }; + } - Ok(()) - } + Ok(()) + } - fn set_connection_type(&mut self, connection_type: ConnectionType) { - match connection_type { - ConnectionType::Camera => { - self.connection_type = Some(connection_type); - } - ConnectionType::Dispatcher => { - self.connection_type = Some(connection_type); - } - } - } + fn set_connection_type(&mut self, connection_type: ConnectionType) { + match connection_type { + ConnectionType::Camera => { + self.connection_type = Some(connection_type); + } + ConnectionType::Dispatcher => { + self.connection_type = Some(connection_type); + } + } + } - async fn handle_client_frame( - &mut self, - db: Db, - frame: ClientFrames, - send_message: mpsc::Sender, - ) -> crate::Result<()> { - match frame { - ClientFrames::Plate { plate, timestamp } => { - info!("Receive new plate {plate} {timestamp}"); - db.insert_plate( - CameraId(self.connection.get_address()), - Plate { plate, timestamp }, - ); - } - ClientFrames::WantHeartbeat { interval } => { - info!("Want heartbeat: {interval}"); - } - ClientFrames::IAmCamera { road, mile, limit } => { - if self.connection_type.is_some() { - return Err("Already connected".into()); - } - self.set_connection_type(ConnectionType::Camera); - info!("Set connection type to camera"); + async fn handle_client_frame( + &mut self, + db: Db, + frame: ClientFrames, + send_message: mpsc::Sender, + ) -> crate::Result<()> { + match frame { + ClientFrames::Plate { plate, timestamp } => { + info!("Receive new plate {plate} {timestamp}"); + db.insert_plate( + CameraId(self.connection.get_address()), + Plate { plate, timestamp }, + ); + } + ClientFrames::WantHeartbeat { interval } => { + info!("Want heartbeat: {interval}"); + } + ClientFrames::IAmCamera { road, mile, limit } => { + if self.connection_type.is_some() { + return Err("Already connected".into()); + } + self.set_connection_type(ConnectionType::Camera); + info!("Set connection type to camera"); - db.add_camera( - CameraId(self.connection.get_address()), - Camera { road, mile, limit }, - ); - } - ClientFrames::IAmDispatcher { roads } => { - if self.connection_type.is_some() { - return Err("Already connected".into()); - } + db.add_camera( + CameraId(self.connection.get_address()), + Camera { road, mile, limit }, + ); + } + ClientFrames::IAmDispatcher { roads } => { + if self.connection_type.is_some() { + return Err("Already connected".into()); + } - self.set_connection_type(ConnectionType::Dispatcher); - db.add_dispatcher( - DispatcherId(self.connection.get_address()), - roads.to_vec(), - send_message.clone(), - ); - } - } + self.set_connection_type(ConnectionType::Dispatcher); + db.add_dispatcher( + DispatcherId(self.connection.get_address()), + roads.to_vec(), + send_message.clone(), + ); + } + } - Ok(()) - } + Ok(()) + } } diff --git a/problem_06/src/shutdown.rs b/problem_06/src/shutdown.rs index 1c86f83..3c199b1 100644 --- a/problem_06/src/shutdown.rs +++ b/problem_06/src/shutdown.rs @@ -2,29 +2,29 @@ use tokio::sync::broadcast; #[derive(Debug)] pub(crate) struct Shutdown { - shutdown: bool, - notify: broadcast::Receiver<()>, + shutdown: bool, + notify: broadcast::Receiver<()>, } impl Shutdown { - pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { - Shutdown { - shutdown: false, - notify, - } - } + pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { + Shutdown { + shutdown: false, + notify, + } + } - pub(crate) fn is_shutdown(&self) -> bool { - self.shutdown - } + pub(crate) fn is_shutdown(&self) -> bool { + self.shutdown + } - pub(crate) async fn recv(&mut self) { - if self.shutdown { - return; - } + pub(crate) async fn recv(&mut self) { + if self.shutdown { + return; + } - let _ = self.notify.recv().await; + let _ = self.notify.recv().await; - self.shutdown = true; - } + self.shutdown = true; + } }