Skip to content

Commit cab1a9e

Browse files
committed
partitionByRandom seed and new function mapWithIndex
1 parent 7a7e341 commit cab1a9e

10 files changed

Lines changed: 545 additions & 28 deletions

File tree

ignis/driver/api/IDataFrame.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ def repartition(self, numPartitions, preserveOrdering, global_):
8989
except ignis.rpc.driver.exception.ttypes.IDriverException as ex:
9090
raise IDriverException(ex.message, ex.cause_)
9191

92-
def partitionByRandom(self, numPartitions):
92+
def partitionByRandom(self, numPartitions, seed):
9393
try:
9494
with Ignis._clientPool().getClient() as client:
95-
self._id = client.getDataFrameService().partitionByRandom(self._id, numPartitions)
95+
self._id = client.getDataFrameService().partitionByRandom(self._id, numPartitions, seed)
9696
except ignis.rpc.driver.exception.ttypes.IDriverException as ex:
9797
raise IDriverException(ex.message, ex.cause_)
9898

@@ -138,6 +138,13 @@ def keyBy(self, src):
138138
except ignis.rpc.driver.exception.ttypes.IDriverException as ex:
139139
raise IDriverException(ex.message, ex.cause_)
140140

141+
def mapWithIndex(self, src):
142+
try:
143+
with Ignis._clientPool().getClient() as client:
144+
return IDataFrame(client.getDataFrameService().mapWithIndex(self._id, ISource.wrap(src).rpc()))
145+
except ignis.rpc.driver.exception.ttypes.IDriverException as ex:
146+
raise IDriverException(ex.message, ex.cause_)
147+
141148
def mapPartitions(self, src):
142149
try:
143150
with Ignis._clientPool().getClient() as client:

ignis/executor/core/modules/IGeneralModule.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ def keyBy(self, src):
4949
except Exception as ex:
5050
self._pack_exception(ex)
5151

52+
def mapWithIndex(self, src):
53+
try:
54+
self.__pipe_impl.mapWithIndex(self._executor_data.loadLibrary(src))
55+
except Exception as ex:
56+
self._pack_exception(ex)
57+
5258
def mapPartitions(self, src):
5359
try:
5460
self.__pipe_impl.mapPartitions(self._executor_data.loadLibrary(src))
@@ -149,9 +155,9 @@ def repartition(self, numPartitions, preserveOrdering, global_):
149155
except Exception as ex:
150156
self._pack_exception(ex)
151157

152-
def partitionByRandom(self, numPartitions):
158+
def partitionByRandom(self, numPartitions, seed):
153159
try:
154-
self.__repartition_impl.partitionByRandom(numPartitions)
160+
self.__repartition_impl.partitionByRandom(numPartitions, seed)
155161
except Exception as ex:
156162
self._pack_exception(ex)
157163

ignis/executor/core/modules/impl/IPipeImpl.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,26 @@ def keyBy(self, f):
9999
f.after(context)
100100
self._executor_data.setPartitions(output)
101101

102+
def mapWithIndex(self, f):
103+
context = self._executor_data.getContext()
104+
input = self._executor_data.getAndDeletePartitions()
105+
f.before(context)
106+
output = self._executor_data.getPartitionTools().newPartitionGroup(input)
107+
logger.info("General: mapWithIndex " + str(len(input)) + " partitions")
108+
109+
elems = sum([len(p) for p in input])
110+
indices = self._executor_data.mpi().native().allgather(elems)
111+
id = sum(indices[0:self._executor_data.mpi().rank()])
112+
113+
for i in range(len(input)):
114+
it = output[i].writeIterator()
115+
for elem in input[i]:
116+
it.write(f.call(id, elem, context))
117+
id += 1
118+
input[i] = None
119+
f.after(context)
120+
self._executor_data.setPartitions(output)
121+
102122
def mapPartitions(self, f):
103123
context = self._executor_data.getContext()
104124
input = self._executor_data.getAndDeletePartitions()
@@ -119,9 +139,13 @@ def mapPartitionsWithIndex(self, f):
119139
f.before(context)
120140
output = self._executor_data.getPartitionTools().newPartitionGroup(input)
121141
logger.info("General: mapPartitionsWithIndex " + str(len(input)) + " partitions")
142+
143+
indices = self._executor_data.mpi().native().allgather(len(input))
144+
offset = sum(indices[0:self._executor_data.mpi().rank()])
145+
122146
for i in range(len(input)):
123147
it = output[i].writeIterator()
124-
for elem in f.call(i, input[i].readIterator(), context):
148+
for elem in f.call(offset + i, input[i].readIterator(), context):
125149
it.write(elem)
126150
input[i] = None
127151
f.after(context)

