Browse Source

master: collect and log stats

ipv6
Denis Drakhnia 8 months ago
parent
commit
d0bf04e177
  1. 20
      master/config/default.toml
  2. 14
      master/src/cli.rs
  3. 23
      master/src/config.rs
  4. 7
      master/src/main.rs
  5. 14
      master/src/master_server.rs
  6. 155
      master/src/stats.rs

20
master/config/default.toml

@ -18,6 +18,26 @@
## Time in seconds before next admin request is allowed after wrong password ## Time in seconds before next admin request is allowed after wrong password
#admin = 10 #admin = 10
#[stat]
# Interval in seconds, set zero to disable
#interval = 0
# Format string description
#
# %s - servers count
#
# Requests per second
# %a - add server
# %d - remove server
# %q - query servers
# %e - errors
#
# Requests per interval time
# %A - add server
# %D - remove server
# %Q - query servers
# %E - errors
#format = "stats: %s servers, %a add/s, %d del/s, %q query/s, %e error/s"
#[client] #[client]
## If client version is less then show update message ## If client version is less then show update message
#version = "0.19" #version = "0.19"

14
master/src/cli.rs

@ -20,6 +20,8 @@ pub enum Error {
InvalidIp(String), InvalidIp(String),
#[error("Invalid port number \"{0}\"")] #[error("Invalid port number \"{0}\"")]
InvalidPort(String), InvalidPort(String),
#[error("Invalid stats interval \"{0}\"")]
InvalidStatsInterval(String),
#[error(transparent)] #[error(transparent)]
Options(#[from] getopts::Fail), Options(#[from] getopts::Fail),
} }
@ -30,6 +32,8 @@ pub struct Cli {
pub listen_ip: Option<IpAddr>, pub listen_ip: Option<IpAddr>,
pub listen_port: Option<u16>, pub listen_port: Option<u16>,
pub config_path: Option<Box<str>>, pub config_path: Option<Box<str>>,
pub stats_format: Option<Box<str>>,
pub stats_interval: Option<u32>,
} }
fn print_usage(opts: Options) { fn print_usage(opts: Options) {
@ -59,6 +63,8 @@ pub fn parse() -> Result<Cli, Error> {
); );
opts.optopt("p", "port", &port_help, "PORT"); opts.optopt("p", "port", &port_help, "PORT");
opts.optopt("c", "config", "config path", "PATH"); opts.optopt("c", "config", "config path", "PATH");
opts.optopt("s", "stats-format", "stats format string", "FMT");
opts.optopt("I", "stats-interval", "stats interval", "SECONDS");
let matches = opts.parse(&args[1..])?; let matches = opts.parse(&args[1..])?;
@ -94,5 +100,13 @@ pub fn parse() -> Result<Cli, Error> {
cli.config_path = Some(s.into_boxed_str()); cli.config_path = Some(s.into_boxed_str());
} }
if let Some(s) = matches.opt_str("stats-format") {
cli.stats_format = Some(s.into_boxed_str());
}
if let Some(s) = matches.opt_str("stats-interval") {
cli.stats_interval = Some(s.parse().map_err(|_| Error::InvalidStatsInterval(s))?);
}
Ok(cli) Ok(cli)
} }

23
master/src/config.rs

@ -56,6 +56,8 @@ pub struct Config {
#[serde(rename = "admin")] #[serde(rename = "admin")]
#[serde(default)] #[serde(default)]
pub admin_list: Box<[AdminConfig]>, pub admin_list: Box<[AdminConfig]>,
#[serde(default)]
pub stat: StatConfig,
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -173,6 +175,23 @@ pub struct AdminConfig {
pub password: Box<str>, pub password: Box<str>,
} }
#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct StatConfig {
pub interval: u32,
#[serde(default = "default_stats_format")]
pub format: Box<str>,
}
impl Default for StatConfig {
fn default() -> Self {
Self {
interval: 0,
format: default_stats_format(),
}
}
}
fn default_log_level() -> LevelFilter { fn default_log_level() -> LevelFilter {
LevelFilter::Warn LevelFilter::Warn
} }
@ -189,6 +208,10 @@ fn default_hash_personal() -> Box<str> {
Box::from(admin::HASH_PERSONAL) Box::from(admin::HASH_PERSONAL)
} }
fn default_stats_format() -> Box<str> {
Box::from("stats: %s servers, %a add/s, %d del/s, %q query/s, %e error/s")
}
fn deserialize_log_level<'de, D>(de: D) -> Result<LevelFilter, D::Error> fn deserialize_log_level<'de, D>(de: D) -> Result<LevelFilter, D::Error>
where where
D: Deserializer<'de>, D: Deserializer<'de>,

