fix(server): change from task::spawn to thread::spawn

Changed to thread::spawn due to task::spawn spawn max 2 tasks (accept max 2 clients) if your cpu has 2 cores.
This commit is contained in:
MedzikUser 2022-09-14 21:03:49 +02:00
parent fd4121b0d2
commit b56357c085
No known key found for this signature in database
GPG Key ID: A5FAC1E185C112DB
2 changed files with 40 additions and 11 deletions

View File

@ -6,6 +6,7 @@ use std::{
sync::{Arc, Mutex},
};
use tracing::info;
use tungstenite::{accept, Message, WebSocket};
use super::run::PLUGINS_MANAGER;
@ -131,6 +132,8 @@ impl Client {
msg.pop();
}
info!("[Recieved]: {}", msg);
Ok(msg)
}
@ -150,10 +153,12 @@ impl Client {
match &self.stream {
ClientStream::TCP(stream) => stream.as_ref().write_all(buf)?,
ClientStream::WebSocket(stream) => {
stream.lock().unwrap().write_message(Message::from(msg))?
stream.lock().unwrap().write_message(Message::from(buf))?
},
}
info!("[Sent]: {}", msg);
Ok(())
}

View File

@ -1,5 +1,6 @@
use std::net::TcpListener;
use std::{net::TcpListener, thread};
use anyhow::anyhow;
use async_std::task;
use futures::join;
use lazy_static::lazy_static;
@ -111,8 +112,15 @@ async fn process(client: Client) -> anyhow::Result<()> {
// handle errors from message processing
if let Err(err) = handle(&client, buf).await {
error!("Unexpected error in message handler: {}", err);
client.send("Unexpected error")?;
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err.contains("Broken pipe") {
return Err(anyhow!("disconnected"));
} else {
error!("Unexpected error in message handler: {}", err);
client.send("Unexpected error")?;
}
}
client.flush()?;
@ -133,7 +141,7 @@ async fn start_tcp(host: String) -> anyhow::Result<()> {
// add one to next id
*CLIENT_NEXT.lock().unwrap() += 1;
task::spawn(async move {
thread::spawn(move || {
// get id for the client and add one to next id
let client = Client::new_tcp(stream, id);
@ -144,8 +152,15 @@ async fn start_tcp(host: String) -> anyhow::Result<()> {
let span = span!(Level::ERROR, "TCP", id = client.id);
let _enter = span.enter();
if let Err(err) = process(client).await {
error!("{}", err);
if let Err(err) = task::block_on(process(client)) {
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err == "disconnected" {
info!("Client disconnected")
} else {
error!("{}", err);
}
}
// delete the client from CLIENTS map
@ -170,18 +185,27 @@ async fn start_websocket(host: String) -> anyhow::Result<()> {
// add one to next id
*CLIENT_NEXT.lock().unwrap() += 1;
task::spawn(async move {
thread::spawn(move || {
let client = Client::new_websocket(stream, id).unwrap();
// insert the cloned client to CLIENTS
CLIENTS.lock().unwrap().insert(id, client.clone());
// add span to logger
let span = span!(Level::ERROR, "UDP", id = client.id);
let span = span!(Level::ERROR, "WS", id = client.id);
let _enter = span.enter();
if let Err(err) = process(client).await {
error!("{}", err);
if let Err(err) = task::block_on(process(client)) {
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err == "disconnected"
|| err.contains("Connection reset without closing handshake")
{
info!("Client disconnected")
} else {
error!("{}", err);
}
}
// delete the client from CLIENTS map