Skip to content

Commit

Permalink
PDO protocol (peer data operation): Get more history sync + better me…
Browse files Browse the repository at this point in the history
…ssage retry mechanism (#919)

* feat(feature/pdo-sync): initial commit

* feat(feature/pdo-sync): Moved to conventional send functions, exported, patched some errors

* fix(feature/pdo-sync): Linting and more bugsquatting

* chore(feature/pdo-sync): linting done

* feat/fix(feat/pdo-sync): Newsletter decrypt + ack

* merge (#946)

* fix: profilePictureUrl (#901)

* Update module to latest version  (#926)

* Update package.json

Update the module to the latest

* Add files via upload

* Fix: Readme use upsert events (#908)

* Fix: getUSyncDevices (#862)

* Update messages-send.ts

* Update messages-send.ts

* Update messages-send.ts

* Fix lint

* Fix lint

* fix(master): update linting workflow to node 20 (current LTS)

---------

Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com>
Co-authored-by: Rizz2Dev <muhamad.rizki27483@smp.belajar.id>
Co-authored-by: Oscar Guindzberg <oscar.guindzberg@gmail.com>
Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com>

* chore(feature/pdo-sync): final linting

* fix(feature/pdo-sync): make replies optional

* feat(feat/pdo-sync): add <unavailable> handle

* feat(feature/pdo-sync): Fixed the issues with peer messages and implemented some more logic

* fix(feature/pdo-sync): Make progress optional

* fix(feature/pdo-sync): Nullify and defeat Message absent from node if it is resolved immediately

* feat(feature/pdo-sync): Export message absent from node and export PDO request ID with it

---------

Co-authored-by: Akhlaqul Muhammad Fadwa <75623219+zennn08@users.noreply.github.com>
Co-authored-by: Rizz2Dev <muhamad.rizki27483@smp.belajar.id>
Co-authored-by: Oscar Guindzberg <oscar.guindzberg@gmail.com>
Co-authored-by: Bob <115008575+bobslavtriev@users.noreply.github.com>
  • Loading branch information
5 people authored Aug 14, 2024
1 parent 35f6d75 commit 1f9cfb1
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 60 deletions.
79 changes: 71 additions & 8 deletions Example/example.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import readline from 'readline'
import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src'
import MAIN_LOGGER from '../src/Utils/logger'
import makeWASocket, { AnyMessageContent, BinaryInfo, delay, DisconnectReason, downloadAndProcessHistorySyncNotification, encodeWAM, fetchLatestBaileysVersion, getAggregateVotesInPollMessage, getHistoryMsg, isJidNewsletter, makeCacheableSignalKeyStore, makeInMemoryStore, PHONENUMBER_MCC, proto, useMultiFileAuthState, WAMessageContent, WAMessageKey } from '../src'
//import MAIN_LOGGER from '../src/Utils/logger'
import open from 'open'
import fs from 'fs'
import P from 'pino'

const logger = MAIN_LOGGER.child({})
const logger = P({ timestamp: () => `,"time":"${new Date().toJSON()}"` }, P.destination('./wa-logs.txt'))
logger.level = 'trace'

const useStore = !process.argv.includes('--no-store')
const doReplies = !process.argv.includes('--no-reply')
const doReplies = process.argv.includes('--do-reply')
const usePairingCode = process.argv.includes('--use-pairing-code')
const useMobile = process.argv.includes('--mobile')

// external map to store retry counts of messages when decryption/encryption fails
// keep this out of the socket itself, so as to prevent a message decryption/encryption loop across socket restarts
const msgRetryCounterCache = new NodeCache()

const onDemandMap = new Map<string, string>()

// Read line interface
const rl = readline.createInterface({ input: process.stdin, output: process.stdout })
const question = (text: string) => new Promise<string>((resolve) => rl.question(text, resolve))
Expand Down Expand Up @@ -231,8 +234,11 @@ const startSock = async() => {

// history received
if(events['messaging-history.set']) {
const { chats, contacts, messages, isLatest } = events['messaging-history.set']
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`)
const { chats, contacts, messages, isLatest, progress, syncType } = events['messaging-history.set']
if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) {
console.log('received on-demand history sync, messages=', messages)
}
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest}, progress: ${progress}%), type: ${syncType}`)
}

// received a new message
Expand All @@ -241,8 +247,65 @@ const startSock = async() => {
console.log('recv messages ', JSON.stringify(upsert, undefined, 2))

if(upsert.type === 'notify') {
for(const msg of upsert.messages) {
if(!msg.key.fromMe && doReplies) {
for (const msg of upsert.messages) {
//TODO: More built-in implementation of this
/* if (
msg.message?.protocolMessage?.type ===
proto.Message.ProtocolMessage.Type.HISTORY_SYNC_NOTIFICATION
) {
const historySyncNotification = getHistoryMsg(msg.message)
if (
historySyncNotification?.syncType ==
proto.HistorySync.HistorySyncType.ON_DEMAND
) {
const { messages } =
await downloadAndProcessHistorySyncNotification(
historySyncNotification,
{}
)
const chatId = onDemandMap.get(
historySyncNotification!.peerDataRequestSessionId!
)
console.log(messages)
onDemandMap.delete(
historySyncNotification!.peerDataRequestSessionId!
)
/*
// 50 messages is the limit imposed by whatsapp
//TODO: Add ratelimit of 7200 seconds
//TODO: Max retries 10
const messageId = await sock.fetchMessageHistory(
50,
oldestMessageKey,
oldestMessageTimestamp
)
onDemandMap.set(messageId, chatId)
}
} */

if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) {
const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text
if (text == "requestPlaceholder" && !upsert.requestId) {
const messageId = await sock.requestPlaceholderResend(msg.key)
console.log('requested placeholder resync, id=', messageId)
} else if (upsert.requestId) {
console.log('Message received from phone, id=', upsert.requestId, msg)
}

// go to an old chat and send this
if (text == "onDemandHistSync") {
const messageId = await sock.fetchMessageHistory(50, msg.key, msg.messageTimestamp!)
console.log('requested on-demand sync, id=', messageId)
}
}

if(!msg.key.fromMe && doReplies && !isJidNewsletter(msg.key?.remoteJid!)) {

console.log('replying to', msg.key.remoteJid)
await sock!.readMessages([msg.key])
await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!)
Expand Down
3 changes: 2 additions & 1 deletion src/Defaults/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ export const PROCESSABLE_HISTORY_TYPES = [
proto.Message.HistorySyncNotification.HistorySyncType.INITIAL_BOOTSTRAP,
proto.Message.HistorySyncNotification.HistorySyncType.PUSH_NAME,
proto.Message.HistorySyncNotification.HistorySyncType.RECENT,
proto.Message.HistorySyncNotification.HistorySyncType.FULL
proto.Message.HistorySyncNotification.HistorySyncType.FULL,
proto.Message.HistorySyncNotification.HistorySyncType.ON_DEMAND,
]

export const DEFAULT_CONNECTION_CONFIG: SocketConfig = {
Expand Down
13 changes: 12 additions & 1 deletion src/Socket/chats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import { proto } from '../../WAProto'
import { PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { DEFAULT_CACHE_TTLS, PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence, WAPrivacyCallValue, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from '../Types'
import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
Expand Down Expand Up @@ -36,6 +37,15 @@ export const makeChatsSocket = (config: SocketConfig) => {
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
const processingMutex = makeMutex()

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

if(!config.placeholderResendCache) {
config.placeholderResendCache = placeholderResendCache
}

/** helper function to fetch the given app state sync key */
const getAppStateSyncKey = async(keyId: string) => {
const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId])
Expand Down Expand Up @@ -876,6 +886,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
msg,
{
shouldProcessHistoryMsg,
placeholderResendCache,
ev,
creds: authState.creds,
keyStore: authState.keys,
Expand Down
124 changes: 113 additions & 11 deletions src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
aesEncryptGCM,
Curve,
decodeMediaRetryNode,
decodeMessageNode,
decryptMessageNode,
delay,
derivePairingCodeKey,
Expand All @@ -19,6 +20,7 @@ import {
getHistoryMsg,
getNextPreKeys,
getStatusFromReceiptType, hkdf,
NO_MESSAGE_FOUND_ERROR_TEXT,
unixTimestampSeconds,
xmppPreKey,
xmppSignedPreKey
Expand Down Expand Up @@ -65,6 +67,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
relayMessage,
sendReceipt,
uploadPreKeys,
sendPeerDataOperationMessage,
} = sock

/** this mutex ensures that each retryRequest will wait for the previous one to finish */
Expand All @@ -79,6 +82,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
useClones: false
})

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

let sendActiveReceipts = false

const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => {
Expand All @@ -99,14 +107,13 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
stanza.attrs.recipient = attrs.recipient
}

if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
stanza.attrs.type = attrs.type
}

if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
stanza.attrs.type = attrs.type
}

if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
stanza.attrs.from = authState.creds.me!.id
}
if(tag === 'message' && getBinaryNodeChild({ tag, attrs, content }, 'unavailable')) {
stanza.attrs.from = authState.creds.me!.id
}

logger.debug({ recv: { tag, attrs }, sent: stanza.attrs }, 'sent ack')
await sendNode(stanza)
Expand All @@ -133,9 +140,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}

const sendRetryRequest = async(node: BinaryNode, forceIncludeKeys = false) => {
const { id: msgId, participant } = node.attrs
const { fullMessage } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '')
const { key: msgKey } = fullMessage
const msgId = msgKey.id!

const key = `${msgId}:${participant}`
const key = `${msgId}:${msgKey?.participant}`
let retryCount = msgRetryCache.get<number>(key) || 0
if(retryCount >= maxMsgRetryCount) {
logger.debug({ retryCount, msgId }, 'reached retry limit, clearing')
Expand All @@ -148,6 +157,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {

const { account, signedPreKey, signedIdentityKey: identityKey } = authState.creds

if(retryCount === 1) {
//request a resend via phone
const msgId = await requestPlaceholderResend(msgKey)
logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`)
}

const deviceIdentity = encodeSignedDeviceIdentity(account!, true)
await authState.keys.transaction(
async() => {
Expand Down Expand Up @@ -699,12 +714,30 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}

const handleMessage = async(node: BinaryNode) => {
if(shouldIgnoreJid(node.attrs.from!) && node.attrs.from! !== '@s.whatsapp.net') {
if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
logger.debug({ key: node.attrs.key }, 'ignored message')
await sendMessageAck(node)
return
}

let response: string | undefined

if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) {
await sendMessageAck(node)
const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage
response = await requestPlaceholderResend(key)
if(response === 'RESOLVED') {
return
}

logger.debug('received unavailable message, acked and requested resend from phone')
} else {
if(placeholderResendCache.get(node.attrs.id)) {
placeholderResendCache.del(node.attrs.id)
}
}


const { fullMessage: msg, category, author, decrypt } = decryptMessageNode(
node,
authState.creds.me!.id,
Expand All @@ -713,6 +746,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
logger,
)

if(response && msg?.messageStubParameters?.[0] === NO_MESSAGE_FOUND_ERROR_TEXT) {
msg.messageStubParameters = [NO_MESSAGE_FOUND_ERROR_TEXT, response]
}

if(msg.message?.protocolMessage?.type === proto.Message.ProtocolMessage.Type.SHARE_PHONE_NUMBER) {
if(node.attrs.sender_pn) {
ev.emit('chats.phoneNumberShare', { lid: node.attrs.from, jid: node.attrs.sender_pn })
Expand All @@ -728,6 +765,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
retryMutex.mutex(
async() => {
if(ws.isOpen) {
if(getBinaryNodeChild(node, 'unavailable')) {
return
}

const encNode = getBinaryNodeChild(node, 'enc')
await sendRetryRequest(node, !encNode)
if(retryRequestDelayMs) {
Expand Down Expand Up @@ -773,6 +814,65 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
])
}

const fetchMessageHistory = async(
count: number,
oldestMsgKey: WAMessageKey,
oldestMsgTimestamp: number | Long
): Promise<string> => {
if(!authState.creds.me?.id) {
throw new Boom('Not authenticated')
}

const pdoMessage = {
historySyncOnDemandRequest: {
chatJid: oldestMsgKey.remoteJid,
oldestMsgFromMe: oldestMsgKey.fromMe,
oldestMsgId: oldestMsgKey.id,
oldestMsgTimestampMs: oldestMsgTimestamp,
onDemandMsgCount: count
},
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.HISTORY_SYNC_ON_DEMAND
}

return sendPeerDataOperationMessage(pdoMessage)
}

const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise<'RESOLVED'| string | undefined> => {
if(!authState.creds.me?.id) {
throw new Boom('Not authenticated')
}

if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('already requested resend', { messageKey })
return
} else {
placeholderResendCache.set(messageKey?.id!, true)
}

await delay(5000)

if(!placeholderResendCache.get(messageKey?.id!)) {
logger.debug('message received while resend requested', { messageKey })
return 'RESOLVED'
}

const pdoMessage = {
placeholderMessageResendRequest: [{
messageKey
}],
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND
}

setTimeout(() => {
if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey })
placeholderResendCache.del(messageKey?.id!)
}
}, 15_000)

return sendPeerDataOperationMessage(pdoMessage)
}

const handleCall = async(node: BinaryNode) => {
const { attrs } = node
const [infoChild] = getAllBinaryNodeChildren(node)
Expand Down Expand Up @@ -925,6 +1025,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
...sock,
sendMessageAck,
sendRetryRequest,
rejectCall
rejectCall,
fetchMessageHistory,
requestPlaceholderResend,
}
}
Loading

0 comments on commit 1f9cfb1

Please sign in to comment.