aboutsummaryrefslogtreecommitdiffstats
path: root/etc/devtools-tools/src
diff options
context:
space:
mode:
Diffstat (limited to 'etc/devtools-tools/src')
-rw-r--r--etc/devtools-tools/src/command/replay.rs201
-rw-r--r--etc/devtools-tools/src/main.rs268
-rw-r--r--etc/devtools-tools/src/protocol.rs84
3 files changed, 295 insertions, 258 deletions
diff --git a/etc/devtools-tools/src/command/replay.rs b/etc/devtools-tools/src/command/replay.rs
new file mode 100644
index 00000000000..18b841d454d
--- /dev/null
+++ b/etc/devtools-tools/src/command/replay.rs
@@ -0,0 +1,201 @@
+use std::{
+ collections::BTreeMap,
+ fs::File,
+ io::{BufReader, Read},
+ net::TcpListener,
+};
+
+use jane_eyre::eyre;
+use serde_json::Value;
+use serde_jsonlines::BufReadExt;
+use tracing::{debug, info, warn};
+
+use crate::protocol::{
+ DevtoolsRead, DevtoolsWrite, EmptyResponse, Message, ResourcesAvailableArray, WrappedMessage,
+};
+
+#[derive(clap::Args, Debug)]
+pub struct Replay {
+ json_path: String,
+}
+
+pub fn main(args: Replay) -> eyre::Result<()> {
+ // Allow records to be commented out by prefixing the line with `//`.
+ let mut file = String::default();
+ File::open(args.json_path)?.read_to_string(&mut file)?;
+ let file = file
+ .split_terminator('\n')
+ .filter(|x| !x.starts_with("//"))
+ .collect::<Vec<_>>()
+ .join("\n");
+ let file = BufReader::new(file.as_bytes());
+
+ // Collect messages by parsing and unwrapping records from `devtools_parser.py --json`.
+ let mut messages = vec![];
+ for line in file.json_lines::<WrappedMessage>() {
+ messages.push(line?.message);
+ }
+
+ let Some((initial_message, subsequent_messages)) = messages.split_first() else {
+ warn!("No initial message! Exiting");
+ return Ok(());
+ };
+
+ // Organise client requests into queues by actor.
+ #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
+ struct Actor(String);
+ let mut requests: BTreeMap<Actor, Vec<&Message>> = BTreeMap::default();
+ for message in subsequent_messages.iter() {
+ if let client_message @ Message::Client { to, .. } = message {
+ requests
+ .entry(Actor(to.clone()))
+ .or_default()
+ .push(client_message);
+ }
+ }
+
+ // Match up server responses with client requests, organised into queues by actor and type.
+ #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
+ struct Type(String);
+ #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
+ struct ResourceType(String);
+ let mut transactions: BTreeMap<(Actor, Type), Vec<(&Message, &Message)>> = BTreeMap::default();
+ let mut spontaneous_messages: BTreeMap<Actor, Vec<&Message>> = BTreeMap::default();
+ let mut target_available_form_messages = vec![];
+ let mut frame_update_messages = vec![];
+ let mut resources_available: BTreeMap<ResourceType, BTreeMap<Actor, Vec<Value>>> =
+ BTreeMap::default();
+ for message in subsequent_messages.iter() {
+ if let server_message @ Message::Server { from, r#type, .. } = message {
+ // Server messages with types are probably spontaneous messages (often in request/reply/notify pattern),
+ // e.g. watcher resources-available-array, watcher target-available-form, thread newSource.
+ // Each of these messages need custom logic defining when they should be replayed, but this logic may not
+ // be perfect, so it’s always a *model* of the real behaviour.
+ // <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#common-patterns-of-actor-communication>
+ match r#type.as_deref() {
+ Some("target-available-form") => {
+ // watcher target-available-form model: send all on watcher watchTargets
+ target_available_form_messages.push(message);
+ },
+ Some("frameUpdate") => {
+ // watcher frameUpdate model: send all on watcher watchTargets, after all watcher target-available-form
+ frame_update_messages.push(message);
+ },
+ Some("resources-available-array") => {
+ // watcher resources-available-array model: on watcher watchResources, send resources with the
+ // given resourceTypes, from their original actors
+ let message: ResourcesAvailableArray =
+ serde_json::from_str(&serde_json::to_string(message)?)?;
+ for (resource_type, resources) in message.array {
+ resources_available
+ .entry(ResourceType(resource_type))
+ .or_default()
+ .entry(Actor(from.clone()))
+ .or_default()
+ .extend(resources);
+ }
+ },
+ Some(r#type) => {
+ // TODO: figure out how to realistically replay other spontaneous messages at the right time
+ // (maybe the client will be completely deterministic, but if not, we need some way to anchor them)
+ spontaneous_messages
+ .entry(Actor(from.clone()))
+ .or_default()
+ .push(server_message);
+ warn!(%r#type, ?server_message, "Spontaneous");
+ },
+ None => {
+ let client_message = requests.entry(Actor(from.clone())).or_default().remove(0);
+ let Message::Client { to, r#type, .. } = client_message else {
+ unreachable!("Guaranteed by code populating it")
+ };
+ let Some(r#type) = r#type else {
+ panic!("Message from client has no type! {client_message:?}")
+ };
+ assert_eq!(to, from);
+ transactions
+ .entry((Actor(from.clone()), Type(r#type.clone())))
+ .or_default()
+ .push((client_message, server_message));
+ },
+ }
+ }
+ }
+
+ let listen_addr = "0.0.0.0:6080";
+ info!(%listen_addr, "Listening");
+ let (mut stream, remote_addr) = TcpListener::bind(listen_addr)?.accept()?;
+ info!(%remote_addr, "Accepted connection");
+ let mut reader = BufReader::new(stream.try_clone()?);
+ stream.write_json_packet(initial_message)?;
+ loop {
+ let message = reader.read_packet()?;
+ debug!(?message);
+ let Message::Client { to, r#type, rest } = &message else {
+ panic!("Not a client message: {message:?}")
+ };
+ let Some(r#type) = r#type else {
+ panic!("Message from client has no type! {message:?}")
+ };
+ let queue = transactions
+ .entry((Actor(to.clone()), Type(r#type.clone())))
+ .or_default();
+
+ let Some((expected_message, reply_message)) = (!queue.is_empty()).then(|| queue.remove(0))
+ else {
+ warn!(?message, "No reply found for message");
+ continue;
+ };
+
+ match &**r#type {
+ "updateConfiguration" => {
+ // thread-configuration updateConfiguration is sometimes sent in groups of non-deterministic order,
+ // such as one for shouldPauseOnDebuggerStatement, one for observeWasm and pauseWorkersUntilAttach, and
+ // one for ignoreCaughtExceptions and pauseOnExceptions. It looks like the response is always empty, so
+ // let’s just send an empty response without checking that the requests came in the correct order.
+ // TODO: this could make thread-configuration queues go out of sync, if there are other request types
+ stream.write_json_packet(&EmptyResponse { from: to.clone() })?;
+ continue;
+ },
+ _ => {},
+ }
+
+ // The key idea here is that for each actor and type, the client will send us the same requests.
+ assert_eq!(&message, expected_message);
+ stream.write_json_packet(reply_message)?;
+
+ match &**r#type {
+ "watchTargets" => {
+ for spontaneous_message in target_available_form_messages.drain(..) {
+ debug!(?spontaneous_message);
+ stream.write_json_packet(spontaneous_message)?;
+ }
+ for spontaneous_message in frame_update_messages.drain(..) {
+ debug!(?spontaneous_message);
+ stream.write_json_packet(spontaneous_message)?;
+ }
+ },
+ "watchResources" => {
+ let resource_types = rest.get("resourceTypes");
+ let resource_types = resource_types
+ .iter()
+ .flat_map(|x| x.as_array())
+ .flat_map(|x| x.iter())
+ .flat_map(|x| x.as_str());
+ for resource_type in resource_types {
+ let resources = resources_available
+ .entry(ResourceType(resource_type.to_owned()))
+ .or_default();
+ for (actor, resources) in resources.iter() {
+ stream.write_json_packet(&ResourcesAvailableArray {
+ from: actor.0.clone(),
+ r#type: "resources-available-array".to_owned(),
+ array: vec![(resource_type.to_owned(), resources.clone())],
+ })?;
+ }
+ }
+ },
+ _ => {},
+ }
+ }
+}
diff --git a/etc/devtools-tools/src/main.rs b/etc/devtools-tools/src/main.rs
index 593333a796c..49c6a4dd535 100644
--- a/etc/devtools-tools/src/main.rs
+++ b/etc/devtools-tools/src/main.rs
@@ -1,56 +1,18 @@
+mod command {
+ pub mod replay;
+}
+
+mod protocol;
+
use core::str;
-use std::{
- collections::BTreeMap,
- fs::File,
- io::{BufRead, BufReader, Read, Write},
- net::{TcpListener, TcpStream},
-};
use clap::Parser;
use jane_eyre::eyre;
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use serde_jsonlines::BufReadExt;
-use tracing::{debug, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(clap::Parser, Debug)]
-struct Args {
- json_path: String,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-struct WrappedMessage {
- message: Message,
-}
-
-#[derive(Debug, Deserialize, PartialEq, Serialize)]
-#[serde(untagged)]
-enum Message {
- Server {
- from: String,
- r#type: Option<String>,
- #[serde(flatten)]
- rest: BTreeMap<String, Value>,
- },
- Client {
- to: String,
- r#type: Option<String>,
- #[serde(flatten)]
- rest: BTreeMap<String, Value>,
- },
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-struct ResourcesAvailableArray {
- from: String,
- r#type: String,
- array: Vec<(String, Vec<Value>)>,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-struct EmptyResponse {
- from: String,
+enum Args {
+ Replay(crate::command::replay::Replay),
}
fn main() -> eyre::Result<()> {
@@ -65,217 +27,7 @@ fn main() -> eyre::Result<()> {
.init();
let args = Args::parse();
- // Allow records to be commented out by prefixing the line with `//`.
- let mut file = String::default();
- File::open(args.json_path)?.read_to_string(&mut file)?;
- let file = file
- .split_terminator('\n')
- .filter(|x| !x.starts_with("//"))
- .collect::<Vec<_>>()
- .join("\n");
- let file = BufReader::new(file.as_bytes());
-
- // Collect messages by parsing and unwrapping records from `devtools_parser.py --json`.
- let mut messages = vec![];
- for line in file.json_lines::<WrappedMessage>() {
- messages.push(line?.message);
- }
-
- let Some((initial_message, subsequent_messages)) = messages.split_first() else {
- warn!("No initial message! Exiting");
- return Ok(());
- };
-
- // Organise client requests into queues by actor.
- #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
- struct Actor(String);
- let mut requests: BTreeMap<Actor, Vec<&Message>> = BTreeMap::default();
- for message in subsequent_messages.iter() {
- if let client_message @ Message::Client { to, .. } = message {
- requests
- .entry(Actor(to.clone()))
- .or_default()
- .push(client_message);
- }
- }
-
- // Match up server responses with client requests, organised into queues by actor and type.
- #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
- struct Type(String);
- #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
- struct ResourceType(String);
- let mut transactions: BTreeMap<(Actor, Type), Vec<(&Message, &Message)>> = BTreeMap::default();
- let mut spontaneous_messages: BTreeMap<Actor, Vec<&Message>> = BTreeMap::default();
- let mut target_available_form_messages = vec![];
- let mut frame_update_messages = vec![];
- let mut resources_available: BTreeMap<ResourceType, BTreeMap<Actor, Vec<Value>>> =
- BTreeMap::default();
- for message in subsequent_messages.iter() {
- if let server_message @ Message::Server { from, r#type, .. } = message {
- // Server messages with types are probably spontaneous messages (often in request/reply/notify pattern),
- // e.g. watcher resources-available-array, watcher target-available-form, thread newSource.
- // Each of these messages need custom logic defining when they should be replayed, but this logic may not
- // be perfect, so it’s always a *model* of the real behaviour.
- // <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#common-patterns-of-actor-communication>
- match r#type.as_deref() {
- Some("target-available-form") => {
- // watcher target-available-form model: send all on watcher watchTargets
- target_available_form_messages.push(message);
- },
- Some("frameUpdate") => {
- // watcher frameUpdate model: send all on watcher watchTargets, after all watcher target-available-form
- frame_update_messages.push(message);
- },
- Some("resources-available-array") => {
- // watcher resources-available-array model: on watcher watchResources, send resources with the
- // given resourceTypes, from their original actors
- let message: ResourcesAvailableArray =
- serde_json::from_str(&serde_json::to_string(message)?)?;
- for (resource_type, resources) in message.array {
- resources_available
- .entry(ResourceType(resource_type))
- .or_default()
- .entry(Actor(from.clone()))
- .or_default()
- .extend(resources);
- }
- },
- Some(r#type) => {
- // TODO: figure out how to realistically replay other spontaneous messages at the right time
- // (maybe the client will be completely deterministic, but if not, we need some way to anchor them)
- spontaneous_messages
- .entry(Actor(from.clone()))
- .or_default()
- .push(server_message);
- warn!(%r#type, ?server_message, "Spontaneous");
- },
- None => {
- let client_message = requests.entry(Actor(from.clone())).or_default().remove(0);
- let Message::Client { to, r#type, .. } = client_message else {
- unreachable!("Guaranteed by code populating it")
- };
- let Some(r#type) = r#type else {
- panic!("Message from client has no type! {client_message:?}")
- };
- assert_eq!(to, from);
- transactions
- .entry((Actor(from.clone()), Type(r#type.clone())))
- .or_default()
- .push((client_message, server_message));
- },
- }
- }
- }
-
- let listen_addr = "0.0.0.0:6080";
- info!(%listen_addr, "Listening");
- let (mut stream, remote_addr) = TcpListener::bind(listen_addr)?.accept()?;
- info!(%remote_addr, "Accepted connection");
- let mut reader = BufReader::new(stream.try_clone()?);
- stream.write_json_packet(initial_message)?;
- loop {
- let message = reader.read_packet()?;
- debug!(?message);
- let Message::Client { to, r#type, rest } = &message else {
- panic!("Not a client message: {message:?}")
- };
- let Some(r#type) = r#type else {
- panic!("Message from client has no type! {message:?}")
- };
- let queue = transactions
- .entry((Actor(to.clone()), Type(r#type.clone())))
- .or_default();
-
- let Some((expected_message, reply_message)) = (!queue.is_empty()).then(|| queue.remove(0))
- else {
- warn!(?message, "No reply found for message");
- continue;
- };
-
- match &**r#type {
- "updateConfiguration" => {
- // thread-configuration updateConfiguration is sometimes sent in groups of non-deterministic order,
- // such as one for shouldPauseOnDebuggerStatement, one for observeWasm and pauseWorkersUntilAttach, and
- // one for ignoreCaughtExceptions and pauseOnExceptions. It looks like the response is always empty, so
- // let’s just send an empty response without checking that the requests came in the correct order.
- // TODO: this could make thread-configuration queues go out of sync, if there are other request types
- stream.write_json_packet(&EmptyResponse { from: to.clone() })?;
- continue;
- },
- _ => {},
- }
-
- // The key idea here is that for each actor and type, the client will send us the same requests.
- assert_eq!(&message, expected_message);
- stream.write_json_packet(reply_message)?;
-
- match &**r#type {
- "watchTargets" => {
- for spontaneous_message in target_available_form_messages.drain(..) {
- debug!(?spontaneous_message);
- stream.write_json_packet(spontaneous_message)?;
- }
- for spontaneous_message in frame_update_messages.drain(..) {
- debug!(?spontaneous_message);
- stream.write_json_packet(spontaneous_message)?;
- }
- },
- "watchResources" => {
- let resource_types = rest.get("resourceTypes");
- let resource_types = resource_types
- .iter()
- .flat_map(|x| x.as_array())
- .flat_map(|x| x.iter())
- .flat_map(|x| x.as_str());
- for resource_type in resource_types {
- let resources = resources_available
- .entry(ResourceType(resource_type.to_owned()))
- .or_default();
- for (actor, resources) in resources.iter() {
- stream.write_json_packet(&ResourcesAvailableArray {
- from: actor.0.clone(),
- r#type: "resources-available-array".to_owned(),
- array: vec![(resource_type.to_owned(), resources.clone())],
- })?;
- }
- }
- },
- _ => {},
- }
- }
-}
-
-// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#id1>
-trait DevtoolsWrite {
- /// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets>
- fn write_json_packet<T: Serialize>(&mut self, message: &T) -> eyre::Result<()>;
-}
-trait DevtoolsRead {
- fn read_packet(&mut self) -> eyre::Result<Message>;
-}
-
-impl DevtoolsWrite for TcpStream {
- fn write_json_packet<T: Serialize>(&mut self, message: &T) -> eyre::Result<()> {
- let result = serde_json::to_string(message)?;
- let result = format!("{}:{}", result.len(), result);
- self.write_all(result.as_bytes())?;
- Ok(())
- }
-}
-impl DevtoolsRead for BufReader<TcpStream> {
- fn read_packet(&mut self) -> eyre::Result<Message> {
- let mut prefix = vec![];
- self.read_until(b':', &mut prefix)?;
- let Some(prefix) = prefix.strip_suffix(b":") else {
- panic!("Unexpected EOF")
- };
- let prefix = str::from_utf8(&prefix)?;
-
- // TODO: implement bulk packets
- let len = prefix.parse::<usize>()?;
- let mut result = vec![0u8; len];
- self.read_exact(&mut result)?;
-
- Ok(serde_json::from_slice(&result)?)
+ match args {
+ Args::Replay(args) => crate::command::replay::main(args),
}
}
diff --git a/etc/devtools-tools/src/protocol.rs b/etc/devtools-tools/src/protocol.rs
new file mode 100644
index 00000000000..f22cb116e25
--- /dev/null
+++ b/etc/devtools-tools/src/protocol.rs
@@ -0,0 +1,84 @@
+use std::{
+ collections::BTreeMap,
+ io::{BufRead, BufReader, Read, Write},
+ net::TcpStream,
+ str,
+};
+
+use jane_eyre::eyre;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#id1>
+pub trait DevtoolsWrite {
+ /// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets>
+ fn write_json_packet<T: Serialize>(&mut self, message: &T) -> eyre::Result<()>;
+}
+
+// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#id1>
+pub trait DevtoolsRead {
+ /// <https://firefox-source-docs.mozilla.org/devtools/backend/protocol.html#json-packets>
+ fn read_packet(&mut self) -> eyre::Result<Message>;
+}
+
+impl DevtoolsWrite for TcpStream {
+ fn write_json_packet<T: Serialize>(&mut self, message: &T) -> eyre::Result<()> {
+ let result = serde_json::to_string(message)?;
+ let result = format!("{}:{}", result.len(), result);
+ self.write_all(result.as_bytes())?;
+ Ok(())
+ }
+}
+
+impl DevtoolsRead for BufReader<TcpStream> {
+ fn read_packet(&mut self) -> eyre::Result<Message> {
+ let mut prefix = vec![];
+ self.read_until(b':', &mut prefix)?;
+ let Some(prefix) = prefix.strip_suffix(b":") else {
+ panic!("Unexpected EOF")
+ };
+ let prefix = str::from_utf8(&prefix)?;
+
+ // TODO: implement bulk packets
+ let len = prefix.parse::<usize>()?;
+ let mut result = vec![0u8; len];
+ self.read_exact(&mut result)?;
+
+ Ok(serde_json::from_slice(&result)?)
+ }
+}
+
+/// Message wrapped in an object with a `_to` or `_from` field, as generated by `devtools_parser.py --json`.
+#[derive(Debug, Deserialize, Serialize)]
+pub struct WrappedMessage {
+ pub message: Message,
+}
+
+#[derive(Debug, Deserialize, PartialEq, Serialize)]
+#[serde(untagged)]
+pub enum Message {
+ Server {
+ from: String,
+ r#type: Option<String>,
+ #[serde(flatten)]
+ rest: BTreeMap<String, Value>,
+ },
+ Client {
+ to: String,
+ r#type: Option<String>,
+ #[serde(flatten)]
+ rest: BTreeMap<String, Value>,
+ },
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct ResourcesAvailableArray {
+ pub from: String,
+ pub r#type: String,
+ pub array: Vec<(String, Vec<Value>)>,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct EmptyResponse {
+ pub from: String,
+}