// SPDX-License-Identifier: GPL-3.0-only // SPDX-FileCopyrightText: 2023 Denis Drakhnia use std::{ cmp::Eq, collections::hash_map, fmt::Display, hash::Hash, io, net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, UdpSocket}, ops::Deref, str::FromStr, sync::atomic::{AtomicBool, Ordering}, time::{Duration, Instant}, }; use ahash::{AHashMap as HashMap, AHashSet as HashSet}; use blake2b_simd::Params; use fastrand::Rng; use log::{debug, error, info, trace, warn}; use thiserror::Error; use xash3d_protocol::{ admin, filter::{Filter, FilterFlags, Version}, game, master::{self, ServerAddress}, server, server::Region, wrappers::Str, Error as ProtocolError, ServerInfo, }; use crate::{ config::{self, Config}, stats::Stats, }; pub trait AddrExt: Sized + Eq + Hash + Display + Copy + ToSocketAddrs + ServerAddress { type Ip: Eq + Hash + Display + Copy + FromStr; fn extract(addr: SocketAddr) -> Result; fn ip(&self) -> &Self::Ip; fn wrap(self) -> SocketAddr; fn mtu() -> usize; } impl AddrExt for SocketAddrV4 { type Ip = Ipv4Addr; fn extract(addr: SocketAddr) -> Result { if let SocketAddr::V4(addr) = addr { Ok(addr) } else { Err(addr) } } fn ip(&self) -> &Self::Ip { SocketAddrV4::ip(self) } fn wrap(self) -> SocketAddr { SocketAddr::V4(self) } #[inline(always)] fn mtu() -> usize { 512 } } impl AddrExt for SocketAddrV6 { type Ip = Ipv6Addr; fn extract(addr: SocketAddr) -> Result { if let SocketAddr::V6(addr) = addr { Ok(addr) } else { Err(addr) } } fn ip(&self) -> &Self::Ip { SocketAddrV6::ip(self) } fn wrap(self) -> SocketAddr { SocketAddr::V6(self) } #[inline(always)] fn mtu() -> usize { MAX_PACKET_SIZE } } /// The maximum size of UDP packets. const MAX_PACKET_SIZE: usize = 1280; /// How many cleanup calls should be skipped before removing outdated servers. const SERVER_CLEANUP_MAX: usize = 100; /// How many cleanup calls should be skipped before removing outdated challenges. const CHALLENGE_CLEANUP_MAX: usize = 100; /// How many cleanup calls should be skipped before removing outdated admin challenges. const ADMIN_CHALLENGE_CLEANUP_MAX: usize = 100; /// How many cleanup calls should be skipped before removing outdated admin limit entries const ADMIN_LIMIT_CLEANUP_MAX: usize = 100; #[derive(Error, Debug)] pub enum Error { #[error("Failed to bind server socket: {0}")] BindSocket(io::Error), #[error(transparent)] Protocol(#[from] ProtocolError), #[error(transparent)] Io(#[from] io::Error), #[error("Admin challenge do not exist")] AdminChallengeNotFound, #[error("Undefined packet")] UndefinedPacket, #[error("Unexpected packet")] UnexpectedPacket, } /// HashMap entry to keep tracking creation time. #[derive(Copy, Clone, Debug)] struct Entry { time: u32, value: T, } impl Entry { fn new(time: u32, value: T) -> Self { Self { time, value } } fn is_valid(&self, now: u32, duration: u32) -> bool { (now - self.time) < duration } } impl Entry { fn matches(&self, addr: Addr, region: Region, filter: &Filter) -> bool { self.region == region && filter.matches(addr.wrap(), &self.value) } } impl Deref for Entry { type Target = T; fn deref(&self) -> &Self::Target { &self.value } } struct Counter { max: usize, cur: usize, } impl Counter { fn new(max: usize) -> Self { Self { max, cur: 0 } } fn next(&mut self) -> bool { if self.cur <= self.max { self.cur += 1; false } else { self.cur = 0; true } } } pub struct MasterServer { sock: UdpSocket, challenges: HashMap>, challenges_counter: Counter, servers: HashMap>, servers_counter: Counter, max_servers_per_ip: u16, rng: Rng, start_time: Instant, timeout: config::TimeoutConfig, clver: Version, update_title: Box, update_map: Box, update_addr: SocketAddr, admin_challenges: HashMap>, admin_challenges_counter: Counter, admin_list: Box<[config::AdminConfig]>, // rate limit if hash is invalid admin_limit: HashMap>, admin_limit_counter: Counter, hash: config::HashConfig, blocklist: HashSet, stats: Stats, // temporary data filtered_servers: Vec, filtered_servers_nat: Vec, } fn resolve_socket_addr(addr: A, is_ipv4: bool) -> io::Result> where A: ToSocketAddrs, { for i in addr.to_socket_addrs()? { if i.is_ipv4() == is_ipv4 { return Ok(Some(i)); } } Ok(None) } fn resolve_update_addr(cfg: &Config, local_addr: SocketAddr) -> SocketAddr { if let Some(s) = cfg.client.update_addr.as_deref() { let addr = if !s.contains(':') { format!("{}:{}", s, local_addr.port()) } else { s.to_owned() }; match resolve_socket_addr(&addr, local_addr.is_ipv4()) { Ok(Some(x)) => return x, Ok(None) => error!("Update address: failed to resolve IP for \"{}\"", addr), Err(e) => error!("Update address: {}", e), } } local_addr } pub enum Master { V4(MasterServer), V6(MasterServer), } impl Master { pub fn new(cfg: Config) -> Result { match SocketAddr::new(cfg.server.ip, cfg.server.port) { SocketAddr::V4(addr) => MasterServer::new(cfg, addr).map(Self::V4), SocketAddr::V6(addr) => MasterServer::new(cfg, addr).map(Self::V6), } } pub fn update_config(&mut self, cfg: Config) -> Result<(), Error> { let cfg = match self { Self::V4(inner) => inner.update_config(cfg)?, Self::V6(inner) => inner.update_config(cfg)?, }; if let Some(cfg) = cfg { info!("Server IP version changed, full restart"); *self = Self::new(cfg)?; } Ok(()) } pub fn run(&mut self, sig_flag: &AtomicBool) -> Result<(), Error> { match self { Self::V4(inner) => inner.run(sig_flag), Self::V6(inner) => inner.run(sig_flag), } } } impl MasterServer { pub fn new(cfg: Config, addr: Addr) -> Result { info!("Listen address: {}", addr); let sock = UdpSocket::bind(addr).map_err(Error::BindSocket)?; // make socket interruptable by singals sock.set_read_timeout(Some(Duration::from_secs(u32::MAX as u64)))?; let update_addr = resolve_update_addr(&cfg, addr.wrap()); Ok(Self { sock, start_time: Instant::now(), challenges: Default::default(), challenges_counter: Counter::new(CHALLENGE_CLEANUP_MAX), servers: Default::default(), servers_counter: Counter::new(SERVER_CLEANUP_MAX), max_servers_per_ip: cfg.server.max_servers_per_ip, rng: Rng::new(), timeout: cfg.server.timeout, clver: cfg.client.version, update_title: cfg.client.update_title, update_map: cfg.client.update_map, update_addr, admin_challenges: Default::default(), admin_challenges_counter: Counter::new(ADMIN_CHALLENGE_CLEANUP_MAX), admin_list: cfg.admin_list, admin_limit: Default::default(), admin_limit_counter: Counter::new(ADMIN_LIMIT_CLEANUP_MAX), hash: cfg.hash, blocklist: Default::default(), stats: Stats::new(cfg.stat), filtered_servers: Default::default(), filtered_servers_nat: Default::default(), }) } fn local_addr(&self) -> io::Result { self.sock.local_addr() } pub fn update_config(&mut self, cfg: Config) -> Result, Error> { let local_addr = self.local_addr()?; let addr = SocketAddr::new(cfg.server.ip, cfg.server.port); if local_addr.is_ipv4() != addr.is_ipv4() { return Ok(Some(cfg)); } else if local_addr != addr { info!("Listen address: {}", addr); self.sock = UdpSocket::bind(addr).map_err(Error::BindSocket)?; // make socket interruptable by singals self.sock .set_read_timeout(Some(Duration::from_secs(u32::MAX as u64)))?; self.clear(); } self.update_addr = resolve_update_addr(&cfg, addr); self.timeout = cfg.server.timeout; self.clver = cfg.client.version; self.update_title = cfg.client.update_title; self.update_map = cfg.client.update_map; self.admin_list = cfg.admin_list; self.hash = cfg.hash; self.stats.update_config(cfg.stat); Ok(None) } pub fn run(&mut self, sig_flag: &AtomicBool) -> Result<(), Error> { let mut buf = [0; MAX_PACKET_SIZE]; while !sig_flag.load(Ordering::Relaxed) { let (n, from) = match self.sock.recv_from(&mut buf[..Addr::mtu()]) { Ok(x) => x, Err(e) => match e.kind() { io::ErrorKind::Interrupted => break, io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => continue, _ => Err(e)?, }, }; let from = match Addr::extract(from) { Ok(from) => from, Err(_) => continue, }; let src = &buf[..n]; if let Err(e) = self.handle_packet(from, src) { debug!("{}: {}: \"{}\"", from, e, Str(src)); self.stats.on_error(); } } Ok(()) } fn clear(&mut self) { info!("Clear all servers and challenges"); self.challenges.clear(); self.servers.clear(); self.admin_challenges.clear(); self.stats.clear(); } fn handle_server_packet(&mut self, from: Addr, p: server::Packet) -> Result<(), Error> { trace!("{}: recv {:?}", from, p); match p { server::Packet::Challenge(p) => { let master_challenge = self.add_challenge(from); let mut buf = [0; MAX_PACKET_SIZE]; let p = master::ChallengeResponse::new(master_challenge, p.server_challenge); trace!("{}: send {:?}", from, p); let n = p.encode(&mut buf)?; self.sock.send_to(&buf[..n], from)?; self.remove_outdated_challenges(); } server::Packet::ServerAdd(p) => { let entry = match self.challenges.get(&from) { Some(e) => e, None => { trace!("{}: Challenge does not exists", from); return Ok(()); } }; if !entry.is_valid(self.now(), self.timeout.challenge) { return Ok(()); } if p.challenge != entry.value { warn!( "{}: Expected challenge {} but received {}", from, entry.value, p.challenge ); return Ok(()); } if self.challenges.remove(&from).is_some() { self.add_server(from, ServerInfo::new(&p)); self.stats.on_server_add(); self.stats.servers_count(self.servers.len()); } self.remove_outdated_servers(); } server::Packet::ServerRemove => { self.stats.on_server_del(); } _ => { return Err(Error::UnexpectedPacket); } } Ok(()) } fn handle_game_packet(&mut self, from: Addr, p: game::Packet) -> Result<(), Error> { trace!("{}: recv {:?}", from, p); match p { game::Packet::QueryServers(p) => { if p.filter.clver.map_or(false, |v| v < self.clver) { match self.update_addr { SocketAddr::V4(addr) => { self.send_server_list(from, p.filter.key, &[addr])?; } SocketAddr::V6(addr) => { self.send_server_list(from, p.filter.key, &[addr])?; } } } else { let now = self.now(); self.filtered_servers.clear(); self.filtered_servers_nat.clear(); self.servers .iter() .filter(|(addr, info)| { info.is_valid(now, self.timeout.server) && info.matches(**addr, p.region, &p.filter) }) .for_each(|(addr, info)| { self.filtered_servers.push(*addr); if info.flags.contains(FilterFlags::NAT) { self.filtered_servers_nat.push(*addr); } }); self.send_server_list(from, p.filter.key, &self.filtered_servers)?; // NOTE: If NAT is not set in a filter then by default the client is announced // to filtered servers behind NAT. if p.filter.contains_flags(FilterFlags::NAT).unwrap_or(true) { self.send_client_to_nat_servers(from, &self.filtered_servers_nat)?; } self.stats.on_query_servers(); } } game::Packet::GetServerInfo(_) => { let p = server::GetServerInfoResponse { map: self.update_map.as_ref(), host: self.update_title.as_ref(), protocol: 48, // XXX: how to detect what version client will accept? dm: true, maxcl: 32, gamedir: "valve", // XXX: probably must be specific for client... ..Default::default() }; trace!("{}: send {:?}", from, p); let mut buf = [0; MAX_PACKET_SIZE]; let n = p.encode(&mut buf[..Addr::mtu()])?; self.sock.send_to(&buf[..n], from)?; } } Ok(()) } fn handle_admin_packet(&mut self, from: Addr, p: admin::Packet) -> Result<(), Error> { trace!("{}: recv {:?}", from, p); let now = self.now(); if let Some(e) = self.admin_limit.get(from.ip()) { if e.is_valid(now, self.timeout.admin) { trace!("{}: rate limit", from); return Ok(()); } } match p { admin::Packet::AdminChallenge => { let (master_challenge, hash_challenge) = self.admin_challenge_add(from); let p = master::AdminChallengeResponse::new(master_challenge, hash_challenge); trace!("{}: send {:?}", from, p); let mut buf = [0; 64]; let n = p.encode(&mut buf)?; self.sock.send_to(&buf[..n], from)?; self.admin_challenges_cleanup(); } admin::Packet::AdminCommand(p) => { let entry = *self .admin_challenges .get(from.ip()) .ok_or(Error::AdminChallengeNotFound)?; if entry.0 != p.master_challenge { trace!("{}: master challenge is not valid", from); return Ok(()); } if !entry.is_valid(now, self.timeout.challenge) { trace!("{}: challenge is outdated", from); return Ok(()); } let state = Params::new() .hash_length(self.hash.len) .key(self.hash.key.as_bytes()) .personal(self.hash.personal.as_bytes()) .to_state(); let admin = self.admin_list.iter().find(|i| { let hash = state .clone() .update(i.password.as_bytes()) .update(&entry.1.to_le_bytes()) .finalize(); *p.hash == hash.as_bytes() }); match admin { Some(admin) => { info!("{}: admin({}), command: {:?}", from, &admin.name, p.command); self.admin_command(p.command); self.admin_challenge_remove(from); } None => { warn!("{}: invalid admin hash, command: {:?}", from, p.command); self.admin_limit.insert(*from.ip(), Entry::new(now, ())); self.admin_limit_cleanup(); } } } } Ok(()) } fn handle_packet(&mut self, from: Addr, src: &[u8]) -> Result<(), Error> { if self.is_blocked(from.ip()) { return Ok(()); } match server::Packet::decode(src) { Ok(Some(p)) => return self.handle_server_packet(from, p), Ok(None) => {} Err(e) => Err(e)?, } match game::Packet::decode(src) { Ok(Some(p)) => return self.handle_game_packet(from, p), Ok(None) => {} Err(e) => Err(e)?, } match admin::Packet::decode(self.hash.len, src) { Ok(Some(p)) => return self.handle_admin_packet(from, p), Ok(None) => {} Err(e) => Err(e)?, } Err(Error::UndefinedPacket) } fn now(&self) -> u32 { self.start_time.elapsed().as_secs() as u32 } fn add_challenge(&mut self, addr: Addr) -> u32 { let x = self.rng.u32(..); let entry = Entry::new(self.now(), x); self.challenges.insert(addr, entry); x } fn remove_outdated_challenges(&mut self) { if self.challenges_counter.next() { let now = self.now(); self.challenges .retain(|_, v| v.is_valid(now, self.timeout.challenge)); } } fn admin_challenge_add(&mut self, addr: Addr) -> (u32, u32) { let x = self.rng.u32(..); let y = self.rng.u32(..); let entry = Entry::new(self.now(), (x, y)); self.admin_challenges.insert(*addr.ip(), entry); (x, y) } fn admin_challenge_remove(&mut self, addr: Addr) { self.admin_challenges.remove(addr.ip()); } /// Remove outdated entries fn admin_challenges_cleanup(&mut self) { if self.admin_challenges_counter.next() { let now = self.now(); self.admin_challenges .retain(|_, v| v.is_valid(now, self.timeout.challenge)); } } fn admin_limit_cleanup(&mut self) { if self.admin_limit_counter.next() { let now = self.now(); self.admin_limit .retain(|_, v| v.is_valid(now, self.timeout.admin)); } } fn count_servers(&self, ip: &Addr::Ip) -> u16 { self.servers.keys().filter(|i| i.ip() == ip).count() as u16 } fn add_server(&mut self, addr: Addr, server: ServerInfo) { let now = self.now(); match self.servers.entry(addr) { hash_map::Entry::Occupied(mut e) => { trace!("{}: Updated GameServer", addr); e.insert(Entry::new(now, server)); } hash_map::Entry::Vacant(_) => { if self.count_servers(addr.ip()) >= self.max_servers_per_ip { trace!("{}: max servers per ip", addr); return; } trace!("{}: New GameServer", addr); self.servers.insert(addr, Entry::new(now, server)); } } } fn remove_outdated_servers(&mut self) { if self.servers_counter.next() { let now = self.now(); self.servers .retain(|_, v| v.is_valid(now, self.timeout.server)); self.stats.servers_count(self.servers.len()); } } fn send_server_list(&self, to: A, key: Option, servers: &[S]) -> Result<(), Error> where A: ToSocketAddrs, S: ServerAddress, { let mut buf = [0; MAX_PACKET_SIZE]; let mut offset = 0; let mut list = master::QueryServersResponse::new(key); while offset < servers.len() { let (n, c) = list.encode(&mut buf[..Addr::mtu()], &servers[offset..])?; offset += c; self.sock.send_to(&buf[..n], &to)?; } Ok(()) } fn send_client_to_nat_servers(&self, to: Addr, servers: &[Addr]) -> Result<(), Error> { let mut buf = [0; 64]; let n = master::ClientAnnounce::new(to.wrap()).encode(&mut buf)?; let buf = &buf[..n]; for i in servers { self.sock.send_to(buf, i)?; } Ok(()) } #[inline] fn is_blocked(&self, ip: &Addr::Ip) -> bool { self.blocklist.contains(ip) } fn admin_command(&mut self, cmd: &str) { let args: Vec<_> = cmd.split(' ').collect(); fn helper(args: &[&str], mut op: F) where Addr: AddrExt, F: FnMut(&str, Addr::Ip), { let iter = args.iter().map(|i| (i, i.parse::())); for (i, ip) in iter { match ip { Ok(ip) => op(i, ip), Err(_) => warn!("invalid ip: {}", i), } } } match args[0] { "ban" => { helper::(&args[1..], |_, ip| { if self.blocklist.insert(ip) { info!("ban ip: {}", ip); } }); } "unban" => { helper::(&args[1..], |_, ip| { if self.blocklist.remove(&ip) { info!("unban ip: {}", ip); } }); } _ => { warn!("invalid command: {}", args[0]); } } } }