Scaffolding for problem 06

This commit is contained in:
Bastian Gruber 2023-05-06 07:41:24 +02:00
parent 20d9acf917
commit 9aa553e2e1
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
7 changed files with 361 additions and 0 deletions

3
problem_06/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
/target/
.idea
.DS_STORE

14
problem_06/Cargo.toml Normal file
View file

@ -0,0 +1,14 @@
[package]
name = "problem_06"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "server"
path = "bin/server.rs"
[dependencies]
bytes = "1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }

View file

@ -0,0 +1,74 @@
use crate::frame::{self, Frame};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::{debug, info};
#[derive(Debug)]
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
pub async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
loop {
info!("Loop read_frame");
if let Some(frame) = self.parse_frame()? {
info!("Frame parsed");
return Ok(Some(frame));
}
if 0 == self.stream.read_buf(&mut self.buffer).await? {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
fn parse_frame(&mut self) -> crate::Result<Option<Frame>> {
use frame::Error::Incomplete;
let mut buf = Cursor::new(&self.buffer[..]);
debug!(?buf);
match Frame::check(&mut buf) {
Ok(_) => {
info!("Frame::check succesful");
let len = buf.position() as usize;
debug!(?len);
buf.set_position(0);
let frame = Frame::parse(&mut buf)?;
self.buffer.advance(len);
Ok(Some(frame))
}
Err(Incomplete) => Ok(None),
Err(e) => Err(e.into()),
}
}
pub async fn write_frame(&mut self, frame: &Frame) -> tokio::io::Result<()> {
debug!(?frame);
if let Frame::Response(mean) = frame {
let _ = self.stream.write_i32(*mean as i32).await?;
info!("Write frame Response to stream");
return self.stream.flush().await;
}
Ok(())
}
}

83
problem_06/src/frame.rs Normal file
View file

@ -0,0 +1,83 @@
use bytes::Buf;
use std::fmt;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;
use tracing::{debug, error, info};
#[derive(Clone, Debug)]
pub enum Frame {}
#[derive(Debug)]
pub enum Error {
Incomplete,
Other(crate::Error),
}
impl Frame {
pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
unimplemented!()
}
pub fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
unimplemented!()
}
}
fn get_decimal(src: &[u8]) -> Result<i32, Error> {
debug!(?src);
if let Ok(number) = <[u8; 4]>::try_from(src) {
return Ok(i32::from_be_bytes(number));
};
Err("protocol error; invalid frame format".into())
}
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() {
error!("Incomplete frame");
return Err(Error::Incomplete);
}
Ok(src.get_u8())
}
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
unimplemented!()
}
impl From<String> for Error {
fn from(src: String) -> Error {
Error::Other(src.into())
}
}
impl From<&str> for Error {
fn from(src: &str) -> Error {
src.to_string().into()
}
}
impl From<FromUtf8Error> for Error {
fn from(_src: FromUtf8Error) -> Error {
"protocol error; invalid frame format".into()
}
}
impl From<TryFromIntError> for Error {
fn from(_src: TryFromIntError) -> Error {
"protocol error; invalid frame format".into()
}
}
impl std::error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Incomplete => "stream ended early".fmt(fmt),
Error::Other(err) => err.fmt(fmt),
}
}
}

15
problem_06/src/lib.rs Normal file
View file

@ -0,0 +1,15 @@
mod connection;
pub use connection::Connection;
pub mod frame;
pub use frame::Frame;
pub mod server;
mod shutdown;
use shutdown::Shutdown;
pub const DEFAULT_PORT: u16 = 1222;
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Result<T> = std::result::Result<T, Error>;

142
problem_06/src/server.rs Normal file
View file

@ -0,0 +1,142 @@
use crate::{frame::Frame, Connection, Shutdown};
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, Semaphore};
use tokio::time::{self, Duration};
use tracing::{debug, error, info};
struct Listener {
listener: TcpListener,
limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
}
struct Handler {
connection: Connection,
shutdown: Shutdown,
local_db: BTreeMap<Timestamp, Price>,
_shutdown_complete: mpsc::Sender<()>,
}
type Timestamp = i32;
type Price = i32;
const MAX_CONNECTIONS: usize = 5;
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
let mut server = Listener {
listener,
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown,
shutdown_complete_tx,
shutdown_complete_rx,
};
tokio::select! {
res = server.run() => {
if let Err(err) = res {
error!(cause = %err, "failed to accept");
}
}
_ = shutdown => {
info!("shutting down");
}
}
let Listener {
mut shutdown_complete_rx,
shutdown_complete_tx,
notify_shutdown,
..
} = server;
drop(notify_shutdown);
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
Ok(())
}
impl Listener {
async fn run(&mut self) -> crate::Result<()> {
info!("accepting inbound connections");
loop {
let permit = self
.limit_connections
.clone()
.acquire_owned()
.await
.unwrap();
let socket = self.accept().await?;
let mut handler = Handler {
connection: Connection::new(socket),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
local_db: BTreeMap::new(),
_shutdown_complete: self.shutdown_complete_tx.clone(),
};
info!("Created new handler");
tokio::spawn(async move {
if let Err(err) = handler.run().await {
error!(cause = ?err, "connection error");
}
drop(permit);
});
}
}
async fn accept(&mut self) -> crate::Result<TcpStream> {
let mut backoff = 1;
loop {
match self.listener.accept().await {
Ok((socket, _)) => return Ok(socket),
Err(err) => {
if backoff > 64 {
return Err(err.into());
}
}
}
time::sleep(Duration::from_secs(backoff)).await;
backoff *= 2;
}
}
}
impl Handler {
async fn run(&mut self) -> crate::Result<()> {
while !self.shutdown.is_shutdown() {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
_ = self.shutdown.recv() => {
debug!("Shutdown");
return Ok(());
}
};
debug!(?maybe_frame);
let frame = match maybe_frame {
Some(frame) => frame,
None => return Ok(()),
};
}
Ok(())
}
}

View file

@ -0,0 +1,30 @@
use tokio::sync::broadcast;
#[derive(Debug)]
pub(crate) struct Shutdown {
shutdown: bool,
notify: broadcast::Receiver<()>,
}
impl Shutdown {
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
pub(crate) async fn recv(&mut self) {
if self.shutdown {
return;
}
let _ = self.notify.recv().await;
self.shutdown = true;
}
}