cargo fmt, restructure db for dispatchers

This commit is contained in:
Bastian Gruber 2023-05-19 15:01:12 +02:00
parent 4b30354214
commit 0fdc85a630
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
8 changed files with 620 additions and 608 deletions

View file

@ -1,98 +1,100 @@
use problem_06::{DEFAULT_IP, DEFAULT_PORT}; use problem_06::{DEFAULT_IP, DEFAULT_PORT};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::{
use tokio::net::{tcp::WriteHalf, TcpStream}; io::{AsyncReadExt, AsyncWriteExt},
net::{tcp::WriteHalf, TcpStream},
};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
let (mut read, mut write) = stream.split(); let (mut read, mut write) = stream.split();
// test_all_different_messages(&mut write).await?; // test_all_different_messages(&mut write).await?;
test_camera_connection(&mut write).await?; test_camera_connection(&mut write).await?;
let mut buf: [u8; 4] = [0; 4]; let mut buf: [u8; 4] = [0; 4];
if let Ok(n) = read.read_exact(&mut buf).await { if let Ok(n) = read.read_exact(&mut buf).await {
info!("Stream incoming..."); info!("Stream incoming...");
if n == 0 { if n == 0 {
info!("End of stream"); info!("End of stream");
return Ok(()); return Ok(());
} }
let message = i32::from_be_bytes(buf); let message = i32::from_be_bytes(buf);
debug!(?message); debug!(?message);
return Ok(()); return Ok(());
} }
error!("Cannot read from socket"); error!("Cannot read from socket");
Err("Could not read from socket".into()) Err("Could not read from socket".into())
} }
#[allow(dead_code)] #[allow(dead_code)]
async fn test_all_different_messages( async fn test_all_different_messages(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// 20 Plate { // 20 Plate {
// 07 52 45 30 35 42 4b 47 plate: "RE05BKG", // 07 52 45 30 35 42 4b 47 plate: "RE05BKG",
// 00 01 e2 40 timestamp: 123456 // 00 01 e2 40 timestamp: 123456
// } // }
let plate = [ let plate = [
0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40,
]; ];
// 40 WantHeartbeat{ // 40 WantHeartbeat{
// 00 00 00 0a interval: 10 // 00 00 00 0a interval: 10
// } // }
let want_heartbeat = [0x40, 0x00, 0x00, 0x00, 0x0a]; let want_heartbeat = [0x40, 0x00, 0x00, 0x00, 0x0a];
// 80 IAmCamera{ // 80 IAmCamera{
// 00 42 road: 66, // 00 42 road: 66,
// 00 64 mile: 100, // 00 64 mile: 100,
// 00 3c limit: 60, // 00 3c limit: 60,
// } // }
let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c];
// 81 IAmDispatcher{ // 81 IAmDispatcher{
// 03 roads: [ // 03 roads: [
// 00 42 66, // 00 42 66,
// 01 70 368, // 01 70 368,
// 13 88 5000 // 13 88 5000
// ] // ]
// } // }
let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88]; let i_am_dispatcher = [0x81, 0x03, 0x00, 0x42, 0x01, 0x70, 0x13, 0x88];
write.write_all(&plate).await?; write.write_all(&plate).await?;
write.write_all(&want_heartbeat).await?; write.write_all(&want_heartbeat).await?;
write.write_all(&i_am_camera).await?; write.write_all(&i_am_camera).await?;
write.write_all(&i_am_dispatcher).await?; write.write_all(&i_am_dispatcher).await?;
Ok(()) Ok(())
} }
async fn test_camera_connection( async fn test_camera_connection(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// 80 IAmCamera{ // 80 IAmCamera{
// 00 42 road: 66, // 00 42 road: 66,
// 00 64 mile: 100, // 00 64 mile: 100,
// 00 3c limit: 60, // 00 3c limit: 60,
// } // }
let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c]; let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c];
// 20 Plate { // 20 Plate {
// 07 52 45 30 35 42 4b 47 plate: "RE05BKG", // 07 52 45 30 35 42 4b 47 plate: "RE05BKG",
// 00 01 e2 40 timestamp: 123456 // 00 01 e2 40 timestamp: 123456
// } // }
let plate = [ let plate = [
0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40, 0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40,
]; ];
write.write_all(&i_am_camera).await?; write.write_all(&i_am_camera).await?;
write.write_all(&plate).await?; write.write_all(&plate).await?;
Ok(()) Ok(())
} }

