diff options
author | Jörg Frings-Fürst <debian@jff.email> | 2023-06-14 20:36:37 +0200 |
---|---|---|
committer | Jörg Frings-Fürst <debian@jff.email> | 2023-06-14 20:36:37 +0200 |
commit | bb80d3feebdc9acc52e3f4ad24084d8425f043a2 (patch) | |
tree | 2084a84c39f159c6aea254775dc0880d52579d45 /test/server.py | |
parent | b26ff0798252a1a8072dd2c7a67f6205de9fde11 (diff) | |
parent | 31804433d72460cbe0a39f9f8ea5e76058d84cda (diff) |
Merge branch 'feature/upstream' into develop
Diffstat (limited to 'test/server.py')
-rwxr-xr-x | test/server.py | 87 |
1 files changed, 77 insertions, 10 deletions
diff --git a/test/server.py b/test/server.py index 10f3c70..72c7acf 100755 --- a/test/server.py +++ b/test/server.py @@ -25,6 +25,49 @@ import http.server import cgi import urllib.parse import time +import argparse +import ssl + +from OpenSSL import crypto, SSL + +def cert_gen( + emailAddress="emailAddress", + commonName="commonName", + countryName="NT", + localityName="localityName", + stateOrProvinceName="stateOrProvinceName", + organizationName="organizationName", + organizationUnitName="organizationUnitName", + serialNumber=0, + validityStartInSeconds=0, + validityEndInSeconds=10*365*24*60*60, + KEY_FILE = "key.pem", + CERT_FILE="cert.pem"): + #can look at generated file using openssl: + #openssl x509 -inform pem -in selfsigned.crt -noout -text + # create a key pair + k = crypto.PKey() + k.generate_key(crypto.TYPE_RSA, 4096) + # create a self-signed cert + cert = crypto.X509() + cert.get_subject().C = countryName + cert.get_subject().ST = stateOrProvinceName + cert.get_subject().L = localityName + cert.get_subject().O = organizationName + cert.get_subject().OU = organizationUnitName + cert.get_subject().CN = commonName + cert.get_subject().emailAddress = emailAddress + cert.set_serial_number(serialNumber) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(validityEndInSeconds) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(k) + cert.sign(k, 'sha512') + with open(CERT_FILE, "wt") as f: + f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8")) + with open(KEY_FILE, "wt") as f: + f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode("utf-8")) + # This is a simple implementation of the Piwigo protocol to run locally # for testing publishing in offline-situations @@ -36,6 +79,7 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler): self.log_message("Content-Type = " + ctype) if ctype == 'multipart/form-data': pdict['boundary'] = bytes(pdict['boundary'], 'utf-8') + pdict['CONTENT-LENGTH'] = self.headers['Content-Length'] postvars = cgi.parse_multipart(self.rfile, pdict) elif ctype == 'application/x-www-form-urlencoded': length = int(self.headers['content-length']) @@ -49,44 +93,55 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler): except: method = postvars['method'][0] + # Make sure we have a utf8 string + try: + method = method.decode() + except: + pass + self.log_message("Received method call for " + str(method)) time.sleep(1) if self.path == '/ws.php': try: - - if method == b'pwg.session.login': + if method == 'pwg.session.login': self.send_response(200) self.send_header('Content-type', 'text/xml') self.send_header('Set-Cookie', 'pwg_id="12345"') self.end_headers() self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"></piwigo>') return - elif method == b'pwg.session.getStatus': + elif method == 'pwg.session.getStatus': self.send_response(200) self.send_header('Content-type', 'text/xml') self.send_header('Set-Cookie', 'pwg_id="12345"') self.end_headers() self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><username>test</username></piwigo>') return - elif method == b'pwg.categories.getList': + elif method == 'pwg.categories.getList': self.send_response(200) self.send_header('Content-type', 'text/xml') self.send_header('Set-Cookie', 'pwg_id="12345"') self.end_headers() self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><categories></categories></piwigo>') return - elif method == b'pwg.categories.add': + elif method == 'pwg.categories.add': self.send_response(200) self.send_header('Set-Cookie', 'pwg_id="12345"') self.end_headers() self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><id>765</id></piwigo>') return - elif method == b'pwg.images.addSimple': + elif method == 'pwg.images.addSimple': self.send_response(200) self.send_header('Set-Cookie', 'pwg_id="12345"') self.end_headers() - self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"></piwigo>') + self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><image_id>2387</image_id></piwigo>') + return + elif method == 'pwg.images.rate': + self.send_response(200) + self.send_header('Set-Cookie', 'pwg_id="12345"') + self.end_headers() + self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><image_id>2387</image_id></piwigo>') return except: self.log_error('Unknown method {0}'.format(postvars[b'method'])) @@ -94,9 +149,21 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler): self.send_response(500) -def run(server_class = http.server.HTTPServer, handler_class = SimpleRequestHandler): - server_address = ('127.0.0.1', 8080) +def run(server_class = http.server.HTTPServer, handler_class = SimpleRequestHandler, port=8080, do_ssl=False): + server_address = ('127.0.0.1', port) httpd = server_class(server_address, handler_class) + if do_ssl: + cert_gen() + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + context.load_cert_chain("cert.pem", "key.pem") + httpd.socket = context.wrap_socket(httpd.socket, server_side=True) + httpd.serve_forever() -run() +if __name__ == "__main__": + parser = argparse.ArgumentParser(description = "Piwigo test server") + parser.add_argument('--port', type=int, default=8080) + parser.add_argument('--ssl', action='store_true') + args = parser.parse_args() + + run(port=args.port, do_ssl = args.ssl) |