Add heartbeat

This commit is contained in:
Bastian Gruber 2023-05-21 14:38:53 +02:00
parent 6e4c12a9b6
commit 15f64eb8c8
No known key found for this signature in database
GPG key ID: BE9F8C772B188CBF
6 changed files with 53 additions and 13 deletions

View file

@ -75,6 +75,7 @@ async fn test_all_different_messages(
Ok(()) Ok(())
} }
#[allow(dead_code)]
async fn test_camera_connection( async fn test_camera_connection(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
@ -99,6 +100,7 @@ async fn test_camera_connection(
Ok(()) Ok(())
} }
#[allow(dead_code)]
async fn test_dipatcher_connection( async fn test_dipatcher_connection(
write: &mut WriteHalf<'_>, write: &mut WriteHalf<'_>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {

View file

@ -5,7 +5,6 @@ use tokio::{net::TcpListener, signal};
pub async fn main() -> problem_06::Result<()> { pub async fn main() -> problem_06::Result<()> {
tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); tracing_subscriber::fmt::try_init().expect("Couldn't setup logging");
// Bind a TCP listener
let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
let _ = server::run(listener, signal::ctrl_c()).await; let _ = server::run(listener, signal::ctrl_c()).await;

View file

@ -200,6 +200,5 @@ impl Db {
state state
.ticketed_plates_by_day .ticketed_plates_by_day
.contains(&(Timestamp(day), plate_name)) .contains(&(Timestamp(day), plate_name))
// debug!(?state);
} }
} }

View file

@ -0,0 +1,39 @@
use crate::frame::ServerFrames;
use std::time::Duration;
use tokio::sync::mpsc;
pub(crate) struct Heartbeat {
is_running: bool,
interval: Duration,
message: mpsc::Sender<ServerFrames>,
}
impl Heartbeat {
pub(crate) fn new(interval: u32, message: mpsc::Sender<ServerFrames>) -> Self {
Self {
is_running: false,
interval: Duration::from_millis((interval * 100) as u64),
message,
}
}
pub(crate) async fn start(&mut self) {
if self.is_running {
let _ = self.message.send(ServerFrames::Error {
msg: "Heartbeat alreadt exists".to_string(),
});
return;
}
self.is_running = true;
let mut interval = tokio::time::interval(self.interval);
interval.tick().await;
loop {
interval.tick().await;
let _ = self.message.send(ServerFrames::Heartbeat);
}
}
}

View file

@ -1,16 +1,13 @@
mod connection; mod connection;
pub use connection::Connection; mod db;
mod frame;
pub mod frame; mod heartbeat;
pub use frame::ClientFrames;
pub mod db;
pub mod server; pub mod server;
pub mod ticketing;
mod shutdown; mod shutdown;
mod ticketing;
pub use connection::Connection;
pub use frame::ClientFrames;
use shutdown::Shutdown; use shutdown::Shutdown;
pub const DEFAULT_IP: &'static str = "0.0.0.0"; pub const DEFAULT_IP: &'static str = "0.0.0.0";

View file

@ -11,6 +11,7 @@ use crate::{
connection::ConnectionType, connection::ConnectionType,
db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp}, db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp},
frame::{ClientFrames, ServerFrames}, frame::{ClientFrames, ServerFrames},
heartbeat::Heartbeat,
ticketing::issue_possible_ticket, ticketing::issue_possible_ticket,
Connection, Shutdown, Connection, Shutdown,
}; };
@ -200,7 +201,10 @@ impl Handler {
.await; .await;
} }
ClientFrames::WantHeartbeat { interval } => { ClientFrames::WantHeartbeat { interval } => {
info!("Want heartbeat: {interval}"); tokio::spawn(async move {
let mut heartbeat = Heartbeat::new(interval, send_message.clone());
heartbeat.start().await;
});
} }
ClientFrames::IAmCamera { road, mile, limit } => { ClientFrames::IAmCamera { road, mile, limit } => {
if self.connection_type.is_some() { if self.connection_type.is_some() {