mirror of https://github.com/wayvr-org/wayvr.git
wayvr: IPC: Fix dangling fds (#513)
This commit is contained in:
parent
ec2e0501c1
commit
cdd658d4ea
|
|
@ -42,35 +42,40 @@ pub fn send_packet(conn: &mut local_socket::Stream, data: &[u8]) -> anyhow::Resu
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn read_check(expected_size: u32, res: std::io::Result<usize>) -> bool {
|
||||
fn read_check(expected_size: u32, res: std::io::Result<usize>) -> anyhow::Result<bool> {
|
||||
match res {
|
||||
Ok(count) => {
|
||||
if count == 0 {
|
||||
return false;
|
||||
anyhow::bail!("End of stream");
|
||||
}
|
||||
if count as u32 == expected_size {
|
||||
true // read succeeded
|
||||
return Ok(true); // read succeeded
|
||||
} else {
|
||||
log::error!("count {count} is not {expected_size}");
|
||||
false
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
//log::error!("failed to get packet size: {}", e);
|
||||
false
|
||||
}
|
||||
Err(e) => match e.kind() {
|
||||
std::io::ErrorKind::WouldBlock => {
|
||||
// no incoming data (this socket is non-blocking), try again later
|
||||
return Ok(false);
|
||||
}
|
||||
_ => {
|
||||
anyhow::bail!("Connection error: {:?}", e);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type Payload = SmallVec<[u8; 64]>;
|
||||
|
||||
fn read_payload(conn: &mut local_socket::Stream, size: u32) -> Option<Payload> {
|
||||
fn read_payload(conn: &mut local_socket::Stream, size: u32) -> anyhow::Result<Option<Payload>> {
|
||||
let mut payload = Payload::new();
|
||||
payload.resize(size as usize, 0);
|
||||
if read_check(size, conn.read(&mut payload)) {
|
||||
Some(payload)
|
||||
if read_check(size, conn.read(&mut payload))? {
|
||||
Ok(Some(payload))
|
||||
} else {
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -433,36 +438,33 @@ impl Connection {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn process_check_payload(&mut self, params: &mut TickParams, payload: Payload) -> bool {
|
||||
log::debug!("payload size {}", payload.len());
|
||||
|
||||
fn process_check_payload(
|
||||
&mut self,
|
||||
params: &mut TickParams,
|
||||
payload: Payload,
|
||||
) -> anyhow::Result<()> {
|
||||
if let Err(e) = self.process_payload(params, payload) {
|
||||
log::error!("Invalid payload from the client, closing connection: {e}");
|
||||
// send also error message directly to the client before disconnecting
|
||||
self.kill(format!("{e}").as_str());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
anyhow::bail!("Invalid payload from the client, closing connection: {e}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_packet(&mut self, params: &mut TickParams) -> bool {
|
||||
fn read_packet(&mut self, params: &mut TickParams) -> anyhow::Result<bool> {
|
||||
if let Some(payload_size) = self.next_packet {
|
||||
let Some(payload) = read_payload(&mut self.conn, payload_size) else {
|
||||
// still failed to read payload, try in next tick
|
||||
return false;
|
||||
let Some(payload) = read_payload(&mut self.conn, payload_size)? else {
|
||||
// still failed to read payload, try in the next tick
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if !self.process_check_payload(params, payload) {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.process_check_payload(params, payload)?;
|
||||
self.next_packet = None;
|
||||
}
|
||||
|
||||
let mut buf_packet_header: [u8; 4] = [0; 4];
|
||||
if !read_check(4, self.conn.read(&mut buf_packet_header)) {
|
||||
return false;
|
||||
if !read_check(4, self.conn.read(&mut buf_packet_header))? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let payload_size = u32::from_be_bytes(buf_packet_header[0..4].try_into().unwrap()); // 0-3 bytes (u32 size)
|
||||
|
|
@ -475,30 +477,43 @@ impl Connection {
|
|||
"Client sent a packet header with the size over {size_limit} bytes, closing connection."
|
||||
);
|
||||
self.kill("Too big packet received (over 128 KiB)");
|
||||
return false;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let Some(payload) = read_payload(&mut self.conn, payload_size) else {
|
||||
let Some(payload) = read_payload(&mut self.conn, payload_size)? else {
|
||||
// failed to read payload, try in next tick
|
||||
self.next_packet = Some(payload_size);
|
||||
return false;
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if !self.process_check_payload(params, payload) {
|
||||
return false;
|
||||
}
|
||||
self.process_check_payload(params, payload)?;
|
||||
|
||||
true
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn tick(&mut self, params: &mut TickParams) {
|
||||
while self.read_packet(params) {}
|
||||
loop {
|
||||
match self.read_packet(params) {
|
||||
Ok(loop_further) => {
|
||||
if !loop_further {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("Disconnecting client: {:?}", e);
|
||||
self.alive = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
log::info!("Connection closed");
|
||||
if let Some(auth) = &self.auth {
|
||||
log::info!("IPC: Client \"{}\" disconnected.", auth.client_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue