diff --git a/CHANGELOG.md b/CHANGELOG.md index 74df100..6d5c95e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## Chore +- **tcp**: use tcp from tokio instead of std ## [0.2.0] - 2022-06-26 ### Features diff --git a/plugin_test/src/lib.rs b/plugin_test/src/lib.rs index dd447dd..04fc864 100644 --- a/plugin_test/src/lib.rs +++ b/plugin_test/src/lib.rs @@ -39,7 +39,7 @@ impl Command for PluginTest { _args: Vec<&str>, _commands: &PluginManagerType, ) -> Result<()> { - client.send("content")?; + client.send("content").await?; Ok(()) } @@ -55,7 +55,7 @@ impl Event for PluginTest { /// Event function async fn execute(&self, client: &mut Client) -> Result<()> { - client.send(&format!("Welcome {}", client.stream.peer_addr().unwrap()))?; + client.send(&format!("Welcome {}", client.stream.peer_addr().unwrap())).await?; Ok(()) } diff --git a/src/commands/help.rs b/src/commands/help.rs index f3fae59..d5be081 100644 --- a/src/commands/help.rs +++ b/src/commands/help.rs @@ -24,7 +24,7 @@ impl Command for CommandHelp { plugin_manager: &PluginManagerType, ) -> Result<()> { for command in plugin_manager.commands.iter() { - client.send(&format!("{} - {}", command.name(), command.help()))?; + client.send(&format!("{} - {}", command.name(), command.help())).await?; } Ok(()) diff --git a/src/main.rs b/src/main.rs index f6d5581..6f9451b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{fs::File, net::TcpListener}; +use std::fs::File; use clap::Parser; use log::{error, info, LevelFilter}; @@ -7,6 +7,7 @@ use servers::{ tcp::{handle_connection, handle_websocket, Client}, }; use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode, WriteLogger}; +use tokio::net::TcpListener; #[derive(Parser)] #[clap( @@ -88,7 +89,7 @@ async fn main() -> anyhow::Result<()> { /// Start tcp server async fn start_tcp_server(host: String, port: String) -> anyhow::Result<()> { // listen Tcp server - let listener = TcpListener::bind(format!("{host}:{port}"))?; + let listener = TcpListener::bind(format!("{host}:{port}")).await?; info!("Tcp server started at: {}", listener.local_addr()?); @@ -96,7 +97,7 @@ async fn start_tcp_server(host: String, port: String) -> anyhow::Result<()> { let plugin_manager = loader()?; // Accepts a new incoming connection from this listener. - while let Ok((stream, _address)) = listener.accept() { + while let Ok((stream, _address)) = listener.accept().await { let client = Client::new(stream); let plugin_manager = plugin_manager.clone(); diff --git a/src/plugins/mod.rs b/src/plugins/mod.rs index a5253c0..73a8e71 100644 --- a/src/plugins/mod.rs +++ b/src/plugins/mod.rs @@ -85,7 +85,7 @@ //! //! /// Command function //! async fn execute(&self, client: &mut Client, _args: Vec<&str>, _commands: &PluginManagerType) -> Result<()> { -//! client.send("Command executed!")?; +//! client.send("Command executed!").await?; //! //! Ok(()) //! } @@ -135,7 +135,8 @@ //! /// Event function //! async fn execute(&self, client: &mut Client) -> Result<()> { //! client -//! .send(&format!("Welcome {}", client.stream.peer_addr().unwrap()))?; +//! .send(&format!("Welcome {}", client.stream.peer_addr()?)) +//! .await?; //! //! Ok(()) //! } diff --git a/src/tcp/client.rs b/src/tcp/client.rs index fadd867..56555ab 100644 --- a/src/tcp/client.rs +++ b/src/tcp/client.rs @@ -1,13 +1,10 @@ #![allow(clippy::unused_io_amount)] +use tokio::{net::TcpStream, io::{self, AsyncWriteExt, AsyncReadExt}}; + /// Max size of a TCP packet pub const MAX_PACKET_LEN: usize = 65536; -use std::{ - io::{self, Read, Write}, - net::TcpStream, -}; - /// TCP Client pub struct Client { /// TCP stream of this client @@ -21,12 +18,12 @@ impl Client { } /// Read message/buffer from client - pub fn read(&mut self) -> anyhow::Result { + pub async fn read(&mut self) -> anyhow::Result { // allocate an empty buffer let mut buf = [0; MAX_PACKET_LEN]; // read buffer from stream - let len = self.stream.read(&mut buf)?; + let len = self.stream.read(&mut buf).await?; // select only used bytes from the buffer let recv_buf = &buf[0..len]; @@ -38,11 +35,11 @@ impl Client { } /// Send message to client - pub fn send(&mut self, content: &str) -> io::Result<()> { + pub async fn send(&mut self, content: &str) -> io::Result<()> { // add a new line at the end of the content let content = format!("{content}\n\r"); // send message - self.stream.write_all(content.as_bytes()) + self.stream.write_all(content.as_bytes()).await } } diff --git a/src/tcp/handle_connection.rs b/src/tcp/handle_connection.rs index b7f5b79..fef8e85 100644 --- a/src/tcp/handle_connection.rs +++ b/src/tcp/handle_connection.rs @@ -1,6 +1,5 @@ -use std::io::Write; - use log::{error, info, trace}; +use tokio::io::AsyncWriteExt; use crate::plugins::PluginManagerType; @@ -18,7 +17,7 @@ pub async fn handle_connection( loop { // read client message/buffer - let buf = client.read()?; + let buf = client.read().await?; // run `onSend` events from plugins check_event(&mut client, &plugin_manager, "onSend").await?; @@ -28,7 +27,7 @@ pub async fn handle_connection( // client sent an empty buffer if args.is_empty() { - client.send("empty buffer")?; + client.send("empty buffer").await?; // don't execute the following commands because it causes panic continue; @@ -52,7 +51,7 @@ pub async fn handle_connection( Err(err) => { error!("failed to execute command `{cmd}`, error message = `{err}`"); - client.send(&format!("error: {err}"))?; + client.send(&format!("error: {err}")).await?; } } @@ -62,7 +61,7 @@ pub async fn handle_connection( } // if an I/O or EOF error, abort the connection - if client.stream.flush().is_err() { + if client.stream.flush().await.is_err() { // terminate connection break; } @@ -88,7 +87,7 @@ async fn check_event( Err(err) => { error!("failed to execute event `{event_name}`, error message = `{err}`"); - client.send(&format!("error: {err}"))?; + client.send(&format!("error: {err}")).await?; } } }