Improve logging
This commit is contained in:
parent
6ac3c46b37
commit
34c6343d63
5 changed files with 10 additions and 52 deletions
|
|
@ -5,7 +5,6 @@ use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
|
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
};
|
};
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
use crate::frame::{self, ClientFrames, ServerFrames};
|
use crate::frame::{self, ClientFrames, ServerFrames};
|
||||||
|
|
||||||
|
|
@ -35,11 +34,8 @@ impl Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_frame(&mut self) -> crate::Result<Option<ClientFrames>> {
|
pub async fn read_frame(&mut self) -> crate::Result<Option<ClientFrames>> {
|
||||||
info!("Start read_frame");
|
|
||||||
loop {
|
loop {
|
||||||
info!("Looping to self.parse_frame");
|
|
||||||
if let Some(frame) = self.parse_frame()? {
|
if let Some(frame) = self.parse_frame()? {
|
||||||
info!("parse_frame got a result: {frame:?}");
|
|
||||||
return Ok(Some(frame));
|
return Ok(Some(frame));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{debug, info};
|
use tracing::info;
|
||||||
|
|
||||||
use crate::frame::ServerFrames;
|
use crate::frame::ServerFrames;
|
||||||
|
|
||||||
|
|
@ -115,7 +115,6 @@ impl Db {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
|
pub(crate) fn add_camera(&self, camera_id: CameraId, camera: Camera) {
|
||||||
info!("Add new camera: {camera:?}");
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
state.cameras.insert(camera_id, camera);
|
state.cameras.insert(camera_id, camera);
|
||||||
}
|
}
|
||||||
|
|
@ -126,7 +125,7 @@ impl Db {
|
||||||
roads: Vec<u16>,
|
roads: Vec<u16>,
|
||||||
writer_stream: mpsc::Sender<ServerFrames>,
|
writer_stream: mpsc::Sender<ServerFrames>,
|
||||||
) {
|
) {
|
||||||
info!("Add new dispatcher: {roads:?}");
|
info!("Adding new dispatcher for raods: {roads:?}");
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
for r in roads.iter() {
|
for r in roads.iter() {
|
||||||
|
|
@ -149,7 +148,6 @@ impl Db {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
|
pub(crate) fn add_open_ticket(&self, ticket: Ticket) {
|
||||||
info!("Add open ticket: {ticket:?}");
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
state
|
state
|
||||||
.open_tickets
|
.open_tickets
|
||||||
|
|
@ -181,13 +179,11 @@ impl Db {
|
||||||
road: Road,
|
road: Road,
|
||||||
) -> Option<Vec<(Mile, Timestamp)>> {
|
) -> Option<Vec<(Mile, Timestamp)>> {
|
||||||
let state = self.state.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
debug!(?state);
|
|
||||||
state.plates.get(&(plate.plate, road)).cloned()
|
state.plates.get(&(plate.plate, road)).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
|
pub(crate) fn add_plate(&self, camera_id: CameraId, plate: Plate) {
|
||||||
//TODO: Check if the same plate was already added for the road AND MILE
|
//TODO: Check if the same plate was already added for the road AND MILE
|
||||||
info!("Add car: {plate:?}");
|
|
||||||
let camera = self.get_camera(camera_id).unwrap();
|
let camera = self.get_camera(camera_id).unwrap();
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
|
@ -206,7 +202,6 @@ impl Db {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
|
pub(crate) fn ticket_plate(&self, day: u32, plate_name: PlateName) {
|
||||||
info!("Add ticket for day: {day}:{}", plate_name.0);
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
state
|
state
|
||||||
.ticketed_plates_by_day
|
.ticketed_plates_by_day
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
use std::{fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error};
|
use std::{fmt, io::Cursor, num::TryFromIntError, string::FromUtf8Error};
|
||||||
|
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum ClientFrames {
|
pub enum ClientFrames {
|
||||||
|
|
@ -36,7 +35,6 @@ pub enum Error {
|
||||||
|
|
||||||
impl ClientFrames {
|
impl ClientFrames {
|
||||||
pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
|
pub fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
|
||||||
info!("Checking message: {src:?}");
|
|
||||||
match get_u8(src)? {
|
match get_u8(src)? {
|
||||||
// Error: msg: str (Server -> Client)
|
// Error: msg: str (Server -> Client)
|
||||||
// 0x10 => {
|
// 0x10 => {
|
||||||
|
|
@ -78,10 +76,8 @@ impl ClientFrames {
|
||||||
}
|
}
|
||||||
// IAmDispatcher: numroads: u8, roads: [u16]
|
// IAmDispatcher: numroads: u8, roads: [u16]
|
||||||
0x81 => {
|
0x81 => {
|
||||||
info!("Checking IAmDispatcher message: {src:?}");
|
|
||||||
// numroads
|
// numroads
|
||||||
let amount = get_u8(src)? * 2;
|
let amount = get_u8(src)? * 2;
|
||||||
info!("amount of roads: {amount:?}");
|
|
||||||
// roads
|
// roads
|
||||||
skip(src, amount as usize)?;
|
skip(src, amount as usize)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -223,10 +219,6 @@ fn get_u16_vec<'a>(src: &mut Cursor<&'a [u8]>, len: usize) -> Result<Vec<u16>, E
|
||||||
}
|
}
|
||||||
|
|
||||||
fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> {
|
fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> {
|
||||||
info!(
|
|
||||||
"Bytes left: src: {src:?}: n: {n}, remaining: {}",
|
|
||||||
src.remaining()
|
|
||||||
);
|
|
||||||
if src.remaining() < n {
|
if src.remaining() < n {
|
||||||
return Err(Error::Incomplete);
|
return Err(Error::Incomplete);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use tokio::{
|
||||||
sync::{broadcast, mpsc, Semaphore},
|
sync::{broadcast, mpsc, Semaphore},
|
||||||
time::{self, Duration},
|
time::{self, Duration},
|
||||||
};
|
};
|
||||||
use tracing::{debug, error, info};
|
use tracing::error;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
connection::ConnectionType,
|
connection::ConnectionType,
|
||||||
|
|
@ -56,7 +56,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = shutdown => {
|
_ = shutdown => {
|
||||||
info!("shutting down");
|
// Shutdown signal received
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,8 +76,6 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
|
||||||
|
|
||||||
impl Listener {
|
impl Listener {
|
||||||
async fn run(&mut self) -> crate::Result<()> {
|
async fn run(&mut self) -> crate::Result<()> {
|
||||||
info!("accepting inbound connections");
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let permit = self
|
let permit = self
|
||||||
.limit_connections
|
.limit_connections
|
||||||
|
|
@ -97,8 +95,6 @@ impl Listener {
|
||||||
_shutdown_complete: self.shutdown_complete_tx.clone(),
|
_shutdown_complete: self.shutdown_complete_tx.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Created new handler");
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(err) = handler.run().await {
|
if let Err(err) = handler.run().await {
|
||||||
error!(cause = ?err, "connection error");
|
error!(cause = ?err, "connection error");
|
||||||
|
|
@ -138,10 +134,8 @@ impl Handler {
|
||||||
while !self.shutdown.is_shutdown() {
|
while !self.shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = self.connection.read_frame() => {
|
res = self.connection.read_frame() => {
|
||||||
info!("Reading from frame: {res:?}");
|
|
||||||
match res? {
|
match res? {
|
||||||
Some(frame) => {
|
Some(frame) => {
|
||||||
info!("Received frame");
|
|
||||||
if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
|
if let Err(e) = self.handle_client_frame(self.db.clone(), frame, send_message.clone()).await {
|
||||||
error!("Error handling frame: {e:?}");
|
error!("Error handling frame: {e:?}");
|
||||||
}
|
}
|
||||||
|
|
@ -150,7 +144,6 @@ impl Handler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
message = receive_message.recv() => {
|
message = receive_message.recv() => {
|
||||||
info!("Reading from channel: {message:?}");
|
|
||||||
match message {
|
match message {
|
||||||
Some(message) => {
|
Some(message) => {
|
||||||
let _ = self.connection.write_frame(message).await;
|
let _ = self.connection.write_frame(message).await;
|
||||||
|
|
@ -159,7 +152,6 @@ impl Handler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
debug!("Shutdown");
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -185,10 +177,8 @@ impl Handler {
|
||||||
frame: ClientFrames,
|
frame: ClientFrames,
|
||||||
send_message: mpsc::Sender<ServerFrames>,
|
send_message: mpsc::Sender<ServerFrames>,
|
||||||
) -> crate::Result<()> {
|
) -> crate::Result<()> {
|
||||||
debug!(?frame);
|
|
||||||
match frame {
|
match frame {
|
||||||
ClientFrames::Plate { plate, timestamp } => {
|
ClientFrames::Plate { plate, timestamp } => {
|
||||||
info!("Receive new plate {plate} {timestamp}");
|
|
||||||
issue_possible_ticket(
|
issue_possible_ticket(
|
||||||
&mut db,
|
&mut db,
|
||||||
Plate {
|
Plate {
|
||||||
|
|
@ -200,7 +190,6 @@ impl Handler {
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
ClientFrames::WantHeartbeat { interval } => {
|
ClientFrames::WantHeartbeat { interval } => {
|
||||||
info!("Receive hearbet request with interval {interval}");
|
|
||||||
if interval > 0 {
|
if interval > 0 {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut heartbeat = Heartbeat::new(interval, send_message.clone());
|
let mut heartbeat = Heartbeat::new(interval, send_message.clone());
|
||||||
|
|
@ -209,12 +198,10 @@ impl Handler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ClientFrames::IAmCamera { road, mile, limit } => {
|
ClientFrames::IAmCamera { road, mile, limit } => {
|
||||||
info!("Receive new camera {road} {mile} {limit}");
|
|
||||||
if self.connection_type.is_some() {
|
if self.connection_type.is_some() {
|
||||||
return Err("Already connected".into());
|
return Err("Already connected".into());
|
||||||
}
|
}
|
||||||
self.set_connection_type(ConnectionType::Camera);
|
self.set_connection_type(ConnectionType::Camera);
|
||||||
info!("Set connection type to camera");
|
|
||||||
|
|
||||||
db.add_camera(
|
db.add_camera(
|
||||||
CameraId(self.connection.get_address()),
|
CameraId(self.connection.get_address()),
|
||||||
|
|
@ -226,7 +213,6 @@ impl Handler {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
ClientFrames::IAmDispatcher { roads } => {
|
ClientFrames::IAmDispatcher { roads } => {
|
||||||
info!("Receive new dispatcher {roads:?}");
|
|
||||||
if self.connection_type.is_some() {
|
if self.connection_type.is_some() {
|
||||||
return Err("Already connected".into());
|
return Err("Already connected".into());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,19 +1,16 @@
|
||||||
use tracing::debug;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::db::{CameraId, Db, Plate, Road, Ticket};
|
use crate::db::{CameraId, Db, Plate, Road, Ticket};
|
||||||
|
|
||||||
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
|
pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id: CameraId) {
|
||||||
debug!("Issue possible ticket");
|
|
||||||
let camera = db.get_camera(camera_id.clone()).unwrap();
|
let camera = db.get_camera(camera_id.clone()).unwrap();
|
||||||
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
|
let observed_plates = db.get_plates_by_road(plate.clone(), camera.road.clone());
|
||||||
|
|
||||||
if observed_plates.is_none() {
|
if observed_plates.is_none() {
|
||||||
debug!("No observed plates");
|
|
||||||
db.add_plate(camera_id, plate);
|
db.add_plate(camera_id, plate);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(?observed_plates, "Observed plates");
|
|
||||||
let mile = camera.mile;
|
let mile = camera.mile;
|
||||||
let limit = camera.limit;
|
let limit = camera.limit;
|
||||||
let road = camera.road;
|
let road = camera.road;
|
||||||
|
|
@ -21,14 +18,6 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
|
||||||
let plate_name = plate.clone().plate;
|
let plate_name = plate.clone().plate;
|
||||||
let timestamp = plate.clone().timestamp;
|
let timestamp = plate.clone().timestamp;
|
||||||
|
|
||||||
debug!(
|
|
||||||
?plate_name,
|
|
||||||
?timestamp,
|
|
||||||
?mile,
|
|
||||||
?limit,
|
|
||||||
?road,
|
|
||||||
"Checking plate"
|
|
||||||
);
|
|
||||||
for (m, t) in observed_plates.unwrap().iter() {
|
for (m, t) in observed_plates.unwrap().iter() {
|
||||||
let distance = if mile > *m {
|
let distance = if mile > *m {
|
||||||
mile.0 - m.0
|
mile.0 - m.0
|
||||||
|
|
@ -44,7 +33,6 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
|
||||||
|
|
||||||
let speed = (distance as u64 * 3600 * 100 / time as u64) as u16;
|
let speed = (distance as u64 * 3600 * 100 / time as u64) as u16;
|
||||||
|
|
||||||
debug!(?distance, ?time, ?speed, "Checking speed");
|
|
||||||
if speed > limit.0 * 100 {
|
if speed > limit.0 * 100 {
|
||||||
let ticket = Ticket {
|
let ticket = Ticket {
|
||||||
plate: plate_name.clone().0,
|
plate: plate_name.clone().0,
|
||||||
|
|
@ -60,20 +48,21 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
|
||||||
let day_end = timestamp2 / 86400;
|
let day_end = timestamp2 / 86400;
|
||||||
|
|
||||||
for day in day_start..=day_end {
|
for day in day_start..=day_end {
|
||||||
|
info!("Day {day} for {ticket:?}");
|
||||||
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
|
if db.is_plate_ticketed_for_day(day, plate_name.clone()) {
|
||||||
debug!(?ticket, "Ticket already issued");
|
info!("Ticket already issued: {ticket:?}");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
let dispatcher = db.get_dispatcher_for_road(road.clone());
|
||||||
|
|
||||||
if dispatcher.is_none() {
|
if dispatcher.is_none() {
|
||||||
debug!(?ticket, "No dispatcher for road");
|
info!("No dispatcher yet for this road: {ticket:?}");
|
||||||
db.add_open_ticket(ticket.clone());
|
db.add_open_ticket(ticket.clone());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(?ticket, "Sending ticket");
|
info!("Sending ticket: {ticket:?}");
|
||||||
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
|
let _ = dispatcher.unwrap().send(ticket.clone().into()).await;
|
||||||
db.ticket_plate(day, plate_name.clone());
|
db.ticket_plate(day, plate_name.clone());
|
||||||
}
|
}
|
||||||
|
|
@ -85,7 +74,7 @@ pub(crate) async fn issue_possible_ticket(db: &mut Db, plate: Plate, camera_id:
|
||||||
|
|
||||||
pub(crate) async fn send_out_waiting_tickets(db: Db) {
|
pub(crate) async fn send_out_waiting_tickets(db: Db) {
|
||||||
let tickets = db.get_open_tickets();
|
let tickets = db.get_open_tickets();
|
||||||
debug!(?tickets, "Sending out waiting tickets");
|
info!("Sending out waiting tickets: {tickets:?}");
|
||||||
for ticket in tickets {
|
for ticket in tickets {
|
||||||
if let Some(dispatcher) = db.get_dispatcher_for_road(Road(ticket.road)) {
|
if let Some(dispatcher) = db.get_dispatcher_for_road(Road(ticket.road)) {
|
||||||
let _ = dispatcher.send(ticket.clone().into()).await;
|
let _ = dispatcher.send(ticket.clone().into()).await;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue