ig-pixelfed-mirror/igmirror.py

349 lines
12 KiB
Python

#!/usr/bin/python3
import hashlib
import json
import os
import random
import re
import secrets
import string
import subprocess
import threading
import time

import requests
# Parsed contents of ./config.json; starts empty and is filled in by config()
# the first time it is called.
CONFIG = {}
def add_igaccount(acc_id):
    """Create the local/Pixelfed mirror account for Instagram user ``acc_id``.

    Fetches the Instagram profile, creates the Pixelfed user through
    ``./scripts/user_create``, stores the generated credentials in the local
    database, copies the avatar, bio and website, and finally starts
    mirroring the posts on a background thread.

    Returns:
        0 when the account was created or already exists locally,
        1 when ``./scripts/user_create`` is missing or cannot be executed,
        2 when no usable profile data could be retrieved,
        3 when the profile is private.
    """
    accfile = './db/accounts/{}'.format(acc_id)

    # user_create script must exist before running the API server
    if not os.path.exists('./scripts/user_create'):
        print('E| You may need to initialize the server environment first')
        return 1

    if os.path.exists(accfile):
        print('W| User "{}" already exists in local database'.format(acc_id))
        return 0

    # get all profile data from instagram acc
    data = getig_user_data(acc_id)

    # an empty or unexpected response tells us the user probably doesn't
    # exist (or there was a network error)
    user = None
    if isinstance(data, dict):
        user = (data.get('graphql') or {}).get('user')
    if not user:
        print('E| User "{}" does not exist on Instagram'.format(acc_id))
        return 2

    # we don't and can't mirror private profiles
    # (well, we can if we follow them and follow back, but we just don't need that)
    if user['is_private']:
        print('E| User "{}" is a private account. We just won\'t mirror that!'.format(acc_id))
        return 3

    # get account display name to create it
    name = re.sub(r'[^a-zA-Z0-9_\s]', '', user['full_name'])
    account = {
        'name': name,
        'username': acc_id,
        'password': random_string()
    }

    # Pass the arguments as a list (no shell involved): acc_id is not
    # sanitised, so interpolating it into a shell command line is unsafe.
    try:
        subprocess.run(
            ['./scripts/user_create',
             account['name'], account['username'], account['password']],
            check=False,
        )
    except OSError as err:
        print('E| Could not run ./scripts/user_create: {}'.format(err))
        return 1

    # save the account login information for updates and mirroring
    db_set('accounts', acc_id, account)
    db_set('posts', acc_id, [])

    # set Pixelfed account data for the username
    pixelfed_setpic(acc_id, user['profile_pic_url'])
    pixelfed_setinfo(acc_id, user['biography'], user['external_url'])
    update_igaccount_async(acc_id)
    return 0
def update_igaccount_async(acc_id):
    """Start update_igaccount(acc_id) on a new thread and return immediately."""
    worker = threading.Thread(target=update_igaccount, args=(acc_id,))
    worker.start()
def update_igaccount(acc_id):
    """Mirror the latest Instagram posts of an already-created account.

    This may take long (posts are downloaded and re-uploaded), which is why
    update_igaccount_async() exists to run it on a thread.

    Returns:
        1 when the account has no entry in ``./db/accounts``,
        2 when Instagram returned no usable profile data,
        None once the posts were handed to pixelfed_dlposts().
    """
    # if account does not exist, we stop the mirroring process
    accfile = './db/accounts/{}'.format(acc_id)
    if not os.path.exists(accfile):
        print('E| User "'+acc_id+'" has not been created yet, maybe you wanted to call /<username>/add ?')
        return 1

    data = getig_user_data(acc_id)

    # An empty or unexpected response must not raise KeyError here:
    # update_allaccounts() would abort the remaining accounts.
    user = None
    if isinstance(data, dict):
        user = (data.get('graphql') or {}).get('user')
    if not user:
        print('E| Could not retrieve Instagram data for "{}"'.format(acc_id))
        return 2

    pixelfed_dlposts(acc_id, user)
def update_allaccounts_async():
    """Start update_allaccounts() on a new thread and return immediately."""
    worker = threading.Thread(target=update_allaccounts)
    worker.start()
def update_allaccounts():
    """Mirror every account found in ./db/accounts, one after another.

    After each account the function sleeps 20 seconds before moving on.
    """
    pause_secs = 20
    known_accounts = os.listdir('./db/accounts')
    for acc_id in known_accounts:
        print('I| mirroring account "{}"...'.format(acc_id))
        update_igaccount(acc_id)
        print('I| timeout 20 seconds')
        time.sleep(pause_secs)
        # blank line to separate the log output of each account
        print()
