Skip to content

Commit fcd2610

Browse files
committed
[CELEBORN-2174] Remove partitionSplitEnabled from ReserveSlots and FileInfo
1 parent 13ea40c commit fcd2610

25 files changed

Lines changed: 41 additions & 152 deletions

File tree

client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1320,7 +1320,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
13201320
rangeReadFilter,
13211321
userIdentifier,
13221322
conf.pushDataTimeoutMs,
1323-
partitionSplitEnabled = true,
13241323
isSegmentGranularityVisible = isSegmentGranularityVisible))
13251324
futures.add((future, workerInfo))
13261325
}(ec)

common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.io.File;
2121
import java.util.ArrayList;
22-
import java.util.Arrays;
22+
import java.util.Collections;
2323

2424
import com.google.common.annotations.VisibleForTesting;
2525
import org.apache.hadoop.fs.FileSystem;
@@ -42,24 +42,22 @@ public class DiskFileInfo extends FileInfo {
4242

4343
public DiskFileInfo(
4444
UserIdentifier userIdentifier,
45-
boolean partitionSplitEnabled,
4645
FileMeta fileMeta,
4746
String filePath,
4847
StorageInfo.Type storageType) {
49-
super(userIdentifier, partitionSplitEnabled, fileMeta);
48+
super(userIdentifier, fileMeta);
5049
this.filePath = filePath;
5150
this.storageType = storageType;
5251
}
5352

5453
// only called when restore from pb or in UT
5554
public DiskFileInfo(
5655
UserIdentifier userIdentifier,
57-
boolean partitionSplitEnabled,
5856
FileMeta fileMeta,
5957
String filePath,
6058
StorageInfo.Type storageType,
6159
long bytesFlushed) {
62-
super(userIdentifier, partitionSplitEnabled, fileMeta);
60+
super(userIdentifier, fileMeta);
6361
this.filePath = filePath;
6462
if (storageType != null) {
6563
this.storageType = storageType;
@@ -73,14 +71,13 @@ public DiskFileInfo(
7371
public DiskFileInfo(File file, UserIdentifier userIdentifier, CelebornConf conf) {
7472
this(
7573
userIdentifier,
76-
true,
77-
new ReduceFileMeta(new ArrayList<>(Arrays.asList(0L)), conf.shuffleChunkSize()),
74+
new ReduceFileMeta(new ArrayList<>(Collections.singletonList(0L)), conf.shuffleChunkSize()),
7875
file.getAbsolutePath(),
7976
StorageInfo.Type.HDD);
8077
}
8178

8279
public DiskFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, String filePath) {
83-
super(userIdentifier, true, fileMeta);
80+
super(userIdentifier, fileMeta);
8481
this.filePath = filePath;
8582
this.storageType = StorageInfo.Type.HDD;
8683
}

common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,13 @@
2424

2525
public abstract class FileInfo {
2626
private final UserIdentifier userIdentifier;
27-
// whether to split is decided by client side.
28-
// now it's just used for mappartition to compatible with old client which can't support split
29-
private boolean partitionSplitEnabled;
3027
protected FileMeta fileMeta;
3128
protected final Set<Long> streams = ConcurrentHashMap.newKeySet();
3229
protected volatile long bytesFlushed;
3330
private boolean isReduceFileMeta;
3431

35-
public FileInfo(UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta) {
32+
public FileInfo(UserIdentifier userIdentifier, FileMeta fileMeta) {
3633
this.userIdentifier = userIdentifier;
37-
this.partitionSplitEnabled = partitionSplitEnabled;
3834
this.fileMeta = fileMeta;
3935
this.isReduceFileMeta = fileMeta instanceof ReduceFileMeta;
4036
}
@@ -67,14 +63,6 @@ public UserIdentifier getUserIdentifier() {
6763
return userIdentifier;
6864
}
6965

70-
public boolean isPartitionSplitEnabled() {
71-
return partitionSplitEnabled;
72-
}
73-
74-
public void setPartitionSplitEnabled(boolean partitionSplitEnabled) {
75-
this.partitionSplitEnabled = partitionSplitEnabled;
76-
}
77-
7866
public boolean addStream(long streamId) {
7967
if (!isReduceFileMeta) {
8068
throw new IllegalStateException("In addStream, filemeta cannot be MapFileMeta");

common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,13 @@ public class MemoryFileInfo extends FileInfo {
3232
private CompositeByteBuf sortedBuffer;
3333
private Map<Integer, List<ShuffleBlockInfo>> sortedIndexes;
3434

35-
public MemoryFileInfo(
36-
UserIdentifier userIdentifier, boolean partitionSplitEnabled, FileMeta fileMeta) {
37-
super(userIdentifier, partitionSplitEnabled, fileMeta);
35+
public MemoryFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta) {
36+
super(userIdentifier, fileMeta);
3837
}
3938

4039
// This constructor is only used in partition sorter for temp new memory file
41-
public MemoryFileInfo(
42-
UserIdentifier userIdentifier,
43-
boolean partitionSplitEnabled,
44-
FileMeta fileMeta,
45-
CompositeByteBuf buffer) {
46-
super(userIdentifier, partitionSplitEnabled, fileMeta);
40+
public MemoryFileInfo(UserIdentifier userIdentifier, FileMeta fileMeta, CompositeByteBuf buffer) {
41+
super(userIdentifier, fileMeta);
4742
this.buffer = buffer;
4843
}
4944

common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,6 @@ object ControlMessages extends Logging {
481481
rangeReadFilter: Boolean,
482482
userIdentifier: UserIdentifier,
483483
pushDataTimeout: Long,
484-
partitionSplitEnabled: Boolean = false,
485484
isSegmentGranularityVisible: Boolean = false)
486485
extends WorkerMessage
487486

@@ -960,7 +959,6 @@ object ControlMessages extends Logging {
960959
rangeReadFilter,
961960
userIdentifier,
962961
pushDataTimeout,
963-
partitionSplitEnabled,
964962
isSegmentGranularityVisible) =>
965963
val payload = PbReserveSlots.newBuilder()
966964
.setApplicationId(applicationId)
@@ -973,7 +971,6 @@ object ControlMessages extends Logging {
973971
.setRangeReadFilter(rangeReadFilter)
974972
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
975973
.setPushDataTimeout(pushDataTimeout)
976-
.setPartitionSplitEnabled(partitionSplitEnabled)
977974
.setIsSegmentGranularityVisible(isSegmentGranularityVisible)
978975
.build().toByteArray
979976
new TransportMessage(MessageType.RESERVE_SLOTS, payload)
@@ -1438,7 +1435,6 @@ object ControlMessages extends Logging {
14381435
pbReserveSlots.getRangeReadFilter,
14391436
userIdentifier,
14401437
pbReserveSlots.getPushDataTimeout,
1441-
pbReserveSlots.getPartitionSplitEnabled,
14421438
pbReserveSlots.getIsSegmentGranularityVisible)
14431439

14441440
case RESERVE_SLOTS_RESPONSE_VALUE =>

common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ object PbSerDeUtils {
128128
}
129129
new DiskFileInfo(
130130
userIdentifier,
131-
pbFileInfo.getPartitionSplitEnabled,
132131
meta,
133132
pbFileInfo.getFilePath,
134133
storageType,
@@ -153,21 +152,20 @@ object PbSerDeUtils {
153152
.setFilePath(fileInfo.getFilePath)
154153
.setUserIdentifier(toPbUserIdentifier(fileInfo.getUserIdentifier))
155154
.setBytesFlushed(fileInfo.getFileLength)
156-
.setPartitionSplitEnabled(fileInfo.isPartitionSplitEnabled)
157155
.setStorageType(fileInfo.getStorageType.getValue)
158-
if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
159-
val mapFileMeta = fileInfo.getFileMeta.asInstanceOf[MapFileMeta]
160-
builder.setPartitionType(PartitionType.MAP.getValue)
161-
builder.setBufferSize(mapFileMeta.getBufferSize)
162-
builder.setNumSubpartitions(mapFileMeta.getNumSubpartitions)
163-
builder.setIsSegmentGranularityVisible(mapFileMeta.isSegmentGranularityVisible)
164-
builder.putAllPartitionWritingSegment(mapFileMeta.getSubPartitionWritingSegmentId)
165-
builder.addAllSegmentIndex(
166-
mapFileMeta.getSubPartitionSegmentIndexes.asScala.map(toPbSegmentIndex).asJava)
167-
} else {
168-
val reduceFileMeta = fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
169-
builder.setPartitionType(PartitionType.REDUCE.getValue)
170-
builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)
156+
fileInfo.getFileMeta match {
157+
case mapFileMeta: MapFileMeta =>
158+
builder.setPartitionType(PartitionType.MAP.getValue)
159+
builder.setBufferSize(mapFileMeta.getBufferSize)
160+
builder.setNumSubpartitions(mapFileMeta.getNumSubpartitions)
161+
builder.setIsSegmentGranularityVisible(mapFileMeta.isSegmentGranularityVisible)
162+
builder.putAllPartitionWritingSegment(mapFileMeta.getSubPartitionWritingSegmentId)
163+
builder.addAllSegmentIndex(
164+
mapFileMeta.getSubPartitionSegmentIndexes.asScala.map(toPbSegmentIndex).asJava)
165+
case _ =>
166+
val reduceFileMeta = fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
167+
builder.setPartitionType(PartitionType.REDUCE.getValue)
168+
builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)
171169
}
172170
builder.build
173171
}

common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,57 +77,49 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
7777

7878
val fileInfo1 = new DiskFileInfo(
7979
userIdentifier1,
80-
true,
8180
new ReduceFileMeta(chunkOffsets1, 123),
8281
file1.getAbsolutePath,
8382
StorageInfo.Type.HDD,
8483
3000L)
8584
val fileInfo2 = new DiskFileInfo(
8685
userIdentifier2,
87-
true,
8886
new ReduceFileMeta(chunkOffsets2, 123),
8987
file2.getAbsolutePath,
9088
StorageInfo.Type.SSD,
9189
6000L)
9290
val fileInfo3 = new DiskFileInfo(
9391
userIdentifier3,
94-
true,
9592
new ReduceFileMeta(chunkOffsets3, 123),
9693
file3,
9794
StorageInfo.Type.HDFS,
9895
6000L)
9996
val fileInfo4 = new DiskFileInfo(
10097
userIdentifier3,
101-
true,
10298
new ReduceFileMeta(chunkOffsets3, 123),
10399
file4,
104100
StorageInfo.Type.OSS,
105101
6000L)
106102
val fileInfo5 = new DiskFileInfo(
107103
userIdentifier3,
108-
true,
109104
new ReduceFileMeta(chunkOffsets3, 123),
110105
file5,
111106
StorageInfo.Type.S3,
112107
6000L)
113108
val fileInfo6 = new DiskFileInfo(
114109
userIdentifier3,
115-
true,
116110
new ReduceFileMeta(chunkOffsets3, 123),
117111
file6,
118112
StorageInfo.Type.S3,
119113
6000L)
120114

121115
val mapFileInfo1 = new DiskFileInfo(
122116
userIdentifier1,
123-
true,
124117
new MapFileMeta(1024, 10),
125118
file1.getAbsolutePath,
126119
StorageInfo.Type.HDD,
127120
6000L)
128121
val mapFileInfo2 = new DiskFileInfo(
129122
userIdentifier2,
130-
true,
131123
new MapFileMeta(1024, 10),
132124
file2.getAbsolutePath,
133125
StorageInfo.Type.SSD,
@@ -376,7 +368,6 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
376368
.setFilePath(diskFileInfo.getFilePath)
377369
.setUserIdentifier(toPbUserIdentifier(diskFileInfo.getUserIdentifier))
378370
.setBytesFlushed(diskFileInfo.getFileLength)
379-
.setPartitionSplitEnabled(diskFileInfo.isPartitionSplitEnabled)
380371
val reduceFileMeta = diskFileInfo.getFileMeta.asInstanceOf[ReduceFileMeta]
381372
builder.setPartitionType(PartitionType.REDUCE.getValue)
382373
builder.addAllChunkOffsets(reduceFileMeta.getChunkOffsets)

worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ public void release() {
455455
logger.debug("release reader for stream {}", streamId);
456456
// old client can't support BufferStreamEnd, so for new client it tells client that this
457457
// stream is finished.
458-
if (fileInfo.isPartitionSplitEnabled() && !errorNotified) {
458+
if (!errorNotified) {
459459
associatedChannel.writeAndFlush(
460460
new RpcRequest(
461461
TransportClient.requestId(),

worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class PartitionDataWriterContext {
3333
private final String appId;
3434
private final int shuffleId;
3535
private final UserIdentifier userIdentifier;
36-
private final boolean partitionSplitEnabled;
3736
private final String shuffleKey;
3837
private final PartitionType partitionType;
3938
private final boolean isSegmentGranularityVisible;
@@ -51,7 +50,6 @@ public PartitionDataWriterContext(
5150
int shuffleId,
5251
UserIdentifier userIdentifier,
5352
PartitionType partitionType,
54-
boolean partitionSplitEnabled,
5553
boolean isSegmentGranularityVisible) {
5654
this.splitThreshold = splitThreshold;
5755
this.partitionSplitMode = partitionSplitMode;
@@ -60,7 +58,6 @@ public PartitionDataWriterContext(
6058
this.appId = appId;
6159
this.shuffleId = shuffleId;
6260
this.userIdentifier = userIdentifier;
63-
this.partitionSplitEnabled = partitionSplitEnabled;
6461
this.partitionType = partitionType;
6562
this.shuffleKey = Utils.makeShuffleKey(appId, shuffleId);
6663
this.isSegmentGranularityVisible = isSegmentGranularityVisible;
@@ -94,10 +91,6 @@ public UserIdentifier getUserIdentifier() {
9491
return userIdentifier;
9592
}
9693

97-
public boolean isPartitionSplitEnabled() {
98-
return partitionSplitEnabled;
99-
}
100-
10194
public String getShuffleKey() {
10295
return shuffleKey;
10396
}
@@ -152,8 +145,6 @@ public String toString() {
152145
+ shuffleId
153146
+ ", userIdentifier="
154147
+ userIdentifier
155-
+ ", partitionSplitEnabled="
156-
+ partitionSplitEnabled
157148
+ ", shuffleKey='"
158149
+ shuffleKey
159150
+ '\''

worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,7 @@ public FileInfo getSortedFileInfo(
227227
memoryFileInfo.getSortedBuffer(),
228228
targetBuffer,
229229
shuffleChunkSize);
230-
return new MemoryFileInfo(
231-
memoryFileInfo.getUserIdentifier(),
232-
memoryFileInfo.isPartitionSplitEnabled(),
233-
reduceFileMeta,
234-
targetBuffer);
230+
return new MemoryFileInfo(memoryFileInfo.getUserIdentifier(), reduceFileMeta, targetBuffer);
235231
} else {
236232
DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
237233
String fileId = shuffleKey + "-" + fileName;

0 commit comments

Comments
 (0)