ignis/executor/core/modules/impl/IRepartitionImpl.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,9 @@ def __local_repartition(self, numPartitions):
226226

227227
self._executor_data.setPartitions(output)
228228

229-
def partitionByRandom(self, numPartitions):
230-
self.__partitionBy_impl(lambda elem, ctx: random.randint(0, numPartitions), numPartitions)
229+
def partitionByRandom(self, numPartitions, seed):
230+
r = random.Random(seed)
231+
self.__partitionBy_impl(lambda elem, ctx: r.randint(0, numPartitions), numPartitions)
231232

232233
def partitionByHash(self, numPartitions):
233234
self.__partitionBy_impl(lambda elem, ctx: hash(elem), numPartitions)

ignis/rpc/driver/dataframe/IDataFrameService-remote

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
3434
print(' void saveAsTextFile(IDataFrameId id, string path)')
3535
print(' void saveAsJsonFile(IDataFrameId id, string path, bool pretty)')
3636
print(' IDataFrameId repartition(IDataFrameId id, i64 numPartitions, bool preserveOrdering, bool global_)')
37-
print(' IDataFrameId partitionByRandom(IDataFrameId id, i64 numPartitions)')
37+
print(' IDataFrameId partitionByRandom(IDataFrameId id, i64 numPartitions, i32 seed)')
3838
print(' IDataFrameId partitionByHash(IDataFrameId id, i64 numPartitions)')
3939
print(' IDataFrameId partitionBy(IDataFrameId id, ISource src, i64 numPartitions)')
4040
print(' IDataFrameId map_(IDataFrameId id, ISource src)')
4141
print(' IDataFrameId filter(IDataFrameId id, ISource src)')
4242
print(' IDataFrameId flatmap(IDataFrameId id, ISource src)')
4343
print(' IDataFrameId keyBy(IDataFrameId id, ISource src)')
44+
print(' IDataFrameId mapWithIndex(IDataFrameId id, ISource src)')
4445
print(' IDataFrameId mapPartitions(IDataFrameId id, ISource src)')
4546
print(' IDataFrameId mapPartitionsWithIndex(IDataFrameId id, ISource src)')
4647
print(' IDataFrameId mapExecutor(IDataFrameId id, ISource src)')
@@ -246,10 +247,10 @@ elif cmd == 'repartition':
246247
pp.pprint(client.repartition(eval(args[0]), eval(args[1]), eval(args[2]), eval(args[3]),))
247248

248249
elif cmd == 'partitionByRandom':
249-
if len(args) != 2:
250-
print('partitionByRandom requires 2 args')
250+
if len(args) != 3:
251+
print('partitionByRandom requires 3 args')
251252
sys.exit(1)
252-
pp.pprint(client.partitionByRandom(eval(args[0]), eval(args[1]),))
253+
pp.pprint(client.partitionByRandom(eval(args[0]), eval(args[1]), eval(args[2]),))
253254

254255
elif cmd == 'partitionByHash':
255256
if len(args) != 2:
@@ -287,6 +288,12 @@ elif cmd == 'keyBy':
287288
sys.exit(1)
288289
pp.pprint(client.keyBy(eval(args[0]), eval(args[1]),))
289290

291+
elif cmd == 'mapWithIndex':
292+
if len(args) != 2:
293+
print('mapWithIndex requires 2 args')
294+
sys.exit(1)
295+
pp.pprint(client.mapWithIndex(eval(args[0]), eval(args[1]),))
296+
290297
elif cmd == 'mapPartitions':
291298
if len(args) != 2:
292299
print('mapPartitions requires 2 args')

0 commit comments

Comments
 (0)