7
master/src/main.rs

@ -7,6 +7,7 @@ mod cli;
mod config; mod config;
mod logger; mod logger;
mod master_server; mod master_server;
mod stats;
use std::process; use std::process;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -35,6 +36,12 @@ fn load_config(cli: &Cli) -> Result<Config, config::Error> {
if let Some(port) = cli.listen_port { if let Some(port) = cli.listen_port {
cfg.server.port = port; cfg.server.port = port;
} }
if let Some(format) = &cli.stats_format {
cfg.stat.format = format.clone();
}
if let Some(interval) = cli.stats_interval {
cfg.stat.interval = interval;
}
log::set_max_level(cfg.log.level); log::set_max_level(cfg.log.level);

14
master/src/master_server.rs

@ -19,6 +19,7 @@ use xash3d_protocol::wrappers::Str;
use xash3d_protocol::{admin, game, master, server, Error as ProtocolError, ServerInfo}; use xash3d_protocol::{admin, game, master, server, Error as ProtocolError, ServerInfo};
use crate::config::{self, Config}; use crate::config::{self, Config};
use crate::stats::Stats;
/// The maximum size of UDP packets. /// The maximum size of UDP packets.
const MAX_PACKET_SIZE: usize = 512; const MAX_PACKET_SIZE: usize = 512;
@ -129,6 +130,8 @@ pub struct MasterServer {
hash: config::HashConfig, hash: config::HashConfig,
blocklist: HashSet<Ipv4Addr>, blocklist: HashSet<Ipv4Addr>,
stats: Stats,
} }
fn resolve_socket_addr<A>(addr: A) -> io::Result<Option<SocketAddrV4>> fn resolve_socket_addr<A>(addr: A) -> io::Result<Option<SocketAddrV4>>
@ -196,6 +199,7 @@ impl MasterServer {
admin_limit_counter: Counter::new(ADMIN_LIMIT_CLEANUP_MAX), admin_limit_counter: Counter::new(ADMIN_LIMIT_CLEANUP_MAX),
hash: cfg.hash, hash: cfg.hash,
blocklist: Default::default(), blocklist: Default::default(),
stats: Stats::new(cfg.stat),
}) })
} }
@ -218,6 +222,7 @@ impl MasterServer {
self.update_map = cfg.client.update_map; self.update_map = cfg.client.update_map;
self.admin_list = cfg.admin_list; self.admin_list = cfg.admin_list;
self.hash = cfg.hash; self.hash = cfg.hash;
self.stats.update_config(cfg.stat);
Ok(()) Ok(())
} }
@ -245,6 +250,7 @@ impl MasterServer {
let src = &buf[..n]; let src = &buf[..n];
if let Err(e) = self.handle_packet(from, src) { if let Err(e) = self.handle_packet(from, src) {
debug!("{}: {}: \"{}\"", from, e, Str(src)); debug!("{}: {}: \"{}\"", from, e, Str(src));
self.stats.on_error();
} }
} }
Ok(()) Ok(())
@ -255,6 +261,7 @@ impl MasterServer {
self.challenges.clear(); self.challenges.clear();
self.servers.clear(); self.servers.clear();
self.admin_challenges.clear(); self.admin_challenges.clear();
self.stats.clear();
} }
fn handle_server_packet(&mut self, from: SocketAddrV4, p: server::Packet) -> Result<(), Error> { fn handle_server_packet(&mut self, from: SocketAddrV4, p: server::Packet) -> Result<(), Error> {
@ -290,11 +297,13 @@ impl MasterServer {
} }
if self.challenges.remove(&from).is_some() { if self.challenges.remove(&from).is_some() {
self.add_server(from, ServerInfo::new(&p)); self.add_server(from, ServerInfo::new(&p));
self.stats.on_server_add();
self.stats.servers_count(self.servers.len());
} }
self.remove_outdated_servers(); self.remove_outdated_servers();
} }
server::Packet::ServerRemove => { server::Packet::ServerRemove => {
// ignore self.stats.on_server_del();
} }
_ => { _ => {
return Err(Error::UnexpectedPacket); return Err(Error::UnexpectedPacket);
@ -326,6 +335,8 @@ impl MasterServer {
if p.filter.flags.contains(FilterFlags::NAT) { if p.filter.flags.contains(FilterFlags::NAT) {
self.send_client_to_nat_servers(from, iter)?; self.send_client_to_nat_servers(from, iter)?;
} }
self.stats.on_query_servers();
} }
} }
game::Packet::GetServerInfo(_) => { game::Packet::GetServerInfo(_) => {
@ -522,6 +533,7 @@ impl MasterServer {
let now = self.now(); let now = self.now();
self.servers self.servers
.retain(|_, v| v.is_valid(now, self.timeout.server)); .retain(|_, v| v.is_valid(now, self.timeout.server));
self.stats.servers_count(self.servers.len());
} }
} }

