// Telegram Web K with changes to work inside I2P: https://web.telegram.i2p/
// Source listing: 602 lines, 19 KiB.


//import apiManager from '../mtproto/apiManager';
import { MOUNT_CLASS_TO } from '../../config/debug';
import { logger, LogLevels } from '../logger';
import apiManager from '../mtproto/mtprotoworker';
import rootScope from '../rootScope';
//import networkerFactory from '../mtproto/networkerFactory';
import appChatsManager from "./appChatsManager";
import appPeersManager from "./appPeersManager";
import appStateManager from './appStateManager';
import appUsersManager from "./appUsersManager";
/**
 * Bookkeeping for one update stream: either the common (account-wide)
 * stream or the stream of a single channel.
 */
type UpdatesState = {
  /** Updates whose pts is ahead of the current `pts`; kept until the gap closes. */
  pendingPtsUpdates: any[];
  /** Buffered update batches keyed by their starting seq (common stream only). */
  pendingSeqUpdates?: any;
  /**
   * Describes a scheduled difference request, or is null when none is scheduled.
   * `timeout` is the timer id; the two flags tell which counter is awaited.
   */
  syncPending: {
    seqAwaiting?: number;
    ptsAwaiting?: true;
    timeout: number;
  };
  /** While true, `processUpdate` ignores updates unless `ignoreSyncLoading` is passed. */
  syncLoading: boolean;
  seq?: number;
  pts?: number;
  date?: number;
  /** `Date.now()` value recorded when `pts` last advanced in `processUpdate`. */
  lastPtsUpdateTime?: number;
};
// Delay handed to window.setTimeout before a difference is requested after a
// pts/seq hole. The same value is subtracted from Date.now() when deciding
// whether an 'updateChannelTooLong' should trigger a channel difference.
const SYNC_DELAY = 6;
export class ApiUpdatesManager {
/**
 * State of the common (non-channel) update stream. It starts with
 * `syncLoading: true`, so updates are ignored until `attach()` has
 * loaded or fetched the initial state.
 */
public updatesState: UpdatesState = {
  pendingPtsUpdates: [],
  pendingSeqUpdates: {},
  syncPending: null,
  syncLoading: true
};

/** Per-channel update states, keyed by channel id. */
public channelStates: Record<number, UpdatesState> = {};

/** Set once `attach()` has run so that later calls are no-ops. */
private attached = false;

private log = logger('UPDATES', LogLevels.debug | LogLevels.warn | LogLevels.log | LogLevels.error);
/**
 * Registers a 'save' listener that copies the common seq/pts/date into the
 * persisted app state under the 'updates' key.
 */
constructor() {
  // * prefix the call below with `false &&` to disable persistence for test purposes
  appStateManager.addListener('save', async() => {
    const {seq, pts, date} = this.updatesState;
    appStateManager.pushToState('updates', {seq, pts, date});
  });
}
/**
 * Applies buffered seq batches for as long as the batch keyed by `seq + 1`
 * exists. Each batch's updates are saved in order, then `seq` (and `date`,
 * when newer) are advanced and the batch is removed.
 *
 * After the last applied batch, a scheduled sync that was waiting for a seq
 * now reached is cancelled, or only its `seqAwaiting` flag is dropped when a
 * pts is still awaited.
 *
 * @returns true when at least one batch was applied, false otherwise.
 */
public popPendingSeqUpdate() {
  const state = this.updatesState;
  let appliedAny = false;

  while(true) {
    const expectedSeq = state.seq + 1;
    const batch = state.pendingSeqUpdates[expectedSeq];
    if(!batch) {
      break;
    }

    const queued = batch.updates;
    const count = queued.length;
    for(let idx = 0; idx < count; ++idx) {
      this.saveUpdate(queued[idx]);
    }

    state.seq = batch.seq;
    if(batch.date && state.date < batch.date) {
      state.date = batch.date;
    }

    delete state.pendingSeqUpdates[expectedSeq];
    appliedAny = true;
  }

  if(!appliedAny) {
    return false;
  }

  const scheduled = state.syncPending;
  if(scheduled && scheduled.seqAwaiting && state.seq >= scheduled.seqAwaiting) {
    if(scheduled.ptsAwaiting) {
      // a pts hole is still open: keep the timer, stop waiting for seq only
      delete scheduled.seqAwaiting;
    } else {
      clearTimeout(scheduled.timeout);
      state.syncPending = null;
    }
  }

  return true;
}
/**
 * Applies buffered pts updates that have become reachable from the current
 * pts of the common state (falsy `channelId`) or of the given channel.
 *
 * The buffer is sorted by pts in place. `pts_count` values are then summed
 * on top of the current pts, and the last update whose pts is covered by
 * that running total marks the end of the applicable prefix.
 *
 * @param channelId channel whose state is used; falsy selects the common state.
 * @returns true when a prefix was applied, false when nothing could be applied.
 */
public popPendingPtsUpdate(channelId: number) {
  const state = channelId ? this.getChannelState(channelId) : this.updatesState;
  if(state.pendingPtsUpdates.length === 0) {
    return false;
  }

  state.pendingPtsUpdates.sort((left: any, right: any) => left.pts - right.pts);
  // this.log('pop update', channelId, state.pendingPtsUpdates)

  let runningPts = state.pts;
  let reachedPts = 0;
  let lastReadyIndex = 0;
  state.pendingPtsUpdates.forEach((candidate: any, index: number) => {
    runningPts += candidate.pts_count;
    if(runningPts >= candidate.pts) {
      reachedPts = candidate.pts;
      lastReadyIndex = index;
    }
  });

  if(!reachedPts) {
    return false;
  }

  const readyCount = lastReadyIndex + 1;
  this.log('pop pending pts updates', reachedPts, state.pendingPtsUpdates.slice(0, readyCount));

  state.pts = reachedPts;
  let cursor = 0;
  while(cursor <= lastReadyIndex) {
    this.saveUpdate(state.pendingPtsUpdates[cursor]);
    cursor++;
  }
  state.pendingPtsUpdates.splice(0, readyCount);

  if(!state.pendingPtsUpdates.length && state.syncPending) {
    if(state.syncPending.seqAwaiting) {
      // a seq hole is still open: keep the timer, stop waiting for pts only
      delete state.syncPending.ptsAwaiting;
    } else {
      clearTimeout(state.syncPending.timeout);
      state.syncPending = null;
    }
  }

  return true;
}
/** Requests a common difference unless the common state is already loading. */
public forceGetDifference() {
  if(this.updatesState.syncLoading) {
    return;
  }

  this.getDifference();
}
/**
 * Handles an updates container and forwards every contained update to
 * `processUpdate`. It is registered as the updates processor in `attach()`.
 *
 * - 'updatesTooLong' / 'new_session_created': force a common difference.
 * - 'updateShort': process the single wrapped update.
 * - 'updateShortMessage' / 'updateShortChatMessage': expand the compact form
 *   into a regular 'updateNewMessage' carrying a synthesized 'message'.
 * - 'updatesCombined' / 'updates': save users and chats, then process each update.
 * - anything else is logged as a warning.
 *
 * @param updateMessage the container; its `date`, `seq` and `seq_start` are
 *   passed along to `processUpdate`.
 * @param options `ignoreSyncLoading` is passed through to `processUpdate`.
 */
public processUpdateMessage = (updateMessage: any, options: Partial<{
  ignoreSyncLoading: boolean
}> = {}) => {
  // return forceGetDifference()
  const processOpts = {
    date: updateMessage.date,
    seq: updateMessage.seq,
    seqStart: updateMessage.seq_start,
    ignoreSyncLoading: options.ignoreSyncLoading
  };

  this.log('processUpdateMessage', updateMessage);

  switch(updateMessage._) {
    case 'updatesTooLong':
    case 'new_session_created':
      this.forceGetDifference();
      break;

    case 'updateShort':
      this.processUpdate(updateMessage.update, processOpts);
      break;

    case 'updateShortMessage':
    case 'updateShortChatMessage': {
      this.log('updateShortMessage | updateShortChatMessage', {...updateMessage});
      const isOut = updateMessage.pFlags.out;
      // The short form omits the sender for private chats: outgoing messages
      // come from us, incoming ones from `user_id`.
      const fromId = updateMessage.from_id || (isOut ? rootScope.myId : updateMessage.user_id);
      // Chats are addressed by a negative peer id.
      const toId = updateMessage.chat_id
        ? -updateMessage.chat_id
        : (updateMessage.user_id || rootScope.myId);

      this.processUpdate({
        _: 'updateNewMessage',
        message: {
          _: 'message',
          pFlags: updateMessage.pFlags,
          id: updateMessage.id,
          from_id: appPeersManager.getOutputPeer(fromId),
          peer_id: appPeersManager.getOutputPeer(toId),
          date: updateMessage.date,
          message: updateMessage.message,
          fwd_from: updateMessage.fwd_from,
          // The short constructors also carry these two fields; copying them
          // keeps inline-bot attribution and the auto-delete TTL on messages
          // delivered through this path.
          via_bot_id: updateMessage.via_bot_id,
          reply_to: updateMessage.reply_to,
          entities: updateMessage.entities,
          ttl_period: updateMessage.ttl_period
        },
        pts: updateMessage.pts,
        pts_count: updateMessage.pts_count
      }, processOpts);
      break;
    }

    case 'updatesCombined':
    case 'updates':
      // Peers must be saved first so that message updates can resolve them.
      appUsersManager.saveApiUsers(updateMessage.users);
      appChatsManager.saveApiChats(updateMessage.chats);
      updateMessage.updates.forEach((update: any) => {
        this.processUpdate(update, processOpts);
      });
      break;

    default:
      this.log.warn('Unknown update message', updateMessage);
  }
};
/**
 * Requests `updates.getDifference` for the common state and applies the result.
 *
 * If the state was not already loading, it is switched to loading, the
 * buffered pts/seq updates are discarded and 'state_synchronizing' is
 * broadcast. Any scheduled sync timer is cancelled.
 *
 * Result handling:
 * - 'updates.differenceEmpty': take date/seq, finish, resolve with false.
 * - 'updates.differenceTooLong': take pts and drop seq/date.
 * - otherwise: save users and chats, apply other updates and new messages,
 *   then take seq/pts/date from the returned state.
 * - 'updates.differenceSlice' triggers another request; the other results
 *   broadcast 'state_synchronized' and leave the loading state.
 *
 * @param first when true, 'state_synchronizing' is broadcast once a non-empty
 *   result arrives. `attach()` passes true, and at that point the state is
 *   already loading, so the broadcast at the top is skipped.
 * @returns the promise of the request; a failed request only clears `syncLoading`.
 */
public getDifference(first = false) {
  // this.trace('Get full diff')
  const state = this.updatesState;

  if(!state.syncLoading) {
    state.syncLoading = true;
    state.pendingSeqUpdates = {};
    state.pendingPtsUpdates = [];
    rootScope.broadcast('state_synchronizing');
  }

  const scheduled = state.syncPending;
  if(scheduled) {
    clearTimeout(scheduled.timeout);
    state.syncPending = null;
  }

  const onFailure = () => {
    state.syncLoading = false;
  };

  return apiManager.invokeApi('updates.getDifference', {
    pts: state.pts,
    date: state.date,
    qts: -1
  }, {
    timeout: 0x7fffffff
  }).then((differenceResult) => {
    this.log('Get diff result', differenceResult);

    if(differenceResult._ === 'updates.differenceEmpty') {
      this.log('apply empty diff', differenceResult.seq);
      state.date = differenceResult.date;
      state.seq = differenceResult.seq;
      state.syncLoading = false;
      rootScope.broadcast('state_synchronized');
      return false;
    }

    // ! SORRY I'M SORRY I'M SORRY
    if(first) {
      rootScope.broadcast('state_synchronizing');
    }

    if(differenceResult._ === 'updates.differenceTooLong') {
      state.pts = differenceResult.pts;
      delete state.seq;
      delete state.date;
    } else {
      appUsersManager.saveApiUsers(differenceResult.users);
      appChatsManager.saveApiChats(differenceResult.chats);

      // Should be first because of updateMessageID
      // this.log('applying', differenceResult.other_updates.length, 'other updates')
      differenceResult.other_updates.forEach((update) => {
        const isChannelUpdate = update._ === 'updateChannelTooLong' ||
          update._ === 'updateNewChannelMessage' ||
          update._ === 'updateEditChannelMessage';

        if(isChannelUpdate) {
          // channel updates go through their own pts bookkeeping
          this.processUpdate(update);
        } else {
          this.saveUpdate(update);
        }
      });

      // this.log('applying', differenceResult.new_messages.length, 'new messages')
      differenceResult.new_messages.forEach((apiMessage) => {
        this.saveUpdate({
          _: 'updateNewMessage',
          message: apiMessage,
          pts: state.pts,
          pts_count: 0
        });
      });

      const nextState = differenceResult._ === 'updates.difference'
        ? differenceResult.state
        : differenceResult.intermediate_state;
      state.seq = nextState.seq;
      state.pts = nextState.pts;
      state.date = nextState.date;
    }

    // this.log('apply diff', state.seq, state.pts)

    if(differenceResult._ === 'updates.differenceSlice') {
      this.getDifference();
    } else {
      // this.log('finished get diff')
      rootScope.broadcast('state_synchronized');
      state.syncLoading = false;
    }
  }, onFailure);
}
/**
 * Requests `updates.getChannelDifference` for one channel and applies the result.
 *
 * If the channel state was not already loading, it is switched to loading,
 * its buffered pts updates are discarded and 'state_synchronizing' is
 * broadcast with the channel id. Any scheduled sync timer is cancelled.
 *
 * Result handling:
 * - 'updates.channelDifferenceEmpty': finish and broadcast 'state_synchronized'.
 * - 'updates.channelDifferenceTooLong': drop the channel state, emit
 *   'updateChannelReload' and broadcast 'state_synchronized'.
 * - 'updates.channelDifference': save users and chats, apply other updates and
 *   new messages, then request the next part or finish, depending on
 *   `pFlags.final`.
 *
 * A failed request only clears `syncLoading`.
 *
 * @param channelId id of the channel to synchronize.
 */
public getChannelDifference(channelId: number) {
  const channelState = this.getChannelState(channelId);

  if(!channelState.syncLoading) {
    channelState.syncLoading = true;
    channelState.pendingPtsUpdates = [];
    rootScope.broadcast('state_synchronizing', channelId);
  }

  if(channelState.syncPending) {
    clearTimeout(channelState.syncPending.timeout);
    channelState.syncPending = null;
  }

  //this.log.trace('Get channel diff', appChatsManager.getChat(channelId), channelState.pts);
  apiManager.invokeApi('updates.getChannelDifference', {
    channel: appChatsManager.getChannelInput(channelId),
    filter: {_: 'channelMessagesFilterEmpty'},
    pts: channelState.pts,
    limit: 30
  }, {timeout: 0x7fffffff}).then((differenceResult) => {
    this.log('Get channel diff result', differenceResult)
    channelState.pts = 'pts' in differenceResult ? differenceResult.pts : undefined;

    if(differenceResult._ === 'updates.channelDifferenceEmpty') {
      this.log('apply channel empty diff', differenceResult);
      channelState.syncLoading = false;
      rootScope.broadcast('state_synchronized', channelId);
      return false;
    }

    if(differenceResult._ === 'updates.channelDifferenceTooLong') {
      this.log('channel diff too long', differenceResult);
      channelState.syncLoading = false;
      delete this.channelStates[channelId];
      this.saveUpdate({_: 'updateChannelReload', channel_id: channelId});
      // Close the synchronization announced on entry. The other two completion
      // paths do this, and without it listeners of 'state_synchronizing' never
      // learn that this attempt ended.
      rootScope.broadcast('state_synchronized', channelId);
      return false;
    }

    appUsersManager.saveApiUsers(differenceResult.users);
    appChatsManager.saveApiChats(differenceResult.chats);

    // Should be first because of updateMessageID
    this.log('applying', differenceResult.other_updates.length, 'channel other updates');
    differenceResult.other_updates.forEach((update) => {
      this.saveUpdate(update);
    });

    this.log('applying', differenceResult.new_messages.length, 'channel new messages');
    differenceResult.new_messages.forEach((apiMessage) => {
      this.saveUpdate({
        _: 'updateNewChannelMessage',
        message: apiMessage,
        pts: channelState.pts,
        pts_count: 0
      });
    });

    this.log('apply channel diff', channelState.pts);

    if(differenceResult._ === 'updates.channelDifference' &&
      !differenceResult.pFlags['final']) {
      // more parts remain: keep the loading state and request the next one
      this.getChannelDifference(channelId);
    } else {
      this.log('finished channel get diff');
      rootScope.broadcast('state_synchronized', channelId);
      channelState.syncLoading = false;
    }
  }, () => {
    channelState.syncLoading = false;
  });
}
/**
 * Creates the update state of a channel unless one already exists.
 *
 * @param channelId id of the channel.
 * @param pts initial pts of the channel; must be truthy.
 * @returns true when a new state was created, false when one already existed.
 * @throws Error when `pts` is falsy.
 */
public addChannelState(channelId: number, pts: number) {
  if(!pts) {
    throw new Error('Add channel state without pts ' + channelId);
  }

  if(channelId in this.channelStates) {
    return false;
  }

  this.channelStates[channelId] = {
    pts,
    pendingPtsUpdates: [],
    syncPending: null,
    syncLoading: false
  };

  return true;
}
/**
 * Returns the update state of a channel, creating it through
 * `addChannelState` (which throws when `pts` is falsy) if it is missing.
 *
 * @param channelId id of the channel.
 * @param pts initial pts, used only when the state has to be created.
 */
public getChannelState(channelId: number, pts?: number) {
  const isKnown = this.channelStates[channelId] !== undefined;
  if(!isKnown) {
    this.addChannelState(channelId, pts);
  }

  return this.channelStates[channelId];
}
/**
 * Validates one update against the pts/seq counters and either applies it
 * (`saveUpdate`), buffers it until a gap closes, or drops it.
 *
 * Steps, in order:
 * 1. Pick the channel state for channel updates, the common state otherwise.
 * 2. Ignore the update while that state is loading, unless `ignoreSyncLoading`.
 * 3. 'updateChannelTooLong' may only request a channel difference.
 * 4. For message updates, request a difference when a referenced peer is unknown.
 * 5. pts check: buffer on a hole, drop a duplicate, otherwise advance pts.
 *    Without pts, the seq check for common updates: buffer on a hole,
 *    otherwise set seq.
 * 6. Save the update and drain whichever buffer may have become applicable.
 *
 * @param update the update object.
 * @param options `date`, `seq` and `seqStart` of the enclosing container, and
 *   `ignoreSyncLoading` to bypass step 2.
 * @returns false when the update was not applied; nothing when it was applied.
 */
public processUpdate(update: any, options: Partial<{
  date: number,
  seq: number,
  seqStart: number,
  ignoreSyncLoading: boolean
}> = {}) {
  const kind = update._;

  let channelId = 0;
  if(kind === 'updateNewChannelMessage' || kind === 'updateEditChannelMessage') {
    channelId = -appPeersManager.getPeerId(update.message.peer_id);
  } else if(kind === 'updateDeleteChannelMessages') {
    channelId = update.channel_id;
  } else if(kind === 'updateChannelTooLong') {
    channelId = update.channel_id;
    // nothing to resynchronize for a channel that is not tracked
    if(!(channelId in this.channelStates)) {
      return false;
    }
  }

  const curState = channelId ? this.getChannelState(channelId, update.pts) : this.updatesState;

  // this.log.log('process', channelId, curState.pts, update)

  if(curState.syncLoading && !options.ignoreSyncLoading) {
    return false;
  }

  if(kind === 'updateChannelTooLong') {
    const ptsIsStale = !curState.lastPtsUpdateTime ||
      curState.lastPtsUpdateTime < (Date.now() - SYNC_DELAY);
    if(ptsIsStale) {
      // this.log.trace('channel too long, get diff', channelId, update)
      this.getChannelDifference(channelId);
    }

    return false;
  }

  const carriesMessage = kind === 'updateNewMessage' ||
    kind === 'updateEditMessage' ||
    kind === 'updateNewChannelMessage' ||
    kind === 'updateEditChannelMessage';

  if(carriesMessage) {
    const message = update.message;
    const toPeerId = appPeersManager.getPeerId(message.peer_id);
    const fwdHeader = message.fwd_from || {};

    // The first missing peer wins; later checks are not evaluated.
    let reason: any = false;
    if(message.from_id && !appUsersManager.hasUser(appPeersManager.getPeerId(message.from_id), message.pFlags.post/* || channelId*/)) {
      reason = 'author';
    } else if(fwdHeader.from_id && !appUsersManager.hasUser(appPeersManager.getPeerId(fwdHeader.from_id), !!fwdHeader.channel_id)) {
      reason = 'fwdAuthor';
    } else if(fwdHeader.channel_id && !appChatsManager.hasChat(fwdHeader.channel_id, true)) {
      reason = 'fwdChannel';
    } else if(toPeerId > 0 && !appUsersManager.hasUser(toPeerId)) {
      reason = 'toPeer User';
    } else if(toPeerId < 0 && !appChatsManager.hasChat(-toPeerId)) {
      reason = 'toPeer Chat';
    }

    if(reason) {
      this.log.warn('Not enough data for message update', toPeerId, reason, message)
      if(channelId && appChatsManager.hasChat(channelId)) {
        this.getChannelDifference(channelId);
      } else {
        this.forceGetDifference();
      }

      return false;
    }
  } else if(channelId && !appChatsManager.hasChat(channelId)) {
    // this.log.log('skip update, missing channel', channelId, update)
    return false;
  }

  let popPts = false;
  let popSeq = false;

  if(update.pts) {
    const expectedPts = curState.pts + (update.pts_count || 0);
    if(expectedPts < update.pts) {
      this.log.warn('Pts hole', curState, update, channelId && appChatsManager.getChat(channelId));
      curState.pendingPtsUpdates.push(update);

      if(!curState.syncPending) {
        const timeout = window.setTimeout(() => {
          if(!channelId) {
            this.getDifference();
          } else {
            this.getChannelDifference(channelId);
          }
        }, SYNC_DELAY);
        curState.syncPending = {timeout};
      }

      curState.syncPending.ptsAwaiting = true;
      return false;
    }

    if(update.pts > curState.pts) {
      curState.pts = update.pts;
      popPts = true;
      curState.lastPtsUpdateTime = Date.now();
    } else if(update.pts_count) {
      // this.log.warn('Duplicate update', update)
      return false;
    }

    if(channelId && options.date && this.updatesState.date < options.date) {
      this.updatesState.date = options.date;
    }
  } else if(!channelId && options.seq > 0) {
    const seq = options.seq;
    const seqStart = options.seqStart || seq;

    if(seqStart !== curState.seq + 1 && seqStart > curState.seq) {
      this.log.warn('Seq hole', curState, curState.syncPending && curState.syncPending.seqAwaiting);

      if(curState.pendingSeqUpdates[seqStart] === undefined) {
        curState.pendingSeqUpdates[seqStart] = {seq, date: options.date, updates: []};
      }
      curState.pendingSeqUpdates[seqStart].updates.push(update);

      if(!curState.syncPending) {
        const timeout = window.setTimeout(() => {
          this.getDifference();
        }, SYNC_DELAY);
        curState.syncPending = {timeout};
      }

      if(!curState.syncPending.seqAwaiting ||
        curState.syncPending.seqAwaiting < seqStart) {
        curState.syncPending.seqAwaiting = seqStart;
      }

      return false;
    }

    if(curState.seq !== seq) {
      curState.seq = seq;
      if(options.date && curState.date < options.date) {
        curState.date = options.date;
      }

      popSeq = true;
    }
  }

  this.saveUpdate(update);

  if(popPts) {
    this.popPendingPtsUpdate(channelId);
  } else if(popSeq) {
    this.popPendingSeqUpdate();
  }
}
/**
 * Publishes an accepted update by broadcasting it as an 'apiUpdate' event.
 *
 * @param update the update object to broadcast.
 */
public saveUpdate(update: any) {
  rootScope.broadcast('apiUpdate', update);
}
/**
 * Starts update processing; only the first call has any effect.
 *
 * Once the persisted app state is read, `processUpdateMessage` is registered
 * as the updates processor. If the persisted 'updates' entry has pts, date
 * and seq, it is merged into the common state and a difference is requested.
 * Otherwise the current state is fetched with `updates.getState`, after
 * which the common state leaves the loading mode.
 */
public attach() {
  if(this.attached) {
    return;
  }

  this.attached = true;

  appStateManager.getState().then(_state => {
    const state = _state.updates;

    apiManager.setUpdatesProcessor(this.processUpdateMessage);

    //rootScope.broadcast('state_synchronizing');
    const hasStoredState = state && state.pts && state.date && state.seq;
    if(hasStoredState) {
      // ! for testing: set state.seq = 1, state.pts -= 15, state.date = 1 here
      Object.assign(this.updatesState, state);
      this.getDifference(true);
      return;
    }

    apiManager.invokeApi('updates.getState', {}, {noErrorBox: true}).then((stateResult) => {
      const common = this.updatesState;
      common.seq = stateResult.seq;
      common.pts = stateResult.pts;
      common.date = stateResult.date;
      common.syncLoading = false;
      //rootScope.broadcast('state_synchronized');

      // ! for testing: lower seq/pts/date here and call getDifference()
    });
  });
}
}
// Module-wide singleton; also exposed on the debug mount object when one is configured.
const apiUpdatesManager = new ApiUpdatesManager();
if(MOUNT_CLASS_TO) {
  MOUNT_CLASS_TO.apiUpdatesManager = apiUpdatesManager;
}

export default apiUpdatesManager;