#!/usr/bin/python3 import threading import requests import hashlib import string import random import time import json import os import re CONFIG = {} def add_igaccount(acc_id): # user_create script must exist before running the API server if not os.path.exists('./scripts/user_create'): print('E| You may need to initialize the server environment first') return 1 if not account_exists(acc_id): # get all profile data from instagram acc data = getig_user_data(acc_id) # this tells us the user probably don't exist (or also might be a network error?) if len(data.keys()) == 0: print('E| User "{}" does not exist on Instagram'.format(acc_id)) return 2 # we don't and can't mirror private profiles # (well, we can if we follow them and follow back, but we just don't need that) if data['graphql']['user']['is_private']: print('E| User "{}" is a private account. We just won\'t mirror that!'.format(acc_id)) return 3 # get account display name to create it name = getig_user_fullname(data) account = { 'name': name, 'username': acc_id, 'password': random_string() } # we are completely sure the parameters can't contain unwanted characters # a shell exploit is not possible here :) os.system('./scripts/user_create \'{}\' \'{}\' \'{}\''.format(\ account['name'], account['username'], account['password'])) # save the account login information for updates and mirroring db_set('accounts', acc_id, account) db_set('posts', acc_id, []) # set Pixelfed account data for the username pixelfed_setpic(acc_id, data['graphql']['user']['profile_pic_url']) pixelfed_setinfo(acc_id, data['graphql']['user']['biography'],\ data['graphql']['user']['external_url']) update_igaccount_async(acc_id, False) else: print('W| User "{}" already exists in local database'.format(acc_id)) return 0 def update_igaccount_async(acc_id, profileset=True): threading.Thread(target=update_igaccount, args=(acc_id, profileset,)).start() def update_igaccount(acc_id, profileset=True): accdata = db_get('accounts', acc_id) # Some IG might post faster than others, so we want to asign simple scheduling # to alter the way accounts are updated depending on how much you want 1 account to update if 'sched' in accdata: sched_err_msg = 'E| User schedule is not configured correctly, for "{}": {}'.format(acc_id, accdata['sched']) if not re.match(r'^\d+$', accdata['sched']): print(sched_err_msg) return 1 times = int(accdata['sched']) scnow = random.randint(1, times) if 'sched_now' in accdata: scnow = int(accdata['sched_now']) if scnow == times: scnow = 1 else: scnow += 1 accdata['sched_now'] = str(scnow) db_set('accounts', acc_id, accdata) if scnow != times: print('I| Skipping user "{}" according to configured schedule: {} of {}'.format(\ acc_id, scnow, times)) return 1 # if account does not exist, we stop the update process if not account_exists(acc_id): print('E| User "'+acc_id+'" has not been created yet, maybe you wanted to call //add ?') return 2 data = getig_user_data(acc_id) if profileset: # update the fullname of the user on local DB accdata['name'] = getig_user_fullname(data) db_set('accounts', acc_id, accdata) # set the information from IG to the Pixelfed Account info pixelfed_setpic(acc_id, data['graphql']['user']['profile_pic_url']) pixelfed_setinfo(acc_id, data['graphql']['user']['biography'],\ data['graphql']['user']['external_url']) # sincronize posts (images/videos...) pixelfed_dlposts(acc_id, data['graphql']['user']) return 0 def update_allaccounts_async(): threading.Thread(target=update_allaccounts).start() def update_allaccounts(): # update all accounts with a timeout of 20 seconds sleeptime = int(config()['timeout_btw_accounts']) accounts = os.listdir('./db/accounts') random.shuffle(accounts) i = 0 for acc_id in accounts: i += 1 print('I| mirroring account "{}"...'.format(acc_id)) ret = update_igaccount(acc_id) if ret == 0: print('I| {} of {} completed. Waiting {} seconds'.format(i, len(accounts), sleeptime)) time.sleep(sleeptime) print() print('I| done updating all accounts') def delete_statuses(acc_id): accdata = db_get('accounts', acc_id) if not pixelfed_islogged(acc_id, accdata): print('E| user "{}" is not logged in. Please log in'.format(acc_id)) return 1 # check our information to validate our account exists r = requests.get( 'https://'+config()['instance']+'/api/pixelfed/v1/accounts/verify_credentials', cookies=accdata['cookie']) if r.status_code != 200: print('E| user "{}" is not logged in. Please log in'.format(acc_id)) return 2 pixdata = json.loads(r.text) if not 'id' in pixdata: print('E| fatal! API is not working!. Might be a connectivity issue or the Account does Not Exist??'.format(acc_id)) return 3 _, _token = pixelfed_token_url('', accdata['cookie']) _headers = { 'Content-Type': 'application/json', 'X-Requested-With': 'XMLHttpRequest', 'X-CSRF-TOKEN': _token, 'X-XSRF-TOKEN': accdata['cookie']['XSRF-TOKEN'] } # delete all statuses on pixelfed by Polling next N items while True: r2 = requests.get('https://'+config()['instance']+'/api/pixelfed/v1/accounts/{}/statuses?min_id=1'.format(pixdata['id']),\ cookies=accdata['cookie'] ) jsdata = json.loads(r2.text) if not jsdata: break for status in jsdata: print('I| deleting status "{}" for account "{}"... '.format(status['id'], acc_id), end='') r3 = requests.post('https://'+config()['instance']+'/i/delete', json={'item': status['id'], 'type': 'status'},\ cookies=accdata['cookie'], headers=_headers) print(r3.status_code) db_set('posts', acc_id, []) print('I| done nuking account posts for "{}"'.format(acc_id)) return 0 def pixelfed_logoutall_async(): threading.Thread(target=pixelfed_logoutall).start() def pixelfed_logoutall(): for acc_id in os.listdir('./db/accounts'): print('I| logging out account "{}": '.format(acc_id), end='') if pixelfed_logout(acc_id): print('ok') else: print('not logged') print('I| done logging out all accounts\n') def pixelfed_logout(acc_id): accdata = db_get('accounts', acc_id) if not pixelfed_islogged(acc_id, accdata): return False _, _token = pixelfed_token_url('', accdata['cookie']) r = requests.post('https://'+config()['instance']+'/logout', data={'_token': _token}, cookies=accdata['cookie']) del accdata['cookie'] db_set('accounts', acc_id, accdata) return True def pixelfed_loginall_async(): threading.Thread(target=pixelfed_loginall).start() def pixelfed_loginall(): for acc_id in os.listdir('./db/accounts'): print('I| logging in account "{}": '.format(acc_id), end='') if pixelfed_login(acc_id): print('ok') else: print('already logged') print('I| done logging in all accounts\n') def pixelfed_login(acc_id, force=False): # check account is already logged in if not "force" accdata = db_get('accounts', acc_id) if not force and pixelfed_islogged(acc_id, accdata): return False # obtain one time tokens for the pixelfed instance _cookies, _token = pixelfed_token_url() # do the login post and retrieve the raw cookies, the rest of API calls will have this cookies r = requests.post( 'https://'+config()['instance']+'/login' ,\ data={ '_token': _token, 'email': 'pixelfed.'+acc_id+'@localhost', 'password': accdata['password'], 'remember': 'on' }, cookies=_cookies ) # add the raw cookies to the account data for later calls accdata['cookie'] = dict(r.cookies) db_set('accounts', acc_id, accdata) return True def pixelfed_islogged(acc_id, accdata=None): if accdata is None: accdata = db_get('accounts', acc_id) return 'cookie' in accdata def pixelfed_token_url(url='', _cookies=None): r = requests.get( 'https://'+config()['instance']+url, cookies=_cookies ) _token = re.search(r'name="_token".+value="([^"]+)"', r.text).group(1) return r.cookies, _token def pixelfed_dlstories_async(): threading.Thread(target=pixelfed_dlstories).start() def pixelfed_dlstories(): # get reels_tray of the account (stories are called like this) data = json.loads(instagram_get('https://i.instagram.com/api/v1/feed/reels_tray/', 1000000, { 'Host': 'i.instagram.com', })) for item in data['tray']: # for now stories don't support videos: # https://github.com/pixelfed/pixelfed/issues/2169 # So, we will upload them as posts print(json.dumps(item, indent=4)) def pixelfed_dlposts(acc_id, data, is_story=False): ts = [] items = [] if not is_story: for edge in data['edge_owner_to_timeline_media']['edges']: ts.append(edge['node']['taken_at_timestamp']) for edge in data['edge_felix_video_timeline']['edges']: ts.append(edge['node']['taken_at_timestamp']) ts = sorted(ts) for t in ts: brkit = False for edge in data['edge_owner_to_timeline_media']['edges']: if edge['node']['taken_at_timestamp'] == t: items.append(edge['node']) brkit = True break if brkit: continue for edge in data['edge_felix_video_timeline']['edges']: if edge['node']['taken_at_timestamp'] == t: items.append(edge['node']) break else: print(data) return # mirror posts from the account (only the last N, without loading more), # but only the ones that has not already been imported accposts = db_get('posts', acc_id) accdata = db_get('accounts', acc_id) for item in items: if item['shortcode'] in accposts: print('I| skipping IG post {}:{}. Already added'.format(acc_id, item['shortcode'])) continue print('I| >>>> {}:{}'.format(acc_id, item['shortcode'])) ig_url = 'https://www.instagram.com/p/{}/'.format(item['shortcode']) title = item['title'] if 'title' in item else None caption = item['edge_media_to_caption']['edges'][0]['node']['text'] \ if len(item['edge_media_to_caption']['edges']) > 0 else '' altcaption = item['accessibility_caption'] if 'accessibility_caption' in item else '' altcaption = '' if altcaption is None else altcaption # add support for posts with multiple images # get the data from the post URL. (we need all images, as IG can have not only 1 image in the post) postdata = json.loads(instagram_get('/p/{}/?__a=1'.format(item['shortcode']), 216000)) multiple = False multmedia = None if len(postdata.keys()) > 0 and 'edge_sidecar_to_children' in postdata['graphql']['shortcode_media']: multiple = True multmedia = postdata['graphql']['shortcode_media']['edge_sidecar_to_children']['edges'] _headers = { 'Content-Type': 'application/json', 'X-Requested-With': 'XMLHttpRequest', 'X-XSRF-TOKEN': accdata['cookie']['XSRF-TOKEN'] } jsdata_items = [] _token = None failed = False if item['is_video']: # if the video is bigger than "max_video_size", we don't even try to upload it! r = requests.head(item['video_url']) size_in_mb = (int(r.headers['Content-Length']) / 1024 / 1024) if size_in_mb > config()['max_video_size']: # hack, we "add" it on posts so it doesn't try it again! accposts.append(item['shortcode']) print('W| video exceeds the configured Maximum of "{}MB"'.format(config()['max_video_size'])) continue # try to upload to Pixelfed, it might failed depending on MAX_PHOTO_SIZE setting print('I| fetching VIDEO for {}:{}... '.format(acc_id, item['shortcode']), end='') _token, jsdata = pixelfed_postvideo(acc_id, item['video_url']) if not jsdata: print('err') print('E| tried to upload a video of "{}MB" but failed. MAX_PHOTO_SIZE setting?'.format(int(size_in_mb))) print() continue jsdata_items.append(jsdata) print('done') else: print('I| uploading IMAGES for {}:{}... '.format(acc_id, item['shortcode']), end='') media2iterate = [a['node']['display_url'] for a in multmedia] if multiple else [item['display_url']] # we add support to multiple media here for media in media2iterate: _token, jsdata = pixelfed_postimage(acc_id, media, accdata) if not jsdata: print('E| Could not upload media for {}:{}'.format(acc_id, item['shortcode'])) failed = True break jsdata_items.append(jsdata) if failed: continue print('done') # add the accesibility captions i = 0 _headers['X-CSRF-TOKEN'] = _token for jsdata in jsdata_items: jsdata['description'] = ig_url jsdata['cw'] = False jsdata['alt'] = '' if multiple: mnode = multmedia[i]['node'] if 'accessibility_caption' in mnode and not mnode['accessibility_caption'] is None: jsdata['alt'] = mnode['accessibility_caption'][0:136]+'...' \ if len(mnode['accessibility_caption']) > 140 else mnode['accessibility_caption'] else: jsdata['alt'] = altcaption[0:136]+'...' if len(altcaption) > 140 else altcaption i += 1 # publish the post using Pixelfed API # the caption will be the original instagram URL print('I| publishing post for {}:{}... '.format(acc_id, item['shortcode']), end='') r = requests.post('https://'+config()['instance']+'/api/compose/v0/publish',\ json={"media": jsdata_items, "caption": ig_url, "visibility": "public", "cw": False,\ "comments_disabled": False, "place": False, "tagged": [],"optimize_media": True},\ cookies=accdata['cookie'], headers=_headers ) # do a comment as it supports larger descriptions if r.status_code == 200 and len(r.text) > 5: ps = r.text.strip('/').split('/') status_id = ps[len(ps)-1] print('done | StatusID -> {}'.format(status_id)) print('I| publishing comments containing caption for {}:{}... '.format(acc_id, item['shortcode']), end='') i = 1 failed = False for comment in [caption[i:i+495] for i in range(0, len(caption), 495)]: r2 = requests.post('https://'+config()['instance']+'/i/comment',\ json={'comment': '('+str(i)+') '+comment, 'item': status_id, 'sensitive': False},\ cookies=accdata['cookie'], headers=_headers ) if not r2.status_code == 200: failed = True print('err. CODE -> {}'.format(r2.status_code)) print(r2.text) break i += 1 if not failed: print('done') accposts.append(item['shortcode']) print('I| uploaded post {}:{} : OK'.format(acc_id, item['shortcode'])) time.sleep(int(config()['timeout_btw_posts'])) else: print(r.text) print(r.headers) return print('I| done updating "{}" account'.format(acc_id)) db_set('posts', acc_id, accposts) # upload media and return data def pixelfed_postimage(acc_id, image_url, accdata=None): return pixelfed_postmedia(acc_id, image_url, 'jpg', accdata) def pixelfed_postvideo(acc_id, video_url, accdata=None): return pixelfed_postmedia(acc_id, video_url, 'mp4', accdata) def pixelfed_postmedia(acc_id, url, ext, accdata=None): return _pixelfed_postmedia(acc_id, pixelfed_cachemedia(url, ext), accdata) def _pixelfed_postmedia(acc_id, cachef, accdata=None): if accdata is None: accdata = db_get('accounts', acc_id) _, _token = pixelfed_token_url('', accdata['cookie']) r = requests.post( 'https://'+config()['instance']+'/api/compose/v0/media/upload',\ files={'file': open(cachef, 'rb')}, cookies=accdata['cookie'],\ headers={ 'X-CSRF-TOKEN': _token, 'X-Requested-With': 'XMLHttpRequest', 'X-XSRF-TOKEN': accdata['cookie']['XSRF-TOKEN'] } ) if r.status_code == 200: return _token, json.loads(r.text) return None, False # get the image by URL but cache it forever, as if the profile changes the pic # the url will be different, and therefore, the sum will also be different def pixelfed_cacheimg(image_url): return pixelfed_cachemedia(image_url, 'jpg') def pixelfed_cachevid(video_url): return pixelfed_cachemedia(image_url, 'mp4') def pixelfed_cachemedia(url, ext='jpg'): cachef = './cache/{}.{}'.format(md5sum(url), ext) if not os.path.exists(cachef): r = requests.get(url) w = open(cachef, 'wb') w.write(r.content) w.close() return cachef def pixelfed_setpic(acc_id, pic_url, count=0): count += 1 pixelfed_login(acc_id) cachef = pixelfed_cacheimg(pic_url) accdata = db_get('accounts', acc_id) print('I| setting avatar for "{}" '.format(acc_id), end="") _, _token = pixelfed_token_url('/settings/home', accdata['cookie']) r = requests.post( 'https://'+config()['instance']+'/settings/avatar',\ data={'_token': _token}, cookies=accdata['cookie'], files={'avatar': open(cachef, 'rb')} ) # try to login if the upload failed if r.status_code == 419 and count < 3: print('err (login required)') pixelfed_login(acc_id, True) return pixelfed_setpic(acc_id, pic_url, count) print('ok') return True def pixelfed_setinfo(acc_id, bio, website, count=0): accdata = db_get('accounts', acc_id) name = accdata['name'] + ' [Mirror]' if count == 0: bio = 'Mirrored from Instagram: instagram.com/{} | {}'.format(acc_id, bio) count += 1 pixelfed_login(acc_id) print('I| setting account-info for "{}" '.format(acc_id), end="") _, _token = pixelfed_token_url('/settings/home', accdata['cookie']) r = requests.post( 'https://'+config()['instance']+'/settings/home',\ data={ '_token': _token, 'name': name, 'website': website, 'bio': bio, 'language': 'en' }, cookies=accdata['cookie'] ) # try to login if the upload failed if r.status_code == 419 and count < 3: print('err (login required)') pixelfed_login(acc_id, True) return pixelfed_setinfo(acc_id, bio, website, count) print('ok') return True def pixelfed_htmlfill_mirrors(html): accounts = os.listdir('./db/accounts') mirr_html = '' for acc_id in sorted(set(accounts)): accdata = db_get('accounts', acc_id) mirr_html += """

