summaryrefslogtreecommitdiff
path: root/test/server.py
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff.email>2023-06-14 20:36:37 +0200
committerJörg Frings-Fürst <debian@jff.email>2023-06-14 20:36:37 +0200
commitbb80d3feebdc9acc52e3f4ad24084d8425f043a2 (patch)
tree2084a84c39f159c6aea254775dc0880d52579d45 /test/server.py
parentb26ff0798252a1a8072dd2c7a67f6205de9fde11 (diff)
parent31804433d72460cbe0a39f9f8ea5e76058d84cda (diff)
Merge branch 'feature/upstream' into develop
Diffstat (limited to 'test/server.py')
-rwxr-xr-xtest/server.py87
1 files changed, 77 insertions, 10 deletions
diff --git a/test/server.py b/test/server.py
index 10f3c70..72c7acf 100755
--- a/test/server.py
+++ b/test/server.py
@@ -25,6 +25,49 @@ import http.server
import cgi
import urllib.parse
import time
+import argparse
+import ssl
+
+from OpenSSL import crypto, SSL
+
+def cert_gen(
+ emailAddress="emailAddress",
+ commonName="commonName",
+ countryName="NT",
+ localityName="localityName",
+ stateOrProvinceName="stateOrProvinceName",
+ organizationName="organizationName",
+ organizationUnitName="organizationUnitName",
+ serialNumber=0,
+ validityStartInSeconds=0,
+ validityEndInSeconds=10*365*24*60*60,
+ KEY_FILE = "key.pem",
+ CERT_FILE="cert.pem"):
+ #can look at generated file using openssl:
+ #openssl x509 -inform pem -in selfsigned.crt -noout -text
+ # create a key pair
+ k = crypto.PKey()
+ k.generate_key(crypto.TYPE_RSA, 4096)
+ # create a self-signed cert
+ cert = crypto.X509()
+ cert.get_subject().C = countryName
+ cert.get_subject().ST = stateOrProvinceName
+ cert.get_subject().L = localityName
+ cert.get_subject().O = organizationName
+ cert.get_subject().OU = organizationUnitName
+ cert.get_subject().CN = commonName
+ cert.get_subject().emailAddress = emailAddress
+ cert.set_serial_number(serialNumber)
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(validityEndInSeconds)
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(k)
+ cert.sign(k, 'sha512')
+ with open(CERT_FILE, "wt") as f:
+ f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
+ with open(KEY_FILE, "wt") as f:
+ f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode("utf-8"))
+
# This is a simple implementation of the Piwigo protocol to run locally
# for testing publishing in offline-situations
@@ -36,6 +79,7 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler):
self.log_message("Content-Type = " + ctype)
if ctype == 'multipart/form-data':
pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
+ pdict['CONTENT-LENGTH'] = self.headers['Content-Length']
postvars = cgi.parse_multipart(self.rfile, pdict)
elif ctype == 'application/x-www-form-urlencoded':
length = int(self.headers['content-length'])
@@ -49,44 +93,55 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler):
except:
method = postvars['method'][0]
+ # Make sure we have a utf8 string
+ try:
+ method = method.decode()
+ except:
+ pass
+
self.log_message("Received method call for " + str(method))
time.sleep(1)
if self.path == '/ws.php':
try:
-
- if method == b'pwg.session.login':
+ if method == 'pwg.session.login':
self.send_response(200)
self.send_header('Content-type', 'text/xml')
self.send_header('Set-Cookie', 'pwg_id="12345"')
self.end_headers()
self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"></piwigo>')
return
- elif method == b'pwg.session.getStatus':
+ elif method == 'pwg.session.getStatus':
self.send_response(200)
self.send_header('Content-type', 'text/xml')
self.send_header('Set-Cookie', 'pwg_id="12345"')
self.end_headers()
self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><username>test</username></piwigo>')
return
- elif method == b'pwg.categories.getList':
+ elif method == 'pwg.categories.getList':
self.send_response(200)
self.send_header('Content-type', 'text/xml')
self.send_header('Set-Cookie', 'pwg_id="12345"')
self.end_headers()
self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><categories></categories></piwigo>')
return
- elif method == b'pwg.categories.add':
+ elif method == 'pwg.categories.add':
self.send_response(200)
self.send_header('Set-Cookie', 'pwg_id="12345"')
self.end_headers()
self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><id>765</id></piwigo>')
return
- elif method == b'pwg.images.addSimple':
+ elif method == 'pwg.images.addSimple':
self.send_response(200)
self.send_header('Set-Cookie', 'pwg_id="12345"')
self.end_headers()
- self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"></piwigo>')
+ self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><image_id>2387</image_id></piwigo>')
+ return
+ elif method == 'pwg.images.rate':
+ self.send_response(200)
+ self.send_header('Set-Cookie', 'pwg_id="12345"')
+ self.end_headers()
+ self.wfile.write(b'<?xml version="1.0"?><piwigo stat="ok"><image_id>2387</image_id></piwigo>')
return
except:
self.log_error('Unknown method {0}'.format(postvars[b'method']))
@@ -94,9 +149,21 @@ class SimpleRequestHandler(http.server.BaseHTTPRequestHandler):
self.send_response(500)
-def run(server_class = http.server.HTTPServer, handler_class = SimpleRequestHandler):
- server_address = ('127.0.0.1', 8080)
+def run(server_class = http.server.HTTPServer, handler_class = SimpleRequestHandler, port=8080, do_ssl=False):
+ server_address = ('127.0.0.1', port)
httpd = server_class(server_address, handler_class)
+ if do_ssl:
+ cert_gen()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ context.load_cert_chain("cert.pem", "key.pem")
+ httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
+
httpd.serve_forever()
-run()
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description = "Piwigo test server")
+ parser.add_argument('--port', type=int, default=8080)
+ parser.add_argument('--ssl', action='store_true')
+ args = parser.parse_args()
+
+ run(port=args.port, do_ssl = args.ssl)