Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion akkeeper/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ akkeeper {
servers = ""

# The initial amount of time to wait between retries.
connection-interval-ms = 3000
connection-interval-ms = 500
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in ZookeeperClient we have .retryPolicy(new ExponentialBackoffRetry(config.connectionIntervalMs, config.maxRetries)) - so we start from 3000ms and then exponentially backoff up to max-retries. 3000 ms * (2 ^ 8) is already around 12 minutes - way over our timeout. 500ms brings that time to around 2 minutes max


# The maximum number of times to retry.
max-retries = 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object ContainerInstanceMain extends App with ContainerDefinitionJsonProtocol {
val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig)

val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper)
val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId))
val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId), actorSystem)

val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n")
val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]]
Expand Down
2 changes: 1 addition & 1 deletion akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[master] class YarnMasterRunner extends MasterRunner {
private def createInstanceStorage(actorSystem: ActorSystem,
appId: String): InstanceStorage = {
val zkConfig = ZookeeperClientConfig.fromConfig(actorSystem.settings.config.zookeeper)
InstanceStorageFactory(zkConfig.child(appId))
InstanceStorageFactory((zkConfig.child(appId),actorSystem))
}

private def createDeployClient(actorSystem: ActorSystem,
Expand Down
12 changes: 7 additions & 5 deletions akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package akkeeper.storage

import akka.actor.ActorSystem
import akkeeper.api._
import akkeeper.storage.zookeeper.ZookeeperClientConfig
import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage
Expand Down Expand Up @@ -62,14 +63,15 @@ private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage)
private[akkeeper] object InstanceStorageFactory {

implicit object ZookeeperInstanceStorageFactory
extends InstanceStorageFactory[ZookeeperClientConfig] {
extends InstanceStorageFactory[(ZookeeperClientConfig, ActorSystem)] {

override def apply(config: ZookeeperClientConfig): InstanceStorage = {
new ZookeeperInstanceStorage(config.child("instances"))
override def apply(ctr:(ZookeeperClientConfig,ActorSystem)): InstanceStorage = {
val (config, actorSystem) = ctr
new ZookeeperInstanceStorage(config.child("instances"))(actorSystem)
}
}

def apply[T: InstanceStorageFactory](config: T): InstanceStorage = {
implicitly[T](config)
def apply[T: InstanceStorageFactory](ctr: T): InstanceStorage = {
implicitly[T](ctr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ import org.apache.zookeeper.CreateMode

import scala.concurrent.Future
import ZookeeperInstanceStorage._
import akka.actor.ActorSystem
import akkeeper.api.InstanceId

private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Try

private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)(implicit system: ActorSystem)
extends BaseZookeeperStorage with InstanceStorage {

protected override val zookeeperClient =
Expand All @@ -50,23 +54,40 @@ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)
.map(fromBytes[InstanceInfo])
}

//akka-http default timeout is 20 seconds, so ensure that the call does not exceed it
//timeout outer future in 19 seconds
override def getInstances: Future[Seq[InstanceId]] = {
val instancesFuture = for {
val t0 = System.nanoTime()
val instancesFuture: Future[Seq[Future[Seq[String]]]] = for {
//timeout
containers <- zookeeperClient.children("")
} yield for {
container <- containers
} yield zookeeperClient.children(container)
instancesFuture
val expectedExecution = instancesFuture
.flatMap(f => Future.sequence(f).map(_.flatten))
.map(_.map(pathToInstanceId))
.recover(notFoundToEmptySeq[InstanceId])
expectedExecution.onComplete { result:Try[Seq[InstanceId]] =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this future going to be completed on akka-http timeout?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - it is independent. Akka-http waits for completion of http request for akka.http.server.request-timeout
This future will be completed at unknown point. if it is completed within that timeout, result will be set back to http layer and completes the request. Otherwise, request is completed with timeout, but the Future computation here continues and eventually it should finish, logging the result.

val t1 = System.nanoTime()
val nanoDelta = t1 - t0
val secondsDelta = nanoDelta.toDouble / 1000000000L
val statusMsg = if(result.isSuccess) {"succeeded"} else {s"failed with ${result.failed.get}"}
if(secondsDelta >= 20.0) {
system.log.warning(s"getInstances TIMEOUTED, took ${secondsDelta} seconds and ${statusMsg}")
} else {
system.log.info(s"getInstances took ${secondsDelta} seconds and ${statusMsg}")
}
}(system.dispatcher)
expectedExecution
}

override def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]] = {
zookeeperClient.children(containerName)
.map(_.map(pathToInstanceId))
.recover(notFoundToEmptySeq[InstanceId])
}

}

private[akkeeper] object ZookeeperInstanceStorage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
*/
package akkeeper.storage.zookeeper.async

import akka.actor.Address
import akka.actor.{ActorSystem, Address}
import akka.cluster.UniqueAddress
import akka.testkit.TestKit
import akkeeper.AwaitMixin
import akkeeper.address._
import akkeeper.api._
import akkeeper.storage._
import akkeeper.storage.zookeeper.ZookeeperClientConfig
import org.apache.curator.test.{TestingServer => ZookeeperServer}
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

class ZookeeperInstanceStorageSpec extends FlatSpec
with Matchers with AwaitMixin with ZookeeperBaseSpec {
class ZookeeperInstanceStorageSpec extends TestKit(ActorSystem("instanceStorageSpec")) with FlatSpecLike
with Matchers with AwaitMixin with ZookeeperBaseSpec with BeforeAndAfterAll{

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

private def withStorage[T](f: InstanceStorage => T): T = {
val zookeeper = new ZookeeperServer()
Expand Down
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

val AkkaVersion = "2.5.14"
val AkkaHttpVersion = "10.1.3"
val AkkaVersion = "2.5.31"
val AkkaHttpVersion = "10.1.13"
val CuratorVersion = "2.7.1"
val SprayJsonVersion = "1.3.4"
val SprayJsonVersion = "1.3.6"
val HadoopVersion = "2.8.4"
val ScalaTestVersion = "3.0.5"
val ScalamockVersion = "3.4.2"
Expand All @@ -40,7 +40,7 @@ val TestDependencies = Seq(

val CommonSettings = Seq(
organization := "com.github.izeigerman",
scalaVersion := "2.12.6",
scalaVersion := "2.12.11",
crossScalaVersions := Seq("2.11.11", scalaVersion.value),
version := "0.4.12-SNAPSHOT",

Expand Down