252 lines
7.5 KiB
JavaScript
252 lines
7.5 KiB
JavaScript
import { API } from 'api'
|
|
import { Node } from 'node'
|
|
import { CustomHTMLElement } from './lib/custom_html_element.mjs'
|
|
|
|
export class Sync {
|
|
constructor() {//{{{
|
|
this.listeners = []
|
|
this.messagesReceived = []
|
|
}//}}}
|
|
|
|
async run() {//{{{
|
|
try {
|
|
let duration = 0 // in ms
|
|
|
|
// The latest sync node value is used to retrieve the changes
|
|
// from the backend.
|
|
const state = await nodeStore.getAppState('latest_sync_node')
|
|
const oldMax = (state?.value ? state.value : 0)
|
|
|
|
let nodeCountDownload = await this.getNodeCount(oldMax)
|
|
let nodeCountUpload = await nodeStore.sendQueue.count()
|
|
|
|
_mbus.dispatch('SYNC_START')
|
|
_mbus.dispatch('SYNC_DOWNLOAD_COUNT', { count: nodeCountDownload })
|
|
_mbus.dispatch('SYNC_UPLOAD_COUNT', { count: nodeCountUpload })
|
|
|
|
await this.nodesFromServer(oldMax)
|
|
.then(durationNodes => {
|
|
duration = durationNodes // in ms
|
|
console.log(`Total time: ${Math.round(1000 * durationNodes) / 1000}s`)
|
|
})
|
|
|
|
// Uploads of modified nodes to server.
|
|
await this.nodesToServer()
|
|
} finally {
|
|
_mbus.dispatch('SYNC_DONE')
|
|
}
|
|
}//}}}
|
|
async getNodeCount(oldMax) {//{{{
|
|
// Retrieve the amount of values the server will send us.
|
|
const res = await API.query('POST', `/sync/from_server/count/${oldMax}`)
|
|
return res?.Count
|
|
}//}}}
|
|
async nodesFromServer(oldMax) {//{{{
|
|
const syncStart = Date.now()
|
|
let syncEnd
|
|
let handled = 0
|
|
try {
|
|
let currMax = oldMax
|
|
let offset = 0
|
|
let res = { Continue: false }
|
|
let batch = 0
|
|
do {
|
|
batch++
|
|
res = await API.query('POST', `/sync/from_server/${oldMax}/${offset}`)
|
|
if (res.Nodes.length > 0)
|
|
console.log(`Node sync batch #${batch}`)
|
|
offset += res.Nodes.length
|
|
currMax = Math.max(currMax, res.MaxSeq)
|
|
|
|
/* Go through each node and determine if they are older than
|
|
* the node in IndexedDB. If they are, they are just history
|
|
* and can be ignored since history is currently not stored
|
|
* in the browser.
|
|
*
|
|
* If the backed node is newer, the local node is stored in
|
|
* a separate table in IndexedDB to at a later stage in the
|
|
* sync be preserved in the backend. */
|
|
|
|
let backendNode = null
|
|
|
|
// Create a single transaction to be used in the chain of
|
|
// this sync. Otherwise it would take more time to create
|
|
// transactions for each node.
|
|
const trx = nodeStore.newTransaction('nodes', 'readwrite')
|
|
const objstore = trx.objectStore('nodes')
|
|
|
|
for (const i in res.Nodes) {
|
|
backendNode = new Node(res.Nodes[i], -1)
|
|
await this.handleNode(backendNode, objstore)
|
|
|
|
handled++
|
|
if (handled % 100 === 0)
|
|
_mbus.dispatch('SYNC_DOWNLOADED', { handled })
|
|
}
|
|
|
|
} while (res.Continue)
|
|
_mbus.dispatch('SYNC_DOWNLOADED', { handled })
|
|
|
|
nodeStore.setAppState('latest_sync_node', currMax)
|
|
} catch (e) {
|
|
console.error('sync node tree', e)
|
|
alert(e.message)
|
|
} finally {
|
|
syncEnd = Date.now()
|
|
const duration = (syncEnd - syncStart) / 1000
|
|
const count = await nodeStore.nodeCount()
|
|
console.log(`Node sync took ${duration}s`, count)
|
|
}
|
|
return (syncEnd - syncStart)
|
|
}//}}}
|
|
async handleNode(backendNode, objstore) {//{{{
|
|
try {
|
|
/* Retrieving the local copy of this node from IndexedDB.
|
|
* The backend node can be discarded if it is older than
|
|
* the local copy since it is considered history preserved
|
|
* in the backend. */
|
|
return nodeStore.get(backendNode.UUID, objstore)
|
|
.then(localNode => {
|
|
if (localNode.updated() >= backendNode.updated()) {
|
|
console.debug(`History from backend: ${backendNode.UUID}`)
|
|
return
|
|
}
|
|
|
|
/* If the local node hasn't seen unsynchronized change,
|
|
* it can be replaced without anything else being done
|
|
* since it is already on the backend server.
|
|
*
|
|
* If the local node has seen change, the change is already
|
|
* placed into the send_queue anyway. */
|
|
return nodeStore.add([backendNode], objstore)
|
|
|
|
})
|
|
.catch(() => {
|
|
// Not found in IndexedDB - OK to just insert since it only exists in backend.
|
|
return nodeStore.add([backendNode], objstore)
|
|
})
|
|
} catch (e) {
|
|
console.error(e)
|
|
} finally {
|
|
//_mbus.dispatch('SYNC_HANDLED', { count: 1 })
|
|
}
|
|
}//}}}
|
|
async nodesToServer() {//{{{
|
|
const BATCH_SIZE = 32
|
|
while (true) {
|
|
try {
|
|
// Send nodes in batches until everything is sent, or an error has occured.
|
|
const nodesToSend = await nodeStore.sendQueue.retrieve(BATCH_SIZE)
|
|
if (nodesToSend.length === 0)
|
|
break
|
|
console.debug(`Sending ${nodesToSend.length} node(s) to server`)
|
|
|
|
const request = {
|
|
NodeData: JSON.stringify(nodesToSend),
|
|
}
|
|
const res = await API.query('POST', '/sync/to_server', request)
|
|
if (!res.OK) {
|
|
// TODO - implement better error management here.
|
|
console.error(res)
|
|
alert(res)
|
|
return
|
|
}
|
|
|
|
// Nodes are archived on server and can now be deleted from the send queue.
|
|
const keys = nodesToSend.map(node => node.ClientSequence)
|
|
await nodeStore.sendQueue.delete(keys)
|
|
_mbus.dispatch('SYNC_UPLOADED', { count: nodesToSend.length })
|
|
|
|
} catch (e) {
|
|
console.error(e)
|
|
alert(e.message)
|
|
return
|
|
}
|
|
}
|
|
}//}}}
|
|
}
|
|
|
|
export class N2SyncProgress extends CustomHTMLElement {
|
|
static {// {{{
|
|
this.tmpl = document.createElement('template')
|
|
this.tmpl.innerHTML = `
|
|
<img src="/images/${_VERSION}/icon_transfer.svg">
|
|
<div data-el="download-transferred" class="count">0</div> <div>/</div> <div data-el="download-total">0</div>
|
|
<div data-el="upload-transferred" class="count">0</div> <div>/</div> <div data-el="upload-total">0</div>
|
|
`
|
|
}// }}}
|
|
constructor() {//{{{
|
|
super()
|
|
this.reset()
|
|
_mbus.subscribe('SYNC_START', () => this.reset())
|
|
_mbus.subscribe('SYNC_DOWNLOAD_COUNT', event => this.progressHandler(event))
|
|
_mbus.subscribe('SYNC_UPLOAD_COUNT', event => this.progressHandler(event))
|
|
_mbus.subscribe('SYNC_DOWNLOADED', event => this.progressHandler(event))
|
|
_mbus.subscribe('SYNC_UPLOADED', event => this.progressHandler(event))
|
|
_mbus.subscribe('SYNC_DONE', event => this.progressHandler(event))
|
|
}//}}}
|
|
reset() {//{{{
|
|
this.classList.remove('ok')
|
|
this.state = {
|
|
nodesToDownload: 0,
|
|
nodesToUpload: 0,
|
|
nodesDowloaded: 0,
|
|
nodesUploaded: 0,
|
|
}
|
|
this.render()
|
|
}//}}}
|
|
progressHandler(event) {//{{{
|
|
const eventData = event.detail.data
|
|
switch (event.type) {
|
|
case 'SYNC_DOWNLOAD_COUNT':
|
|
this.state.nodesToDownload = eventData.count
|
|
this.setSyncState(true)
|
|
break
|
|
|
|
case 'SYNC_UPLOAD_COUNT':
|
|
this.state.nodesToUpload = eventData.count
|
|
this.setSyncState(true)
|
|
break
|
|
|
|
case 'SYNC_DOWNLOADED':
|
|
this.state.nodesDowloaded = eventData.handled
|
|
break
|
|
|
|
case 'SYNC_UPLOADED':
|
|
this.state.nodesUploaded += eventData.count
|
|
break
|
|
|
|
case 'SYNC_DONE':
|
|
this.classList.add('ok')
|
|
|
|
// Hides the progress bar.
|
|
this.setSyncState(false)
|
|
|
|
// Don't update anything if nothing was synced.
|
|
if (this.state.nodesDowloaded === 0)
|
|
break
|
|
|
|
// Reload the tree nodes to reflect the new/updated nodes.
|
|
window._app.sidebar.reset()
|
|
break
|
|
}
|
|
this.render()
|
|
}//}}}
|
|
render() {//{{{
|
|
this.elDownloadTransferred.innerText = this.state.nodesDowloaded
|
|
this.elDownloadTotal.innerText = this.state.nodesToDownload
|
|
|
|
this.elUploadTransferred.innerText = this.state.nodesUploaded
|
|
this.elUploadTotal.innerText = this.state.nodesToUpload
|
|
}//}}}
|
|
setSyncState(state) {// {{{
|
|
if (state)
|
|
this.classList.add('show')
|
|
else
|
|
// Give the user a chance to see what it ended on.
|
|
setTimeout(() => this.classList.remove('show'), 1500)
|
|
}// }}}
|
|
}
|
|
customElements.define('n2-syncprogress', N2SyncProgress)
|
|
|
|
// vim: foldmethod=marker
|