From 5a1016d1c9aecbd28ec447ff942b3701793bf348 Mon Sep 17 00:00:00 2001 From: Nolan Lawson Date: Fri, 16 Feb 2018 19:38:21 -0800 Subject: [PATCH] handle streamed deletions --- routes/_actions/addStatusOrNotification.js | 55 +++++++++++++ routes/_actions/deleteStatuses.js | 41 ++++++++++ routes/_actions/statuses.js | 22 +++++ routes/_actions/streaming.js | 73 ++--------------- routes/_database/cache.js | 5 ++ routes/_database/databaseLifecycle.js | 41 ++++++---- routes/_database/timelines.js | 94 +++++++++++++++++++++- 7 files changed, 246 insertions(+), 85 deletions(-) create mode 100644 routes/_actions/addStatusOrNotification.js create mode 100644 routes/_actions/deleteStatuses.js create mode 100644 routes/_actions/statuses.js diff --git a/routes/_actions/addStatusOrNotification.js b/routes/_actions/addStatusOrNotification.js new file mode 100644 index 00000000..d832f205 --- /dev/null +++ b/routes/_actions/addStatusOrNotification.js @@ -0,0 +1,55 @@ +import throttle from 'lodash/throttle' +import { getIdsThatTheseStatusesReblogged } from './statuses' +import { database } from '../_database/database' +import { mark, stop } from '../_utils/marks' +import { store } from '../_store/store' +import { scheduleIdleTask } from '../_utils/scheduleIdleTask' + +async function getExistingItemIdsSet (instanceName, timelineName) { + let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] + if (timelineName === 'notifications') { + return new Set(timelineItemIds) + } + let reblogIds = await getIdsThatTheseStatusesReblogged(instanceName, timelineItemIds) + return new Set([].concat(timelineItemIds).concat(reblogIds)) +} + +async function removeDuplicates (instanceName, timelineName, updates) { + // remove duplicates, including duplicates due to reblogs + let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName) + return updates.filter(update => !existingItemIds.has(update.id)) +} + +async function processFreshUpdates (instanceName, timelineName) { + mark('processFreshUpdates') + let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') + if (freshUpdates && freshUpdates.length) { + let updates = freshUpdates.slice() + store.setForTimeline(instanceName, timelineName, {freshUpdates: []}) + + updates = await removeDuplicates(instanceName, timelineName, updates) + + await database.insertTimelineItems(instanceName, timelineName, updates) + + let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || [] + if (updates && updates.length) { + itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id)) + console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd') + store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd}) + } + stop('processFreshUpdates') + } +} + +const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => { + scheduleIdleTask(() => { + /* no await */ processFreshUpdates(instanceName, timelineName) + }) +}, 5000) + +export function addStatusOrNotification (instanceName, timelineName, newStatusOrNotification) { + let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || [] + freshUpdates.push(newStatusOrNotification) + store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates}) + lazilyProcessFreshUpdates(instanceName, timelineName) +} diff --git a/routes/_actions/deleteStatuses.js b/routes/_actions/deleteStatuses.js new file mode 100644 index 00000000..27022c62 --- /dev/null +++ b/routes/_actions/deleteStatuses.js @@ -0,0 +1,41 @@ +import { getIdsThatRebloggedThisStatus, getIdThatThisStatusReblogged, getNotificationIdsForStatuses } from './statuses' +import { store } from '../_store/store' +import { scheduleIdleTask } from '../_utils/scheduleIdleTask' +import { database } from '../_database/database' +import identity from 'lodash/identity' + +function deleteStatusIdsFromStore (instanceName, idsToDelete) { + let idsToDeleteSet = new Set(idsToDelete) + let timelines = store.get('timelines') + if (timelines && timelines[instanceName]) { + Object.keys(timelines[instanceName]).forEach(timelineName => { + let timelineData = timelines[instanceName][timelineName] + if (timelineName !== 'notifications') { + timelineData.timelineItemIds = timelineData.timelineItemIds.filter(_ => !idsToDeleteSet.has(_)) + timelineData.itemIdsToAdd = timelineData.itemIdsToAdd.filter(_ => !idsToDeleteSet.has(_)) + } + }) + store.set({timelines: timelines}) + } +} + +async function deleteStatusesAndNotifications (instanceName, statusIdsToDelete, notificationIdsToDelete) { + deleteStatusIdsFromStore(instanceName, statusIdsToDelete) + await database.deleteStatusesAndNotifications(instanceName, statusIdsToDelete, notificationIdsToDelete) +} + +async function doDeleteStatus (instanceName, statusId) { + let reblogId = await getIdThatThisStatusReblogged(instanceName, statusId) + let rebloggedIds = await getIdsThatRebloggedThisStatus(reblogId || statusId) + let statusIdsToDelete = Array.from(new Set([statusId, reblogId].concat(rebloggedIds).filter(identity))) + let notificationIdsToDelete = new Set(await getNotificationIdsForStatuses(instanceName, statusIdsToDelete)) + await Promise.all([ + deleteStatusesAndNotifications(instanceName, statusIdsToDelete, notificationIdsToDelete) + ]) +} + +export function deleteStatus (instanceName, statusId) { + scheduleIdleTask(() => { + /* no await */ doDeleteStatus(instanceName, statusId) + }) +} diff --git a/routes/_actions/statuses.js b/routes/_actions/statuses.js new file mode 100644 index 00000000..7c47ec0c --- /dev/null +++ b/routes/_actions/statuses.js @@ -0,0 +1,22 @@ +import identity from 'lodash/identity' +import { database } from '../_database/database' + +export async function getIdThatThisStatusReblogged (instanceName, statusId) { + let status = await database.getStatus(instanceName, statusId) + return status.reblog && status.reblog.id +} + +export async function getIdsThatTheseStatusesReblogged (instanceName, statusIds) { + let reblogIds = await Promise.all(statusIds.map(async statusId => { + return getIdThatThisStatusReblogged(instanceName, statusId) + })) + return reblogIds.filter(identity) +} + +export async function getIdsThatRebloggedThisStatus (instanceName, statusId) { + return database.getReblogsForStatus(instanceName, statusId) +} + +export async function getNotificationIdsForStatuses (instanceName, statusIds) { + return database.getNotificationIdsForStatuses(instanceName, statusIds) +} diff --git a/routes/_actions/streaming.js b/routes/_actions/streaming.js index 9c4fb862..8a972637 100644 --- a/routes/_actions/streaming.js +++ b/routes/_actions/streaming.js @@ -1,71 +1,8 @@ import { TimelineStream } from '../_api/TimelineStream' -import identity from 'lodash/identity' -import { database } from '../_database/database' -import { store } from '../_store/store' import { scheduleIdleTask } from '../_utils/scheduleIdleTask' -import throttle from 'lodash/throttle' import { mark, stop } from '../_utils/marks' - -async function getReblogIds (instanceName, statusIds) { - let reblogIds = await Promise.all(statusIds.map(async timelineItemId => { - let status = await database.getStatus(instanceName, timelineItemId) - return status.reblog && status.reblog.id - })) - return reblogIds.filter(identity) -} - -async function getExistingItemIdsSet (instanceName, timelineName) { - let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] - if (timelineName === 'notifications') { - return new Set(timelineItemIds) - } - let reblogIds = await getReblogIds(instanceName, timelineItemIds) - return new Set([].concat(timelineItemIds).concat(reblogIds)) -} - -async function removeDuplicates (instanceName, timelineName, updates) { - // remove duplicates, including duplicates due to reblogs - let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName) - return updates.filter(update => !existingItemIds.has(update.id)) -} - -async function processFreshUpdates (instanceName, timelineName) { - mark('processFreshUpdates') - let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') - if (freshUpdates && freshUpdates.length) { - let updates = freshUpdates.slice() - store.setForTimeline(instanceName, timelineName, {freshUpdates: []}) - - updates = await removeDuplicates(instanceName, timelineName, updates) - - await database.insertTimelineItems(instanceName, timelineName, updates) - - let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || [] - if (updates && updates.length) { - itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id)) - console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd') - store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd}) - } - stop('processFreshUpdates') - } -} - -const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => { - scheduleIdleTask(() => { - /* no await */ processFreshUpdates(instanceName, timelineName) - }) -}, 5000) - -function processUpdate (instanceName, timelineName, update) { - let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || [] - freshUpdates.push(update) - store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates}) - lazilyProcessFreshUpdates(instanceName, timelineName) -} - -function processDelete (instanceName, deletion) { - // TODO -} +import { deleteStatus } from './deleteStatuses' +import { addStatusOrNotification } from './addStatusOrNotification' function processMessage (instanceName, timelineName, message) { mark('processMessage') @@ -73,13 +10,13 @@ function processMessage (instanceName, timelineName, message) { let parsedPayload = JSON.parse(payload) switch (event) { case 'delete': - processDelete(instanceName, parsedPayload) + deleteStatus(instanceName, parsedPayload) break case 'update': - processUpdate(instanceName, timelineName, parsedPayload) + addStatusOrNotification(instanceName, timelineName, parsedPayload) break case 'notification': - processUpdate(instanceName, 'notifications', parsedPayload) + addStatusOrNotification(instanceName, 'notifications', parsedPayload) break } stop('processMessage') diff --git a/routes/_database/cache.js b/routes/_database/cache.js index bb7ee89a..2f3eed2f 100644 --- a/routes/_database/cache.js +++ b/routes/_database/cache.js @@ -64,3 +64,8 @@ export function hasInCache (cache, instanceName, key) { } return res } + +export function deleteFromCache (cache, instanceName, key) { + let instanceCache = getOrCreateInstanceCache(cache, instanceName) + instanceCache.delete(key) +} diff --git a/routes/_database/databaseLifecycle.js b/routes/_database/databaseLifecycle.js index 4ad593e7..6da6ae2f 100644 --- a/routes/_database/databaseLifecycle.js +++ b/routes/_database/databaseLifecycle.js @@ -7,13 +7,13 @@ import { NOTIFICATIONS_STORE, NOTIFICATION_TIMELINES_STORE, PINNED_STATUSES_STORE, - TIMESTAMP + TIMESTAMP, REBLOG_ID } from './constants' const openReqs = {} const databaseCache = {} -const DB_VERSION = 1 +const DB_VERSION = 3 export function getDatabase (instanceName) { if (!instanceName) { @@ -32,20 +32,29 @@ export function getDatabase (instanceName) { } req.onupgradeneeded = (e) => { let db = req.result - db.createObjectStore(STATUSES_STORE, {keyPath: 'id'}) - .createIndex(TIMESTAMP, TIMESTAMP) - db.createObjectStore(STATUS_TIMELINES_STORE, {keyPath: 'id'}) - .createIndex('statusId', 'statusId') - db.createObjectStore(NOTIFICATIONS_STORE, {keyPath: 'id'}) - .createIndex(TIMESTAMP, TIMESTAMP) - db.createObjectStore(NOTIFICATION_TIMELINES_STORE, {keyPath: 'id'}) - .createIndex('notificationId', 'notificationId') - db.createObjectStore(ACCOUNTS_STORE, {keyPath: 'id'}) - .createIndex(TIMESTAMP, TIMESTAMP) - db.createObjectStore(RELATIONSHIPS_STORE, {keyPath: 'id'}) - .createIndex(TIMESTAMP, TIMESTAMP) - db.createObjectStore(META_STORE, {keyPath: 'key'}) - db.createObjectStore(PINNED_STATUSES_STORE, {keyPath: 'id'}) + let tx = e.currentTarget.transaction + if (e.oldVersion < 1) { + db.createObjectStore(STATUSES_STORE, {keyPath: 'id'}) + .createIndex(TIMESTAMP, TIMESTAMP) + db.createObjectStore(STATUS_TIMELINES_STORE, {keyPath: 'id'}) + .createIndex('statusId', 'statusId') + db.createObjectStore(NOTIFICATIONS_STORE, {keyPath: 'id'}) + .createIndex(TIMESTAMP, TIMESTAMP) + db.createObjectStore(NOTIFICATION_TIMELINES_STORE, {keyPath: 'id'}) + .createIndex('notificationId', 'notificationId') + db.createObjectStore(ACCOUNTS_STORE, {keyPath: 'id'}) + .createIndex(TIMESTAMP, TIMESTAMP) + db.createObjectStore(RELATIONSHIPS_STORE, {keyPath: 'id'}) + .createIndex(TIMESTAMP, TIMESTAMP) + db.createObjectStore(META_STORE, {keyPath: 'key'}) + db.createObjectStore(PINNED_STATUSES_STORE, {keyPath: 'id'}) + } + if (e.oldVersion < 2) { + tx.objectStore(STATUSES_STORE).createIndex(REBLOG_ID, REBLOG_ID) + } + if (e.oldVersion < 3) { + tx.objectStore(NOTIFICATIONS_STORE).createIndex('statusId', 'statusId') + } } req.onsuccess = () => resolve(req.result) }) diff --git a/routes/_database/timelines.js b/routes/_database/timelines.js index 22d777b0..75cd760a 100644 --- a/routes/_database/timelines.js +++ b/routes/_database/timelines.js @@ -1,7 +1,10 @@ import { toPaddedBigInt, toReversePaddedBigInt } from './utils' import { cloneForStorage } from './helpers' import { dbPromise, getDatabase } from './databaseLifecycle' -import { accountsCache, getInCache, hasInCache, notificationsCache, setInCache, statusesCache } from './cache' +import { + accountsCache, deleteFromCache, getInCache, hasInCache, notificationsCache, setInCache, + statusesCache +} from './cache' import { scheduleCleanup } from './cleanup' import { ACCOUNTS_STORE, @@ -253,6 +256,82 @@ export async function getNotification (instanceName, id) { return result } +// +// lookup by reblogs +// + +export async function getReblogsForStatus (instanceName, id) { + const db = await getDatabase(instanceName) + await dbPromise(db, STATUSES_STORE, 'readonly', (statusesStore, callback) => { + statusesStore.index(REBLOG_ID).getAll(IDBKeyRange.only(id)).onsuccess = e => { + callback(e.target.result) + } + }) +} + +// +// deletes +// + +export async function deleteStatusesAndNotifications (instanceName, statusIds, notificationIds) { + for (let statusId of statusIds) { + deleteFromCache(statusesCache, instanceName, statusId) + } + for (let notificationId of notificationIds) { + deleteFromCache(notificationsCache, instanceName, notificationId) + } + const db = await getDatabase(instanceName) + let storeNames = [ + STATUSES_STORE, + STATUS_TIMELINES_STORE, + NOTIFICATIONS_STORE, + NOTIFICATION_TIMELINES_STORE, + PINNED_STATUSES_STORE + ] + await dbPromise(db, storeNames, 'readwrite', (stores) => { + let [ + statusesStore, + statusTimelinesStore, + notificationsStore, + notificationTimelinesStore, + pinnedStatusesStore + ] = stores + + function deleteStatus (statusId) { + pinnedStatusesStore.delete(statusId).onerror = e => { + e.preventDefault() + e.stopPropagation() + } + statusesStore.delete(statusId) + let getAllReq = statusTimelinesStore.index('statusId') + .getAllKeys(IDBKeyRange.only(statusId)) + getAllReq.onsuccess = e => { + for (let result of e.target.result) { + statusTimelinesStore.delete(result) + } + } + } + + function deleteNotification (notificationId) { + notificationsStore.delete(notificationId) + let getAllReq = notificationTimelinesStore.index('statusId') + .getAllKeys(IDBKeyRange.only(notificationId)) + getAllReq.onsuccess = e => { + for (let result of e.target.result) { + notificationTimelinesStore.delete(result) + } + } + } + + for (let statusId of statusIds) { + deleteStatus(statusId) + } + for (let notificationId of notificationIds) { + deleteNotification(notificationId) + } + }) +} + // // pinned statuses // @@ -296,3 +375,16 @@ export async function getPinnedStatuses (instanceName, accountId) { } }) } + +// +// notifications by status +// + +export async function getNotificationIdsForStatus (instanceName, statusId) { + const db = await getDatabase(instanceName) + return dbPromise(db, NOTIFICATIONS_STORE, 'readonly', (notificationStore, callback) => { + notificationStore.index(statusId).getAllKeys(IDBKeyRange.only(statusId)).onsuccess = e => { + callback(Array.from(e.target.result)) + } + }) +}