import { API } from 'api' import { Node } from 'node' import { h, Component } from 'preact' import htm from 'htm' const html = htm.bind(h) const SYNC_COUNT = 1 const SYNC_HANDLED = 2 const SYNC_DONE = 3 export class Sync { constructor() {//{{{ this.listeners = [] this.messagesReceived = [] }//}}} addListener(fn, runMessageQueue) {//{{{ // Some handlers won't be added until a time after sync messages have been added to the queue. // This is an opportunity for the handler to receive the old messages in order. if (runMessageQueue) for (const msg of this.messagesReceived) fn(msg) this.listeners.push(fn) }//}}} pushMessage(msg) {//{{{ this.messagesReceived.push(msg) for (const fn of this.listeners) fn(msg) }//}}} async run() {//{{{ try { let duration = 0 // in ms // The latest sync node value is used to retrieve the changes // from the backend. const state = await nodeStore.getAppState('latest_sync_node') const oldMax = (state?.value ? state.value : 0) let nodeCount = await this.getNodeCount(oldMax) nodeCount += await nodeStore.sendQueue.count() const msg = { op: SYNC_COUNT, count: nodeCount } this.pushMessage(msg) await this.nodesFromServer(oldMax) .then(durationNodes => { duration = durationNodes // in ms console.log(`Total time: ${Math.round(1000 * durationNodes) / 1000}s`) }) await this.nodesToServer() } finally { this.pushMessage({ op: SYNC_DONE }) } }//}}} async getNodeCount(oldMax) {//{{{ // Retrieve the amount of values the server will send us. const res = await API.query('POST', `/sync/from_server/count/${oldMax}`) return res?.Count }//}}} async nodesFromServer(oldMax) {//{{{ const syncStart = Date.now() let syncEnd try { let currMax = oldMax let offset = 0 let res = { Continue: false } let batch = 0 do { batch++ res = await API.query('POST', `/sync/from_server/${oldMax}/${offset}`) if (res.Nodes.length > 0) console.log(`Node sync batch #${batch}`) offset += res.Nodes.length currMax = Math.max(currMax, res.MaxSeq) /* Go through each node and determine if they are older than * the node in IndexedDB. If they are, they are just history * and can be ignored since history is currently not stored * in the browser. * * If the backed node is newer, the local node is stored in * a separate table in IndexedDB to at a later stage in the * sync be preserved in the backend. */ let backendNode = null for (const i in res.Nodes) { backendNode = new Node(res.Nodes[i], -1) await window._sync.handleNode(backendNode) } } while (res.Continue) nodeStore.setAppState('latest_sync_node', currMax) } catch (e) { console.log('sync node tree', e) } finally { syncEnd = Date.now() const duration = (syncEnd - syncStart) / 1000 const count = await nodeStore.nodeCount() console.log(`Node sync took ${duration}s`, count) } return (syncEnd - syncStart) }//}}} async handleNode(backendNode) {//{{{ try { /* Retrieving the local copy of this node from IndexedDB. * The backend node can be discarded if it is older than * the local copy since it is considered history preserved * in the backend. */ return nodeStore.get(backendNode.UUID) .then(async localNode => { if (localNode.updated() >= backendNode.updated()) { console.log(`History from backend: ${backendNode.UUID}`) return } /* If the local node hasn't seen unsynchronized change, * it can be replaced without anything else being done * since it is already on the backend server. * * If the local node has seen change, the change is already * placed into the send_queue anyway. */ return nodeStore.add([backendNode]) }) .catch(async () => { // Not found in IndexedDB - OK to just insert since it only exists in backend. return nodeStore.add([backendNode]) }) } catch (e) { console.error(e) } finally { this.pushMessage({ op: SYNC_HANDLED, count: 1 }) } }//}}} async nodesToServer() {//{{{ const BATCH_SIZE = 32 while (true) { try { // Send nodes in batches until everything is sent, or an error has occured. const nodesToSend = await nodeStore.sendQueue.retrieve(BATCH_SIZE) if (nodesToSend.length === 0) break console.debug(`Sending ${nodesToSend.length} node(s) to server`) const request = { NodeData: JSON.stringify(nodesToSend), } const res = await API.query('POST', '/sync/to_server', request) if (!res.OK) { // TODO - implement better error management here. console.log(res) alert(res) return } // Nodes are archived on server and can now be deleted from the send queue. const keys = nodesToSend.map(node => node.ClientSequence) await nodeStore.sendQueue.delete(keys) this.pushMessage({ op: SYNC_HANDLED, count: nodesToSend.length }) } catch (e) { console.trace(e) alert(e) return } } }//}}} } export class SyncProgress extends Component { constructor() {//{{{ super() this.reset() }//}}} reset() {//{{{ this.forceUpdateRequest = null this.state = { nodesToSync: 0, nodesSynced: 0, syncedDone: false, } document.getElementById('sync-progress')?.classList.remove('hidden') }//}}} componentDidMount() {//{{{ window._sync.addListener(msg => this.progressHandler(msg), true) }//}}} getSnapshotBeforeUpdate(_, prevState) {//{{{ if (!prevState.syncedDone && this.state.syncedDone) setTimeout(() => document.getElementById('sync-progress')?.classList.add('hidden'), 750) }//}}} componentDidUpdate() {//{{{ if (!this.state.syncedDone) { if (this.forceUpdateRequest !== null) clearTimeout(this.forceUpdateRequest) this.forceUpdateRequest = setTimeout( () => { this.forceUpdateRequest = null this.forceUpdate() }, 50 ) } }//}}} progressHandler(msg) {//{{{ switch (msg.op) { case SYNC_COUNT: this.setState({ nodesToSync: msg.count }) break case SYNC_HANDLED: this.state.nodesSynced += msg.count break case SYNC_DONE: // Hides the progress bar. this.setState({ syncedDone: true }) // Don't update anything if nothing was synced. if (this.state.nodesSynced === 0) break // Reload the tree nodes to reflect the new/updated nodes. if (window._notes2?.current?.reloadTree.value !== null) { nodeStore.purgeCache() window._notes2.current.reloadTree.value = window._notes2.current.reloadTree.value + 1 } break } }//}}} render(_, { nodesToSync, nodesSynced }) {//{{{ if (nodesToSync === 0) return html`
` return html`