Skip to content

Commit de90036

Browse files
committed
Result object (retained)
1 parent 1e04534 commit de90036

File tree

3 files changed

+64
-53
lines changed

3 files changed

+64
-53
lines changed

neo4j/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def main():
115115
for i, record in enumerate(records):
116116
has_results = True
117117
if i == 0:
118-
stdout.write("%s\r\n" % "\t".join(record.__fields__))
118+
stdout.write("%s\r\n" % "\t".join(record.__keys__))
119119
stdout.write("%s\r\n" % "\t".join(map(repr, record)))
120120
if has_results:
121121
stdout.write("\r\n")

neo4j/connection.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,7 @@ class Response(object):
188188

189189
def __init__(self, connection):
190190
self.connection = connection
191-
self.__complete = False
192-
193-
@property
194-
def complete(self):
195-
return self.__complete
191+
self.complete = False
196192

197193
def deliver(self, messages):
198194
for message in messages:
@@ -202,27 +198,32 @@ def deliver(self, messages):
202198
log_info("S: %s %s", message_names[signature], " ".join(map(repr, fields)))
203199
handler_name = "on_%s" % message_names[signature].lower()
204200
try:
205-
getattr(self, handler_name)(*fields)
201+
handler = getattr(self, handler_name)
206202
except AttributeError:
207203
pass
204+
else:
205+
handler(*fields)
208206
if signature in SUMMARY:
209-
self.__complete = True
207+
self.complete = True
210208
if signature == FAILURE:
211-
self.ack_failure()
209+
def on_failure(metadata):
210+
raise ProtocolError("Could not acknowledge failure")
212211

213-
def consume(self):
214-
self.connection.fetch_all(self)
212+
subscriber = Response(self)
213+
subscriber.on_failure = on_failure
214+
self.connection.append(ACK_FAILURE, response=subscriber)
215215

216-
def ack_failure(self):
217-
""" Queue an acknowledgement for a previous failure.
218-
"""
216+
def on_record(self, values):
217+
pass
219218

220-
def on_failure(metadata):
221-
raise ProtocolError("Could not acknowledge failure")
219+
def on_success(self, metadata):
220+
pass
221+
222+
def on_failure(self, metadata):
223+
pass
222224

223-
subscriber = Response(self)
224-
subscriber.on_failure = on_failure
225-
self.connection.append(ACK_FAILURE, response=subscriber)
225+
def on_ignored(self, metadata):
226+
pass
226227

227228

228229
class Connection(object):
@@ -251,7 +252,7 @@ def on_failure(metadata):
251252

252253
self.append(INIT, (user_agent,), response=response)
253254
self.send()
254-
response.consume()
255+
self.fetch_all(response)
255256

256257
def append(self, signature, fields=(), response=None):
257258
""" Add a message to the outgoing queue.

neo4j/session.py

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ def session(self, **config):
9696
return Session(connect(self.host, self.port, **config))
9797

9898

99+
class Result(list):
100+
101+
def __init__(self):
102+
super(Result, self).__init__()
103+
self.keys = None
104+
self.bench_test = None
105+
106+
def on_header(self, metadata):
107+
self.keys = metadata["fields"]
108+
if self.bench_test:
109+
self.bench_test.start_recv = perf_counter()
110+
111+
def on_record(self, values):
112+
self.append(Record(self.keys, tuple(map(hydrated, values))))
113+
114+
def on_footer(self, metadata):
115+
if self.bench_test:
116+
self.bench_test.end_recv = perf_counter()
117+
118+
def on_failure(self, metadata):
119+
raise CypherError(metadata)
120+
121+
99122
class Session(object):
100123
""" Logical session carried out over an established TCP connection.
101124
Sessions should generally be constructed using the :meth:`.Driver.session`
@@ -118,56 +141,43 @@ def run(self, statement, parameters=None):
118141
119142
:param statement: Cypher statement to execute
120143
:param parameters: dictionary of parameters
121-
:return: list of :class:`.Record` objects
144+
:return: Cypher result
145+
:rtype: :class:`.Result`
122146
"""
123-
t = BenchTest()
124147

125148
# Ensure the statement is a Unicode value
126149
if isinstance(statement, bytes):
127150
statement = statement.decode("UTF-8")
128151

129152
parameters = dict(parameters or {})
130153

154+
t = BenchTest()
131155
t.init = perf_counter()
132156

133-
fields = []
134-
records = []
135-
136-
def on_header(metadata):
137-
fields.extend(metadata["fields"])
138-
t.start_recv = perf_counter()
139-
140-
def on_record(values):
141-
records.append(Record(fields, tuple(map(hydrated, values))))
142-
143-
def on_footer(metadata):
144-
t.end_recv = perf_counter()
145-
146-
def on_failure(metadata):
147-
raise CypherError("FAILURE")
157+
result = Result()
158+
result.bench_test = t
148159

149160
run_response = Response(self.connection)
150-
run_response.on_success = on_header
151-
run_response.on_failure = on_failure
161+
run_response.on_success = result.on_header
162+
run_response.on_failure = result.on_failure
152163

153164
pull_all_response = Response(self.connection)
154-
pull_all_response.on_record = on_record
155-
pull_all_response.on_success = on_footer
156-
pull_all_response.on_failure = on_failure
165+
pull_all_response.on_record = result.on_record
166+
pull_all_response.on_success = result.on_footer
167+
pull_all_response.on_failure = result.on_failure
157168

158169
self.connection.append(RUN, (statement, parameters), response=run_response)
159170
self.connection.append(PULL_ALL, response=pull_all_response)
160171
t.start_send = perf_counter()
161172
self.connection.send()
162173
t.end_send = perf_counter()
163174

164-
run_response.consume()
165-
pull_all_response.consume()
166-
175+
self.connection.fetch_all(run_response)
176+
self.connection.fetch_all(pull_all_response)
167177
t.done = perf_counter()
168178
self.bench_tests.append(t)
169179

170-
return records
180+
return result
171181

172182
def close(self):
173183
""" Shut down and close the session.
@@ -257,14 +267,14 @@ class Record(object):
257267
``record["field"]``) or by attribute (``record.field``).
258268
"""
259269

260-
def __init__(self, fields, values):
261-
self.__fields__ = fields
270+
def __init__(self, keys, values):
271+
self.__keys__ = keys
262272
self.__values__ = values
263273

264274
def __repr__(self):
265275
values = self.__values__
266276
s = []
267-
for i, field in enumerate(self.__fields__):
277+
for i, field in enumerate(self.__keys__):
268278
s.append("%s=%r" % (field, values[i]))
269279
return "<Record %s>" % " ".join(s)
270280

@@ -278,20 +288,20 @@ def __ne__(self, other):
278288
return not self.__eq__(other)
279289

280290
def __len__(self):
281-
return self.__fields__.__len__()
291+
return self.__keys__.__len__()
282292

283293
def __getitem__(self, item):
284294
if isinstance(item, string):
285295
return getattr(self, item)
286296
elif isinstance(item, integer):
287-
return getattr(self, self.__fields__[item])
297+
return getattr(self, self.__keys__[item])
288298
else:
289299
raise TypeError(item)
290300

291301
def __getattr__(self, item):
292302
try:
293-
i = self.__fields__.index(item)
303+
i = self.__keys__.index(item)
294304
except ValueError:
295-
raise AttributeError("No field %r" % item)
305+
raise AttributeError("No key %r" % item)
296306
else:
297307
return self.__values__[i]

0 commit comments

Comments
 (0)