diff options
Diffstat (limited to 'python/servo/devtools_tests.py')
-rw-r--r-- | python/servo/devtools_tests.py | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/python/servo/devtools_tests.py b/python/servo/devtools_tests.py new file mode 100644 index 00000000000..c50fffb0ac1 --- /dev/null +++ b/python/servo/devtools_tests.py @@ -0,0 +1,150 @@ +# Copyright 2013 The Servo Project Developers. See the COPYRIGHT +# file at the top-level directory of this distribution. +# +# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +# http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +# <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +from concurrent.futures import Future +import logging +from geckordp.actors.root import RootActor +from geckordp.actors.descriptors.tab import TabActor +from geckordp.actors.watcher import WatcherActor +from geckordp.actors.resources import Resources +from geckordp.actors.events import Events +from geckordp.rdp_client import RDPClient +import http.server +import os.path +import socketserver +import subprocess +import time +from threading import Thread +from typing import Optional +import unittest + + +class DevtoolsTests(unittest.IsolatedAsyncioTestCase): + # /path/to/servo/python/servo + script_path = None + + def __init__(self, methodName="runTest"): + super().__init__(methodName) + self.servoshell = None + self.base_url = None + self.web_server = None + self.web_server_thread = None + + def test_sources_list(self): + self.run_servoshell(test_dir=os.path.join(DevtoolsTests.script_path, "devtools_tests/sources")) + self.assert_sources_list([f"{self.base_url}/classic.js", f"{self.base_url}/test.html", "https://servo.org/js/load-table.js"]) + + def test_sources_list_with_data_no_scripts(self): + self.run_servoshell(url="data:text/html,") + self.assert_sources_list([]) + + def test_sources_list_with_data_empty_inline_script(self): + self.run_servoshell(url="data:text/html,<script></script>") + self.assert_sources_list([]) + + def test_sources_list_with_data_inline_script(self): + self.run_servoshell(url="data:text/html,<script>;</script>") + self.assert_sources_list(["data:text/html,<script>;</script>"]) + + def run_servoshell(self, *, test_dir=None, url=None): + if test_dir is None: + test_dir = os.path.join(DevtoolsTests.script_path, "devtools_tests") + base_url = Future() + + class Handler(http.server.SimpleHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=test_dir, **kwargs) + + def log_message(self, format, *args): + # Uncomment this to log requests. + # return super().log_message(format, *args) + pass + + def server_thread(): + self.web_server = socketserver.TCPServer(("0.0.0.0", 0), Handler) + base_url.set_result(f"http://127.0.0.1:{self.web_server.server_address[1]}") + self.web_server.serve_forever() + + # Start a web server for the test. + self.web_server_thread = Thread(target=server_thread) + self.web_server_thread.start() + self.base_url = base_url.result(1) + + # Change this setting if you want to debug Servo. + os.environ["RUST_LOG"] = "error,devtools=warn" + + # Run servoshell. + if url is None: + url = f"{self.base_url}/test.html" + self.servoshell = subprocess.Popen(["target/release/servo", "--devtools=6080", url]) + + # FIXME: Don’t do this + time.sleep(1) + + def tearDown(self): + # Terminate servoshell. + self.servoshell.terminate() + + # Stop the web server. + self.web_server.shutdown() + self.web_server_thread.join() + + def assert_sources_list(self, expected_urls): + client = RDPClient() + client.connect("127.0.0.1", 6080) + root = RootActor(client) + tabs = root.list_tabs() + tab_dict = tabs[0] + tab = TabActor(client, tab_dict["actor"]) + watcher = tab.get_watcher() + watcher = WatcherActor(client, watcher["actor"]) + + target = Future() + + def on_target(data): + if data["target"]["browsingContextID"] == tab_dict["browsingContextID"]: + target.set_result(data["target"]) + + client.add_event_listener( + watcher.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target, + ) + watcher.watch_targets(WatcherActor.Targets.FRAME) + + done = Future() + target = target.result(1) + + def on_source_resource(data): + for [resource_type, sources] in data["array"]: + try: + self.assertEqual(resource_type, "source") + self.assertEqual([source["url"] for source in sources], expected_urls) + done.set_result(None) + except Exception as e: + # Raising here does nothing, for some reason. + # Send the exception back so it can be raised. + done.set_result(e) + + client.add_event_listener( + target["actor"], + Events.Watcher.RESOURCES_AVAILABLE_ARRAY, + on_source_resource, + ) + watcher.watch_resources([Resources.SOURCE]) + + result: Optional[Exception] = done.result(1) + if result: + raise result + client.disconnect() + + +def run_tests(script_path): + DevtoolsTests.script_path = script_path + verbosity = 1 if logging.getLogger().level >= logging.WARN else 2 + suite = unittest.TestLoader().loadTestsFromTestCase(DevtoolsTests) + return unittest.TextTestRunner(verbosity=verbosity).run(suite).wasSuccessful() |