ig-pixelfed-mirror/server.py

219 lines
6.2 KiB
Python

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
import igmirror
import json
import sys
import os
import re
class MyServer(BaseHTTPRequestHandler):
def do_GET(self):
_ = urlparse(self.path)
qs = _.query
path = _.path.strip().strip('/')
parts = path.split('/')
if len(parts) == 1:
response(self, 200, 'html')
action = parts[0].lower()
if False:
pass
# list accounts (plain text)
elif action == 'list':
accounts = os.listdir('./db/accounts')
for acc in sorted(set(accounts)):
echo(self, acc)
# lists accounts on a pretty HTML + CSS
# making sure there is no XSS possible on account names
elif action == 'mirrors':
html = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{instance} - Mirrors</title>
</head>
<body>
<style type="text/css">
body {
font-family: Arial;
}
div#content {
max-width: 40em;
margin: auto;
}
div#content > div:nth-child(1) {
display: flex;
margin: 2em 1em;
border-bottom: 3px solid #dbdbdb;
padding-bottom: 2em;
}
div#content > div:nth-child(1) > h3 {
margin: 0;
width: 100%;
}
div#content > div:nth-child(1) > span {
min-width: 6em;
}
div.item {
padding: 1em;
box-shadow: 0px 2px .2em #acacac;
margin-bottom: 1em;
}
div.item > h3.name {
font-weight: initial;
}
div.item > div.links {
display: flex;
height: 2.5em;
margin-top: 2em;
}
div.item > div.links > div {
display: flex;
width: 100%;
}
div.item > div.links > div > a {
margin: auto;
text-decoration: none;
}
div.item > div.links > div:nth-child(1) {
background: #ffdade;
}
div.item > div.links > div:nth-child(1) > a {
font-weight: bold;
color: #ce0680;
}
div.item > div.links > div:nth-child(2) {
background: #f2f2f2;
}
div.item > div.links > div:nth-child(2) > a {
color: #367280;
font-style: italic;
}
</style>
<div id="content">
<div>
<h3>{instance}</h3>
<span>{item_count} accounts</span>
</div>
{mirrors}
</div>
</body>
</html>
"""
html = igmirror.pixelfed_htmlfill_mirrors(html)
echo(self, html)
return
if len(parts) == 2:
response(self, 200, 'json')
acc_id = parts[0]
action = parts[1].lower()
if acc_id == '*':
if False:
pass
elif action == 'update':
igmirror.update_allaccounts_async()
return echo(self, {'status': 'ok', 'message': 'All accounts update asyncronous started!'})
elif action == 'login':
igmirror.pixelfed_loginall_async()
return echo(self, {'status': 'ok', 'message': 'All accounts login asyncronous started!'})
elif action == 'logout':
igmirror.pixelfed_logoutall_async()
return echo(self, {'status': 'ok', 'message': 'All accounts logout asyncronous started!'})
else:
# make sure account name contains only safe characters
# i think IG usernames can only have this characters:
if False:
pass
elif action == 'add':
igmirror.add_igaccount(acc_id)
return echo(self, {'status': 'ok', 'message': 'New mirror account added to Pixelfed!'})
elif action == 'update':
igmirror.update_igaccount_async(acc_id)
return echo(self, {'status': 'ok', 'message': 'Account update process started asyncronously'})
elif action == 'login':
igmirror.pixelfed_login(acc_id, True)
return echo(self, {'status': 'ok', 'message': 'Account logged in to Pixelfed'})
elif action == 'logout':
igmirror.pixelfed_logout(acc_id)
return echo(self, {'status': 'ok', 'message': 'Account logged out of Pixelfed'})
elif action == 'nuke':
igmirror.delete_statuses(acc_id)
return echo(self, {'status': 'ok', 'message': 'Account nuked successfully'})
return echo(self, {'status': 'error', 'message': '2nd parameter action not configured: {}'.format(action)})
if len(parts) == 3:
response(self, 200, 'json')
acc_id = parts[0]
action = parts[1].lower()
key = parts[2].lower()
if False:
pass
elif action == 'cfg':
status, message = igmirror.account_config(acc_id, key, qs)
status = 'ok' if status else 'error'
return echo(self, {'status': status, 'message': message})
return echo(self, {'status': 'error', 'message': '2nd parameter action not configured: {}'.format(arg2)})
response(self, 400, 'json')
return echo(self, {'status': 'error', 'message': 'Parameters are not correct. Check out the API documentation'})
def response(obj, code, typ):
obj.send_response(code)
typ = typ.lower().strip()
if typ == 'html':
obj.send_header('Content-Type', 'text/html')
elif typ == 'json':
obj.send_header('Content-Type', 'application/json')
obj.end_headers()
def echo(obj, what, newline=True):
if type(what) is dict or type(what) is list:
what = json.dumps(what)
if newline:
what = what + '\n'
obj.wfile.write(bytes(what, "utf-8"))
if __name__ == "__main__":
addr = '0.0.0.0'
port = 8080
if len(sys.argv) > 1:
arg = sys.argv[1].strip()
if ':' in arg:
ps = arg.split(':')
addr = ps[0]
port = int(ps[1])
else:
port = int(arg)
webServer = HTTPServer((addr, port), MyServer)
print("Server started http://%s:%s" % (addr, port))
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")