Skip to content
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WORKDIR /evolution

COPY ./package.json ./tsconfig.json ./

RUN npm install
RUN npm install --force --legacy-peer-deps

COPY ./src ./src
COPY ./public ./public
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE `OpenaiSetting` MODIFY COLUMN `speechToText` BOOLEAN NULL DEFAULT true;

-- Update existing records to use the new default
UPDATE `OpenaiSetting` SET `speechToText` = true WHERE `speechToText` IS NULL OR `speechToText` = false;
2 changes: 1 addition & 1 deletion prisma/mysql-schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ model OpenaiSetting {
ignoreJids Json?
splitMessages Boolean? @default(false)
timePerChar Int? @default(50) @db.Int
speechToText Boolean? @default(false)
speechToText Boolean? @default(true)
createdAt DateTime? @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp
OpenaiCreds OpenaiCreds? @relation(fields: [openaiCredsId], references: [id])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "OpenaiSetting" ALTER COLUMN "speechToText" SET DEFAULT true;

-- Update existing records to use the new default
UPDATE "OpenaiSetting" SET "speechToText" = true WHERE "speechToText" IS NULL OR "speechToText" = false;
2 changes: 1 addition & 1 deletion prisma/postgresql-schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ model OpenaiSetting {
ignoreJids Json?
splitMessages Boolean? @default(false) @db.Boolean
timePerChar Int? @default(50) @db.Integer
speechToText Boolean? @default(false) @db.Boolean
speechToText Boolean? @default(true) @db.Boolean
createdAt DateTime? @default(now()) @db.Timestamp
updatedAt DateTime @updatedAt @db.Timestamp
OpenaiCreds OpenaiCreds? @relation(fields: [openaiCredsId], references: [id])
Expand Down
229 changes: 187 additions & 42 deletions src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ import Long from 'long';
import mimeTypes from 'mime-types';
import NodeCache from 'node-cache';
import cron from 'node-cron';
import dayjs from 'dayjs';
import { release } from 'os';
import { join } from 'path';
import P from 'pino';
Expand All @@ -151,6 +152,52 @@ import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';

const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());

// Function to normalize JID and handle LID/JID conversion
function normalizeJid(jid: string): string {
if (!jid) return jid;

// Remove LID suffix and convert to standard JID format
if (jid.includes(':lid')) {
return jid.split(':')[0] + '@s.whatsapp.net';
}

// Remove participant suffix from group messages
if (jid.includes(':') && jid.includes('@g.us')) {
return jid.split(':')[0] + '@g.us';
}

// Remove any other participant suffixes
if (jid.includes(':') && !jid.includes('@g.us')) {
return jid.split(':')[0] + '@s.whatsapp.net';
}

return jid;
}

// Function to clear corrupted session data
async function clearCorruptedSessionData(instanceId: string, baileysCache: CacheService) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): The cache clearing patterns in clearCorruptedSessionData may be overly broad.

The current cache deletion patterns may match unintended keys, risking accidental data loss. Please refine them to target only the necessary session data.

Suggested implementation:

    // Clear only session-related cache keys for this instance
    const patterns = [
      `${instanceId}:session*`,
      `${instanceId}:prekey*`,
      `${instanceId}:app-state-sync-key*`,
      `${instanceId}:signal-identities*`
    ];
    for (const pattern of patterns) {
      await baileysCache.deleteByPattern(pattern);
    }

try {
// Clear all baileys cache for this instance
await baileysCache.deleteAll(instanceId);

// Clear session-related cache patterns
const patterns = [
`${instanceId}_*`,
`*${instanceId}*`,
`*session*${instanceId}*`,
`*prekey*${instanceId}*`
];

for (const pattern of patterns) {
await baileysCache.deleteAll(pattern);
}

console.log(`Cleared corrupted session data for instance: ${instanceId}`);
} catch (error) {
console.error('Error clearing session data:', error);
}
}

