diff --git a/akkeeper/src/main/resources/reference.conf b/akkeeper/src/main/resources/reference.conf index 9b84f7c..f05126f 100644 --- a/akkeeper/src/main/resources/reference.conf +++ b/akkeeper/src/main/resources/reference.conf @@ -121,7 +121,7 @@ akkeeper { servers = "" # The initial amount of time to wait between retries. - connection-interval-ms = 3000 + connection-interval-ms = 500 # The maximum number of times to retry. max-retries = 8 diff --git a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala index 86dec11..09a87e5 100644 --- a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala +++ b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala @@ -83,7 +83,7 @@ object ContainerInstanceMain extends App with ContainerDefinitionJsonProtocol { val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig) val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper) - val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId)) + val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId), actorSystem) val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n") val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]] diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index 859f5fd..f571d56 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -68,7 +68,7 @@ private[master] class YarnMasterRunner extends MasterRunner { private def createInstanceStorage(actorSystem: ActorSystem, appId: String): InstanceStorage = { val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper) - InstanceStorageFactory(zkConfig.child(appId)) + InstanceStorageFactory((zkConfig.child(appId),actorSystem)) } private def createDeployClient(actorSystem: ActorSystem, diff --git a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala index 0c815cf..4645dc9 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala @@ -15,6 +15,7 @@ */ package akkeeper.storage +import akka.actor.ActorSystem import akkeeper.api._ import akkeeper.storage.zookeeper.ZookeeperClientConfig import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage @@ -62,14 +63,15 @@ private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage) private[akkeeper] object InstanceStorageFactory { implicit object ZookeeperInstanceStorageFactory - extends InstanceStorageFactory[ZookeeperClientConfig] { + extends InstanceStorageFactory[(ZookeeperClientConfig, ActorSystem)] { - override def apply(config: ZookeeperClientConfig): InstanceStorage = { - new ZookeeperInstanceStorage(config.child("instances")) + override def apply(ctr:(ZookeeperClientConfig,ActorSystem)): InstanceStorage = { + val (config, actorSystem) = ctr + new ZookeeperInstanceStorage(config.child("instances"))(actorSystem) } } - def apply[T: InstanceStorageFactory](config: T): InstanceStorage = { - implicitly[T](config) + def apply[T: InstanceStorageFactory](ctr: T): InstanceStorage = { + implicitly[T](ctr) } } diff --git a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala index f8a38ec..65b797b 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala @@ -23,9 +23,13 @@ import org.apache.zookeeper.CreateMode import scala.concurrent.Future import ZookeeperInstanceStorage._ +import akka.actor.ActorSystem import akkeeper.api.InstanceId -private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.Try + +private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)(implicit system: ActorSystem) extends BaseZookeeperStorage with InstanceStorage { protected override val zookeeperClient = @@ -50,16 +54,32 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) .map(fromBytes[InstanceInfo]) } + //akka-http default timeout is 20 seconds, so ensure that the call does not exceed it + //timeout outer future in 19 seconds override def getInstances: Future[Seq[InstanceId]] = { - val instancesFuture = for { + val t0 = System.nanoTime() + val instancesFuture: Future[Seq[Future[Seq[String]]]] = for { + //timeout containers <- zookeeperClient.children("") } yield for { container <- containers } yield zookeeperClient.children(container) - instancesFuture + val expectedExecution = instancesFuture .flatMap(f => Future.sequence(f).map(_.flatten)) .map(_.map(pathToInstanceId)) .recover(notFoundToEmptySeq[InstanceId]) + expectedExecution.onComplete { result:Try[Seq[InstanceId]] => + val t1 = System.nanoTime() + val nanoDelta = t1 - t0 + val secondsDelta = nanoDelta.toDouble / 1000000000L + val statusMsg = if(result.isSuccess) {"succeeded"} else {s"failed with ${result.failed.get}"} + if(secondsDelta >= 20.0) { + system.log.warning(s"getInstances TIMEOUTED, took ${secondsDelta} seconds and ${statusMsg}") + } else { + system.log.info(s"getInstances took ${secondsDelta} seconds and ${statusMsg}") + } + }(system.dispatcher) + expectedExecution } override def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]] = { @@ -67,6 +87,7 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) .map(_.map(pathToInstanceId)) .recover(notFoundToEmptySeq[InstanceId]) } + } private[akkeeper] object ZookeeperInstanceStorage { diff --git a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala index 14a5d69..39007ec 100644 --- a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala @@ -15,18 +15,23 @@ */ package akkeeper.storage.zookeeper.async -import akka.actor.Address +import akka.actor.{ActorSystem, Address} import akka.cluster.UniqueAddress +import akka.testkit.TestKit import akkeeper.AwaitMixin import akkeeper.address._ import akkeeper.api._ import akkeeper.storage._ import akkeeper.storage.zookeeper.ZookeeperClientConfig import org.apache.curator.test.{TestingServer => ZookeeperServer} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} -class ZookeeperInstanceStorageSpec extends FlatSpec - with Matchers with AwaitMixin with ZookeeperBaseSpec { +class ZookeeperInstanceStorageSpec extends TestKit(ActorSystem("instanceStorageSpec")) with FlatSpecLike + with Matchers with AwaitMixin with ZookeeperBaseSpec with BeforeAndAfterAll{ + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } private def withStorage[T](f: InstanceStorage => T): T = { val zookeeper = new ZookeeperServer() diff --git a/build.sbt b/build.sbt index 93b28e3..35db92e 100644 --- a/build.sbt +++ b/build.sbt @@ -14,10 +14,10 @@ * limitations under the License. */ -val AkkaVersion = "2.5.14" -val AkkaHttpVersion = "10.1.3" +val AkkaVersion = "2.5.31" +val AkkaHttpVersion = "10.1.13" val CuratorVersion = "2.7.1" -val SprayJsonVersion = "1.3.4" +val SprayJsonVersion = "1.3.6" val HadoopVersion = "2.8.4" val ScalaTestVersion = "3.0.5" val ScalamockVersion = "3.4.2" @@ -40,7 +40,7 @@ val TestDependencies = Seq( val CommonSettings = Seq( organization := "com.github.izeigerman", - scalaVersion := "2.12.6", + scalaVersion := "2.12.11", crossScalaVersions := Seq("2.11.11", scalaVersion.value), version := "0.4.12-SNAPSHOT",