Compare commits

...

55 Commits
v0.2.0 ... main

Author SHA1 Message Date
renovate[bot] d5b238f153 fix(deps): update rust crate async-trait to 0.1.63 2023-01-23 04:59:48 +00:00
renovate[bot] 91ab6cb607 fix(deps): update rust crate async-trait to 0.1.61 2023-01-09 04:20:42 +00:00
renovate[bot] de5e26bbf2 fix(deps): update all non-major dependencies 2022-12-19 03:30:40 +00:00
renovate[bot] 09f7d14bcc fix(deps): update all non-major dependencies 2022-12-12 03:57:24 +00:00
renovate[bot] ce546a0c69 fix(deps): update rust crate libloading to 0.7.4 2022-11-07 05:40:58 +00:00
renovate[bot] 619ed0b41d fix(deps): update rust crate clap to 3.2.23 2022-10-31 05:13:52 +00:00
renovate[bot] 5a103de793 fix(deps): update all non-major dependencies 2022-10-24 04:12:45 +00:00
renovate[bot] 0d4ff8041b fix(deps): update all non-major dependencies 2022-10-10 05:19:25 +00:00
renovate[bot] eb462622eb fix(deps): update all non-major dependencies 2022-09-19 03:37:23 +00:00
MedzikUser b56357c085
fix(server): change from task::spawn to thread::spawn
Changed to thread::spawn due to task::spawn spawn max 2 tasks (accept max 2 clients) if your cpu has 2 cores.
2022-09-14 21:03:49 +02:00
renovate[bot] fd4121b0d2 fix(deps): update all non-major dependencies 2022-09-05 05:42:10 +00:00
MedzikUser 032ec88ed6
chore: add some comments in code
Moved `PluginsManager` and `PluginsManagerType` from `servers::plugin::manager` to the `servers::plugin` module.
Added some comments in code.
Implemented function `into()` to the `PluginsManager` struct.
2022-08-22 13:23:08 +02:00
MedzikUser 501cbc74b7
fix(clippy): change `< 1` to .is_empty()
Replaced `args.len() < 1` with `args.is_empty()`.
2022-08-20 15:01:48 +02:00
MedzikUser db6e068926
chore(plugin): change to cdylib
Probably better.
2022-08-20 14:58:32 +02:00
MedzikUser 212223c568
ci(release): build binaries for gnu instead of musl
Musl currently doesn't support dylib.
2022-08-20 14:48:41 +02:00
MedzikUser d15ec0c93e
feat(command): add /broadcast
The `/broadcast` command has been added, which can send a message to all connected clients.
2022-08-19 22:27:42 +02:00
MedzikUser 9f66c2b9d2
fix(cli): update long arguments
Changed clap long attributes from --host to host and --port to port.
2022-08-19 12:16:02 +02:00
MedzikUser 95d0e15786
chore(release): v0.6.0 2022-08-17 22:07:12 +02:00
MedzikUser d31e0fff2f
fix(plugin): execute `on_load` function
Now function `on_load` will be executed if the plugin is loads.
Added span to the logger on TRACE level in plugin loader.
Fixed clippy warning from previous commit.
2022-08-17 22:05:27 +02:00
MedzikUser 67fb1a0a3c
feat(server): add `onCommand` event and handle errors in message processing
Added `OnCommand` event e.g. to disable command for client. (BREAKING CHANGES IN EVENT PLUGINS)
Added function for error handling in message process.
2022-08-17 21:44:06 +02:00
MedzikUser 56e16145f6
chore(release): v0.5.0 2022-08-17 15:45:36 +02:00
MedzikUser 7da5daf522
feat(client): HashMap add Mutex and functions
Changed map type in Client struct to Arc<Mutex<HashMap<String, ClientMapValue>>>.
Implemented functions insert_key, get_value and delete_key to the Client type.
Re-export servers::server::ClientMapValue in servers::plugins::prelude.
2022-08-17 15:40:11 +02:00
MedzikUser bf1c3c4092
chore(release): v0.4.2
- **fix**: update anyhow to v1.0.62
2022-08-17 11:56:29 +02:00
MedzikUser 5d2afe3719
ci(release): delete compress binaries 2022-08-17 11:47:26 +02:00
MedzikUser 83f2961aee
chore(release): v0.4.1 2022-08-16 23:35:31 +02:00
MedzikUser c51c0a2507
ci(fix): fix release 2022-08-16 23:34:47 +02:00
MedzikUser dd854ee6e5
chore(release): v0.4.0 2022-08-16 23:28:39 +02:00
MedzikUser 97b19ae413
feat(tcp client): add tcp-client 2022-08-16 23:24:21 +02:00
renovate[bot] 45a89f365a fix(deps): update all non-major dependencies 2022-08-15 03:18:35 +00:00
MedzikUser f27df56c47
feat(plugins): implement events
- Events: onConnect and onSend now works
2022-08-13 12:06:53 +02:00
MedzikUser 6f7edf3d30
fix(clippy): rename module to fix clippy warning 2022-08-12 23:16:13 +02:00
MedzikUser 84eed33e24
docs: comment code 2022-08-12 23:12:29 +02:00
MedzikUser 25c2f0baa3
fix(id): fix add one to next id
- Fixed add one to next ID
- Added command /id
- Added `id` field to Client struct
2022-08-12 22:52:47 +02:00
MedzikUser d0120a0703
refactor: rewrite
- Better Client struct (TCP and WebSocket in one type)
- WebSocket is now not a proxy to tcp
- Use async-std instead a tokio
- Use Arc and Client type now have a Clone derive
- Add global CLIENTS list
2022-08-12 22:32:22 +02:00
renovate[bot] 868671848b fix(deps): update all non-major dependencies 2022-08-08 03:36:25 +00:00
MedzikUser 5fa16c6705 fix(lint): clippy warnings 2022-08-04 16:39:55 +02:00
MedzikUser 45a9fe605d
release v0.3.0 2022-08-04 16:36:38 +02:00
renovate[bot] 093671ff8f fix(deps): update all non-major dependencies 2022-08-01 06:13:16 +00:00
MedzikUser fc5afe56a1
feat(tcp client): add fn `peer_addr` 2022-07-29 22:10:49 +02:00
MedzikUser 3fb0a1132a
chore: some changes (more in the commit description)
- added `/disconnect` commands
- moved logger init function to other file
- updated command description
- the `/help` command has been accelerated
- re-export `async_trait` so that it doesn't have to be added to dependencies in plugins
2022-07-29 21:55:21 +02:00
MedzikUser 6ad4afb146
chore(renovate): update config 2022-07-29 12:13:35 +02:00
MedzikUser e83a8360c2
chore: move struct Cli to other module 2022-07-29 12:12:13 +02:00
MedzikUser 796084d13e
chore: some changes
- add macros for Mutex (currently unused)
- change logger to tracing
2022-07-28 22:32:28 +02:00
renovate[bot] 2e3bb0cb43 fix(deps): update rust crate clap to 3.2.15 2022-07-25 23:10:17 +00:00
renovate[bot] c48d61f22b fix(deps): update rust crate tokio to 1.20.1 2022-07-25 16:02:05 +00:00
renovate[bot] 183602a69e fix(deps): update rust crate clap to 3.2.14 2022-07-21 05:12:13 +00:00
renovate[bot] 0f0a833dac fix(deps): update rust crate tokio to 1.20.0 2022-07-20 00:42:44 +00:00
renovate[bot] 6257dcfcfd fix(deps): update rust crate clap to 3.2.13 2022-07-19 22:58:45 +00:00
renovate[bot] e303383607 fix(deps): update rust crate tungstenite to 0.17.3 2022-07-13 20:54:13 +00:00
renovate[bot] 40108a20fb fix(deps): update rust crate tokio-tungstenite to 0.17.2 2022-07-13 18:18:11 +00:00
renovate[bot] e6802c6db8 fix(deps): update rust crate clap to 3.2.11 2022-07-13 18:14:16 +00:00
renovate[bot] 783a651e4f fix(deps): update rust crate clap to 3.2.10 2022-07-12 06:17:07 +00:00
MedzikUser f08093ecc7
chore(tcp): use tcp from tokio instead of std 2022-07-01 21:32:20 +02:00
renovate[bot] cb1ccc6fc5 fix(deps): update rust crate clap to 3.2.8 2022-06-30 18:26:06 +00:00
renovate[bot] 4b0b38755c fix(deps): update rust crate clap to 3.2.7 2022-06-28 17:48:04 +00:00
29 changed files with 1540 additions and 1082 deletions