def pixelfed_islogged(acc_id, accdata=None):
    """Tell whether the stored account data holds a 'cookie' entry.

    accdata may be passed in to avoid re-reading it; when it is None the
    data is loaded from the 'accounts' table for acc_id.
    """
    account = db_get('accounts', acc_id) if accdata is None else accdata
    return 'cookie' in account
def pixelfed_login(acc_id, force=False):
    """Log the account into the Pixelfed instance and store its cookies.

    Nothing is done when the account data already has cookies, unless
    force is true. The cookies returned by the login request are saved
    under the 'cookie' key of the account data for later API calls.
    """
    accdata = db_get('accounts', acc_id)

    # check account is already logged in if not "force"
    if not force:
        if pixelfed_islogged(acc_id, accdata):
            return

    # obtain one time tokens for the pixelfed instance
    _cookies, _token = pixelfed_token_url()

    # do the login post and retrieve the raw cookies
    login_url = 'https://'+config()['instance']+'/login'
    form = {
        '_token': _token,
        'email': 'pixelfed.'+acc_id+'@localhost',
        'password': accdata['password'],
        'remember': 'on'
    }
    response = requests.post(login_url, data=form, cookies=_cookies)

    # add the raw cookies to the account data for later calls
    accdata['cookie'] = dict(response.cookies)
    db_set('accounts', acc_id, accdata)
def pixelfed_token_url(url='', _cookies=None):
    """GET a page of the Pixelfed instance and extract its form ``_token``.

    Returns a tuple ``(response cookies, token string)``. Raises
    AttributeError when the page contains no ``_token`` field.
    """
    page = requests.get('https://'+config()['instance']+url, cookies=_cookies)
    found = re.search(r'name="_token".+value="([^"]+)"', page.text)
    return page.cookies, found.group(1)
def _dlposts_sorted_nodes(data):
    """Return the nodes of both timelines, oldest first, one per shortcode."""
    nodes = []
    seen = set()
    for timeline in ('edge_owner_to_timeline_media', 'edge_felix_video_timeline'):
        for edge in data[timeline]['edges']:
            node = edge['node']
            code = node.get('shortcode')
            if code is not None and code in seen:
                continue
            seen.add(code)
            nodes.append(node)
    # list.sort is stable: nodes with equal timestamps keep timeline order
    nodes.sort(key=lambda node: node['taken_at_timestamp'])
    return nodes


def _dlposts_truncate(text, limit=140):
    """Cut text longer than ``limit`` to ``limit - 4`` chars plus '...'."""
    if text is not None and len(text) > limit:
        return text[0:limit - 4] + '...'
    return text


def pixelfed_dlposts(acc_id, data):
    """Upload the not-yet-mirrored image posts of an account to Pixelfed.

    Args:
        acc_id: account name, used as key in the 'posts'/'accounts' tables.
        data: Instagram user object holding the
            'edge_owner_to_timeline_media' and 'edge_felix_video_timeline'
            edge lists (update_igaccount passes data['graphql']['user']).

    Only the posts present in ``data`` are considered (no further pages are
    loaded). Videos are skipped, as are posts whose shortcode is already in
    the 'posts' table. The table is written back even when an upload raises,
    so posts that were already published are not uploaded twice on the next
    run.
    """
    # mirror posts from the account (only the last N, without loading more),
    # but only the ones that has not already been imported
    accposts = db_get('posts', acc_id)
    accdata = db_get('accounts', acc_id)

    try:
        for item in _dlposts_sorted_nodes(data):
            # for now, we only support images (not videos :( )
            if item['is_video']:
                continue

            shortcode = item['shortcode']
            if shortcode in accposts:
                print('I| skipping IG post {}:{}. Already added'.format(acc_id, shortcode))
                continue

            print('I| processing IG post {}:{}'.format(acc_id, shortcode))
            ig_url = 'https://www.instagram.com/p/{}/'.format(shortcode)
            caption_edges = item['edge_media_to_caption']['edges']
            caption = caption_edges[0]['node']['text'] if len(caption_edges) > 0 else ''
            # may be absent or None; _dlposts_truncate copes with both
            altcaption = item.get('accessibility_caption')

            _token, jsdata = pixelfed_postimage(acc_id, item['display_url'], accdata)
            if not jsdata:
                print('E| Could not upload media for {}:{}'.format(acc_id, shortcode))
                continue

            jsdata['description'] = ig_url
            caption = _dlposts_truncate(caption)
            jsdata['alt'] = _dlposts_truncate(altcaption)
            jsdata['cw'] = False
            r = requests.post(
                'https://'+config()['instance']+'/api/compose/v0/publish',
                json={"media": [jsdata], "caption": caption, "visibility": "public", "cw": False,
                      "comments_disabled": False, "place": False, "tagged": [], "optimize_media": True},
                cookies=accdata['cookie'],
                headers={
                    'Content-Type': 'application/json',
                    'X-CSRF-TOKEN': _token,
                    'X-Requested-With': 'XMLHttpRequest',
                    'X-XSRF-TOKEN': accdata['cookie']['XSRF-TOKEN']
                }
            )
            accposts.append(shortcode)
            print('I| uploaded media for {}:{} : {}'.format(acc_id, shortcode, r.status_code))

        print('I| done uploading media for {}'.format(acc_id))
    finally:
        # persist progress even if one of the uploads raised
        db_set('posts', acc_id, accposts)
