From bc46677ab621855e2b0deb138efb0c2ad15a4c7d Mon Sep 17 00:00:00 2001 From: Rajeh Taher Date: Fri, 2 Aug 2024 14:13:48 +0300 Subject: [PATCH] feat(feature/pdo-sync): Fixed the issues with peer messages and implemented some more logic --- Example/example.ts | 23 +++++++++++- src/Defaults/index.ts | 3 +- src/Socket/chats.ts | 13 ++++++- src/Socket/messages-recv.ts | 61 ++++++++++++++++++++++--------- src/Socket/messages-send.ts | 24 ++++++++---- src/Store/make-in-memory-store.ts | 8 +++- src/Types/Events.ts | 7 +++- src/Types/Socket.ts | 2 + src/Utils/history.ts | 2 + src/Utils/process-message.ts | 56 ++++++++++++++++------------ 10 files changed, 144 insertions(+), 55 deletions(-) diff --git a/Example/example.ts b/Example/example.ts index bf994282486..66ab3023944 100644 --- a/Example/example.ts +++ b/Example/example.ts @@ -234,8 +234,11 @@ const startSock = async() => { // history received if(events['messaging-history.set']) { - const { chats, contacts, messages, isLatest } = events['messaging-history.set'] - console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`) + const { chats, contacts, messages, isLatest, progress, syncType } = events['messaging-history.set'] + if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) { + console.log('received on-demand history sync, messages=', messages) + } + console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest}, progress: ${progress}%), type: ${syncType}`) } // received a new message @@ -285,8 +288,24 @@ const startSock = async() => { } } */ + if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) { + const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text + if (text == "requestPlaceholder" && !upsert.requestId) { + const messageId = await sock.requestPlaceholderResend(msg.key) + console.log('requested placeholder resync, id=', messageId) + } else if (upsert.requestId) { + console.log('Message received from phone, id=', upsert.requestId, msg) + } + + // go to an old chat and send this + if (text == "onDemandHistSync") { + const messageId = await sock.fetchMessageHistory(50, msg.key, msg.messageTimestamp!) + console.log('requested on-demand sync, id=', messageId) + } + } if(!msg.key.fromMe && doReplies && !isJidNewsletter(msg.key?.remoteJid!)) { + console.log('replying to', msg.key.remoteJid) await sock!.readMessages([msg.key]) await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!) diff --git a/src/Defaults/index.ts b/src/Defaults/index.ts index 564581f0cd7..72df30ef83e 100644 --- a/src/Defaults/index.ts +++ b/src/Defaults/index.ts @@ -49,7 +49,8 @@ export const PROCESSABLE_HISTORY_TYPES = [ proto.Message.HistorySyncNotification.HistorySyncType.INITIAL_BOOTSTRAP, proto.Message.HistorySyncNotification.HistorySyncType.PUSH_NAME, proto.Message.HistorySyncNotification.HistorySyncType.RECENT, - proto.Message.HistorySyncNotification.HistorySyncType.FULL + proto.Message.HistorySyncNotification.HistorySyncType.FULL, + proto.Message.HistorySyncNotification.HistorySyncType.ON_DEMAND, ] export const DEFAULT_CONNECTION_CONFIG: SocketConfig = { diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index e9ee6df55ac..bb315d0db4f 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -1,6 +1,7 @@ import { Boom } from '@hapi/boom' +import NodeCache from 'node-cache' import { proto } from '../../WAProto' -import { PROCESSABLE_HISTORY_TYPES } from '../Defaults' +import { DEFAULT_CACHE_TTLS, PROCESSABLE_HISTORY_TYPES } from '../Defaults' import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence, WAPrivacyCallValue, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from '../Types' import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils' import { makeMutex } from '../Utils/make-mutex' @@ -36,6 +37,15 @@ export const makeChatsSocket = (config: SocketConfig) => { /** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */ const processingMutex = makeMutex() + const placeholderResendCache = config.placeholderResendCache || new NodeCache({ + stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour + useClones: false + }) + + if(!config.placeholderResendCache) { + config.placeholderResendCache = placeholderResendCache + } + /** helper function to fetch the given app state sync key */ const getAppStateSyncKey = async(keyId: string) => { const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId]) @@ -876,6 +886,7 @@ export const makeChatsSocket = (config: SocketConfig) => { msg, { shouldProcessHistoryMsg, + placeholderResendCache, ev, creds: authState.creds, keyStore: authState.keys, diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index c110c6255e4..5fb9ecd2b81 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -81,6 +81,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { useClones: false }) + const placeholderResendCache = config.placeholderResendCache || new NodeCache({ + stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour + useClones: false + }) + let sendActiveReceipts = false const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => { @@ -153,8 +158,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { if(retryCount === 1) { //request a resend via phone - const msgId = await requestPlaceholderResend([msgKey]) - logger.debug(`requested placeholder resend for message ${msgId}`) + const msgId = await requestPlaceholderResend(msgKey) + logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`) } const deviceIdentity = encodeSignedDeviceIdentity(account!, true) @@ -708,19 +713,24 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } const handleMessage = async(node: BinaryNode) => { - if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) { - await sendMessageAck(node) - const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage - await requestPlaceholderResend([key]) - logger.debug('received unavailable message, requested resend') - } - if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') { logger.debug({ key: node.attrs.key }, 'ignored message') await sendMessageAck(node) return } + if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) { + await sendMessageAck(node) + const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage + await requestPlaceholderResend(key) + logger.debug('received unavailable message, acked and requested resend from phone') + } else { + if(placeholderResendCache.get(node.attrs.id)) { + placeholderResendCache.del(node.attrs.id) + } + } + + const { fullMessage: msg, category, author, decrypt } = decryptMessageNode( node, authState.creds.me!.id, @@ -816,22 +826,39 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { return sendPeerDataOperationMessage(pdoMessage) } - const requestPlaceholderResend = async( - messageKeys: WAMessageKey[] - ): Promise => { - // TODO: implement locking / cache to prevent multiple requests - // TODO: implement 5 second delay + const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise => { if(!authState.creds.me?.id) { throw new Boom('Not authenticated') } + if(placeholderResendCache.get(messageKey?.id!)) { + logger.debug('already requested resend', { messageKey }) + return + } else { + placeholderResendCache.set(messageKey?.id!, true) + } + + await delay(5000) + + if(!placeholderResendCache.get(messageKey?.id!)) { + logger.debug('message received while resend requested', { messageKey }) + return + } + const pdoMessage = { - placeholderMessageResendRequest: messageKeys.map(a => ({ - messageKey: a - })), + placeholderMessageResendRequest: [{ + messageKey + }], peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND } + setTimeout(() => { + if(placeholderResendCache.get(messageKey?.id!)) { + logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey }) + placeholderResendCache.del(messageKey?.id!) + } + }, 15_000) + return sendPeerDataOperationMessage(pdoMessage) } diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index de1853b3aa9..f75a776a7d7 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -465,11 +465,12 @@ export const makeMessagesSocket = (config: SocketConfig) => { if(!participant) { devices.push({ user }) // do not send message to self if the device is 0 (mobile) - if(meDevice !== undefined && meDevice !== 0) { - devices.push({ user: meUser }) - } if(!(additionalAttributes?.['category'] === 'peer' && user === meUser)) { + if(meDevice !== undefined && meDevice !== 0) { + devices.push({ user: meUser }) + } + const additionalDevices = await getUSyncDevices([ meId, jid ], !!useUserDevicesCache, true) devices.push(...additionalDevices) } @@ -506,11 +507,18 @@ export const makeMessagesSocket = (config: SocketConfig) => { } if(participants.length) { - binaryNodeContent.push({ - tag: 'participants', - attrs: { }, - content: participants - }) + if(additionalAttributes?.['category'] === 'peer') { + const peerNode = participants[0]?.content?.[0] as BinaryNode + if(peerNode) { + binaryNodeContent.push(peerNode) // push only enc + } + } else { + binaryNodeContent.push({ + tag: 'participants', + attrs: { }, + content: participants + }) + } } const stanza: BinaryNode = { diff --git a/src/Store/make-in-memory-store.ts b/src/Store/make-in-memory-store.ts index 165fd2bfc7a..b96808c72c2 100644 --- a/src/Store/make-in-memory-store.ts +++ b/src/Store/make-in-memory-store.ts @@ -93,8 +93,14 @@ export default (config: BaileysInMemoryStoreConfig) => { chats: newChats, contacts: newContacts, messages: newMessages, - isLatest + isLatest, + syncType }) => { + if(syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) { + return // FOR NOW, + //TODO: HANDLE + } + if(isLatest) { chats.clear() diff --git a/src/Types/Events.ts b/src/Types/Events.ts index e10aad74f74..1dc18ebdc67 100644 --- a/src/Types/Events.ts +++ b/src/Types/Events.ts @@ -20,7 +20,9 @@ export type BaileysEventMap = { chats: Chat[] contacts: Contact[] messages: WAMessage[] - isLatest: boolean + isLatest?: boolean + progress: number + syncType: proto.HistorySync.HistorySyncType } /** upsert chats */ 'chats.upsert': Chat[] @@ -41,8 +43,9 @@ export type BaileysEventMap = { /** * add/update the given messages. If they were received while the connection was online, * the update will have type: "notify" + * if requestId is provided, then the messages was received from the phone due to it being unavailable * */ - 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType } + 'messages.upsert': { messages: WAMessage[], type: MessageUpsertType, requestId?: string } /** message was reacted to. If reaction was removed -- then "reaction.text" will be falsey */ 'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction }[] diff --git a/src/Types/Socket.ts b/src/Types/Socket.ts index ab41b4ab5f0..4d6e1c7524c 100644 --- a/src/Types/Socket.ts +++ b/src/Types/Socket.ts @@ -74,6 +74,8 @@ export type SocketConfig = { userDevicesCache?: CacheStore /** cache to store call offers */ callOfferCache?: CacheStore + /** cache to track placeholder resends */ + placeholderResendCache?: CacheStore /** width for link preview images */ linkPreviewImageThumbnailWidth: number /** Should Baileys ask the phone for full history, will be received async */ diff --git a/src/Utils/history.ts b/src/Utils/history.ts index 453b7fcc713..ad3c693b55a 100644 --- a/src/Utils/history.ts +++ b/src/Utils/history.ts @@ -94,6 +94,8 @@ export const processHistoryMessage = (item: proto.IHistorySync) => { chats, contacts, messages, + syncType: item.syncType, + progress: item.progress, } } diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index f799c3f379c..b002bfa180f 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -1,15 +1,16 @@ import { AxiosRequestConfig } from 'axios' import type { Logger } from 'pino' import { proto } from '../../WAProto' -import { AuthenticationCreds, BaileysEventEmitter, Chat, GroupMetadata, ParticipantAction, RequestJoinAction, RequestJoinMethod, SignalKeyStoreWithTransaction, SocketConfig, WAMessageStubType } from '../Types' +import { AuthenticationCreds, BaileysEventEmitter, CacheStore, Chat, GroupMetadata, ParticipantAction, RequestJoinAction, RequestJoinMethod, SignalKeyStoreWithTransaction, SocketConfig, WAMessageStubType } from '../Types' import { getContentType, normalizeMessageContent } from '../Utils/messages' import { areJidsSameUser, isJidBroadcast, isJidStatusBroadcast, jidNormalizedUser } from '../WABinary' import { aesDecryptGCM, hmacSign } from './crypto' -import { getKeyAuthor, toNumber } from './generics' +import { delay, getKeyAuthor, toNumber } from './generics' import { downloadAndProcessHistorySyncNotification } from './history' type ProcessMessageContext = { shouldProcessHistoryMsg: boolean + placeholderResendCache?: CacheStore creds: AuthenticationCreds keyStore: SignalKeyStoreWithTransaction ev: BaileysEventEmitter @@ -152,6 +153,7 @@ const processMessage = async( message: proto.IWebMessageInfo, { shouldProcessHistoryMsg, + placeholderResendCache, ev, creds, keyStore, @@ -190,7 +192,7 @@ const processMessage = async( if(protocolMsg) { switch (protocolMsg.type) { case proto.Message.ProtocolMessage.Type.HISTORY_SYNC_NOTIFICATION: - const histNotification = protocolMsg!.historySyncNotification! + const histNotification = protocolMsg.historySyncNotification! const process = shouldProcessHistoryMsg const isLatest = !creds.processedHistoryMessages?.length @@ -202,19 +204,27 @@ const processMessage = async( }, 'got history notification') if(process) { - ev.emit('creds.update', { - processedHistoryMessages: [ - ...(creds.processedHistoryMessages || []), - { key: message.key, messageTimestamp: message.messageTimestamp } - ] - }) + if(histNotification.syncType !== proto.HistorySync.HistorySyncType.ON_DEMAND) { + ev.emit('creds.update', { + processedHistoryMessages: [ + ...(creds.processedHistoryMessages || []), + { key: message.key, messageTimestamp: message.messageTimestamp } + ] + }) + } const data = await downloadAndProcessHistorySyncNotification( histNotification, options ) - ev.emit('messaging-history.set', { ...data, isLatest }) + ev.emit('messaging-history.set', { + ...data, + isLatest: + histNotification.syncType !== proto.HistorySync.HistorySyncType.ON_DEMAND + ? isLatest + : undefined + }) } break @@ -266,22 +276,22 @@ const processMessage = async( break case proto.Message.ProtocolMessage.Type.PEER_DATA_OPERATION_REQUEST_RESPONSE_MESSAGE: const response = protocolMsg.peerDataOperationRequestResponseMessage! - if (response) { + if(response) { + placeholderResendCache?.del(response.stanzaId!) // TODO: IMPLEMENT HISTORY SYNC ETC (sticker uploads etc.). const { peerDataOperationResult } = response for(const result of peerDataOperationResult!) { const { placeholderMessageResendResponse: retryResponse } = result if(retryResponse) { const webMessageInfo = proto.WebMessageInfo.decode(retryResponse.webMessageInfoBytes!) - // maybe messages.upsert is not ideal here - though it's upsert so people should handle already existing messages - // the message might come before requesting to the phone and that could be an issue here. - // in that case we get 2 events for the same message - or almost same, since the phone can omit or add info - ev.emit('messages.upsert', { - messages: [ - webMessageInfo - ], - type: 'notify' // TODO: DECIDE IF THIS SHOULD BE APPEND OR NOTIFY - }) + // wait till another upsert event is available, don't want it to be part of the PDO response message + setTimeout(() => { + ev.emit('messages.upsert', { + messages: [webMessageInfo], + type: 'notify', + requestId: response.stanzaId! + }) + }, 500) } } } @@ -295,10 +305,10 @@ const processMessage = async( } ev.emit('messages.reaction', [{ reaction, - key: content.reactionMessage!.key!, + key: content.reactionMessage?.key!, }]) } else if(message.messageStubType) { - const jid = message.key!.remoteJid! + const jid = message.key?.remoteJid! //let actor = whatsappID (message.participant) let participants: string[] const emitParticipantsUpdate = (action: ParticipantAction) => ( @@ -387,7 +397,7 @@ const processMessage = async( if(pollMsg) { const meIdNormalised = jidNormalizedUser(meId) const pollCreatorJid = getKeyAuthor(creationMsgKey, meIdNormalised) - const voterJid = getKeyAuthor(message.key!, meIdNormalised) + const voterJid = getKeyAuthor(message.key, meIdNormalised) const pollEncKey = pollMsg.messageContextInfo?.messageSecret! try {