View File

@ -17,8 +17,8 @@ jobs:
matrix:
target:
- x86_64-unknown-linux-musl
- aarch64-unknown-linux-musl
- x86_64-unknown-linux-gnu
- aarch64-unknown-linux-gnu
- x86_64-pc-windows-msvc
- x86_64-apple-darwin
- aarch64-apple-darwin
@ -26,21 +26,17 @@ jobs:
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-musl
artifact_name: target/x86_64-unknown-linux-musl/release/servers
release_name: x86_64-unknown-linux-musl
target: x86_64-unknown-linux-gnu
artifact_name: target/x86_64-unknown-linux-gnu/release/servers
release_name: x86_64-unknown-linux-gnu
cross: true
strip: true
compress: true
cargo_flags: ""
- os: ubuntu-latest
target: aarch64-unknown-linux-musl
artifact_name: target/aarch64-unknown-linux-musl/release/servers
release_name: aarch64-unknown-linux-musl
target: aarch64-unknown-linux-gnu
artifact_name: target/aarch64-unknown-linux-gnu/release/servers
release_name: aarch64-unknown-linux-gnu
cross: true
strip: false
compress: true
cargo_flags: ""
- os: windows-latest
@ -48,8 +44,6 @@ jobs:
artifact_name: target/x86_64-pc-windows-msvc/release/servers.exe
release_name: x86_64-pc-windows-msvc.exe
cross: false
strip: true
compress: true
cargo_flags: ""
- os: macos-latest
@ -57,8 +51,6 @@ jobs:
artifact_name: target/x86_64-apple-darwin/release/servers
release_name: x86_64-apple-darwin
cross: false
strip: true
compress: true
cargo_flags: ""
- os: macos-latest
@ -66,8 +58,6 @@ jobs:
artifact_name: target/aarch64-apple-darwin/release/servers
release_name: aarch64-apple-darwin
cross: false
strip: true
compress: true
cargo_flags: ""
- os: ubuntu-latest
@ -75,8 +65,6 @@ jobs:
artifact_name: target/x86_64-unknown-freebsd/release/servers
release_name: x86_64-unknown-freebsd
cross: true
strip: false
compress: false
cargo_flags: ""
name: ${{ matrix.os }} for ${{ matrix.target }}
@ -99,24 +87,12 @@ jobs:
args: --release --target=${{ matrix.target }} ${{ matrix.cargo_flags }}
use-cross: ${{ matrix.cross }}
- name: Compress binaries
uses: svenstaro/upx-action@v2
with:
file: ${{ matrix.artifact_name }}
args: --lzma
strip: ${{ matrix.strip }}
if: ${{ matrix.compress }}
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: ${{ matrix.target }}
path: ${{ matrix.artifact_name }}
###
# Below this line, steps will only be ran if a tag was pushed.
###
- name: Get tag name
id: tag_name
run: |
@ -124,14 +100,6 @@ jobs:
shell: bash
if: startsWith(github.ref, 'refs/tags/v')
- name: Get CHANGELOG.md entry
id: changelog_reader
uses: mindsers/changelog-reader-action@v1
with:
version: ${{ steps.tag_name.outputs.current_version }}
path: ./CHANGELOG.md
if: startsWith(github.ref, 'refs/tags/v')
- name: Publish
uses: svenstaro/upload-release-action@v2
with:

View File

@ -1,35 +0,0 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
<!-- next-header -->
## [Unreleased]
## [0.2.0] - 2022-06-26
### Features
- **plugins**: add `Result<()>` in `fn execute()` (Event and Command traits)
- **websocket**: WS Client <-> TCP Proxy (default port 9998) <-> TCP (default port 9999)
### Chore
- **deps**: upgrade
## [0.1.0] - 2022-06-17
### Default commands
- help
### Dynamic plugins loader
You can create custom commands and events (events executed if client connected or send message)
### Cli
You set custom host and port `./servers --host 0.0.0.0 --port 9999`
Show cli help `./servers --help`
<!-- next-url -->
[Unreleased]: https://github.com/MedzikUser/servers/compare/v0.2.0...HEAD
[0.2.0]: https://github.com/MedzikUser/servers/commits/v0.2.0
[0.1.0]: https://github.com/MedzikUser/servers/commits/v0.1.0

