diff --git a/C/impl/sonLibKVDatabaseConf.c b/C/impl/sonLibKVDatabaseConf.c index 765e6bb..24423c9 100644 --- a/C/impl/sonLibKVDatabaseConf.c +++ b/C/impl/sonLibKVDatabaseConf.c @@ -27,6 +27,7 @@ struct stKVDatabaseConf { unsigned port; int timeout; int64_t maxKTRecordSize; + int64_t maxRedisRecordSize; int64_t maxKTBulkSetSize; int64_t maxRedisBulkSetSize; int64_t maxKTBulkSetNumRecords; @@ -60,12 +61,14 @@ stKVDatabaseConf *stKVDatabaseConf_constructKyotoTycoon(const char *host, unsign return conf; } -stKVDatabaseConf *stKVDatabaseConf_constructRedis(const char *host, unsigned port, int64_t maxRedisBulkSetSize) { +stKVDatabaseConf *stKVDatabaseConf_constructRedis(const char *host, unsigned port, + int64_t maxRecordSize, int64_t maxBulkSetSize) { stKVDatabaseConf *conf = stSafeCCalloc(sizeof(stKVDatabaseConf)); conf->type = stKVDatabaseTypeRedis; conf->host = stString_copy(host); conf->port = port; - conf->maxRedisBulkSetSize = maxRedisBulkSetSize; + conf->maxRedisRecordSize = maxRecordSize; + conf->maxRedisBulkSetSize = maxBulkSetSize; return conf; } @@ -220,6 +223,18 @@ static int64_t getXMLMaxKTBulkSetNumRecords(stHash *hash) { } } +/* Default to 510M. Redis keys and values cannot be greater than 512M + * refer to https://redis.io/topics/data-types-intro + */ +static int64_t getXMLMaxRedisRecordSize(stHash *hash) { + const char *value = stHash_search(hash, "max_record_size"); + if (value == NULL) { + return (int64_t) 510000000; + } else { + return stSafeStrToInt64(value); + } +} + /* Default to 1.75GB which seems to be about where the * redis network error danger zone starts */ @@ -257,7 +272,7 @@ static stKVDatabaseConf *constructFromString(const char *xmlString) { getXmlValueRequired(hash, "database_name"), getXmlValueRequired(hash, "table_name")); } else if (stString_eq(type, "redis")) { databaseConf = stKVDatabaseConf_constructRedis(getXmlValueRequired(hash, "host"), getXmlPort(hash), - getXMLMaxRedisBulkSetSize(hash)); + getXMLMaxRedisRecordSize(hash), getXMLMaxRedisBulkSetSize(hash)); } else { stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "invalid database type \"%s\"", type); } @@ -285,6 +300,7 @@ stKVDatabaseConf *stKVDatabaseConf_constructClone(stKVDatabaseConf *srcConf) { conf->timeout = srcConf->timeout; conf->maxKTRecordSize = srcConf->maxKTRecordSize; conf->maxKTBulkSetSize = srcConf->maxKTBulkSetSize; + conf->maxRedisRecordSize = srcConf->maxRedisRecordSize; conf->maxRedisBulkSetSize = srcConf->maxRedisBulkSetSize; conf->maxKTBulkSetNumRecords = srcConf->maxKTBulkSetNumRecords; conf->user = stString_copy(srcConf->user); @@ -343,6 +359,15 @@ int64_t stKVDatabaseConf_getMaxKTBulkSetNumRecords(stKVDatabaseConf *conf) { return conf->maxKTBulkSetNumRecords; } +int64_t stKVDatabaseConf_getMaxRedisRecordSize(stKVDatabaseConf *conf) { + return conf->maxRedisRecordSize; +} + +void stKVDatabaseConf_setMaxRedisRecordSize(stKVDatabaseConf *conf, + int64_t maxRecordSize) { + conf->maxRedisRecordSize = maxRecordSize; +} + int64_t stKVDatabaseConf_getMaxRedisBulkSetSize(stKVDatabaseConf *conf) { return conf->maxRedisBulkSetSize; } diff --git a/C/impl/sonLibKVDatabase_Redis.c b/C/impl/sonLibKVDatabase_Redis.c index 1485dfa..3a93805 100644 --- a/C/impl/sonLibKVDatabase_Redis.c +++ b/C/impl/sonLibKVDatabase_Redis.c @@ -4,10 +4,13 @@ #ifdef HAVE_REDIS #include "hiredis.h" +#define SPLIT_RECORDS_KEY "_SPLIT_RECORDS" + // Thin wrapper around the redis context, in case we want to support // something like auth, using a different DB number, etc. typedef struct { redisContext *ctxt; + stSet *splitRecords; } RedisDB; static redisReply *stRedisCommand(RedisDB *db, const char *string, ...) { @@ -52,6 +55,27 @@ static redisReply *stRedisGetReply(RedisDB *db) { return reply; } +static void fillSplitRecordsCache(RedisDB* db){ + if (db->splitRecords != NULL){ + stSet_destruct(db->splitRecords); + } + db->splitRecords = stSet_construct3((uint64_t (*)(const void *)) stIntTuple_hashKey, + (int (*)(const void *, const void *)) stIntTuple_equalsFn, + (void (*)(void *)) stIntTuple_destruct); + redisReply *reply = stRedisCommand(db, "SMEMBERS %s", SPLIT_RECORDS_KEY); + if (reply->type == REDIS_REPLY_ARRAY) { + for (int64_t i = 0; i < reply->elements; i++) { + redisReply* subReply = reply->element[i]; + assert(subReply->len == sizeof(int64_t)); + int64_t key; + memcpy((char *) &key, subReply->str, sizeof(int64_t)); + stIntTuple *keyTuple = stIntTuple_construct1(key); + stSet_insert(db->splitRecords, keyTuple); + } + } + freeReplyObject(reply); +} + /* connect to a database server */ static RedisDB *connect(stKVDatabaseConf *conf) { RedisDB *db = st_calloc(1, sizeof(RedisDB)); @@ -63,12 +87,15 @@ static RedisDB *connect(stKVDatabaseConf *conf) { free(db); stThrow(except); } + db->splitRecords = NULL; + fillSplitRecordsCache(db); return db; } static void destructDB(stKVDatabase *database) { RedisDB *db = database->dbImpl; redisFree(db->ctxt); + free(db->splitRecords); free(db); database->dbImpl = NULL; } @@ -84,6 +111,51 @@ static void insertInt64(stKVDatabase *database, int64_t key, int64_t value) { freeReplyObject(reply); } +static bool containsRecord(stKVDatabase *database, int64_t key) { + redisReply *reply = stRedisCommand(database->dbImpl, "EXISTS %" PRIi64, key); + assert(reply->type == REDIS_REPLY_INTEGER); + bool exists; + if (reply->integer) { + exists = true; + } else { + exists = false; + } + freeReplyObject(reply); + return exists; +} + +/* check if a record is split directly from database*/ +static bool recordIsSplitDB(stKVDatabase *database, int64_t key) { + redisReply *reply = stRedisCommand(database->dbImpl, "SISMEMBER %s %b", SPLIT_RECORDS_KEY, (char *) &key, sizeof(int64_t)); + assert(reply->type == REDIS_REPLY_INTEGER); + bool isSplit; + if (reply->integer) { + isSplit = true; + } else { + isSplit = false; + } + freeReplyObject(reply); + return isSplit; +} + +/* check if a record is split from cache + * It's recomended to use this function instead of recordIsSplitDB() whenever + * the commands are being pipelined so it avoids adding "SISMEMBER" to piplining + * (Make sure to run fillSplitRecordsCache() before using this function)*/ +static bool recordIsSplitCache(stKVDatabase *database, int64_t key) +{ + RedisDB *db = (RedisDB *) database->dbImpl; + stIntTuple *tuple = stIntTuple_construct1(key); + bool isSplit; + if (stSet_search(db->splitRecords, tuple) == NULL) { + isSplit = false; + } else { + isSplit = true; + } + stIntTuple_destruct(tuple); + return isSplit; +} + static void updateInt64(stKVDatabase *database, int64_t key, int64_t value) { // NB: this doesn't quite follow the "update" semantics: it will // insert the key if it doesn't exist. This could be fixed by @@ -91,12 +163,63 @@ static void updateInt64(stKVDatabase *database, int64_t key, int64_t value) { insertInt64(database, key, value); } -static void insertRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { - redisReply *reply = stRedisCommand(database->dbImpl, "SET %" PRIi64 " %b", key, - value, sizeOfRecord); +static void removeSplitRecordIfPresent(stKVDatabase *database, int64_t key) { + if (recordIsSplitDB(database, key) == true) { + redisReply *reply; + if (getenv("ST_SPLIT_RECORD_DBG") != NULL) { + fprintf(stderr, "removing split record %" PRIi64 "\n", key); + } + reply = stRedisCommand(database->dbImpl, "SREM %s %b", SPLIT_RECORDS_KEY, (char *) &key, sizeof(int64_t)); + freeReplyObject(reply); + reply = stRedisCommand(database->dbImpl, "DEL %" PRIi64, key); + freeReplyObject(reply); + } +} + +static void removeRecord(stKVDatabase *database, int64_t key) { + redisReply *reply; + if (recordIsSplitDB(database, key) == true) { + if (getenv("ST_SPLIT_RECORD_DBG") != NULL) { + fprintf(stderr, "removing split record %" PRIi64 "\n", key); + } + reply = stRedisCommand(database->dbImpl, "SREM %s %b", SPLIT_RECORDS_KEY, (char *) &key, sizeof(int64_t)); + freeReplyObject(reply); + } + reply = stRedisCommand(database->dbImpl, "DEL %" PRIi64, key); freeReplyObject(reply); } +static void insertRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { + stKVDatabaseConf* conf = stKVDatabase_getConf(database); + int64_t maxRecordSize = stKVDatabaseConf_getMaxRedisRecordSize(conf); + redisReply *reply; + if (sizeOfRecord > maxRecordSize) + { + removeRecord(database, key); + int64_t numSplitRecords = sizeOfRecord / maxRecordSize + 1; + if (getenv("ST_SPLIT_RECORD_DBG") != NULL) { + fprintf(stderr, "attempting to create %" PRIi64 " records for key %" PRIi64 "\n", numSplitRecords, key); + } + + reply = stRedisCommand(database->dbImpl, "SADD %s %b", SPLIT_RECORDS_KEY, (char *) &key, sizeof(int64_t)); + freeReplyObject(reply); + for (int64_t i = 0; i < numSplitRecords; i++) { + reply = stRedisCommand(database->dbImpl, "RPUSH %" PRIi64 " %b", key, + ((const char *)value) + i * maxRecordSize, + (i == numSplitRecords - 1) + ? sizeOfRecord - i * maxRecordSize + : maxRecordSize); + freeReplyObject(reply); + } + } + else + { + removeSplitRecordIfPresent(database, key); + redisReply *reply = stRedisCommand(database->dbImpl, "SET %" PRIi64 " %b", key, value, sizeOfRecord); + freeReplyObject(reply); + } +} + static void updateRecord(stKVDatabase *database, int64_t key, const void *value, int64_t sizeOfRecord) { // See caveats about the "update" in the comment to updateInt64 above. @@ -112,21 +235,43 @@ static int64_t numberOfRecords(stKVDatabase *database) { } static void *getRecord2(stKVDatabase *database, int64_t key, int64_t *sizeOfRecord) { - redisReply *reply = stRedisCommand(database->dbImpl, "GET %" PRIi64, key); + char* record = NULL; + stKVDatabaseConf* conf = stKVDatabase_getConf(database); + int64_t maxRecordSize = stKVDatabaseConf_getMaxRedisRecordSize(conf); + redisReply *reply; + if (recordIsSplitDB(database, key)) { + if (getenv("ST_SPLIT_RECORD_DBG") != NULL) { + fprintf(stderr, "attempting to read split records from record %" PRIi64 "\n",key); + } + reply = stRedisCommand(database->dbImpl, "LRANGE %" PRIi64 " 0 -1", key); + } + else { + reply = stRedisCommand(database->dbImpl, "GET %" PRIi64, key); + } + if (reply->type == REDIS_REPLY_NIL) { // No key in DB. freeReplyObject(reply); return NULL; - } else { - assert(reply->type == REDIS_REPLY_STRING); - void *ret = malloc(reply->len); - memcpy(ret, reply->str, reply->len); + } else if (reply->type == REDIS_REPLY_STRING) { + record = malloc(reply->len); + memcpy(record, reply->str, reply->len); if (sizeOfRecord != NULL) { *sizeOfRecord = reply->len; } - freeReplyObject(reply); - return ret; + } + else { // key is split so it should be saved as a list + assert(reply->type == REDIS_REPLY_ARRAY); + record = (char *) st_malloc(maxRecordSize * reply->elements); + *sizeOfRecord = 0; + for (int64_t i = 0; i < reply->elements; i++) { + redisReply* subReply = reply->element[i]; + *sizeOfRecord += subReply->len; + memcpy(record + i * maxRecordSize, subReply->str, subReply->len); + } } + freeReplyObject(reply); + return record; } static int64_t getInt64(stKVDatabase *database, int64_t key) { @@ -150,38 +295,39 @@ static void *getRecord(stKVDatabase *database, int64_t key) { return getRecord2(database, key, NULL); } -static bool containsRecord(stKVDatabase *database, int64_t key) { - redisReply *reply = stRedisCommand(database->dbImpl, "EXISTS %" PRIi64, key); - assert(reply->type == REDIS_REPLY_INTEGER); - bool exists; - if (reply->integer) { - exists = true; - } else { - exists = false; - } - freeReplyObject(reply); - return exists; -} - static void *getPartialRecord(stKVDatabase *database, int64_t key, int64_t zeroBasedByteOffset, int64_t sizeInBytes, int64_t recordSize) { - redisReply *reply = stRedisCommand(database->dbImpl, "GETRANGE %" PRIi64 " %" PRIi64 " %" PRIi64, - key, zeroBasedByteOffset, - zeroBasedByteOffset + sizeInBytes - 1); - assert(reply->type == REDIS_REPLY_STRING); - if (reply->len != sizeInBytes) { - int64_t len = reply->len; + char* partialRecord; + if (recordIsSplitDB(database, key)){ + // The record is split so get the whole value and extract the partial value + int64_t recordSize2; + char *record = (char *)getRecord2(database, key, &recordSize2); + if(recordSize2 != recordSize) { + stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "The given record size is incorrect: %lld, should be %lld", (long long)recordSize, recordSize2); + } + if(record == NULL) { + stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "The record does not exist: %lld for partial retrieval", (long long)key); + } + if(zeroBasedByteOffset < 0 || sizeInBytes < 0 || zeroBasedByteOffset + sizeInBytes > recordSize) { + stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "Partial record retrieval to out of bounds memory, record size: %lld, requested start: %lld, requested size: %lld", (long long)recordSize, (long long)zeroBasedByteOffset, (long long)sizeInBytes); + } + partialRecord = memcpy(st_malloc(sizeInBytes), record + zeroBasedByteOffset, sizeInBytes); + free(record); + } + else { + redisReply *reply = stRedisCommand(database->dbImpl, "GETRANGE %" PRIi64 " %" PRIi64 " %" PRIi64, + key, zeroBasedByteOffset, + zeroBasedByteOffset + sizeInBytes - 1); + assert(reply->type == REDIS_REPLY_STRING); + if (reply->len != sizeInBytes) { + int64_t len = reply->len; + freeReplyObject(reply); + stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "partial read of key %lld, expected %lld bytes got %lld bytes", key, sizeInBytes, len); + } + partialRecord = malloc(reply->len * sizeof(char)); + memcpy(partialRecord, reply->str, reply->len); freeReplyObject(reply); - stThrowNew(ST_KV_DATABASE_EXCEPTION_ID, "partial read of key %lld, expected %lld bytes got %lld bytes", key, sizeInBytes, len); } - char *data = malloc(reply->len * sizeof(char)); - memcpy(data, reply->str, reply->len); - freeReplyObject(reply); - return data; -} - -static void removeRecord(stKVDatabase *database, int64_t key) { - redisReply *reply = stRedisCommand(database->dbImpl, "DEL %" PRIi64, key); - freeReplyObject(reply); + return partialRecord; } static int64_t incrementInt64(stKVDatabase *database, int64_t key, int64_t incrementAmount) { @@ -193,17 +339,26 @@ static int64_t incrementInt64(stKVDatabase *database, int64_t key, int64_t incre } static void bulkRemoveRecords(stKVDatabase *database, stList *records) { - RedisDB *db = database->dbImpl; + RedisDB* db = database->dbImpl; + fillSplitRecordsCache(db); + int64_t numberOfSplitRecords = 0; redisAppendCommand(db->ctxt, "MULTI"); for(int64_t i = 0; i < stList_length(records); i++) { - stIntTuple *box = stList_get(records, i); - redisAppendCommand(db->ctxt, "DEL %" PRIi64, stIntTuple_get(box, 0)); + int64_t key = stIntTuple_get((stIntTuple *)stList_get(records, i), 0); + if (recordIsSplitCache(database, key) == true) { + if (getenv("ST_SPLIT_RECORD_DBG") != NULL) { + fprintf(stderr, "removing split record %" PRIi64 "\n", key); + } + redisAppendCommand(db->ctxt, "SREM %s %b", SPLIT_RECORDS_KEY, (char *) &key, sizeof(int64_t)); + numberOfSplitRecords += 1; + } + redisAppendCommand(db->ctxt, "DEL %" PRIi64, key); } redisAppendCommand(db->ctxt, "EXEC"); redisReply *reply; reply = stRedisGetReply(db); freeReplyObject(reply); - for(int64_t i = 0; i < stList_length(records); i++) { + for(int64_t i = 0; i < (stList_length(records) + numberOfSplitRecords); i++) { reply = stRedisGetReply(db); freeReplyObject(reply); } @@ -218,30 +373,50 @@ static void setRecord(stKVDatabase *database, int64_t key, static stList *bulkGetRecords(stKVDatabase *database, stList* keys) { RedisDB *db = database->dbImpl; + fillSplitRecordsCache(db); + stList* results = stList_construct3(stList_length(keys), + (void(*)(void *))stKVDatabaseBulkResult_destruct); + + // Get split records for (int64_t i = 0; i < stList_length(keys); i++) { int64_t key = *((int64_t *) stList_get(keys, i)); - redisAppendCommand(db->ctxt, "GET %" PRIi64, key); + if (recordIsSplitCache(database, key) == true) { + void *record; + int64_t length; + record = getRecord2(database, key, &length); + stKVDatabaseBulkResult* result = stKVDatabaseBulkResult_construct(record, length); + stList_set(results, i, result); + } } - stList* results = stList_construct3(stList_length(keys), - (void(*)(void *))stKVDatabaseBulkResult_destruct); + // Pipeline GET commands for not-split records for(int64_t i = 0; i < stList_length(keys); i++) { - redisReply *reply = stRedisGetReply(db); - void *record; - int64_t length; - if (reply->type == REDIS_REPLY_NIL) { - // No record found - record = NULL; - length = 0; - } else { - // Found a record - assert(reply->type == REDIS_REPLY_STRING); - record = malloc(reply->len); - memcpy(record, reply->str, reply->len); - length = reply->len; - } - stKVDatabaseBulkResult* result = stKVDatabaseBulkResult_construct(record, length); - stList_set(results, i, result); - freeReplyObject(reply); + int64_t key = *((int64_t *) stList_get(keys, i)); + if (recordIsSplitCache(database, key) == false) { + redisAppendCommand(db->ctxt, "GET %" PRIi64, key); + } + } + // Get not-split records + for(int64_t i = 0; i < stList_length(keys); i++) { + int64_t key = *((int64_t *) stList_get(keys, i)); + if (recordIsSplitCache(database, key) == false) { + void *record; + int64_t length; + redisReply *reply = stRedisGetReply(db); + if (reply->type == REDIS_REPLY_NIL) { + // No record found + record = NULL; + length = 0; + } else { + // Found a record + assert(reply->type == REDIS_REPLY_STRING); + record = malloc(reply->len); + memcpy(record, reply->str, reply->len); + length = reply->len; + } + freeReplyObject(reply); + stKVDatabaseBulkResult* result = stKVDatabaseBulkResult_construct(record, length); + stList_set(results, i, result); + } } return results; } @@ -263,6 +438,17 @@ static void bulkSetRecords(stKVDatabase *database, stList *records) { RedisDB *db = database->dbImpl; stKVDatabaseConf* conf = stKVDatabase_getConf(database); int64_t maxBulkSetSize = stKVDatabaseConf_getMaxRedisBulkSetSize(conf); + int64_t maxRecordSize = stKVDatabaseConf_getMaxRedisRecordSize(conf); + for(int64_t i = 0; i < stList_length(records); i++) { + stKVDatabaseBulkRequest *request = stList_get(records, i); + if (request->size > maxRecordSize) { + // Set large records without MULTI + setRecord(database, request->key, request->value, request->size); + } + else { + removeSplitRecordIfPresent(database, request->key); + } + } int64_t runningSize = 0; int64_t countAddedRecords = 0; redisAppendCommand(db->ctxt, "MULTI"); @@ -283,9 +469,11 @@ static void bulkSetRecords(stKVDatabase *database, stList *records) { countAddedRecords = 0; redisAppendCommand(db->ctxt, "MULTI"); } - redisAppendCommand(db->ctxt, "SET %" PRIi64 " %b", request->key, request->value, request->size); - runningSize += request->size; - countAddedRecords += 1; + if (request->size <= maxRecordSize) { + redisAppendCommand(db->ctxt, "SET %" PRIi64 " %b", request->key, request->value, request->size); + runningSize += request->size; + countAddedRecords += 1; + } } redisAppendCommand(db->ctxt, "EXEC"); redisReply *reply; diff --git a/C/inc/sonLibKVDatabaseConf.h b/C/inc/sonLibKVDatabaseConf.h index 7192ee7..412c009 100644 --- a/C/inc/sonLibKVDatabaseConf.h +++ b/C/inc/sonLibKVDatabaseConf.h @@ -38,7 +38,8 @@ stKVDatabaseConf *stKVDatabaseConf_constructKyotoTycoon(const char *host, unsign * Construct a new database configuration object for a Redis * database remote object. */ -stKVDatabaseConf *stKVDatabaseConf_constructRedis(const char *host, unsigned port, int64_t maxBulkSetSize); +stKVDatabaseConf *stKVDatabaseConf_constructRedis(const char *host, unsigned port, + int64_t maxRecordSize, int64_t maxBulkSetSize); /* * Construct a new database configuration object for a MySql database. @@ -103,6 +104,13 @@ int64_t stKVDatabaseConf_getMaxKTBulkSetSize(stKVDatabaseConf *conf); /* get the maximum number of records in kyoto tycoon bulk set */ int64_t stKVDatabaseConf_getMaxKTBulkSetNumRecords(stKVDatabaseConf *conf); +/* get the maximum size in bytes of a redis record */ +int64_t stKVDatabaseConf_getMaxRedisRecordSize(stKVDatabaseConf *conf); + +/* set the maximum size in bytes of a redis record */ +void stKVDatabaseConf_setMaxRedisRecordSize(stKVDatabaseConf *conf, + int64_t maxRecordSize); + /* get the maximum size in bytes of a redis bulk set */ int64_t stKVDatabaseConf_getMaxRedisBulkSetSize(stKVDatabaseConf *conf); diff --git a/C/tests/kvDatabaseTestCommon.c b/C/tests/kvDatabaseTestCommon.c index eccf3c3..d5502d0 100644 --- a/C/tests/kvDatabaseTestCommon.c +++ b/C/tests/kvDatabaseTestCommon.c @@ -58,7 +58,8 @@ stKVDatabaseConf *kvDatabaseTestParseOptions(int argc, char *const *argv, const {"timeout", required_argument, NULL, 'i'}, {"maxKTRecordSize", required_argument, NULL, 'r'}, {"maxKTBulkSetSize", required_argument, NULL, 'b'}, - {"maxRedisBulkSetSize", required_argument, NULL, 'R'}, + {"maxRedisRecordSize", required_argument, NULL, 'R'}, + {"maxRedisBulkSetSize", required_argument, NULL, 'B'}, {"user", required_argument, NULL, 'u'}, {"pass", required_argument, NULL, 'p'}, {"name", required_argument, NULL, 'n'}, @@ -73,12 +74,13 @@ stKVDatabaseConf *kvDatabaseTestParseOptions(int argc, char *const *argv, const int64_t optMaxKTRecordSize = (int64_t) 1U << 27; int64_t optMaxKTBulkSetSize = (int64_t) 1U << 27; int64_t optMaxKTBulkSetNumRecords = (int64_t) 1U << 27; + int64_t optMaxRedisRecordSize = (int64_t) 1U << 28; int64_t optMaxRedisBulkSetSize = (int64_t) 1U << 30; const char *optUser = NULL; const char *optPass = NULL; const char *optName = NULL; int optKey, optIndex; - while ((optKey = getopt_long(argc, argv, "t:d:H:P:i:r:b:R:u:p:h", longOptions, &optIndex)) >= 0) { + while ((optKey = getopt_long(argc, argv, "t:d:H:P:i:r:b:R:B:u:p:h", longOptions, &optIndex)) >= 0) { switch (optKey) { case 't': optType = parseDbType(optarg); @@ -102,6 +104,9 @@ stKVDatabaseConf *kvDatabaseTestParseOptions(int argc, char *const *argv, const optMaxKTBulkSetSize = stSafeStrToInt64(optarg); break; case 'R': + optMaxRedisRecordSize = stSafeStrToInt64(optarg); + break; + case 'B': optMaxRedisBulkSetSize = stSafeStrToInt64(optarg); break; case 'u': @@ -150,7 +155,7 @@ stKVDatabaseConf *kvDatabaseTestParseOptions(int argc, char *const *argv, const conf = stKVDatabaseConf_constructMySql(optHost, 0, optUser, optPass, optDb, "cactusDbTest"); fprintf(stderr, "running MySQL sonLibKVDatabase tests\n"); } else if (optType == stKVDatabaseTypeRedis) { - conf = stKVDatabaseConf_constructRedis(optHost, optPort, optMaxRedisBulkSetSize); + conf = stKVDatabaseConf_constructRedis(optHost, optPort, optMaxRedisRecordSize, optMaxRedisBulkSetSize); fprintf(stderr, "running Redis sonLibKVDatabase tests\n"); } return conf;