Telegram Web K with changes to work inside I2P https://web.telegram.i2p/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

312 lines
9.7 KiB

/*
* https://github.com/morethanwords/tweb
* Copyright (C) 2019-2021 Eduard Kuzmenko
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import readBlobAsUint8Array from '../../helpers/blob/readBlobAsUint8Array';
import deferredPromise, {CancellablePromise} from '../../helpers/cancellablePromise';
import debounce from '../../helpers/schedulers/debounce';
import {InputFileLocation} from '../../layer';
import CacheStorageController from '../files/cacheStorage';
import {DownloadOptions, MyUploadFile} from '../mtproto/apiFileManager';
import {getMtprotoMessagePort, log, serviceMessagePort} from './index.service';
import {ServiceRequestFilePartTaskPayload} from './serviceMessagePort';
import timeout from './timeout';
const deferredPromises: Map<MessagePort, {[taskId: string]: CancellablePromise<MyUploadFile>}> = new Map();
const cacheStorage = new CacheStorageController('cachedStreamChunks');
const CHUNK_TTL = 86400;
const CHUNK_CACHED_TIME_HEADER = 'Time-Cached';
const USE_CACHE = true;
const clearOldChunks = () => {
return cacheStorage.timeoutOperation((cache) => {
return cache.keys().then((requests) => {
const filtered: Map<StreamId, Request> = new Map();
const timestamp = Date.now() / 1000 | 0;
for(const request of requests) {
const match = request.url.match(/\/(\d+?)\?/);
if(match && !filtered.has(match[1])) {
filtered.set(match[1], request);
}
}
const promises: Promise<any>[] = [];
for(const [id, request] of filtered) {
const promise = cache.match(request).then((response) => {
if((+response.headers.get(CHUNK_CACHED_TIME_HEADER) + CHUNK_TTL) <= timestamp) {
log('will delete stream chunk:', id);
return cache.delete(request, {ignoreSearch: true, ignoreVary: true});
}
});
promises.push(promise);
}
return Promise.all(promises);
});
});
};
setInterval(clearOldChunks, 1800e3);
setInterval(() => {
const mtprotoMessagePort = getMtprotoMessagePort();
for(const [messagePort, promises] of deferredPromises) {
if(messagePort !== mtprotoMessagePort) {
for(const taskId in promises) {
const promise = promises[taskId];
promise.reject();
}
deferredPromises.delete(messagePort);
}
}
}, 120e3);
type StreamRange = [number, number];
type StreamId = DocId;
const streams: Map<StreamId, Stream> = new Map();
class Stream {
private destroyDebounced: () => void;
private id: StreamId;
private limitPart: number;
private loadedOffsets: Set<number> = new Set();
constructor(private info: DownloadOptions) {
this.id = Stream.getId(info);
streams.set(this.id, this);
// ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится
this.limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT;
this.destroyDebounced = debounce(this.destroy, 150000, false, true);
}
private destroy = () => {
streams.delete(this.id);
};
private async requestFilePartFromWorker(alignedOffset: number, limit: number, fromPreload = false) {
const payload: ServiceRequestFilePartTaskPayload = {
docId: this.id,
dcId: this.info.dcId,
offset: alignedOffset,
limit
};
const taskId = JSON.stringify(payload);
const mtprotoMessagePort = getMtprotoMessagePort();
let promises = deferredPromises.get(mtprotoMessagePort);
if(!promises) {
deferredPromises.set(mtprotoMessagePort, promises = {});
}
let deferred = promises[taskId];
if(deferred) {
return deferred.then((uploadFile) => uploadFile.bytes);
}
this.loadedOffsets.add(alignedOffset);
deferred = promises[taskId] = deferredPromise();
serviceMessagePort.invoke('requestFilePart', payload, undefined, mtprotoMessagePort)
.then(deferred.resolve, deferred.reject).finally(() => {
if(promises[taskId] === deferred) {
delete promises[taskId];
if(!Object.keys(promises).length) {
deferredPromises.delete(mtprotoMessagePort);
}
}
});
const bytesPromise = deferred.then((uploadFile) => uploadFile.bytes);
if(USE_CACHE) {
this.saveChunkToCache(bytesPromise, alignedOffset, limit);
!fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15));
}
return bytesPromise;
}
private requestFilePartFromCache(alignedOffset: number, limit: number, fromPreload?: boolean) {
if(!USE_CACHE) {
return Promise.resolve();
}
const key = this.getChunkKey(alignedOffset, limit);
return cacheStorage.getFile(key).then((blob: Blob) => {
return fromPreload ? new Uint8Array() : readBlobAsUint8Array(blob);
}, (error: ApiError) => {
if(error.type === 'NO_ENTRY_FOUND') {
return;
}
});
}
private requestFilePart(alignedOffset: number, limit: number, fromPreload?: boolean) {
return this.requestFilePartFromCache(alignedOffset, limit, fromPreload).then((bytes) => {
return bytes || this.requestFilePartFromWorker(alignedOffset, limit, fromPreload);
});
}
private saveChunkToCache(deferred: Promise<Uint8Array>, alignedOffset: number, limit: number) {
return deferred.then((bytes) => {
const key = this.getChunkKey(alignedOffset, limit);
const response = new Response(bytes, {
headers: {
'Content-Length': '' + bytes.length,
'Content-Type': 'application/octet-stream',
[CHUNK_CACHED_TIME_HEADER]: '' + (Date.now() / 1000 | 0)
}
});
return cacheStorage.save(key, response);
});
}
private preloadChunk(offset: number) {
if(this.loadedOffsets.has(offset)) {
return;
}
this.loadedOffsets.add(offset);
this.requestFilePart(offset, this.limitPart, true);
}
private preloadChunks(offset: number, end: number) {
if(end > this.info.size) {
end = this.info.size;
}
if(!offset) { // load last chunk for bounds
this.preloadChunk(alignOffset(offset, this.limitPart));
} else { // don't preload next chunks before the start
for(; offset < end; offset += this.limitPart) {
this.preloadChunk(offset);
}
}
}
public requestRange(range: StreamRange) {
this.destroyDebounced();
const possibleResponse = responseForSafariFirstRange(range, this.info.mimeType, this.info.size);
if(possibleResponse) {
return possibleResponse;
}
let [offset, end] = range;
/* if(info.size > limitPart && isSafari && offset === limitPart) {
//end = info.size - 1;
//offset = info.size - 1 - limitPart;
offset = info.size - (info.size % limitPart);
} */
const limit = end && end < this.limitPart ? alignLimit(end - offset + 1) : this.limitPart;
const alignedOffset = alignOffset(offset, limit);
if(!end) {
end = Math.min(offset + limit, this.info.size - 1);
}
return this.requestFilePart(alignedOffset, limit).then((ab) => {
// log.debug('[stream] requestFilePart result:', result);
// if(isSafari) {
if(offset !== alignedOffset || end !== (alignedOffset + limit)) {
ab = ab.slice(offset - alignedOffset, end - alignedOffset + 1);
}
const headers: Record<string, string> = {
'Accept-Ranges': 'bytes',
'Content-Range': `bytes ${offset}-${offset + ab.byteLength - 1}/${this.info.size || '*'}`,
'Content-Length': `${ab.byteLength}`
};
if(this.info.mimeType) {
headers['Content-Type'] = this.info.mimeType;
}
// simulate slow connection
// setTimeout(() => {
return new Response(ab, {
status: 206,
statusText: 'Partial Content',
headers
});
// }, 2.5e3);
});
}
private getChunkKey(alignedOffset: number, limit: number) {
return this.id + '?offset=' + alignedOffset + '&limit=' + limit;
}
public static get(info: DownloadOptions) {
return streams.get(this.getId(info)) ?? new Stream(info);
}
private static getId(info: DownloadOptions) {
return (info.location as InputFileLocation.inputDocumentFileLocation).id;
}
}
export default function onStreamFetch(event: FetchEvent, params: string) {
const range = parseRange(event.request.headers.get('Range'));
const info: DownloadOptions = JSON.parse(decodeURIComponent(params));
const stream = Stream.get(info);
// log.debug('[stream]', url, offset, end);
event.respondWith(Promise.race([
timeout(45 * 1000),
stream.requestRange(range)
]));
}
function responseForSafariFirstRange(range: StreamRange, mimeType: string, size: number): Response {
if(range[0] === 0 && range[1] === 1) {
return new Response(new Uint8Array(2).buffer, {
status: 206,
statusText: 'Partial Content',
headers: {
'Accept-Ranges': 'bytes',
'Content-Range': `bytes 0-1/${size || '*'}`,
'Content-Length': '2',
'Content-Type': mimeType || 'video/mp4'
}
});
}
return null;
}
/* const STREAM_CHUNK_UPPER_LIMIT = 256 * 1024;
const SMALLEST_CHUNK_LIMIT = 256 * 4; */
/* const STREAM_CHUNK_UPPER_LIMIT = 1024 * 1024;
const SMALLEST_CHUNK_LIMIT = 1024 * 4; */
const STREAM_CHUNK_MIDDLE_LIMIT = 512 * 1024;
const STREAM_CHUNK_UPPER_LIMIT = 1024 * 1024;
const SMALLEST_CHUNK_LIMIT = 512 * 4;
function parseRange(header: string): StreamRange {
if(!header) return [0, 0];
const [, chunks] = header.split('=');
const ranges = chunks.split(', ');
const [offset, end] = ranges[0].split('-');
return [+offset, +end || 0];
}
function alignOffset(offset: number, base = SMALLEST_CHUNK_LIMIT) {
return offset - (offset % base);
}
function alignLimit(limit: number) {
return 2 ** Math.ceil(Math.log(limit) / Math.log(2));
}