protohackers/problem_02/src/connection.rs

75 lines
2 KiB
Rust
Raw Normal View History

use crate::frame::{self, Frame};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
2023-04-24 20:37:35 +00:00
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::{debug, info};
/// Frame-oriented wrapper around a single `TcpStream`.
///
/// `read_frame` pulls bytes from the socket into `buffer` and hands back
/// parsed `Frame` values; `write_frame` writes a frame through `stream` and
/// flushes it.
#[derive(Debug)]
pub struct Connection {
/// The socket, wrapped in a `BufWriter`. Used for both directions: reads go
/// through `read_buf`, writes are followed by an explicit `flush`.
stream: BufWriter<TcpStream>,
/// Bytes received from the socket that have not been parsed yet.
/// `parse_frame` advances past each frame it successfully parses.
buffer: BytesMut,
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
pub async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
loop {
info!("Loop read_frame");
if let Some(frame) = self.parse_frame()? {
info!("Frame parsed");
return Ok(Some(frame));
}
if 0 == self.stream.read_buf(&mut self.buffer).await? {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
fn parse_frame(&mut self) -> crate::Result<Option<Frame>> {
use frame::Error::Incomplete;
let mut buf = Cursor::new(&self.buffer[..]);
debug!(?buf);
match Frame::check(&mut buf) {
Ok(_) => {
info!("Frame::check succesful");
let len = buf.position() as usize;
2023-04-24 20:37:35 +00:00
debug!(?len);
buf.set_position(0);
let frame = Frame::parse(&mut buf)?;
self.buffer.advance(len);
Ok(Some(frame))
}
Err(Incomplete) => Ok(None),
Err(e) => Err(e.into()),
}
}
2023-04-24 20:37:35 +00:00
2023-04-24 21:46:41 +00:00
pub async fn write_frame(&mut self, frame: &Frame) -> tokio::io::Result<()> {
2023-04-24 20:37:35 +00:00
debug!(?frame);
if let Frame::Response(mean) = frame {
2023-04-24 21:54:52 +00:00
let _ = self.stream.write_i32(*mean as i32).await?;
2023-04-24 21:46:41 +00:00
info!("Write frame Response to stream");
return self.stream.flush().await;
2023-04-24 20:37:35 +00:00
}
2023-04-24 21:49:24 +00:00
Ok(())
2023-04-24 20:37:35 +00:00
}
}