Add test cases, reorganized server code, be able to add data

This commit is contained in:
Bastian Gruber 2023-05-18 09:44:54 +02:00
parent 0c8f486907
commit 4b30354214
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
6 changed files with 183 additions and 100 deletions

View file

@ -1,6 +1,6 @@
use problem_06::{DEFAULT_IP, DEFAULT_PORT};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::net::{tcp::WriteHalf, TcpStream};
use tracing::{debug, error, info};
#[tokio::main]
@ -10,8 +10,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect(format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
let (mut read, mut write) = stream.split();
// test_all_different_messages(&mut write).await?;
test_camera_connection(&mut write).await?;
let mut buf: [u8; 4] = [0; 4];
if let Ok(n) = read.read_exact(&mut buf).await {
info!("Stream incoming...");
if n == 0 {
info!("End of stream");
return Ok(());
}
let message = i32::from_be_bytes(buf);
debug!(?message);
return Ok(());
}
error!("Cannot read from socket");
Err("Could not read from socket".into())
}
#[allow(dead_code)]
async fn test_all_different_messages(
write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
// 20 Plate {
// 07 52 45 30 35 42 4b 47 plate: "RE05BKG",
// 00 01 e2 40 timestamp: 123456
@ -46,19 +70,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
write.write_all(&i_am_camera).await?;
write.write_all(&i_am_dispatcher).await?;
if let Ok(n) = read.read_exact(&mut buf).await {
info!("Stream incoming...");
if n == 0 {
info!("End of stream");
return Ok(());
Ok(())
}
let message = i32::from_be_bytes(buf);
debug!(?message);
return Ok(());
}
async fn test_camera_connection(
write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
// 80 IAmCamera{
// 00 42 road: 66,
// 00 64 mile: 100,
// 00 3c limit: 60,
// }
let i_am_camera = [0x80, 0x00, 0x42, 0x00, 0x64, 0x00, 0x3c];
error!("Cannot read from socket");
Err("Could not read from socket".into())
// 20 Plate {
// 07 52 45 30 35 42 4b 47 plate: "RE05BKG",
// 00 01 e2 40 timestamp: 123456
// }
let plate = [
0x20, 0x07, 0x52, 0x45, 0x30, 0x35, 0x42, 0x4b, 0x47, 0x00, 0x01, 0xe2, 0x40,
];
write.write_all(&i_am_camera).await?;
write.write_all(&plate).await?;
Ok(())
}

View file

@ -2,24 +2,36 @@ use crate::frame::{self, ClientFrames, ServerFrames};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;
use tracing::{debug, info};
pub(crate) enum ConnectionType {
Camera,
Dispatcher,
}
#[derive(Debug)]
pub struct Connection {
pub address: SocketAddr,
buffer: BytesMut,
pub(crate) stream: BufWriter<TcpStream>,
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
pub fn new(address: SocketAddr, socket: TcpStream) -> Connection {
Connection {
address,
buffer: BytesMut::with_capacity(4 * 1024),
stream: BufWriter::new(socket),
}
}
pub fn get_address(&self) -> SocketAddr {
self.address.clone()
}
pub async fn read_frame(&mut self) -> crate::Result<Option<ClientFrames>> {
loop {
info!("Loop read_frame");

View file

@ -1,26 +1,46 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tracing::debug;
use crate::frame::ServerFrames;
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct DispatcherId(pub(crate) SocketAddr);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct CameraId(pub(crate) SocketAddr);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Plate {
pub(crate) plate: String,
pub(crate) timestamp: u32,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
pub(crate) struct Camera {
pub(crate) road: u16,
pub(crate) mile: u16,
pub(crate) limit: u16,
}
pub(crate) struct DbHolder {
/// The `Db` instance that will be shut down when this `DbHolder` struct
/// is dropped.
db: Db,
}
#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct Db {
state: Arc<Mutex<State>>,
}
#[derive(Debug)]
struct State {
// cameras: HashMap<(u32, u32), u32>,
dispatchers: HashMap<Vec<u16>, broadcast::Sender<ServerFrames>>,
plates: HashMap<String, u32>,
cameras: HashMap<CameraId, Camera>,
dispatchers: HashMap<DispatcherId, (Vec<u16>, mpsc::Sender<ServerFrames>)>,
plates: HashMap<CameraId, Plate>,
}
impl DbHolder {
@ -40,7 +60,7 @@ impl DbHolder {
impl Db {
pub(crate) fn new() -> Db {
let state = Arc::new(Mutex::new(State {
// cameras: HashMap::new(),
cameras: HashMap::new(),
dispatchers: HashMap::new(),
plates: HashMap::new(),
}));
@ -48,19 +68,28 @@ impl Db {
Db { state }
}
// pub(crate) fn new_camera(&self, road: u32, mile: u32, limit: u32) {}
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
let mut state = self.state.lock().unwrap();
state.cameras.insert(camera_id, camera);
debug!(?state);
}
pub(crate) fn add_dispatcher(
&self,
dispatcher_id: DispatcherId,
roads: Vec<u16>,
writer_stream: broadcast::Sender<ServerFrames>,
writer_stream: mpsc::Sender<ServerFrames>,
) {
let mut state = self.state.lock().unwrap();
state.dispatchers.insert(roads, writer_stream);
state
.dispatchers
.insert(dispatcher_id, (roads, writer_stream));
debug!(?state);
}
pub(crate) fn insert_plate(&self, plate: String, timestamp: u32) {
pub(crate) fn insert_plate(&self, camera_id: CameraId, plate: Plate) {
let mut state = self.state.lock().unwrap();
state.plates.insert(plate, timestamp);
state.plates.insert(camera_id, plate);
debug!(?state);
}
}

View file

@ -8,8 +8,6 @@ pub mod db;
pub mod server;
pub mod ticketing;
mod shutdown;
use shutdown::Shutdown;

View file

@ -1,7 +1,8 @@
use crate::{
db::{Db, DbHolder},
connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Plate},
frame::{ClientFrames, ServerFrames},
ticketing, Connection, Shutdown,
Connection, Shutdown,
};
use std::future::Future;
@ -21,9 +22,7 @@ struct Listener {
struct Handler {
connection: Connection,
receive_ticket: broadcast::Receiver<ServerFrames>,
send_ticket: broadcast::Sender<ServerFrames>,
connection_type: Option<ticketing::ClientType>,
connection_type: Option<ConnectionType>,
db: Db,
shutdown: Shutdown,
_shutdown_complete: mpsc::Sender<()>,
@ -72,11 +71,6 @@ impl Listener {
async fn run(&mut self) -> crate::Result<()> {
info!("accepting inbound connections");
let (send_ticket, _): (
broadcast::Sender<ServerFrames>,
broadcast::Receiver<ServerFrames>,
) = broadcast::channel(4096);
loop {
let permit = self
.limit_connections
@ -86,11 +80,10 @@ impl Listener {
.unwrap();
let socket = self.accept().await?;
let address = socket.peer_addr()?;
let mut handler = Handler {
connection: Connection::new(socket),
send_ticket: send_ticket.clone(),
receive_ticket: send_ticket.subscribe(),
connection: Connection::new(address, socket),
connection_type: None,
db: self.db_holder.db(),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
@ -130,40 +123,65 @@ impl Listener {
impl Handler {
async fn run(&mut self) -> crate::Result<()> {
let (send_message, mut receive_message): (
mpsc::Sender<ServerFrames>,
mpsc::Receiver<ServerFrames>,
) = mpsc::channel(1024);
while !self.shutdown.is_shutdown() {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
tokio::select! {
res = self.connection.read_frame() => {
match res? {
Some(frame) => {
info!("Received frame");
let _ = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await;
},
None => return Ok(()),
}
}
message = receive_message.recv() => {
match message {
Some(message) => {
let _ = self.connection.write_frame(message).await;
},
None => (),
}
}
_ = self.shutdown.recv() => {
debug!("Shutdown");
return Ok(());
}
message = self.receive_ticket.recv() => {
match message {
Ok(message) => {
match message {
ServerFrames::Ticket {
..
} => {
let _ = self.connection.write_frame(message).await;
}
_ => ()
}
},
Err(_) => return Ok(()),
}
None
}
};
}
let frame = match maybe_frame {
Some(frame) => frame,
None => return Ok(()),
};
Ok(())
}
fn set_connection_type(&mut self, connection_type: ConnectionType) {
match connection_type {
ConnectionType::Camera => {
self.connection_type = Some(connection_type);
}
ConnectionType::Dispatcher => {
self.connection_type = Some(connection_type);
}
}
}
async fn handle_client_frame(
&mut self,
db: Db,
frame: ClientFrames,
send_message: mpsc::Sender<ServerFrames>,
) -> crate::Result<()> {
match frame {
ClientFrames::Plate { plate, timestamp } => {
info!("Insert {plate} and {timestamp}");
self.db.insert_plate(plate, timestamp);
info!("Receive new plate {plate} {timestamp}");
db.insert_plate(
CameraId(self.connection.get_address()),
Plate { plate, timestamp },
);
}
ClientFrames::WantHeartbeat { interval } => {
info!("Want heartbeat: {interval}");
@ -172,32 +190,28 @@ impl Handler {
if self.connection_type.is_some() {
return Err("Already connected".into());
}
self.set_connection_type(ticketing::ClientType::Camera(road, mile));
info!("Road: {road}, mile: {mile}, limit: {limit}");
self.set_connection_type(ConnectionType::Camera);
info!("Set connection type to camera");
db.add_camera(
CameraId(self.connection.get_address()),
Camera { road, mile, limit },
);
}
ClientFrames::IAmDispatcher { roads } => {
if self.connection_type.is_some() {
return Err("Already connected".into());
}
self.set_connection_type(ticketing::ClientType::Dispatcher);
self.db
.add_dispatcher(roads.to_vec(), self.send_ticket.clone());
}
self.set_connection_type(ConnectionType::Dispatcher);
db.add_dispatcher(
DispatcherId(self.connection.get_address()),
roads.to_vec(),
send_message.clone(),
);
}
}
Ok(())
}
fn set_connection_type(&mut self, connection_type: ticketing::ClientType) {
match connection_type {
ticketing::ClientType::Camera(_, _) => {
self.connection_type = Some(connection_type);
}
ticketing::ClientType::Dispatcher => {
self.connection_type = Some(connection_type);
}
}
}
}

View file

@ -1,4 +0,0 @@
pub(crate) enum ClientType {
Camera(u16, u16),
Dispatcher,
}