diff options
Diffstat (limited to 'src/devices/stream_device.rs')
-rw-r--r-- | src/devices/stream_device.rs | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/src/devices/stream_device.rs b/src/devices/stream_device.rs new file mode 100644 index 0000000..e44ffb8 --- /dev/null +++ b/src/devices/stream_device.rs @@ -0,0 +1,235 @@ +use crate::*; + +use std::collections::VecDeque; +use std::io::{BufRead, Stdout, Write}; +use std::sync::mpsc::{self, TryRecvError}; + + +pub struct StreamDevice { + /// True if a source is connected to stdin. + stdin_connected: bool, + /// True if a transmission is in progress. + stdin_control: bool, + stdin_rx: mpsc::Receiver<Vec<u8>>, + /// Bytes received in the current transmission. + stdin_queue: VecDeque<u8>, + /// Bytes received since stdin end-of-transmission. + stdin_excess: VecDeque<u8>, + + stdout: Stdout, + /// True if a sink is connected to stdout. + stdout_connected: bool, + + /// True if stdin is transmission-encoded. + decode_stdin: bool, + /// True if stdout should be transmission-encoded. + encode_stdout: bool, + /// Half-byte buffer for decoding stdin. + decode_buffer: Option<u8>, + + wake: bool, +} + + +impl Device for StreamDevice { + fn read(&mut self, port: u8) -> u8 { + match port { + 0x0 => read_b!(self.stdin_connected), + 0x1 => read_b!(self.stdout_connected), + 0x2 => read_b!(self.stdin_control), + 0x3 => 0xff, + 0x4 => self.stdin_length(), + 0x5 => 0xff, + 0x6 => self.stdin_read(), + 0x7 => self.stdin_read(), + 0x8 => todo!(), + 0x9 => todo!(), + 0xa => todo!(), + 0xb => todo!(), + 0xc => todo!(), + 0xd => todo!(), + 0xe => todo!(), + 0xf => todo!(), + _ => unreachable!(), + } + } + + fn write(&mut self, port: u8, value: u8) -> Option<Signal> { + match port { + 0x0 => (), + 0x1 => (), + 0x2 => self.stdin_enable(), + 0x3 => self.stdout_disable(), + 0x4 => (), + 0x5 => (), + 0x6 => self.stdout_write(value), + 0x7 => self.stdout_write(value), + 0x8 => todo!(), + 0x9 => todo!(), + 0xa => todo!(), + 0xb => todo!(), + 0xc => todo!(), + 0xd => todo!(), + 0xe => todo!(), + 0xf => todo!(), + _ => unreachable!(), + }; + return None; + } + + fn wake(&mut self) -> bool { + self.fetch_stdin_data(); + std::mem::take(&mut self.wake) + } +} + + +impl StreamDevice { + pub fn new(config: &EmulatorConfig) -> Self { + // Spawn a thread to enable non-blocking reads of stdin. + let (stdin_tx, stdin_rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || loop { + let mut stdin = std::io::stdin().lock(); + match stdin.fill_buf() { + Ok(buf) if !buf.is_empty() => { + let length = buf.len(); + stdin_tx.send(buf.to_vec()).unwrap(); + stdin.consume(length); + } + _ => break, + }; + }); + + Self { + stdin_connected: true, + stdin_control: false, + stdin_rx, + stdin_queue: VecDeque::new(), + stdin_excess: VecDeque::new(), + + stdout: std::io::stdout(), + stdout_connected: true, + + decode_stdin: config.decode_stdin, + encode_stdout: config.encode_stdout, + decode_buffer: None, + + wake: true, + } + } + + pub fn stdin_length(&mut self) -> u8 { + self.fetch_stdin_data(); + self.stdin_queue.len().try_into().unwrap_or(u8::MAX) + } + + pub fn stdin_enable(&mut self) { + self.stdin_control = true; + } + + pub fn stdin_read(&mut self) -> u8 { + self.fetch_stdin_data(); + self.stdin_queue.pop_front().unwrap_or(0) + } + + pub fn stdout_write(&mut self, value: u8) { + macro_rules! hex { + ($value:expr) => { match $value { + 0x0..=0x9 => $value + b'0', + 0xa..=0xf => $value - 0x0a + b'a', + _ => unreachable!("Cannot encode value as hex digit: 0x{:02x}", $value), + } }; + } + if self.encode_stdout { + let encoded = [hex!(value >> 4), hex!(value & 0xf), b' ']; + self.stdout_write_raw(&encoded); + } else { + self.stdout_write_raw(&[value]); + }; + } + + fn stdout_write_raw(&mut self, bytes: &[u8]) { + if let Err(_) = self.stdout.write_all(bytes) { + if self.stdout_connected { + self.stdout_connected = false; + self.wake = true; // wake because stdout was disconnected. + } + } + } + + pub fn stdout_disable(&mut self) { + if self.encode_stdout { + self.stdout_write_raw(&[b'\n']); + } + } + + /// Fetch all pending data from stdin. + pub fn fetch_stdin_data(&mut self) { + while self.stdin_control { + match self.stdin_excess.pop_front() { + Some(byte) => self.fetch_byte(byte), + None => break, + } + } + loop { + match self.stdin_rx.try_recv() { + Ok(tx) => { + for byte in tx { + match self.stdin_control { + true => self.fetch_byte(byte), + false => self.stdin_excess.push_back(byte), + } + } + } + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + self.stdin_control = false; + if self.stdin_connected { + self.stdin_connected = false; + self.wake = true; // wake because stdin was disconnected. + } + break; + } + } + } + } + + fn fetch_byte(&mut self, byte: u8) { + if self.decode_stdin { + let decoded = match byte { + b'0'..=b'9' => byte - b'0', + b'a'..=b'f' => byte - b'a' + 0x0a, + b'A'..=b'F' => byte - b'A' + 0x0a, + b'\n' => { + self.decode_buffer = None; + self.stdin_control = false; + self.wake = true; // wake because a transmission ended. + return; + }, + _ => return, + }; + if let Some(high) = std::mem::take(&mut self.decode_buffer) { + self.stdin_queue.push_back((high << 4) | decoded); + self.wake = true; // wake because a byte was received. + } else { + self.decode_buffer = Some(decoded); + } + } else { + self.stdin_queue.push_back(byte); + self.wake = true; // wake because a byte was received. + } + } + + pub fn flush(&mut self) { + let _ = self.stdout.flush(); + } +} + + +impl Drop for StreamDevice { + fn drop(&mut self) { + self.flush(); + } +} |