archinstall/tests/qemu/testbase.py

73 lines
2.3 KiB
Python

import enum
import time
import threading
import select
import logging
import socket
logger = logging.getLogger("archtest")
class Keyboard(enum.Enum):
# https://vt100.net/docs/vt100-ug/chapter3.html
# https://espterm.github.io/docs/VT100%20escape%20codes.html
arrow_up = '\033[A'
arrow_down = '\033[B'
arrow_right = '\033[C'
arrow_left = '\033[D'
enter = '\015'
forward_slash = '\057'
escape = '\033'
class TestBase(threading.Thread):
def __init__(self, serial_monitor):
self.serial_monitor = serial_monitor
self.buffer = b''
async def _send_key(self, key):
pass # This is for the QMP socket if needed
def send_string(self, chars, delay=None):
logger.debug(f"Sending string: {chars.encode('UTF-8')}")
while len(select.select([self.serial_monitor.client_socket], [], [], 0.2)[0]):
# Waiting for read buffer to finish to not collide
time.sleep(0.02)
if len(select.select([], [self.serial_monitor.client_socket], [], 0.2)[1]):
# asyncio.run_coroutine_threadsafe(self._send_key(vt100_key), loop=self.serial_monitor.QMP.loop)
self.serial_monitor.client_socket.send(chars.encode('UTF-8'), socket.MSG_WAITALL) # flags=socket.MSG_WAITALL
# os.fsync(self.serial_monitor.client_socket.fileno())
else:
logger.error(f"Could not send key, serial was not in a write state!")
if delay:
time.sleep(delay)
def send_key(self, key, delay=None):
try:
vt100_key = Keyboard[key].value.encode('UTF-8')
except:
# logger.warning(f"Could not convert key: {key}")
vt100_key = key.encode('UTF-8')
logger.debug(f"Sending key: {key} \033[0;32m({vt100_key})\033[0m")
while len(select.select([self.serial_monitor.client_socket], [], [], 0.2)[0]):
# Waiting for read buffer to finish to not collide
time.sleep(0.02)
if len(select.select([], [self.serial_monitor.client_socket], [], 0.2)[1]):
# asyncio.run_coroutine_threadsafe(self._send_key(vt100_key), loop=self.serial_monitor.QMP.loop)
self.serial_monitor.client_socket.send(vt100_key, socket.MSG_WAITALL) # flags=socket.MSG_WAITALL
# os.fsync(self.serial_monitor.client_socket.fileno())
else:
logger.error(f"Could not send key, serial was not in a write state!")
if delay:
time.sleep(delay)
# Flushing the serial buffer, as it can quickly become frozen
# self.serial_monitor.client_socket.sendall(b"")
def feed(self, line):
self.buffer += line