diff --git a/problem_06/Cargo.toml b/problem_06/Cargo.toml index 989ce71..afa1cfa 100644 --- a/problem_06/Cargo.toml +++ b/problem_06/Cargo.toml @@ -3,6 +3,10 @@ name = "problem_06" version = "0.1.0" edition = "2021" +[[bin]] +name = "client" +path = "bin/client.rs" + [[bin]] name = "server" path = "bin/server.rs" diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs new file mode 100644 index 0000000..6d420cc --- /dev/null +++ b/problem_06/bin/client.rs @@ -0,0 +1,40 @@ +use problem_06::{DEFAULT_IP, DEFAULT_PORT}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tracing::{debug, error, info}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; + let (mut read, mut write) = stream.split(); + + let mut buf: [u8; 4] = [0; 4]; + + // 20 Plate { + // 07 52 45 30 35 42 4b 47 plate: "RE05BKG", + // 00 01 e2 40 timestamp: 123456 + // } + let message = [ + 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, + ]; + + write.write_all(&message).await?; + + if let Ok(n) = read.read_exact(&mut buf).await { + info!("Stream incoming..."); + + if n == 0 { + info!("End of stream"); + return Ok(()); + } + + let message = i32::from_be_bytes(buf); + debug!(?message); + return Ok(()); + } + + error!("Cannot read from socket"); + Err("Could not read from socket".into()) +} diff --git a/problem_06/bin/server.rs b/problem_06/bin/server.rs new file mode 100644 index 0000000..3f53949 --- /dev/null +++ b/problem_06/bin/server.rs @@ -0,0 +1,16 @@ +use problem_06::{server, DEFAULT_IP, DEFAULT_PORT}; + +use tokio::net::TcpListener; +use tokio::signal; + +#[tokio::main] +pub async fn main() -> problem_06::Result<()> { + tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); + + // Bind a TCP listener + let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; + + let _ = server::run(listener, signal::ctrl_c()).await; + + Ok(()) +} diff --git a/problem_06/src/connection.rs b/problem_06/src/connection.rs index 43b2127..b2ad7ed 100644 --- a/problem_06/src/connection.rs +++ b/problem_06/src/connection.rs @@ -62,13 +62,6 @@ impl Connection { } pub async fn write_frame(&mut self, frame: &Frame) -> tokio::io::Result<()> { - debug!(?frame); - if let Frame::Response(mean) = frame { - let _ = self.stream.write_i32(*mean as i32).await?; - info!("Write frame Response to stream"); - return self.stream.flush().await; - } - - Ok(()) + unimplemented!() } } diff --git a/problem_06/src/frame.rs b/problem_06/src/frame.rs index 60418f0..fdfb1fd 100644 --- a/problem_06/src/frame.rs +++ b/problem_06/src/frame.rs @@ -16,7 +16,44 @@ pub enum Error { impl Frame { pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { - unimplemented!() + match get_u8(src)? { + // Error: msg: str + 0x10 => { + let n = get_length(src)?; + skip(src, n as usize) + } + // Plate: plate: str, timestamp: u32 + 0x20 => { + // Read length character of the plate string + let n = get_length(src)?; + // Skip the string to get to the timestamp + skip(src, n)?; + // check if valid timestamp + get_u32(src)?; + Ok(()) + } + // Ticket (just Server -> Client) + // 0x21 => { + // Ok(()) + // } + // Want Heartbeat: interval: u32 + 0x40 => { + unimplemented!() + } + // Heartbeat (just Server -> Client) + // 0x41 => { + // Ok(()) + // } + // IAmCamera: road: u16, mile: u16, limit: u16 + 0x80 => { + unimplemented!() + } + // IAmDispatcher: numroads: u8, numroads: [u16] + 0x81 => { + unimplemented!() + } + actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), + } } pub fn parse(src: &mut Cursor<&[u8]>) -> Result { @@ -24,14 +61,37 @@ impl Frame { } } -fn get_decimal(src: &[u8]) -> Result { - debug!(?src); +fn peek_u8(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + return Err(Error::Incomplete); + } - if let Ok(number) = <[u8; 4]>::try_from(src) { - return Ok(i32::from_be_bytes(number)); - }; + Ok(src.chunk()[0]) +} - Err("protocol error; invalid frame format".into()) +fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> { + if src.remaining() < len { + return Err(Error::Incomplete); + } + + let position = src.position() as usize; + let slice = &src.get_ref()[position..position + len]; + + let message = + std::str::from_utf8(slice).map_err(|_| "protocol error; invalid frame format".into()); + + src.advance(len); + + message +} + +fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { + if src.remaining() < n { + return Err(Error::Incomplete); + } + + src.advance(n); + Ok(()) } fn get_u8(src: &mut Cursor<&[u8]>) -> Result { @@ -40,9 +100,34 @@ fn get_u8(src: &mut Cursor<&[u8]>) -> Result { return Err(Error::Incomplete); } + info!("get_u8: current cursor position: {:?}", src.position()); + Ok(src.get_u8()) } +fn get_u32(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } + + info!("get_u32: current cursor position: {:?}", src.position()); + + Ok(src.get_u32()) +} + +// Same as get_u8, but the current cursor points to the byte of the length of a message string. +fn get_length(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + error!("Incomplete frame"); + return Err(Error::Incomplete); + } + + info!("get_length: current cursor position: {:?}", src.position()); + + Ok(src.get_u8() as usize) +} + fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { unimplemented!() } diff --git a/problem_06/src/lib.rs b/problem_06/src/lib.rs index 5252f1a..bdc2cc7 100644 --- a/problem_06/src/lib.rs +++ b/problem_06/src/lib.rs @@ -9,6 +9,7 @@ pub mod server; mod shutdown; use shutdown::Shutdown; +pub const DEFAULT_IP: &'static str = "0.0.0.0"; pub const DEFAULT_PORT: u16 = 1222; pub type Error = Box; diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index 68addf4..8f4df0c 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -26,7 +26,7 @@ struct Handler { type Timestamp = i32; type Price = i32; -const MAX_CONNECTIONS: usize = 5; +const MAX_CONNECTIONS: usize = 1500; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1);