155
master/src/stats.rs

@ -0,0 +1,155 @@
use std::fmt::{self, Write};
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering::Relaxed};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use log::info;
use crate::config::StatConfig;
#[derive(Default)]
struct Counters {
servers: AtomicUsize,
server_add: AtomicU32,
server_del: AtomicU32,
query_servers: AtomicU32,
errors: AtomicU32,
}
impl Counters {
fn print(&self, mut format: &str, buf: &mut String, time: Duration) -> fmt::Result {
let time = time.as_secs_f64();
let servers = self.servers.load(Relaxed);
let server_add = self.server_add.swap(0, Relaxed);
let server_del = self.server_del.swap(0, Relaxed);
let query_servers = self.query_servers.swap(0, Relaxed);
let errors = self.errors.swap(0, Relaxed);
loop {
// TODO: precompile format string
match format.find('%').map(|i| format.split_at(i)) {
Some((head, tail)) => {
format = &tail[1..];
write!(buf, "{}", head)?;
let mut chars = format.char_indices();
match chars.next().map(|(_, c)| c) {
Some('s') => write!(buf, "{}", servers)?,
Some('A') => write!(buf, "{}", server_add)?,
Some('D') => write!(buf, "{}", server_del)?,
Some('Q') => write!(buf, "{}", query_servers)?,
Some('E') => write!(buf, "{}", errors)?,
Some('a') => write!(buf, "{:.1}", server_add as f64 / time)?,
Some('d') => write!(buf, "{:.1}", server_del as f64 / time)?,
Some('q') => write!(buf, "{:.1}", query_servers as f64 / time)?,
Some('e') => write!(buf, "{:.1}", errors as f64 / time)?,
Some(c) => write!(buf, "%{}", c)?,
None => write!(buf, "%")?,
}
match chars.next() {
Some((i, _)) => format = &format[i..],
None => break,
}
}
None => {
write!(buf, "{}", format)?;
break;
}
}
}
Ok(())
}
fn clear(&self) {
self.servers.store(0, Relaxed);
self.server_add.store(0, Relaxed);
self.server_del.store(0, Relaxed);
self.query_servers.store(0, Relaxed);
self.errors.store(0, Relaxed);
}
}
pub struct Stats {
enabled: bool,
tx: mpsc::Sender<StatConfig>,
counters: Arc<Counters>,
}
impl Stats {
pub fn new(mut config: StatConfig) -> Self {
let counters_ = Arc::new(Counters::default());
let (tx, rx) = mpsc::channel();
let enabled = config.interval != 0;
let counters = counters_.clone();
thread::spawn(move || -> fmt::Result {
let buf = &mut String::new();
loop {
if config.interval == 0 {
config = rx.recv().unwrap();
counters.clear();
continue;
}
let duration = Duration::from_secs(config.interval as u64);
let start = Instant::now();
if let Ok(new_config) = rx.recv_timeout(duration) {
config = new_config;
continue;
}
buf.clear();
counters.print(&config.format, buf, start.elapsed())?;
info!("{}", buf);
}
});
Self {
enabled,
tx,
counters: counters_,
}
}
pub fn update_config(&mut self, config: StatConfig) {
self.enabled = config.interval != 0;
self.tx.send(config).unwrap();
}
pub fn clear(&self) {
self.counters.clear();
}
pub fn servers_count(&self, n: usize) {
if self.enabled {
self.counters.servers.store(n, Relaxed);
}
}
pub fn on_server_add(&self) {
if self.enabled {
self.counters.server_add.fetch_add(1, Relaxed);
}
}
pub fn on_server_del(&self) {
if self.enabled {
self.counters.server_del.fetch_add(1, Relaxed);
}
}
pub fn on_query_servers(&self) {
if self.enabled {
self.counters.query_servers.fetch_add(1, Relaxed);
}
}
pub fn on_error(&self) {
if self.enabled {
self.counters.errors.fetch_add(1, Relaxed);
}
}
}
Loading…
Cancel
Save