From b56357c0856acd0af0ca4c75ff15c76358e17fb2 Mon Sep 17 00:00:00 2001 From: MedzikUser Date: Wed, 14 Sep 2022 21:03:49 +0200 Subject: [PATCH] fix(server): change from task::spawn to thread::spawn Changed to thread::spawn due to task::spawn spawn max 2 tasks (accept max 2 clients) if your cpu has 2 cores. --- src/server/client.rs | 7 ++++++- src/server/run.rs | 44 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/src/server/client.rs b/src/server/client.rs index 444bb0a..42e3a25 100644 --- a/src/server/client.rs +++ b/src/server/client.rs @@ -6,6 +6,7 @@ use std::{ sync::{Arc, Mutex}, }; +use tracing::info; use tungstenite::{accept, Message, WebSocket}; use super::run::PLUGINS_MANAGER; @@ -131,6 +132,8 @@ impl Client { msg.pop(); } + info!("[Recieved]: {}", msg); + Ok(msg) } @@ -150,10 +153,12 @@ impl Client { match &self.stream { ClientStream::TCP(stream) => stream.as_ref().write_all(buf)?, ClientStream::WebSocket(stream) => { - stream.lock().unwrap().write_message(Message::from(msg))? + stream.lock().unwrap().write_message(Message::from(buf))? }, } + info!("[Sent]: {}", msg); + Ok(()) } diff --git a/src/server/run.rs b/src/server/run.rs index a58d327..0436518 100644 --- a/src/server/run.rs +++ b/src/server/run.rs @@ -1,5 +1,6 @@ -use std::net::TcpListener; +use std::{net::TcpListener, thread}; +use anyhow::anyhow; use async_std::task; use futures::join; use lazy_static::lazy_static; @@ -111,8 +112,15 @@ async fn process(client: Client) -> anyhow::Result<()> { // handle errors from message processing if let Err(err) = handle(&client, buf).await { - error!("Unexpected error in message handler: {}", err); - client.send("Unexpected error")?; + let err = err.to_string(); + + // client disconnect e.g. using ctrl + c + if err.contains("Broken pipe") { + return Err(anyhow!("disconnected")); + } else { + error!("Unexpected error in message handler: {}", err); + client.send("Unexpected error")?; + } } client.flush()?; @@ -133,7 +141,7 @@ async fn start_tcp(host: String) -> anyhow::Result<()> { // add one to next id *CLIENT_NEXT.lock().unwrap() += 1; - task::spawn(async move { + thread::spawn(move || { // get id for the client and add one to next id let client = Client::new_tcp(stream, id); @@ -144,8 +152,15 @@ async fn start_tcp(host: String) -> anyhow::Result<()> { let span = span!(Level::ERROR, "TCP", id = client.id); let _enter = span.enter(); - if let Err(err) = process(client).await { - error!("{}", err); + if let Err(err) = task::block_on(process(client)) { + let err = err.to_string(); + + // client disconnect e.g. using ctrl + c + if err == "disconnected" { + info!("Client disconnected") + } else { + error!("{}", err); + } } // delete the client from CLIENTS map @@ -170,18 +185,27 @@ async fn start_websocket(host: String) -> anyhow::Result<()> { // add one to next id *CLIENT_NEXT.lock().unwrap() += 1; - task::spawn(async move { + thread::spawn(move || { let client = Client::new_websocket(stream, id).unwrap(); // insert the cloned client to CLIENTS CLIENTS.lock().unwrap().insert(id, client.clone()); // add span to logger - let span = span!(Level::ERROR, "UDP", id = client.id); + let span = span!(Level::ERROR, "WS", id = client.id); let _enter = span.enter(); - if let Err(err) = process(client).await { - error!("{}", err); + if let Err(err) = task::block_on(process(client)) { + let err = err.to_string(); + + // client disconnect e.g. using ctrl + c + if err == "disconnected" + || err.contains("Connection reset without closing handshake") + { + info!("Client disconnected") + } else { + error!("{}", err); + } } // delete the client from CLIENTS map