Skip to content

Commit

Permalink
feat(feature/pdo-sync): Fixed the issues with peer messages and imple…
Browse files Browse the repository at this point in the history
…mented some more logic
  • Loading branch information
purpshell committed Aug 2, 2024
1 parent ef3b95e commit bc46677
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 55 deletions.
23 changes: 21 additions & 2 deletions Example/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,11 @@ const startSock = async() => {

// history received
if(events['messaging-history.set']) {
const { chats, contacts, messages, isLatest } = events['messaging-history.set']
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`)
const { chats, contacts, messages, isLatest, progress, syncType } = events['messaging-history.set']
if (syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) {
console.log('received on-demand history sync, messages=', messages)
}
console.log(`recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest}, progress: ${progress}%), type: ${syncType}`)
}

// received a new message
Expand Down Expand Up @@ -285,8 +288,24 @@ const startSock = async() => {
}
} */

if (msg.message?.conversation || msg.message?.extendedTextMessage?.text) {
const text = msg.message?.conversation || msg.message?.extendedTextMessage?.text
if (text == "requestPlaceholder" && !upsert.requestId) {
const messageId = await sock.requestPlaceholderResend(msg.key)
console.log('requested placeholder resync, id=', messageId)
} else if (upsert.requestId) {
console.log('Message received from phone, id=', upsert.requestId, msg)
}

// go to an old chat and send this
if (text == "onDemandHistSync") {
const messageId = await sock.fetchMessageHistory(50, msg.key, msg.messageTimestamp!)
console.log('requested on-demand sync, id=', messageId)
}
}

if(!msg.key.fromMe && doReplies && !isJidNewsletter(msg.key?.remoteJid!)) {

console.log('replying to', msg.key.remoteJid)
await sock!.readMessages([msg.key])
await sendMessageWTyping({ text: 'Hello there!' }, msg.key.remoteJid!)
Expand Down
3 changes: 2 additions & 1 deletion src/Defaults/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ export const PROCESSABLE_HISTORY_TYPES = [
proto.Message.HistorySyncNotification.HistorySyncType.INITIAL_BOOTSTRAP,
proto.Message.HistorySyncNotification.HistorySyncType.PUSH_NAME,
proto.Message.HistorySyncNotification.HistorySyncType.RECENT,
proto.Message.HistorySyncNotification.HistorySyncType.FULL
proto.Message.HistorySyncNotification.HistorySyncType.FULL,
proto.Message.HistorySyncNotification.HistorySyncType.ON_DEMAND,
]

export const DEFAULT_CONNECTION_CONFIG: SocketConfig = {
Expand Down
13 changes: 12 additions & 1 deletion src/Socket/chats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Boom } from '@hapi/boom'
import NodeCache from 'node-cache'
import { proto } from '../../WAProto'
import { PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { DEFAULT_CACHE_TTLS, PROCESSABLE_HISTORY_TYPES } from '../Defaults'
import { ALL_WA_PATCH_NAMES, ChatModification, ChatMutation, LTHashState, MessageUpsertType, PresenceData, SocketConfig, WABusinessHoursConfig, WABusinessProfile, WAMediaUpload, WAMessage, WAPatchCreate, WAPatchName, WAPresence, WAPrivacyCallValue, WAPrivacyOnlineValue, WAPrivacyValue, WAReadReceiptsValue } from '../Types'
import { chatModificationToAppPatch, ChatMutationMap, decodePatches, decodeSyncdSnapshot, encodeSyncdPatch, extractSyncdPatches, generateProfilePicture, getHistoryMsg, newLTHashState, processSyncAction } from '../Utils'
import { makeMutex } from '../Utils/make-mutex'
Expand Down Expand Up @@ -36,6 +37,15 @@ export const makeChatsSocket = (config: SocketConfig) => {
/** this mutex ensures that the notifications (receipts, messages etc.) are processed in order */
const processingMutex = makeMutex()

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

if(!config.placeholderResendCache) {
config.placeholderResendCache = placeholderResendCache
}

/** helper function to fetch the given app state sync key */
const getAppStateSyncKey = async(keyId: string) => {
const { [keyId]: key } = await authState.keys.get('app-state-sync-key', [keyId])
Expand Down Expand Up @@ -876,6 +886,7 @@ export const makeChatsSocket = (config: SocketConfig) => {
msg,
{
shouldProcessHistoryMsg,
placeholderResendCache,
ev,
creds: authState.creds,
keyStore: authState.keys,
Expand Down
61 changes: 44 additions & 17 deletions src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
useClones: false
})

const placeholderResendCache = config.placeholderResendCache || new NodeCache({
stdTTL: DEFAULT_CACHE_TTLS.MSG_RETRY, // 1 hour
useClones: false
})

let sendActiveReceipts = false

const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => {
Expand Down Expand Up @@ -153,8 +158,8 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {

if(retryCount === 1) {
//request a resend via phone
const msgId = await requestPlaceholderResend([msgKey])
logger.debug(`requested placeholder resend for message ${msgId}`)
const msgId = await requestPlaceholderResend(msgKey)
logger.debug(`sendRetryRequest: requested placeholder resend for message ${msgId}`)
}

const deviceIdentity = encodeSignedDeviceIdentity(account!, true)
Expand Down Expand Up @@ -708,19 +713,24 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}

const handleMessage = async(node: BinaryNode) => {
if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) {
await sendMessageAck(node)
const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage
await requestPlaceholderResend([key])
logger.debug('received unavailable message, requested resend')
}

if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
logger.debug({ key: node.attrs.key }, 'ignored message')
await sendMessageAck(node)
return
}

if(getBinaryNodeChild(node, 'unavailable') && !getBinaryNodeChild(node, 'enc')) {
await sendMessageAck(node)
const { key } = decodeMessageNode(node, authState.creds.me!.id, authState.creds.me!.lid || '').fullMessage
await requestPlaceholderResend(key)
logger.debug('received unavailable message, acked and requested resend from phone')
} else {
if(placeholderResendCache.get(node.attrs.id)) {
placeholderResendCache.del(node.attrs.id)
}
}


const { fullMessage: msg, category, author, decrypt } = decryptMessageNode(
node,
authState.creds.me!.id,
Expand Down Expand Up @@ -816,22 +826,39 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
return sendPeerDataOperationMessage(pdoMessage)
}

const requestPlaceholderResend = async(
messageKeys: WAMessageKey[]
): Promise<string> => {
// TODO: implement locking / cache to prevent multiple requests
// TODO: implement 5 second delay
const requestPlaceholderResend = async(messageKey: WAMessageKey): Promise<string | undefined> => {
if(!authState.creds.me?.id) {
throw new Boom('Not authenticated')
}

if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('already requested resend', { messageKey })
return
} else {
placeholderResendCache.set(messageKey?.id!, true)
}

await delay(5000)

if(!placeholderResendCache.get(messageKey?.id!)) {
logger.debug('message received while resend requested', { messageKey })
return
}

const pdoMessage = {
placeholderMessageResendRequest: messageKeys.map(a => ({
messageKey: a
})),
placeholderMessageResendRequest: [{
messageKey
}],
peerDataOperationRequestType: proto.Message.PeerDataOperationRequestType.PLACEHOLDER_MESSAGE_RESEND
}

setTimeout(() => {
if(placeholderResendCache.get(messageKey?.id!)) {
logger.debug('PDO message without response after 15 seconds. Phone possibly offline', { messageKey })
placeholderResendCache.del(messageKey?.id!)
}
}, 15_000)

return sendPeerDataOperationMessage(pdoMessage)
}

Expand Down
24 changes: 16 additions & 8 deletions src/Socket/messages-send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,12 @@ export const makeMessagesSocket = (config: SocketConfig) => {
if(!participant) {
devices.push({ user })
// do not send message to self if the device is 0 (mobile)
if(meDevice !== undefined && meDevice !== 0) {
devices.push({ user: meUser })
}

if(!(additionalAttributes?.['category'] === 'peer' && user === meUser)) {
if(meDevice !== undefined && meDevice !== 0) {
devices.push({ user: meUser })
}

const additionalDevices = await getUSyncDevices([ meId, jid ], !!useUserDevicesCache, true)
devices.push(...additionalDevices)
}
Expand Down Expand Up @@ -506,11 +507,18 @@ export const makeMessagesSocket = (config: SocketConfig) => {
}

if(participants.length) {
binaryNodeContent.push({
tag: 'participants',
attrs: { },
content: participants
})
if(additionalAttributes?.['category'] === 'peer') {
const peerNode = participants[0]?.content?.[0] as BinaryNode
if(peerNode) {
binaryNodeContent.push(peerNode) // push only enc
}
} else {
binaryNodeContent.push({
tag: 'participants',
attrs: { },
content: participants
})
}
}

const stanza: BinaryNode = {
Expand Down
8 changes: 7 additions & 1 deletion src/Store/make-in-memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,14 @@ export default (config: BaileysInMemoryStoreConfig) => {
chats: newChats,
contacts: newContacts,
messages: newMessages,
isLatest
isLatest,
syncType
}) => {
if(syncType === proto.HistorySync.HistorySyncType.ON_DEMAND) {
return // FOR NOW,
//TODO: HANDLE
}

if(isLatest) {
chats.clear()

Expand Down
7 changes: 5 additions & 2 deletions src/Types/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ export type BaileysEventMap = {
chats: Chat[]
contacts: Contact[]
messages: WAMessage[]
isLatest: boolean
isLatest?: boolean
progress: number
syncType: proto.HistorySync.HistorySyncType
}
/** upsert chats */
'chats.upsert': Chat[]
Expand All @@ -41,8 +43,9 @@ export type BaileysEventMap = {
/**
* add/update the given messages. If they were received while the connection was online,
* the update will have type: "notify"
* if requestId is provided, then the messages was received from the phone due to it being unavailable
* */
'messages.upsert': { messages: WAMessage[], type: MessageUpsertType }
'messages.upsert': { messages: WAMessage[], type: MessageUpsertType, requestId?: string }
/** message was reacted to. If reaction was removed -- then "reaction.text" will be falsey */
'messages.reaction': { key: WAMessageKey, reaction: proto.IReaction }[]

Expand Down
2 changes: 2 additions & 0 deletions src/Types/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export type SocketConfig = {
userDevicesCache?: CacheStore
/** cache to store call offers */
callOfferCache?: CacheStore
/** cache to track placeholder resends */
placeholderResendCache?: CacheStore
/** width for link preview images */
linkPreviewImageThumbnailWidth: number
/** Should Baileys ask the phone for full history, will be received async */
Expand Down
2 changes: 2 additions & 0 deletions src/Utils/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ export const processHistoryMessage = (item: proto.IHistorySync) => {
chats,
contacts,
messages,
syncType: item.syncType,
progress: item.progress,
}
}

Expand Down
Loading

0 comments on commit bc46677

Please sign in to comment.