diff --git a/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala new file mode 100644 index 000000000..e8dada573 --- /dev/null +++ b/summingbird-scalding-test/src/test/scala/com/twitter/summingbird/scalding/NamedOptionsSpec.scala @@ -0,0 +1,217 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.scalding + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.property.ConfigDef +import cascading.property.ConfigDef.Setter +import cascading.tuple.Fields +import com.twitter.scalding.{Test => TestMode, _} +import com.twitter.summingbird._ +import com.twitter.summingbird.batch.option.Reducers +import com.twitter.summingbird.option.MonoidIsCommutative +import org.scalatest.WordSpec + +/** + * Tests for application of named options. 
+ */ +class NamedOptionsSpec extends WordSpec { + + private val ReducerKey = "mapred.reduce.tasks" + private val FlatMapNodeName1 = "FM1" + private val FlatMapNodeName2 = "FM2" + private val SummerNodeName1 = "SM1" + private val SummerNodeName2 = "SM2" + + private val IdentitySink = new Sink[Int] { + override def write(incoming: PipeFactory[Int]): PipeFactory[Int] = incoming + } + + implicit def timeExtractor[T <: (Int, _)] = + new TimeExtractor[T] { + override def apply(t: T) = t._1.toLong + } + + def pipeConfig(pipe: Pipe): Map[String, List[String]] = { + val configCollector = new Setter { + var config = Map.empty[String, List[String]] + override def set(key: String, value: String): String = { + if (config.contains(key)) { + config = config.updated(key, value :: config(key)) + } else { + config += key -> List(value) + } + "" + } + override def get(key: String): String = ??? + override def update(key: String, value: String): String = ??? + } + + def recurse(p: Pipe): Unit = { + val cfg = p.getStepConfigDef + if (!cfg.isEmpty) { + cfg.apply(ConfigDef.Mode.REPLACE, configCollector) + } + p.getPrevious.foreach(recurse(_)) + } + + recurse(pipe) + configCollector.config + } + + def verify[T]( + options: Map[String, Options], + expectedReducers: Int)( + jobGen: (Producer[Scalding, (Int, Int)], scalding.Store[Int, Int]) => TailProducer[Scalding, Any]) = { + + val src = Scalding.sourceFromMappable { dr => IterableSource(List.empty[(Int, Int)]) } + val store = TestStore[Int, Int]("store", TestUtil.simpleBatcher, Map.empty[Int, Int], Long.MaxValue) + val job = jobGen(src, store) + val interval = TestUtil.toTimeInterval(1L, Long.MaxValue) + + val scaldingPlatform = Scalding("named options test", options) + val mode: Mode = TestMode(t => (store.sourceToBuffer).get(t)) + + val flowToPipe = scaldingPlatform + .plan(job) + .apply((interval, mode)) + .right + .get + ._2 + + val fd = new FlowDef + val typedPipe = flowToPipe.apply((fd, mode)) + def tupleSetter[T] = new 
TupleSetter[T] { + override def apply(arg: T) = { + val tup = cascading.tuple.Tuple.size(1) + tup.set(0, arg) + tup + } + override def arity = 1 + } + val pipe = typedPipe.toPipe(new Fields("0"))(fd, mode, tupleSetter) + val numReducers = pipeConfig(pipe)(ReducerKey).head.toInt + assert(numReducers === expectedReducers) + } + + "The ScaldingPlatform" should { + "with same setting on multiple names use the one for the node" in { + val fmReducers = 50 + val smReducers = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers)), + SummerNodeName1 -> Options().set(Reducers(smReducers))) + + verify(options, smReducers) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + } + } + + "use named option from the closest node when two names defined one after the other" in { + val smReducers1 = 50 + val smReducers2 = 100 + + val options = Map( + SummerNodeName1 -> Options().set(Reducers(smReducers1)), + SummerNodeName2 -> Options().set(Reducers(smReducers2))) + + verify(options, smReducers1) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1).name(SummerNodeName2) + } + } + + "use named option from the upstream node if option not defined on current node" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + .map { case (k, (optV, v)) => k }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + + "use named option from the upstream node if option not defined on current node, even if upstream node is more than a node apart" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + 
FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)).name(FlatMapNodeName1) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) } + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + + "use named option from the closest upstream node if same option defined on two upstream nodes" in { + val fmReducers1 = 50 + val fmReducers2 = 100 + + val options = Map( + FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers1) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1) + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + + "options propagate backwards" in { + val fmReducers2 = 1000 + + /** + * Here FlatMapNodeName1 is closer to the summer node but does not define the Reducers option, + * so the Reducers value is picked up from FlatMapNodeName2. 
+ */ + val options = Map( + FlatMapNodeName1 -> Options().set(MonoidIsCommutative), + FlatMapNodeName2 -> Options().set(Reducers(fmReducers2))) + + verify(options, fmReducers2) { (source, store) => + source + .flatMap(Some(_)) + .sumByKey(store).name(SummerNodeName1) + .flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1) + .flatMap { k => List(k, k) }.name(FlatMapNodeName2) + .write(IdentitySink) + } + } + } +} diff --git a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 225849668..9d3f3ab87 100644 --- a/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -16,34 +16,20 @@ package com.twitter.summingbird.storm +import java.util.{ Map => JMap } + import backtype.storm.generated.StormTopology -import com.twitter.algebird.{ MapAlgebra, Semigroup } -import com.twitter.storehaus.{ ReadableStore, JMapStore } -import com.twitter.storehaus.algebra.MergeableStore +import com.twitter.algebird.MapAlgebra import com.twitter.summingbird._ -import com.twitter.summingbird.online._ +import com.twitter.summingbird.batch.Batcher import com.twitter.summingbird.online.option._ -import com.twitter.summingbird.storm.option._ -import com.twitter.summingbird.batch.{ BatchID, Batcher } import com.twitter.summingbird.storm.spout.TraversableSpout -import com.twitter.tormenta.spout.Spout -import com.twitter.util.Future -import java.util.{ Collections, HashMap, Map => JMap, UUID } -import java.util.concurrent.atomic.AtomicInteger -import org.scalatest.WordSpec import org.scalacheck._ -import org.scalacheck.Prop._ -import org.scalacheck.Properties +import org.scalatest.WordSpec import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ - ArrayBuffer, - 
HashMap => MutableHashMap, - Map => MutableMap, - SynchronizedBuffer, - SynchronizedMap -} +import scala.collection.mutable.{ HashMap => MutableHashMap, Map => MutableMap } + /** * Tests for Summingbird's Storm planner. */ @@ -196,4 +182,22 @@ class TopologyTests extends WordSpec { assert(TDistMap(0).get_common.get_parallelism_hint == 5) } + + "With same setting on multiple names we use the one for the node" in { + val fmNodeName = "flatMapper" + val smNodeName = "summer" + val p = Storm.source(TraversableSpout(sample[List[Int]])) + .flatMap(testFn).name(fmNodeName) + .sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName) + + val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)), + smNodeName -> Options().set(SummerParallelism(20))) + val storm = Storm.local(opts) + val stormTopo = storm.plan(p).topology + val bolts = stormTopo.get_bolts + + // Tail should use parallelism specified for the summer node + assert(bolts("Tail").get_common.get_parallelism_hint == 20) + } + } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index dedf2ef06..637dd59d9 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -124,7 +124,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird private type Prod[T] = Producer[Storm, T] private[storm] def get[T <: AnyRef: ClassTag](dag: Dag[Storm], node: StormNode): Option[(String, T)] = { - val producer = node.members.last + val producer = node.members.head Options.getFirst[T](options, dag.producerToPriorityNames(producer)) }