@ -27,7 +27,7 @@ import DEBUG from '../../config/debug';
@@ -27,7 +27,7 @@ import DEBUG from '../../config/debug';
import Modes from '../../config/modes' ;
import noop from '../../helpers/noop' ;
/// #if MTPROTO_HTTP_UPLOAD || MTPROTO _HTTP
/// #if MTPROTO_HAS _HTTP
import HTTP from './transports/http' ;
/// #endif
@ -35,6 +35,7 @@ import type TcpObfuscated from './transports/tcpObfuscated';
@@ -35,6 +35,7 @@ import type TcpObfuscated from './transports/tcpObfuscated';
import { bigInt2str , rightShift_ , str2bigInt } from '../../vendor/leemon' ;
import { forEachReverse } from '../../helpers/array' ;
import { ConnectionStatus } from './connectionStatus' ;
import ctx from '../../environment/ctx' ;
//console.error('networker included!', new Error().stack);
@ -95,21 +96,22 @@ export default class MTPNetworker {
@@ -95,21 +96,22 @@ export default class MTPNetworker {
private pendingMessages : { [ msgId : MTLong ] : number } = { } ;
private pendingAcks : Array < MTLong > = [ ] ;
private pendingResends : Array < MTLong > = [ ] ;
public connectionInited = false ;
public connectionInited : boolean ;
private nextReqTimeout : number ;
private nextReq : number = 0 ;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
//private longPollInt: number;
private longPollPending = 0 ;
/// #if MTPROTO_HAS_HTTP
private longPollInterval : number ;
private longPollPending : number ;
private checkConnectionTimeout : number ;
private checkConnectionPeriod = 0 ;
private sleepAfter = 0 ;
private sleepAfter : number ;
private offline = false ;
private sendingLongPoll : boolean ;
/// #endif
private seqNo : number = 0 ;
private seqNo : number ;
private prevSessionId : Uint8Array ;
private sessionId : Uint8Array ;
private serverSalt : Uint8Array ;
@ -133,14 +135,22 @@ export default class MTPNetworker {
@@ -133,14 +135,22 @@ export default class MTPNetworker {
public onDrain : ( ) = > void ;
private onDrainTimeout : number ;
public transport : MTTransport ;
//private disconnectDelay: number;
//private pingPromise: CancellablePromise<any>;
//public onConnectionStatusChange: (online: boolean) => void;
//private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = [];
constructor ( public dcId : number , private authKey : Uint8Array , private authKeyId : Uint8Array ,
serverSalt : Uint8Array , public transport : MTTransport , options : InvokeApiOptions = { } ) {
constructor (
public dcId : number ,
private authKey : Uint8Array ,
private authKeyId : Uint8Array ,
serverSalt : Uint8Array ,
transport : MTTransport ,
options : InvokeApiOptions = { }
) {
this . authKeyUint8 = convertToUint8Array ( this . authKey ) ;
this . serverSalt = convertToUint8Array ( serverSalt ) ;
@ -151,7 +161,7 @@ export default class MTPNetworker {
@@ -151,7 +161,7 @@ export default class MTPNetworker {
const suffix = this . isFileUpload ? '-U' : this . isFileDownload ? '-D' : '' ;
this . name = 'NET-' + dcId + suffix ;
//this.log = logger(this.name, this.upload && this.dcId === 2 ? LogLevels.debug | LogLevels.warn | LogLevels.log | LogLevels.error : LogLevels.error);
this . log = logger ( this . name , LogTypes . Log | /* LogTypes.Debug | */ LogTypes . Error | LogTypes . Warn , undefined ) ;
this . log = logger ( this . name , LogTypes . Log | LogTypes . Debug | LogTypes . Error | LogTypes . Warn , undefined ) ;
this . log ( 'constructor' /* , this.authKey, this.authKeyID, this.serverSalt */ ) ;
// Test resend after bad_server_salt
@ -168,22 +178,7 @@ export default class MTPNetworker {
@@ -168,22 +178,7 @@ export default class MTPNetworker {
// rootScope.offlineConnecting = true */
// }
/// #if MTPROTO_HTTP_UPLOAD
if ( this . transport instanceof HTTP ) {
/* this.longPollInt = */ setInterval ( this . checkLongPoll , 10000 ) ;
this . checkLongPoll ( ) ;
} else {
( this . transport as TcpObfuscated ) . networker = this ;
}
/// #elif MTPROTO_HTTP
//if(this.transport instanceof HTTP) {
/* this.longPollInt = */ setInterval ( this . checkLongPoll , 10000 ) ;
this . checkLongPoll ( ) ;
/// #else
//} else {
( this . transport as TcpObfuscated ) . networker = this ;
//}
/// #endif
this . changeTransport ( transport ) ;
// * handle outcoming dead socket, server will close the connection
// if((this.transport as TcpObfuscated).networker) {
@ -191,10 +186,6 @@ export default class MTPNetworker {
@@ -191,10 +186,6 @@ export default class MTPNetworker {
// //setInterval(this.sendPingDelayDisconnect, (this.disconnectDelay - 5) * 1000);
// this.sendPingDelayDisconnect();
// }
if ( ( this . transport as TcpObfuscated ) . connected ) {
this . setConnectionStatus ( ConnectionStatus . Connected ) ;
}
}
private updateSession() {
@ -232,9 +223,9 @@ export default class MTPNetworker {
@@ -232,9 +223,9 @@ export default class MTPNetworker {
sentMessage . msg_id = timeManager . generateId ( ) ;
sentMessage . seq_no = this . generateSeqNo ( sentMessage . notContentRelated || sentMessage . container ) ;
/ * i f ( D E B U G ) {
this . log ( 'updateSentMessage' , sentMessage . msg_id , sentMessageId ) ;
} * /
if ( this . debug ) {
this . log ( ` updateSentMessage, old= ${ sentMessageId } , new= ${ sentMessage . msg_id } ` ) ;
}
this . sentMessages [ sentMessage . msg_id ] = sentMessage ;
delete this . sentMessages [ sentMessageId ] ;
@ -372,9 +363,57 @@ export default class MTPNetworker {
@@ -372,9 +363,57 @@ export default class MTPNetworker {
return this . pushMessage ( message , options ) ;
}
public changeTransport ( transport? : MTTransport ) {
const oldTransport = this . transport ;
if ( oldTransport ) {
if ( ( oldTransport as TcpObfuscated ) . destroy ) {
( oldTransport as TcpObfuscated ) . destroy ( ) ;
}
if ( this . nextReqTimeout ) {
clearTimeout ( this . nextReqTimeout ) ;
this . nextReqTimeout = 0 ;
this . nextReq = 0 ;
}
/// #if MTPROTO_HAS_HTTP
if ( this . longPollInterval !== undefined ) {
clearInterval ( this . longPollInterval ) ;
this . longPollInterval = undefined ;
}
if ( this . checkConnectionTimeout !== undefined ) {
clearTimeout ( this . checkConnectionTimeout ) ;
this . checkConnectionTimeout = undefined ;
}
/// #endif
}
this . transport = transport ;
if ( ! transport ) {
return ;
}
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( transport instanceof HTTP ) {
/// #endif
this . longPollInterval = ctx . setInterval ( this . checkLongPoll , 10000 ) ;
this . checkLongPoll ( ) ;
/// #if MTPROTO_HAS_WS
}
/// #endif
/// #endif
transport . networker = this ;
if ( ( transport as TcpObfuscated ) . connected ) {
this . setConnectionStatus ( ConnectionStatus . Connected ) ;
}
}
public destroy() {
//assumeType<TcpObfuscated>(this.transport);
( this . transport as TcpObfuscated ) . destroy ( ) ;
this . changeTransport ( ) ;
}
public forceReconnectTimeout() {
@ -496,13 +535,14 @@ export default class MTPNetworker {
@@ -496,13 +535,14 @@ export default class MTPNetworker {
// });
// };
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
/// #if MTPROTO_HAS_HTTP
private checkLongPoll = ( ) = > {
const isClean = this . cleanupSent ( ) ;
//this.log.error('Check lp', this.longPollPending, this.dcId, isClean, this);
if ( ( this . longPollPending && Date . now ( ) < this . longPollPending ) ||
this . offline ||
this . isStopped ( ) ) {
this . isStopped ( ) ||
this . isFileNetworker ) {
//this.log('No lp this time');
return false ;
}
@ -510,7 +550,6 @@ export default class MTPNetworker {
@@ -510,7 +550,6 @@ export default class MTPNetworker {
sessionStorage . get ( 'dc' ) . then ( ( baseDcId ) = > {
if ( isClean && (
baseDcId !== this . dcId ||
this . isFileNetworker ||
( this . sleepAfter && Date . now ( ) > this . sleepAfter )
) ) {
//console.warn(dT(), 'Send long-poll for DC is delayed', this.dcId, this.sleepAfter);
@ -522,10 +561,12 @@ export default class MTPNetworker {
@@ -522,10 +561,12 @@ export default class MTPNetworker {
} ;
private sendLongPoll() {
if ( this . sendingLongPoll ) return ;
this . sendingLongPoll = true ;
const maxWait = 25000 ;
this . longPollPending = Date . now ( ) + maxWait ;
//this.log('Set lp', this.longPollPending, tsNow())
this . debug && this . log . debug ( 'sendLongPoll' , this . longPollPending ) ;
this . wrapMtpCall ( 'http_wait' , {
max_delay : 500 ,
@ -535,19 +576,19 @@ export default class MTPNetworker {
@@ -535,19 +576,19 @@ export default class MTPNetworker {
noResponse : true ,
longPoll : true
} ) . then ( ( ) = > {
this . longPollPending = 0 ;
this . longPollPending = undefined ;
setTimeout ( this . checkLongPoll , 0 ) ;
} , ( error : ErrorEvent ) = > {
this . log ( 'Long-poll failed' , error ) ;
} ) . finally ( ( ) = > {
this . sendingLongPoll = undefined ;
} ) ;
}
private checkConnection = ( event : Event | string ) = > {
/* rootScope.offlineConnecting = true */
this . log ( 'Check connection' , event ) ;
this . debug && this . log ( 'Check connection' , event ) ;
clearTimeout ( this . checkConnectionTimeout ) ;
this . checkConnectionTimeout = 0 ;
this . checkConnectionTimeout = undefined ;
const serializer = new TLSerialization ( { mtproto : true } ) ;
const pingId = randomLong ( ) ;
@ -562,16 +603,12 @@ export default class MTPNetworker {
@@ -562,16 +603,12 @@ export default class MTPNetworker {
body : serializer.getBytes ( true )
} ;
this . sendEncryptedRequest ( pingMessage ) . then ( ( result ) = > {
/* delete rootScope.offlineConnecting */
this . sendEncryptedRequest ( pingMessage ) . then ( ( ) = > {
this . toggleOffline ( false ) ;
} , ( ) = > {
this . log ( 'Delay ' , this . checkConnectionPeriod * 1000 ) ;
this . checkConnectionTimeout = setTimeout ( this . checkConnection , this . checkConnectionPeriod * 1000 | 0 ) ;
this . debug && this . log ( 'Delay ' , this . checkConnectionPeriod * 1000 ) ;
this . checkConnectionTimeout = ctx . setTimeout ( this . checkConnection , this . checkConnectionPeriod * 1000 | 0 ) ;
this . checkConnectionPeriod = Math . min ( 60 , this . checkConnectionPeriod * 1.5 ) ;
/ * s e t T i m e o u t ( f u n c t i o n ( ) {
delete rootScope . offlineConnecting
} , 1000 ) ; * /
} ) ;
} ;
@ -582,8 +619,6 @@ export default class MTPNetworker {
@@ -582,8 +619,6 @@ export default class MTPNetworker {
}
this . offline = enabled ;
/ * r o o t S c o p e . o f f l i n e = e n a b l e d ;
rootScope . offlineConnecting = false ; * /
if ( this . offline ) {
clearTimeout ( this . nextReqTimeout ) ;
@ -594,7 +629,7 @@ export default class MTPNetworker {
@@ -594,7 +629,7 @@ export default class MTPNetworker {
this . checkConnectionPeriod = 0 ;
}
this . checkConnectionTimeout = setTimeout ( this . checkConnection , this . checkConnectionPeriod * 1000 | 0 ) ;
this . checkConnectionTimeout = ctx . setTimeout ( this . checkConnection , this . checkConnectionPeriod * 1000 | 0 ) ;
this . checkConnectionPeriod = Math . min ( 30 , ( 1 + this . checkConnectionPeriod ) * 1.5 ) ;
/// #if !MTPROTO_WORKER
@ -612,16 +647,14 @@ export default class MTPNetworker {
@@ -612,16 +647,14 @@ export default class MTPNetworker {
/// #endif
clearTimeout ( this . checkConnectionTimeout ) ;
this . checkConnectionTimeout = 0 ;
this . checkConnectionTimeout = undefined ;
}
}
private handleSentEncryptedRequestHTTP ( promise : ReturnType < MTPNetworker [ ' sendEncryptedRequest ' ] > , message : MTMessage , noResponseMsgs : string [ ] ) {
promise
. then ( ( result ) = > {
promise . then ( ( result ) = > {
this . toggleOffline ( false ) ;
// this.log('parse for', message)
// this.log('parse for', message);
this . parseResponse ( result ) . then ( ( response ) = > {
if ( Modes . debug ) {
this . log . debug ( 'Server response' , response ) ;
@ -645,7 +678,7 @@ export default class MTPNetworker {
@@ -645,7 +678,7 @@ export default class MTPNetworker {
this . log . error ( 'Encrypted request failed' , error , message ) ;
if ( message . container ) {
message . inner . forEach ( ( msgId : string ) = > {
message . inner . forEach ( ( msgId ) = > {
this . pendingMessages [ msgId ] = 0 ;
} ) ;
@ -661,7 +694,7 @@ export default class MTPNetworker {
@@ -661,7 +694,7 @@ export default class MTPNetworker {
delete this . pendingMessages [ msgId ] ;
deferred . reject ( ) ;
}
} )
} ) ;
this . toggleOffline ( true ) ;
} ) ;
@ -732,7 +765,7 @@ export default class MTPNetworker {
@@ -732,7 +765,7 @@ export default class MTPNetworker {
public setDrainTimeout() {
if ( ! this . activeRequests && this . onDrain && this . onDrainTimeout === undefined ) {
this . onDrainTimeout = self . setTimeout ( ( ) = > {
this . onDrainTimeout = ctx . setTimeout ( ( ) = > {
this . onDrainTimeout = undefined ;
this . log ( 'drain' ) ;
this . onDrain ( ) ;
@ -890,7 +923,7 @@ export default class MTPNetworker {
@@ -890,7 +923,7 @@ export default class MTPNetworker {
//const currentTime = Date.now();
let messagesByteLen = 0 ;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
/// #if MTPROTO_HAS_HTTP
let hasApiCall = false ;
let hasHttpWait = false ;
/// #endif
@ -923,7 +956,7 @@ export default class MTPNetworker {
@@ -923,7 +956,7 @@ export default class MTPNetworker {
messages . push ( message ) ;
messagesByteLen += messageByteLength ;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
/// #if MTPROTO_HAS_HTTP
if ( message . isAPI ) {
hasApiCall = true ;
} else if ( message . longPoll ) {
@ -940,10 +973,10 @@ export default class MTPNetworker {
@@ -940,10 +973,10 @@ export default class MTPNetworker {
//}
}
/// #if MTPROTO_HTTP_UPLOAD
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( this . transport instanceof HTTP )
/// #endif
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
if ( hasApiCall && ! hasHttpWait ) {
const serializer = new TLSerialization ( { mtproto : true } ) ;
serializer . storeMethod ( 'http_wait' , {
@ -965,7 +998,7 @@ export default class MTPNetworker {
@@ -965,7 +998,7 @@ export default class MTPNetworker {
return ;
}
/// #if MTPROTO_HTTP_UPLOAD || MTPROTO _HTTP
/// #if MTPROTO_HAS _HTTP
const noResponseMsgs : Array < string > = messages . filter ( message = > message . noResponse ) . map ( message = > message . msg_id ) ;
/// #endif
@ -982,18 +1015,18 @@ export default class MTPNetworker {
@@ -982,18 +1015,18 @@ export default class MTPNetworker {
const promise = this . sendEncryptedRequest ( outMessage ) ;
/// #if MTPROTO_HTTP_UPLOAD
if ( ! ( this . transport instanceof HTTP ) ) {
//if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
this . cleanupSent ( ) ; // ! WARNING
} else {
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( this . transport instanceof HTTP )
/// #endif
this . handleSentEncryptedRequestHTTP ( promise , outMessage , noResponseMsgs ) ;
}
/// #elif !MTPROTO_HTTP
/// #endif
/// #if MTPROTO_HAS_WS
/// #if MTPROTO_HAS_HTTP
if ( ! ( this . transport instanceof HTTP ) )
/// #endif
this . cleanupSent ( ) ; // ! WARNING
/// #else
this . handleSentEncryptedRequestHTTP ( promise , outMessage , noResponseMsgs ) ;
//}
/// #endif
if ( lengthOverflow ) {
@ -1157,10 +1190,14 @@ export default class MTPNetworker {
@@ -1157,10 +1190,14 @@ export default class MTPNetworker {
this . debug && this . log . debug ( 'sendEncryptedRequest: launching message into space:' , message , [ message . msg_id ] . concat ( message . inner || [ ] ) ) ;
const promise : Promise < Uint8Array > = this . transport . send ( requestData ) as any ;
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
/// #if !MTPROTO_HAS_HTTP
return promise ;
/// #else
/// #if MTPROTO_HAS_WS
if ( ! ( this . transport instanceof HTTP ) ) return promise ;
/// #endif
const baseError = {
code : 406 ,
@ -1322,13 +1359,18 @@ export default class MTPNetworker {
@@ -1322,13 +1359,18 @@ export default class MTPNetworker {
return ;
} * /
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
if ( ! ( this . transport instanceof HTTP ) ) {
this . performScheduledRequest ( ) ;
return ;
} else if ( this . offline ) {
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( this . transport instanceof HTTP ) {
/// #endif
if ( this . offline ) {
this . checkConnection ( 'forced schedule' ) ;
}
delay || = 0 ; // set zero timeout to pack other messages too
/// #if MTPROTO_HAS_WS
}
/// #endif
/// #endif
const nextReq = Date . now ( ) + ( delay || 0 ) ;
@ -1354,23 +1396,22 @@ export default class MTPNetworker {
@@ -1354,23 +1396,22 @@ export default class MTPNetworker {
this . nextReqTimeout = 0 ;
this . nextReq = 0 ;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( this . transport instanceof HTTP )
/// #endif
if ( this . offline ) {
//this.log('Cancel scheduled');
return ;
}
/// #else
/ * i f ( ! t h i s . i s O n l i n e ) {
return ;
} * /
/// #endif
this . performScheduledRequest ( ) ;
/// #endif
} ;
this . nextReq = nextReq ;
if ( delay ) {
if ( delay !== undefined ) {
this . nextReqTimeout = self . setTimeout ( cb , delay ) ;
} else {
cb ( ) ;
@ -1381,11 +1422,16 @@ export default class MTPNetworker {
@@ -1381,11 +1422,16 @@ export default class MTPNetworker {
// this.log('ack message', msgID)
this . pendingAcks . push ( msgId ) ;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
this . scheduleRequest ( 30000 ) ;
/// #else
this . scheduleRequest ( ) ;
let delay : number ;
/// #if MTPROTO_HAS_HTTP
/// #if MTPROTO_HAS_WS
if ( this . transport instanceof HTTP )
/// #endif
delay = 30000 ;
/// #endif
this . scheduleRequest ( delay ) ;
}
private reqResendMessage ( msgId : MTLong ) {
@ -1680,8 +1726,7 @@ export default class MTPNetworker {
@@ -1680,8 +1726,7 @@ export default class MTPNetworker {
break ;
}
case 'pong' : { // * https://core.telegram.org/mtproto/service_messages#ping-messages-pingpong - These messages doesn't require acknowledgments
if ( ( this . transport as TcpObfuscated ) . networker ) {
case 'pong' : { // * https://core.telegram.org/mtproto/service_messages#ping-messages-pingpong - These messages don't require acknowledgments
const sentMessageId = message . msg_id ;
const sentMessage = this . sentMessages [ sentMessageId ] ;
@ -1689,7 +1734,6 @@ export default class MTPNetworker {
@@ -1689,7 +1734,6 @@ export default class MTPNetworker {
sentMessage . deferred . resolve ( message ) ;
delete this . sentMessages [ sentMessageId ] ;
}
}
break ;
}