824
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,30 +1,24 @@
[workspace]
members = ["plugin_test"]
resolver = "2"
[package]
name = "servers"
description = "Simple TCP server for clients written in Rust with plugins support"
version = "0.2.0"
license = "MIT"
authors = ["MedzikUser <nivua1fn@duck.com>"]
version = "0.6.0"
description = "TCP and WebSocket server for Clients written in Rust"
homepage = "https://github.com/MedzikUser/servers"
repository = "https://github.com/MedzikUser/servers.git"
license = "MIT"
edition = "2021"
[profile.release]
lto = true
opt-level = 'z'
codegen-units = 1
[dependencies]
anyhow = "1.0.58"
async-trait = "0.1.56"
better-panic = "0.3.0"
libloading = "0.7.3"
simplelog = "0.12.0"
tokio-tungstenite = "0.17.1"
tungstenite = "0.17.2"
clap = { version = "3.2.6", features = ["derive"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
log = { version = "0.4.17", features = ["release_max_level_info", "max_level_debug"] }
tokio = { version = "1.19.2", features = ["rt-multi-thread", "macros", "net"] }
anyhow = "1.0.68"
async-std = { version = "1.12.0", features = ["attributes"] }
async-trait = "0.1.63"
clap = { version = "3.2.23", features = ["derive"] }
libloading = "0.7.4"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tungstenite = "0.18.0"
futures = "0.3.25"
lazy_static = "1.4.0"

View File

@ -1,3 +1,2 @@
# NOTE: Custom image specification for freebsd is required until new version of cross is released.
[target.x86_64-unknown-freebsd]
image = "svenstaro/cross-x86_64-unknown-freebsd:latest"

View File

@ -4,8 +4,7 @@ version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["dylib"]
crate-type = ["cdylib"]
[dependencies]
async-trait = "0.1.56"
servers = { path = ".." }

View File

@ -1,70 +1,55 @@
use async_trait::async_trait;
use servers::{
plugins::{Command, Event, Plugin, PluginManagerType, Registrar, Result},
tcp::Client,
};
use servers::plugins::prelude::*;
struct PluginTest;
/// Create a new plugin
#[async_trait]
impl Plugin for PluginTest {
/// Name of the plugin.
fn name(&self) -> &'static str {
"test"
"test_plugin"
}
/// A function will be executed when plugin loading.
/// Usally used for initialization.
async fn on_plugin_load(&self) {}
/// A function that will be executed when the plugin is loaded.
async fn on_load(&self) {}
}
/// Create a new command
#[async_trait]
impl Command for PluginTest {
/// Command name
/// Name of the command.
fn name(&self) -> &'static str {
"/test"
}
/// Help message of the command
fn help(&self) -> &'static str {
"test command"
/// Aliases for the command.
fn aliases(&self) -> Vec<&'static str> {
Vec::new()
}
/// Command function
async fn execute(
&self,
client: &mut Client,
_args: Vec<&str>,
_commands: &PluginManagerType,
) -> Result<()> {
client.send("content")?;
Ok(())
/// Help message of the command.
fn help(&self) -> &'static str {
"Test commend loaded from dylib"
}
/// Usage message of the command.
fn usage(&self) -> &'static str {
"/test"
}
/// Command function.
async fn execute(&self, client: &Client, _args: Vec<&str>) -> anyhow::Result<()> {
client.send("successful executed command from dylib")
}
}
/// Create a new event
#[async_trait]
impl Event for PluginTest {
/// Event name (onConnect or onSend)
fn name(&self) -> &'static str {
"onConnect"
fn event(&self) -> EventType {
EventType::OnConnect
}
/// Event function
async fn execute(&self, client: &mut Client) -> Result<()> {
client.send(&format!("Welcome {}", client.stream.peer_addr().unwrap()))?;
Ok(())
async fn execute(&self, client: &Client, _data: EventData) -> anyhow::Result<()> {
client.send("Hello!")
}
}
/// Register plugin
#[no_mangle]
pub fn plugin_entry(registrar: &mut dyn Registrar) {
registrar.register_plugin(Box::new(PluginTest));
registrar.register_command(Box::new(PluginTest));
registrar.register_event(Box::new(PluginTest));
registrar.register_plugins(Box::new(PluginTest));
registrar.register_commands(Box::new(PluginTest));
registrar.register_events(Box::new(PluginTest));
}

View File

@ -1,8 +1,14 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:base"
"config:base",
"schedule:weekly",
"group:allNonMajor",
":semanticCommits"
],
"labels": [
"dependencies"
],
"prHourlyLimit": 0,
"automergeType": "pr",
"prCreation": "immediate",
"packageRules": [

11
rustfmt.toml Normal file
View File

@ -0,0 +1,11 @@
# https://rust-lang.github.io/rustfmt
# stable
edition = "2021"
newline_style = "Unix"
match_block_trailing_comma = true
# nightly
group_imports = "StdExternalCrate"
imports_granularity = "Crate"
format_code_in_doc_comments = true

91
src/bin/tcp-client.rs Normal file
View File

@ -0,0 +1,91 @@
use std::{
io::{self, Read, Write},
net::TcpStream,
sync::Arc,
thread,
};
use clap::Parser;
use servers::server::MAX_PACKET_LEN;
#[derive(Debug, Parser)]
#[clap(
name = "tcp-client",
version = env!("CARGO_PKG_VERSION"),
about = env!("CARGO_PKG_DESCRIPTION")
)]
struct Cli {
#[clap(
short = 'i',
long = "host",
help = "Server host",
default_value = "0.0.0.0",
display_order = 1
)]
host: String,
#[clap(
short = 'p',
long = "port",
help = "Server port",
default_value = "9999",
display_order = 2
)]
port: String,
}
fn main() -> anyhow::Result<()> {
let args = Cli::parse();
let addr = format!("{}:{}", args.host, args.port);
println!("Connecting to {}...", addr);
let stream = TcpStream::connect(addr)?;
println!("Connected!");
let stream = Arc::new(stream);
let reader = stream.clone();
let writer = stream;
// read the output from the server to write to the stdout
thread::spawn(move || {
let mut buf = [0; MAX_PACKET_LEN];
// read buffer from the server
while let Ok(buf_len) = reader.as_ref().read(&mut buf) {
// ignore unused bytes
let buf = &buf[0..buf_len];
// decode buffer from &[u8] to a String
let mut buf_str = String::from_utf8(buf.to_vec()).unwrap();
// delete new line characters from the buffer
buf_str = buf_str.replace('\n', "");
buf_str = buf_str.replace('\r', "");
println!("{}", buf_str);
}
});
// create a new stdin handler
let stdin = io::stdin();
// send command from stdin
loop {
let mut buf = String::new();
// read buffer from stdin
stdin.read_line(&mut buf)?;
// remove new line characters
while buf.ends_with('\n') || buf.ends_with('\r') {
buf.pop();
}
// send the buffer to the server
writer.as_ref().write_all(buf.as_bytes())?;
}
}

54
src/commands/broadcast.rs Normal file
View File

@ -0,0 +1,54 @@
use async_std::task;
use crate::{plugins::prelude::*, CLIENTS};
pub struct Broadcast;
#[async_trait]
impl Command for Broadcast {
fn name(&self) -> &'static str {
"/broadcast"
}
fn aliases(&self) -> Vec<&'static str> {
vec![]
}
fn help(&self) -> &'static str {
"Send message to all connected clients"
}
fn usage(&self) -> &'static str {
"/broadcast <message>"
}
async fn execute(&self, client: &Client, args: Vec<&str>) -> anyhow::Result<()> {
if args.is_empty() || args.join(" ").is_empty() {
client.send("Missing message")?;
return Ok(());
}
let msg = args.join(" ");
let mut children = Vec::new();
// send message to all connected clients
for (_i, client) in CLIENTS.lock().unwrap().clone() {
let msg = msg.clone();
let child = task::spawn(async move {
client
.send(msg)
.expect("failed to send broadcast message to client")
});
children.push(child);
}
// wait for all task to complete
for child in children {
child.await;
}
Ok(())
}
}

View File

@ -0,0 +1,26 @@
use crate::plugins::prelude::*;
pub struct Disconnect;
#[async_trait]
impl Command for Disconnect {
fn name(&self) -> &'static str {
"/disconnect"
}
fn aliases(&self) -> Vec<&'static str> {
vec!["/close", "/exit"]
}
fn help(&self) -> &'static str {
"Close the connection"
}
fn usage(&self) -> &'static str {
"/disconnect"
}
async fn execute(&self, client: &Client, _args: Vec<&str>) -> anyhow::Result<()> {
client.close()
}
}

View File

@ -1,32 +1,45 @@
use async_trait::async_trait;
use crate::plugins::prelude::*;
use crate::{
plugins::{Command, PluginManagerType, Result},
tcp::Client,
};
pub struct CommandHelp;
pub struct Help;
#[async_trait]
impl Command for CommandHelp {
impl Command for Help {
fn name(&self) -> &'static str {
"/help"
}
fn help(&self) -> &'static str {
"show help"
fn aliases(&self) -> Vec<&'static str> {
vec!["/h", "/?", "?"]
}
async fn execute(
&self,
client: &mut Client,
_args: Vec<&str>,
plugin_manager: &PluginManagerType,
) -> Result<()> {
for command in plugin_manager.commands.iter() {
client.send(&format!("{} - {}", command.name(), command.help()))?;
fn help(&self) -> &'static str {
"Show commands help menu"
}
fn usage(&self) -> &'static str {
"/help"
}
async fn execute(&self, client: &Client, _args: Vec<&str>) -> anyhow::Result<()> {
let mut msg = Vec::new();
for cmd in client.plugins_manager.commands.iter() {
let aliases = cmd.aliases();
let aliases = if !aliases.is_empty() {
cmd.aliases().join(", ")
} else {
"none".to_string()
};
msg.push(format!(
"{name} - {help} (Aliases: {aliases})",
name = cmd.name(),
help = cmd.help(),
aliases = aliases,
))
}
Ok(())
client.send(msg.join("\n"))
}
}

