1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# Copyright 2013 The Servo Project Developers. See the COPYRIGHT
# file at the top-level directory of this distribution.
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
from concurrent.futures import Future
import logging
from geckordp.actors.root import RootActor
from geckordp.actors.descriptors.tab import TabActor
from geckordp.actors.watcher import WatcherActor
from geckordp.actors.resources import Resources
from geckordp.actors.events import Events
from geckordp.rdp_client import RDPClient
import http.server
import os.path
import socketserver
import subprocess
import time
from threading import Thread
from typing import Optional
import unittest
class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
# /path/to/servo/python/servo
script_path = None
def __init__(self, methodName="runTest"):
super().__init__(methodName)
self.servoshell = None
self.base_url = None
self.web_server = None
self.web_server_thread = None
def test_sources_list(self):
self.run_servoshell(test_dir=os.path.join(DevtoolsTests.script_path, "devtools_tests/sources"))
self.assert_sources_list([f"{self.base_url}/classic.js", f"{self.base_url}/test.html", "https://servo.org/js/load-table.js"])
def test_sources_list_with_data_no_scripts(self):
self.run_servoshell(url="data:text/html,")
self.assert_sources_list([])
def test_sources_list_with_data_empty_inline_script(self):
self.run_servoshell(url="data:text/html,<script></script>")
self.assert_sources_list([])
def test_sources_list_with_data_inline_script(self):
self.run_servoshell(url="data:text/html,<script>;</script>")
self.assert_sources_list(["data:text/html,<script>;</script>"])
def run_servoshell(self, *, test_dir=None, url=None):
if test_dir is None:
test_dir = os.path.join(DevtoolsTests.script_path, "devtools_tests")
base_url = Future()
class Handler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=test_dir, **kwargs)
def log_message(self, format, *args):
# Uncomment this to log requests.
# return super().log_message(format, *args)
pass
def server_thread():
self.web_server = socketserver.TCPServer(("0.0.0.0", 0), Handler)
base_url.set_result(f"http://127.0.0.1:{self.web_server.server_address[1]}")
self.web_server.serve_forever()
# Start a web server for the test.
self.web_server_thread = Thread(target=server_thread)
self.web_server_thread.start()
self.base_url = base_url.result(1)
# Change this setting if you want to debug Servo.
os.environ["RUST_LOG"] = "error,devtools=warn"
# Run servoshell.
if url is None:
url = f"{self.base_url}/test.html"
self.servoshell = subprocess.Popen(["target/release/servo", "--devtools=6080", url])
# FIXME: Don’t do this
time.sleep(1)
def tearDown(self):
# Terminate servoshell.
self.servoshell.terminate()
# Stop the web server.
self.web_server.shutdown()
self.web_server_thread.join()
def assert_sources_list(self, expected_urls):
client = RDPClient()
client.connect("127.0.0.1", 6080)
root = RootActor(client)
tabs = root.list_tabs()
tab_dict = tabs[0]
tab = TabActor(client, tab_dict["actor"])
watcher = tab.get_watcher()
watcher = WatcherActor(client, watcher["actor"])
target = Future()
def on_target(data):
if data["target"]["browsingContextID"] == tab_dict["browsingContextID"]:
target.set_result(data["target"])
client.add_event_listener(
watcher.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target,
)
watcher.watch_targets(WatcherActor.Targets.FRAME)
done = Future()
target = target.result(1)
def on_source_resource(data):
for [resource_type, sources] in data["array"]:
try:
self.assertEqual(resource_type, "source")
self.assertEqual([source["url"] for source in sources], expected_urls)
done.set_result(None)
except Exception as e:
# Raising here does nothing, for some reason.
# Send the exception back so it can be raised.
done.set_result(e)
client.add_event_listener(
target["actor"],
Events.Watcher.RESOURCES_AVAILABLE_ARRAY,
on_source_resource,
)
watcher.watch_resources([Resources.SOURCE])
result: Optional[Exception] = done.result(1)
if result:
raise result
client.disconnect()
def run_tests(script_path):
DevtoolsTests.script_path = script_path
verbosity = 1 if logging.getLogger().level >= logging.WARN else 2
suite = unittest.TestLoader().loadTestsFromTestCase(DevtoolsTests)
return unittest.TextTestRunner(verbosity=verbosity).run(suite).wasSuccessful()
|