3 changes: 2 additions & 1 deletion pyproject.toml
@@ -27,7 +27,8 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Application Frameworks",
]

dependencies = ["deprecation", "python-dotenv"]
dependencies = ["python-dotenv"]
optional-dependencies = { compat = ["six"] }

[dependency-groups]
test = ["tox"]
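The net effect of this hunk is that deprecation stops being a required dependency and six is only pulled in when the package is installed with the new compat extra. A minimal sketch of how downstream code might tolerate the extra being absent; the bare import of six is the only piece taken from this hunk, and the helper name to_text is purely illustrative:

# Sketch only: treat six as optional, matching the new "compat" extra above.
try:
    import six  # present only when the package was installed with the compat extra
except ImportError:
    six = None

def to_text(value):
    # Prefer the compat shim when it is available, otherwise fall back to str().
    return six.text_type(value) if six is not None else str(value)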
212 changes: 2 additions & 210 deletions splunklib/results.py
@@ -32,18 +32,9 @@
print(f"Results are a preview: {reader.is_preview}")
"""

from io import BufferedReader, BytesIO


import xml.etree.ElementTree as et

from collections import OrderedDict
from io import BufferedReader
from json import loads as json_loads

__all__ = ["ResultsReader", "Message", "JSONResultsReader"]

import deprecation


class Message:
"""This class represents informational messages that Splunk interleaves in the results stream.
@@ -70,205 +61,6 @@ def __hash__(self):
return hash((self.type, self.message))


class _ConcatenatedStream:
"""Lazily concatenate zero or more streams into a stream.

As you read from the concatenated stream, you get characters from
each stream passed to ``_ConcatenatedStream``, in order.

**Example**::

from io import BytesIO
s = _ConcatenatedStream(BytesIO(b"abc"), BytesIO(b"def"))
assert s.read() == b"abcdef"
"""

def __init__(self, *streams):
self.streams = list(streams)

def read(self, n=None):
"""Read at most *n* characters from this stream.

If *n* is ``None``, return all available characters.
"""
response = b""
while len(self.streams) > 0 and (n is None or n > 0):
txt = self.streams[0].read(n)
response += txt
if n is not None:
n -= len(txt)
if n is None or n > 0:
del self.streams[0]
return response


class _XMLDTDFilter:
"""Lazily remove all XML DTDs from a stream.

All substrings matching the regular expression <?[^>]*> are
removed in their entirety from the stream. No regular expressions
are used, however, so everything still streams properly.

**Example**::

from io import BytesIO
s = _XMLDTDFilter(BytesIO(b"<?xml abcd><element><?xml ...></element>"))
assert s.read() == b"<element></element>"
"""

def __init__(self, stream):
self.stream = stream

def read(self, n=None):
"""Read at most *n* characters from this stream.

If *n* is ``None``, return all available characters.
"""
response = b""
while n is None or n > 0:
c = self.stream.read(1)
if c == b"":
break
if c == b"<":
c += self.stream.read(1)
if c == b"<?":
while True:
q = self.stream.read(1)
if q == b">":
break
else:
response += c
if n is not None:
n -= len(c)
else:
response += c
if n is not None:
n -= 1
return response


@deprecation.deprecated(
details="Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'"
)
class ResultsReader:
"""This class returns dictionaries and Splunk messages from an XML results
stream.

``ResultsReader`` is iterable, and returns a ``dict`` for results, or a
:class:`Message` object for Splunk messages. This class has one field,
``is_preview``, which is ``True`` when the results are a preview from a
running search, or ``False`` when the results are from a completed search.

This class has no network activity other than what is implicit in the
stream it operates on.

:param `stream`: The stream to read from (any object that supports
``.read()``).

**Example**::

import results
response = ... # the body of an HTTP response
reader = results.ResultsReader(response)
for result in reader:
if isinstance(result, dict):
print(f"Result: {result}")
elif isinstance(result, results.Message):
print(f"Message: {result}")
print(f"is_preview = {reader.is_preview}")
"""

# Be sure to update the docstrings of client.Jobs.oneshot,
# client.Job.results_preview and client.Job.results to match any
# changes made to ResultsReader.
#
# This wouldn't be a class, just the _parse_results function below,
# except that you cannot get the current generator inside the
# function creating that generator. Thus it's all wrapped up for
# the sake of one field.
def __init__(self, stream):
# The search/jobs/exports endpoint, when run with
# earliest_time=rt and latest_time=rt streams a sequence of
# XML documents, each containing a result, as opposed to one
# results element containing lots of results. Python's XML
# parsers are broken, and instead of reading one full document
# and returning the stream that follows untouched, they
# destroy the stream and throw an error. To get around this,
# we remove all the DTD definitions inline, then wrap the
# fragments in a fictional <doc> element to make the parser happy.
stream = _XMLDTDFilter(stream)
stream = _ConcatenatedStream(BytesIO(b"<doc>"), stream, BytesIO(b"</doc>"))
self.is_preview = None
self._gen = self._parse_results(stream)

def __iter__(self):
return self

def __next__(self):
return next(self._gen)

def _parse_results(self, stream):
"""Parse results and messages out of *stream*."""
result = None
values = None
try:
for event, elem in et.iterparse(stream, events=("start", "end")):
if elem.tag == "results" and event == "start":
# The wrapper element is a <results preview="0|1">. We
# don't care about it except to tell us whether these
# are preview results, or the final results from the
# search.
is_preview = elem.attrib["preview"] == "1"
self.is_preview = is_preview
if elem.tag == "result":
if event == "start":
result = OrderedDict()
elif event == "end":
yield result
result = None
elem.clear()

elif elem.tag == "field" and result is not None:
# We need the 'result is not None' check because
# 'field' is also the element name in the <meta>
# header that gives field order, which is not what we
# want at all.
if event == "start":
values = []
elif event == "end":
field_name = elem.attrib["k"]
if len(values) == 1:
result[field_name] = values[0]
else:
result[field_name] = values
# Calling .clear() is necessary to let the
# element be garbage collected. Otherwise
# arbitrarily large results sets will use
# arbitrarily large memory instead of
# streaming.
elem.clear()

elif elem.tag in ("text", "v") and event == "end":
text = "".join(elem.itertext())
values.append(text)
elem.clear()

elif elem.tag == "msg":
if event == "start":
msg_type = elem.attrib["type"]
elif event == "end":
text = elem.text if elem.text is not None else ""
yield Message(msg_type, text)
elem.clear()
except SyntaxError as pe:
# This is here to handle the same incorrect return from
# splunk that is described in __init__.
if "no element found" in pe.msg:
return
else:
raise


class JSONResultsReader:
"""This class returns dictionaries and Splunk messages from a JSON results
stream.
@@ -303,7 +95,7 @@ class JSONResultsReader:
# except that you cannot get the current generator inside the
# function creating that generator. Thus it's all wrapped up for
# the sake of one field.
def __init__(self, stream):
def __init__(self, stream) -> None:
# The search/jobs/exports endpoint, when run with
# earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of
# JSON documents, each containing a result, as opposed to one
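With ResultsReader removed outright, callers still iterating over XML results have to move to JSONResultsReader, which is what the old deprecation notice already recommended. A hedged migration sketch that mirrors the removed docstring example; the response placeholder stands in for however the caller obtains a results stream requested with output_mode=json:

from splunklib import results

response = ...  # body of an HTTP response requested with output_mode=json
reader = results.JSONResultsReader(response)
for item in reader:
    if isinstance(item, dict):
        print(f"Result: {item}")
    elif isinstance(item, results.Message):
        print(f"Message: {item}")
print(f"is_preview = {reader.is_preview}")

JSONResultsReader yields dicts for results and Message objects for Splunk messages, and exposes is_preview just as the removed class did.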