26
src/commands/id.rs Normal file
View File

@ -0,0 +1,26 @@
use crate::plugins::prelude::*;
pub struct Id;
#[async_trait]
impl Command for Id {
fn name(&self) -> &'static str {
"/id"
}
fn aliases(&self) -> Vec<&'static str> {
Vec::new()
}
fn help(&self) -> &'static str {
"Get id of the client"
}
fn usage(&self) -> &'static str {
"/id"
}
async fn execute(&self, client: &Client, _args: Vec<&str>) -> anyhow::Result<()> {
client.send(client.id)
}
}

View File

@ -1,11 +1,25 @@
//! Build-in commands
//! Default servers commands.
//!
//! List of commands:
//! - /broadcast
//! - /disconnect
//! - /help
//! - /id
mod broadcast;
mod disconnect;
mod help;
mod id;
use crate::plugins::Command;
use self::{broadcast::Broadcast, disconnect::Disconnect, help::Help, id::Id};
use crate::plugins::prelude::*;
/// Register build-in commands
/// Register default commands
pub fn register_commands() -> Vec<Box<dyn Command>> {
// create array with build-in commands
vec![Box::new(help::CommandHelp)]
vec![
Box::new(Broadcast),
Box::new(Disconnect),
Box::new(Help),
Box::new(Id),
]
}

View File

@ -1,27 +1,16 @@
//! # Servers - Simple TCP and WebSocket server
//!
//! [image]: https://socialify.git.ci/MedzikUser/servers/image?description=1&font=KoHo&language=1&owner=1&pattern=Circuit%20Board&theme=Light
//!
//! [![image]](https://github.com/MedzikUser/servers)
//!
//! ## 👨‍💻 Building
//!
//! First clone the repository: `git clone https://github.com/MedzikUser/servers.git`
//!
//! ### Requirements
//! - Rust
//!
//! To build run the command: `cargo build --release`
//!
//! The compiled binary can be found in `./target/release/servers`
//!
//! ## Writing plugins
//!
//! Go to [plugins](plugins) module
use std::{collections::HashMap, sync::Mutex};
#![doc(html_root_url = "https://servers.medzik.xyz")]
#![warn(missing_docs)]
use lazy_static::lazy_static;
use crate::server::Client;
pub mod commands;
pub mod plugins;
pub mod tcp;
pub mod server;
lazy_static! {
/// List with all connected clients
pub static ref CLIENTS: Mutex<HashMap<usize, Client>> = Mutex::new(HashMap::new());
/// Next ID of the client to be add to [CLIENTS]
pub static ref CLIENT_NEXT: Mutex<usize> = Mutex::new(0);
}

View File

@ -1,140 +1,46 @@
use std::{fs::File, net::TcpListener};
use clap::Parser;
use log::{error, info, LevelFilter};
use servers::{
plugins::loader,
tcp::{handle_connection, handle_websocket, Client},
};
use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode, WriteLogger};
use servers::server;
#[derive(Parser)]
#[derive(Debug, Parser)]
#[clap(
name = "servers",
about = "A simple TCP server for client which can be extended with plugins."
name = env!("CARGO_PKG_NAME"),
version = env!("CARGO_PKG_VERSION"),
about = env!("CARGO_PKG_DESCRIPTION")
)]
struct Cli {
#[clap(
short = 'h',
short = 'i',
long = "host",
help = "Server host",
default_value = "0.0.0.0",
help = "Tcp server host",
display_order = 1
)]
host: String,
#[clap(
short = 'p',
long = "port",
short = 't',
long = "tcp-port",
help = "TCP server port",
default_value = "9999",
help = "Tcp server port [set 0 to random]",
display_order = 2
)]
port: String,
tcp_port: u16,
#[clap(
short = 'w',
long = "ws-port",
long = "websocket-port",
help = "WebSocket server port",
default_value = "9998",
help = "WebSocket server port [set 0 to random]",
display_order = 3
)]
ws_port: String,
#[clap(
long = "disable-websocket",
help = "Disable WebSocket proxy to Tcp",
display_order = 4
)]
ws_disable: bool,
ws_port: u16,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// init better panic
better_panic::install();
// init logger
CombinedLogger::init(vec![
TermLogger::new(
LevelFilter::Trace,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
),
WriteLogger::new(
LevelFilter::Debug,
Config::default(),
File::create("server.log").unwrap(),
),
])?;
fn main() {
tracing_subscriber::fmt().init();
// parse cli args
let cli = Cli::parse();
let args = Cli::parse();
// if enabled start WebSocket server
if !cli.ws_disable {
tokio::spawn(start_ws_server(
cli.host.clone(),
cli.ws_port,
cli.port.clone(),
));
}
let tcp_host = format!("{host}:{port}", host = args.host, port = args.tcp_port);
let ws_host = format!("{host}:{port}", host = args.host, port = args.ws_port);
// start tcp server
start_tcp_server(cli.host, cli.port).await?;
Ok(())
}
/// Start tcp server
async fn start_tcp_server(host: String, port: String) -> anyhow::Result<()> {
// listen Tcp server
let listener = TcpListener::bind(format!("{host}:{port}"))?;
info!("Tcp server started at: {}", listener.local_addr()?);
// load plugins, commands and events
let plugin_manager = loader()?;
// Accepts a new incoming connection from this listener.
while let Ok((stream, _address)) = listener.accept() {
let client = Client::new(stream);
let plugin_manager = plugin_manager.clone();
// handle client connection in new thread
tokio::spawn(async move {
let ip = client.stream.peer_addr().unwrap();
match handle_connection(client, plugin_manager).await {
Ok(_) => (),
Err(err) => error!("Client {}, {}", ip, err),
}
});
}
// server for a unexpectedly reason be terminated
panic!("Server unexpectedly terminated!")
}
/// Start WebSocket server
async fn start_ws_server(host: String, port: String, tcp_port: String) -> anyhow::Result<()> {
// listen Tcp server
let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?;
info!("WebSocket server started at: {}", listener.local_addr()?);
// Accepts a new incoming connection from this listener.
while let Ok((stream, _address)) = listener.accept().await {
let tcp_port = tcp_port.clone();
tokio::spawn(async {
let ip = stream.peer_addr().unwrap();
match handle_websocket(stream, tcp_port).await {
Ok(_) => (),
Err(err) => error!("Client {}, {}", ip, err),
}
});
}
// server for a unexpectedly reason be terminated
panic!("Server unexpectedly terminated!")
server::run(tcp_host, ws_host).expect("failed to start tcp server");
}

64
src/plugins/load.rs Normal file
View File

