From 06e1af8c535f457289813583dce7a9d0a705ad31 Mon Sep 17 00:00:00 2001 From: Santosh Domalapalli Date: Mon, 13 Jan 2020 15:09:33 -0500 Subject: [PATCH] Option to set max tasks per instance per datastream --- .../server/assignment/BroadcastStrategy.java | 74 ++++++++++++++++++- .../assignment/BroadcastStrategyFactory.java | 7 +- .../assignment/TestBroadcastStrategy.java | 37 ++++++++++ 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategy.java index f5e097880..2c505ac87 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategy.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategy.java @@ -5,12 +5,14 @@ */ package com.linkedin.datastream.server.assignment; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import org.slf4j.Logger; @@ -22,6 +24,7 @@ import com.linkedin.datastream.server.api.strategy.AssignmentStrategy; import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS; +import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS_PER_INSTANCE; /** @@ -30,6 +33,9 @@ * number of instances, so each instance could process multiple tasks for the same Datastream. If "maxTasks" is not * provided, the strategy will broadcast one task to each of the instances in the cluster. * + * The "maxTasksPerInstance" setting limits the number of tasks per datastream. + * If "maxTasksPerInstance" is not provided, the strategy will not limit the number of tasks per instance + * * All the tasks are redistributed across all the instances equally. */ public class BroadcastStrategy implements AssignmentStrategy { @@ -37,6 +43,7 @@ public class BroadcastStrategy implements AssignmentStrategy { private static final Logger LOG = LoggerFactory.getLogger(BroadcastStrategy.class.getName()); private final Optional _maxTasks; + private final Optional _maxTasksPerInstance; /** * Constructor for BroadcastStrategy @@ -47,6 +54,21 @@ public class BroadcastStrategy implements AssignmentStrategy { */ public BroadcastStrategy(Optional maxTasks) { _maxTasks = maxTasks; + _maxTasksPerInstance = Optional.empty(); + } + + /** + * Constructor for BroadcastStrategy + * @param maxTasks Maximum number of {@link DatastreamTask}s to create out + * of any {@link com.linkedin.datastream.common.Datastream} + * if no value is specified for the "maxTasks" config property + * at an individual datastream level. + * @param maxTasksPerInstance Maximum number of {@link DatastreamTask}s per instance to create out + * of any {@link com.linkedin.datastream.common.Datastream} + */ + public BroadcastStrategy(Optional maxTasks, Optional maxTasksPerInstance) { + _maxTasks = maxTasks; + _maxTasksPerInstance = maxTasksPerInstance; } @Override @@ -55,9 +77,15 @@ public Map> assign(List datastreams int totalAssignedTasks = currentAssignment.values().stream().mapToInt(Set::size).sum(); LOG.info("Assigning {} datastreams to {} instances with {} tasks", datastreams.size(), instances.size(), - totalAssignedTasks); + totalAssignedTasks); + + // if there are no live instances, return empty assignment + if (instances.isEmpty()) { + return new HashMap<>(); + } Map> newAssignment = new HashMap<>(); + Integer[] assignmentCountForDatastream = new Integer[instances.size()]; // Make a copy of the current assignment, since the strategy modifies it during calculation Map> currentAssignmentCopy = new HashMap<>(currentAssignment.size()); @@ -71,6 +99,11 @@ public Map> assign(List datastreams int instancePos = 0; for (DatastreamGroup dg : datastreams) { int numTasks = getNumTasks(dg, instances.size()); + Optional maxTasksPerInstance = getMaxNumTasksPerInstance(dg); + + // initialize the assignment counts on each datastream + Arrays.fill(assignmentCountForDatastream, 0); + for (int taskPos = 0; taskPos < numTasks; taskPos++) { String instance = instances.get(instancePos); @@ -82,9 +115,15 @@ public Map> assign(List datastreams currentAssignmentCopy.get(instance).remove(foundDatastreamTask); newAssignment.get(instance).add(foundDatastreamTask); - - // Move to the next instance - instancePos = (instancePos + 1) % instances.size(); + assignmentCountForDatastream[instancePos]++; + + // Move to the next instance or the next datastream if there is no available capacity + Optional nextPos = getNextInstanceWithCapacity(instances, assignmentCountForDatastream, maxTasksPerInstance, instancePos); + if (nextPos.isPresent()) { + instancePos = nextPos.get(); + } else { + break; + } } } @@ -93,6 +132,21 @@ public Map> assign(List datastreams return newAssignment; } + private Optional getNextInstanceWithCapacity(List instances, Integer[] assignmentCountForDatastream, + Optional maxTasksPerInstance, int prevPos) { + int pos = (prevPos + 1) % instances.size(); + + if (!maxTasksPerInstance.isPresent()) { + return Optional.of(pos); + } + + while (assignmentCountForDatastream[pos] >= maxTasksPerInstance.get() && pos != prevPos) { + pos = (pos + 1) % instances.size(); + } + + return assignmentCountForDatastream[pos] < maxTasksPerInstance.get() ? Optional.of(pos) : Optional.empty(); + } + private int getNumTasks(DatastreamGroup dg, int numInstances) { // Look for an override in any of the datastream. In the case of multiple overrides, select the largest. // If no override is present then use the default "_maxTasks" from config. @@ -105,4 +159,16 @@ private int getNumTasks(DatastreamGroup dg, int numInstances) { .max() .orElse(_maxTasks.orElse(numInstances)); } + + private Optional getMaxNumTasksPerInstance(DatastreamGroup dg) { + // Look for an override in any of the datastream. In the case of multiple overrides, select the largest. + // If no override is present then use the default "_maxTasksPerInstance" from config. + OptionalInt overrideValue = dg.getDatastreams() + .stream() + .map(ds -> ds.getMetadata().get(CFG_MAX_TASKS_PER_INSTANCE)) + .filter(Objects::nonNull) + .mapToInt(Integer::valueOf) + .max(); + return overrideValue.isPresent() ? Optional.of(overrideValue.getAsInt()) : _maxTasksPerInstance; + } } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategyFactory.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategyFactory.java index d5c2eb3be..58a9ee614 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategyFactory.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/BroadcastStrategyFactory.java @@ -19,12 +19,17 @@ public class BroadcastStrategyFactory implements AssignmentStrategyFactory { // the number of datastream tasks to create for a datastream public static final String CFG_MAX_TASKS = "maxTasks"; + public static final String CFG_MAX_TASKS_PER_INSTANCE = "maxTasksPerInstance"; @Override public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) { VerifiableProperties props = new VerifiableProperties(assignmentStrategyProperties); int cfgMaxTasks = props.getInt(CFG_MAX_TASKS, Integer.MIN_VALUE); Optional maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty(); - return new BroadcastStrategy(maxTasks); + + int cfgMaxTasksPerInstance = props.getInt(CFG_MAX_TASKS_PER_INSTANCE, Integer.MIN_VALUE); + Optional maxTasksPerInstance = cfgMaxTasksPerInstance > 0 ? Optional.of(cfgMaxTasksPerInstance) : Optional.empty(); + + return maxTasksPerInstance.isPresent() ? new BroadcastStrategy(maxTasks, maxTasksPerInstance) : new BroadcastStrategy(maxTasks); } } diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestBroadcastStrategy.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestBroadcastStrategy.java index b907a01dc..b4f776eb5 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestBroadcastStrategy.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestBroadcastStrategy.java @@ -29,6 +29,7 @@ import com.linkedin.datastream.testutil.DatastreamTestUtils; import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS; +import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS_PER_INSTANCE; /** @@ -106,6 +107,42 @@ private void doTestMaxTasks(BroadcastStrategy strategy, int numInstances, int ex Assert.assertEquals(totalTasks, expectedTotalTasks); } + @Test + public void testMaxTasksPerInstance() { + int numDatastreams = 10; + int numInstances = 20; + int maxTasks = 400; + int maxTasksPerInstance = 2; + int expectedTotalTasks = numInstances * maxTasksPerInstance * numDatastreams; + List datastreams = generateDatastreams("ds", numDatastreams); + doTestMaxTasksPerInstance(new BroadcastStrategy(Optional.of(maxTasks), Optional.of(maxTasksPerInstance)), numInstances, expectedTotalTasks, datastreams); + } + + @Test + public void testMaxTasksPerInstanceDatastreamOverride() { + int numDatastreams = 25; + int numInstances = 4; + int maxTasks = 400; + int maxTasksPerInstance = 2; + List datastreams = generateDatastreams("ds", numDatastreams); + datastreams.get(0).getDatastreams().get(0).getMetadata().put(CFG_MAX_TASKS_PER_INSTANCE, "4"); + int expectedTotalTasks = (numInstances * maxTasksPerInstance * (numDatastreams - 1)) + (numInstances * 4); + doTestMaxTasksPerInstance(new BroadcastStrategy(Optional.of(maxTasks), Optional.of(maxTasksPerInstance)), numInstances, expectedTotalTasks, datastreams); + } + + private void doTestMaxTasksPerInstance(BroadcastStrategy strategy, int numInstances, int expectedTotalTasks, + List datastreams) { + String[] instances = IntStream.range(0, numInstances).mapToObj(x -> "instance" + x).toArray(String[]::new); + Map> assignment = + strategy.assign(datastreams, Arrays.asList(instances), new HashMap<>()); + + int totalTasks = 0; + for (String instance : instances) { + totalTasks += assignment.get(instance).size(); + } + Assert.assertEquals(totalTasks, expectedTotalTasks); + } + private List generateDatastreams(String namePrefix, int numberOfDatastreams) { List datastreams = new ArrayList<>(); String type = DummyConnector.CONNECTOR_TYPE;