diff --git a/setup.py b/setup.py index d641c45..c0dd041 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup( name='telephus', - version='0.8.0~beta1', + version='1.0.0beta1', description='connection pooled, low-level client API for Cassandra in Twisted python', author='brandon@faltering.com', url='http://github.com/driftx/Telephus', diff --git a/telephus/client.py b/telephus/client.py index 8093396..aa1333c 100644 --- a/telephus/client.py +++ b/telephus/client.py @@ -179,6 +179,34 @@ def remove_counter(self, key=None, column_family=None, column=None, super_column req = ManagedThriftRequest('remove_counter', key, cp, consistency) return self.manager.pushRequest(req, retries=retries) + @requirekwargs('column_family', 'mapping') + def batch_multikey_add(self, column_family=None, mapping=None, consistency=None, + retries=None): + consistency = consistency or self.consistency + mutmap = dict([(k, {column_family: self._mk_counter_cols_or_supers(v)}) + for k, v in mapping.items()]) + return self.batch_mutate_counters(mutmap, consistency=consistency, retries=retries) + + @requirekwargs('key', 'column_family', 'mapping') + def batch_add(self, key=None, column_family=None, mapping=None, consistency=None, + retries=None): + consistency = consistency or self.consistency + mutmap = {key: {column_family: self._mk_counter_cols_or_supers(mapping)}} + return self.batch_mutate_counters(mutmap, consistency=consistency, retries=retries) + + @requirekwargs('column_family', 'mapping') + def batch_multikey_insert(self, column_family=None, mapping=None, timestamp=None, + consistency=None, retries=None, ttl=None): + for value in mapping.values(): + if isinstance(value, list) and timestamp is not None: + raise RuntimeError('Timestamp cannot be specified with a list of Mutations') + timestamp = timestamp or self._time() + consistency = consistency or self.consistency + mutmap = dict([(k, {column_family: self._mk_cols_or_supers(v, timestamp, ttl)}) + for k, v in mapping.items()]) + return self.batch_mutate(mutmap, timestamp=timestamp, consistency=consistency, + retries=retries) + @requirekwargs('key', 'column_family', 'mapping') def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=None, consistency=None, retries=None, ttl=None): @@ -190,6 +218,17 @@ def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=Non return self.batch_mutate(mutmap, timestamp=timestamp, consistency=consistency, retries=retries) + @requirekwargs('cfmap') + def batch_remove_rows(self, cfmap=None, consistency=None, timestamp=None, retries=None): + timestamp = timestamp or self._time() + consistency = consistency or self.consistency + mutmap = defaultdict(dict) + for cf, keys in cfmap.iteritems(): + for key in keys: + mutmap[key][cf] = [Mutation(deletion=Deletion(timestamp))] + req = ManagedThriftRequest('batch_mutate', mutmap, consistency) + return self.manager.pushRequest(req, retries=retries) + @requirekwargs('cfmap') def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None, reverse=False, consistency=None, timestamp=None, supercolumn=None, @@ -204,6 +243,25 @@ def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None, req = ManagedThriftRequest('batch_mutate', mutmap, consistency) return self.manager.pushRequest(req, retries=retries) + @requirekwargs('mutationmap') + def batch_mutate_counters(self, mutationmap=None, consistency=None, retries=None): + consistency = consistency or self.consistency + mutmap = defaultdict(dict) + for key, cfmap in mutationmap.iteritems(): + for cf, colmap in cfmap.iteritems(): + cols_or_supers = self._mk_counter_cols_or_supers(colmap) + muts = [] + for c in cols_or_supers: + if isinstance(c, CounterSuperColumn): + muts.append(Mutation(ColumnOrSuperColumn(counter_super_column=c))) + elif isinstance(c, CounterColumn): + muts.append(Mutation(ColumnOrSuperColumn(counter_column=c))) + else: + muts.append(c) + mutmap[key][cf] = muts + req = ManagedThriftRequest('batch_mutate', mutmap, consistency) + return self.manager.pushRequest(req, retries=retries) + @requirekwargs('mutationmap') def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retries=None, ttl=None): timestamp = timestamp or self._time() @@ -226,6 +284,25 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri req = ManagedThriftRequest('batch_mutate', mutmap, consistency) return self.manager.pushRequest(req, retries=retries) + def _mk_counter_cols_or_supers(self, mapping): + if isinstance(mapping, list): + return mapping + colsorsupers = [] + if isinstance(mapping, dict): + first = mapping.keys()[0] + if isinstance(mapping[first], dict): + for name in mapping: + cols = [] + for col,val in mapping[name].iteritems(): + cols.append(CounterColumn(col, val)) + colsorsupers.append(CounterSuperColumn(name=name, columns=cols)) + else: + for col, val in mapping.iteritems(): + colsorsupers.append(CounterColumn(col, val)) + else: + raise TypeError('dict (of dicts) or list of CounterColumn/CounterSuperColumn expected') + return colsorsupers + def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False): if isinstance(mapping, list): return mapping diff --git a/telephus/translate.py b/telephus/translate.py index a74a2d2..b25c56c 100644 --- a/telephus/translate.py +++ b/telephus/translate.py @@ -2,8 +2,8 @@ from telephus.cassandra.c08.constants import VERSION as CASSANDRA_08_VERSION supported_versions = ( - ('0.7', CASSANDRA_07_VERSION), ('0.8', CASSANDRA_08_VERSION), + ('0.7', CASSANDRA_07_VERSION), ) class APIMismatch(Exception): diff --git a/test/test_cassandraclient.py b/test/test_cassandraclient.py index cdcc3a6..1341553 100644 --- a/test/test_cassandraclient.py +++ b/test/test_cassandraclient.py @@ -114,6 +114,27 @@ def test_insert_get(self): res = yield self.client.get('test2', SCF, column=COLUMN, super_column=SCOLUMN) self.assertEqual(res.column.value, 'superval2') + @defer.inlineCallbacks + def test_batch_remove_rows(self): + yield self.client.insert('test', CF, 'testval', column=COLUMN) + yield self.client.insert('test2', CF, 'testval2', column=COLUMN) + yield self.client.batch_remove_rows({CF:["test", "test2"]}) + res = yield self.client.multiget(['test', 'test2'], CF, column=COLUMN) + self.assertEqual(len(res['test']), 0) + self.assertEqual(len(res['test2']), 0) + + @defer.inlineCallbacks + def test_batch_multikey_insert_get_slice_and_count(self): + mapping = {'testA':{COLUMN: 'column1A', COLUMN2: 'column2A'}, + 'testB':{COLUMN: 'column1B', COLUMN2: 'column2B'}} + yield self.client.batch_multikey_insert(CF, mapping) + res = yield self.client.get_slice('testA', CF, names=(COLUMN, COLUMN2)) + self.assertEqual(res[0].column.value, 'column1A') + self.assertEqual(res[1].column.value, 'column2A') + res = yield self.client.get_slice('testB', CF, names=(COLUMN, COLUMN2)) + self.assertEqual(res[0].column.value, 'column1B') + self.assertEqual(res[1].column.value, 'column2B') + @defer.inlineCallbacks def test_batch_insert_get_slice_and_count(self): yield self.client.batch_insert('test', CF, @@ -198,6 +219,52 @@ def test_indexed_slices(self): res = yield self.client.get_indexed_slices(IDX_CF, expressions, start_key='') self.assertEquals(res[0].columns[0].column.value,'two') + @defer.inlineCallbacks + def test_counter_batch_multikey_add(self): + if self.version != CASSANDRA_08_VERSION: + raise unittest.SkipTest('Counters are not supported in 0.7') + mapping = { + "keyA":{"col1A":1, "col2A":5}, + "keyB":{"col1B":2, "col2B":3}} + yield self.client.batch_multikey_add(COUNTER_CF, mapping) + res = yield self.client.get('keyA', COUNTER_CF, column='col1A') + self.assertEquals(res.counter_column.value, 1) + res = yield self.client.get('keyA', COUNTER_CF, column='col2A') + self.assertEquals(res.counter_column.value, 5) + res = yield self.client.get('keyB', COUNTER_CF, column='col1B') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('keyB', COUNTER_CF, column='col2B') + self.assertEquals(res.counter_column.value, 3) + mapping = { + "keyA":{"col1A":1, "col2A":1}, + "keyB":{"col1B":1, "col2B":1}} + yield self.client.batch_multikey_add(COUNTER_CF, mapping) + res = yield self.client.get('keyA', COUNTER_CF, column='col1A') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('keyA', COUNTER_CF, column='col2A') + self.assertEquals(res.counter_column.value, 6) + res = yield self.client.get('keyB', COUNTER_CF, column='col1B') + self.assertEquals(res.counter_column.value, 3) + res = yield self.client.get('keyB', COUNTER_CF, column='col2B') + self.assertEquals(res.counter_column.value, 4) + + @defer.inlineCallbacks + def test_counter_batch_add(self): + if self.version != CASSANDRA_08_VERSION: + raise unittest.SkipTest('Counters are not supported in 0.7') + mapping = {"col1":1, "col2":5} + yield self.client.batch_add('test', COUNTER_CF, mapping) + res = yield self.client.get('test', COUNTER_CF, column='col1') + self.assertEquals(res.counter_column.value, 1) + res = yield self.client.get('test', COUNTER_CF, column='col2') + self.assertEquals(res.counter_column.value, 5) + mapping = {"col1":1, "col2":1} + yield self.client.batch_add('test', COUNTER_CF, mapping) + res = yield self.client.get('test', COUNTER_CF, column='col1') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('test', COUNTER_CF, column='col2') + self.assertEquals(res.counter_column.value, 6) + @defer.inlineCallbacks def test_counter_add(self): if self.version != CASSANDRA_08_VERSION: