Notes2/static/js/sync.mjs

227 lines
6.5 KiB
JavaScript

import { API } from 'api'
import { Node } from 'node'
import { CustomHTMLElement } from './lib/custom_html_element.mjs'
export class Sync {
constructor() {//{{{
this.listeners = []
this.messagesReceived = []
}//}}}
async run() {//{{{
try {
let duration = 0 // in ms
// The latest sync node value is used to retrieve the changes
// from the backend.
const state = await nodeStore.getAppState('latest_sync_node')
const oldMax = (state?.value ? state.value : 0)
let nodeCount = await this.getNodeCount(oldMax)
nodeCount += await nodeStore.sendQueue.count()
_mbus.dispatch('SYNC_COUNT', { count: nodeCount })
await this.nodesFromServer(oldMax)
.then(durationNodes => {
duration = durationNodes // in ms
console.log(`Total time: ${Math.round(1000 * durationNodes) / 1000}s`)
})
await this.nodesToServer()
} finally {
_mbus.dispatch('SYNC_DONE')
}
}//}}}
async getNodeCount(oldMax) {//{{{
// Retrieve the amount of values the server will send us.
const res = await API.query('POST', `/sync/from_server/count/${oldMax}`)
return res?.Count
}//}}}
async nodesFromServer(oldMax) {//{{{
const syncStart = Date.now()
let syncEnd
let handled = 0
try {
let currMax = oldMax
let offset = 0
let res = { Continue: false }
let batch = 0
do {
batch++
res = await API.query('POST', `/sync/from_server/${oldMax}/${offset}`)
if (res.Nodes.length > 0)
console.log(`Node sync batch #${batch}`)
offset += res.Nodes.length
currMax = Math.max(currMax, res.MaxSeq)
/* Go through each node and determine if they are older than
* the node in IndexedDB. If they are, they are just history
* and can be ignored since history is currently not stored
* in the browser.
*
* If the backed node is newer, the local node is stored in
* a separate table in IndexedDB to at a later stage in the
* sync be preserved in the backend. */
let backendNode = null
// Create a single transaction to be used in the chain of
// this sync. Otherwise it would take more time to create
// transactions for each node.
const trx = nodeStore.newTransaction('nodes', 'readwrite')
const objstore = trx.objectStore('nodes')
for (const i in res.Nodes) {
backendNode = new Node(res.Nodes[i], -1)
await this.handleNode(backendNode, objstore)
handled++
if (handled % 100 === 0)
_mbus.dispatch('SYNC_HANDLED', { handled })
}
} while (res.Continue)
_mbus.dispatch('SYNC_HANDLED', { handled })
nodeStore.setAppState('latest_sync_node', currMax)
} catch (e) {
console.log('sync node tree', e)
} finally {
syncEnd = Date.now()
const duration = (syncEnd - syncStart) / 1000
const count = await nodeStore.nodeCount()
console.log(`Node sync took ${duration}s`, count)
}
return (syncEnd - syncStart)
}//}}}
async handleNode(backendNode, objstore) {//{{{
try {
/* Retrieving the local copy of this node from IndexedDB.
* The backend node can be discarded if it is older than
* the local copy since it is considered history preserved
* in the backend. */
return nodeStore.get(backendNode.UUID, objstore)
.then(localNode => {
if (localNode.updated() >= backendNode.updated()) {
console.debug(`History from backend: ${backendNode.UUID}`)
return
}
/* If the local node hasn't seen unsynchronized change,
* it can be replaced without anything else being done
* since it is already on the backend server.
*
* If the local node has seen change, the change is already
* placed into the send_queue anyway. */
return nodeStore.add([backendNode], objstore)
})
.catch(() => {
// Not found in IndexedDB - OK to just insert since it only exists in backend.
return nodeStore.add([backendNode], objstore)
})
} catch (e) {
console.error(e)
} finally {
//_mbus.dispatch('SYNC_HANDLED', { count: 1 })
}
}//}}}
async nodesToServer() {//{{{
const BATCH_SIZE = 32
while (true) {
try {
// Send nodes in batches until everything is sent, or an error has occured.
const nodesToSend = await nodeStore.sendQueue.retrieve(BATCH_SIZE)
if (nodesToSend.length === 0)
break
console.debug(`Sending ${nodesToSend.length} node(s) to server`)
const request = {
NodeData: JSON.stringify(nodesToSend),
}
const res = await API.query('POST', '/sync/to_server', request)
if (!res.OK) {
// TODO - implement better error management here.
console.error(res)
alert(res)
return
}
// Nodes are archived on server and can now be deleted from the send queue.
const keys = nodesToSend.map(node => node.ClientSequence)
await nodeStore.sendQueue.delete(keys)
_mbus.dispatch('SYNC_UPLOADED', { count: nodesToSend.length })
} catch (e) {
console.trace(e)
alert(e)
return
}
}
}//}}}
}
export class N2SyncProgress extends CustomHTMLElement {
static {
this.tmpl = document.createElement('template')
this.tmpl.innerHTML = `
<progress data-el="progress" min=0 max=137 value=0></progress>
<div data-el="count" class="count">0 / 0</div>
`
}
constructor() {//{{{
super()
this.reset()
_mbus.subscribe('SYNC_COUNT', event => this.progressHandler(event))
_mbus.subscribe('SYNC_HANDLED', event => this.progressHandler(event))
_mbus.subscribe('SYNC_DONE', event => this.progressHandler(event))
}//}}}
reset() {//{{{
this.state = {
nodesToSync: 0,
nodesSynced: 0,
}
}//}}}
progressHandler(event) {//{{{
const eventData = event.detail.data
switch (event.type) {
case 'SYNC_COUNT':
this.state.nodesToSync = eventData.count
this.setSyncState(true)
break
case 'SYNC_HANDLED':
this.state.nodesSynced = eventData.handled
break
case 'SYNC_DONE':
// Hides the progress bar.
this.setSyncState(false)
// Don't update anything if nothing was synced.
if (this.state.nodesSynced === 0)
break
// Reload the tree nodes to reflect the new/updated nodes.
window._app.tree.reset()
break
}
this.render()
}//}}}
render() {//{{{
this.elProgress.max = this.state.nodesToSync
this.elProgress.value = this.state.nodesSynced
this.elCount.innerText = `${this.state.nodesSynced} / ${this.state.nodesToSync}`
}//}}}
setSyncState(state) {// {{{
if (state)
this.classList.add('show')
else
setTimeout(() => this.classList.remove('show'), 1500)
}// }}}
}
customElements.define('n2-syncprogress', N2SyncProgress)
// vim: foldmethod=marker