Workflow without replacing address
This commit is contained in:
parent
aeebaff9f7
commit
a06d96fcf0
3 changed files with 107 additions and 34 deletions
|
|
@ -7,7 +7,13 @@ edition = "2021"
|
||||||
name = "server"
|
name = "server"
|
||||||
path = "bin/server.rs"
|
path = "bin/server.rs"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "client"
|
||||||
|
path = "bin/client.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
futures = "0.3.28"
|
||||||
tokio = { version = "1.14.0", features = ["full"] }
|
tokio = { version = "1.14.0", features = ["full"] }
|
||||||
|
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
tracing-subscriber = "0.3.17"
|
tracing-subscriber = "0.3.17"
|
||||||
|
|
|
||||||
56
problem_05/bin/client.rs
Normal file
56
problem_05/bin/client.rs
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::task;
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
tracing_subscriber::fmt::try_init().expect("Tracing was not setup");
|
||||||
|
|
||||||
|
let stream = TcpStream::connect("0.0.0.0:1222").await?;
|
||||||
|
|
||||||
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
|
let mut buf_reader = BufReader::new(reader);
|
||||||
|
let mut writer = BufWriter::new(writer);
|
||||||
|
|
||||||
|
let server_handle = task::spawn(async move {
|
||||||
|
let mut buf = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
info!("Inside reading lines from server loop");
|
||||||
|
if let Ok(n) = buf_reader.read_line(&mut buf).await {
|
||||||
|
if n > 0 {
|
||||||
|
info!("Receivng from server: {}", buf.trim_end());
|
||||||
|
} else {
|
||||||
|
info!("Server is finished sending: {}", n);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Cannot receive");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.clear();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let std_handle = tokio::spawn(async move {
|
||||||
|
let mut stdin_reader = BufReader::new(tokio::io::stdin()).lines();
|
||||||
|
while let Ok(Some(line)) = stdin_reader.next_line().await {
|
||||||
|
info!("Received line from stdin: {}", line);
|
||||||
|
|
||||||
|
if let Err(_) = writer.write_all(line.as_bytes()).await {
|
||||||
|
error!("Error reading from std");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = writer.write_all(&[b'\n']).await;
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = server_handle.await;
|
||||||
|
let _ = std_handle.await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tracing::{info, error};
|
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};
|
||||||
use std::net::SocketAddr;
|
use tracing::info;
|
||||||
use tokio_util::codec::{Framed, LinesCodec};
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
|
|
||||||
const DEFAULT_IP: &str = "0.0.0.0";
|
|
||||||
|
const DEFAULT_IP: &str = "127.0.0.1";
|
||||||
const DEFAULT_PORT: &str = "1222";
|
const DEFAULT_PORT: &str = "1222";
|
||||||
|
|
||||||
const UPSTREAM_IP: &str = "206.189.113.124";
|
const UPSTREAM_IP: &str = "206.189.113.124";
|
||||||
|
|
@ -13,49 +13,60 @@ const UPSTREAM_PORT: &str = "16963";
|
||||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
enum Events {
|
|
||||||
ClientRequest(String),
|
|
||||||
ClientResponse(String),
|
|
||||||
UpstreamRequest(String),
|
|
||||||
UpstreamResponse(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() -> Result<()> {
|
pub async fn main() -> Result<()> {
|
||||||
tracing_subscriber::fmt::try_init()?;
|
tracing_subscriber::fmt::try_init().expect("Cannot init tracing");
|
||||||
|
|
||||||
let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
|
let listener = TcpListener::bind(&format!("{DEFAULT_IP}:{DEFAULT_PORT}")).await?;
|
||||||
let stream = TcpStream::connect(&format!("{UPSTREAM_IP}:{UPSTREAM_PORT}")).await?;
|
|
||||||
|
|
||||||
let (sender, receiver) = broadcast::channel(2);
|
|
||||||
|
|
||||||
info!("Start TCP server on {DEFAULT_IP}:{DEFAULT_PORT}");
|
info!("Start TCP server on {DEFAULT_IP}:{DEFAULT_PORT}");
|
||||||
info!("Connect to upstream on {UPSTREAM_IP}:{UPSTREAM_PORT}");
|
|
||||||
|
|
||||||
let listener_handle = tokio::spawn(async move {
|
loop {
|
||||||
loop {
|
let (socket, address) = listener
|
||||||
let (socket, address) = listener.accept().await?;
|
.accept()
|
||||||
|
.await
|
||||||
|
.expect("Cannot establish connection");
|
||||||
|
|
||||||
tokio::spawn(async move {
|
info!("New request from: {address}");
|
||||||
info!("New request from: {address}");
|
|
||||||
let _ = handle_request(socket).await;
|
let upstream = TcpStream::connect(&format!("{UPSTREAM_IP}:{UPSTREAM_PORT}"))
|
||||||
});
|
.await
|
||||||
|
.expect("Cannot establish upstream connection");
|
||||||
|
|
||||||
|
info!("Connect to upstream on {UPSTREAM_IP}:{UPSTREAM_PORT}");
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = handle_request(socket, upstream).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_request(socket: TcpStream, upstream: TcpStream) -> Result<()> {
|
||||||
|
let (client_read, client_write) = socket.into_split();
|
||||||
|
let mut framed_client_read = FramedRead::new(client_read, LinesCodec::new());
|
||||||
|
let mut framed_client_write = FramedWrite::new(client_write, LinesCodec::new());
|
||||||
|
|
||||||
|
let (server_read, server_write) = upstream.into_split();
|
||||||
|
let mut farmed_server_read = FramedRead::new(server_read, LinesCodec::new());
|
||||||
|
let mut framed_server_write = FramedWrite::new(server_write, LinesCodec::new());
|
||||||
|
|
||||||
|
|
||||||
|
let read_client_write_upstream = tokio::spawn(async move {
|
||||||
|
while let Some(Ok(request)) = framed_client_read.next().await {
|
||||||
|
info!("Send upstream: {request}");
|
||||||
|
let _ = framed_server_write.send(request).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let upstream_handle = tokio::spawn({
|
let read_upstream_write_client = tokio::spawn(async move {
|
||||||
loop {
|
while let Some(Ok(response)) = farmed_server_read.next().await {
|
||||||
|
info!("Send to client: {response}");
|
||||||
|
let _ = framed_client_write.send(response).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let _ = listener_handle.await;
|
let _ = read_client_write_upstream.await;
|
||||||
let _ = upstream_handle.await;
|
let _ = read_upstream_write_client.await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle_request(mut socket: TcpStream) -> Result<()> {
|
|
||||||
let framed = Framed::new(socket, LinesCodec::new());
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue