pleroma-relay/relay/actor.py

348 lines
11 KiB
Python
Raw Normal View History

2018-08-10 21:14:22 +00:00
import aiohttp
2018-08-10 19:59:46 +00:00
import aiohttp.web
import asyncio
2018-08-10 19:59:46 +00:00
import logging
2018-08-11 01:36:24 +00:00
import uuid
import re
2018-08-11 01:36:24 +00:00
import simplejson as json
import cgi
import datetime
from urllib.parse import urlsplit
2018-08-10 19:59:46 +00:00
from Crypto.PublicKey import RSA
from cachetools import LFUCache
from . import app, CONFIG
2018-08-10 19:59:46 +00:00
from .database import DATABASE
2018-11-18 00:07:36 +00:00
from .http_debug import http_debug
from .remote_actor import fetch_actor
from .http_signatures import sign_headers, generate_body_digest
2018-11-18 14:15:34 +00:00
2018-08-10 19:59:46 +00:00
# generate actor keys if not present
if "actorKeys" not in DATABASE:
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
privkey = RSA.generate(4096)
pubkey = privkey.publickey()
DATABASE["actorKeys"] = {
2018-12-27 09:07:20 +00:00
"publicKey": pubkey.exportKey('PEM').decode('utf-8'),
"privateKey": privkey.exportKey('PEM').decode('utf-8')
2018-08-10 19:59:46 +00:00
}
2018-08-11 01:36:24 +00:00
PRIVKEY = RSA.importKey(DATABASE["actorKeys"]["privateKey"])
PUBKEY = PRIVKEY.publickey()
AP_CONFIG = CONFIG['ap']
2018-11-18 14:15:34 +00:00
CACHE_SIZE = CONFIG.get('cache-size', 16384)
CACHE = LFUCache(CACHE_SIZE)
2018-08-17 23:09:24 +00:00
sem = asyncio.Semaphore(500)
2018-08-10 19:59:46 +00:00
async def actor(request):
data = {
"@context": "https://www.w3.org/ns/activitystreams",
"endpoints": {
"sharedInbox": "https://{}/inbox".format(request.host)
},
"followers": "https://{}/followers".format(request.host),
2018-08-18 00:53:46 +00:00
"following": "https://{}/following".format(request.host),
2018-08-10 19:59:46 +00:00
"inbox": "https://{}/inbox".format(request.host),
2018-10-30 01:42:17 +00:00
"name": "ActivityRelay",
2018-08-10 19:59:46 +00:00
"type": "Application",
2018-08-10 20:14:51 +00:00
"id": "https://{}/actor".format(request.host),
2018-08-10 19:59:46 +00:00
"publicKey": {
"id": "https://{}/actor#main-key".format(request.host),
"owner": "https://{}/actor".format(request.host),
"publicKeyPem": DATABASE["actorKeys"]["publicKey"]
},
2018-10-30 01:42:17 +00:00
"summary": "ActivityRelay bot",
"preferredUsername": "relay",
"url": "https://{}/actor".format(request.host)
2018-08-10 19:59:46 +00:00
}
return aiohttp.web.json_response(data, content_type='application/activity+json')
2018-08-10 19:59:46 +00:00
app.router.add_get('/actor', actor)
get_actor_inbox = lambda actor: actor.get('endpoints', {}).get('sharedInbox', actor['inbox'])
2018-08-11 01:36:24 +00:00
async def push_message_to_actor(actor, message, our_key_id):
inbox = get_actor_inbox(actor)
url = urlsplit(inbox)
2018-08-11 01:36:24 +00:00
# XXX: Digest
data = json.dumps(message)
headers = {
'(request-target)': 'post {}'.format(url.path),
'Content-Length': str(len(data)),
'Content-Type': 'application/activity+json',
'User-Agent': 'ActivityRelay',
'Host': url.netloc,
'Digest': 'SHA-256={}'.format(generate_body_digest(data)),
'Date': datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
2018-08-11 01:36:24 +00:00
}
headers['signature'] = sign_headers(headers, PRIVKEY, our_key_id)
headers.pop('(request-target)')
headers.pop('Host')
2018-08-11 01:36:24 +00:00
logging.debug('%r >> %r', inbox, message)
2018-08-18 00:53:46 +00:00
global sem
async with sem:
try:
async with aiohttp.ClientSession(trace_configs=[http_debug()]) as session:
async with session.post(inbox, data=data, headers=headers) as resp:
if resp.status == 202:
return
resp_payload = await resp.text()
logging.debug('%r >> resp %r', inbox, resp_payload)
except Exception as e:
logging.info('Caught %r while pushing to %r.', e, inbox)
2018-08-11 01:36:24 +00:00
2020-12-03 04:13:33 +00:00
async def fetch_nodeinfo(domain):
2020-12-04 06:43:49 +00:00
headers = {'Accept': 'application/activity+json'}
nodeinfo_url = None
wk_nodeinfo = await fetch_actor(f'https://{domain}/.well-known/nodeinfo', headers=headers)
if not wk_nodeinfo:
return
for link in wk_nodeinfo.get('links', ''):
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
if not nodeinfo_url:
return
nodeinfo_data = await fetch_actor(nodeinfo_url)
2020-12-03 04:13:33 +00:00
software = nodeinfo_data.get('software')
2020-12-04 06:43:49 +00:00
2020-12-03 04:13:33 +00:00
return software.get('name') if software else None
2018-08-17 23:09:24 +00:00
async def follow_remote_actor(actor_uri):
actor = await fetch_actor(actor_uri)
2019-05-21 16:29:55 +00:00
if not actor:
logging.info('failed to fetch actor at: %r', actor_uri)
return
2019-05-21 16:29:55 +00:00
if AP_CONFIG['whitelist_enabled'] is True and urlsplit(actor_uri).hostname not in AP_CONFIG['whitelist']:
logging.info('refusing to follow non-whitelisted actor: %r', actor_uri)
return
logging.info('following: %r', actor_uri)
2018-08-17 23:09:24 +00:00
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Follow",
"to": [actor['id']],
2018-08-18 00:53:46 +00:00
"object": actor['id'],
2018-08-17 23:09:24 +00:00
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
"actor": "https://{}/actor".format(AP_CONFIG['host'])
}
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
2018-08-18 01:03:46 +00:00
async def unfollow_remote_actor(actor_uri):
actor = await fetch_actor(actor_uri)
if not actor:
logging.info('failed to fetch actor at: %r', actor_uri)
return
logging.info('unfollowing: %r', actor_uri)
2018-08-18 01:03:46 +00:00
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Undo",
"to": [actor['id']],
"object": {
"type": "Follow",
"object": actor_uri,
"actor": actor['id'],
2018-08-18 01:04:18 +00:00
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4())
},
2018-08-18 01:03:46 +00:00
"id": "https://{}/activities/{}".format(AP_CONFIG['host'], uuid.uuid4()),
"actor": "https://{}/actor".format(AP_CONFIG['host'])
}
await push_message_to_actor(actor, message, "https://{}/actor#main-key".format(AP_CONFIG['host']))
2018-08-11 01:36:24 +00:00
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')
def strip_html(data):
no_tags = tag_re.sub('', data)
return cgi.escape(no_tags)
def distill_inboxes(actor, object_id):
global DATABASE
origin_hostname = urlsplit(object_id).hostname
inbox = get_actor_inbox(actor)
targets = [target for target in DATABASE.get('relay-list', []) if target != inbox]
targets = [target for target in targets if urlsplit(target).hostname != origin_hostname]
hostnames = [urlsplit(target).hostname for target in targets]
assert inbox not in targets
assert origin_hostname not in hostnames
return targets
def distill_object_id(activity):
logging.debug('>> determining object ID for %r', activity['object'])
obj = activity['object']
if isinstance(obj, str):
return obj
return obj['id']
async def handle_relay(actor, data, request):
2018-11-18 14:20:17 +00:00
global CACHE
object_id = distill_object_id(data)
2018-11-18 14:20:17 +00:00
if object_id in CACHE:
logging.debug('>> already relayed %r as %r', object_id, CACHE[object_id])
return
activity_id = "https://{}/activities/{}".format(request.host, uuid.uuid4())
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Announce",
"to": ["https://{}/followers".format(request.host)],
"actor": "https://{}/actor".format(request.host),
"object": object_id,
2018-11-18 14:20:17 +00:00
"id": activity_id
}
logging.debug('>> relay: %r', message)
inboxes = distill_inboxes(actor, object_id)
futures = [push_message_to_actor({'inbox': inbox}, message, 'https://{}/actor#main-key'.format(request.host)) for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
2018-08-17 23:01:44 +00:00
2018-11-18 14:20:17 +00:00
CACHE[object_id] = activity_id
2018-08-11 01:36:24 +00:00
async def handle_forward(actor, data, request):
object_id = distill_object_id(data)
logging.debug('>> Relay %r', data)
inboxes = distill_inboxes(actor, object_id)
futures = [
push_message_to_actor(
{'inbox': inbox},
data,
'https://{}/actor#main-key'.format(request.host))
for inbox in inboxes]
asyncio.ensure_future(asyncio.gather(*futures))
2018-08-11 01:36:24 +00:00
async def handle_follow(actor, data, request):
global DATABASE
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
2018-10-31 19:29:30 +00:00
2020-12-03 04:13:33 +00:00
if urlsplit(inbox).hostname in AP_CONFIG['blocked_instances']:
2018-10-31 19:29:30 +00:00
return
if inbox not in following:
following += [inbox]
DATABASE['relay-list'] = following
asyncio.ensure_future(follow_remote_actor(actor['id']))
2018-08-11 01:36:24 +00:00
message = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Accept",
"to": [actor["id"]],
2018-08-17 23:09:24 +00:00
"actor": "https://{}/actor".format(request.host),
2018-08-11 01:36:24 +00:00
# this is wrong per litepub, but mastodon < 2.4 is not compliant with that profile.
"object": {
"type": "Follow",
"id": data["id"],
"object": "https://{}/actor".format(request.host),
"actor": actor["id"]
2018-08-11 01:36:24 +00:00
},
"id": "https://{}/activities/{}".format(request.host, uuid.uuid4()),
}
asyncio.ensure_future(push_message_to_actor(actor, message, 'https://{}/actor#main-key'.format(request.host)))
async def handle_undo(actor, data, request):
global DATABASE
child = data['object']
if child['type'] == 'Follow':
following = DATABASE.get('relay-list', [])
inbox = get_actor_inbox(actor)
if inbox in following:
following.remove(inbox)
DATABASE['relay-list'] = following
await unfollow_remote_actor(actor['id'])
2018-08-11 01:36:24 +00:00
processors = {
'Announce': handle_relay,
'Create': handle_relay,
'Delete': handle_forward,
'Follow': handle_follow,
'Undo': handle_undo,
'Update': handle_forward,
2018-08-11 01:36:24 +00:00
}
async def inbox(request):
2018-08-11 02:24:23 +00:00
data = await request.json()
instance = urlsplit(data['actor']).hostname
2018-08-11 01:53:01 +00:00
2020-12-04 07:34:40 +00:00
if AP_CONFIG['blocked_software']:
2020-12-03 04:13:33 +00:00
software = await fetch_nodeinfo(instance)
2020-12-04 07:34:40 +00:00
if software and software.lower() in AP_CONFIG['blocked_software']:
2020-12-03 04:13:33 +00:00
raise aiohttp.web.HTTPUnauthorized(body='relays have been blocked', content_type='text/plain')
2018-08-11 01:53:01 +00:00
if 'actor' not in data or not request['validated']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
2018-08-11 01:36:24 +00:00
2019-05-21 16:29:55 +00:00
elif data['type'] != 'Follow' and 'https://{}/inbox'.format(instance) not in DATABASE['relay-list']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
elif AP_CONFIG['whitelist_enabled'] is True and instance not in AP_CONFIG['whitelist']:
raise aiohttp.web.HTTPUnauthorized(body='access denied', content_type='text/plain')
2018-08-11 01:36:24 +00:00
actor = await fetch_actor(data["actor"])
actor_uri = 'https://{}/actor'.format(request.host)
logging.debug(">> payload %r", data)
2018-08-11 01:36:24 +00:00
processor = processors.get(data['type'], None)
if processor:
await processor(actor, data, request)
return aiohttp.web.Response(body=b'{}', content_type='application/activity+json')
app.router.add_post('/inbox', inbox)