{1}

@{0}
""".format( accdata['username'], htmlesc(accdata['name']), \ config()['instance'] ) html = html.replace('{mirrors}', mirr_html) html = html.replace('{item_count}', str(len(accounts)) ) html = html.replace('{instance}', config()['instance']) return html def htmlesc(strr): return strr.replace('&', '&')\ .replace('<', '<')\ .replace('>', '>') def random_string(count=32): return ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=count)) def md5sum(_str): return hashlib.md5(_str.encode()).hexdigest() # get all profile data from user: # - display name # - bio description # - shared posts (images/videos) # - much more info... def getig_user_data(acc_id): return json.loads( instagram_get('/{}/?__a=1'.format(acc_id), 1800) ) def getig_user_fullname(data): if data is None: return '.ERROR.' return re.sub(r'[^a-zA-Z0-9_\s]', '',\ data['graphql']['user']['full_name']) # runs a basic GET request emulating Tor Browser def instagram_get(url, CACHE_SECS=600, add_headers={}): headers = get_random_headers() default_headers = { 'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Host': 'www.instagram.com', 'Referer': 'https://www.instagram.com/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0' } for key in default_headers.keys(): if not key in headers: headers[key] = default_headers[key] for key in add_headers.keys(): headers[key] = add_headers[key] if not '.instagram.com/' in url: url = 'https://www.instagram.com{}'.format(url) cachef = './cache/'+md5sum(url) now = str(time.time()) now = int(now[:now.index('.')]) if os.path.exists(cachef): cache = readf(cachef).splitlines() ctime = int(cache[0]) if now < ctime: return '\n'.join(cache[1:]) r = requests.get(url, headers=headers) resp = r.text w = open(cachef, 'w') w.write(str(now+CACHE_SECS) + '\n') w.write(resp) return resp def get_random_headers(): a = os.listdir('./headers') rin = 0 if len(a)-1 > 0: rin = random.randint(0, len(a)-1) lines = readf('./headers/{}'.format(a[rin])).splitlines() headers = {} for line in lines: reg = re.search('(^[^:]+):(.*)', line) headers[reg.group(1).strip()] = reg.group(2).strip() return headers def account_exists(acc_id): return os.path.exists('./db/accounts/{}'.format(acc_id)) def account_config(acc_id, key, value): if not account_exists(acc_id): return False, 'Account does not exist: {}'.format(acc_id) accdata = db_get('accounts', acc_id) key = key.strip() value = value.strip() # make sure passwords or cookie cannot be retrieved if value == '' and not key in ['password', 'cookie']: if not key in accdata: return False, 'Key does not exist yet: {}'.format(key) return True, accdata[key] # protect inmutable keys if key in ['name', 'username', 'password', 'cookie']: return False, 'The given key is inmutable: {}'.format(key) # value has something, so we set it accdata[key] = value db_set('accounts', acc_id, accdata) return True, 'Account configuration saved: {}'.format(key) def db_set(table, acc_id, accdata): w = open('./db/{}/{}'.format(table, acc_id), 'w') w.write(json.dumps(accdata)) w.close() def db_get(table, acc_id): return json.loads(readf('./db/{}/{}'.format(table, acc_id))) def template(name): tplfile = './templates/{}.html'.format(name) if not os.path.exists(tplfile): return False return readf(tplfile) def config(): global CONFIG if len(CONFIG.keys()) == 0: CONFIG = json.loads(readf('./config.json')) return CONFIG def readf(f): r = open(f,'r') c = r.read().strip() r.close() return c if __name__ == '__main__': main()