From cb2f46f034e971d5eea93e1b033ca578a1b13c51 Mon Sep 17 00:00:00 2001 From: Marek Zebrowski Date: Mon, 21 Dec 2020 15:30:22 +0100 Subject: [PATCH 1/2] timeout handling for get instances SA-25112 --- akkeeper/src/main/resources/reference.conf | 2 +- .../container/ContainerInstanceMain.scala | 2 +- .../scala/akkeeper/master/MasterRunner.scala | 2 +- .../akkeeper/storage/InstanceStorage.scala | 12 +++--- .../async/ZookeeperInstanceStorage.scala | 40 +++++++++++++++++-- .../async/ZookeeperInstanceStorageSpec.scala | 13 ++++-- build.sbt | 8 ++-- 7 files changed, 60 insertions(+), 19 deletions(-) diff --git a/akkeeper/src/main/resources/reference.conf b/akkeeper/src/main/resources/reference.conf index 9b84f7c..f05126f 100644 --- a/akkeeper/src/main/resources/reference.conf +++ b/akkeeper/src/main/resources/reference.conf @@ -121,7 +121,7 @@ akkeeper { servers = "" # The initial amount of time to wait between retries. - connection-interval-ms = 3000 + connection-interval-ms = 500 # The maximum number of times to retry. max-retries = 8 diff --git a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala index 86dec11..09a87e5 100644 --- a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala +++ b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala @@ -83,7 +83,7 @@ object ContainerInstanceMain extends App with ContainerDefinitionJsonProtocol { val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig) val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper) - val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId)) + val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId), actorSystem) val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n") val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]] diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index 859f5fd..f571d56 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -68,7 +68,7 @@ private[master] class YarnMasterRunner extends MasterRunner { private def createInstanceStorage(actorSystem: ActorSystem, appId: String): InstanceStorage = { val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper) - InstanceStorageFactory(zkConfig.child(appId)) + InstanceStorageFactory((zkConfig.child(appId),actorSystem)) } private def createDeployClient(actorSystem: ActorSystem, diff --git a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala index 0c815cf..4645dc9 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala @@ -15,6 +15,7 @@ */ package akkeeper.storage +import akka.actor.ActorSystem import akkeeper.api._ import akkeeper.storage.zookeeper.ZookeeperClientConfig import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage @@ -62,14 +63,15 @@ private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage) private[akkeeper] object InstanceStorageFactory { implicit object ZookeeperInstanceStorageFactory - extends InstanceStorageFactory[ZookeeperClientConfig] { + extends InstanceStorageFactory[(ZookeeperClientConfig, ActorSystem)] { - override def apply(config: ZookeeperClientConfig): InstanceStorage = { - new ZookeeperInstanceStorage(config.child("instances")) + override def apply(ctr:(ZookeeperClientConfig,ActorSystem)): InstanceStorage = { + val (config, actorSystem) = ctr + new ZookeeperInstanceStorage(config.child("instances"))(actorSystem) } } - def apply[T: InstanceStorageFactory](config: T): InstanceStorage = { - implicitly[T](config) + def apply[T: InstanceStorageFactory](ctr: T): InstanceStorage = { + implicitly[T](ctr) } } diff --git a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala index f8a38ec..40de5c9 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala @@ -23,9 +23,12 @@ import org.apache.zookeeper.CreateMode import scala.concurrent.Future import ZookeeperInstanceStorage._ +import akka.actor.ActorSystem import akkeeper.api.InstanceId -private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)(implicit system: ActorSystem) extends BaseZookeeperStorage with InstanceStorage { protected override val zookeeperClient = @@ -50,16 +53,20 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) .map(fromBytes[InstanceInfo]) } + //akka-http default timeout is 20 seconds, so ensure that the call does not exceed it + //timeout outer future in 19 seconds override def getInstances: Future[Seq[InstanceId]] = { - val instancesFuture = for { + val instancesFuture: Future[Seq[Future[Seq[String]]]] = for { + //timeout containers <- zookeeperClient.children("") } yield for { container <- containers } yield zookeeperClient.children(container) - instancesFuture + val expectedExecution = instancesFuture .flatMap(f => Future.sequence(f).map(_.flatten)) .map(_.map(pathToInstanceId)) .recover(notFoundToEmptySeq[InstanceId]) + withTimeout(expectedExecution, 19.seconds)(extractPartialFutureState(expectedExecution, instancesFuture)) } override def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]] = { @@ -67,6 +74,33 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) .map(_.map(pathToInstanceId)) .recover(notFoundToEmptySeq[InstanceId]) } + + private def extractPartialFutureState(f: Future[_], instancesFuture: Future[Seq[Future[Seq[String]]]]): + Future[Seq[InstanceId]] = { + if(!f.isCompleted) { + system.log.warning("extract partial future stated started") + if (instancesFuture.isCompleted && instancesFuture.value.get.isSuccess) { + val successfulInstances = instancesFuture.value.get.get.collect { + case f if f.isCompleted && f.value.get.isSuccess => f.value.get.get.map(pathToInstanceId) + case f => + val state = s"completed: ${f.isCompleted} successful ${f.value.map(_.isSuccess)}" + system.log.warning(s"zk get children failed: ${state}") + Seq.empty[InstanceId] + } + Future.successful(successfulInstances.flatten) + } else { + system.log.warning("Getting zookeeper children timeout") + Future.successful(Seq.empty[InstanceId]) + } + } else { + Future.successful(Seq.empty[InstanceId]) + } + } + + private def withTimeout[T](f:Future[T], timeout:FiniteDuration)(value: => Future[T]) = { + val timeoutFuture = akka.pattern.after(timeout, system.scheduler)(value) + Future.firstCompletedOf(Seq(f, timeoutFuture))(system.dispatcher) + } } private[akkeeper] object ZookeeperInstanceStorage { diff --git a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala index 14a5d69..39007ec 100644 --- a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala @@ -15,18 +15,23 @@ */ package akkeeper.storage.zookeeper.async -import akka.actor.Address +import akka.actor.{ActorSystem, Address} import akka.cluster.UniqueAddress +import akka.testkit.TestKit import akkeeper.AwaitMixin import akkeeper.address._ import akkeeper.api._ import akkeeper.storage._ import akkeeper.storage.zookeeper.ZookeeperClientConfig import org.apache.curator.test.{TestingServer => ZookeeperServer} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} -class ZookeeperInstanceStorageSpec extends FlatSpec - with Matchers with AwaitMixin with ZookeeperBaseSpec { +class ZookeeperInstanceStorageSpec extends TestKit(ActorSystem("instanceStorageSpec")) with FlatSpecLike + with Matchers with AwaitMixin with ZookeeperBaseSpec with BeforeAndAfterAll{ + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } private def withStorage[T](f: InstanceStorage => T): T = { val zookeeper = new ZookeeperServer() diff --git a/build.sbt b/build.sbt index 93b28e3..35db92e 100644 --- a/build.sbt +++ b/build.sbt @@ -14,10 +14,10 @@ * limitations under the License. */ -val AkkaVersion = "2.5.14" -val AkkaHttpVersion = "10.1.3" +val AkkaVersion = "2.5.31" +val AkkaHttpVersion = "10.1.13" val CuratorVersion = "2.7.1" -val SprayJsonVersion = "1.3.4" +val SprayJsonVersion = "1.3.6" val HadoopVersion = "2.8.4" val ScalaTestVersion = "3.0.5" val ScalamockVersion = "3.4.2" @@ -40,7 +40,7 @@ val TestDependencies = Seq( val CommonSettings = Seq( organization := "com.github.izeigerman", - scalaVersion := "2.12.6", + scalaVersion := "2.12.11", crossScalaVersions := Seq("2.11.11", scalaVersion.value), version := "0.4.12-SNAPSHOT", From c729156310d251096e186050bc43361058f73fb5 Mon Sep 17 00:00:00 2001 From: Marek Zebrowski Date: Tue, 22 Dec 2020 13:15:16 +0100 Subject: [PATCH 2/2] log zookeeper future execution time and status --- .../async/ZookeeperInstanceStorage.scala | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala index 40de5c9..65b797b 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala @@ -27,6 +27,7 @@ import akka.actor.ActorSystem import akkeeper.api.InstanceId import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.Try private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)(implicit system: ActorSystem) extends BaseZookeeperStorage with InstanceStorage { @@ -56,6 +57,7 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)( //akka-http default timeout is 20 seconds, so ensure that the call does not exceed it //timeout outer future in 19 seconds override def getInstances: Future[Seq[InstanceId]] = { + val t0 = System.nanoTime() val instancesFuture: Future[Seq[Future[Seq[String]]]] = for { //timeout containers <- zookeeperClient.children("") @@ -66,7 +68,18 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)( .flatMap(f => Future.sequence(f).map(_.flatten)) .map(_.map(pathToInstanceId)) .recover(notFoundToEmptySeq[InstanceId]) - withTimeout(expectedExecution, 19.seconds)(extractPartialFutureState(expectedExecution, instancesFuture)) + expectedExecution.onComplete { result:Try[Seq[InstanceId]] => + val t1 = System.nanoTime() + val nanoDelta = t1 - t0 + val secondsDelta = nanoDelta.toDouble / 1000000000L + val statusMsg = if(result.isSuccess) {"succeeded"} else {s"failed with ${result.failed.get}"} + if(secondsDelta >= 20.0) { + system.log.warning(s"getInstances TIMEOUTED, took ${secondsDelta} seconds and ${statusMsg}") + } else { + system.log.info(s"getInstances took ${secondsDelta} seconds and ${statusMsg}") + } + }(system.dispatcher) + expectedExecution } override def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]] = { @@ -75,32 +88,6 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)( .recover(notFoundToEmptySeq[InstanceId]) } - private def extractPartialFutureState(f: Future[_], instancesFuture: Future[Seq[Future[Seq[String]]]]): - Future[Seq[InstanceId]] = { - if(!f.isCompleted) { - system.log.warning("extract partial future stated started") - if (instancesFuture.isCompleted && instancesFuture.value.get.isSuccess) { - val successfulInstances = instancesFuture.value.get.get.collect { - case f if f.isCompleted && f.value.get.isSuccess => f.value.get.get.map(pathToInstanceId) - case f => - val state = s"completed: ${f.isCompleted} successful ${f.value.map(_.isSuccess)}" - system.log.warning(s"zk get children failed: ${state}") - Seq.empty[InstanceId] - } - Future.successful(successfulInstances.flatten) - } else { - system.log.warning("Getting zookeeper children timeout") - Future.successful(Seq.empty[InstanceId]) - } - } else { - Future.successful(Seq.empty[InstanceId]) - } - } - - private def withTimeout[T](f:Future[T], timeout:FiniteDuration)(value: => Future[T]) = { - val timeoutFuture = akka.pattern.after(timeout, system.scheduler)(value) - Future.firstCompletedOf(Seq(f, timeoutFuture))(system.dispatcher) - } } private[akkeeper] object ZookeeperInstanceStorage {