Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public class FanOutKinesisShardSubscription {
// Queue is meant for eager retrieval of records from the Kinesis stream. We will always have 2
// record batches available on next read.
private final BlockingQueue<SubscribeToShardEvent> eventQueue = new LinkedBlockingQueue<>(2);
private final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
private final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();

// Store the current starting position for this subscription. Will be updated each time new
Expand Down Expand Up @@ -108,8 +107,9 @@ public void activateSubscription() {
shardId,
startingPosition,
consumerArn);
if (subscriptionActive.get()) {
LOG.warn("Skipping activation of subscription since it is already active.");
if (shardSubscriber != null
&& shardSubscriber.getSubscriptionState() == SubscriptionState.SUBSCRIBED) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we change the message. It refers to the subscription being active , which seems to be tied to the subscriptionActive variable we have removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated wording

LOG.warn("Skipping activation of subscription since it is active & subscribed.");
return;
}

Expand Down Expand Up @@ -166,9 +166,9 @@ public void activateSubscription() {
shardId,
startingPosition,
consumerArn);
subscriptionActive.set(true);
// Request first batch of records.
shardSubscriber.requestRecords();

} else {
String errorMessage =
"Timeout when subscribing to shard "
Expand Down Expand Up @@ -236,16 +236,43 @@ public SubscribeToShardEvent nextEvent() {
throw new KinesisStreamsSourceException(
"Subscription encountered unrecoverable exception.", throwable);
}
final SubscriptionState state =
Optional.ofNullable(shardSubscriber)
.map(FanOutShardSubscriber::getSubscriptionState)
.orElse(SubscriptionState.NOT_STARTED);

if (!subscriptionActive.get()) {
LOG.debug(
"Subscription to shard {} for consumer {} is not yet active. Skipping.",
shardId,
consumerArn);
return null;
switch (state) {
case NOT_STARTED:
if (LOG.isDebugEnabled()) {
LOG.debug(
"Subscription to shard {} for consumer {} is not yet active. Skipping.",
shardId,
consumerArn);
}
return null;
case COMPLETED:
if (shardSubscriber.isShardEndReached()) {
if (LOG.isInfoEnabled()) {
LOG.info(
"Subscription reached SHARD_END for shard {} for consumer {}.",
shardId,
consumerArn);
}
return null;
}
if (LOG.isInfoEnabled()) {
LOG.info(
"Subscription expired to shard {} for consumer {}. Restarting.",
shardId,
consumerArn);
}
activateSubscription();
return null;
case SUBSCRIBED:
return eventQueue.poll();
default:
throw new IllegalStateException("Unknown subscription state: " + state);
}

return eventQueue.poll();
}

/**
Expand All @@ -254,26 +281,48 @@ public SubscribeToShardEvent nextEvent() {
*/
private class FanOutShardSubscriber implements Subscriber<SubscribeToShardEventStream> {
private final CountDownLatch subscriptionLatch;

private Subscription subscription;

private final AtomicReference<SubscriptionState> subscriptionState =
new AtomicReference<>(SubscriptionState.NOT_STARTED);
private final AtomicBoolean isShardEnd = new AtomicBoolean(false);

private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
this.subscriptionLatch = subscriptionLatch;
}

/**
* Fetch the state that the subscriber is in.
*
* @return Subscription state for the subscriber.
*/
public SubscriptionState getSubscriptionState() {
return subscriptionState.get();
}

/**
* Boolean whether this subscriber has reached the end of a shard.
*
* @return True if ShardEnd. false otherwise.
*/
public boolean isShardEndReached() {
return isShardEnd.get();
}

public void requestRecords() {
subscription.request(1);
}

public void cancel() {
if (!subscriptionActive.get()) {
LOG.warn("Trying to cancel inactive subscription. Ignoring.");
if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the message to be more accurate?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log statement is technically correct in that its canceling a subscription that is no longer active. But ill update the wording to more closely align with the new states

LOG.warn("Subscription is already completed. Ignoring request to cancel.");
return;
}
subscriptionActive.set(false);

if (subscription != null) {
subscription.cancel();
}
this.subscriptionState.set(SubscriptionState.COMPLETED);
}

@Override
Expand All @@ -284,6 +333,7 @@ public void onSubscribe(Subscription subscription) {
startingPosition,
consumerArn);
this.subscription = subscription;
this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
subscriptionLatch.countDown();
}

Expand All @@ -300,6 +350,15 @@ public void visit(SubscribeToShardEvent event) {
event);
eventQueue.put(event);

if (event.continuationSequenceNumber() == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("continuationSequenceNumber is null. " +
"Reached ShardEnd for shard: {}", shardId);
}
isShardEnd.set(true);
Comment thread
pelaezryan marked this conversation as resolved.
return;
}

// Update the starting position in case we have to recreate the
// subscription
startingPosition =
Expand Down Expand Up @@ -330,8 +389,14 @@ public void onError(Throwable throwable) {
@Override
public void onComplete() {
LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
cancel();
activateSubscription();
this.subscriptionState.set(SubscriptionState.COMPLETED);
}
}

/** States that the {@code FanOutShardSubscriber} may be in. */
private enum SubscriptionState {
NOT_STARTED,
SUBSCRIBED,
COMPLETED
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can there be an error state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I am aware of. When any exceptions occur we terminate the subscription (if it's not done so already):

        kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler)
                .exceptionally(
                        throwable -> {
                            // If consumer exists and is still activating, we want to countdown.
                            if (ExceptionUtils.findThrowable(
                                            throwable, ResourceInUseException.class)
                                    .isPresent()) {
                                waitForSubscriptionLatch.countDown();
                                return null;
                            }
                            LOG.error(
                                    "Error subscribing to shard {} with starting position {} for consumer {}.",
                                    shardId,
                                    startingPosition,
                                    consumerArn,
                                    throwable);
                            terminateSubscription(throwable);
                            return null;
                        });

}
}
Loading
Loading