Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions db/sqllogfill.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,40 +313,28 @@ void sql_logfill_metrics(int64_t *applied_recs, int64_t *applied_bytes, int64_t
*qsize = apply_queue_size();
}

static void print_record_info(const char *prefix, cdb2_hndl_tp *hndl)
static void print_record_info(const char *prefix, const char *lsn)
{
static int64_t cnt = 0;
static int lastprint = 0;
int now = comdb2_time_epoch();
cnt++;
if (now - lastprint > 0) {
char *lsn = (char *)cdb2_column_value(hndl, 0);
logmsg(LOGMSG_USER, "%s: lsn=%s cnt=%" PRId64 "\n", prefix, lsn, cnt);
lastprint = now;
}
}

static int gen_okay(bdb_state_type *bdb_state, cdb2_hndl_tp *hndl, u_int32_t mygen)
static inline int gen_okay(bdb_state_type *bdb_state, int64_t recgen, u_int32_t mygen)
{
int64_t *recgenp = (int64_t *)cdb2_column_value(hndl, 2);
if (!recgenp)
return 1;
return *recgenp <= mygen;
return recgen <= mygen;
}

static int apply_record(bdb_state_type *bdb_state, cdb2_hndl_tp *hndl, LOG_INFO *last_lsn, DB_LSN *gap_lsn,
u_int32_t gen)
static int apply_record(bdb_state_type *bdb_state, const char *lsn, void *blob, int blob_len, int64_t genp,
LOG_INFO *last_lsn, DB_LSN *gap_lsn, u_int32_t gen)
{
char *lsn;
void *blob;
DB_LSN mylsn = {0};
int blob_len, rc;
int64_t *genp;

lsn = (char *)cdb2_column_value(hndl, 0);
blob = cdb2_column_value(hndl, 4);
blob_len = cdb2_column_size(hndl, 4);
genp = (int64_t *)cdb2_column_value(hndl, 2);
int rc;

/* Validate blob pointer */
if (!blob && blob_len > 0) {
Expand All @@ -369,8 +357,8 @@ static int apply_record(bdb_state_type *bdb_state, cdb2_hndl_tp *hndl, LOG_INFO

/* Handle file boundary, inject REP_NEWFILE record */
if (last_lsn->file < mylsn.file) {
rc = handle_log(bdb_state, last_lsn->file, get_next_offset(bdb_state->dbenv, *last_lsn), REP_NEWFILE,
genp ? *genp : 0, NULL, 0, gen);
rc = handle_log(bdb_state, last_lsn->file, get_next_offset(bdb_state->dbenv, *last_lsn), REP_NEWFILE, genp,
NULL, 0, gen);
if (rc != 0 && rc != DB_REP_ISPERM && rc != DB_REP_NOTPERM) {
logmsg(LOGMSG_FATAL, "%s error applying newfile log record, rc=%d\n", __func__, rc);
exit(1);
Expand All @@ -383,7 +371,7 @@ static int apply_record(bdb_state_type *bdb_state, cdb2_hndl_tp *hndl, LOG_INFO

/* Skip applying the gap LSN itself, it will be applied from repdb */
if (!gap_lsn || log_compare(&mylsn, gap_lsn) < 0) {
rc = handle_log(bdb_state, mylsn.file, mylsn.offset, REP_LOG, genp ? *genp : 0, blob, blob_len, gen);
rc = handle_log(bdb_state, mylsn.file, mylsn.offset, REP_LOG, genp, blob, blob_len, gen);
if (rc != 0 && rc != DB_REP_ISPERM && rc != DB_REP_NOTPERM) {
logmsg(LOGMSG_FATAL, "%s error applying log record at [%u:%u], rc=%d\n", __func__, mylsn.file, mylsn.offset,
rc);
Expand All @@ -408,7 +396,7 @@ static void request_logs_from_master(bdb_state_type *bdb_state)

/* Copy master name under bdb-lock */
BDB_READLOCK(__func__);
if (!thedb->master) {
if (!thedb->master || thedb->master == gbl_myhostname) {
BDB_RELLOCK();
sleep(1);
return;
Expand Down Expand Up @@ -480,7 +468,8 @@ static void request_logs_from_master(bdb_state_type *bdb_state)
timeout = timeout > 0 ? timeout : 10;

/* Query transaction log with BLOCK and SENTINEL flags */
rc = snprintf(sql_cmd, SQL_CMD_LEN, "select * from comdb2_transaction_logs('{%u:%u}', NULL, %d, %d)",
rc = snprintf(sql_cmd, SQL_CMD_LEN,
"select lsn, generation, payload from comdb2_transaction_logs('{%u:%u}', NULL, %d, %d)",
last_lsn.file, last_lsn.offset, TRANLOG_FLAGS_BLOCK | TRANLOG_FLAGS_SENTINEL, timeout);

if (rc < 0 || rc >= SQL_CMD_LEN) {
Expand Down Expand Up @@ -522,7 +511,7 @@ static void request_logs_from_master(bdb_state_type *bdb_state)

/* First record is the duplicate at last_lsn- skip it */
if (gbl_debug_sql_logfill) {
print_record_info("first-record (duplicate, skipping)", hndl);
print_record_info("first-record (duplicate, skipping)", lsn);
}
sql_nexts++;

Expand All @@ -532,22 +521,28 @@ static void request_logs_from_master(bdb_state_type *bdb_state)

sql_nexts++;
if (gbl_debug_sql_logfill) {
print_record_info("record", hndl);
print_record_info("record", lsn);
}

BDB_READLOCK(__func__);
bdb_state->dbenv->get_rep_gen(bdb_state->dbenv, &gen);

int64_t *recgenp = (int64_t *)cdb2_column_value(hndl, 1);

/* Verify gen before applying record */
if (firstgen != gen || !gen_okay(bdb_state, hndl, gen)) {
if (firstgen != gen || (recgenp != NULL && !gen_okay(bdb_state, *recgenp, gen))) {
if (gbl_debug_sql_logfill) {
int64_t *recgenp = (int64_t *)cdb2_column_value(hndl, 2);
logmsg(LOGMSG_USER, "%s: exiting apply loop, firstgen=%u gen=%u recgen=%" PRId64 "\n", __func__,
firstgen, gen, recgenp ? *recgenp : 0);
}
BDB_RELLOCK();
break;
}
rc = apply_record(bdb_state, hndl, &last_lsn, &gap_lsn, gen);

void *blob = cdb2_column_value(hndl, 2);
int blob_len = cdb2_column_size(hndl, 2);

rc = apply_record(bdb_state, lsn, blob, blob_len, recgenp ? *recgenp : 0, &last_lsn, &gap_lsn, gen);
if (rc != 0) {
logmsg(LOGMSG_ERROR, "%s: apply_record failed rc=%d - exiting apply loop\n", __func__, rc);
BDB_RELLOCK();
Expand Down