Telegram Web K with changes to work inside I2P
https://web.telegram.i2p/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
289 lines
7.3 KiB
289 lines
7.3 KiB
/* |
|
* https://github.com/morethanwords/tweb |
|
* Copyright (C) 2019-2021 Eduard Kuzmenko |
|
* https://github.com/morethanwords/tweb/blob/master/LICENSE |
|
*/ |
|
|
|
import {logger} from '../lib/logger'; |
|
import insertInDescendSortedArray from './array/insertInDescendSortedArray'; |
|
import {getMiddleware, MiddlewareHelper} from './middleware'; |
|
import middlewarePromise from './middlewarePromise'; |
|
import safeAssign from './object/safeAssign'; |
|
import pause from './schedulers/pause'; |
|
|
|
export type SortedElementBase<T = any> = { |
|
id: T, |
|
index: number |
|
}; |
|
|
|
let id = 0; |
|
|
|
export class BatchProcessor<Item extends any = any> { |
|
protected queue: Promise<Item>[]; |
|
protected promise: Promise<void>; |
|
|
|
protected middlewareHelper: MiddlewareHelper; |
|
protected log: ReturnType<typeof logger>; |
|
|
|
protected process: (batch: Item[], m: ReturnType<typeof middlewarePromise>, log: BatchProcessor['log']) => Promise<any>; |
|
protected possibleError: any; |
|
|
|
constructor(options: { |
|
log?: BatchProcessor['log'], |
|
// middleware: MiddlewareHelper, |
|
process: BatchProcessor<Item>['process'], |
|
possibleError?: BatchProcessor['possibleError'] |
|
}) { |
|
safeAssign(this, options); |
|
|
|
this.queue = []; |
|
this.middlewareHelper ??= getMiddleware(); |
|
|
|
const prefix = 'BATCH-PROCESSOR-' + ++id; |
|
if(this.log) { |
|
this.log = this.log.bindPrefix(prefix); |
|
} else { |
|
this.log = logger(prefix); |
|
} |
|
} |
|
|
|
public get queuePromise() { |
|
return this.promise; |
|
} |
|
|
|
public clear() { |
|
this.log('clear'); |
|
this.queue.length = 0; |
|
this.promise = undefined; |
|
this.middlewareHelper.clean(); |
|
} |
|
|
|
public addToQueue(item: BatchProcessor<Item>['queue'][0]) { |
|
this.queue.push(item); |
|
return this.setQueue(); |
|
} |
|
|
|
protected setQueue() { |
|
if(!this.queue.length) { |
|
return Promise.resolve(); |
|
} |
|
|
|
if(this.promise) { |
|
return this.promise; |
|
} |
|
|
|
const middleware = this.middlewareHelper.get(); |
|
const log = this.log.bindPrefix('queue'); |
|
const m = middlewarePromise(middleware, this.possibleError); |
|
|
|
const processQueue = async(): Promise<void> => { |
|
log('start', this.queue.length); |
|
|
|
const queue = this.queue.splice(0, this.queue.length); |
|
|
|
const perf = performance.now(); |
|
const promises = queue.map((promise) => { |
|
promise.then((details) => { |
|
log('render item time', performance.now() - perf, details); |
|
}); |
|
|
|
return promise; |
|
}); |
|
|
|
const renderedQueue = await m(Promise.all(promises)); |
|
await m(this.process(renderedQueue, m, log)); |
|
|
|
log('queue rendered'); |
|
|
|
if(this.queue.length) { |
|
log('have new items to render'); |
|
return processQueue(); |
|
} else { |
|
log('end'); |
|
} |
|
}; |
|
|
|
log('setting pause'); |
|
const promise = this.promise = m(pause(0)) |
|
.then( |
|
() => processQueue().catch((err: ApiError) => { |
|
if(err !== this.possibleError) { |
|
log.error('process queue error', err); |
|
} |
|
|
|
throw err; |
|
}), |
|
(err) => { |
|
log('pause has been cleared'); |
|
throw err; |
|
} |
|
) |
|
.finally(() => { |
|
if(this.promise === promise) { |
|
this.promise = undefined; |
|
} |
|
}); |
|
|
|
return promise; |
|
} |
|
} |
|
|
|
export default class SortedList<SortedElement extends SortedElementBase, SortedElementId = SortedElement['id']> { |
|
protected elements: Map<SortedElementId, SortedElement>; |
|
protected sorted: Array<SortedElement>; |
|
|
|
protected getIndex: (element: SortedElement) => PromiseLike<number> | number; |
|
protected onDelete: (element: SortedElement) => void; |
|
protected onUpdate: (element: SortedElement) => void; |
|
protected onSort: (element: SortedElement, idx: number) => void; |
|
protected onElementCreate: (base: SortedElementBase) => PromiseLike<SortedElement> | SortedElement; |
|
|
|
protected updateElementWith = (callback: () => void) => callback(); |
|
protected updateListWith = (callback: (canUpdate: boolean | undefined) => void) => callback(true); |
|
|
|
protected middleware: MiddlewareHelper; |
|
|
|
protected batchProcessor: BatchProcessor<SortedElement>; |
|
|
|
protected log: ReturnType<typeof logger>; |
|
|
|
constructor(options: { |
|
getIndex: SortedList<SortedElement>['getIndex'], |
|
onDelete?: SortedList<SortedElement>['onDelete'], |
|
onUpdate?: SortedList<SortedElement>['onUpdate'], |
|
onSort?: SortedList<SortedElement>['onSort'], |
|
onElementCreate: SortedList<SortedElement>['onElementCreate'], |
|
|
|
updateElementWith?: SortedList<SortedElement>['updateElementWith'], |
|
updateListWith?: SortedList<SortedElement>['updateListWith'], |
|
|
|
log?: SortedList<SortedElement>['log'] |
|
}) { |
|
safeAssign(this, options); |
|
|
|
this.elements = new Map(); |
|
this.sorted = []; |
|
this.middleware = getMiddleware(); |
|
|
|
this.batchProcessor = new BatchProcessor<SortedElement>({ |
|
log: this.log, |
|
process: async(batch, m, log) => { |
|
// const elements = await Promise.all(batch.map((element) => this.onElementCreate(element))); |
|
const elements = batch; |
|
const promises = elements.map((element) => this.update(element.id, element)); |
|
await m(Promise.all(promises)); |
|
} |
|
}); |
|
} |
|
|
|
public clear() { |
|
this.batchProcessor.clear(); |
|
this.middleware.clean(); |
|
this.elements.clear(); |
|
this.sorted.length = 0; |
|
} |
|
|
|
protected _updateList() { |
|
this.elements.forEach((element) => { |
|
this.update(element.id); |
|
}); |
|
|
|
if(this.onSort) { |
|
this.sorted.forEach((element, idx) => { |
|
this.onSort(element, idx); |
|
}); |
|
} |
|
} |
|
|
|
public updateList(callback?: (updated: boolean) => void) { |
|
const middleware = this.middleware.get(); |
|
this.updateListWith((canUpdate) => { |
|
if(!middleware() || (canUpdate !== undefined && !canUpdate)) { |
|
callback?.(false); |
|
return; |
|
} |
|
|
|
this._updateList(); |
|
|
|
callback?.(true); |
|
}); |
|
} |
|
|
|
public has(id: SortedElementId) { |
|
return this.elements.has(id); |
|
} |
|
|
|
public get(id: SortedElementId) { |
|
return this.elements.get(id); |
|
} |
|
|
|
public getAll() { |
|
return this.elements; |
|
} |
|
|
|
public async add(id: SortedElementId) { |
|
const element = this.get(id); |
|
if(element) { |
|
return; |
|
// return element; |
|
} |
|
|
|
const base: SortedElementBase = { |
|
id, |
|
index: 0 |
|
}; |
|
|
|
this.elements.set(id, base as SortedElement); |
|
const createPromise = Promise.resolve(this.onElementCreate(base)); |
|
return this.batchProcessor.addToQueue(createPromise); |
|
|
|
// return element; |
|
} |
|
|
|
public delete(id: SortedElementId, noScheduler?: boolean) { |
|
const element = this.elements.get(id); |
|
if(!element) { |
|
return false; |
|
} |
|
|
|
this.elements.delete(id); |
|
|
|
const idx = this.sorted.indexOf(element); |
|
if(idx !== -1) { |
|
this.sorted.splice(idx, 1); |
|
} |
|
|
|
if(this.onDelete) { |
|
if(noScheduler) { |
|
this.onDelete(element); |
|
} else { |
|
const middleware = this.middleware.get(); |
|
this.updateElementWith(() => { |
|
if(!middleware()) { |
|
return; |
|
} |
|
|
|
this.onDelete(element); |
|
}); |
|
} |
|
} |
|
|
|
return true; |
|
} |
|
|
|
public async update(id: SortedElementId, element = this.get(id)) { |
|
if(!element) { |
|
return; |
|
} |
|
|
|
element.index = await this.getIndex(element); |
|
if(this.get(id) !== element) { |
|
return; |
|
} |
|
|
|
this.onUpdate?.(element); |
|
|
|
const idx = insertInDescendSortedArray(this.sorted, element, 'index'); |
|
this.onSort(element, idx); |
|
} |
|
}
|
|
|