From a06d96fcf07987083d5a87d27948940e3d1fd27b Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 2 May 2023 16:45:41 +0200 Subject: [PATCH] Workflow without replacing address --- problem_05/Cargo.toml | 6 +++ problem_05/bin/client.rs | 56 ++++++++++++++++++++++++++++ problem_05/bin/server.rs | 79 +++++++++++++++++++++++----------------- 3 files changed, 107 insertions(+), 34 deletions(-) create mode 100644 problem_05/bin/client.rs diff --git a/problem_05/Cargo.toml b/problem_05/Cargo.toml index 2a321ee..c384fa4 100644 --- a/problem_05/Cargo.toml +++ b/problem_05/Cargo.toml @@ -7,7 +7,13 @@ edition = "2021" name = "server" path = "bin/server.rs" +[[bin]] +name = "client" +path = "bin/client.rs" + [dependencies] +futures = "0.3.28" tokio = { version = "1.14.0", features = ["full"] } +tokio-util = { version = "0.7.4", features = ["codec"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" diff --git a/problem_05/bin/client.rs b/problem_05/bin/client.rs new file mode 100644 index 0000000..8af32a1 --- /dev/null +++ b/problem_05/bin/client.rs @@ -0,0 +1,56 @@ +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::net::TcpStream; +use tokio::task; +use tracing::{error, info}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::try_init().expect("Tracing was not setup"); + + let stream = TcpStream::connect("0.0.0.0:1222").await?; + + let (reader, writer) = tokio::io::split(stream); + let mut buf_reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); + + let server_handle = task::spawn(async move { + let mut buf = String::new(); + + loop { + info!("Inside reading lines from server loop"); + if let Ok(n) = buf_reader.read_line(&mut buf).await { + if n > 0 { + info!("Receivng from server: {}", buf.trim_end()); + } else { + info!("Server is finished sending: {}", n); + return; + } + } else { + error!("Cannot receive"); + return; + } + + buf.clear(); + } + }); + + let std_handle = tokio::spawn(async move { + let mut stdin_reader = BufReader::new(tokio::io::stdin()).lines(); + while let Ok(Some(line)) = stdin_reader.next_line().await { + info!("Received line from stdin: {}", line); + + if let Err(_) = writer.write_all(line.as_bytes()).await { + error!("Error reading from std"); + break; + } + + let _ = writer.write_all(&[b'\n']).await; + let _ = writer.flush().await; + } + }); + + let _ = server_handle.await; + let _ = std_handle.await; + + Ok(()) +} diff --git a/problem_05/bin/server.rs b/problem_05/bin/server.rs index 75715df..77622e0 100644 --- a/problem_05/bin/server.rs +++ b/problem_05/bin/server.rs @@ -1,10 +1,10 @@ +use futures::{SinkExt, StreamExt}; use tokio::net::{TcpListener, TcpStream}; -use tracing::{info, error}; -use std::net::SocketAddr; -use tokio_util::codec::{Framed, LinesCodec}; -use tokio::sync::broadcast; +use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec}; +use tracing::info; -const DEFAULT_IP: &str = "0.0.0.0"; + +const DEFAULT_IP: &str = "127.0.0.1"; const DEFAULT_PORT: &str = "1222"; const UPSTREAM_IP: &str = "206.189.113.124"; @@ -13,49 +13,60 @@ const UPSTREAM_PORT: &str = "16963"; type Error = Box; type Result = std::result::Result; -enum Events { - ClientRequest(String), - ClientResponse(String), - UpstreamRequest(String), - UpstreamResponse(String), -} - #[tokio::main] pub async fn main() -> Result<()> { - tracing_subscriber::fmt::try_init()?; + tracing_subscriber::fmt::try_init().expect("Cannot init tracing"); let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?; - let stream = TcpStream::connect(&format!("{UPSTREAM_IP}:{UPSTREAM_PORT}")).await?; - - let (sender, receiver) = broadcast::channel(2); info!("Start TCP server on {DEFAULT_IP}:{DEFAULT_PORT}"); - info!("Connect to upstream on {UPSTREAM_IP}:{UPSTREAM_PORT}"); - let listener_handle = tokio::spawn(async move { - loop { - let (socket, address) = listener.accept().await?; + loop { + let (socket, address) = listener + .accept() + .await + .expect("Cannot establish connection"); - tokio::spawn(async move { - info!("New request from: {address}"); - let _ = handle_request(socket).await; - }); + info!("New request from: {address}"); + + let upstream = TcpStream::connect(&format!("{UPSTREAM_IP}:{UPSTREAM_PORT}")) + .await + .expect("Cannot establish upstream connection"); + + info!("Connect to upstream on {UPSTREAM_IP}:{UPSTREAM_PORT}"); + + tokio::spawn(async move { + let _ = handle_request(socket, upstream).await; + }); + } +} + +pub async fn handle_request(socket: TcpStream, upstream: TcpStream) -> Result<()> { + let (client_read, client_write) = socket.into_split(); + let mut framed_client_read = FramedRead::new(client_read, LinesCodec::new()); + let mut framed_client_write = FramedWrite::new(client_write, LinesCodec::new()); + + let (server_read, server_write) = upstream.into_split(); + let mut farmed_server_read = FramedRead::new(server_read, LinesCodec::new()); + let mut framed_server_write = FramedWrite::new(server_write, LinesCodec::new()); + + + let read_client_write_upstream = tokio::spawn(async move { + while let Some(Ok(request)) = framed_client_read.next().await { + info!("Send upstream: {request}"); + let _ = framed_server_write.send(request).await; } }); - let upstream_handle = tokio::spawn({ - loop { - + let read_upstream_write_client = tokio::spawn(async move { + while let Some(Ok(response)) = farmed_server_read.next().await { + info!("Send to client: {response}"); + let _ = framed_client_write.send(response).await; } }); - let _ = listener_handle.await; - let _ = upstream_handle.await; + let _ = read_client_write_upstream.await; + let _ = read_upstream_write_client.await; Ok(()) - -} - -pub async fn handle_request(mut socket: TcpStream) -> Result<()> { - let framed = Framed::new(socket, LinesCodec::new()); }