View file

@ -1,16 +1,14 @@
use problem_06::{server, DEFAULT_IP, DEFAULT_PORT}; use problem_06::{server, DEFAULT_IP, DEFAULT_PORT};
use tokio::{net::TcpListener, signal};
use tokio::net::TcpListener;
use tokio::signal;
#[tokio::main] #[tokio::main]
pub async fn main() -> problem_06::Result<()> { pub async fn main() -> problem_06::Result<()> {
tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); tracing_subscriber::fmt::try_init().expect("Couldn't setup logging");
// Bind a TCP listener // Bind a TCP listener
let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
let _ = server::run(listener, signal::ctrl_c()).await; let _ = server::run(listener, signal::ctrl_c()).await;
Ok(()) Ok(())
} }

6
problem_06/rustfmt.toml Normal file
View file

@ -0,0 +1,6 @@
hard_tabs = true
imports_granularity = "Crate"
reorder_impl_items = true
reorder_imports = true
group_imports = "StdExternalCrate"
reorder_modules = true

View file

@ -1,81 +1,83 @@
use crate::frame::{self, ClientFrames, ServerFrames}; use std::{io::Cursor, net::SocketAddr};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use std::io::Cursor; use tokio::{
use std::net::SocketAddr; io::{AsyncReadExt, AsyncWriteExt, BufWriter},
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; net::TcpStream,
use tokio::net::TcpStream; };
use tracing::{debug, info}; use tracing::{debug, info};
use crate::frame::{self, ClientFrames, ServerFrames};
pub(crate) enum ConnectionType { pub(crate) enum ConnectionType {
Camera, Camera,
Dispatcher, Dispatcher,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Connection { pub struct Connection {
pub address: SocketAddr, pub address: SocketAddr,
buffer: BytesMut, buffer: BytesMut,
pub(crate) stream: BufWriter<TcpStream>, pub(crate) stream: BufWriter<TcpStream>,
} }
impl Connection { impl Connection {
pub fn new(address: SocketAddr, socket: TcpStream) -> Connection { pub fn new(address: SocketAddr, socket: TcpStream) -> Connection {
Connection { Connection {
address, address,
buffer: BytesMut::with_capacity(4 * 1024), buffer: BytesMut::with_capacity(4 * 1024),
stream: BufWriter::new(socket), stream: BufWriter::new(socket),
} }
} }
pub fn get_address(&self) -> SocketAddr { pub fn get_address(&self) -> SocketAddr {
self.address.clone() self.address.clone()
} }
pub async fn read_frame(&mut self) -> crate::Result<Option<ClientFrames>> { pub async fn read_frame(&mut self) -> crate::Result<Option<ClientFrames>> {
loop { loop {
info!("Loop read_frame"); info!("Loop read_frame");
if let Some(frame) = self.parse_frame()? { if let Some(frame) = self.parse_frame()? {
info!("Frame parsed"); info!("Frame parsed");
return Ok(Some(frame)); return Ok(Some(frame));
} }
if 0 == self.stream.read_buf(&mut self.buffer).await? { if 0 == self.stream.read_buf(&mut self.buffer).await? {
if self.buffer.is_empty() { if self.buffer.is_empty() {
return Ok(None); return Ok(None);
} else { } else {
return Err("connection reset by peer".into()); return Err("connection reset by peer".into());
} }
} }
} }
} }
fn parse_frame(&mut self) -> crate::Result<Option<ClientFrames>> { fn parse_frame(&mut self) -> crate::Result<Option<ClientFrames>> {
use frame::Error::Incomplete; use frame::Error::Incomplete;
let mut buf = Cursor::new(&self.buffer[..]); let mut buf = Cursor::new(&self.buffer[..]);
debug!(?buf); debug!(?buf);
match ClientFrames::check(&mut buf) { match ClientFrames::check(&mut buf) {
Ok(_) => { Ok(_) => {
info!("Frame::check succesful"); info!("Frame::check succesful");
let len = buf.position() as usize; let len = buf.position() as usize;
debug!(?len); debug!(?len);
buf.set_position(0); buf.set_position(0);
let frame = ClientFrames::parse(&mut buf)?; let frame = ClientFrames::parse(&mut buf)?;
self.buffer.advance(len); self.buffer.advance(len);
Ok(Some(frame)) Ok(Some(frame))
} }
Err(Incomplete) => Ok(None), Err(Incomplete) => Ok(None),
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
} }
pub async fn write_frame(&mut self, frame: ServerFrames) -> tokio::io::Result<()> { pub async fn write_frame(&mut self, frame: ServerFrames) -> tokio::io::Result<()> {
let _ = self.stream.write_all(&frame.convert_to_bytes()).await; let _ = self.stream.write_all(&frame.convert_to_bytes()).await;
self.stream.flush().await?; self.stream.flush().await?;
Ok(()) Ok(())
} }
} }

View file

@ -1,6 +1,9 @@
use std::collections::HashMap; use std::{
use std::net::SocketAddr; collections::HashMap,
use std::sync::{Arc, Mutex}; net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::debug; use tracing::debug;
@ -14,82 +17,83 @@ pub(crate) struct CameraId(pub(crate) SocketAddr);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Plate { pub(crate) struct Plate {
pub(crate) plate: String, pub(crate) plate: String,
pub(crate) timestamp: u32, pub(crate) timestamp: u32,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Camera { pub(crate) struct Camera {
pub(crate) road: u16, pub(crate) road: u16,
pub(crate) mile: u16, pub(crate) mile: u16,
pub(crate) limit: u16, pub(crate) limit: u16,
} }
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Road(u16);
pub(crate) struct DbHolder { pub(crate) struct DbHolder {
/// The `Db` instance that will be shut down when this `DbHolder` struct db: Db,
/// is dropped.
db: Db,
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Db { pub(crate) struct Db {
state: Arc<Mutex<State>>, state: Arc<Mutex<State>>,
} }
#[derive(Debug)] #[derive(Debug)]
struct State { struct State {
cameras: HashMap<CameraId, Camera>, cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<DispatcherId, (Vec<u16>, mpsc::Sender<ServerFrames>)>, dispatchers: HashMap<Road, (DispatcherId, mpsc::Sender<ServerFrames>)>,
plates: HashMap<CameraId, Plate>, plates: HashMap<CameraId, Plate>,
} }
impl DbHolder { impl DbHolder {
/// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped pub(crate) fn new() -> DbHolder {
/// the `Db`'s purge task will be shut down. DbHolder { db: Db::new() }
pub(crate) fn new() -> DbHolder { }
DbHolder { db: Db::new() }
}
/// Get the shared database. Internally, this is an pub(crate) fn db(&self) -> Db {
/// `Arc`, so a clone only increments the ref count. self.db.clone()
pub(crate) fn db(&self) -> Db { }
self.db.clone()
}
} }
impl Db { impl Db {
pub(crate) fn new() -> Db { pub(crate) fn new() -> Db {
let state = Arc::new(Mutex::new(State { let state = Arc::new(Mutex::new(State {
cameras: HashMap::new(), cameras: HashMap::new(),
dispatchers: HashMap::new(), dispatchers: HashMap::new(),
plates: HashMap::new(), plates: HashMap::new(),
})); }));
Db { state } Db { state }
} }
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) { pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.cameras.insert(camera_id, camera); state.cameras.insert(camera_id, camera);
debug!(?state); debug!(?state);
} }
pub(crate) fn add_dispatcher( pub(crate) fn add_dispatcher(
&self, &self,
dispatcher_id: DispatcherId, dispatcher_id: DispatcherId,
roads: Vec<u16>, roads: Vec<u16>,
writer_stream: mpsc::Sender<ServerFrames>, writer_stream: mpsc::Sender<ServerFrames>,
) { ) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state
.dispatchers
.insert(dispatcher_id, (roads, writer_stream));
debug!(?state);
}
pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) { for r in roads.iter() {
let mut state = self.state.lock().unwrap(); state
state.plates.insert(camera_id, plate); .dispatchers
debug!(?state); .insert(Road(*r), (dispatcher_id.clone(), writer_stream.clone()));
} }
debug!(?state);
}
pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) {
let mut state = self.state.lock().unwrap();
state.plates.insert(camera_id, plate);
debug!(?state);
}
} }

View file

@ -1,304 +1,302 @@
use std::{fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use std::fmt;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;
use tracing::{debug, error}; use tracing::{debug, error};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum ClientFrames { pub enum ClientFrames {
Plate { plate: String, timestamp: u32 }, Plate { plate: String, timestamp: u32 },
WantHeartbeat { interval: u32 }, WantHeartbeat { interval: u32 },
IAmCamera { road: u16, mile: u16, limit: u16 }, IAmCamera { road: u16, mile: u16, limit: u16 },
IAmDispatcher { roads: Vec<u16> }, IAmDispatcher { roads: Vec<u16> },
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum ServerFrames { pub enum ServerFrames {
Error { Error {
msg: String, msg: String,
}, },
Ticket { Ticket {
plate: String, plate: String,
road: u16, road: u16,
mile1: u16, mile1: u16,
timestamp1: u32, timestamp1: u32,
mile2: u16, mile2: u16,
timestamp2: u32, timestamp2: u32,
speed: u16, speed: u16,
}, },
Heartbeat, Heartbeat,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
Incomplete, Incomplete,
Other(crate::Error), Other(crate::Error),
} }
impl ClientFrames { impl ClientFrames {
pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
match get_u8(src)? { match get_u8(src)? {
// Error: msg: str (Server -> Client) // Error: msg: str (Server -> Client)
// 0x10 => { // 0x10 => {
// let n = get_length(src)?; // let n = get_length(src)?;
// skip(src, n as usize) // skip(src, n as usize)
// } // }
// Plate: plate: str, timestamp: u32 // Plate: plate: str, timestamp: u32
0x20 => { 0x20 => {
// Read length character of the plate string // Read length character of the plate string
let n = get_length(src)?; let n = get_length(src)?;
// Skip the string to get to the timestamp // Skip the string to get to the timestamp
skip(src, n)?; skip(src, n)?;
// check if valid timestamp // check if valid timestamp
get_u32(src)?; get_u32(src)?;
Ok(()) Ok(())
} }
// Ticket (just Server -> Client) // Ticket (just Server -> Client)
// 0x21 => { // 0x21 => {
// Ok(()) // Ok(())
// } // }
// Want Heartbeat: interval: u32 // Want Heartbeat: interval: u32
0x40 => { 0x40 => {
get_u32(src)?; get_u32(src)?;
Ok(()) Ok(())
} }
// Heartbeat (just Server -> Client) // Heartbeat (just Server -> Client)
// 0x41 => { // 0x41 => {
// Ok(()) // Ok(())
// } // }
// IAmCamera: road: u16, mile: u16, limit: u16 // IAmCamera: road: u16, mile: u16, limit: u16
0x80 => { 0x80 => {
// road // road
get_u16(src)?; get_u16(src)?;
// mile // mile
get_u16(src)?; get_u16(src)?;
// limit // limit
get_u16(src)?; get_u16(src)?;
Ok(()) Ok(())
} }
// IAmDispatcher: numroads: u8, roads: [u16] // IAmDispatcher: numroads: u8, roads: [u16]
0x81 => { 0x81 => {
// numroads // numroads
let amount = get_u8(src)? * 2; let amount = get_u8(src)? * 2;
// roads // roads
skip(src, amount as usize)?; skip(src, amount as usize)?;
Ok(()) Ok(())
} }
actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()),
} }
} }
pub fn parse(src: &mut Cursor<&[u8]>) -> Result<ClientFrames, Error> { pub fn parse(src: &mut Cursor<&[u8]>) -> Result<ClientFrames, Error> {
match get_u8(src)? { match get_u8(src)? {
// Error: msg: str (Server -> Client) // Error: msg: str (Server -> Client)
// 0x10 => { // 0x10 => {
// let n = get_length(src)?; // let n = get_length(src)?;
// let msg = get_str(src, n)?.to_string(); // let msg = get_str(src, n)?.to_string();
// Ok(Frame::Error { msg }) // Ok(Frame::Error { msg })
// } // }
// Plate: plate: str, timestamp: u32 // Plate: plate: str, timestamp: u32
0x20 => { 0x20 => {
// Read length character of the plate string // Read length character of the plate string
let n = get_length(src)?; let n = get_length(src)?;
// Skip the string to get to the timestamp // Skip the string to get to the timestamp
let plate = get_str(src, n)?.to_string(); let plate = get_str(src, n)?.to_string();
// check if valid timestamp // check if valid timestamp
let timestamp = get_u32(src)?; let timestamp = get_u32(src)?;
Ok(ClientFrames::Plate { plate, timestamp }) Ok(ClientFrames::Plate { plate, timestamp })
} }
// Ticket (just Server -> Client) // Ticket (just Server -> Client)
// 0x21 => { // 0x21 => {
// Ok(()) // Ok(())
// } // }
// Want Heartbeat: interval: u32 // Want Heartbeat: interval: u32
0x40 => { 0x40 => {
let interval = get_u32(src)?; let interval = get_u32(src)?;
Ok(ClientFrames::WantHeartbeat { interval }) Ok(ClientFrames::WantHeartbeat { interval })
} }
// Heartbeat (just Server -> Client) // Heartbeat (just Server -> Client)
// 0x41 => { // 0x41 => {
// Ok(()) // Ok(())
// } // }
// IAmCamera: road: u16, mile: u16, limit: u16 // IAmCamera: road: u16, mile: u16, limit: u16
0x80 => { 0x80 => {
// road // road
let road = get_u16(src)?; let road = get_u16(src)?;
// mile // mile
let mile = get_u16(src)?; let mile = get_u16(src)?;
// limit // limit
let limit = get_u16(src)?; let limit = get_u16(src)?;
Ok(ClientFrames::IAmCamera { road, mile, limit }) Ok(ClientFrames::IAmCamera { road, mile, limit })
} }
// IAmDispatcher: numroads: u8, roads: [u16] // IAmDispatcher: numroads: u8, roads: [u16]
0x81 => { 0x81 => {
// numroads // numroads
let numroads = get_u8(src)?; let numroads = get_u8(src)?;
// roads // roads
let roads = get_u16_vec(src, numroads as usize)?; let roads = get_u16_vec(src, numroads as usize)?;
Ok(ClientFrames::IAmDispatcher { roads }) Ok(ClientFrames::IAmDispatcher { roads })
} }
actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()), actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()),
} }
} }
} }
impl ServerFrames { impl ServerFrames {
pub(crate) fn convert_to_bytes(&self) -> BytesMut { pub(crate) fn convert_to_bytes(&self) -> BytesMut {
match self { match self {
ServerFrames::Error { msg } => { ServerFrames::Error { msg } => {
let mut buf = BytesMut::with_capacity(1 + 1 + msg.len()); let mut buf = BytesMut::with_capacity(1 + 1 + msg.len());
buf.put_u8(0x10); buf.put_u8(0x10);
buf.put_u8(msg.len() as u8); buf.put_u8(msg.len() as u8);
buf.put_slice(msg.as_bytes()); buf.put_slice(msg.as_bytes());
return buf; return buf;
} }
ServerFrames::Ticket { ServerFrames::Ticket {
plate, plate,
road, road,
mile1, mile1,
timestamp1, timestamp1,
mile2, mile2,
timestamp2, timestamp2,
speed, speed,
} => { } => {
let mut buf = BytesMut::with_capacity(1 + 1 + plate.len() + 2 + 2 + 4 + 2 + 4 + 2); let mut buf = BytesMut::with_capacity(1 + 1 + plate.len() + 2 + 2 + 4 + 2 + 4 + 2);
buf.put_u8(0x21); buf.put_u8(0x21);
buf.put_u8(plate.len() as u8); buf.put_u8(plate.len() as u8);
buf.put_slice(plate.as_bytes()); buf.put_slice(plate.as_bytes());
buf.put_u16(*road); buf.put_u16(*road);
buf.put_u16(*mile1); buf.put_u16(*mile1);
buf.put_u32(*timestamp1); buf.put_u32(*timestamp1);
buf.put_u16(*mile2); buf.put_u16(*mile2);
buf.put_u32(*timestamp2); buf.put_u32(*timestamp2);
buf.put_u16(*speed); buf.put_u16(*speed);
return buf; return buf;
} }
ServerFrames::Heartbeat => { ServerFrames::Heartbeat => {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
buf.put_u8(0x41); buf.put_u8(0x41);
return buf; return buf;
} }
} }
} }
} }
fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> { fn get_str<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<&'a str, Error> {
if src.remaining() < len { if src.remaining() < len {
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
let position = src.position() as usize; let position = src.position() as usize;
let slice = &src.get_ref()[position..position + len]; let slice = &src.get_ref()[position..position + len];
let message = let message =
std::str::from_utf8(slice).map_err(|_| "protocol error; invalid frame format".into()); std::str::from_utf8(slice).map_err(|_| "protocol error; invalid frame format".into());
src.advance(len); src.advance(len);
message message
} }
fn get_u16_vec<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<Vec<u16>, Error> { fn get_u16_vec<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<Vec<u16>, Error> {
if src.remaining() < len { if src.remaining() < len {
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
let mut roads = Vec::new(); let mut roads = Vec::new();
for _ in 0..len { for _ in 0..len {
let road = src.get_u16(); let road = src.get_u16();
debug!(?road); debug!(?road);
roads.push(road); roads.push(road);
} }
Ok(roads) Ok(roads)
} }
fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> {
if src.remaining() < n { if src.remaining() < n {
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
src.advance(n); src.advance(n);
Ok(()) Ok(())
} }
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> { fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() { if !src.has_remaining() {
error!("Incomplete frame"); error!("Incomplete frame");
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
Ok(src.get_u8()) Ok(src.get_u8())
} }
fn get_u16(src: &mut Cursor<&[u8]>) -> Result<u16, Error> { fn get_u16(src: &mut Cursor<&[u8]>) -> Result<u16, Error> {
if !src.has_remaining() { if !src.has_remaining() {
error!("Incomplete frame"); error!("Incomplete frame");
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
Ok(src.get_u16()) Ok(src.get_u16())
} }
fn get_u32(src: &mut Cursor<&[u8]>) -> Result<u32, Error> { fn get_u32(src: &mut Cursor<&[u8]>) -> Result<u32, Error> {
if !src.has_remaining() { if !src.has_remaining() {
error!("Incomplete frame"); error!("Incomplete frame");
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
Ok(src.get_u32()) Ok(src.get_u32())
} }
// Same as get_u8, but the current cursor points to the byte of the length of a message string. // Same as get_u8, but the current cursor points to the byte of the length of a message string.
fn get_length(src: &mut Cursor<&[u8]>) -> Result<usize, Error> { fn get_length(src: &mut Cursor<&[u8]>) -> Result<usize, Error> {
if !src.has_remaining() { if !src.has_remaining() {
error!("Incomplete frame"); error!("Incomplete frame");
return Err(Error::Incomplete); return Err(Error::Incomplete);
} }
Ok(src.get_u8() as usize) Ok(src.get_u8() as usize)
} }
impl From<String> for Error { impl From<String> for Error {
fn from(src: String) -> Error { fn from(src: String) -> Error {
Error::Other(src.into()) Error::Other(src.into())
} }
} }
impl From<&str> for Error { impl From<&str> for Error {
fn from(src: &str) -> Error { fn from(src: &str) -> Error {
src.to_string().into() src.to_string().into()
} }
} }
impl From<FromUtf8Error> for Error { impl From<FromUtf8Error> for Error {
fn from(_src: FromUtf8Error) -> Error { fn from(_src: FromUtf8Error) -> Error {
"protocol error; invalid frame format".into() "protocol error; invalid frame format".into()
} }
} }
impl From<TryFromIntError> for Error { impl From<TryFromIntError> for Error {
fn from(_src: TryFromIntError) -> Error { fn from(_src: TryFromIntError) -> Error {
"protocol error; invalid frame format".into() "protocol error; invalid frame format".into()
} }
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
Error::Incomplete => "stream ended early".fmt(fmt), Error::Incomplete => "stream ended early".fmt(fmt),
Error::Other(err) => err.fmt(fmt), Error::Other(err) => err.fmt(fmt),
} }
} }
} }

View file

@ -1,217 +1,219 @@
use crate::{ use std::{future::Future, sync::Arc};
connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate},
frame::{ClientFrames, ServerFrames},
Connection, Shutdown,
};
use std::future::Future; use tokio::{
use std::sync::Arc; net::{TcpListener, TcpStream},
use tokio::net::{TcpListener, TcpStream}; sync::{broadcast, mpsc, Semaphore},
use tokio::sync::{broadcast, mpsc, Semaphore}; time::{self, Duration},
use tokio::time::{self, Duration}; };
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use crate::{
connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate},
frame::{ClientFrames, ServerFrames},
Connection, Shutdown,
};
struct Listener { struct Listener {
listener: TcpListener, listener: TcpListener,
db_holder: DbHolder, db_holder: DbHolder,
limit_connections: Arc<Semaphore>, limit_connections: Arc<Semaphore>,
notify_shutdown: broadcast::Sender<()>, notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>, shutdown_complete_tx: mpsc::Sender<()>,
} }
struct Handler { struct Handler {
connection: Connection, connection: Connection,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
db: Db, db: Db,
shutdown: Shutdown, shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>, _shutdown_complete: mpsc::Sender<()>,
} }
const MAX_CONNECTIONS: usize = 1500; const MAX_CONNECTIONS: usize = 1500;
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1); let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
let mut server = Listener { let mut server = Listener {
listener, listener,
db_holder: DbHolder::new(), db_holder: DbHolder::new(),
limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
notify_shutdown, notify_shutdown,
shutdown_complete_tx, shutdown_complete_tx,
}; };
tokio::select! { tokio::select! {
res = server.run() => { res = server.run() => {
if let Err(err) = res { if let Err(err) = res {
error!(cause = %err, "failed to accept"); error!(cause = %err, "failed to accept");
} }
} }
_ = shutdown => { _ = shutdown => {
info!("shutting down"); info!("shutting down");
} }
} }
let Listener { let Listener {
shutdown_complete_tx, shutdown_complete_tx,
notify_shutdown, notify_shutdown,
.. ..
} = server; } = server;
drop(notify_shutdown); drop(notify_shutdown);
drop(shutdown_complete_tx); drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await; let _ = shutdown_complete_rx.recv().await;
Ok(()) Ok(())
} }
impl Listener { impl Listener {
async fn run(&mut self) -> crate::Result<()> { async fn run(&mut self) -> crate::Result<()> {
info!("accepting inbound connections"); info!("accepting inbound connections");
loop { loop {
let permit = self let permit = self
.limit_connections .limit_connections
.clone() .clone()
.acquire_owned() .acquire_owned()
.await .await
.unwrap(); .unwrap();
let socket = self.accept().await?; let socket = self.accept().await?;
let address = socket.peer_addr()?; let address = socket.peer_addr()?;
let mut handler = Handler { let mut handler = Handler {
connection: Connection::new(address, socket), connection: Connection::new(address, socket),
connection_type: None, connection_type: None,
db: self.db_holder.db(), db: self.db_holder.db(),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()), shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
_shutdown_complete: self.shutdown_complete_tx.clone(), _shutdown_complete: self.shutdown_complete_tx.clone(),
}; };
info!("Created new handler"); info!("Created new handler");
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = handler.run().await { if let Err(err) = handler.run().await {
error!(cause = ?err, "connection error"); error!(cause = ?err, "connection error");
} }
drop(permit); drop(permit);
}); });
} }
} }
async fn accept(&mut self) -> crate::Result<TcpStream> { async fn accept(&mut self) -> crate::Result<TcpStream> {
let mut backoff = 1; let mut backoff = 1;
loop { loop {
match self.listener.accept().await { match self.listener.accept().await {
Ok((socket, _)) => return Ok(socket), Ok((socket, _)) => return Ok(socket),
Err(err) => { Err(err) => {
if backoff > 64 { if backoff > 64 {
return Err(err.into()); return Err(err.into());
} }
} }
} }
time::sleep(Duration::from_secs(backoff)).await; time::sleep(Duration::from_secs(backoff)).await;
backoff *= 2; backoff *= 2;
} }
} }
} }
impl Handler { impl Handler {
async fn run(&mut self) -> crate::Result<()> { async fn run(&mut self) -> crate::Result<()> {
let (send_message, mut receive_message): ( let (send_message, mut receive_message): (
mpsc::Sender<ServerFrames>, mpsc::Sender<ServerFrames>,
mpsc::Receiver<ServerFrames>, mpsc::Receiver<ServerFrames>,
) = mpsc::channel(1024); ) = mpsc::channel(1024);
while !self.shutdown.is_shutdown() { while !self.shutdown.is_shutdown() {
tokio::select! { tokio::select! {
res = self.connection.read_frame() => { res = self.connection.read_frame() => {
match res? { match res? {
Some(frame) => { Some(frame) => {
info!("Received frame"); info!("Received frame");
let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await; let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await;
}, },
None => return Ok(()), None => return Ok(()),
} }
} }
message = receive_message.recv() => { message = receive_message.recv() => {
match message { match message {
Some(message) => { Some(message) => {
let _ = self.connection.write_frame(message).await; let _ = self.connection.write_frame(message).await;
}, },
None => (), None => (),
} }
} }
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
debug!("Shutdown"); debug!("Shutdown");
return Ok(()); return Ok(());
} }
}; };
} }
Ok(()) Ok(())
} }
fn set_connection_type(&mut self, connection_type: ConnectionType) { fn set_connection_type(&mut self, connection_type: ConnectionType) {
match connection_type { match connection_type {
ConnectionType::Camera => { ConnectionType::Camera => {
self.connection_type = Some(connection_type); self.connection_type = Some(connection_type);
} }
ConnectionType::Dispatcher => { ConnectionType::Dispatcher => {
self.connection_type = Some(connection_type); self.connection_type = Some(connection_type);
} }
} }
} }
async fn handle_client_frame( async fn handle_client_frame(
&mut self, &mut self,
db: Db, db: Db,
frame: ClientFrames, frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>, send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> { ) -> crate::Result<()> {
match frame { match frame {
ClientFrames::Plate { plate, timestamp } => { ClientFrames::Plate { plate, timestamp } => {
info!("Receive new plate {plate} {timestamp}"); info!("Receive new plate {plate} {timestamp}");
db.insert_plate( db.insert_plate(
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),
Plate { plate, timestamp }, Plate { plate, timestamp },
); );
} }
ClientFrames::WantHeartbeat { interval } => { ClientFrames::WantHeartbeat { interval } => {
info!("Want heartbeat: {interval}"); info!("Want heartbeat: {interval}");
} }
ClientFrames::IAmCamera { road, mile, limit } => { ClientFrames::IAmCamera { road, mile, limit } => {
if self.connection_type.is_some() { if self.connection_type.is_some() {
return Err("Already connected".into()); return Err("Already connected".into());
} }
self.set_connection_type(ConnectionType::Camera); self.set_connection_type(ConnectionType::Camera);
info!("Set connection type to camera"); info!("Set connection type to camera");
db.add_camera( db.add_camera(
CameraId(self.connection.get_address()), CameraId(self.connection.get_address()),
Camera { road, mile, limit }, Camera { road, mile, limit },
); );
} }
ClientFrames::IAmDispatcher { roads } => { ClientFrames::IAmDispatcher { roads } => {
if self.connection_type.is_some() { if self.connection_type.is_some() {
return Err("Already connected".into()); return Err("Already connected".into());
} }
self.set_connection_type(ConnectionType::Dispatcher); self.set_connection_type(ConnectionType::Dispatcher);
db.add_dispatcher( db.add_dispatcher(
DispatcherId(self.connection.get_address()), DispatcherId(self.connection.get_address()),
roads.to_vec(), roads.to_vec(),
send_message.clone(), send_message.clone(),
); );
} }
} }
Ok(()) Ok(())
} }
} }

View file

@ -2,29 +2,29 @@ use tokio::sync::broadcast;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Shutdown { pub(crate) struct Shutdown {
shutdown: bool, shutdown: bool,
notify: broadcast::Receiver<()>, notify: broadcast::Receiver<()>,
} }
impl Shutdown { impl Shutdown {
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown { Shutdown {
shutdown: false, shutdown: false,
notify, notify,
} }
} }
pub(crate) fn is_shutdown(&self) -> bool { pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown self.shutdown
} }
pub(crate) async fn recv(&mut self) { pub(crate) async fn recv(&mut self) {
if self.shutdown { if self.shutdown {
return; return;
} }
let _ = self.notify.recv().await; let _ = self.notify.recv().await;
self.shutdown = true; self.shutdown = true;
} }
} }