# upload media and return data
def pixelfed_postimage(acc_id, image_url, accdata=None):
    """Upload the image at ``image_url`` to the Pixelfed compose endpoint.

    Args:
        acc_id: account whose stored cookies authenticate the request.
        image_url: URL of the image; it is fetched through pixelfed_cacheimg().
        accdata: already-loaded account data, or None to read it from the
            'accounts' table.

    Returns:
        ``(token, parsed JSON response)`` when the upload answers HTTP 200,
        ``(None, False)`` otherwise.
    """
    if accdata is None:
        accdata = db_get('accounts', acc_id)

    cachef = pixelfed_cacheimg(image_url)
    _, _token = pixelfed_token_url('', accdata['cookie'])

    # use a context manager so the image file is closed after the request
    with open(cachef, 'rb') as image:
        r = requests.post(
            'https://'+config()['instance']+'/api/compose/v0/media/upload',
            files={'file': image},
            cookies=accdata['cookie'],
            headers={
                'X-CSRF-TOKEN': _token,
                'X-Requested-With': 'XMLHttpRequest',
                'X-XSRF-TOKEN': accdata['cookie']['XSRF-TOKEN']
            }
        )

    if r.status_code == 200:
        return _token, json.loads(r.text)
    return None, False
# get the image by URL but cache it forever, as if the profile changes the pic
# the url will be different, and therefore, the sum will also be different
def pixelfed_cacheimg(image_url):
    """Download ``image_url`` into ./cache (once) and return the local path.

    The cache file is named after the MD5 sum of the URL with a ``.jpg``
    suffix; when that file already exists no request is made.
    """
    cachef = './cache/{}.jpg'.format(md5sum(image_url))
    if not os.path.exists(cachef):
        r = requests.get(image_url)
        # NOTE(review): the response status is not checked, so an error
        # body would be cached as the image — confirm whether that matters.
        with open(cachef, 'wb') as w:
            w.write(r.content)
    return cachef
def pixelfed_setpic(acc_id, pic_url, count=0):
    """Set the Pixelfed avatar of ``acc_id`` to the picture at ``pic_url``.

    Args:
        acc_id: account to update.
        pic_url: URL of the picture; fetched through pixelfed_cacheimg().
        count: number of attempts already made (used by the retry recursion).

    When the instance answers HTTP 419 the account is logged in again and the
    upload is retried, up to three attempts in total. Always returns True.
    """
    count += 1
    pixelfed_login(acc_id)

    cachef = pixelfed_cacheimg(pic_url)
    accdata = db_get('accounts', acc_id)
    _, _token = pixelfed_token_url('/settings/home', accdata['cookie'])

    # use a context manager so the avatar file is closed after each attempt
    with open(cachef, 'rb') as avatar:
        r = requests.post(
            'https://'+config()['instance']+'/settings/avatar',
            data={'_token': _token},
            cookies=accdata['cookie'],
            files={'avatar': avatar}
        )

    # try to login if the upload failed
    if r.status_code == 419 and count < 3:
        pixelfed_login(acc_id, True)
        return pixelfed_setpic(acc_id, pic_url, count)
    return True
def pixelfed_setinfo(acc_id, bio, website, count=0):
    """Set name, website, bio and language ('en') of the Pixelfed profile.

    ``count`` is the number of attempts already made. On HTTP 419 the
    account is logged in again and the request repeated, up to three
    attempts in total. Always returns True.
    """
    attempt = count
    while True:
        attempt += 1
        pixelfed_login(acc_id)

        accdata = db_get('accounts', acc_id)
        _, _token = pixelfed_token_url('/settings/home', accdata['cookie'])
        settings_url = 'https://'+config()['instance']+'/settings/home'
        form = {
            '_token': _token, 'name': accdata['name'],
            'website': website, 'bio': bio, 'language': 'en'
        }
        response = requests.post(settings_url, data=form, cookies=accdata['cookie'])

        # anything but an expired session (or too many tries) ends the loop
        if response.status_code != 419 or attempt >= 3:
            return True

        # try to login again if the update failed
        pixelfed_login(acc_id, True)
