diff --git a/master/config/default.toml b/master/config/default.toml index b7967ec..8cc84e2 100644 --- a/master/config/default.toml +++ b/master/config/default.toml @@ -18,6 +18,26 @@ ## Time in seconds before next admin request is allowed after wrong password #admin = 10 +#[stat] +# Interval in seconds, set zero to disable +#interval = 0 +# Format string description +# +# %s - servers count +# +# Requests per second +# %a - add server +# %d - remove server +# %q - query servers +# %e - errors +# +# Requests per interval time +# %A - add server +# %D - remove server +# %Q - query servers +# %E - errors +#format = "stats: %s servers, %a add/s, %d del/s, %q query/s, %e error/s" + #[client] ## If client version is less then show update message #version = "0.19" diff --git a/master/src/cli.rs b/master/src/cli.rs index e036d23..aea551c 100644 --- a/master/src/cli.rs +++ b/master/src/cli.rs @@ -20,6 +20,8 @@ pub enum Error { InvalidIp(String), #[error("Invalid port number \"{0}\"")] InvalidPort(String), + #[error("Invalid stats interval \"{0}\"")] + InvalidStatsInterval(String), #[error(transparent)] Options(#[from] getopts::Fail), } @@ -30,6 +32,8 @@ pub struct Cli { pub listen_ip: Option, pub listen_port: Option, pub config_path: Option>, + pub stats_format: Option>, + pub stats_interval: Option, } fn print_usage(opts: Options) { @@ -59,6 +63,8 @@ pub fn parse() -> Result { ); opts.optopt("p", "port", &port_help, "PORT"); opts.optopt("c", "config", "config path", "PATH"); + opts.optopt("s", "stats-format", "stats format string", "FMT"); + opts.optopt("I", "stats-interval", "stats interval", "SECONDS"); let matches = opts.parse(&args[1..])?; @@ -94,5 +100,13 @@ pub fn parse() -> Result { cli.config_path = Some(s.into_boxed_str()); } + if let Some(s) = matches.opt_str("stats-format") { + cli.stats_format = Some(s.into_boxed_str()); + } + + if let Some(s) = matches.opt_str("stats-interval") { + cli.stats_interval = Some(s.parse().map_err(|_| Error::InvalidStatsInterval(s))?); + } + Ok(cli) } diff --git a/master/src/config.rs b/master/src/config.rs index 3152ec6..cf23f65 100644 --- a/master/src/config.rs +++ b/master/src/config.rs @@ -56,6 +56,8 @@ pub struct Config { #[serde(rename = "admin")] #[serde(default)] pub admin_list: Box<[AdminConfig]>, + #[serde(default)] + pub stat: StatConfig, } #[derive(Deserialize, Debug)] @@ -173,6 +175,23 @@ pub struct AdminConfig { pub password: Box, } +#[derive(Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct StatConfig { + pub interval: u32, + #[serde(default = "default_stats_format")] + pub format: Box, +} + +impl Default for StatConfig { + fn default() -> Self { + Self { + interval: 0, + format: default_stats_format(), + } + } +} + fn default_log_level() -> LevelFilter { LevelFilter::Warn } @@ -189,6 +208,10 @@ fn default_hash_personal() -> Box { Box::from(admin::HASH_PERSONAL) } +fn default_stats_format() -> Box { + Box::from("stats: %s servers, %a add/s, %d del/s, %q query/s, %e error/s") +} + fn deserialize_log_level<'de, D>(de: D) -> Result where D: Deserializer<'de>, diff --git a/master/src/main.rs b/master/src/main.rs index e9d720b..7c9bb3e 100644 --- a/master/src/main.rs +++ b/master/src/main.rs @@ -7,6 +7,7 @@ mod cli; mod config; mod logger; mod master_server; +mod stats; use std::process; use std::sync::atomic::{AtomicBool, Ordering}; @@ -35,6 +36,12 @@ fn load_config(cli: &Cli) -> Result { if let Some(port) = cli.listen_port { cfg.server.port = port; } + if let Some(format) = &cli.stats_format { + cfg.stat.format = format.clone(); + } + if let Some(interval) = cli.stats_interval { + cfg.stat.interval = interval; + } log::set_max_level(cfg.log.level); diff --git a/master/src/master_server.rs b/master/src/master_server.rs index dc01791..dce096a 100644 --- a/master/src/master_server.rs +++ b/master/src/master_server.rs @@ -19,6 +19,7 @@ use xash3d_protocol::wrappers::Str; use xash3d_protocol::{admin, game, master, server, Error as ProtocolError, ServerInfo}; use crate::config::{self, Config}; +use crate::stats::Stats; /// The maximum size of UDP packets. const MAX_PACKET_SIZE: usize = 512; @@ -129,6 +130,8 @@ pub struct MasterServer { hash: config::HashConfig, blocklist: HashSet, + + stats: Stats, } fn resolve_socket_addr(addr: A) -> io::Result> @@ -196,6 +199,7 @@ impl MasterServer { admin_limit_counter: Counter::new(ADMIN_LIMIT_CLEANUP_MAX), hash: cfg.hash, blocklist: Default::default(), + stats: Stats::new(cfg.stat), }) } @@ -218,6 +222,7 @@ impl MasterServer { self.update_map = cfg.client.update_map; self.admin_list = cfg.admin_list; self.hash = cfg.hash; + self.stats.update_config(cfg.stat); Ok(()) } @@ -245,6 +250,7 @@ impl MasterServer { let src = &buf[..n]; if let Err(e) = self.handle_packet(from, src) { debug!("{}: {}: \"{}\"", from, e, Str(src)); + self.stats.on_error(); } } Ok(()) @@ -255,6 +261,7 @@ impl MasterServer { self.challenges.clear(); self.servers.clear(); self.admin_challenges.clear(); + self.stats.clear(); } fn handle_server_packet(&mut self, from: SocketAddrV4, p: server::Packet) -> Result<(), Error> { @@ -290,11 +297,13 @@ impl MasterServer { } if self.challenges.remove(&from).is_some() { self.add_server(from, ServerInfo::new(&p)); + self.stats.on_server_add(); + self.stats.servers_count(self.servers.len()); } self.remove_outdated_servers(); } server::Packet::ServerRemove => { - // ignore + self.stats.on_server_del(); } _ => { return Err(Error::UnexpectedPacket); @@ -326,6 +335,8 @@ impl MasterServer { if p.filter.flags.contains(FilterFlags::NAT) { self.send_client_to_nat_servers(from, iter)?; } + + self.stats.on_query_servers(); } } game::Packet::GetServerInfo(_) => { @@ -522,6 +533,7 @@ impl MasterServer { let now = self.now(); self.servers .retain(|_, v| v.is_valid(now, self.timeout.server)); + self.stats.servers_count(self.servers.len()); } } diff --git a/master/src/stats.rs b/master/src/stats.rs new file mode 100644 index 0000000..ff4a9a6 --- /dev/null +++ b/master/src/stats.rs @@ -0,0 +1,155 @@ +use std::fmt::{self, Write}; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering::Relaxed}; +use std::sync::mpsc; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use log::info; + +use crate::config::StatConfig; + +#[derive(Default)] +struct Counters { + servers: AtomicUsize, + server_add: AtomicU32, + server_del: AtomicU32, + query_servers: AtomicU32, + errors: AtomicU32, +} + +impl Counters { + fn print(&self, mut format: &str, buf: &mut String, time: Duration) -> fmt::Result { + let time = time.as_secs_f64(); + let servers = self.servers.load(Relaxed); + let server_add = self.server_add.swap(0, Relaxed); + let server_del = self.server_del.swap(0, Relaxed); + let query_servers = self.query_servers.swap(0, Relaxed); + let errors = self.errors.swap(0, Relaxed); + + loop { + // TODO: precompile format string + match format.find('%').map(|i| format.split_at(i)) { + Some((head, tail)) => { + format = &tail[1..]; + write!(buf, "{}", head)?; + let mut chars = format.char_indices(); + match chars.next().map(|(_, c)| c) { + Some('s') => write!(buf, "{}", servers)?, + Some('A') => write!(buf, "{}", server_add)?, + Some('D') => write!(buf, "{}", server_del)?, + Some('Q') => write!(buf, "{}", query_servers)?, + Some('E') => write!(buf, "{}", errors)?, + Some('a') => write!(buf, "{:.1}", server_add as f64 / time)?, + Some('d') => write!(buf, "{:.1}", server_del as f64 / time)?, + Some('q') => write!(buf, "{:.1}", query_servers as f64 / time)?, + Some('e') => write!(buf, "{:.1}", errors as f64 / time)?, + Some(c) => write!(buf, "%{}", c)?, + None => write!(buf, "%")?, + } + match chars.next() { + Some((i, _)) => format = &format[i..], + None => break, + } + } + None => { + write!(buf, "{}", format)?; + break; + } + } + } + + Ok(()) + } + + fn clear(&self) { + self.servers.store(0, Relaxed); + self.server_add.store(0, Relaxed); + self.server_del.store(0, Relaxed); + self.query_servers.store(0, Relaxed); + self.errors.store(0, Relaxed); + } +} + +pub struct Stats { + enabled: bool, + tx: mpsc::Sender, + counters: Arc, +} + +impl Stats { + pub fn new(mut config: StatConfig) -> Self { + let counters_ = Arc::new(Counters::default()); + let (tx, rx) = mpsc::channel(); + + let enabled = config.interval != 0; + let counters = counters_.clone(); + thread::spawn(move || -> fmt::Result { + let buf = &mut String::new(); + + loop { + if config.interval == 0 { + config = rx.recv().unwrap(); + counters.clear(); + continue; + } + + let duration = Duration::from_secs(config.interval as u64); + let start = Instant::now(); + if let Ok(new_config) = rx.recv_timeout(duration) { + config = new_config; + continue; + } + + buf.clear(); + counters.print(&config.format, buf, start.elapsed())?; + info!("{}", buf); + } + }); + + Self { + enabled, + tx, + counters: counters_, + } + } + + pub fn update_config(&mut self, config: StatConfig) { + self.enabled = config.interval != 0; + self.tx.send(config).unwrap(); + } + + pub fn clear(&self) { + self.counters.clear(); + } + + pub fn servers_count(&self, n: usize) { + if self.enabled { + self.counters.servers.store(n, Relaxed); + } + } + + pub fn on_server_add(&self) { + if self.enabled { + self.counters.server_add.fetch_add(1, Relaxed); + } + } + + pub fn on_server_del(&self) { + if self.enabled { + self.counters.server_del.fetch_add(1, Relaxed); + } + } + + pub fn on_query_servers(&self) { + if self.enabled { + self.counters.query_servers.fetch_add(1, Relaxed); + } + } + + pub fn on_error(&self) { + if self.enabled { + self.counters.errors.fetch_add(1, Relaxed); + } + } +}