aboutsummaryrefslogtreecommitdiffstats
path: root/python/servo/devtools_tests.py
blob: 2a305e656a460afb146dd5e84b752c4d381f2eaf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright 2013 The Servo Project Developers. See the COPYRIGHT
# file at the top-level directory of this distribution.
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

from concurrent.futures import Future
from geckordp.actors.root import RootActor
from geckordp.actors.descriptors.tab import TabActor
from geckordp.actors.watcher import WatcherActor
from geckordp.actors.resources import Resources
from geckordp.actors.events import Events
from geckordp.rdp_client import RDPClient
import http.server
import os.path
import socketserver
import subprocess
import sys
import time
from threading import Thread


def run_tests(script_path):
    """Run the devtools tests whose fixtures live below *script_path*.

    At present this is only the sources test, which is served from
    ``<script_path>/devtools_tests/sources``.
    """
    sources_dir = os.path.join(script_path, "devtools_tests/sources")
    run_test(sources_test, sources_dir)


def run_test(test_fun, test_dir):
    """Run one devtools test against a freshly launched servoshell.

    The function serves ``test_dir`` over HTTP on an ephemeral port, starts
    ``target/release/servo`` with its devtools server on port 6080 pointed at
    ``test.html`` from that directory, connects an ``RDPClient`` to it and
    finally hands control to ``test_fun``.

    Args:
        test_fun: Callable invoked as ``test_fun(client, base_url)``, where
            ``client`` is the connected ``RDPClient`` and ``base_url`` is the
            ``http://127.0.0.1:<port>`` root of the temporary web server.
        test_dir: Directory used as the web root; also printed to stderr as
            the test's banner.

    Raises:
        OSError: If the web server cannot bind or servoshell cannot be
            launched.
        Exception: Anything raised while connecting or by ``test_fun`` is
            propagated unchanged once cleanup has run.
    """
    print(f">>> {test_dir}", file=sys.stderr)

    class Handler(http.server.SimpleHTTPRequestHandler):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=test_dir, **kwargs)

        def log_message(self, format, *args):
            # Uncomment this to log requests.
            # return super().log_message(format, *args)
            pass

    # Start a web server for the test. The socket is bound on the calling
    # thread so that a bind failure is raised here directly, and so the port
    # is known without having to hand it over from the serving thread.
    server = socketserver.TCPServer(("0.0.0.0", 0), Handler)
    base_url = f"http://127.0.0.1:{server.server_address[1]}"
    thread = Thread(target=server.serve_forever)
    thread.start()

    # Everything from here on runs under try/finally: the serving thread is
    # not a daemon, so leaving it running would keep the interpreter alive.
    servoshell = None
    try:
        # Change this setting if you want to debug Servo.
        # It is set in this process's environment, which the child inherits.
        os.environ["RUST_LOG"] = "error,devtools=warn"

        # Run servoshell.
        servoshell = subprocess.Popen(["target/release/servo", "--devtools=6080", f"{base_url}/test.html"])

        # FIXME: Don’t do this
        time.sleep(1)

        client = RDPClient()
        client.connect("127.0.0.1", 6080)
        test_fun(client, base_url)
    finally:
        if servoshell is not None:
            # Terminate servoshell and reap it so no zombie process is left.
            servoshell.terminate()
            try:
                servoshell.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # It ignored the polite request; force it down.
                servoshell.kill()
                servoshell.wait()

        # Stop the web server, wait for its thread, then release the socket
        # (shutdown() only stops the serve loop, it does not close the port).
        server.shutdown()
        thread.join()
        server.server_close()


def sources_test(client, base_url):
    """Check the source resources reported for the test page.

    Steps, as performed below:

    1. List the tabs through the root actor and take the first one.
    2. Obtain that tab's watcher and watch frame targets, waiting up to one
       second for the target whose ``browsingContextID`` matches the tab.
    3. Watch ``Resources.SOURCE`` on the watcher and wait up to one second
       for a resources-available event on the target whose source URLs match
       the expected list.

    Args:
        client: The devtools client to drive (``run_test`` passes a connected
            ``RDPClient``). It is disconnected before this function returns,
            whether the test passed or failed.
        base_url: Root URL of the web server serving the test page; used to
            build the expected source URLs.

    Raises:
        AssertionError: If the reported sources do not match.
        TimeoutError: If the target or the sources do not arrive in time.
    """
    try:
        root = RootActor(client)
        tabs = root.list_tabs()
        tab_dict = tabs[0]
        tab = TabActor(client, tab_dict["actor"])
        watcher_form = tab.get_watcher()
        watcher = WatcherActor(client, watcher_form["actor"])

        # The future keeps its own name: the listener below closes over it,
        # so it must not be rebound to the resolved value later on.
        target_future = Future()

        def on_target(data):
            if data["target"]["browsingContextID"] != tab_dict["browsingContextID"]:
                return
            # Ignore repeated announcements of the same target; resolving a
            # future twice would raise InvalidStateError.
            if not target_future.done():
                target_future.set_result(data["target"])

        client.add_event_listener(
            watcher.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target,
        )
        watcher.watch_targets(WatcherActor.Targets.FRAME)

        target = target_future.result(1)

        expected_urls = [
            f"{base_url}/classic.js",
            f"{base_url}/test.html",
            "https://servo.org/js/load-table.js",
        ]
        done = Future()

        def on_source_resource(data):
            # An exception raised here does not reach the caller of
            # sources_test (it used to surface only as a TimeoutError), so
            # failures are forwarded through the future instead.
            # NOTE(review): presumably listeners run on the client's own
            # thread; confirm against geckordp.
            try:
                checked_any = False
                for [resource_type, sources] in data["array"]:
                    assert resource_type == "source"
                    assert [source["url"] for source in sources] == expected_urls
                    checked_any = True
                # Only report success after every entry has been checked, and
                # only if there was at least one entry to check.
                if checked_any and not done.done():
                    done.set_result(True)
            except Exception as e:
                if done.done():
                    # Outcome already decided; nothing left to report it to.
                    raise
                done.set_exception(e)

        client.add_event_listener(
            target["actor"],
            Events.Watcher.RESOURCES_AVAILABLE_ARRAY,
            on_source_resource,
        )
        watcher.watch_resources([Resources.SOURCE])

        # Re-raises whatever the listener recorded, e.g. the AssertionError.
        assert done.result(1)
    finally:
        client.disconnect()