Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Referring $SPARK_HOME to the Spark installation directory.
| kinesis.executor.maxRecordPerRead | 10000 | Maximum Number of records to fetch per getRecords API call |
| kinesis.executor.addIdleTimeBetweenReads | false | Add delay between two consecutive getRecords API call |
| kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay between two consecutive getRecords |
| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two DescribeStream API calls to consider resharding |
| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two ListShards API calls to consider resharding |
| kinesis.client.numRetries | 3 | Maximum Number of retries for Kinesis API requests |
| kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying Kinesis API |
| kinesis.client.maxRetryIntervalMs | 10000 | Max Cool-off period between 2 retries |
Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.8.10</version>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.11.430</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
40 changes: 10 additions & 30 deletions src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.concurrent.{Executors, ThreadFactory}
import com.amazonaws.AbortedException
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model.{DescribeStreamRequest, GetRecordsRequest, Shard, _}
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, ListShardsRequest, Shard, _}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -72,8 +72,6 @@ private[kinesis] case class KinesisReader(
readerOptions.getOrElse("client.maxRetryIntervalMs".toLowerCase(Locale.ROOT), "10000").toLong
}

private val maxSupportedShardsPerStream = 100

private var _amazonClient: AmazonKinesisClient = null

private def getAmazonClient(): AmazonKinesisClient = {
Expand All @@ -85,8 +83,8 @@ private[kinesis] case class KinesisReader(
}

def getShards(): Seq[Shard] = {
val shards = describeKinesisStream
logInfo(s"Describe Kinesis Stream: ${shards}")
val shards = listShards
logInfo(s"List shards in Kinesis Stream: ${shards}")
shards
}

Expand Down Expand Up @@ -170,36 +168,18 @@ private[kinesis] case class KinesisReader(
records
}

private def describeKinesisStream(): Seq[Shard] = {
// TODO - We have a limit on DescribeStream API call.
// So we should be cautious before making this call
private def listShards(): Seq[Shard] = {

val describeStreamRequest = new DescribeStreamRequest
describeStreamRequest.setStreamName(streamName)
describeStreamRequest.setLimit(maxSupportedShardsPerStream)
val listShardsRequest = new ListShardsRequest
listShardsRequest.setStreamName(streamName)

val describeStreamResult: DescribeStreamResult = runUninterruptibly {
retryOrTimeout[DescribeStreamResult]( s"Describe Streams") {
getAmazonClient.describeStream(describeStreamRequest)
val listShardsResult: ListShardsResult = runUninterruptibly {
retryOrTimeout[ListShardsResult]( s"List shards") {
getAmazonClient.listShards(listShardsRequest)
}
}

val shards = new ArrayList[Shard]()
var exclusiveStartShardId : String = null

do {
describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId )
val describeStreamResult = getAmazonClient.describeStream( describeStreamRequest )
shards.addAll( describeStreamResult.getStreamDescription().getShards() )
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null
}
} while ( exclusiveStartShardId != null )

shards.asScala.toSeq

listShardsResult.getShards.asScala.toSeq
}

/*
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ private[kinesis] object ShardSyncer extends Logging {
shardIdToShardMap.get(parentShardId) match {
case None =>
throw new IllegalStateException(s"ShardId $parentShardId is not closed. " +
s"This can happen due to a race condition between describeStream and a" +
s"This can happen due to a race condition between listShards and a" +
s" reshard operation")
case Some(parentShard: Shard) =>
if (parentShard.getSequenceNumberRange().getEndingSequenceNumber == null) {
throw new IllegalStateException(s"ShardId $parentShardId is not closed. " +
s"This can happen due to a race condition between describeStream and a " +
s"This can happen due to a race condition between listShards and a " +
s"reshard operation")
}
}
Expand Down