Browse Source

Possible connection fix with ping_disconnect_delay

master
Eduard Kuzmenko 4 years ago
parent
commit
f792855c61
  1. 4
      src/helpers/schedulers.ts
  2. 4
      src/lib/logger.ts
  3. 45
      src/lib/mtproto/apiFileManager.ts
  4. 12
      src/lib/mtproto/apiManager.ts
  5. 28
      src/lib/mtproto/authorizer.ts
  6. 2
      src/lib/mtproto/dcConfigurator.ts
  7. 250
      src/lib/mtproto/networker.ts
  8. 91
      src/lib/mtproto/tl_utils.ts
  9. 38
      src/lib/mtproto/transports/websocket.ts
  10. 6
      webpack.common.js

4
src/helpers/schedulers.ts

@ -20,14 +20,14 @@ export function debounce<F extends AnyToVoidFunction>( @@ -20,14 +20,14 @@ export function debounce<F extends AnyToVoidFunction>(
fn(...args);
}
waitingTimeout = window.setTimeout(() => {
waitingTimeout = setTimeout(() => {
if(shouldRunLast) {
// @ts-ignore
fn(...args);
}
waitingTimeout = null;
}, ms);
}, ms) as any;
};
}

4
src/lib/logger.ts

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
//import { DEBUG } from "./mtproto/mtproto_config";
import { DEBUG } from "./mtproto/mtproto_config";
export enum LogLevels {
log = 1,
@ -13,7 +13,7 @@ function dT() { @@ -13,7 +13,7 @@ function dT() {
}
export function logger(prefix: string, level = LogLevels.log | LogLevels.warn | LogLevels.error) {
if(process.env.NODE_ENV !== 'development'/* || true */) {
if(!DEBUG/* || true */) {
level = LogLevels.error;
}

45
src/lib/mtproto/apiFileManager.ts

@ -9,7 +9,7 @@ import FileManager from "../filemanager"; @@ -9,7 +9,7 @@ import FileManager from "../filemanager";
import { logger, LogLevels } from "../logger";
import apiManager from "./apiManager";
import { isWebpSupported } from "./mtproto.worker";
import { MOUNT_CLASS_TO } from "./mtproto_config";
import { DEBUG, Modes, MOUNT_CLASS_TO } from "./mtproto_config";
type Delayed = {
@ -59,9 +59,10 @@ export class ApiFileManager { @@ -59,9 +59,10 @@ export class ApiFileManager {
public webpConvertPromises: {[fileName: string]: CancellablePromise<Uint8Array>} = {};
private log: ReturnType<typeof logger> = logger('AFM', LogLevels.error);
private log: ReturnType<typeof logger> = logger('AFM', LogLevels.error | LogLevels.log);
private tempId = 0;
private queueId = 0;
private debug = Modes.debug;
public downloadRequest(dcId: 'upload', id: number, cb: () => Promise<void>, activeDelta: number, queueId?: number): Promise<void>;
public downloadRequest(dcId: number, id: number, cb: () => Promise<MyUploadFile>, activeDelta: number, queueId?: number): Promise<MyUploadFile>;
@ -138,8 +139,6 @@ export class ApiFileManager { @@ -138,8 +139,6 @@ export class ApiFileManager {
}
public requestFilePart(dcId: number, location: InputFileLocation | FileLocation, offset: number, limit: number, id = 0, queueId = 0, checkCancel?: () => void) {
//const delta = limit / 1024 / 256;
const delta = limit / 1024 / 128;
return this.downloadRequest(dcId, id, async() => {
checkCancel && checkCancel();
@ -151,13 +150,17 @@ export class ApiFileManager { @@ -151,13 +150,17 @@ export class ApiFileManager {
dcId,
fileDownload: true
}) as Promise<MyUploadFile>;
}, delta, queueId);
}, this.getDelta(limit), queueId);
}
/* private convertBlobToBytes(blob: Blob) {
return blob.arrayBuffer().then(buffer => new Uint8Array(buffer));
} */
private getDelta(bytes: number) {
return bytes / 1024 / 128;
}
private getLimitPart(size: number): number {
let bytes: number;
@ -205,7 +208,7 @@ export class ApiFileManager { @@ -205,7 +208,7 @@ export class ApiFileManager {
const cachedPromise = this.cachedDownloadPromises[fileName];
const fileStorage = this.getFileStorage();
this.log('downloadFile', fileName, size, location, options.mimeType, process);
this.debug && this.log('downloadFile', fileName, size, location, options.mimeType);
/* if(options.queueId) {
this.log.error('downloadFile queueId:', fileName, options.queueId);
@ -217,7 +220,7 @@ export class ApiFileManager { @@ -217,7 +220,7 @@ export class ApiFileManager {
if(size) {
return cachedPromise.then((blob: Blob) => {
if(blob.size < size) {
this.log('downloadFile need to deleteFile, wrong size:', blob.size, size);
this.debug && this.log('downloadFile need to deleteFile, wrong size:', blob.size, size);
return this.deleteFile(fileName).then(() => {
return this.downloadFile(options);
@ -313,7 +316,7 @@ export class ApiFileManager { @@ -313,7 +316,7 @@ export class ApiFileManager {
superpuper();
}
this.log('downloadFile requestFilePart result:', fileName, result);
this.debug && this.log('downloadFile requestFilePart result:', fileName, result);
const isFinal = offset + limit >= size || !bytes.byteLength;
if(bytes.byteLength) {
//done += limit;
@ -390,20 +393,19 @@ export class ApiFileManager { @@ -390,20 +393,19 @@ export class ApiFileManager {
let canceled = false,
resolved = false,
doneParts = 0,
partSize = 262144, // 256 Kb
activeDelta = 2;
partSize = 262144; // 256 Kb
/* if(fileSize > (524288 * 3000)) {
partSize = 1024 * 1024;
activeDelta = 8;
} else */if(fileSize > 67108864) {
partSize = 524288;
activeDelta = 4;
} else if(fileSize < 102400) {
partSize = 32768;
activeDelta = 1;
}
const activeDelta = this.getDelta(partSize);
const totalParts = Math.ceil(fileSize / partSize);
const fileId: [number, number] = [nextRandomInt(0xFFFFFFFF), nextRandomInt(0xFFFFFFFF)];
@ -473,17 +475,24 @@ export class ApiFileManager { @@ -473,17 +475,24 @@ export class ApiFileManager {
return;
}
//////this.log('Starting to upload file, isBig:', isBigFile, fileID, part, e.target.result);
let buffer = e.target.result as ArrayBuffer;
self.debug && self.log('Upload file part, isBig:', isBigFile, part, buffer.byteLength, new Uint8Array(buffer).length, new Uint8Array(buffer).slice().length);
/* const u = new Uint8Array(buffer.byteLength);
for(let i = 0; i < u.length; ++i) {
//u[i] = Math.random() * 255 | 0;
u[i] = 0;
}
buffer = u.buffer; */
apiManager.invokeApi(method, {
file_id: fileId,
file_part: part,
file_total_parts: totalParts,
bytes: e.target.result/* new Uint8Array(e.target.result as ArrayBuffer) */
bytes: buffer/* new Uint8Array(buffer) */
} as any, {
//startMaxLength: partSize + 256,
fileUpload: true,
//singleInRequest: true
fileUpload: true
}).then((result) => {
doneParts++;
uploadResolve();
@ -513,7 +522,7 @@ export class ApiFileManager { @@ -513,7 +522,7 @@ export class ApiFileManager {
(r.value as Promise<void>).then(process);
};
const maxRequests = 10;
const maxRequests = Infinity;
//const maxRequests = 10;
/* for(let i = 0; i < 10; ++i) {
process();
@ -523,7 +532,7 @@ export class ApiFileManager { @@ -523,7 +532,7 @@ export class ApiFileManager {
}
deferred.cancel = () => {
this.log('cancel upload', canceled, resolved);
//this.log('cancel upload', canceled, resolved);
if(!canceled && !resolved) {
canceled = true;
errorHandler({type: 'UPLOAD_CANCELED'});

12
src/lib/mtproto/apiManager.ts

@ -182,8 +182,16 @@ export class ApiManager { @@ -182,8 +182,16 @@ export class ApiManager {
}
const networkers = cache[dcId];
if(networkers.length >= /* 1 */(connectionType !== 'download' ? 1 : 3)) {
const networker = networkers.pop();
if(networkers.length >= /* 1 */(connectionType === 'client' ? 1 : (connectionType === 'download' ? 3 : 3))) {
let i = networkers.length - 1, found = false;
for(; i >= 0; --i) {
if(networkers[i].isOnline) {
found = true;
break;
}
}
const networker = found ? networkers.splice(i, 1)[0] : networkers.pop();
networkers.unshift(networker);
return Promise.resolve(networker);
}

28
src/lib/mtproto/authorizer.ts

@ -70,16 +70,16 @@ export class Authorizer { @@ -70,16 +70,16 @@ export class Authorizer {
}
public mtpSendPlainRequest(dcId: number, requestArray: Uint8Array) {
var requestLength = requestArray.byteLength;
const requestLength = requestArray.byteLength;
//requestArray = new /* Int32Array */Uint8Array(requestBuffer);
var header = new TLSerialization();
const header = new TLSerialization();
header.storeLongP(0, 0, 'auth_key_id'); // Auth key
header.storeLong(timeManager.generateId(), 'msg_id'); // Msg_id
header.storeInt(requestLength, 'request_length');
let headerArray = header.getBytes(true) as Uint8Array;
let resultArray = new Uint8Array(headerArray.byteLength + requestLength);
const headerArray = header.getBytes(true) as Uint8Array;
const resultArray = new Uint8Array(headerArray.byteLength + requestLength);
resultArray.set(headerArray);
resultArray.set(requestArray, headerArray.length);
@ -93,9 +93,9 @@ export class Authorizer { @@ -93,9 +93,9 @@ export class Authorizer {
resultArray.set(headerArray);
resultArray.set(requestArray, headerArray.length);
let requestData = xhrSendBuffer ? resultBuffer : resultArray; */
let transport = dcConfigurator.chooseServer(dcId);
let baseError = {
const requestData = xhrSendBuffer ? resultBuffer : resultArray; */
const transport = dcConfigurator.chooseServer(dcId);
const baseError = {
code: 406,
type: 'NETWORK_BAD_RESPONSE',
transport: transport
@ -118,20 +118,20 @@ export class Authorizer { @@ -118,20 +118,20 @@ export class Authorizer {
/* result = fResult ? fResult : result;
fResult = new Uint8Array(0); */
let deserializer = new TLDeserialization(result, {mtproto: true});
let auth_key_id = deserializer.fetchLong('auth_key_id');
if(auth_key_id !== 0) this.log.error('auth_key_id !== 0', auth_key_id);
const deserializer = new TLDeserialization(result, {mtproto: true});
const auth_key_id = deserializer.fetchLong('auth_key_id');
if(auth_key_id !== '0') this.log.error('auth_key_id !== 0', auth_key_id);
let msg_id = deserializer.fetchLong('msg_id');
if(msg_id === 0) this.log.error('msg_id === 0', msg_id);
const msg_id = deserializer.fetchLong('msg_id');
if(msg_id === '0') this.log.error('msg_id === 0', msg_id);
let msg_len = deserializer.fetchInt('msg_len');
const msg_len = deserializer.fetchInt('msg_len');
if(!msg_len) this.log.error('no msg_len', msg_len);
return deserializer;
} catch(e) {
this.log.error('mtpSendPlainRequest: deserialization went bad', e);
let error = Object.assign(baseError, {originalError: e});
const error = Object.assign(baseError, {originalError: e});
throw error;
}
}, error => {

2
src/lib/mtproto/dcConfigurator.ts

@ -48,7 +48,7 @@ export class DcConfigurator { @@ -48,7 +48,7 @@ export class DcConfigurator {
const path = Modes.test ? 'apiws_test' : 'apiws';
const chosenServer = 'wss://' + subdomain + '.web.telegram.org/' + path;
const suffix = connectionType === 'upload' ? '-U' : connectionType === 'download' ? '-D' : '';
return new Socket(dcId, chosenServer, suffix);
return new Socket(dcId, chosenServer, suffix, connectionType === 'client' ? 30000 : 10000);
};
private transportHTTP = (dcId: number, connectionType: ConnectionType) => {

250
src/lib/mtproto/networker.ts

@ -7,12 +7,12 @@ import Schema from './schema'; @@ -7,12 +7,12 @@ import Schema from './schema';
import timeManager from './timeManager';
import NetworkerFactory from './networkerFactory';
import { logger, LogLevels } from '../logger';
import { Modes, App } from './mtproto_config';
import { Modes, App, DEBUG } from './mtproto_config';
import { InvokeApiOptions } from '../../types';
import { longToBytes } from '../crypto/crypto_utils';
import MTTransport from './transports/transport';
import { convertToUint8Array, bufferConcat, bytesCmp, bytesToHex } from '../../helpers/bytes';
import { nextRandomInt } from '../../helpers/random';
import { nextRandomInt, randomLong } from '../../helpers/random';
/// #if MTPROTO_HTTP_UPLOAD
// @ts-ignore
@ -62,7 +62,6 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & { @@ -62,7 +62,6 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & {
resultType?: string,
singleInRequest?: true,
longPoll?: true,
noResponse?: true, // only with http (http_wait for longPoll)
};
@ -107,21 +106,17 @@ export default class MTPNetworker { @@ -107,21 +106,17 @@ export default class MTPNetworker {
resend_msg_ids: Array<string>
} | null = null;
//private transport: MTTransport;
private name: string;
private log: ReturnType<typeof logger>;
private isOnline = false;
public isOnline = false;
private lastResponseTime = 0;
private disconnectDelay: number;
//public onConnectionStatusChange: (online: boolean) => void;
constructor(public dcId: number, private authKey: number[], private authKeyId: Uint8Array,
private serverSalt: number[], private transport: MTTransport, options: InvokeApiOptions = {}) {
this.authKeyUint8 = convertToUint8Array(this.authKey);
//this.authKeyID = sha1BytesSync(this.authKey).slice(-8);
//console.trace('Create', dcId, options);
this.isFileUpload = !!options.fileUpload;
this.isFileDownload = !!options.fileDownload;
@ -130,7 +125,7 @@ export default class MTPNetworker { @@ -130,7 +125,7 @@ export default class MTPNetworker {
const suffix = this.isFileUpload ? '-U' : this.isFileDownload ? '-D' : '';
this.name = 'NET-' + dcId + suffix;
//this.log = logger(this.name, this.upload && this.dcId === 2 ? LogLevels.debug | LogLevels.warn | LogLevels.log | LogLevels.error : LogLevels.error);
this.log = logger(this.name, LogLevels.log | LogLevels.error);
this.log = logger(this.name, LogLevels.log | LogLevels.error | LogLevels.debug | LogLevels.warn);
this.log('constructor'/* , this.authKey, this.authKeyID, this.serverSalt */);
// Test resend after bad_server_salt
@ -163,6 +158,13 @@ export default class MTPNetworker { @@ -163,6 +158,13 @@ export default class MTPNetworker {
(this.transport as Socket).networker = this;
//}
/// #endif
// * handle outcoming dead socket, server will close the connection
if((this.transport as Socket).networker) {
this.disconnectDelay = (this.transport as Socket).retryTimeout / 1000 | 0;
setInterval(this.sendPingDelayDisconnect, (this.disconnectDelay - 5) * 1000);
this.sendPingDelayDisconnect();
}
}
public updateSession() {
@ -178,21 +180,20 @@ export default class MTPNetworker { @@ -178,21 +180,20 @@ export default class MTPNetworker {
}
if(sentMessage.container) {
const newInner: string[] = [];
sentMessage.inner.forEach((innerSentMessageId) => {
sentMessage.inner.forEachReverse((innerSentMessageId, idx) => {
const innerSentMessage = this.updateSentMessage(innerSentMessageId);
if(innerSentMessage) {
newInner.push(innerSentMessage.msg_id);
if(!innerSentMessage) {
sentMessage.inner.splice(idx, 1);
}
});
sentMessage.inner = newInner;
}
sentMessage.msg_id = timeManager.generateId();
sentMessage.seq_no = this.generateSeqNo(sentMessage.notContentRelated || sentMessage.container);
this.log('updateSentMessage', sentMessage.msg_id, sentMessageId);
/* if(DEBUG) {
this.log('updateSentMessage', sentMessage.msg_id, sentMessageId);
} */
this.sentMessages[sentMessage.msg_id] = sentMessage;
delete this.sentMessages[sentMessageId];
@ -292,7 +293,10 @@ export default class MTPNetworker { @@ -292,7 +293,10 @@ export default class MTPNetworker {
const invokeAfterMsg = Schema.API.methods.find(m => m.method === 'invokeAfterMsg');
if(!invokeAfterMsg) throw new Error('no invokeAfterMsg!');
this.log('Api call options.afterMessageId!');
if(DEBUG) {
this.log('Api call options.afterMessageId!');
}
serializer.storeInt(+invokeAfterMsg.id >>> 0, 'invokeAfterMsg');
serializer.storeLong(options.afterMessageId, 'msg_id');
}
@ -314,13 +318,24 @@ export default class MTPNetworker { @@ -314,13 +318,24 @@ export default class MTPNetworker {
if(Modes.debug/* || true */) {
this.log('Api call', method, message, params, options);
} else {
} else if(DEBUG) {
this.log('Api call', method, params, options);
}
return this.pushMessage(message, options);
}
private sendPingDelayDisconnect = () => {
if(!this.isOnline) return; // * already disconnected
this.wrapMtpCall('ping_delay_disconnect', {
ping_id: randomLong(),
disconnect_delay: this.disconnectDelay
}, {
noResponse: true,
notContentRelated: true
});
};
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
public checkLongPoll = () => {
const isClean = this.cleanupSent();
@ -421,15 +436,19 @@ export default class MTPNetworker { @@ -421,15 +436,19 @@ export default class MTPNetworker {
this.checkConnectionTimeout = setTimeout(this.checkConnection, this.checkConnectionPeriod * 1000 | 0);
this.checkConnectionPeriod = Math.min(30, (1 + this.checkConnectionPeriod) * 1.5);
/// #if !MTPROTO_WORKER
document.body.addEventListener('online', this.checkConnection, false);
document.body.addEventListener('focus', this.checkConnection, false);
/// #endif
} else {
this.checkLongPoll();
this.scheduleRequest();
/// #if !MTPROTO_WORKER
document.body.removeEventListener('online', this.checkConnection);
document.body.removeEventListener('focus', this.checkConnection);
/// #endif
clearTimeout(this.checkConnectionTimeout);
this.checkConnectionTimeout = 0;
@ -539,16 +558,20 @@ export default class MTPNetworker { @@ -539,16 +558,20 @@ export default class MTPNetworker {
const willChange = this.isOnline !== online;
this.isOnline = online;
if(willChange && NetworkerFactory.onConnectionStatusChange) {
NetworkerFactory.onConnectionStatusChange({
_: 'networkerStatus',
online: this.isOnline,
dcId: this.dcId,
name: this.name,
isFileNetworker: this.isFileNetworker,
isFileDownload: this.isFileDownload,
isFileUpload: this.isFileUpload
});
if(willChange) {
if(NetworkerFactory.onConnectionStatusChange) {
NetworkerFactory.onConnectionStatusChange({
_: 'networkerStatus',
online: this.isOnline,
dcId: this.dcId,
name: this.name,
isFileNetworker: this.isFileNetworker,
isFileDownload: this.isFileDownload,
isFileUpload: this.isFileUpload
});
}
this.sendPingDelayDisconnect();
}
/* if(this.onConnectionStatusChange) {
this.onConnectionStatusChange(this.isOnline);
@ -565,8 +588,14 @@ export default class MTPNetworker { @@ -565,8 +588,14 @@ export default class MTPNetworker {
} else {
this.pendingMessages[messageId] = value;
}
if(sentMessage.acked) {
this.log.error('pushResend: acked message?', sentMessage);
}
this.log('Resend', messageId, sentMessage, this.pendingMessages);
if(DEBUG) {
this.log('pushResend:', messageId, sentMessage, this.pendingMessages);
}
this.scheduleRequest(delay);
}
@ -614,18 +643,9 @@ export default class MTPNetworker { @@ -614,18 +643,9 @@ export default class MTPNetworker {
});
}
public performScheduledRequest = () => {
private performScheduledRequest() {
// this.log('scheduled', this.dcId, this.iii)
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
if(this.offline) {
this.log('Cancel scheduled');
return false;
}
this.nextReq = 0;
/// #endif
if(this.pendingAcks.length) {
const ackMsgIds: Array<string> = this.pendingAcks.slice();
@ -647,7 +667,7 @@ export default class MTPNetworker { @@ -647,7 +667,7 @@ export default class MTPNetworker {
messageId: '' // will set in wrapMtpMessage->pushMessage
};
this.log('resendReq messages', resendMsgIds);
//this.log('resendReq messages', resendMsgIds);
this.wrapMtpMessage({
_: 'msg_resend_req',
msg_ids: resendMsgIds
@ -667,17 +687,18 @@ export default class MTPNetworker { @@ -667,17 +687,18 @@ export default class MTPNetworker {
let hasApiCall = false;
let hasHttpWait = false;
let lengthOverflow = false;
let singlesCount = 0;
for(const messageId in this.pendingMessages) {
const value = this.pendingMessages[messageId];
if(!value || value >= currentTime) {
if(message = this.sentMessages[messageId]) {
//this.log('performScheduledRequest message:', message);
const messageByteLength = (/* message.body.byteLength || */message.body.length) + 32;
if(!message.notContentRelated &&
lengthOverflow) {
/* if(message.fileUpload) {
this.log('performScheduledRequest message:', message, message.body.length, (message.body as Uint8Array).byteLength, (message.body as Uint8Array).buffer.byteLength);
} */
const messageByteLength = message.body.length + 32;
if(!message.notContentRelated && lengthOverflow) {
continue; // maybe break here
}
@ -689,13 +710,6 @@ export default class MTPNetworker { @@ -689,13 +710,6 @@ export default class MTPNetworker {
continue; // maybe break here
}
if(message.singleInRequest) {
singlesCount++;
if(singlesCount > 1) {
continue; // maybe break here
}
}
messages.push(message);
messagesByteLen += messageByteLength;
if(message.isAPI) {
@ -772,7 +786,7 @@ export default class MTPNetworker { @@ -772,7 +786,7 @@ export default class MTPNetworker {
this.sentMessages[message.msg_id] = containerSentMessage;
if(Modes.debug || true) {
if(Modes.debug) {
this.log('Container', innerMessages, message.msg_id, message.seq_no);
}
} else {
@ -789,14 +803,14 @@ export default class MTPNetworker { @@ -789,14 +803,14 @@ export default class MTPNetworker {
/// #if MTPROTO_HTTP_UPLOAD
if(!(this.transport instanceof HTTP)) {
if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
//if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
this.cleanupSent(); // ! WARNING
} else {
this.handleSentEncryptedRequestHTTP(promise, message, noResponseMsgs);
}
/// #elif !MTPROTO_HTTP
//if(!(this.transport instanceof HTTP)) {
if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
//if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
this.cleanupSent(); // ! WARNING
//} else {
/// #else
@ -804,7 +818,7 @@ export default class MTPNetworker { @@ -804,7 +818,7 @@ export default class MTPNetworker {
//}
/// #endif
if(lengthOverflow || singlesCount > 1) {
if(lengthOverflow) {
this.scheduleRequest();
}
};
@ -832,8 +846,10 @@ export default class MTPNetworker { @@ -832,8 +846,10 @@ export default class MTPNetworker {
}
public sendEncryptedRequest(message: MTMessage) {
this.log.debug('Send encrypted', message, this.authKeyId);
// console.trace()
/* if(DEBUG) {
this.log.debug('Send encrypted', message, this.authKeyId);
} */
const data = new TLSerialization({
startMaxLength: message.body.length + 2048
});
@ -851,7 +867,6 @@ export default class MTPNetworker { @@ -851,7 +867,6 @@ export default class MTPNetworker {
const paddingLength = (16 - (data.offset % 16)) + 16 * (1 + nextRandomInt(5));
const padding = [...new Uint8Array(paddingLength).randomize()];
//MTProto.secureRandom.nextBytes(padding);
const dataWithPadding = bufferConcat(dataBuffer, padding);
// this.log('Adding padding', dataBuffer, padding, dataWithPadding)
@ -862,7 +877,9 @@ export default class MTPNetworker { @@ -862,7 +877,9 @@ export default class MTPNetworker {
} */
return this.getEncryptedMessage(dataWithPadding).then((encryptedResult) => {
this.log.debug('Got encrypted out message', encryptedResult);
/* if(DEBUG) {
this.log.debug('Got encrypted out message', encryptedResult);
} */
const request = new TLSerialization({
startMaxLength: encryptedResult.bytes.length + 256
@ -871,25 +888,23 @@ export default class MTPNetworker { @@ -871,25 +888,23 @@ export default class MTPNetworker {
request.storeIntBytes(encryptedResult.msgKey, 128, 'msg_key');
request.storeRawBytes(encryptedResult.bytes, 'encrypted_data');
//var requestData = xhrSendBuffer ? request.getBuffer() : request.getBytes(true) as Uint8Array;
const requestData = request.getBytes(true);
const baseError = {
code: 406,
type: 'NETWORK_BAD_RESPONSE',
transport: this.transport
};
/* if(message.fileUpload) {
this.log('Send encrypted: requestData length:', requestData.length, requestData.length % 16, paddingLength % 16, paddingLength, data.offset);
} */
const promise = this.transport.send(requestData);
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
/* if(!(this.transport instanceof HTTP)) */ return promise;
return promise;
/// #else
if(!(this.transport instanceof HTTP)) return promise;
const baseError = {
code: 406,
type: 'NETWORK_BAD_RESPONSE',
transport: this.transport
};
return promise.then((result) => {
if(!result || !result.byteLength) {
return Promise.reject(baseError);
@ -911,7 +926,9 @@ export default class MTPNetworker { @@ -911,7 +926,9 @@ export default class MTPNetworker {
public parseResponse(responseBuffer: Uint8Array) {
//const perf = performance.now();
this.log.debug('Start parsing response'/* , responseBuffer */);
/* if(DEBUG) {
this.log.debug('Start parsing response', responseBuffer);
} */
this.lastResponseTime = Date.now();
@ -1037,6 +1054,7 @@ export default class MTPNetworker { @@ -1037,6 +1054,7 @@ export default class MTPNetworker {
this.serverSalt = serverSalt;
}
// ! таймаут очень сильно тормозит скорость работы сокета (даже нулевой)
public scheduleRequest(delay = 0) {
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
/* clearTimeout(this.nextReqTimeout);
@ -1053,22 +1071,37 @@ export default class MTPNetworker { @@ -1053,22 +1071,37 @@ export default class MTPNetworker {
delay = 0;
} */
var nextReq = Date.now() + delay;
const nextReq = Date.now() + delay;
if(delay && this.nextReq && this.nextReq <= nextReq) {
//this.log('scheduleRequest: nextReq', this.nextReq, nextReq);
return false;
}
// this.log('schedule req', delay)
// console.trace()
//this.log('scheduleRequest: delay', delay);
/* if(this.nextReqTimeout) {
return;
} */
const perf = performance.now();
clearTimeout(this.nextReqTimeout);
this.nextReqTimeout = 0;
if(delay > 0) {
this.nextReqTimeout = self.setTimeout(this.performScheduledRequest, delay || 0);
} else {
setTimeout(this.performScheduledRequest, 0);
}
this.nextReqTimeout = self.setTimeout(() => {
//this.log('scheduleRequest: timeout delay was:', performance.now() - perf);
this.nextReqTimeout = 0;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD || true
if(this.offline) {
//this.log('Cancel scheduled');
return false;
}
this.nextReq = 0;
/// #endif
this.performScheduledRequest();
}, delay);
this.nextReq = nextReq;
/// #endif
@ -1081,7 +1114,10 @@ export default class MTPNetworker { @@ -1081,7 +1114,10 @@ export default class MTPNetworker {
}
public reqResendMessage(msgId: string) {
this.log('Req resend', msgId);
if(DEBUG) {
this.log('Req resend', msgId);
}
this.pendingResends.push(msgId);
this.scheduleRequest(100);
}
@ -1119,11 +1155,7 @@ export default class MTPNetworker { @@ -1119,11 +1155,7 @@ export default class MTPNetworker {
if(sentMessage && !sentMessage.acked) {
delete sentMessage.body;
sentMessage.acked = true;
return true;
}
return false;
}
public processError(rawError: {error_message: string, error_code: number}) {
@ -1182,7 +1214,9 @@ export default class MTPNetworker { @@ -1182,7 +1214,9 @@ export default class MTPNetworker {
return;
}
this.log.debug('process message', message, messageId, sessionId);
/* if(DEBUG) {
this.log('process message', message, messageId, sessionId);
} */
switch(message._) {
case 'msg_container': {
@ -1254,7 +1288,9 @@ export default class MTPNetworker { @@ -1254,7 +1288,9 @@ export default class MTPNetworker {
case 'new_session_created': {
this.ackMessage(messageId);
this.log.debug('new_session_created', message);
if(DEBUG) {
this.log.debug('new_session_created', message);
}
//this.updateSession();
this.processMessageAck(message.first_msg_id);
@ -1322,20 +1358,9 @@ export default class MTPNetworker { @@ -1322,20 +1358,9 @@ export default class MTPNetworker {
}
} else {
if(deferred) {
if(Modes.debug) {
this.log.debug('Rpc response', message.result);
} else {
let dRes = message.result._;
if(!dRes) {
if(message.result.length > 5) {
dRes = '[..' + message.result.length + '..]';
} else {
dRes = message.result;
}
}
this.log.debug('Rpc response', dRes, sentMessage);
}
/* if(DEBUG) {
this.log.debug('Rpc response', message.result, sentMessage);
} */
sentMessage.deferred.resolve(message.result);
}
@ -1348,16 +1373,35 @@ export default class MTPNetworker { @@ -1348,16 +1373,35 @@ export default class MTPNetworker {
delete this.sentMessages[sentMessageId];
} else {
this.log('Rpc result for unknown message:', sentMessageId);
if(DEBUG) {
this.log('Rpc result for unknown message:', sentMessageId, message);
}
}
break;
}
case 'pong': { // * https://core.telegram.org/mtproto/service_messages#ping-messages-pingpong - These messages do not require acknowledgments
/* if((this.transport as Socket).networker) {
const sentMessageId = message.msg_id;
const sentMessage = this.sentMessages[sentMessageId];
if(sentMessage) {
delete this.sentMessages[sentMessageId];
}
} */
break;
}
default:
this.ackMessage(messageId);
this.log.debug('Update', message);
/* if(DEBUG) {
this.log.debug('Update', message);
} */
if(NetworkerFactory.updatesProcessor !== null) {
NetworkerFactory.updatesProcessor(message);

91
src/lib/mtproto/tl_utils.ts

@ -37,8 +37,8 @@ class TLSerialization { @@ -37,8 +37,8 @@ class TLSerialization {
}
public getArray() {
let resultBuffer = new ArrayBuffer(this.offset);
let resultArray = new Int32Array(resultBuffer);
const resultBuffer = new ArrayBuffer(this.offset);
const resultArray = new Int32Array(resultBuffer);
resultArray.set(this.intView.subarray(0, this.offset / 4));
@ -53,16 +53,16 @@ class TLSerialization { @@ -53,16 +53,16 @@ class TLSerialization {
public getBytes(typed?: false): number[];
public getBytes(typed?: boolean): number[] | Uint8Array {
if(typed) {
let resultBuffer = new ArrayBuffer(this.offset);
let resultArray = new Uint8Array(resultBuffer);
const resultBuffer = new ArrayBuffer(this.offset);
const resultArray = new Uint8Array(resultBuffer);
resultArray.set(this.byteView.subarray(0, this.offset));
return resultArray;
}
let bytes: number[] = [];
for(var i = 0; i < this.offset; i++) {
const bytes: number[] = [];
for(let i = 0; i < this.offset; i++) {
bytes.push(this.byteView[i]);
}
return bytes;
@ -126,16 +126,16 @@ class TLSerialization { @@ -126,16 +126,16 @@ class TLSerialization {
if(typeof sLong !== 'string') {
sLong = sLong ? sLong.toString() : '0';
}
var divRem = bigStringInt(sLong).divideAndRemainder(bigint(0x100000000));
const divRem = bigStringInt(sLong).divideAndRemainder(bigint(0x100000000));
this.writeInt(divRem[1].intValue(), (field || '') + ':long[low]');
this.writeInt(divRem[0].intValue(), (field || '') + ':long[high]');
}
public storeDouble(f: any, field?: string) {
var buffer = new ArrayBuffer(8);
var intView = new Int32Array(buffer);
var doubleView = new Float64Array(buffer);
const buffer = new ArrayBuffer(8);
const intView = new Int32Array(buffer);
const doubleView = new Float64Array(buffer);
doubleView[0] = f;
@ -149,11 +149,11 @@ class TLSerialization { @@ -149,11 +149,11 @@ class TLSerialization {
if(s === undefined) {
s = '';
}
var sUTF8 = unescape(encodeURIComponent(s));
const sUTF8 = unescape(encodeURIComponent(s));
this.checkLength(sUTF8.length + 8);
var len = sUTF8.length;
const len = sUTF8.length;
if(len <= 253) {
this.byteView[this.offset++] = len;
} else {
@ -162,7 +162,7 @@ class TLSerialization { @@ -162,7 +162,7 @@ class TLSerialization {
this.byteView[this.offset++] = (len & 0xFF00) >> 8;
this.byteView[this.offset++] = (len & 0xFF0000) >> 16;
}
for(var i = 0; i < len; i++) {
for(let i = 0; i < len; i++) {
this.byteView[this.offset++] = sUTF8.charCodeAt(i);
}
@ -181,7 +181,7 @@ class TLSerialization { @@ -181,7 +181,7 @@ class TLSerialization {
this.debug && console.log('>>>', bytesToHex(bytes as number[]), (field || '') + ':bytes');
// if uint8array were json.stringified, then will be: {'0': 123, '1': 123}
var len = (bytes as ArrayBuffer).byteLength || (bytes as Uint8Array).length;
const len = (bytes as ArrayBuffer).byteLength || (bytes as Uint8Array).length;
this.checkLength(len + 8);
if(len <= 253) {
this.byteView[this.offset++] = len;
@ -206,7 +206,7 @@ class TLSerialization { @@ -206,7 +206,7 @@ class TLSerialization {
bytes = new Uint8Array(bytes);
}
var len = bytes.length;
const len = bytes.length;
if((bits % 32) || (len * 8) !== bits) {
const error = new Error('Invalid bits: ' + bits + ', ' + bytes.length);
console.error(error, bytes, field);
@ -225,7 +225,7 @@ class TLSerialization { @@ -225,7 +225,7 @@ class TLSerialization {
bytes = new Uint8Array(bytes);
}
var len = bytes.length;
const len = bytes.length;
this.debug && console.log('>>>', bytesToHex(bytes), (field || ''));
this.checkLength(len);
@ -317,9 +317,9 @@ class TLSerialization { @@ -317,9 +317,9 @@ class TLSerialization {
throw new Error('Invalid vector type ' + type);
}
var itemType = type.substr(7, type.length - 8); // for "Vector<itemType>"
const itemType = type.substr(7, type.length - 8); // for "Vector<itemType>"
this.writeInt(obj.length, field + '[count]');
for(var i = 0; i < obj.length; i++) {
for(let i = 0; i < obj.length; i++) {
this.storeObject(obj[i], itemType, field + '[' + i + ']');
}
@ -437,7 +437,7 @@ class TLDeserialization { @@ -437,7 +437,7 @@ class TLDeserialization {
}
//var i = this.intView[this.offset / 4];
let i = new Uint32Array(this.byteView.buffer.slice(this.offset, this.offset + 4))[0];
const i = new Uint32Array(this.byteView.buffer.slice(this.offset, this.offset + 4))[0];
this.debug/* || field.includes('[dialog][read_outbox_max_id]') */
&& console.log('<<<', i.toString(16), i, field,
@ -454,9 +454,9 @@ class TLDeserialization { @@ -454,9 +454,9 @@ class TLDeserialization {
}
public fetchDouble(field?: string) {
var buffer = new ArrayBuffer(8);
var intView = new Int32Array(buffer);
var doubleView = new Float64Array(buffer);
const buffer = new ArrayBuffer(8);
const intView = new Int32Array(buffer);
const doubleView = new Float64Array(buffer);
intView[0] = this.readInt((field || '') + ':double[low]'),
intView[1] = this.readInt((field || '') + ':double[high]');
@ -464,17 +464,17 @@ class TLDeserialization { @@ -464,17 +464,17 @@ class TLDeserialization {
return doubleView[0];
}
public fetchLong(field?: string) {
var iLow = this.readInt((field || '') + ':long[low]');
var iHigh = this.readInt((field || '') + ':long[high]');
public fetchLong(field?: string): string {
const iLow = this.readInt((field || '') + ':long[low]');
const iHigh = this.readInt((field || '') + ':long[high]');
var longDec = bigint(iHigh).shiftLeft(32).add(bigint(iLow)).toString();
const longDec = bigint(iHigh).shiftLeft(32).add(bigint(iLow)).toString();
return longDec;
}
public fetchBool(field?: string) {
var i = this.readInt((field || '') + ':bool');
public fetchBool(field?: string): boolean {
const i = this.readInt((field || '') + ':bool');
if(i === boolTrue) {
return true;
} else if(i === boolFalse) {
@ -485,17 +485,17 @@ class TLDeserialization { @@ -485,17 +485,17 @@ class TLDeserialization {
return this.fetchObject('Object', field);
}
public fetchString(field?: string) {
var len = this.byteView[this.offset++];
public fetchString(field?: string): string {
let len = this.byteView[this.offset++];
if(len === 254) {
var len = this.byteView[this.offset++] |
len = this.byteView[this.offset++] |
(this.byteView[this.offset++] << 8) |
(this.byteView[this.offset++] << 16);
}
var sUTF8 = '';
for(var i = 0; i < len; i++) {
let sUTF8 = '';
for(let i = 0; i < len; i++) {
sUTF8 += String.fromCharCode(this.byteView[this.offset++]);
}
@ -504,10 +504,11 @@ class TLDeserialization { @@ -504,10 +504,11 @@ class TLDeserialization {
this.offset++;
}
let s: string;
try {
var s = decodeURIComponent(escape(sUTF8));
s = decodeURIComponent(escape(sUTF8));
} catch (e) {
var s = sUTF8;
s = sUTF8;
}
this.debug && console.log('<<<', s, (field || '') + ':string');
@ -515,8 +516,8 @@ class TLDeserialization { @@ -515,8 +516,8 @@ class TLDeserialization {
return s;
}
public fetchBytes(field?: string) {
var len = this.byteView[this.offset++];
public fetchBytes(field?: string): Uint8Array {
let len = this.byteView[this.offset++];
if(len === 254) {
len = this.byteView[this.offset++] |
@ -524,7 +525,7 @@ class TLDeserialization { @@ -524,7 +525,7 @@ class TLDeserialization {
(this.byteView[this.offset++] << 16);
}
var bytes = this.byteView.subarray(this.offset, this.offset + len);
const bytes = this.byteView.subarray(this.offset, this.offset + len);
this.offset += len;
// Padding
@ -544,15 +545,15 @@ class TLDeserialization { @@ -544,15 +545,15 @@ class TLDeserialization {
throw new Error('Invalid bits: ' + bits);
}
var len = bits / 8;
const len = bits / 8;
if(typed) {
var result = this.byteView.subarray(this.offset, this.offset + len);
const result = this.byteView.subarray(this.offset, this.offset + len);
this.offset += len;
return result;
}
var bytes = [];
for(var i = 0; i < len; i++) {
const bytes: number[] = [];
for(let i = 0; i < len; i++) {
bytes.push(this.byteView[this.offset++]);
}
@ -572,14 +573,14 @@ class TLDeserialization { @@ -572,14 +573,14 @@ class TLDeserialization {
}
if(typed) {
let bytes = new Uint8Array(len);
const bytes = new Uint8Array(len);
bytes.set(this.byteView.subarray(this.offset, this.offset + len));
this.offset += len;
return bytes;
}
var bytes = [];
for(var i = 0; i < len; i++) {
const bytes: number[] = [];
for(let i = 0; i < len; i++) {
bytes.push(this.byteView[this.offset++]);
}

38
src/lib/mtproto/transports/websocket.ts

@ -5,8 +5,8 @@ import intermediatePacketCodec from './intermediate'; @@ -5,8 +5,8 @@ import intermediatePacketCodec from './intermediate';
import MTPNetworker from '../networker';
import { logger, LogLevels } from '../../logger';
import Obfuscation from './obfuscation';
const CONNECTION_RETRY_TIMEOUT = 30000;
import { DEBUG, Modes } from '../mtproto_config';
//import { debounce } from '../../../helpers/schedulers';
export default class Socket extends MTTransport {
ws: WebSocket;
@ -32,12 +32,18 @@ export default class Socket extends MTTransport { @@ -32,12 +32,18 @@ export default class Socket extends MTTransport {
lastCloseTime: number;
constructor(dcId: number, url: string, logSuffix: string) {
debug = Modes.debug;
//releasePendingDebounced: () => void;
constructor(dcId: number, url: string, logSuffix: string, public retryTimeout: number) {
super(dcId, url);
this.log = logger(`WS-${dcId}` + logSuffix, LogLevels.error | LogLevels.log/* | LogLevels.debug */);
let logLevel = LogLevels.error | LogLevels.log;
if(this.debug) logLevel |= LogLevels.debug;
this.log = logger(`WS-${dcId}` + logSuffix, logLevel);
this.log('constructor');
this.connect();
//this.releasePendingDebounced = debounce(() => this.releasePending(true), 2000, false, true);
}
connect = () => {
@ -60,7 +66,7 @@ export default class Socket extends MTTransport { @@ -60,7 +66,7 @@ export default class Socket extends MTTransport {
handleOpen = () => {
this.log('opened');
this.log.debug('sending init packet');
this.debug && this.log.debug('sending init packet');
this.ws.send(this.obfuscation.init(this.codec));
//setTimeout(() => {
@ -89,7 +95,7 @@ export default class Socket extends MTTransport { @@ -89,7 +95,7 @@ export default class Socket extends MTTransport {
const time = Date.now();
const diff = time - this.lastCloseTime;
const needTimeout = !isNaN(diff) && diff < CONNECTION_RETRY_TIMEOUT ? CONNECTION_RETRY_TIMEOUT - diff : 0;
const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0;
if(this.networker) {
this.networker.setConnectionStatus(false);
@ -111,7 +117,7 @@ export default class Socket extends MTTransport { @@ -111,7 +117,7 @@ export default class Socket extends MTTransport {
};
handleMessage = (event: MessageEvent) => {
this.log.debug('<-', 'handleMessage', event);
this.debug && this.log.debug('<-', 'handleMessage', event);
let data = this.obfuscation.decode(new Uint8Array(event.data));
data = this.codec.readPacket(data);
@ -119,9 +125,9 @@ export default class Socket extends MTTransport { @@ -119,9 +125,9 @@ export default class Socket extends MTTransport {
if(this.networker) { // authenticated!
//this.pending = this.pending.filter(p => p.body); // clear pending
this.log.debug('redirecting to networker');
this.debug && this.log.debug('redirecting to networker');
return this.networker.parseResponse(data).then(response => {
this.log.debug('redirecting to networker response:', response);
this.debug && this.log.debug('redirecting to networker response:', response);
this.networker.processMessage(response.response, response.messageId, response.sessionId);
});
}
@ -129,14 +135,15 @@ export default class Socket extends MTTransport { @@ -129,14 +135,15 @@ export default class Socket extends MTTransport {
//console.log('got hex:', data.hex);
const pending = this.pending.shift();
if(!pending) {
return this.log.debug('no pending for res:', data.hex);
this.debug && this.log.debug('no pending for res:', data.hex);
return;
}
pending.resolve(data);
};
send = (body: Uint8Array) => {
this.log.debug('-> body length to pending:', body.length);
this.debug && this.log.debug('-> body length to pending:', body.length);
//return;
@ -154,12 +161,17 @@ export default class Socket extends MTTransport { @@ -154,12 +161,17 @@ export default class Socket extends MTTransport {
}
}
releasePending() {
releasePending(/* tt = false */) {
if(!this.connected) {
//this.connect();
return;
}
/* if(!tt) {
this.releasePendingDebounced();
return;
} */
//this.log.error('Pending length:', this.pending.length);
let length = this.pending.length;
//for(let i = length - 1; i >= 0; --i) {
@ -173,7 +185,7 @@ export default class Socket extends MTTransport { @@ -173,7 +185,7 @@ export default class Socket extends MTTransport {
const enc = this.obfuscation.encode(toEncode);
//this.log('send after obf:', enc.hex);
this.log.debug('-> body length to send:', enc.length);
this.debug && this.log.debug('-> body length to send:', enc.length, this.ws.bufferedAmount);
/* if(this.ws.bufferedAmount) {
this.log.error('bufferedAmount:', this.ws.bufferedAmount);
} */

6
webpack.common.js

@ -6,9 +6,9 @@ const postcssPresetEnv = require('postcss-preset-env'); @@ -6,9 +6,9 @@ const postcssPresetEnv = require('postcss-preset-env');
const ServiceWorkerWebpackPlugin = require('serviceworker-webpack-plugin');
const fs = require('fs');
const allowedIPs = ['194.58.97.147', '195.66.140.39', '127.0.0.1', '176.100.8.202'];
const allowedIPs = ['194.58.97.147', '195.66.140.39', '127.0.0.1', '176.100.8.254'];
const devMode = process.env.NODE_ENV !== 'production';
const useLocal = true;
const useLocal = false;
const useLocalNotLocal = false;
if(devMode) {
@ -18,7 +18,7 @@ if(devMode) { @@ -18,7 +18,7 @@ if(devMode) {
const opts = {
MTPROTO_WORKER: true,
MTPROTO_HTTP: false,
MTPROTO_HTTP_UPLOAD: true,
MTPROTO_HTTP_UPLOAD: false,
DEBUG: devMode,
version: 3,
"ifdef-verbose": devMode, // add this for verbose output

Loading…
Cancel
Save