Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions app/lib/methods/messageSync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { Q } from '@nozbe/watermelondb';

import database from '../database';
import log from './helpers/log';
import { messagesStatus } from '../constants/messagesStatus';
import { changeMessageStatus, resendMessage } from './sendMessage';
import { getSingleMessage as getSingleMessageService } from '../services/restApi';
import type { TMessageModel } from '../../definitions';

const TEMP_RECONCILIATION_THRESHOLD_MS = 5 * 60 * 1000;

const hasMessageNotFoundHint = (value?: string): boolean =>
/message[\s_-]*not[\s_-]*found|error-message-not-found/i.test(value ?? '');

const shouldResendAfterLookupFailure = (error: unknown): boolean => {
if (!error) {
return false;
}

if (typeof error === 'string') {
return hasMessageNotFoundHint(error);
}

if (error instanceof Error) {
return hasMessageNotFoundHint(error.message);
}

const err = error as {
message?: string;
error?: string;
reason?: string;
data?: { message?: string; error?: string; errorType?: string };
};

return (
hasMessageNotFoundHint(err.message) ||
hasMessageNotFoundHint(err.error) ||
hasMessageNotFoundHint(err.reason) ||
hasMessageNotFoundHint(err.data?.message) ||
hasMessageNotFoundHint(err.data?.error) ||
hasMessageNotFoundHint(err.data?.errorType)
);
};

const processSequentially = <T>(items: T[], processItem: (item: T) => Promise<void>) =>
items.reduce<Promise<void>>(async (previous, item) => {
await previous;
await processItem(item);
}, Promise.resolve());

export async function reconcileTempMessages(): Promise<void> {
const db = database.active;
if (!db) {
return;
}

const msgCollection = db.get('messages');
const threshold = Date.now() - TEMP_RECONCILIATION_THRESHOLD_MS;

try {
const tempMessages = await msgCollection
.query(Q.where('status', messagesStatus.TEMP), Q.where('ts', Q.lt(threshold)))
.fetch();
await processSequentially(tempMessages as TMessageModel[], async record => {
try {
const result = await getSingleMessageService(record.id);
if (result?.success && result.message) {
await changeMessageStatus(
record.id,
messagesStatus.SENT,
record.tmid ?? undefined,
result.message
);
return;
}

if (shouldResendAfterLookupFailure(result)) {
try {
await resendMessage(record, record.tmid ?? undefined);
} catch (e) {
log(e);
}
return;
}

log(result);
} catch (e) {
if (shouldResendAfterLookupFailure(e)) {
try {
await resendMessage(record, record.tmid ?? undefined);
} catch (resendError) {
log(resendError);
}
return;
}

log(e);
}
Comment thread
deepak0x marked this conversation as resolved.
});
} catch (e) {
log(e);
}
}

export async function retryErrorMessages(): Promise<void> {
const db = database.active;
if (!db) {
return;
}

const msgCollection = db.get('messages');

try {
const errorMessages = await msgCollection.query(Q.where('status', messagesStatus.ERROR)).fetch();
await processSequentially(errorMessages as TMessageModel[], async record => {
try {
await resendMessage(record, record.tmid ?? undefined);
} catch (e) {
log(e);
}
});
} catch (e) {
log(e);
}
}
2 changes: 1 addition & 1 deletion app/lib/methods/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import sdk from '../services/sdk';
import { E2E_MESSAGE_TYPE, E2E_STATUS } from '../constants/keys';
import { messagesStatus } from '../constants/messagesStatus';

const changeMessageStatus = async (id: string, status: number, tmid?: string, message?: IMessage) => {
export const changeMessageStatus = async (id: string, status: number, tmid?: string, message?: IMessage) => {
const db = database.active;
const msgCollection = db.get('messages');
const threadMessagesCollection = db.get('thread_messages');
Expand Down
4 changes: 3 additions & 1 deletion app/sagas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import createDiscussion from './createDiscussion';
import encryption from './encryption';
import videoConf from './videoConf';
import troubleshootingNotification from './troubleshootingNotification';
import messageSync from './messageSync';

const root = function* root() {
yield all([
Expand All @@ -32,7 +33,8 @@ const root = function* root() {
inquiry(),
encryption(),
videoConf(),
troubleshootingNotification()
troubleshootingNotification(),
messageSync()
]);
};

Expand Down
10 changes: 10 additions & 0 deletions app/sagas/login.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import appNavigation from '../lib/navigation/appNavigation';
import { showActionSheetRef } from '../containers/ActionSheet';
import { SupportedVersionsWarning } from '../containers/SupportedVersions';
import { isIOS } from '../lib/methods/helpers';
import { reconcileTempMessages } from '../lib/methods/messageSync';

const getServer = state => state.server.server;
const loginWithPasswordCall = args => loginWithPassword(args);
Expand Down Expand Up @@ -223,6 +224,14 @@ const fetchUsersRoles = function* fetchRoomsFork() {
}
};

const reconcileTempMessagesSaga = function* reconcileTempMessagesSaga() {
try {
yield call(reconcileTempMessages);
} catch (e) {
log(e);
}
};

const handleLoginSuccess = function* handleLoginSuccess({ user }) {
try {
getUserPresence(user.id);
Expand All @@ -239,6 +248,7 @@ const handleLoginSuccess = function* handleLoginSuccess({ user }) {
yield fork(fetchEnterpriseModulesFork, { user });
yield fork(subscribeSettingsFork);
yield fork(fetchUsersRoles);
yield fork(reconcileTempMessagesSaga);

setLanguage(user?.language);

Expand Down
25 changes: 25 additions & 0 deletions app/sagas/messageSync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { call, select, takeEvery } from 'redux-saga/effects';

import { METEOR } from '../actions/actionsTypes';
import log from '../lib/methods/helpers/log';
import { retryErrorMessages } from '../lib/methods/messageSync';

const getUser = state => state.login.user;

const retryErrorMessagesSaga = function* retryErrorMessagesSaga() {
const user = yield select(getUser);
if (!user?.id) {
return;
}
try {
yield call(retryErrorMessages);
} catch (e) {
log(e);
}
};

const root = function* root() {
yield takeEvery(METEOR.SUCCESS, retryErrorMessagesSaga);
};

export default root;