Write frame

This commit is contained in:
Bastian Gruber 2023-04-24 22:37:35 +02:00
parent 6128d38d99
commit a245bc8200
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
4 changed files with 50 additions and 6 deletions

View file

@ -8,7 +8,6 @@ name = "server"
path = "bin/server.rs"
[dependencies]
atoi = "0.3.2"
bytes = "1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1.34"

View file

@ -2,7 +2,7 @@ use crate::frame::{self, Frame};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use tokio::io::{AsyncReadExt, BufWriter};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::{debug, info};
@ -48,7 +48,7 @@ impl Connection {
Ok(_) => {
info!("Frame::check succesful");
let len = buf.position() as usize;
debug!(?len);
buf.set_position(0);
let frame = Frame::parse(&mut buf)?;
@ -60,4 +60,14 @@ impl Connection {
Err(e) => Err(e.into()),
}
}
pub async fn write_frame(&mut self, frame: &Frame) -> crate::Result<()> {
debug!(?frame);
if let Frame::Response(mean) = frame {
let _ = self.stream.write(&[mean.to_ne_bytes()[0]]);
return Ok(());
}
Err("Wrong frame".into())
}
}

View file

@ -9,6 +9,7 @@ use tracing::{debug, error, info};
pub enum Frame {
Insert { timestamp: i32, price: i32 },
Query { mintime: i32, maxtime: i32 },
Response(i64),
}
#[derive(Debug)]
@ -22,10 +23,12 @@ impl Frame {
info!("Check frame");
match get_u8(src)? {
b'I' => {
debug!("INSERT message");
get_line(src)?;
Ok(())
}
b'Q' => {
debug!("QUERY message");
get_line(src)?;
Ok(())
}
@ -78,8 +81,9 @@ fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
}
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
if src.get_ref().len() == 9 {
if src.get_ref().len() >= 9 {
src.set_position(9);
info!("Set cursors position to 9");
return Ok(&src.get_ref()[..]);
}

View file

@ -1,5 +1,6 @@
use crate::{Connection, Shutdown};
use crate::{frame::Frame, Connection, Shutdown};
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
@ -18,9 +19,13 @@ struct Listener {
struct Handler {
connection: Connection,
shutdown: Shutdown,
local_db: BTreeMap<Timestamp, Price>,
_shutdown_complete: mpsc::Sender<()>,
}
type Timestamp = i32;
type Price = i32;
const MAX_CONNECTIONS: usize = 5;
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
@ -78,9 +83,12 @@ impl Listener {
let mut handler = Handler {
connection: Connection::new(socket),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
local_db: BTreeMap::new(),
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
info!("Created new handler");
tokio::spawn(async move {
if let Err(err) = handler.run().await {
error!(cause = ?err, "connection error");
@ -116,6 +124,7 @@ impl Handler {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
_ = self.shutdown.recv() => {
debug!("Shutdown");
return Ok(());
}
};
@ -127,7 +136,29 @@ impl Handler {
None => return Ok(()),
};
debug!(?frame);
match frame {
Frame::Insert { timestamp, price } => {
self.local_db.insert(timestamp, price);
}
Frame::Query { mintime, maxtime } => {
debug!(?mintime, ?maxtime);
if mintime <= maxtime {
let mut count = 0;
let mut sum = 0i64;
for (_, price) in self.local_db.range(mintime..=maxtime) {
sum += *price as i64;
count += 1;
}
let mean = if count > 0 { sum / count } else { 0 };
debug!(?mean);
self.connection.write_frame(&Frame::Response(mean)).await?;
}
}
_ => unimplemented!(),
}
}
Ok(())