55import ssl
66import struct
77
8- import boto
8+ import boto . s3 . key
99import snappy
1010import ujson as json
1111import json as standard_json
1212import zlib
13- from cStringIO import StringIO
13+ from io import BytesIO
1414from google .protobuf .message import DecodeError
1515
1616from .message_pb2 import Message , Header
@@ -42,17 +42,18 @@ def _parse_heka_record(record):
4242 # messages is an unprocessed form of the data, usually the original
4343 # gzipped payload from the client.
4444 #
45- # We attempt to decompress it, and if that fails,
46- # attempt to decode it as a UTF-8 string.
45+ # We decompress it if we can, then try to decode it as a UTF-8 string.
4746 elif field .name == 'content' :
4847 try :
49- string = zlib .decompress (field .value_bytes [0 ], 16 + zlib .MAX_WBITS )
50- except zlib .error :
48+ string = field .value_bytes [0 ]
5149 try :
52- string = field .value_bytes [0 ].decode ('utf-8' )
53- except UnicodeDecodeError :
54- # There is no associated payload
55- break
50+ string = zlib .decompress (string , 16 + zlib .MAX_WBITS )
51+ except zlib .error :
52+ pass # not compressed
53+ string = string .decode ('utf-8' )
54+ except UnicodeDecodeError :
55+ # There is no associated payload
56+ break
5657 payload = {"content" : string }
5758 break
5859
@@ -123,7 +124,7 @@ def _parse_json(string):
123124class BacktrackableFile :
124125 def __init__ (self , stream ):
125126 self ._stream = stream
126- self ._buffer = StringIO ()
127+ self ._buffer = BytesIO ()
127128
128129 def read (self , size ):
129130 buffer_data = self ._buffer .read (size )
@@ -151,7 +152,7 @@ def backtrack(self):
151152 buf = self ._buffer .getvalue ()
152153 index = buf .find (chr (_record_separator ), 1 )
153154
154- self ._buffer = StringIO ()
155+ self ._buffer = BytesIO ()
155156 if index >= 0 :
156157 self ._buffer .write (buf [index :])
157158 self ._buffer .seek (0 )
@@ -170,7 +171,7 @@ def read_until_next(fin, separator=_record_separator):
170171 bytes_skipped = 0
171172 while True :
172173 c = fin .read (1 )
173- if c == '' :
174+ if len ( c ) == 0 :
174175 return bytes_skipped , True
175176 elif ord (c ) != separator :
176177 bytes_skipped += 1
@@ -196,7 +197,7 @@ def read_one_record(input_stream, raw=False, verbose=False, strict=False, try_sn
196197 if strict :
197198 raise ValueError ("Unexpected character(s) at the start of record" )
198199 if verbose :
199- print "Skipped" , skipped , " bytes to find a valid separator"
200+ print ( "Skipped %s bytes to find a valid separator" % skipped )
200201
201202 raw_record = struct .pack ("<B" , 0x1e )
202203
@@ -223,9 +224,9 @@ def read_one_record(input_stream, raw=False, verbose=False, strict=False, try_sn
223224 header .ParseFromString (header_raw )
224225 unit_separator = input_stream .read (1 )
225226 total_bytes += 1
226- if ord (unit_separator [ 0 ] ) != 0x1f :
227+ if ord (unit_separator ) != 0x1f :
227228 error_msg = "Unexpected unit separator character at offset {}: {}" .format (
228- total_bytes , ord (unit_separator [ 0 ] )
229+ total_bytes , ord (unit_separator )
229230 )
230231 if strict :
231232 raise ValueError (error_msg )
@@ -262,23 +263,23 @@ def unpack_file(filename, **kwargs):
262263
263264
def unpack_string(string, **kwargs):
    """Unpack records from an in-memory byte string.

    Wraps ``string`` in a ``BytesIO`` stream and hands it to ``unpack``,
    forwarding every keyword argument unchanged.

    Args:
        string: The raw data to read records from.
        **kwargs: Options passed straight through to ``unpack``.

    Returns:
        Whatever ``unpack`` returns for the wrapped stream.

    NOTE(review): ``BytesIO`` only accepts bytes-like input, so a text
    ``str`` raises ``TypeError`` on Python 3 -- confirm callers pass bytes.
    """
    return unpack(BytesIO(string), **kwargs)
266267
267268
268269def unpack (fin , raw = False , verbose = False , strict = False , backtrack = False , try_snappy = True ):
269270 record_count = 0
270271 total_bytes = 0
271272
272273 while True :
273- r = None
274+ r , size = None , 0
274275 try :
275276 r , size = read_one_record (fin , raw , verbose , strict , try_snappy )
276277 except Exception as e :
277278 if strict :
278279 fin .close ()
279280 raise e
280281 elif verbose :
281- print e
282+ print ( e )
282283
283284 if backtrack and type (e ) == DecodeError :
284285 fin .backtrack ()
@@ -288,14 +289,14 @@ def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_sna
288289 break
289290
290291 if verbose and r .error is not None :
291- print r .error
292+ print ( r .error )
292293
293294 record_count += 1
294295 total_bytes += size
295296
296297 yield r , total_bytes
297298
298299 if verbose :
299- print "Processed" , record_count , " records"
300+ print ( "Processed %s records" % record_count )
300301
301302 fin .close ()
0 commit comments