Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SequentiallyCatsVsAkkaBenchmark extends Common {
dispatcher = disp
dispatcherCleanup = dispClean

val (seq, clean) = SequentiallyF.resource[IO, Int].allocated.unsafeRunSync()(runtime)
val (seq, clean) = SequentiallyF.resource[IO, Int]().allocated.unsafeRunSync()(runtime)
catsSequentially = seq
catsCleanup = clean
}
Expand Down
20 changes: 20 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ lazy val `sequentially-ce` = projectMatrix
)
.jvmPlatform(scalaVersions = scalaVersions)

lazy val `sequentially-ce-metrics` = projectMatrix
.in(file("sequentially-ce-metrics"))
.settings(commonSettings)
.settings(
name := "sequentially-ce-metrics"
)
.settings(
libraryDependencies ++= Seq(
PrometheusTools,
Dependencies.CatsEffect.effect,
Scalatest % Test,
),
excludeDependencies ++= Seq(
ExclusionRule("com.typesafe.akka"),
ExclusionRule("org.apache.pekko"),
),
)
.jvmPlatform(scalaVersions = scalaVersions)
.dependsOn(`sequentially-ce`)

lazy val benchmark = project
.in(file("benchmark"))
.settings(commonSettings)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.evolutiongaming.concurrent

import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import cats.syntax.all.*
import com.evolutiongaming.concurrent.sequentially.SequentiallyF

import scala.concurrent.Future

/** Wrapper around SequentiallyF that adds metrics tracking.
* Since SequentiallyF is a final class, we wrap it by delegation.
*/
final class MeteredSequentiallyF[F[_]: Async, -K] private (
private val sequentially: SequentiallyF[F, K],
private val metrics: SequentiallyMetricsF[F],
) {

def apply[T](
key: K
)(
task: => T
)(implicit
dispatcher: Dispatcher[F]
): Future[T] = {
dispatcher.unsafeToFuture(applyF(key)(Async[F].delay(task)))
}

def applyF[T](key: K)(task: => F[T]): F[T] = {
val start = System.nanoTime()

metrics.queue(start) *> metrics.run(sequentially.applyF(key)(task))
}
}

object MeteredSequentiallyF {

def apply[F[_]: Async, K](
sequentially: SequentiallyF[F, K],
name: String,
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
): MeteredSequentiallyF[F, K] = {
apply(sequentially, sequentiallyMetrics(name))
}

def apply[F[_]: Async, K](
sequentially: SequentiallyF[F, K],
metrics: SequentiallyMetricsF[F],
): MeteredSequentiallyF[F, K] = {
new MeteredSequentiallyF(sequentially, metrics)
}

trait Factory[F[_]] {
def apply[K](name: String): MeteredSequentiallyF[F, K]
}

object Factory {

trait Provider[F[_]] {
def apply[K]: SequentiallyF[F, K]
}

def apply[F[_]: Async](
provider: Provider[F],
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
): Factory[F] = new Factory[F] {
override def apply[K](name: String): MeteredSequentiallyF[F, K] =
MeteredSequentiallyF(provider.apply[K], sequentiallyMetrics(name))
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.evolutiongaming.concurrent

import cats.effect.kernel.Sync
import cats.syntax.all.*
import com.evolutiongaming.prometheus.PrometheusHelper.*
import io.prometheus.client.{CollectorRegistry, Summary}

trait SequentiallyMetricsF[F[_]] {
def queue(startNanos: Long): F[Unit]
def run[T](task: => F[T]): F[T]
}

object SequentiallyMetricsF {

type Factory[F[_]] = String => SequentiallyMetricsF[F]

object Factory {

/** @note Must be singleton as metric names must be unique.
* @see CollectorRegistry#register
*/
def apply[F[_]: Sync](
prometheusRegistry: CollectorRegistry,
prefix: String = "sequentially",
): Factory[F] = {
val time = Summary
.build()
.name(s"${ prefix }_time")
.help("Latency of Sequentially operations (queue, run) (by name)")
.labelNames("name", "operation")
.defaultQuantiles()
.register(prometheusRegistry)

name =>
new SequentiallyMetricsF[F] {
def queue(startNanos: Long): F[Unit] = {
Sync[F].delay {
time.labels(name, "queue").timeTillNowNanos(startNanos)
}
}

def run[T](task: => F[T]): F[T] = {
Sync[F].defer {
val start = System.nanoTime()
task.flatMap { result =>
Sync[F].delay {
time.labels(name, "run").observe((System.nanoTime() - start).toDouble / 1e9)
result
}
}
}
}
}
}
}
}

Loading