This repository was archived by the owner on Jun 17, 2024. It is now read-only.

Can't increase the in-flight messages count for persistent subscriptions #149

@chiller

I have a persistent subscription and a Scala consumer. No matter what settings I use on the subscription or in the client's application.conf, I can't get more than 10 in-flight messages.

Here is my code:

import java.util.concurrent.Executors
import _root_.akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import eventstore.ResolvedEvent
import eventstore.akka.PersistentSubscriptionActor.ManualAck
import eventstore.akka.{EventStoreExtension, LiveProcessingStarted, PersistentSubscriptionActor}
import eventstore.core.EventStream

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.sys.process._

trait Globals {
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(30))
}

object PersistentSubscriptionExample extends App with Globals {

  val actor = system.actorOf(Props[CountPersistentStream])
  val extension = EventStoreExtension(system)

  val sub = system.actorOf(
    PersistentSubscriptionActor.props(
      connection = extension.actor,
      client = actor,
      streamId = EventStream.Id("$ce-test"),
      groupName = "test1",
      credentials = None,
      settings = extension.settings,
      autoAck = false
    ))
}

class CountPersistentStream extends Actor with ActorLogging with Globals {
  context.setReceiveTimeout(1.second)

  def receive: Receive = {
    case e: ResolvedEvent =>
      val currentSender = sender()
      Future {
        log.info(s"${e.streamId.toString}")
        Thread.sleep(1000)
        currentSender ! ManualAck(e.linkEvent.data.eventId)
      }
    case LiveProcessingStarted => log.info("live processing started")
  }
}

Looking at VisualVM, I can see that only 10 threads do work at any given time. If I set autoAck = true, all 30 threads do work.
If I use the HTTP API instead (curl 'localhost:2113/subscriptions/%24ce-test/test1/15?embed=TryHarder' -H "Accept: application/vnd.eventstore.competingatom+json"), I do get 15 in-flight messages.
Am I missing some configuration?
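
For anyone trying to reproduce this, the in-flight count described above can also be measured in code rather than read off VisualVM. The following is only a minimal sketch using the Scala standard library. `InFlightGauge` and `InFlightGaugeDemo` are hypothetical names introduced here for illustration and are not part of the EventStore client.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical helper (not part of the EventStore client): counts how many
// handlers are running right now and remembers the highest value seen.
final class InFlightGauge {
  private var current = 0
  private var peak = 0

  def enter(): Unit = synchronized {
    current += 1
    if (current > peak) peak = current
  }

  def exit(): Unit = synchronized { current -= 1 }

  def max: Int = synchronized(peak)
}

// Stand-alone demo: runs `tasks` sleeping jobs on a pool of `threads`
// threads and returns the peak number that overlapped.
object InFlightGaugeDemo {
  def run(tasks: Int, threads: Int): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val gauge = new InFlightGauge

    val work = Future.sequence((1 to tasks).map { _ =>
      Future {
        gauge.enter()
        try Thread.sleep(100) // stands in for the real event handling
        finally gauge.exit()
      }
    })

    try Await.result(work, 30.seconds)
    finally pool.shutdown()
    gauge.max
  }
}
```

In the consumer above, the same idea would be to call `gauge.enter()` at the start of the `Future` body and `gauge.exit()` just before sending `ManualAck`, then log `gauge.max` periodically. The peak can never exceed the pool size, so a value that stays at 10 with a 30-thread pool would point at the subscription side rather than the executor.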
