The library gets stuck pulling records from Kinesis for very long periods when the stream is empty. From the debug output, I have confirmed that this happens because this condition is always true when the stream is empty (we lose the lastSequenceNumber and never regain it, because no records arrive).
As a result of the prolonged pulling, we hit Kinesis read limits, which leads to further throttling. In this loop, for example, the library calls the Kinesis API ~250 times to complete a single micro-batch on a 30s trigger (which can take a few minutes). Eventually, getMillisBehindLatest becomes 0 and the loop can move forward. For reference, we use a fallback stream in one region that is normally empty, and union it with a stream in another region that normally has data. The region with data finishes in about 5-10s; the empty region sometimes runs for minutes.
- What is causing getMillisBehindLatest to move forward? The Kinesis API seems to move it slightly forward on every call.
- Is it true that if a pull from Kinesis at a timestamp offset t0 returns empty, then it is safe to move the timestamp forward to t0 + getMillisBehindLatest, i.e. the tip of the stream?
- Should avoidEmptyBatches be taken into consideration somewhere in that condition? Is there another config we could use to circumvent this?
- If not, is there an update to that condition that you would recommend? Can we remove it?
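
To make the last question concrete, here is a minimal sketch of one possible shape for such a change: stop polling after a fixed number of consecutive empty responses instead of looping until getMillisBehindLatest reaches 0. This is not the library's code. `Page`, `ShardReader`, `poll`, and `maxEmptyPolls` are hypothetical names used only for illustration, and the sketch assumes the reader can keep its position between micro-batches, which is exactly what is unclear once lastSequenceNumber is lost.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class EmptyPollCapSketch {
    /** Hypothetical stand-in for one GetRecords response. */
    static final class Page {
        final List<String> records;
        final long millisBehindLatest;

        Page(List<String> records, long millisBehindLatest) {
            this.records = records;
            this.millisBehindLatest = millisBehindLatest;
        }
    }

    /** Hypothetical reader that remembers its shard position between calls. */
    interface ShardReader {
        Page next();
    }

    /**
     * Polls until the reader has caught up (millisBehindLatest == 0) or until
     * maxEmptyPolls consecutive responses were empty. Returns the number of calls made.
     */
    static int poll(ShardReader reader, int maxEmptyPolls, List<String> out) {
        int calls = 0;
        int consecutiveEmpty = 0;
        while (true) {
            Page page = reader.next();
            calls++;
            out.addAll(page.records);
            if (page.millisBehindLatest == 0) {
                break; // caught up with the tip of the stream
            }
            if (page.records.isEmpty()) {
                consecutiveEmpty++;
                if (consecutiveEmpty >= maxEmptyPolls) {
                    break; // give up for this micro-batch, continue in the next one
                }
            } else {
                consecutiveEmpty = 0; // data arrived, reset the counter
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // Simulated empty shard: each call moves 100 s closer to the tip.
        long[] behind = {1_000_000L};
        ShardReader emptyShard = () -> {
            behind[0] = Math.max(0L, behind[0] - 100_000L);
            return new Page(Collections.<String>emptyList(), behind[0]);
        };
        List<String> out = new ArrayList<>();
        System.out.println("calls made: " + poll(emptyShard, 3, out));
    }
}
```

With a cap like this, an empty shard costs at most `maxEmptyPolls` API calls per micro-batch instead of hundreds, at the price of taking several micro-batches to reach the tip. Whether that trade-off is safe here depends on the answers to the questions above.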
Selected log lines from the loop for a single micro-batch (the library makes 257 pull attempts):
20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80956000
20/08/23 20:34:54 DEBUG KinesisSourceRDD: Milli secs behind is 80670000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80374000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 80089000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79790000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79496000
20/08/23 20:34:55 DEBUG KinesisSourceRDD: Milli secs behind is 79200000
...
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 1324000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 1048000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 765000
20/08/23 20:35:41 DEBUG KinesisSourceRDD: Milli secs behind is 480000
20/08/23 20:35:42 DEBUG KinesisSourceRDD: Milli secs behind is 203000
20/08/23 20:35:42 DEBUG KinesisSourceRDD: Milli secs behind is 0
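
As a rough cross-check of the numbers above, the snippet below estimates how many calls are needed to reach the tip if every empty call keeps advancing by the average step seen in the first seven log lines. It is only arithmetic on the logged values; the class and method names are made up for this example, and it assumes the step size stays roughly constant across the whole batch.

```java
final class EmptyShardEstimate {
    /** Average forward movement per call, from consecutive "Milli secs behind" values. */
    static long averageStepMillis(long[] behind) {
        long total = behind[0] - behind[behind.length - 1];
        return total / (behind.length - 1);
    }

    /** Calls needed to go from startBehind to 0 at a fixed step (rounded up). */
    static long estimatedCalls(long startBehind, long stepMillis) {
        return (startBehind + stepMillis - 1) / stepMillis;
    }

    public static void main(String[] args) {
        // First seven values from the log above.
        long[] behind = {
            80_956_000L, 80_670_000L, 80_374_000L, 80_089_000L,
            79_790_000L, 79_496_000L, 79_200_000L
        };
        long step = averageStepMillis(behind);
        System.out.println("average step (ms): " + step);
        System.out.println("estimated calls:   " + estimatedCalls(behind[0], step));
    }
}
```

This gives an average step of about 293 s per call and an estimate of about 277 calls, which is in the same range as the 257 pull attempts observed.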
Exceptions raised (86 within the micro-batch):
20/08/23 20:35:38 DEBUG request: Received error response: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000001 in stream stream_name under account 123456789. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: )
Current version: spark-sql-kinesis_2.11-1.1.2-spark_2.4