summaryrefslogtreecommitdiff
path: root/src/devices/stream_device.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/devices/stream_device.rs')
-rw-r--r--src/devices/stream_device.rs235
1 files changed, 235 insertions, 0 deletions
diff --git a/src/devices/stream_device.rs b/src/devices/stream_device.rs
new file mode 100644
index 0000000..e44ffb8
--- /dev/null
+++ b/src/devices/stream_device.rs
@@ -0,0 +1,235 @@
+use crate::*;
+
+use std::collections::VecDeque;
+use std::io::{BufRead, Stdout, Write};
+use std::sync::mpsc::{self, TryRecvError};
+
+
+pub struct StreamDevice {
+ /// True if a source is connected to stdin.
+ stdin_connected: bool,
+ /// True if a transmission is in progress.
+ stdin_control: bool,
+ stdin_rx: mpsc::Receiver<Vec<u8>>,
+ /// Bytes received in the current transmission.
+ stdin_queue: VecDeque<u8>,
+ /// Bytes received since stdin end-of-transmission.
+ stdin_excess: VecDeque<u8>,
+
+ stdout: Stdout,
+ /// True if a sink is connected to stdout.
+ stdout_connected: bool,
+
+ /// True if stdin is transmission-encoded.
+ decode_stdin: bool,
+ /// True if stdout should be transmission-encoded.
+ encode_stdout: bool,
+ /// Half-byte buffer for decoding stdin.
+ decode_buffer: Option<u8>,
+
+ wake: bool,
+}
+
+
+impl Device for StreamDevice {
+ fn read(&mut self, port: u8) -> u8 {
+ match port {
+ 0x0 => read_b!(self.stdin_connected),
+ 0x1 => read_b!(self.stdout_connected),
+ 0x2 => read_b!(self.stdin_control),
+ 0x3 => 0xff,
+ 0x4 => self.stdin_length(),
+ 0x5 => 0xff,
+ 0x6 => self.stdin_read(),
+ 0x7 => self.stdin_read(),
+ 0x8 => todo!(),
+ 0x9 => todo!(),
+ 0xa => todo!(),
+ 0xb => todo!(),
+ 0xc => todo!(),
+ 0xd => todo!(),
+ 0xe => todo!(),
+ 0xf => todo!(),
+ _ => unreachable!(),
+ }
+ }
+
+ fn write(&mut self, port: u8, value: u8) -> Option<Signal> {
+ match port {
+ 0x0 => (),
+ 0x1 => (),
+ 0x2 => self.stdin_enable(),
+ 0x3 => self.stdout_disable(),
+ 0x4 => (),
+ 0x5 => (),
+ 0x6 => self.stdout_write(value),
+ 0x7 => self.stdout_write(value),
+ 0x8 => todo!(),
+ 0x9 => todo!(),
+ 0xa => todo!(),
+ 0xb => todo!(),
+ 0xc => todo!(),
+ 0xd => todo!(),
+ 0xe => todo!(),
+ 0xf => todo!(),
+ _ => unreachable!(),
+ };
+ return None;
+ }
+
+ fn wake(&mut self) -> bool {
+ self.fetch_stdin_data();
+ std::mem::take(&mut self.wake)
+ }
+}
+
+
+impl StreamDevice {
+ pub fn new(config: &EmulatorConfig) -> Self {
+ // Spawn a thread to enable non-blocking reads of stdin.
+ let (stdin_tx, stdin_rx) = std::sync::mpsc::channel();
+ std::thread::spawn(move || loop {
+ let mut stdin = std::io::stdin().lock();
+ match stdin.fill_buf() {
+ Ok(buf) if !buf.is_empty() => {
+ let length = buf.len();
+ stdin_tx.send(buf.to_vec()).unwrap();
+ stdin.consume(length);
+ }
+ _ => break,
+ };
+ });
+
+ Self {
+ stdin_connected: true,
+ stdin_control: false,
+ stdin_rx,
+ stdin_queue: VecDeque::new(),
+ stdin_excess: VecDeque::new(),
+
+ stdout: std::io::stdout(),
+ stdout_connected: true,
+
+ decode_stdin: config.decode_stdin,
+ encode_stdout: config.encode_stdout,
+ decode_buffer: None,
+
+ wake: true,
+ }
+ }
+
+ pub fn stdin_length(&mut self) -> u8 {
+ self.fetch_stdin_data();
+ self.stdin_queue.len().try_into().unwrap_or(u8::MAX)
+ }
+
+ pub fn stdin_enable(&mut self) {
+ self.stdin_control = true;
+ }
+
+ pub fn stdin_read(&mut self) -> u8 {
+ self.fetch_stdin_data();
+ self.stdin_queue.pop_front().unwrap_or(0)
+ }
+
+ pub fn stdout_write(&mut self, value: u8) {
+ macro_rules! hex {
+ ($value:expr) => { match $value {
+ 0x0..=0x9 => $value + b'0',
+ 0xa..=0xf => $value - 0x0a + b'a',
+ _ => unreachable!("Cannot encode value as hex digit: 0x{:02x}", $value),
+ } };
+ }
+ if self.encode_stdout {
+ let encoded = [hex!(value >> 4), hex!(value & 0xf), b' '];
+ self.stdout_write_raw(&encoded);
+ } else {
+ self.stdout_write_raw(&[value]);
+ };
+ }
+
+ fn stdout_write_raw(&mut self, bytes: &[u8]) {
+ if let Err(_) = self.stdout.write_all(bytes) {
+ if self.stdout_connected {
+ self.stdout_connected = false;
+ self.wake = true; // wake because stdout was disconnected.
+ }
+ }
+ }
+
+ pub fn stdout_disable(&mut self) {
+ if self.encode_stdout {
+ self.stdout_write_raw(&[b'\n']);
+ }
+ }
+
+ /// Fetch all pending data from stdin.
+ pub fn fetch_stdin_data(&mut self) {
+ while self.stdin_control {
+ match self.stdin_excess.pop_front() {
+ Some(byte) => self.fetch_byte(byte),
+ None => break,
+ }
+ }
+ loop {
+ match self.stdin_rx.try_recv() {
+ Ok(tx) => {
+ for byte in tx {
+ match self.stdin_control {
+ true => self.fetch_byte(byte),
+ false => self.stdin_excess.push_back(byte),
+ }
+ }
+ }
+ Err(TryRecvError::Empty) => {
+ break;
+ }
+ Err(TryRecvError::Disconnected) => {
+ self.stdin_control = false;
+ if self.stdin_connected {
+ self.stdin_connected = false;
+ self.wake = true; // wake because stdin was disconnected.
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ fn fetch_byte(&mut self, byte: u8) {
+ if self.decode_stdin {
+ let decoded = match byte {
+ b'0'..=b'9' => byte - b'0',
+ b'a'..=b'f' => byte - b'a' + 0x0a,
+ b'A'..=b'F' => byte - b'A' + 0x0a,
+ b'\n' => {
+ self.decode_buffer = None;
+ self.stdin_control = false;
+ self.wake = true; // wake because a transmission ended.
+ return;
+ },
+ _ => return,
+ };
+ if let Some(high) = std::mem::take(&mut self.decode_buffer) {
+ self.stdin_queue.push_back((high << 4) | decoded);
+ self.wake = true; // wake because a byte was received.
+ } else {
+ self.decode_buffer = Some(decoded);
+ }
+ } else {
+ self.stdin_queue.push_back(byte);
+ self.wake = true; // wake because a byte was received.
+ }
+ }
+
+ pub fn flush(&mut self) {
+ let _ = self.stdout.flush();
+ }
+}
+
+
+impl Drop for StreamDevice {
+ fn drop(&mut self) {
+ self.flush();
+ }
+}