Source code

Revision control

Copy as Markdown

Other Tools

#!/usr/bin/python
import json
import urllib
from mod_pywebsocket import msgutil
from wptserve import stash
address, authkey = stash.load_env_config()
stash = stash.Stash("/stash_responder", address=address, authkey=authkey)
def web_socket_do_extra_handshake(request):
return
def web_socket_transfer_data(request):
while True:
line = request.ws_stream.receive_message()
if line == "echo":
query = request.unparsed_uri.split('?')[1]
GET = dict(urllib.parse.parse_qsl(query))
# TODO(kristijanburnik): This code should be reused from
# /mixed-content/generic/expect.py or implemented more generally
# for other tests.
path = GET.get("path", request.unparsed_uri.split('?')[0])
key = GET["key"]
action = GET["action"]
if action == "put":
value = GET["value"]
stash.take(key=key, path=path)
stash.put(key=key, value=value, path=path)
response_data = json.dumps({"status": "success", "result": key})
elif action == "purge":
value = stash.take(key=key, path=path)
response_data = json.dumps({"status": "success", "result": value})
elif action == "take":
value = stash.take(key=key, path=path)
if value is None:
status = "allowed"
else:
status = "blocked"
response_data = json.dumps({"status": status, "result": value})
msgutil.send_message(request, response_data)
return