"""Small HTTP front-end that maps GET paths onto igmirror operations."""

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
import json
import os
import re
import sys

import igmirror

# Characters accepted in an account name on the per-account routes.
# NOTE(review): the original comment only says "i think IG usernames can only
# have this characters" without listing them; letters, digits, '.' and '_' is
# the assumed set -- confirm against the accounts actually mirrored.
_ACCOUNT_ID_RE = re.compile(r'[A-Za-z0-9._]+')


def _is_safe_account_id(acc_id):
    """Return True when *acc_id* uses only whitelisted characters.

    Names made only of dots ('.', '..') are rejected as well, so the value
    can never act as a relative path component.
    """
    return bool(_ACCOUNT_ID_RE.fullmatch(acc_id)) and bool(acc_id.strip('.'))


class MyServer(BaseHTTPRequestHandler):
    """Request handler translating GET paths into igmirror calls.

    Routes, expressed as path segments after surrounding slashes are removed:

    * ``<action>`` -- ``list`` or ``mirrors``; sent with an HTML content type.
    * ``<acc_id>/<action>`` -- ``add``, ``update``, ``login``, ``logout`` or
      ``nuke`` for one account; ``update``, ``login`` or ``logout`` for every
      account when ``acc_id`` is ``*``. Answered as JSON.
    * ``<acc_id>/cfg/<key>`` -- forwards the raw query string to
      ``igmirror.account_config``. Answered as JSON.

    Any other number of segments gets a 400 JSON error.
    """

    def do_GET(self):
        """Dispatch the request according to the number of path segments."""
        parsed = urlparse(self.path)
        qs = parsed.query
        path = parsed.path.strip().strip('/')
        parts = path.split('/')

        if len(parts) == 1:
            # Headers are sent before the action is inspected, so an unknown
            # single-segment action yields a 200 with an empty body.
            response(self, 200, 'html')
            action = parts[0].lower()

            if action == 'list':
                # One account name per line, de-duplicated and sorted.
                accounts = os.listdir('./db/accounts')
                for acc in sorted(set(accounts)):
                    echo(self, acc)

            elif action == 'mirrors':
                # The placeholders are filled in by igmirror.
                # NOTE(review): escaping of account names (XSS) is presumably
                # done inside pixelfed_htmlfill_mirrors -- confirm there.
                html = (
                    " {instance} - Mirrors\n"
                    "\n"
                    "{instance}\n"
                    "\n"
                    "{item_count} accounts\n"
                    "{mirrors}\n"
                )
                html = igmirror.pixelfed_htmlfill_mirrors(html)
                echo(self, html)
            return

        if len(parts) == 2:
            response(self, 200, 'json')
            acc_id = parts[0]
            action = parts[1].lower()

            if acc_id == '*':
                # Bulk operations: report success explicitly instead of
                # falling through to the "not configured" error below.
                if action == 'update':
                    igmirror.update_allaccounts_async()
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Update process started asynchronously for all accounts',
                    })
                if action == 'login':
                    igmirror.pixelfed_loginall_async()
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Login process started asynchronously for all accounts',
                    })
                if action == 'logout':
                    igmirror.pixelfed_logoutall_async()
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Logout process started asynchronously for all accounts',
                    })
            else:
                # make sure account name contains only safe characters
                if not _is_safe_account_id(acc_id):
                    return echo(self, {
                        'status': 'error',
                        'message': 'Account name contains invalid characters',
                    })

                if action == 'add':
                    igmirror.add_igaccount(acc_id)
                    return echo(self, {
                        'status': 'ok',
                        'message': 'New mirror account added to Pixelfed!',
                    })
                if action == 'update':
                    igmirror.update_igaccount_async(acc_id)
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Account update process started asyncronously',
                    })
                if action == 'login':
                    igmirror.pixelfed_login(acc_id, True)
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Account logged in to Pixelfed',
                    })
                if action == 'logout':
                    igmirror.pixelfed_logout(acc_id)
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Account logged out of Pixelfed',
                    })
                if action == 'nuke':
                    igmirror.delete_statuses(acc_id)
                    return echo(self, {
                        'status': 'ok',
                        'message': 'Account nuked successfully',
                    })

            return echo(self, {
                'status': 'error',
                'message': '2nd parameter action not configured: {}'.format(action),
            })

        if len(parts) == 3:
            response(self, 200, 'json')
            acc_id = parts[0]
            action = parts[1].lower()
            key = parts[2].lower()

            if action == 'cfg':
                # account_config returns a (truthy-on-success, message) pair.
                status, message = igmirror.account_config(acc_id, key, qs)
                status = 'ok' if status else 'error'
                return echo(self, {'status': status, 'message': message})

            # The original formatted an undefined name here (NameError).
            return echo(self, {
                'status': 'error',
                'message': '2nd parameter action not configured: {}'.format(action),
            })

        response(self, 400, 'json')
        return echo(self, {
            'status': 'error',
            'message': 'Parameters are not correct. \nCheck out the API documentation',
        })


def response(obj, code, typ):
    """Send the status line and headers for a reply.

    Args:
        obj: the request handler to answer through.
        code: HTTP status code to send.
        typ: 'html' or 'json' (case-insensitive, surrounding whitespace
            ignored); any other value sends no Content-Type header.
    """
    obj.send_response(code)
    typ = typ.lower().strip()
    if typ == 'html':
        obj.send_header('Content-Type', 'text/html')
    elif typ == 'json':
        obj.send_header('Content-Type', 'application/json')
    obj.end_headers()


def echo(obj, what, newline=True):
    """Write *what* to the handler's output stream as UTF-8.

    Args:
        obj: the request handler whose ``wfile`` receives the bytes.
        what: a string, or a dict/list that is serialized with ``json.dumps``.
        newline: when true, a trailing '\\n' is appended.
    """
    if isinstance(what, (dict, list)):
        what = json.dumps(what)
    if newline:
        what = what + '\n'
    obj.wfile.write(bytes(what, "utf-8"))


def main():
    """Run the server; the optional first argument is ``PORT`` or ``ADDR:PORT``."""
    addr = '0.0.0.0'
    port = 8080
    if len(sys.argv) > 1:
        arg = sys.argv[1].strip()
        if ':' in arg:
            ps = arg.split(':')
            addr = ps[0]
            port = int(ps[1])
        else:
            port = int(arg)

    webServer = HTTPServer((addr, port), MyServer)
    print("Server started http://%s:%s" % (addr, port))
    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket even if serve_forever fails.
        webServer.server_close()
    print("Server stopped.")


if __name__ == "__main__":
    main()