@ -0,0 +1,64 @@
use std::{fs, path::Path};
use async_std::task;
use libloading::{Library, Symbol};
use tracing::{info, span, trace, Level};
use crate::{
commands,
plugins::{
manager::{PluginsManager, PluginsManagerType},
prelude::*,
},
};
/// Load all plugins, commands and events.
pub fn loader(plugins_dir: &str) -> anyhow::Result<PluginsManagerType> {
// if plugins directory doesn't exists, create it
if !Path::new(plugins_dir).exists() {
fs::create_dir_all(plugins_dir)?;
}
// get all files from the plugins directory
let plugins_files = fs::read_dir(plugins_dir)?;
// init a plugins manager
let mut plugins_manager = PluginsManager::new();
// register default commands
plugins_manager.commands = commands::register_commands();
for plugin_path in plugins_files {
let path = plugin_path?.path();
let path_str = path.to_str().unwrap();
// add span to logger
let span = span!(Level::TRACE, "", plugin_path = path_str);
let _enter = span.enter();
info!("Loading plugin {}", path_str);
// loading library from .so is unsafe
unsafe {
// Box::new and Box::leak must be there because
// if it isn't there it throws an segmentation fault
let lib = Box::leak(Box::new(Library::new(&path)?));
trace!("Finding symbol `plugin_entry` in {}", path_str);
let func: Symbol<unsafe extern "C" fn(&mut dyn Registrar) -> ()> =
lib.get(b"plugin_entry")?;
// execute the function `plugin_entry` to load the plugin (possible segmentation fault)
trace!("Running function `plugin_entry` from plugin {}", path_str);
func(&mut plugins_manager);
}
}
for plugin in plugins_manager.plugins.iter() {
// execute the `on_load` function from the plugin
task::block_on(async { plugin.on_load().await });
info!("Loaded plugin {}.", plugin.name());
}
Ok(plugins_manager.into())
}

View File

@ -1,53 +0,0 @@
use std::{fs, sync::Arc};
use libloading::{Library, Symbol};
use log::{debug, trace};
use crate::{commands, plugins::Registrar};
use super::{PluginManager, PluginManagerType};
/// Plugins and Commands loader
pub fn loader() -> anyhow::Result<PluginManagerType> {
// get path to .so lib from command argument
let config_dir = "./plugins";
let paths = fs::read_dir(config_dir)?;
// create a plugin manager
let mut plugin_manager = PluginManager::new();
// register default commands
for command in commands::register_commands() {
plugin_manager.commands.push(command)
}
// for all plugin in directory
for path in paths {
// get library file path
let path = path?.path();
let plugin_path = path.to_str().unwrap_or("unknown");
// log debug info
debug!("Loading plugin `{}`", plugin_path);
// loading library with .so is unsafe
unsafe {
// load library
// Box::new and Box::leak must be there because if it isn't there it throws a segmentation fault
let lib = Box::leak(Box::new(Library::new(&path)?));
// get `plugin_entry` from library
trace!("Finding symbol `plugin_entry` in `{}`", plugin_path);
let func: Symbol<unsafe extern "C" fn(&mut dyn Registrar) -> ()> =
lib.get(b"plugin_entry")?;
// execute initial plugin function
trace!("Running `plugin_entry(...)` in plugin `{}`", plugin_path);
func(&mut plugin_manager);
}
}
// return a `PluginManager`
Ok(Arc::new(plugin_manager))
}

44
src/plugins/manager.rs Normal file
View File

@ -0,0 +1,44 @@
use core::fmt;
use std::sync::Arc;
use crate::plugins::prelude::*;
/// Plugins manager struct with Clone derive added by Arc.
pub type PluginsManagerType = Arc<PluginsManager>;
/// A plugins manager that stores all plugins, commands and events.
#[derive(Default)]
pub struct PluginsManager {
/// Vector with all loaded plugins.
pub plugins: Vec<Box<dyn Plugin>>,
/// Vector with all loaded commands.
pub commands: Vec<Box<dyn Command>>,
/// Vector with all loaded events.
pub events: Vec<Box<dyn Event>>,
}
impl PluginsManager {
/// Returns an empty instance of [PluginsManager]
pub fn new() -> Self {
Self {
plugins: Vec::new(),
commands: Vec::new(),
events: Vec::new(),
}
}
/// Returns the instance in [PluginsManagerType].
pub fn into(self) -> PluginsManagerType {
Arc::new(self)
}
}
impl fmt::Debug for PluginsManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PluginsManager")
.field("plugins", &self.plugins.len())
.field("commands", &self.commands.len())
.field("events", &self.events.len())
.finish()
}
}

View File

@ -1,164 +1,20 @@
//! Plugins loader
//!
//! # Writing a plugins
//!
//! Create a new project `cargo new --lib plugin`
//!
//! Set a `crate-type` in Cargo.toml (to build a `.so` plugin)
//!
//! ```toml
//! [lib]
//! crate-type = ["dylib"]
//! ```
//!
//! Add a `servers` and `async-trait` dependencies to Cargo.toml
//!
//! ```toml
//! [dependencies]
//! async-trait = "0.1.56"
//! servers = "0.1.0"
//! ```
//!
//! In file `src/lib.rs`
//!
//! ```no_run
//! use async_trait::async_trait;
//! use servers::{plugins::{Plugin, Registrar}, tcp::Client};
//!
//! struct PluginTest;
//!
//! /// Create a new plugin
//! #[async_trait]
//! impl Plugin for PluginTest {
//! /// Name of the plugin.
//! fn name(&self) -> &'static str {
//! "test"
//! }
//!
//! /// A function will be executed when plugin loading.
//! /// Usally used for initialization.
//! async fn on_plugin_load(&self) {}
//! }
//!
//! /// Register plugin
//! #[no_mangle]
//! pub fn plugin_entry(registrar: &mut dyn Registrar) {
//! registrar.register_plugin(Box::new(PluginTest));
//! }
//! ```
//!
//! ## Add command
//!
//! ```no_run
//! use async_trait::async_trait;
//! use servers::{
//! plugins::{Command, PluginManagerType, Registrar, Result},
//! tcp::Client,
//! };
//! #
//! # struct PluginTest;
//! #
//! # #[async_trait]
//! # impl servers::plugins::Plugin for PluginTest {
//! # /// Name of the plugin.
//! # fn name(&self) -> &'static str {
//! # "test"
//! # }
//! #
//! # /// A function will be executed when plugin loading.
//! # /// Usally used for initialization.
//! # async fn on_plugin_load(&self) {}
//! # }
//!
//! /// Create a new command
//! #[async_trait]
//! impl Command for PluginTest {
//! /// Command name
//! fn name(&self) -> &'static str {
//! "/test"
//! }
//!
//! /// Help message of the command
//! fn help(&self) -> &'static str {
//! "test command"
//! }
//!
//! /// Command function
//! async fn execute(&self, client: &mut Client, _args: Vec<&str>, _commands: &PluginManagerType) -> Result<()> {
//! client.send("Command executed!")?;
//!
//! Ok(())
//! }
//! }
//!
//! /// Register plugin
//! #[no_mangle]
//! pub fn plugin_entry(registrar: &mut dyn Registrar) {
//! # registrar.register_plugin(Box::new(PluginTest));
//! registrar.register_command(Box::new(PluginTest));
//! }
//! ```
//!
//! ## Add event
//!
//! In file `src/lib.rs`
//!
//! ```no_run
//! use async_trait::async_trait;
//! use servers::{
//! plugins::{Event, Registrar, Result},
//! tcp::Client,
//! };
//! #
//! # struct PluginTest;
//! #
//! # #[async_trait]
//! # impl servers::plugins::Plugin for PluginTest {
//! # /// Name of the plugin.
//! # fn name(&self) -> &'static str {
//! # "test"
//! # }
//! #
//! # /// A function will be executed when plugin loading.
//! # /// Usally used for initialization.
//! # async fn on_plugin_load(&self) {}
//! # }
//!
//! /// Create a new event
//! #[async_trait]
//! impl Event for PluginTest {
//! /// Event name (onConnect or onSend)
//! fn name(&self) -> &'static str {
//! "onConnect"
//! }
//!
//! /// Event function
//! async fn execute(&self, client: &mut Client) -> Result<()> {
//! client
//! .send(&format!("Welcome {}", client.stream.peer_addr().unwrap()))?;
//!
//! Ok(())
//! }
//! }
//!
//! /// Register plugin
//! #[no_mangle]
//! pub fn plugin_entry(registrar: &mut dyn Registrar) {
//! # registrar.register_plugin(Box::new(PluginTest));
//! registrar.register_event(Box::new(PluginTest));
//! }
//! ```
//!
//! ## Build plugin
//!
//! To build plugin run command: `cargo build --release`
//!
//! The compiled plugin can be found in `target/release/libplugin.so`
//!
//! Move (or create a symlink) the built plugin to the `plugin/` directory in the server root directory.
//! Plugin infrastructure.
mod loader;
mod types;
mod load;
mod manager;
pub mod types;
pub use loader::*;
pub use types::*;
pub use load::*;
pub use manager::*;
/// Crates and types required in plugins.
pub mod prelude {
use super::*;
pub extern crate anyhow;
pub extern crate async_std;
pub use async_trait::async_trait;
pub use self::types::*;
pub use crate::server::{Client, ClientMapValue};
}

