From f3042ba8a7f2333b7a721aafd8a52b25bae4fcd2 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Aug 2020 18:07:12 -0700 Subject: [PATCH] change to list-shards --- README.md | 2 +- pom.xml | 7 +++- .../spark/sql/kinesis/KinesisReader.scala | 40 +++++-------------- .../spark/sql/kinesis/ShardSyncer.scala | 4 +- 4 files changed, 19 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 31a22bf..845c133 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,7 @@ Refering $SPARK_HOME to the Spark installation directory. | kinesis.executor.maxRecordPerRead | 10000 | Maximum Number of records to fetch per getRecords API call | | kinesis.executor.addIdleTimeBetweenReads | false | Add delay between two consecutive getRecords API call | | kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay between two consecutive getRecords | -| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two DescribeStream API calls to consider resharding | +| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two ListShards API calls to consider resharding | | kinesis.client.numRetries | 3 | Maximum Number of retries for Kinesis API requests | | kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying Kinesis API | | kinesis.client.maxRetryIntervalMs | 10000 | Max Cool-off period between 2 retries | diff --git a/pom.xml b/pom.xml index 183ae05..2ad7e67 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,12 @@ com.amazonaws amazon-kinesis-client - 1.8.10 + 1.9.0 + + + com.amazonaws + aws-java-sdk-core + 1.11.430 com.amazonaws diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala index 27f2d5d..76f4181 100644 --- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala +++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisReader.scala @@ -25,7 +25,7 @@ import java.util.concurrent.{Executors, ThreadFactory} import com.amazonaws.AbortedException import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord -import com.amazonaws.services.kinesis.model.{DescribeStreamRequest, GetRecordsRequest, Shard, _} +import com.amazonaws.services.kinesis.model.{GetRecordsRequest, ListShardsRequest, Shard, _} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration @@ -72,8 +72,6 @@ private[kinesis] case class KinesisReader( readerOptions.getOrElse("client.maxRetryIntervalMs".toLowerCase(Locale.ROOT), "10000").toLong } - private val maxSupportedShardsPerStream = 100 - private var _amazonClient: AmazonKinesisClient = null private def getAmazonClient(): AmazonKinesisClient = { @@ -85,8 +83,8 @@ private[kinesis] case class KinesisReader( } def getShards(): Seq[Shard] = { - val shards = describeKinesisStream - logInfo(s"Describe Kinesis Stream: ${shards}") + val shards = listShards + logInfo(s"List shards in Kinesis Stream: ${shards}") shards } @@ -170,36 +168,18 @@ private[kinesis] case class KinesisReader( records } - private def describeKinesisStream(): Seq[Shard] = { - // TODO - We have a limit on DescribeStream API call. - // So we should be cautious before making this call + private def listShards(): Seq[Shard] = { - val describeStreamRequest = new DescribeStreamRequest - describeStreamRequest.setStreamName(streamName) - describeStreamRequest.setLimit(maxSupportedShardsPerStream) + val listShardsRequest = new ListShardsRequest + listShardsRequest.setStreamName(streamName) - val describeStreamResult: DescribeStreamResult = runUninterruptibly { - retryOrTimeout[DescribeStreamResult]( s"Describe Streams") { - getAmazonClient.describeStream(describeStreamRequest) + val listShardsResult: ListShardsResult = runUninterruptibly { + retryOrTimeout[ListShardsResult]( s"List shards") { + getAmazonClient.listShards(listShardsRequest) } } - val shards = new ArrayList[Shard]() - var exclusiveStartShardId : String = null - - do { - describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ) - val describeStreamResult = getAmazonClient.describeStream( describeStreamRequest ) - shards.addAll( describeStreamResult.getStreamDescription().getShards() ) - if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { - exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); - } else { - exclusiveStartShardId = null - } - } while ( exclusiveStartShardId != null ) - - shards.asScala.toSeq - + listShardsResult.getShards.asScala.toSeq } /* diff --git a/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala b/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala index 5799977..41678f5 100644 --- a/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala +++ b/src/main/scala/org/apache/spark/sql/kinesis/ShardSyncer.scala @@ -63,12 +63,12 @@ private[kinesis] object ShardSyncer extends Logging { shardIdToShardMap.get(parentShardId) match { case None => throw new IllegalStateException(s"ShardId $parentShardId is not closed. " + - s"This can happen due to a race condition between describeStream and a" + + s"This can happen due to a race condition between listShards and a" + s" reshard operation") case Some(parentShard: Shard) => if (parentShard.getSequenceNumberRange().getEndingSequenceNumber == null) { throw new IllegalStateException(s"ShardId $parentShardId is not closed. " + - s"This can happen due to a race condition between describeStream and a " + + s"This can happen due to a race condition between listShards and a " + s"reshard operation") } }