Browse Source

query: do not use threads

ipv6
Denis Drakhnia 11 months ago
parent
commit
62fe4d0b6c
  1. 4
      protocol/src/master.rs
  2. 395
      query/src/main.rs

4
protocol/src/master.rs

@ -76,6 +76,10 @@ impl<'a> QueryServersResponse<&'a [u8]> {
SocketAddrV4::new(ip, port) SocketAddrV4::new(ip, port)
}) })
} }
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
} }
impl<I> QueryServersResponse<I> impl<I> QueryServersResponse<I>

395
query/src/main.rs

@ -7,10 +7,9 @@ use std::cmp;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::{Ipv4Addr, SocketAddrV4, UdpSocket}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket};
use std::process; use std::process;
use std::sync::{mpsc, Arc}; use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
@ -50,34 +49,34 @@ enum ServerResultKind {
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
struct ServerResult { struct ServerResult {
address: String, address: SocketAddrV4,
ping: f32, ping: f32,
#[serde(flatten)] #[serde(flatten)]
kind: ServerResultKind, kind: ServerResultKind,
} }
impl ServerResult { impl ServerResult {
fn new(address: String, ping: f32, kind: ServerResultKind) -> Self { fn new(address: SocketAddrV4, ping: f32, kind: ServerResultKind) -> Self {
Self { Self {
address: address.to_string(), address,
ping, ping,
kind, kind,
} }
} }
fn ok(address: String, ping: f32, info: ServerInfo) -> Self { fn ok(address: SocketAddrV4, ping: f32, info: ServerInfo) -> Self {
Self::new(address, ping, ServerResultKind::Ok { info }) Self::new(address, ping, ServerResultKind::Ok { info })
} }
fn timeout(address: String) -> Self { fn timeout(address: SocketAddrV4) -> Self {
Self::new(address, 0.0, ServerResultKind::Timeout) Self::new(address, 0.0, ServerResultKind::Timeout)
} }
fn protocol(address: String, ping: f32) -> Self { fn protocol(address: SocketAddrV4, ping: f32) -> Self {
Self::new(address, ping, ServerResultKind::Protocol) Self::new(address, ping, ServerResultKind::Protocol)
} }
fn error<T>(address: String, message: T) -> Self fn error<T>(address: SocketAddrV4, message: T) -> Self
where where
T: fmt::Display, T: fmt::Display,
{ {
@ -90,7 +89,7 @@ impl ServerResult {
) )
} }
fn invalid<T>(address: String, ping: f32, message: T, response: &[u8]) -> Self fn invalid<T>(address: SocketAddrV4, ping: f32, message: T, response: &[u8]) -> Self
where where
T: fmt::Display, T: fmt::Display,
{ {
@ -152,7 +151,7 @@ struct ListResult<'a> {
master_timeout: u32, master_timeout: u32,
masters: &'a [Box<str>], masters: &'a [Box<str>],
filter: &'a str, filter: &'a str,
servers: &'a [&'a str], servers: &'a [SocketAddrV4],
} }
fn serialize_colored<S>(s: &str, ser: S) -> Result<S::Ok, S::Error> fn serialize_colored<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
@ -203,159 +202,243 @@ impl fmt::Display for Colored<'_> {
} }
} }
enum Message { fn get_socket_addrs<'a>(iter: impl Iterator<Item = &'a str>) -> Result<Vec<SocketAddrV4>, Error> {
Servers(Vec<String>), use std::net::ToSocketAddrs;
ServerResult(ServerResult),
End, let mut out = Vec::with_capacity(iter.size_hint().0);
for i in iter {
match i
.to_socket_addrs()?
.find(|i| matches!(i, SocketAddr::V4(_)))
{
Some(SocketAddr::V4(addr)) => out.push(addr),
_ => eprintln!("warn: failed to resolve address for {}", i),
}
}
Ok(out)
}
struct ServerQuery {
start: Instant,
protocol: usize,
} }
fn cmp_address(a: &str, b: &str) -> cmp::Ordering { impl ServerQuery {
match (a.parse::<SocketAddrV4>(), b.parse::<SocketAddrV4>()) { fn ping(&self) -> f32 {
(Ok(a), Ok(b)) => a.cmp(&b), self.start.elapsed().as_micros() as f32 / 1000.0
_ => a.cmp(b),
} }
} }
fn query_servers( impl ServerQuery {
host: &str, fn new(protocol: usize) -> Self {
cli: &Cli, Self {
timeout: Duration, start: Instant::now(),
tx: &mpsc::Sender<Message>, protocol,
) -> Result<(), Error> {
let sock = UdpSocket::bind("0.0.0.0:0")?;
sock.connect(host)?;
let mut buf = [0; 512];
let p = game::QueryServers {
region: server::Region::RestOfTheWorld,
last: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
filter: cli.filter.as_str(),
};
let n = p.encode(&mut buf)?;
sock.send(&buf[..n])?;
let start_time = Instant::now();
while let Some(timeout) = timeout.checked_sub(start_time.elapsed()) {
sock.set_read_timeout(Some(timeout))?;
let n = match sock.recv(&mut buf) {
Ok(n) => n,
Err(e) => match e.kind() {
io::ErrorKind::AddrInUse | io::ErrorKind::WouldBlock => break,
_ => Err(e)?,
},
};
if let Ok(packet) = master::QueryServersResponse::decode(&buf[..n]) {
tx.send(Message::Servers(
packet.iter().map(|i| i.to_string()).collect(),
))
.unwrap();
} else {
eprintln!("Unexpected packet from master {}", host);
} }
} }
}
Ok(()) struct Scan<'a> {
cli: &'a Cli,
masters: Vec<SocketAddrV4>,
sock: UdpSocket,
} }
fn get_server_info( impl<'a> Scan<'a> {
addr: String, fn new(cli: &'a Cli) -> Result<Self, Error> {
versions: &[u8], Ok(Self {
timeout: Duration, cli,
) -> Result<ServerResult, Error> { masters: get_socket_addrs(cli.masters.iter().map(|i| i.as_ref()))?,
let sock = UdpSocket::bind("0.0.0.0:0")?; sock: UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))?,
sock.connect(&addr)?; })
sock.set_read_timeout(Some(timeout))?; }
let mut ping = 0.0; fn is_master(&self, addr: &SocketAddrV4) -> bool {
for &i in versions { self.masters.iter().any(|i| i == addr)
let p = game::GetServerInfo::new(i); }
let mut buf = [0; 2048];
let n = p.encode(&mut buf)?; fn query_servers(&self) -> Result<(), Error> {
let start = Instant::now(); let mut buf = [0; 512];
sock.send(&buf[..n])?; let packet = game::QueryServers {
region: server::Region::RestOfTheWorld,
let n = match sock.recv(&mut buf) { last: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
Ok(n) => n, filter: self.cli.filter.as_str(),
Err(e) => match e.kind() {
io::ErrorKind::AddrInUse | io::ErrorKind::WouldBlock => {
return Ok(ServerResult::timeout(addr));
}
_ => Err(e)?,
},
}; };
ping = start.elapsed().as_micros() as f32 / 1000.0; let n = packet.encode(&mut buf)?;
let packet = &buf[..n];
let response = &buf[..n]; for i in &self.masters {
match server::GetServerInfoResponse::decode(response) { self.sock.send_to(packet, i)?;
Ok(packet) => {
let info = ServerInfo::from(packet);
return Ok(ServerResult::ok(addr, ping, info));
}
Err(ProtocolError::InvalidProtocolVersion) => {
// try another protocol version
}
Err(e) => {
return Ok(ServerResult::invalid(addr, ping, e, response));
}
} }
Ok(())
} }
Ok(ServerResult::protocol(addr, ping)) fn servers(&self) -> Result<HashSet<SocketAddrV4>, Error> {
} self.query_servers()?;
fn query_server_info(cli: &Arc<Cli>, servers: &[String]) -> Result<(), Error> { let mut set = HashSet::with_capacity(256);
let (tx, rx) = mpsc::channel(); let mut buf = [0; 2048];
let mut workers = 0; let timeout = Duration::from_secs(self.cli.master_timeout as u64);
let start_time = Instant::now();
if servers.is_empty() {
for i in cli.masters.iter() { while let Some(timeout) = timeout.checked_sub(start_time.elapsed()) {
let master = i.to_owned(); self.sock.set_read_timeout(Some(timeout))?;
let tx = tx.clone();
let timeout = Duration::from_secs(cli.master_timeout as u64); let (n, from) = match self.sock.recv_from(&mut buf) {
let cli = cli.clone(); Ok(x) => x,
thread::spawn(move || { Err(e) => match e.kind() {
if let Err(e) = query_servers(&master, &cli, timeout, &tx) { io::ErrorKind::AddrInUse => break,
eprintln!("master({}) error: {}", master, e); io::ErrorKind::WouldBlock => break,
_ => Err(e)?,
},
};
let from = match from {
SocketAddr::V4(x) => x,
_ => todo!(),
};
if self.is_master(&from) {
if let Ok(packet) = master::QueryServersResponse::decode(&buf[..n]) {
set.extend(packet.iter());
} else {
eprintln!("warn: invalid packet from master {}", from);
} }
tx.send(Message::End).unwrap(); }
});
workers += 1;
} }
} else {
tx.send(Message::Servers(servers.to_vec())).unwrap(); Ok(set)
} }
let mut servers = HashMap::new(); fn server_info(
while let Ok(msg) = rx.recv() { &self,
match msg { list: &[SocketAddrV4],
Message::Servers(list) => { ) -> Result<HashMap<SocketAddrV4, ServerResult>, Error> {
for address in list { let mut set = HashSet::new();
let tx = tx.clone(); let mut active = HashMap::new();
let timeout = Duration::from_secs(cli.server_timeout as u64); let mut out = HashMap::new();
let versions = cli.protocol.clone(); let mut buf = [0; 2048];
thread::spawn(move || {
let result = get_server_info(address.clone(), &versions, timeout) let now = Instant::now();
.unwrap_or_else(|e| ServerResult::error(address, e)); let master_timeout = Duration::from_secs(self.cli.master_timeout as u64);
tx.send(Message::ServerResult(result)).unwrap(); let server_timeout = Duration::from_secs(self.cli.server_timeout as u64);
tx.send(Message::End).unwrap(); let master_end = now + master_timeout;
}); let mut server_end = now + server_timeout;
workers += 1;
if list.is_empty() {
self.query_servers()?;
} else {
let mut buf = [0; 512];
let n = game::GetServerInfo::new(self.cli.protocol[0]).encode(&mut buf)?;
for addr in list.iter().filter(|i| set.insert(**i)) {
match self.sock.send_to(&buf[..n], addr) {
Ok(_) => {
let query = ServerQuery::new(0);
server_end = query.start + server_timeout;
active.insert(*addr, query);
}
Err(e) => {
let res = ServerResult::error(*addr, e);
out.insert(*addr, res);
}
} }
} }
Message::End => { }
workers -= 1;
if workers == 0 { loop {
break; let time = cmp::max(master_end, server_end);
} match time.checked_duration_since(Instant::now()) {
Some(t) => self.sock.set_read_timeout(Some(t))?,
None => break,
} }
Message::ServerResult(result) => {
servers.insert(result.address.clone(), result); let (n, from) = match self.sock.recv_from(&mut buf) {
Ok(x) => x,
Err(e) => match e.kind() {
io::ErrorKind::AddrInUse => break,
io::ErrorKind::WouldBlock => break,
_ => Err(e)?,
},
};
let from = match from {
SocketAddr::V4(x) => x,
_ => todo!(),
};
let raw = &buf[..n];
if self.is_master(&from) {
if let Ok(packet) = master::QueryServersResponse::decode(raw) {
for addr in packet.iter().filter(|i| set.insert(*i)) {
let mut buf = [0; 512];
let n = game::GetServerInfo::new(self.cli.protocol[0]).encode(&mut buf)?;
match self.sock.send_to(&buf[..n], addr) {
Ok(_) => {
let query = ServerQuery::new(0);
server_end = query.start + server_timeout;
active.insert(addr, query);
}
Err(e) => {
let res = ServerResult::error(addr, e);
out.insert(addr, res);
}
}
}
}
} else if let Some(query) = active.remove(&from) {
match server::GetServerInfoResponse::decode(raw) {
Ok(packet) => {
let info = ServerInfo::from(packet);
let res = ServerResult::ok(from, query.ping(), info);
out.insert(from, res);
}
Err(ProtocolError::InvalidProtocolVersion) => {
let next_protocol = query.protocol + 1;
if let Some(protocol) = self.cli.protocol.get(next_protocol) {
let mut buf = [0; 512];
let n = game::GetServerInfo::new(*protocol).encode(&mut buf)?;
match self.sock.send_to(&buf[..n], from) {
Ok(_) => {
active.insert(from, ServerQuery::new(next_protocol));
}
Err(e) => {
let res = ServerResult::error(from, e);
out.insert(from, res);
}
}
} else {
let res = ServerResult::protocol(from, query.ping());
out.insert(from, res);
}
}
Err(e) => {
let res = ServerResult::invalid(from, query.ping(), e, raw);
out.insert(from, res);
}
}
} }
} }
for (addr, _) in active {
let res = ServerResult::timeout(addr);
out.insert(addr, res);
}
Ok(out)
} }
}
fn query_server_info(cli: &Arc<Cli>, servers: &[String]) -> Result<(), Error> {
let scan = Scan::new(cli)?;
let servers = get_socket_addrs(servers.iter().map(|i| i.as_str()))?;
let servers = scan.server_info(&servers)?;
let mut servers: Vec<_> = servers.values().collect(); let mut servers: Vec<_> = servers.values().collect();
servers.sort_by(|a, b| cmp_address(&a.address, &b.address)); servers.sort_by(|a, b| a.address.cmp(&b.address));
if cli.json || cli.debug { if cli.json || cli.debug {
let result = InfoResult { let result = InfoResult {
@ -432,41 +515,9 @@ fn query_server_info(cli: &Arc<Cli>, servers: &[String]) -> Result<(), Error> {
} }
fn list_servers(cli: &Arc<Cli>) -> Result<(), Error> { fn list_servers(cli: &Arc<Cli>) -> Result<(), Error> {
let (tx, rx) = mpsc::channel(); let scan = Scan::new(cli)?;
let mut workers = 0; let mut servers: Vec<_> = scan.servers()?.into_iter().collect();
servers.sort();
for i in cli.masters.iter() {
let master = i.to_owned();
let tx = tx.clone();
let timeout = Duration::from_secs(cli.master_timeout as u64);
let cli = cli.clone();
thread::spawn(move || {
if let Err(e) = query_servers(&master, &cli, timeout, &tx) {
eprintln!("master({}) error: {}", master, e);
}
tx.send(Message::End).unwrap();
});
workers += 1;
}
let mut servers = HashSet::new();
while let Ok(msg) = rx.recv() {
match msg {
Message::Servers(list) => {
servers.extend(list);
}
Message::End => {
workers -= 1;
if workers == 0 {
break;
}
}
_ => panic!(),
}
}
let mut servers: Vec<_> = servers.iter().map(|i| i.as_str()).collect();
servers.sort_by(|a, b| cmp_address(a, b));
if cli.json || cli.debug { if cli.json || cli.debug {
let result = ListResult { let result = ListResult {

Loading…
Cancel
Save