From 15f64eb8c832240358eab99b856968598ab10abd Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Sun, 21 May 2023 14:38:53 +0200 Subject: [PATCH] Add heartbeat --- problem_06/bin/client.rs | 2 ++ problem_06/bin/server.rs | 1 - problem_06/src/db.rs | 1 - problem_06/src/heartbeat.rs | 39 +++++++++++++++++++++++++++++++++++++ problem_06/src/lib.rs | 17 +++++++--------- problem_06/src/server.rs | 6 +++++- 6 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 problem_06/src/heartbeat.rs diff --git a/problem_06/bin/client.rs b/problem_06/bin/client.rs index 487e440..361f360 100644 --- a/problem_06/bin/client.rs +++ b/problem_06/bin/client.rs @@ -75,6 +75,7 @@ async fn test_all_different_messages( Ok(()) } +#[allow(dead_code)] async fn test_camera_connection( write: &mut WriteHalf<'_>, ) -> Result<(), Box> { @@ -99,6 +100,7 @@ async fn test_camera_connection( Ok(()) } +#[allow(dead_code)] async fn test_dipatcher_connection( write: &mut WriteHalf<'_>, ) -> Result<(), Box> { diff --git a/problem_06/bin/server.rs b/problem_06/bin/server.rs index a3fc428..25db771 100644 --- a/problem_06/bin/server.rs +++ b/problem_06/bin/server.rs @@ -5,7 +5,6 @@ use tokio::{net::TcpListener, signal}; pub async fn main() -> problem_06::Result<()> { tracing_subscriber::fmt::try_init().expect("Couldn't setup logging"); - // Bind a TCP listener let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; let _ = server::run(listener, signal::ctrl_c()).await; diff --git a/problem_06/src/db.rs b/problem_06/src/db.rs index 0d2faa0..61e1683 100644 --- a/problem_06/src/db.rs +++ b/problem_06/src/db.rs @@ -200,6 +200,5 @@ impl Db { state .ticketed_plates_by_day .contains(&(Timestamp(day), plate_name)) - // debug!(?state); } } diff --git a/problem_06/src/heartbeat.rs b/problem_06/src/heartbeat.rs new file mode 100644 index 0000000..b0e971b --- /dev/null +++ b/problem_06/src/heartbeat.rs @@ -0,0 +1,39 @@ +use crate::frame::ServerFrames; +use std::time::Duration; +use tokio::sync::mpsc; + +pub(crate) struct Heartbeat { + is_running: bool, + interval: Duration, + message: mpsc::Sender, +} + +impl Heartbeat { + pub(crate) fn new(interval: u32, message: mpsc::Sender) -> Self { + Self { + is_running: false, + interval: Duration::from_millis((interval * 100) as u64), + message, + } + } + + pub(crate) async fn start(&mut self) { + if self.is_running { + let _ = self.message.send(ServerFrames::Error { + msg: "Heartbeat alreadt exists".to_string(), + }); + return; + } + + self.is_running = true; + + let mut interval = tokio::time::interval(self.interval); + + interval.tick().await; + + loop { + interval.tick().await; + let _ = self.message.send(ServerFrames::Heartbeat); + } + } +} diff --git a/problem_06/src/lib.rs b/problem_06/src/lib.rs index 9cb67cc..a4381be 100644 --- a/problem_06/src/lib.rs +++ b/problem_06/src/lib.rs @@ -1,16 +1,13 @@ mod connection; -pub use connection::Connection; - -pub mod frame; -pub use frame::ClientFrames; - -pub mod db; - +mod db; +mod frame; +mod heartbeat; pub mod server; - -pub mod ticketing; - mod shutdown; +mod ticketing; + +pub use connection::Connection; +pub use frame::ClientFrames; use shutdown::Shutdown; pub const DEFAULT_IP: &'static str = "0.0.0.0"; diff --git a/problem_06/src/server.rs b/problem_06/src/server.rs index ecc165b..21e8e34 100644 --- a/problem_06/src/server.rs +++ b/problem_06/src/server.rs @@ -11,6 +11,7 @@ use crate::{ connection::ConnectionType, db::{Camera, CameraId, Db, DbHolder, DispatcherId, Limit, Mile, Plate, Road, Timestamp}, frame::{ClientFrames, ServerFrames}, + heartbeat::Heartbeat, ticketing::issue_possible_ticket, Connection, Shutdown, }; @@ -200,7 +201,10 @@ impl Handler { .await; } ClientFrames::WantHeartbeat { interval } => { - info!("Want heartbeat: {interval}"); + tokio::spawn(async move { + let mut heartbeat = Heartbeat::new(interval, send_message.clone()); + heartbeat.start().await; + }); } ClientFrames::IAmCamera { road, mile, limit } => { if self.connection_type.is_some() {