diff --git a/README.md b/README.md
index 31a22bf..845c133 100644
--- a/README.md
+++ b/README.md
@@ -130,7 +130,7 @@ Refering $SPARK_HOME to the Spark installation directory.
| kinesis.executor.maxRecordPerRead | 10000 | Maximum Number of records to fetch per getRecords API call |
| kinesis.executor.addIdleTimeBetweenReads | false | Add delay between two consecutive getRecords API call |
| kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay between two consecutive getRecords |
-| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two DescribeStream API calls to consider resharding |
+| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two ListShards API calls to consider resharding |
| kinesis.client.numRetries | 3 | Maximum Number of retries for Kinesis API requests |
| kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying Kinesis API |
| kinesis.client.maxRetryIntervalMs | 10000 | Max Cool-off period between 2 retries |
diff --git a/pom.xml b/pom.xml
index 183ae05..2ad7e67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,12 @@
com.amazonaws
amazon-kinesis-client
- 1.8.10
+ 1.9.0
+
+
+ com.amazonaws
+ aws-java-sdk-core
+ 1.11.430
com.amazonaws
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala
index 27f2d5d..76f4181 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.{Executors, ThreadFactory}
import com.amazonaws.AbortedException
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
-import com.amazonaws.services.kinesis.model.{DescribeStreamRequest, GetRecordsRequest, Shard, _}
+import com.amazonaws.services.kinesis.model.{GetRecordsRequest, ListShardsRequest, Shard, _}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
@@ -72,8 +72,6 @@ private[kinesis] case class KinesisReader(
readerOptions.getOrElse("client.maxRetryIntervalMs".toLowerCase(Locale.ROOT), "10000").toLong
}
- private val maxSupportedShardsPerStream = 100
-
private var _amazonClient: AmazonKinesisClient = null
private def getAmazonClient(): AmazonKinesisClient = {
@@ -85,8 +83,8 @@ private[kinesis] case class KinesisReader(
}
def getShards(): Seq[Shard] = {
- val shards = describeKinesisStream
- logInfo(s"Describe Kinesis Stream: ${shards}")
+ val shards = listShards
+ logInfo(s"List shards in Kinesis Stream: ${shards}")
shards
}
@@ -170,36 +168,18 @@ private[kinesis] case class KinesisReader(
records
}
- private def describeKinesisStream(): Seq[Shard] = {
- // TODO - We have a limit on DescribeStream API call.
- // So we should be cautious before making this call
+ private def listShards(): Seq[Shard] = {
- val describeStreamRequest = new DescribeStreamRequest
- describeStreamRequest.setStreamName(streamName)
- describeStreamRequest.setLimit(maxSupportedShardsPerStream)
+ val listShardsRequest = new ListShardsRequest
+ listShardsRequest.setStreamName(streamName)
- val describeStreamResult: DescribeStreamResult = runUninterruptibly {
- retryOrTimeout[DescribeStreamResult]( s"Describe Streams") {
- getAmazonClient.describeStream(describeStreamRequest)
+ val listShardsResult: ListShardsResult = runUninterruptibly {
+ retryOrTimeout[ListShardsResult]( s"List shards") {
+ getAmazonClient.listShards(listShardsRequest)
}
}
- val shards = new ArrayList[Shard]()
- var exclusiveStartShardId : String = null
-
- do {
- describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId )
- val describeStreamResult = getAmazonClient.describeStream( describeStreamRequest )
- shards.addAll( describeStreamResult.getStreamDescription().getShards() )
- if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
- exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
- } else {
- exclusiveStartShardId = null
- }
- } while ( exclusiveStartShardId != null )
-
- shards.asScala.toSeq
-
+ listShardsResult.getShards.asScala.toSeq
}
/*
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala b/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala
index 5799977..41678f5 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala
@@ -63,12 +63,12 @@ private[kinesis] object ShardSyncer extends Logging {
shardIdToShardMap.get(parentShardId) match {
case None =>
throw new IllegalStateException(s"ShardId $parentShardId is not closed. " +
- s"This can happen due to a race condition between describeStream and a" +
+ s"This can happen due to a race condition between listShards and a" +
s" reshard operation")
case Some(parentShard: Shard) =>
if (parentShard.getSequenceNumberRange().getEndingSequenceNumber == null) {
throw new IllegalStateException(s"ShardId $parentShardId is not closed. " +
- s"This can happen due to a race condition between describeStream and a " +
+ s"This can happen due to a race condition between listShards and a " +
s"reshard operation")
}
}