def random_string(count=32):
    """Return ``count`` random ASCII letters and digits.

    The result is used as an account password, so the characters are drawn
    with the ``secrets`` module rather than the predictable ``random`` one.
    """
    alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(count))
def md5sum(_str):
    """Return the hexadecimal MD5 digest of the string ``_str``."""
    digest = hashlib.md5()
    digest.update(_str.encode())
    return digest.hexdigest()
# get all profile data from user:
# - display name
# - bio description
# - shared posts (images/videos)
# - much more info...
def getig_user_data(acc_id):
    """Fetch and parse the ``/<acc_id>/?__a=1`` JSON of an Instagram profile.

    Returns the parsed dict, or an empty dict when the response is not a
    JSON object — callers treat an empty dict as "no such user".
    """
    raw = instagram_get('/{}/?__a=1'.format(acc_id))
    try:
        data = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a ValueError: the body was not JSON
        return {}
    return data if isinstance(data, dict) else {}
# runs a basic GET request emulating Tor Browser
def instagram_get(url, CACHE_SECS=600):
    """GET ``https://www.instagram.com<url>`` with a small on-disk cache.

    Args:
        url: path (and query) appended to ``https://www.instagram.com``.
        CACHE_SECS: how long, in seconds, a response is served from cache.

    The cache file lives at ``./cache/<md5 of the full URL>``; its first line
    is the expiry timestamp, the rest is the response body. Returns the body
    as text.
    """
    headers = get_random_headers()
    default_headers = {
        'Accept': '*/*',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache',
        'Pragma': 'no-cache',
        'Host': 'www.instagram.com',
        'Referer': 'https://www.instagram.com/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
    }
    # only fill in the headers the random header file did not provide
    for key, value in default_headers.items():
        headers.setdefault(key, value)

    url = 'https://www.instagram.com{}'.format(url)
    cachef = './cache/'+md5sum(url)
    now = int(time.time())

    if os.path.exists(cachef):
        cache = readf(cachef).splitlines()
        try:
            expires = int(cache[0])
        except (IndexError, ValueError):
            # empty or corrupted cache entry: treat it as expired
            expires = 0
        if now < expires:
            return '\n'.join(cache[1:])

    r = requests.get(url, headers=headers)
    resp = r.text
    # context manager guarantees the cache entry is flushed and closed
    with open(cachef, 'w') as w:
        w.write(str(now+CACHE_SECS) + '\n')
        w.write(resp)
    return resp
def get_random_headers():
    """Return the HTTP headers stored in a randomly chosen ./headers file.

    Each file is expected to hold one ``Name: value`` pair per line; lines
    without a colon or without a name are ignored. Returns an empty dict
    when the directory contains no files.
    """
    candidates = os.listdir('./headers')
    if not candidates:
        return {}

    chosen = random.choice(candidates)
    headers = {}
    for line in readf('./headers/{}'.format(chosen)).splitlines():
        # split on the first colon only; values may contain colons (URLs)
        name, sep, value = line.partition(':')
        name = name.strip()
        if not sep or not name:
            continue
        headers[name] = value.strip()
    return headers
def db_set(table, acc_id, accdata):
    """Serialise ``accdata`` as JSON into the file ``./db/<table>/<acc_id>``.

    Any previous content of the file is overwritten.
    """
    # serialise first so a failure does not leave a truncated file behind
    payload = json.dumps(accdata)
    with open('./db/{}/{}'.format(table, acc_id), 'w') as w:
        w.write(payload)
def db_get(table, acc_id):
    """Load and return the JSON value stored in ``./db/<table>/<acc_id>``."""
    record_path = './db/{}/{}'.format(table, acc_id)
    raw = readf(record_path)
    return json.loads(raw)
def config():
    """Return the settings from ./config.json, reading the file on first use.

    The parsed value is kept in the module-level CONFIG and reused for as
    long as it is non-empty.
    """
    global CONFIG
    if not CONFIG.keys():
        raw = readf('./config.json')
        CONFIG = json.loads(raw)
    return CONFIG
def readf(f):
    """Return the text content of file ``f`` with surrounding whitespace stripped."""
    # the context manager closes the file even if read() raises
    with open(f, 'r') as r:
        return r.read().strip()
# Script entry point: only runs when the file is executed directly.
# NOTE(review): main() is not defined in the visible part of this file —
# confirm it exists, otherwise this raises NameError.
if __name__ == '__main__':
    main()