// Adicione a função getVideoDuration no início do arquivo
async function getVideoDuration(input: Buffer | string | Readable): Promise<number> {
const MediaInfoFactory = (await import('mediainfo.js')).default;
Expand Down Expand Up @@ -375,6 +422,11 @@ export class BaileysStartupService extends ChannelStartupService {
state: connection,
statusReason: (lastDisconnect?.error as Boom)?.output?.statusCode ?? 200,
};

this.logger.log(`Connection state changed to: ${connection}, instance: ${this.instance.id}`);
if (lastDisconnect?.error) {
this.logger.warn(`Connection error: ${JSON.stringify(lastDisconnect.error)}`);
}
Comment on lines +434 to +437
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 suggestion (security): Logging sensitive error details may expose internal information.

Sanitize or redact sensitive fields from lastDisconnect.error before logging, particularly in production.

Suggested change
this.logger.log(`Connection state changed to: ${connection}, instance: ${this.instance.id}`);
if (lastDisconnect?.error) {
this.logger.warn(`Connection error: ${JSON.stringify(lastDisconnect.error)}`);
}
this.logger.log(`Connection state changed to: ${connection}, instance: ${this.instance.id}`);
if (lastDisconnect?.error) {
// Sanitize error object before logging
const error = lastDisconnect.error;
const sanitizedError = { ...error };
// Redact sensitive fields if present
if ('stack' in sanitizedError) sanitizedError.stack = '[REDACTED]';
if ('message' in sanitizedError && process.env.NODE_ENV === 'production') sanitizedError.message = '[REDACTED]';
if ('output' in sanitizedError && sanitizedError.output?.payload) sanitizedError.output.payload = '[REDACTED]';
this.logger.warn(`Connection error: ${JSON.stringify(sanitizedError)}`);
}

}

