From e37bb7cfa33ac15869090387da6e9d2e655a91a1 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Sat, 27 May 2023 06:25:34 +0200 Subject: [PATCH] First, naive and wrong attempt, have to add interval later on --- 4-grow-only-counter/src/main.rs | 92 +++++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 15 deletions(-) diff --git a/4-grow-only-counter/src/main.rs b/4-grow-only-counter/src/main.rs index 9a3e2b3..72ea85e 100644 --- a/4-grow-only-counter/src/main.rs +++ b/4-grow-only-counter/src/main.rs @@ -2,6 +2,9 @@ use std::io::{self, BufRead, Write}; use serde::{Deserialize, Serialize}; +const SEQ_KV: &str = "seq-kv"; +const KEY: &str = "counter"; + #[derive(Serialize, Deserialize, Debug)] struct Message { src: String, @@ -27,11 +30,12 @@ enum Body { in_reply_to: u64, }, Read { - msg_id: u64, + msg_id: Option, + key: Option, }, ReadOk { + in_reply_to: Option, value: u64, - in_reply_to: u64, }, Add { msg_id: u64, @@ -40,7 +44,6 @@ enum Body { AddOk { in_reply_to: u64, }, - //SEQ-KV Write { msg_id: u64, key: String, @@ -49,6 +52,16 @@ enum Body { WriteOk { in_reply_to: u64, }, + Cas { + msg_id: u64, + key: String, + from: u64, + to: u64, + }, + CasOk { + msg_id: u64, + in_reply_to: u64, + }, } fn main() { @@ -57,15 +70,39 @@ fn main() { let mut stderr = io::stderr(); let mut id = String::new(); + let mut counter = 0; + let mut tmp_counter = 0; for line in stdin.lock().lines() { + let input = serde_json::from_str::(&line.as_ref().unwrap()); + + if let Err(e) = input { + writeln!(stderr, "Error: {:?}", e).unwrap(); + stderr.flush().unwrap(); + continue; + } let input: Message = serde_json::from_str(&line.unwrap()).unwrap(); + match input.body { - Body::Error { text, .. } => { - let output_json = serde_json::to_string(&text).unwrap(); - writeln!(stderr, "{}", output_json).unwrap(); - stdout.flush().unwrap(); - } + Body::Error { code, .. } => match code { + 22 => { + let output = Message { + src: id.clone(), + dest: SEQ_KV.to_string(), + body: Body::Read { + msg_id: Some(2), + key: Some(KEY.to_string()), + }, + }; + let output_json = serde_json::to_string(&output).unwrap(); + writeln!(stdout, "{}", output_json).unwrap(); + stdout.flush().unwrap(); + } + _ => { + writeln!(stderr, "Error: {:?}", input).unwrap(); + stderr.flush().unwrap(); + } + }, Body::Init { msg_id, node_id, .. } => { @@ -82,31 +119,47 @@ fn main() { let output = Message { src: id.clone(), - dest: "seq-kv".to_string(), + dest: SEQ_KV.to_string(), body: Body::Write { - msg_id: 1, - key: "TEST".to_string(), - value: 42, + msg_id, + key: KEY.to_string(), + value: counter, }, }; let output_json = serde_json::to_string(&output).unwrap(); writeln!(stdout, "{}", output_json).unwrap(); stdout.flush().unwrap(); } - Body::Read { msg_id } => { + Body::Read { msg_id, .. } => { let output = Message { src: id.clone(), dest: input.src, body: Body::ReadOk { in_reply_to: msg_id, - value: 42, + value: counter, }, }; let output_json = serde_json::to_string(&output).unwrap(); writeln!(stdout, "{}", output_json).unwrap(); + stdout.flush().unwrap(); } - Body::Add { msg_id, .. } => { + Body::Add { msg_id, delta } => { + tmp_counter += delta; + + let output = Message { + src: id.clone(), + dest: SEQ_KV.to_string(), + body: Body::Cas { + msg_id, + key: KEY.to_string(), + from: counter, + to: tmp_counter, + }, + }; + let output_json = serde_json::to_string(&output).unwrap(); + writeln!(stdout, "{}", output_json).unwrap(); + let output = Message { src: id.clone(), dest: input.src, @@ -121,6 +174,15 @@ fn main() { Body::WriteOk { .. } => { // } + Body::AddOk { .. } => { + // + } + Body::CasOk { .. } => { + counter = tmp_counter; + } + Body::ReadOk { value, .. } => { + counter = value; + } _ => println!("Unhandled message: {:?}", input), } }