View File

@ -1,20 +1,18 @@
use std::{any::Any, sync::Arc};
//! Types used for creating plugins.
use std::any::Any;
use async_trait::async_trait;
use crate::tcp::Client;
use crate::{plugins::manager::PluginsManager, server::Client};
/// Custom Result alias, imported from [anyhow::Result].
pub type Result<T> = anyhow::Result<T>;
/// A plugin wich allows you to add extra functionality.
/// A main plugin trait.
#[async_trait]
pub trait Plugin: Any + Send + Sync {
/// Name of the plugin.
fn name(&self) -> &'static str;
/// A function will be executed when plugin loading.
/// Usally used for initialization.
async fn on_plugin_load(&self);
/// A function that will be executed when the plugin is loaded.
async fn on_load(&self);
}
/// Add a command to the plugin.
@ -22,76 +20,65 @@ pub trait Plugin: Any + Send + Sync {
pub trait Command: Any + Send + Sync {
/// Name of the command.
fn name(&self) -> &'static str;
/// Aliases for the command.
fn aliases(&self) -> Vec<&'static str>;
/// Help message of the command.
fn help(&self) -> &'static str;
/// Command function
async fn execute(
&self,
client: &mut Client,
args: Vec<&str>,
plugin_manager: &PluginManagerType,
) -> Result<()>;
/// Usage message of the command.
fn usage(&self) -> &'static str;
/// Command function.
async fn execute(&self, client: &Client, args: Vec<&str>) -> anyhow::Result<()>;
}
/// Add a new function that will be executed when the event occurs.
/// All possible to run events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventType {
/// On client connected.
OnConnect,
/// On client sent message.
OnSend,
/// Event executed before command execute (e.g. for disable command).
OnCommand,
}
/// All possible to run events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventData {
/// for `onCommand` event
Command(String),
/// No data
None,
}
/// Add a event to the plugin.
#[async_trait]
pub trait Event: Any + Send + Sync {
/// Event name (onConnect or onSend)
fn name(&self) -> &'static str;
/// Event function
async fn execute(&self, client: &mut Client) -> Result<()>;
/// Type of the event.
fn event(&self) -> EventType;
/// Event function.
async fn execute(&self, client: &Client, data: EventData) -> anyhow::Result<()>;
}
/// Plugin Manager with all plugins features.
pub struct PluginManager {
/// Array with loaded plugins.
pub plugins: Vec<Box<dyn Plugin>>,
/// Array with all commands.
pub commands: Vec<Box<dyn Command>>,
/// Array with all events.
pub events: Vec<Box<dyn Event>>,
}
impl PluginManager {
/// Create an empty [PluginManager]
pub fn new() -> Self {
Self {
plugins: Vec::new(),
commands: Vec::new(),
events: Vec::new(),
}
}
}
impl Default for PluginManager {
fn default() -> Self {
Self::new()
}
}
/// Arc type of the [PluginManager].
pub type PluginManagerType = Arc<PluginManager>;
/// Plugin Registrar
/// A plugin registrar trait.
pub trait Registrar {
/// Function to register the plugin
fn register_plugin(&mut self, plugin: Box<dyn Plugin>);
/// Function to register the command
fn register_command(&mut self, command: Box<dyn Command>);
/// Function to register the event
fn register_event(&mut self, event: Box<dyn Event>);
/// Function to register plugins.
fn register_plugins(&mut self, plugin: Box<dyn Plugin>);
/// Function to register commands.
fn register_commands(&mut self, command: Box<dyn Command>);
/// Function to register events.
fn register_events(&mut self, event: Box<dyn Event>);
}
impl Registrar for PluginManager {
fn register_plugin(&mut self, plugin: Box<dyn Plugin>) {
impl Registrar for PluginsManager {
fn register_plugins(&mut self, plugin: Box<dyn Plugin>) {
self.plugins.push(plugin)
}
fn register_command(&mut self, command: Box<dyn Command>) {
fn register_commands(&mut self, command: Box<dyn Command>) {
self.commands.push(command)
}
fn register_event(&mut self, event: Box<dyn Event>) {
fn register_events(&mut self, event: Box<dyn Event>) {
self.events.push(event)
}
}

232
src/server/client.rs Normal file
View File

@ -0,0 +1,232 @@
use std::{
collections::HashMap,
fmt,
io::{Read, Write},
net::{Shutdown, SocketAddr, TcpStream},
sync::{Arc, Mutex},
};
use tracing::info;
use tungstenite::{accept, Message, WebSocket};
use super::run::PLUGINS_MANAGER;
use crate::plugins::{
prelude::{EventData, EventType},
PluginsManagerType,
};
/// Max length of a TCP and UDP packet
pub const MAX_PACKET_LEN: usize = 65536;
/// Client struct
#[derive(Debug, Clone)]
pub struct Client {
/// ID of the client
pub id: usize,
/// Connection stream of the client
pub stream: ClientStream,
/// Custom Client Map
pub map: Arc<Mutex<HashMap<String, ClientMapValue>>>,
/// Plugins Manager
pub plugins_manager: PluginsManagerType,
}
// impl Drop for Client {
// fn drop(&mut self) {
// if let Err(err) = self.close() {
// error!("Failed to close client connection: {err}");
// }
// }
// }
/// Value type of the client map entry
#[derive(Debug, Clone)]
pub enum ClientMapValue {
String(String),
Array(Vec<String>),
Bool(bool),
Int(isize),
UInt(usize),
}
/// Connection stream of the client
#[derive(Debug, Clone)]
pub enum ClientStream {
/// TCP stream
TCP(Arc<TcpStream>),
/// WebSocket stream
WebSocket(Arc<Mutex<WebSocket<TcpStream>>>),
}
impl From<TcpStream> for Client {
fn from(stream: TcpStream) -> Self {
Self {
id: 0,
stream: ClientStream::TCP(Arc::new(stream)),
map: Arc::new(Mutex::new(HashMap::new())),
plugins_manager: PLUGINS_MANAGER.clone(),
}
}
}
impl From<WebSocket<TcpStream>> for Client {
fn from(stream: WebSocket<TcpStream>) -> Self {
Self {
id: 0,
stream: ClientStream::WebSocket(Arc::new(Mutex::new(stream))),
map: Arc::new(Mutex::new(HashMap::new())),
plugins_manager: PLUGINS_MANAGER.clone(),
}
}
}
impl Client {
/// Create a new TCP Client instance
pub fn new_tcp(stream: TcpStream, id: usize) -> Self {
let mut client = Self::from(stream);
client.id = id;
client
}
/// Create a new WebSocket Client instance
pub fn new_websocket(stream: TcpStream, id: usize) -> anyhow::Result<Self> {
let websocket = accept(stream)?;
let mut client = Self::from(websocket);
client.id = id;
Ok(client)
}
/// Recieve a message from the client
pub fn read(&self) -> anyhow::Result<String> {
// read the message from the stream
let mut msg = match &self.stream {
ClientStream::TCP(stream) => {
// allocate an empty buffer
let mut buf = [0; MAX_PACKET_LEN];
// read the message and get length of it
let len = stream.as_ref().read(&mut buf)?;
// select only used bytes in the buffer
let buf = &buf[0..len];
// decode buffer (&[u8]) to a String
String::from_utf8(buf.to_vec())?
},
ClientStream::WebSocket(stream) => {
// read the message from the stream
let msg = stream.lock().unwrap().read_message()?;
// decode message to a String
msg.to_string()
},
};
// remove new line characters
while msg.ends_with('\n') || msg.ends_with('\r') {
msg.pop();
}
info!("[Recieved]: {}", msg);
Ok(msg)
}
/// Send a message to the client
pub fn send<S>(&self, msg: S) -> anyhow::Result<()>
where
S: ToString,
S: fmt::Display,
{
// convert the message into a string
let msg = msg.to_string();
// convert the message into bytes to send it
let buf = msg.as_bytes();
// send the message
match &self.stream {
ClientStream::TCP(stream) => stream.as_ref().write_all(buf)?,
ClientStream::WebSocket(stream) => {
stream.lock().unwrap().write_message(Message::from(buf))?
},
}
info!("[Sent]: {}", msg);
Ok(())
}
/// Returns the socket address of the remote peer of this connection.
pub fn peer_addr(&self) -> anyhow::Result<SocketAddr> {
let addr = match &self.stream {
ClientStream::TCP(stream) => stream.peer_addr()?,
ClientStream::WebSocket(stream) => stream.lock().unwrap().get_ref().peer_addr()?,
};
Ok(addr)
}
/// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
pub fn flush(&self) -> anyhow::Result<()> {
match &self.stream {
ClientStream::TCP(stream) => stream.as_ref().flush()?,
ClientStream::WebSocket(_stream) => {},
}
Ok(())
}
/// Close the client connection
pub fn close(&self) -> anyhow::Result<()> {
match &self.stream {
ClientStream::TCP(stream) => stream.shutdown(Shutdown::Both)?,
ClientStream::WebSocket(stream) => stream.lock().unwrap().close(None)?,
}
Ok(())
}
/// Inserts a key-value pair into the map.
pub fn insert_key<S>(&self, key: S, value: ClientMapValue) -> Option<ClientMapValue>
where
S: ToString,
{
self.map.lock().unwrap().insert(key.to_string(), value)
}
/// Returns the value from the key.
pub fn get_value<S>(&self, key: S) -> Option<ClientMapValue>
where
S: ToString,
{
self.map.lock().unwrap().get(&key.to_string()).cloned()
}
/// Delete key from the map.
pub fn delete_key<S>(&self, key: S) -> Option<ClientMapValue>
where
S: ToString,
{
self.map.lock().unwrap().remove(&key.to_string())
}
pub async fn run_events(
&self,
event_type: EventType,
event_data: EventData,
) -> anyhow::Result<()> {
for event in self.plugins_manager.events.iter() {
if event.event() == event_type {
event.execute(self, event_data.clone()).await?;
}
}
Ok(())
}
}

7
src/server/mod.rs Normal file
View File

@ -0,0 +1,7 @@
//! Server infrastructure.
mod client;
mod run;
pub use client::*;
pub use run::*;

217
src/server/run.rs Normal file
View File

@ -0,0 +1,217 @@
use std::{net::TcpListener, thread};
use anyhow::anyhow;
use async_std::task;
use futures::join;
use lazy_static::lazy_static;
use tracing::{error, info, span, Level};
use crate::{
plugins::{
self,
prelude::{EventData, EventType},
PluginsManagerType,
},
server::Client,
CLIENTS, CLIENT_NEXT,
};
/// Plugins directory.
pub const PLUGINS_DIR: &str = "plugins";
lazy_static! {
/// Plugin manager, where you can find loaded plugins, commands and events
pub static ref PLUGINS_MANAGER: PluginsManagerType =
plugins::loader(PLUGINS_DIR).expect("failed to load plugins");
}
/// Start servers
pub fn run(tcp_host: String, ws_host: String) -> anyhow::Result<()> {
info!("Loaded {} plugins", PLUGINS_MANAGER.plugins.len());
info!("Loaded {} commands", PLUGINS_MANAGER.commands.len());
info!("Loaded {} events", PLUGINS_MANAGER.events.len());
let tcp_child = task::spawn(async move {
start_tcp(tcp_host).await.unwrap();
});
let ws_child = task::spawn(async move {
start_websocket(ws_host).await.unwrap();
});
task::block_on(async {
join!(tcp_child, ws_child);
});
Ok(())
}
/// Process client connection
async fn process(client: Client) -> anyhow::Result<()> {
let client_addr = client.peer_addr()?;
info!("Processing client connection: {}", client_addr);
// run `onConnect` events
client
.run_events(EventType::OnConnect, EventData::None)
.await?;
loop {
let buf = client.read()?;
// functions for error handling see `if` below function
async fn handle(client: &Client, buf: String) -> anyhow::Result<()> {
// run `onSend` events
client
.run_events(EventType::OnSend, EventData::None)
.await?;
let mut args: Vec<&str> = buf.split_ascii_whitespace().collect();
// if client sent an empty buffer
if args.is_empty() {
client.send("empty buffer")?;
return Ok(());
}
let cmd = args[0];
// remove command name from args
args = args[1..args.len()].to_vec();
// find command
let command = client
.plugins_manager
.commands
.iter()
.enumerate()
.find(|&(_i, command)| command.name() == cmd || command.aliases().contains(&cmd));
// execute command, if command isn't blocked
// to block a command return error in the `onCommand` event
if let Some((_i, cmd)) = command {
// run `onCommand` events
if client
.run_events(
EventType::OnCommand,
EventData::Command(cmd.name().to_string()),
)
.await
.is_ok()
{
// execute command
cmd.execute(client, args).await?;
}
} else {
client.send("unknown command")?;
}
Ok(())
}
// handle errors from message processing
if let Err(err) = handle(&client, buf).await {
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err.contains("Broken pipe") {
return Err(anyhow!("disconnected"));
} else {
error!("Unexpected error in message handler: {}", err);
client.send("Unexpected error")?;
}
}
client.flush()?;
}
}
async fn start_tcp(host: String) -> anyhow::Result<()> {
let listener = TcpListener::bind(host)?;
let incoming = listener.incoming();
for stream in incoming {
let stream = stream?;
// get id for the client
let id = *CLIENT_NEXT.lock().unwrap();
// add one to next id
*CLIENT_NEXT.lock().unwrap() += 1;
thread::spawn(move || {
// get id for the client and add one to next id
let client = Client::new_tcp(stream, id);
// insert the cloned client to CLIENTS
CLIENTS.lock().unwrap().insert(id, client.clone());
// add span to logger
let span = span!(Level::ERROR, "TCP", id = client.id);
let _enter = span.enter();
if let Err(err) = task::block_on(process(client)) {
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err == "disconnected" {
info!("Client disconnected")
} else {
error!("{}", err);
}
}
// delete the client from CLIENTS map
CLIENTS.lock().unwrap().remove(&id);
});
}
Ok(())
}
async fn start_websocket(host: String) -> anyhow::Result<()> {
let listener = TcpListener::bind(host)?;
let incoming = listener.incoming();
for stream in incoming {
let stream = stream?;
// get id for the client
let id = *CLIENT_NEXT.lock().unwrap();
// add one to next id
*CLIENT_NEXT.lock().unwrap() += 1;
thread::spawn(move || {
let client = Client::new_websocket(stream, id).unwrap();
// insert the cloned client to CLIENTS
CLIENTS.lock().unwrap().insert(id, client.clone());
// add span to logger
let span = span!(Level::ERROR, "WS", id = client.id);
let _enter = span.enter();
if let Err(err) = task::block_on(process(client)) {
let err = err.to_string();
// client disconnect e.g. using ctrl + c
if err == "disconnected"
|| err.contains("Connection reset without closing handshake")
{
info!("Client disconnected")
} else {
error!("{}", err);
}
}
// delete the client from CLIENTS map
CLIENTS.lock().unwrap().remove(&id);
});
}
Ok(())
}

View File

@ -1,48 +0,0 @@
#![allow(clippy::unused_io_amount)]
/// Max size of a TCP packet
pub const MAX_PACKET_LEN: usize = 65536;
use std::{
io::{self, Read, Write},
net::TcpStream,
};
/// TCP Client
pub struct Client {
/// TCP stream of this client
pub stream: TcpStream,
}
impl Client {
/// Create new Client
pub fn new(stream: TcpStream) -> Self {
Self { stream }
}
/// Read message/buffer from client
pub fn read(&mut self) -> anyhow::Result<String> {
// allocate an empty buffer
let mut buf = [0; MAX_PACKET_LEN];
// read buffer from stream
let len = self.stream.read(&mut buf)?;
// select only used bytes from the buffer
let recv_buf = &buf[0..len];
// encode buffer (&[u8]) to a String
let decoded = String::from_utf8(recv_buf.to_vec())?;
Ok(decoded)
}
/// Send message to client
pub fn send(&mut self, content: &str) -> io::Result<()> {
// add a new line at the end of the content
let content = format!("{content}\n\r");
// send message
self.stream.write_all(content.as_bytes())
}
}

View File

@ -1,98 +0,0 @@
use std::io::Write;
use log::{error, info, trace};
use crate::plugins::PluginManagerType;
use super::Client;
/// Handle Client connection
pub async fn handle_connection(
mut client: Client,
plugin_manager: PluginManagerType,
) -> anyhow::Result<()> {
info!("New Client: {}", client.stream.peer_addr()?);
// run `onConnect` events from plugins
check_event(&mut client, &plugin_manager, "onConnect").await?;
loop {
// read client message/buffer
let buf = client.read()?;
// run `onSend` events from plugins
check_event(&mut client, &plugin_manager, "onSend").await?;
// split the message by whitespaces and collect it into Vec<&str>
let mut args = buf.split_ascii_whitespace().collect::<Vec<&str>>();
// client sent an empty buffer
if args.is_empty() {
client.send("empty buffer")?;
// don't execute the following commands because it causes panic
continue;
}
// get command from args
let cmd = args[0];
// remove command name from args
args = args[1..args.len()].to_vec();
// search if a command exists
for command in plugin_manager.commands.iter() {
// if this is the entered command
if cmd == command.name() {
trace!("Executing a command `{}`", command.name());
// execute command
match command.execute(&mut client, args, &plugin_manager).await {
Ok(_) => (),
Err(err) => {
error!("failed to execute command `{cmd}`, error message = `{err}`");
client.send(&format!("error: {err}"))?;
}
}
// don't search for more commands
break;
}
}
// if an I/O or EOF error, abort the connection
if client.stream.flush().is_err() {
// terminate connection
break;
}
}
Ok(())
}
/// Search for a events and execute it
async fn check_event(
client: &mut Client,
events: &PluginManagerType,
event_name: &str,
) -> anyhow::Result<()> {
for event in events.events.iter() {
// check if this event should be started
if event.name() == event_name {
trace!("Executing a event `{}`", event.name());
// execute event
match event.execute(client).await {
Ok(_) => (),
Err(err) => {
error!("failed to execute event `{event_name}`, error message = `{err}`");
client.send(&format!("error: {err}"))?;
}
}
}
}
Ok(())
}

View File

@ -1,85 +0,0 @@
#![allow(clippy::unused_io_amount)]
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::info;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
};
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use super::MAX_PACKET_LEN;
/// Handle WebSocket connection
pub async fn handle_websocket(stream: TcpStream, tcp_port: String) -> anyhow::Result<()> {
info!("New WebSocket Client: {}", stream.peer_addr()?);
// accept connection as WebSocket
let ws_stream = tokio_tungstenite::accept_async(stream).await?;
// connect to Tcp server
let tcp_stream = TcpStream::connect(format!("0.0.0.0:{}", tcp_port)).await?;
// split streams
let (tcp_read, tcp_write) = tcp_stream.into_split();
let (ws_write, ws_read) = ws_stream.split();
// tcp read -> ws write
tokio::spawn(tcp_to_ws(tcp_read, ws_write));
// ws read -> tcp write
ws_to_tcp(tcp_write, ws_read).await?;
Ok(())
}
/// Tcp read -> WebSocket write
async fn tcp_to_ws(
mut tcp_read: OwnedReadHalf,
mut ws_write: SplitSink<WebSocketStream<TcpStream>, Message>,
) -> anyhow::Result<()> {
// allocate an empty buffer
let mut buf = [0; MAX_PACKET_LEN];
loop {
// read buffer from tcp
let len = tcp_read.read(&mut buf).await?;
if len > 0 {
// select only used bytes from the buffer
let recv_buf = &buf[0..len];
// covert &[u8] buffer to a vector
let recv_vec = recv_buf.to_vec();
// create a `Message` type from buffer Vec<u8>
let msg = Message::Binary(recv_vec);
// write buffer to websocket
ws_write.send(msg).await?;
}
}
}
/// WebSocket read -> Tcp write
async fn ws_to_tcp(
mut tcp_write: OwnedWriteHalf,
mut ws_read: SplitStream<WebSocketStream<TcpStream>>,
) -> anyhow::Result<()> {
while let Some(msg) = ws_read.next().await {
// handle error in the message
let msg = msg?;
// create a buffer from a message
let buf = msg.into_data();
// write buffer to tcp
tcp_write.write(&buf).await?;
}
Ok(())
}

View File

@ -1,9 +0,0 @@
//! TCP connection utils
mod client;
mod handle_connection;
mod handle_websocket;
pub use client::*;
pub use handle_connection::*;
pub use handle_websocket::*;