diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs index 6d420cc..817817a 100644 --- a/problem_06/bin/client.rs +++ b/problem_06/bin/client.rs @@ -16,11 +16,35 @@ async fn main() -> Result<(), Box> { // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", // 00 01 e2 40 timestamp: 123456 // } - let message = [ + let plate = [ 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, ]; - write.write_all(&message).await?; + // 40 WantHeartbeat{ + // 00 00 00 0a interval: 10 + // } + let want_heartbeat = [0x40, 0x00, 0x00, 0x00, 0x0a]; + + // 80 IAmCamera{ + // 00 42 road: 66, + // 00 64 mile: 100, + // 00 3c limit: 60, + // } + let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; + + // 81 IAmDispatcher{ + // 03 roads: [ + // 00 42 66, + // 01 70 368, + // 13 88 5000 + // ] + // } + let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; + + write.write_all(&plate).await?; + write.write_all(&want_heartbeat).await?; + write.write_all(&i_am_camera).await?; + write.write_all(&i_am_dispatcher).await?; if let Ok(n) = read.read_exact(&mut buf).await { info!("Stream incoming..."); diff --git a/problem_06/src/frame.rs b/problem_06/src/frame.rs index fdfb1fd..21597c9 100644 --- a/problem_06/src/frame.rs +++ b/problem_06/src/frame.rs @@ -3,10 +3,16 @@ use std::fmt; use std::io::Cursor; use std::num::TryFromIntError; use std::string::FromUtf8Error; -use tracing::{debug, error, info}; +use tracing::{debug, error}; #[derive(Clone, Debug)] -pub enum Frame {} +pub enum Frame { + Error { msg: String }, + Plate { plate: String, timestamp: u32 }, + WantHeartbeat { interval: u32 }, + IAmCamera { road: u16, mile: u16, limit: u16 }, + IAmDispatcher { roads: Vec }, +} #[derive(Debug)] pub enum Error { @@ -38,7 +44,8 @@ impl Frame { // } // Want Heartbeat: interval: u32 0x40 => { - unimplemented!() + get_u32(src)?; + Ok(()) } // Heartbeat (just Server -> Client) // 0x41 => { @@ -46,27 +53,79 @@ impl Frame { // } // IAmCamera: road: u16, mile: u16, limit: u16 0x80 => { - unimplemented!() + // road + get_u16(src)?; + // mile + get_u16(src)?; + // limit + get_u16(src)?; + Ok(()) } - // IAmDispatcher: numroads: u8, numroads: [u16] + // IAmDispatcher: numroads: u8, roads: [u16] 0x81 => { - unimplemented!() + // numroads + let amount = get_u8(src)? * 2; + // roads + skip(src, amount as usize)?; + Ok(()) } actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), } } pub fn parse(src: &mut Cursor<&[u8]>) -> Result { - unimplemented!() - } -} + match get_u8(src)? { + // Error: msg: str + 0x10 => { + let n = get_length(src)?; + let msg = get_str(src, n)?.to_string(); + Ok(Frame::Error { msg }) + } + // Plate: plate: str, timestamp: u32 + 0x20 => { + // Read length character of the plate string + let n = get_length(src)?; + // Skip the string to get to the timestamp + let plate = get_str(src, n)?.to_string(); + // check if valid timestamp + let timestamp = get_u32(src)?; + Ok(Frame::Plate { plate, timestamp }) + } + // Ticket (just Server -> Client) + // 0x21 => { + // Ok(()) + // } + // Want Heartbeat: interval: u32 + 0x40 => { + let interval = get_u32(src)?; + Ok(Frame::WantHeartbeat { interval }) + } + // Heartbeat (just Server -> Client) + // 0x41 => { + // Ok(()) + // } + // IAmCamera: road: u16, mile: u16, limit: u16 + 0x80 => { + // road + let road = get_u16(src)?; + // mile + let mile = get_u16(src)?; + // limit + let limit = get_u16(src)?; + Ok(Frame::IAmCamera { road, mile, limit }) + } + // IAmDispatcher: numroads: u8, roads: [u16] + 0x81 => { + // numroads + let numroads = get_u8(src)?; + // roads + let roads = get_u16_vec(src, numroads as usize)?; -fn peek_u8(src: &mut Cursor<&[u8]>) -> Result { - if !src.has_remaining() { - return Err(Error::Incomplete); + Ok(Frame::IAmDispatcher { roads }) + } + actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), + } } - - Ok(src.chunk()[0]) } fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> { @@ -85,6 +144,22 @@ fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> message } +fn get_u16_vec<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result, Error> { + if src.remaining() < len { + return Err(Error::Incomplete); + } + + let mut roads = Vec::new(); + + for _ in 0..len { + let road = src.get_u16(); + debug!(?road); + roads.push(road); + } + + Ok(roads) +} + fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { if src.remaining() < n { return Err(Error::Incomplete); @@ -100,19 +175,24 @@ fn get_u8(src: &mut Cursor<&[u8]>) -> Result { return Err(Error::Incomplete); } - info!("get_u8: current cursor position: {:?}", src.position()); - Ok(src.get_u8()) } +fn get_u16(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } + + Ok(src.get_u16()) +} + fn get_u32(src: &mut Cursor<&[u8]>) -> Result { if !src.has_remaining() { error!("Incomplete frame"); return Err(Error::Incomplete); } - info!("get_u32: current cursor position: {:?}", src.position()); - Ok(src.get_u32()) } @@ -123,15 +203,9 @@ fn get_length(src: &mut Cursor<&[u8]>) -> Result { return Err(Error::Incomplete); } - info!("get_length: current cursor position: {:?}", src.position()); - Ok(src.get_u8() as usize) } -fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { - unimplemented!() -} - impl From for Error { fn from(src: String) -> Error { Error::Other(src.into()) diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 8f4df0c..2cfd3df 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -1,6 +1,5 @@ use crate::{frame::Frame, Connection, Shutdown}; -use std::collections::BTreeMap; use std::future::Future; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; @@ -19,13 +18,9 @@ struct Listener { struct Handler { connection: Connection, shutdown: Shutdown, - local_db: BTreeMap, _shutdown_complete: mpsc::Sender<()>, } -type Timestamp = i32; -type Price = i32; - const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { @@ -83,7 +78,6 @@ impl Listener { let mut handler = Handler { connection: Connection::new(socket), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), - local_db: BTreeMap::new(), _shutdown_complete: self.shutdown_complete_tx.clone(), }; @@ -129,12 +123,28 @@ impl Handler { } }; - debug!(?maybe_frame); - let frame = match maybe_frame { Some(frame) => frame, None => return Ok(()), }; + + match frame { + Frame::Error { msg } => { + info!("Error message: {msg}") + } + Frame::Plate { plate, timestamp } => { + info!("Plate: {plate}, timestamp: {timestamp}"); + } + Frame::WantHeartbeat { interval } => { + info!("Want heartbeat: {interval}"); + } + Frame::IAmCamera { road, mile, limit } => { + info!("Road: {road}, mile: {mile}, limit: {limit}"); + } + Frame::IAmDispatcher { roads } => { + info!("roads: {roads:?}"); + } + } } Ok(())