if (connection === 'close') {
Expand Down Expand Up @@ -653,6 +705,9 @@ export class BaileysStartupService extends ChannelStartupService {

this.endSession = false;

// Clear any corrupted session data before connecting
await clearCorruptedSessionData(this.instanceId, this.baileysCache);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Clearing session data on every connect may impact performance.

If clearing session data isn't always required, consider making this operation conditional or rate-limited to avoid unnecessary latency, especially under high load.

Suggested implementation:

    // Rate-limit clearing corrupted session data to avoid unnecessary latency
    const SESSION_CLEAR_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
    if (!this.lastSessionClear || Date.now() - this.lastSessionClear > SESSION_CLEAR_INTERVAL_MS) {
      await clearCorruptedSessionData(this.instanceId, this.baileysCache);
      this.lastSessionClear = Date.now();
    }

    this.client = makeWASocket(socketConfig);

You must ensure that this.lastSessionClear is initialized in the class constructor, e.g.:

this.lastSessionClear = 0;

If you want a different condition (e.g., a flag or a more advanced rate limiter), adjust the logic accordingly.


this.client = makeWASocket(socketConfig);

if (this.localSettings.wavoipToken && this.localSettings.wavoipToken.length > 0) {
Expand Down Expand Up @@ -1091,7 +1146,8 @@ export class BaileysStartupService extends ChannelStartupService {
}
}

const messageKey = `${this.instance.id}_${received.key.id}`;
const normalizedJid = normalizeJid(received.key.remoteJid);
const messageKey = `${this.instance.id}_${normalizedJid}_${received.key.id}`;
const cached = await this.baileysCache.get(messageKey);

if (cached && !editedMessage) {
Expand All @@ -1118,8 +1174,9 @@ export class BaileysStartupService extends ChannelStartupService {
continue;
}

const normalizedRemoteJid = normalizeJid(received.key.remoteJid);
const existingChat = await this.prismaRepository.chat.findFirst({
where: { instanceId: this.instanceId, remoteJid: received.key.remoteJid },
where: { instanceId: this.instanceId, remoteJid: normalizedRemoteJid },
select: { id: true, name: true },
});

Expand Down Expand Up @@ -1198,7 +1255,8 @@ export class BaileysStartupService extends ChannelStartupService {
const { remoteJid } = received.key;
const timestamp = msg.messageTimestamp;
const fromMe = received.key.fromMe.toString();
const messageKey = `${remoteJid}_${timestamp}_${fromMe}`;
const normalizedRemoteJid = normalizeJid(remoteJid);
const messageKey = `${normalizedRemoteJid}_${timestamp}_${fromMe}`;

const cachedTimestamp = await this.baileysCache.get(messageKey);

Expand Down Expand Up @@ -1295,6 +1353,41 @@ export class BaileysStartupService extends ChannelStartupService {

this.sendDataWebhook(Events.MESSAGES_UPSERT, messageRaw);

// Schedule automatic status update for PENDING sent messages
if (messageRaw.key.fromMe && messageRaw.status === 'PENDING') {
setTimeout(async () => {
try {
const stillPendingMessage = await this.prismaRepository.message.findFirst({
where: {
instanceId: this.instanceId,
key: { path: ['id'], equals: messageRaw.key.id },
status: 'PENDING'
}
});

if (stillPendingMessage) {
this.logger.warn(`Forcing status update for PENDING message after timeout: ${messageRaw.key.id}`);
await this.prismaRepository.message.update({
where: { id: stillPendingMessage.id },
data: { status: 'SERVER_ACK' }
});

// Emit webhook for the status change
this.sendDataWebhook(Events.MESSAGES_UPDATE, {
messageId: stillPendingMessage.id,
keyId: messageRaw.key.id,
remoteJid: messageRaw.key.remoteJid,
fromMe: messageRaw.key.fromMe,
status: 'SERVER_ACK',
instanceId: this.instanceId
});
}
} catch (error) {
this.logger.error(`Error updating PENDING message status: ${error.message}`);
}
}, 30000); // 30 seconds timeout
}

await chatbotController.emit({
instance: { instanceName: this.instance.name, instanceId: this.instanceId },
remoteJid: messageRaw.key.remoteJid,
Expand All @@ -1303,13 +1396,13 @@ export class BaileysStartupService extends ChannelStartupService {
});

const contact = await this.prismaRepository.contact.findFirst({
where: { remoteJid: received.key.remoteJid, instanceId: this.instanceId },
where: { remoteJid: normalizedRemoteJid, instanceId: this.instanceId },
});

const contactRaw: { remoteJid: string; pushName: string; profilePicUrl?: string; instanceId: string } = {
remoteJid: received.key.remoteJid,
remoteJid: normalizedRemoteJid,
pushName: received.key.fromMe ? '' : received.key.fromMe == null ? '' : received.pushName,
profilePicUrl: (await this.profilePicture(received.key.remoteJid)).profilePictureUrl,
profilePicUrl: (await this.profilePicture(normalizedRemoteJid)).profilePictureUrl,
instanceId: this.instanceId,
};

Expand Down Expand Up @@ -1366,7 +1459,11 @@ export class BaileysStartupService extends ChannelStartupService {
continue;
}

const updateKey = `${this.instance.id}_${key.id}_${update.status}`;
// Normalize JID and ensure we have valid key components
const normalizedJid = normalizeJid(key.remoteJid);
const messageId = key.id || 'unknown';
const status = update.status || 'unknown';
const updateKey = `${this.instance.id}_${normalizedJid}_${messageId}_${status}`;

const cached = await this.baileysCache.get(updateKey);

Expand Down Expand Up @@ -1436,32 +1533,34 @@ export class BaileysStartupService extends ChannelStartupService {

continue;
} else if (update.status !== undefined && status[update.status] !== findMessage.status) {
if (!key.fromMe && key.remoteJid) {
readChatToUpdate[key.remoteJid] = true;
const { remoteJid } = key;
const timestamp = findMessage.messageTimestamp;
const fromMe = key.fromMe.toString();
const normalizedRemoteJid = normalizeJid(remoteJid);
const messageKey = `${normalizedRemoteJid}_${timestamp}_${fromMe}`;

const { remoteJid } = key;
const timestamp = findMessage.messageTimestamp;
const fromMe = key.fromMe.toString();
const messageKey = `${remoteJid}_${timestamp}_${fromMe}`;
const cachedTimestamp = await this.baileysCache.get(messageKey);

const cachedTimestamp = await this.baileysCache.get(messageKey);
if (!cachedTimestamp) {
// Handle read status for received messages
if (!key.fromMe && key.remoteJid && status[update.status] === status[4]) {
readChatToUpdate[key.remoteJid] = true;
this.logger.log(`Update as read in message.update ${remoteJid} - ${timestamp}`);
await this.updateMessagesReadedByTimestamp(remoteJid, timestamp);
await this.baileysCache.set(messageKey, true, 5 * 60);
}

if (!cachedTimestamp) {
if (status[update.status] === status[4]) {
this.logger.log(`Update as read in message.update ${remoteJid} - ${timestamp}`);
await this.updateMessagesReadedByTimestamp(remoteJid, timestamp);
await this.baileysCache.set(messageKey, true, 5 * 60);
}
// Update message status for all messages (sent and received)
await this.prismaRepository.message.update({
where: { id: findMessage.id },
data: { status: status[update.status] },
});

await this.prismaRepository.message.update({
where: { id: findMessage.id },
data: { status: status[update.status] },
});
} else {
this.logger.info(
`Update readed messages duplicated ignored in message.update [avoid deadlock]: ${messageKey}`,
);
}
this.logger.log(`Message status updated from ${findMessage.status} to ${status[update.status]} for message ${key.id}`);
} else {
this.logger.info(
`Update messages duplicated ignored in message.update [avoid deadlock]: ${messageKey}`,
);
}
}

Expand Down Expand Up @@ -1888,11 +1987,19 @@ export class BaileysStartupService extends ChannelStartupService {
}

if (message['conversation']) {
return await this.client.sendMessage(
sender,
{ text: message['conversation'], mentions, linkPreview: linkPreview } as unknown as AnyMessageContent,
option as unknown as MiscMessageGenerationOptions,
);
try {
this.logger.log(`Attempting to send conversation message to ${sender}: ${message['conversation']}`);
const result = await this.client.sendMessage(
sender,
{ text: message['conversation'], mentions, linkPreview: linkPreview } as unknown as AnyMessageContent,
option as unknown as MiscMessageGenerationOptions,
);
this.logger.log(`Message sent successfully with ID: ${result.key.id}`);
return result;
} catch (error) {
this.logger.error(`Failed to send message to ${sender}: ${error.message || JSON.stringify(error)}`);
throw error;
}
}

if (!message['audio'] && !message['poll'] && !message['sticker'] && sender != 'status@broadcast') {
Expand Down Expand Up @@ -3158,12 +3265,35 @@ export class BaileysStartupService extends ChannelStartupService {
const cachedNumbers = await getOnWhatsappCache(numbersToVerify);
console.log('cachedNumbers', cachedNumbers);

const filteredNumbers = numbersToVerify.filter(
(jid) => !cachedNumbers.some((cached) => cached.jidOptions.includes(jid)),
);
// Filter numbers that are not cached OR should be re-verified
const filteredNumbers = numbersToVerify.filter((jid) => {
const cached = cachedNumbers.find((cached) => cached.jidOptions.includes(jid));
// If not cached, we should verify
if (!cached) return true;

// For Brazilian numbers, force verification if both formats exist in cache
// to ensure we're using the correct format
const isBrazilian = jid.startsWith('55') && jid.includes('@s.whatsapp.net');
if (isBrazilian) {
const numberPart = jid.replace('@s.whatsapp.net', '');
const hasDigit9 = numberPart.length === 13 && numberPart.slice(4, 5) === '9';
const altFormat = hasDigit9
? numberPart.slice(0, 4) + numberPart.slice(5)
: numberPart.slice(0, 4) + '9' + numberPart.slice(4);
const altJid = altFormat + '@s.whatsapp.net';

// If both formats exist in cache, prefer the one with 9
const altCached = cachedNumbers.find((c) => c.jidOptions.includes(altJid));
if (cached && altCached && !hasDigit9) {
return true; // Force verification to get the correct format
}
}

return false; // Use cached result
});
console.log('filteredNumbers', filteredNumbers);

const verify = await this.client.onWhatsApp(...filteredNumbers);
const verify = filteredNumbers.length > 0 ? await this.client.onWhatsApp(...filteredNumbers) : [];
console.log('verify', verify);
normalVerifiedUsers = await Promise.all(
normalUsers.map(async (user) => {
Expand Down Expand Up @@ -3259,7 +3389,6 @@ export class BaileysStartupService extends ChannelStartupService {
.filter((user) => user.exists)
.map((user) => ({
remoteJid: user.jid,
jidOptions: user.jid.replace('+', ''),
lid: user.lid,
})),
);
Expand Down Expand Up @@ -4130,8 +4259,15 @@ export class BaileysStartupService extends ChannelStartupService {
const contentType = getContentType(message.message);
const contentMsg = message?.message[contentType] as any;

// Normalize JID to handle LID/JID conversion
const normalizedKey = {
...message.key,
remoteJid: normalizeJid(message.key.remoteJid),
participant: message.key.participant ? normalizeJid(message.key.participant) : undefined,
};

const messageRaw = {
key: message.key,
key: normalizedKey,
pushName:
message.pushName ||
(message.key.fromMe
Expand All @@ -4146,8 +4282,17 @@ export class BaileysStartupService extends ChannelStartupService {
source: getDevice(message.key.id),
};

if (!messageRaw.status && message.key.fromMe === false) {
messageRaw.status = status[3]; // DELIVERED MESSAGE
// Log for debugging PENDING status
if (message.key.fromMe && (!message.status || message.status === 1)) {
this.logger.warn(`Message sent with PENDING status - ID: ${message.key.id}, Instance: ${this.instance.id}, Status: ${message.status}, RemoteJid: ${message.key.remoteJid}`);
}

if (!messageRaw.status) {
if (message.key.fromMe === false) {
messageRaw.status = status[3]; // DELIVERED MESSAGE for received messages
} else {
messageRaw.status = status[2]; // SERVER_ACK for sent messages without status
}
}
Comment on lines +4409 to 4415
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Defaulting sent message status to SERVER_ACK may mask delivery issues.

Review whether SERVER_ACK is the appropriate default for all sent messages, or if stricter checks are needed to avoid hiding delivery problems.

Suggested change
if (!messageRaw.status) {
if (message.key.fromMe === false) {
messageRaw.status = status[3]; // DELIVERED MESSAGE for received messages
} else {
messageRaw.status = status[2]; // SERVER_ACK for sent messages without status
}
}
if (!messageRaw.status) {
if (message.key.fromMe === false) {
messageRaw.status = status[3]; // DELIVERED MESSAGE for received messages
} else {
// Only set SERVER_ACK if we have evidence of server acknowledgement
if (message.status && message.status >= status[2]) {
messageRaw.status = status[2]; // SERVER_ACK for sent messages with server ack
} else {
// Otherwise, set to PENDING and log a warning
messageRaw.status = status[1]; // PENDING
this.logger.warn(
`Sent message without explicit server ack - defaulting to PENDING. ID: ${message.key.id}, Instance: ${this.instance.id}, Status: ${message.status}, RemoteJid: ${message.key.remoteJid}`
);
}
}
}


if (messageRaw.message.extendedTextMessage) {
Expand Down
Loading