49 changes: 24 additions & 25 deletions pyauparser/grammar.py
@@ -267,8 +267,7 @@ def load_file(file_or_path):
         """Load grammar information from file.
         http://goldparser.org/doc/egt/index.htm
         """
-        if (isinstance(file_or_path, str) or
-            isinstance(file_or_path, str)):
+        if (isinstance(file_or_path, str)):
             with open(file_or_path, "rb") as file:
                 return Grammar._load(file)
         else:
@@ -282,36 +281,36 @@ def read_empty():

         def read_byte():
             c = f.read(1)
-            return c[0] if len(c) == 1 else None
+            return c if len(c) == 1 else None

         def read_bool():
             c = f.read(1)
-            return ord(c[0]) == 1 if len(c) == 1 else None
+            return c[0] == 1 if len(c) == 1 else None

         def read_short():
             c = f.read(2)
-            return ord(c[0]) + ord(c[1]) * 256 if len(c) == 2 else None
+            return c[0] + c[1] * 256 if len(c) == 2 else None

         def read_string():
-            s = ""
+            s = bytes()
             while True:
                 c = f.read(2)
-                if len(c) < 2 or (ord(c[0]) == 0 and ord(c[1]) == 0):
+                if len(c) < 2 or (c[0] == 0 and c[1] == 0):
                     break
                 s += c
             return s.decode("utf-16le")

         def read_value():
             t = f.read(1)
-            if t == 'E':
+            if t == b'E':
                 return read_empty()
-            elif t == 'b':
+            elif t == b'b':
                 return read_byte()
-            elif t == 'B':
+            elif t == b'B':
                 return read_bool()
-            elif t == 'I':
+            elif t == b'I':
                 return read_short()
-            elif t == 'S':
+            elif t == b'S':
                 return read_string()
             else:
                 return None
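A note on the reader changes above: in Python 2, indexing the `str` returned by `f.read()` yields a one-character string, hence the old `ord()` calls; in Python 3, indexing `bytes` already yields an `int`. A minimal, self-contained sketch of the same decoding logic, using `io.BytesIO` as a stand-in for an EGT stream (the byte layout follows the readers above):

```python
import io

# A little-endian short (0x0102 = 258) followed by the UTF-16LE
# string "Hi" and its two-byte terminator, as read_short/read_string expect.
f = io.BytesIO(b"\x02\x01" + "Hi".encode("utf-16le") + b"\x00\x00")

c = f.read(2)
assert c[0] + c[1] * 256 == 258  # bytes indexing gives ints, no ord() needed

s = bytes()
while True:
    c = f.read(2)
    if len(c) < 2 or (c[0] == 0 and c[1] == 0):  # two-byte NUL terminator
        break
    s += c
assert s.decode("utf-16le") == "Hi"
```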
@@ -323,44 +322,44 @@ def read_value():
             raise Exception("Unknown Header: " + header)

         # read records
-        while read_byte() == 'M':
+        while read_byte() == b'M':
             v = []
             for x in range(read_short()):
                 v.append(read_value())
             t = v[0] if len(v) > 0 else None
-            if t == 'p':    # Property
+            if t == b'p':   # Property
                 grm.properties[v[1]] = Property(v[1], v[2], v[3])
-            elif t == 't':  # Table Counts
+            elif t == b't': # Table Counts
                 tablecounts = tuple(v[1:])
-            elif t == 'c':  # Character Set Table
+            elif t == b'c': # Character Set Table
                 grm.charsets[v[1]] = CharacterSet(
                     v[1], v[2],
                     tuple([(v[i * 2 + 5], v[i * 2 + 6]) for i in range(v[3])]))
-            elif t == 'S':  # Symbol Record
+            elif t == b'S': # Symbol Record
                 grm.symbols[v[1]] = Symbol(v[1], v[2], v[3])
-            elif t == 'g':  # Group Record
+            elif t == b'g': # Group Record
                 grm.symbolgroups[v[1]] = SymbolGroup(
                     v[1], v[2], v[3], v[4], v[5], v[6], v[7], tuple(v[10:]))
-            elif t == 'R':  # Production Record
+            elif t == b'R': # Production Record
                 grm.productions[v[1]] = Production(v[1], v[2], tuple(v[4:]))
-            elif t == 'I':  # Initial States Record
+            elif t == b'I': # Initial States Record
                 grm.dfainit, grm.lalrinit = v[1:]
-            elif t == 'D':  # DFA State Record
+            elif t == b'D': # DFA State Record
                 grm.dfastates[v[1]] = DFAState(
                     v[1],
                     v[3] if v[2] else None,
                     tuple([DFAEdge(v[i * 3 + 5], v[i * 3 + 6])
-                           for i in range((len(v) - 5) / 3)]))
-            elif t == 'L':  # LALR State Record
+                           for i in range(int((len(v) - 5) / 3))]))
+            elif t == b'L': # LALR State Record
                 grm.lalrstates[v[1]] = LALRState(
                     v[1],
                     dict([(v[i * 4 + 3],
                            LALRAction(v[i * 4 + 3],
                                       v[i * 4 + 4],
                                       v[i * 4 + 5]))
-                          for i in range((len(v) - 3) / 4)]))
+                          for i in range(int((len(v) - 3) / 4))]))
             else:
-                raise Exception("Unknown type: " + t)
+                raise Exception("Unknown type: " + t.decode("utf-16le"))

         # check read counts
         readcounts = (len(grm.symbols),
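The `int(...)` wrappers in the DFA and LALR branches fix a real semantic change, not just style: in Python 3, `/` on two ints returns a `float`, and `range()` rejects floats with a `TypeError`. Floor division would be an equivalent, arguably more idiomatic spelling:

```python
n = 14  # e.g. len(v) for a DFA record with three edges

# Python 2: (14 - 5) / 3 == 3 (an int). Python 3: 3.0 (a float),
# and range(3.0) raises TypeError, so the record loop would crash unfixed.
edges = list(range(int((n - 5) / 3)))  # the fix used in this PR
edges = list(range((n - 5) // 3))      # equivalent floor division
assert edges == [0, 1, 2]
```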
107 changes: 63 additions & 44 deletions pyauparser/lexer.py
@@ -2,6 +2,51 @@
 import sys
 from . import grammar

+class Buffer(object):
+    """Encapsulation of the data buffer
+    """
+
+    def __init__(self, file, is_unicode):
+        self.is_unicode = is_unicode
+        self.file = file
+        self.reset()
+
+    def reset(self):
+        self.buf = str() if self.is_unicode else bytes()
+        self.buf_cur = 0
+        self.buf_remain = 0
+
+    def fill(self):
+        if self.buf_cur >= 4096:
+            self.buf = self.buf[self.buf_cur:]
+            self.buf_cur = 0
+        self.buf += self.file.read(4096)
+        self.buf_remain = len(self.buf) - self.buf_cur
+
+    def peek_char(self, incr):
+        if incr < self.buf_remain:
+            return self.buf[self.buf_cur + incr]
+        else:
+            self.fill()
+            if incr < self.buf_remain:
+                return self.buf[self.buf_cur + incr]
+            else:
+                return None
+
+    def code(self, char):
+        return ord(char) if self.is_unicode else char
+
+    def get_data(self, data_size):
+        return self.buf[self.buf_cur:self.buf_cur + data_size]
+
+    def find_eol(self, start, size):
+        eol = '\n' if self.is_unicode else b'\n'
+        return self.buf.find(eol, start, self.buf_cur + size)
+
+    def seek_forward(self, value):
+        self.buf_cur += value
+        self.buf_remain -= value
+
+
 class Token(object):
     """Token which is a result from Lexer
@@ -31,8 +76,7 @@ def load_file(self, file_or_path, encoding=None):
         """ Load a file to lexer.
            File_or_path could be file object or file name.
         """
-        if (isinstance(file_or_path, str) or
-            isinstance(file_or_path, str)):
+        if (isinstance(file_or_path, str)):
             import codecs
             if encoding:
                 self._load(codecs.open(file_or_path, encoding=encoding), True)
@@ -45,50 +89,34 @@ def load_string(self, s):
         """ Load a string to lexer.
         """
         import io
-        self._load(io.StringIO(s), s is str)
+        self._load(io.StringIO(s), True)  # TODO: add load_bytes or similar

     def _load(self, file, is_unicode):
         self.file = file
         self.is_unicode = is_unicode
-        self.buf = "" if is_unicode else str()
-        self.buf_cur = 0
-        self.buf_remain = 0
+        self.buffer = Buffer(file, is_unicode)
         self.line = 1
         self.column = 1
         self.group_stack = []

-    def _load_buffer(self):
-        # shrink buffer
-        if self.buf_cur >= 4096:
-            self.buf = self.buf[self.buf_cur:]
-            self.buf_cur = 0
-        # read into buffer
-        self.buf += self.file.read(4096)
-        self.buf_remain = len(self.buf) - self.buf_cur
-
     def _consume_buffer(self, n):
         # update line, column position
-        start = self.buf_cur
+        start = self.buffer.buf_cur
         new_line_i = -1
         while True:
-            i = self.buf.find("\n", start, self.buf_cur + n)
+            i = self.buffer.find_eol(start, n)
             if i != -1:
                 start = new_line_i = i + 1
                 self.line += 1
             else:
                 if new_line_i == -1:
                     self.column += n
                 else:
-                    self.column = 1 + self.buf_cur + n - new_line_i
+                    self.column = 1 + self.buffer.buf_cur + n - new_line_i
                 break
         # manipulate buffer
-        if n < self.buf_remain:
-            self.buf_cur += n
-            self.buf_remain -= n
+        if n < self.buffer.buf_remain:
+            self.buffer.seek_forward(n)
         else:
-            self.buf = "" if self.is_unicode else str()
-            self.buf_cur = 0
-            self.buf_remain = 0
+            self.buffer.reset()

     @property
     def position(self):
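Worth spelling out for the `load_string` change: the old second argument `s is str` compared the string instance against the type object `str`, which is always `False`, so string input was previously treated as a byte buffer. Passing `True` matches what `io.StringIO` actually yields:

```python
s = "1 + 2"
print(s is str)            # False: an instance is never the type object
print(isinstance(s, str))  # True: presumably what was originally intended
```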
@@ -102,18 +130,12 @@ def peek_token(self):
         cur = 0
         hit_symbol = None
         while True:
-            if cur < self.buf_remain:   # peek 1 char
-                c = self.buf[self.buf_cur + cur]
-            else:
-                self._load_buffer()
-                if cur < self.buf_remain:
-                    c = self.buf[self.buf_cur + cur]
-                else:
-                    break   # if EOF
+            c = self.buffer.peek_char(cur)
+            if not c:
+                break
             cur += 1

             next_index = -1     # find next state
-            c_ord = ord(c)
+            c_ord = self.buffer.code(c)
             for (r_min, r_max), target_index, target in state.edges_lookup:
                 if c_ord >= r_min and c_ord <= r_max:
                     next_index = target_index
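One subtlety in the simplified loop above (an observation, not something this PR changes): `peek_char` returns `None` at end of input, but for byte buffers indexing yields an `int`, so a NUL byte comes back as `0` and `if not c:` treats it like EOF. An explicit `None` check would distinguish the two:

```python
c = b"\x00ab"[0]   # a NUL byte read from a bytes buffer is the int 0
print(not c)       # True: `if not c` would stop lexing here
print(c is None)   # False: `if c is None` would keep going
```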
@@ -134,14 +156,11 @@ def peek_token(self):
                         hit_cur = cur

         if hit_symbol:
-            lexeme = self.buf[self.buf_cur:self.buf_cur + hit_cur]
-            return Token(hit_symbol, lexeme, self.position)
+            return Token(hit_symbol, self.buffer.get_data(hit_cur), self.position)
+        elif cur == 0:
+            return Token(self.grammar.symbol_EOF, "", self.position)
         else:
-            if cur == 0:
-                return Token(self.grammar.symbol_EOF, "", self.position)
-            else:
-                lexeme = self.buf[self.buf_cur:self.buf_cur + cur]
-                return Token(self.grammar.symbol_Error, lexeme, self.position)
+            return Token(self.grammar.symbol_Error, self.buffer.get_data(cur), self.position)

     def read_token(self):
         """ Read next token and return it.
@@ -193,7 +212,7 @@ def read_token(self):
                     top[1] = top[1] + token.lexeme
                     self._consume_buffer(len(token.lexeme))
                 else:
-                    top[1] = top[1] + token.lexeme[0]
+                    top[1] = top[1] + token.lexeme[0:1]
                     self._consume_buffer(1)

     def read_token_all(self):
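The `[0]` versus `[0:1]` change is the classic Python 3 bytes pitfall: indexing `bytes` yields an `int`, which cannot be concatenated onto the accumulated lexeme, while slicing preserves the type:

```python
lexeme = b"abc"
print(lexeme[0])    # 97, an int, in Python 3 (b"a" in Python 2)
print(lexeme[0:1])  # b"a" in both, so top[1] + lexeme[0:1] stays bytes
```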
2 changes: 1 addition & 1 deletion pyauparser/test/test_grammar.py
@@ -19,7 +19,7 @@ def test_load(self):
         self.assertEqual(len(self.grammar.lalrstates), 19)

     def test_export(self):
-        with open("temp_operator_grammar.py", "wb") as f:
+        with open("temp_operator_grammar.py", "w") as f:
             self.grammar.export_to_py(f)
         import temp_operator_grammar
         grammar2 = temp_operator_grammar.load()
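The mode change follows from `export_to_py` writing `str` (it generates Python source); that it writes text rather than bytes is inferred from this fix, not shown in the diff. In Python 3 a file opened with `"wb"` accepts only bytes-like objects:

```python
with open("demo.py", "wb") as f:
    try:
        f.write("x = 1\n")  # TypeError: a bytes-like object is required
    except TypeError as e:
        print(e)

with open("demo.py", "w") as f:
    f.write("x = 1\n")      # text mode accepts str
```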