323 lines
9.6 KiB
Python
323 lines
9.6 KiB
Python
import aiohttp
|
|
import aiohttp.web
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
import re
|
|
import simplejson as json
|
|
import cgi
|
|
from urllib.parse import urlsplit
|
|
from Crypto.PublicKey import RSA
|
|
from .database import DATABASE
|
|
from .http_debug import http_debug
|
|
|
|
from cachetools import LFUCache
|
|
|
|
|
|
# generate actor keys if not present
|
|
if "actorKeys" not in DATABASE:
|
|
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
|
|
|
|
privkey = RSA.generate(4096)
|
|
pubkey = privkey.publickey()
|
|
|
|
DATABASE["actorKeys"] = {
|
|
"publicKey": pubkey.exportKey('PEM').decode('utf-8'),
|
|
"privateKey": privkey.exportKey('PEM').decode('utf-8')
|
|
}
|
|
|
|
|
|
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
|
|
PUBKEY = PRIVKEY.publickey()
|
|
|
|
sem = asyncio.Semaphore(500)
|
|
|
|
from . import app, CONFIG
|
|
from .remote_actor import fetch_actor
|
|
|
|
|
|
AP_CONFIG = CONFIG.get('ap', {'host': 'localhost','blocked_instances':[], 'whitelist_enabled': False, 'whitelist': []})
|
|
CACHE_SIZE = CONFIG.get('cache-size', 16384)
|
|
|
|
|
|
CACHE = LFUCache(CACHE_SIZE)
|
|
|
|
|
|
async def actor(request):
|
|
data = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
"endpoints": {
|
|
"sharedInbox": "https://{}/inbox".format(request.host)
|
|
},
|
|
"followers": "https://{}/followers".format(request.host),
|
|
"following": "https://{}/following".format(request.host),
|
|
"inbox": "https://{}/inbox".format(request.host),
|
|
"name": "ActivityRelay",
|
|
"type": "Application",
|
|
"id": "https://{}/actor".format(request.host),
|
|
"publicKey": {
|
|
"id": "https://{}/actor#main-key".format(request.host),
|
|
"owner": "https://{}/actor".format(request.host),
|
|
"publicKeyPem": DATABASE["actorKeys"]["publicKey"]
|
|
},
|
|
"summary": "ActivityRelay bot",
|
|
"preferredUsername": "relay",
|
|
"url": "https://{}/actor".format(request.host)
|
|
}
|
|
return aiohttp.web.json_response(data)
|
|
|
|
|
|
app.router.add_get('/actor', actor)
|
|
|
|
|
|
from .http_signatures import sign_headers
|
|
|
|
|
|
get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
|
|
|
|
|
|
async def push_message_to_actor(actor, message, our_key_id):
|
|
inbox = get_actor_inbox(actor)
|
|
url = urlsplit(inbox)
|
|
|
|
# XXX: Digest
|
|
data = json.dumps(message)
|
|
headers = {
|
|
'(request-target)': 'post {}'.format(url.path),
|
|
'Content-Length': str(len(data)),
|
|
'Content-Type': 'application/activity+json',
|
|
'User-Agent': 'ActivityRelay'
|
|
}
|
|
headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
|
|
headers.pop('(request-target)')
|
|
|
|
logging.debug('%r >> %r', inbox, message)
|
|
|
|
global sem
|
|
async with sem:
|
|
try:
|
|
async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
|
|
async with session.post(inbox, data=data, headers=headers) as resp:
|
|
if resp.status == 202:
|
|
return
|
|
resp_payload = await resp.text()
|
|
logging.debug('%r >> resp %r', inbox, resp_payload)
|
|
except Exception as e:
|
|
logging.info('Caught %r while pushing to %r.', e, inbox)
|
|
|
|
|
|
async def follow_remote_actor(actor_uri):
|
|
actor = await fetch_actor(actor_uri)
|
|
|
|
if not actor:
|
|
logging.info('failed to fetch actor at: %r', actor_uri)
|
|
return
|
|
|
|
if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
|
|
logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
|
|
return
|
|
|
|
logging.info('following: %r', actor_uri)
|
|
|
|
message = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
"type": "Follow",
|
|
"to": [actor['id']],
|
|
"object": actor['id'],
|
|
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
|
|
"actor": "https://{}/actor".format(AP_CONFIG['host'])
|
|
}
|
|
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
|
|
|
|
|
|
async def unfollow_remote_actor(actor_uri):
|
|
actor = await fetch_actor(actor_uri)
|
|
if not actor:
|
|
logging.info('failed to fetch actor at: %r', actor_uri)
|
|
return
|
|
|
|
logging.info('unfollowing: %r', actor_uri)
|
|
|
|
message = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
"type": "Undo",
|
|
"to": [actor['id']],
|
|
"object": {
|
|
"type": "Follow",
|
|
"object": actor_uri,
|
|
"actor": actor['id'],
|
|
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4())
|
|
},
|
|
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
|
|
"actor": "https://{}/actor".format(AP_CONFIG['host'])
|
|
}
|
|
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
|
|
|
|
|
|
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')
|
|
def strip_html(data):
|
|
no_tags = tag_re.sub('', data)
|
|
return cgi.escape(no_tags)
|
|
|
|
|
|
def distill_inboxes(actor, object_id):
|
|
global DATABASE
|
|
|
|
origin_hostname = urlsplit(object_id).hostname
|
|
|
|
inbox = get_actor_inbox(actor)
|
|
targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
|
|
targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
|
|
hostnames = [urlsplit(target).hostname for target in targets]
|
|
|
|
assert inbox not in targets
|
|
assert origin_hostname not in hostnames
|
|
|
|
return targets
|
|
|
|
|
|
def distill_object_id(activity):
|
|
logging.debug('>> determining object ID for %r', activity['object'])
|
|
obj = activity['object']
|
|
|
|
if isinstance(obj, str):
|
|
return obj
|
|
|
|
return obj['id']
|
|
|
|
|
|
async def handle_relay(actor, data, request):
|
|
global CACHE
|
|
|
|
object_id = distill_object_id(data)
|
|
|
|
if object_id in CACHE:
|
|
logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
|
|
return
|
|
|
|
activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
|
|
|
|
message = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
"type": "Announce",
|
|
"to": ["https://{}/followers".format(request.host)],
|
|
"actor": "https://{}/actor".format(request.host),
|
|
"object": object_id,
|
|
"id": activity_id
|
|
}
|
|
|
|
logging.debug('>> relay: %r', message)
|
|
|
|
inboxes = distill_inboxes(actor, object_id)
|
|
|
|
futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
|
|
asyncio.ensure_future(asyncio.gather(*futures))
|
|
|
|
CACHE[object_id] = activity_id
|
|
|
|
|
|
async def handle_forward(actor, data, request):
|
|
object_id = distill_object_id(data)
|
|
|
|
logging.debug('>> Relay %r', data)
|
|
|
|
inboxes = distill_inboxes(actor, object_id)
|
|
|
|
futures = [
|
|
push_message_to_actor(
|
|
{'inbox': inbox},
|
|
data,
|
|
'https://{}/actor#main-key'.format(request.host))
|
|
for inbox in inboxes]
|
|
asyncio.ensure_future(asyncio.gather(*futures))
|
|
|
|
|
|
async def handle_follow(actor, data, request):
|
|
global DATABASE
|
|
|
|
following = DATABASE.get('relay-list', [])
|
|
inbox = get_actor_inbox(actor)
|
|
|
|
if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
|
|
return
|
|
|
|
if inbox not in following:
|
|
following += [inbox]
|
|
DATABASE['relay-list'] = following
|
|
|
|
if data['object'].endswith('/actor'):
|
|
asyncio.ensure_future(follow_remote_actor(actor['id']))
|
|
|
|
message = {
|
|
"@context": "https://www.w3.org/ns/activitystreams",
|
|
"type": "Accept",
|
|
"to": [actor["id"]],
|
|
"actor": "https://{}/actor".format(request.host),
|
|
|
|
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
|
|
"object": {
|
|
"type": "Follow",
|
|
"id": data["id"],
|
|
"object": "https://{}/actor".format(request.host),
|
|
"actor": actor["id"]
|
|
},
|
|
|
|
"id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
|
|
}
|
|
|
|
asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
|
|
|
|
|
|
async def handle_undo(actor, data, request):
|
|
global DATABASE
|
|
|
|
child = data['object']
|
|
if child['type'] == 'Follow':
|
|
following = DATABASE.get('relay-list', [])
|
|
|
|
inbox = get_actor_inbox(actor)
|
|
|
|
if inbox in following:
|
|
following.remove(inbox)
|
|
DATABASE['relay-list'] = following
|
|
|
|
if child['object'].endswith('/actor'):
|
|
await unfollow_remote_actor(actor['id'])
|
|
|
|
|
|
processors = {
|
|
'Announce': handle_relay,
|
|
'Create': handle_relay,
|
|
'Delete': handle_forward,
|
|
'Follow': handle_follow,
|
|
'Undo': handle_undo,
|
|
'Update': handle_forward,
|
|
}
|
|
|
|
|
|
async def inbox(request):
|
|
data = await request.json()
|
|
instance = urlsplit(data['actor']).hostname
|
|
|
|
if 'actor' not in data or not request['validated']:
|
|
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
|
|
|
elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
|
|
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
|
|
|
elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
|
|
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
|
|
|
|
actor = await fetch_actor(data["actor"])
|
|
actor_uri = 'https://{}/actor'.format(request.host)
|
|
|
|
logging.debug(">> payload %r", data)
|
|
|
|
processor = processors.get(data['type'], None)
|
|
if processor:
|
|
await processor(actor, data, request)
|
|
|
|
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
|
|
|
|
app.router.add_post('/inbox', inbox)
|