diff --git a/problem_02/Cargo.toml b/problem_02/Cargo.toml index 0f494ac..b9a1c0e 100644 --- a/problem_02/Cargo.toml +++ b/problem_02/Cargo.toml @@ -8,7 +8,6 @@ name = "server" path = "bin/server.rs" [dependencies] -atoi = "0.3.2" bytes = "1" tokio = { version = "1", features = ["full"] } tracing = "0.1.34" diff --git a/problem_02/src/connection.rs b/problem_02/src/connection.rs index f040300..d6694ba 100644 --- a/problem_02/src/connection.rs +++ b/problem_02/src/connection.rs @@ -2,7 +2,7 @@ use crate::frame::{self, Frame}; use bytes::{Buf, BytesMut}; use std::io::Cursor; -use tokio::io::{AsyncReadExt, BufWriter}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::TcpStream; use tracing::{debug, info}; @@ -48,7 +48,7 @@ impl Connection { Ok(_) => { info!("Frame::check succesful"); let len = buf.position() as usize; - + debug!(?len); buf.set_position(0); let frame = Frame::parse(&mut buf)?; @@ -60,4 +60,14 @@ impl Connection { Err(e) => Err(e.into()), } } + + pub async fn write_frame(&mut self, frame: &Frame) -> crate::Result<()> { + debug!(?frame); + if let Frame::Response(mean) = frame { + let _ = self.stream.write(&[mean.to_ne_bytes()[0]]); + return Ok(()); + } + + Err("Wrong frame".into()) + } } diff --git a/problem_02/src/frame.rs b/problem_02/src/frame.rs index 4eb8c37..2ff80d4 100644 --- a/problem_02/src/frame.rs +++ b/problem_02/src/frame.rs @@ -9,6 +9,7 @@ use tracing::{debug, error, info}; pub enum Frame { Insert { timestamp: i32, price: i32 }, Query { mintime: i32, maxtime: i32 }, + Response(i64), } #[derive(Debug)] @@ -22,10 +23,12 @@ impl Frame { info!("Check frame"); match get_u8(src)? { b'I' => { + debug!("INSERT message"); get_line(src)?; Ok(()) } b'Q' => { + debug!("QUERY message"); get_line(src)?; Ok(()) } @@ -78,8 +81,9 @@ fn get_u8(src: &mut Cursor<&[u8]>) -> Result { } fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { - if src.get_ref().len() == 9 { + if src.get_ref().len() >= 9 { src.set_position(9); + info!("Set cursors position to 9"); return Ok(&src.get_ref()[..]); } diff --git a/problem_02/src/server.rs b/problem_02/src/server.rs index e7a4dea..259a89b 100644 --- a/problem_02/src/server.rs +++ b/problem_02/src/server.rs @@ -1,5 +1,6 @@ -use crate::{Connection, Shutdown}; +use crate::{frame::Frame, Connection, Shutdown}; +use std::collections::BTreeMap; use std::future::Future; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; @@ -18,9 +19,13 @@ struct Listener { struct Handler { connection: Connection, shutdown: Shutdown, + local_db: BTreeMap, _shutdown_complete: mpsc::Sender<()>, } +type Timestamp = i32; +type Price = i32; + const MAX_CONNECTIONS: usize = 5; pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { @@ -78,9 +83,12 @@ impl Listener { let mut handler = Handler { connection: Connection::new(socket), shutdown: Shutdown::new(self.notify_shutdown.subscribe()), + local_db: BTreeMap::new(), _shutdown_complete: self.shutdown_complete_tx.clone(), }; + info!("Created new handler"); + tokio::spawn(async move { if let Err(err) = handler.run().await { error!(cause = ?err, "connection error"); @@ -116,6 +124,7 @@ impl Handler { let maybe_frame = tokio::select! { res = self.connection.read_frame() => res?, _ = self.shutdown.recv() => { + debug!("Shutdown"); return Ok(()); } }; @@ -127,7 +136,29 @@ impl Handler { None => return Ok(()), }; - debug!(?frame); + match frame { + Frame::Insert { timestamp, price } => { + self.local_db.insert(timestamp, price); + } + Frame::Query { mintime, maxtime } => { + debug!(?mintime, ?maxtime); + + if mintime <= maxtime { + let mut count = 0; + let mut sum = 0i64; + + for (_, price) in self.local_db.range(mintime..=maxtime) { + sum += *price as i64; + count += 1; + } + + let mean = if count > 0 { sum / count } else { 0 }; + debug!(?mean); + self.connection.write_frame(&Frame::Response(mean)).await?; + } + } + _ => unimplemented!(), + } } Ok(())