Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 134 additions & 125 deletions maflib/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,115 +36,108 @@
import maflib.core
import maflib.util

def download(url, decompress_as=''):
@maflib.util.rule
def download(task):
"""Create a rule to download a file from given URL.

It stores the file to the target node. If ``decompress_as`` is given, then
it automatically decompresses the downloaded file.

:param url: URL string of the file to be downloaded.
:type url: ``str``
:param decompress_as: Decompression method of downloaded file. If an empty
.. describe:: Task parameters

- url (``str``): (**required**) URL string of the file to be downloaded.
- decompress_as(``str``): Decompression method of downloaded file. If an empty
string is given, then this function does not do decompression.
``'bz2'``, ``'gz'`` or ``'zip'`` is available.
:return: A rule.
:rtype: :py:class:`maflib.core.Rule`

"""
def body(task):
if decompress_as != '':
t = tempfile.NamedTemporaryFile()
urlretrieve(url, t.name)
_decompress(t.name, task.outputs[0].abspath(), decompress_as)
else:
urlretrieve(url, task.outputs[0].abspath())

return maflib.core.Rule(fun=body, dependson=[download, url])
url = task.parameter['url']
decompress_as = task.parameter.get('decompress_as', '')

if decompress_as != '':
t = tempfile.NamedTemporaryFile()
urlretrieve(url, t.name)
_decompress(t.name, task.outputs[0].abspath(), decompress_as)
else:
urlretrieve(url, task.outputs[0].abspath())


def decompress(filetype='auto'):
@maflib.util.rule
def decompress(task):
"""A rule to decompress an input file.

:param filetype: Type of compressed file. Following values are available.

.. describe:: Task parameters

- filetype (``str``): Type of compressed file. Following values are available.
- ``'auto'``: Use automatically detected type from the extension of the
input file name.
input file name (default).
- ``'bz2'``: bzip2 file.
- ``'gz'``: gzip file.
- ``'zip'``: zip file.
:type filetype: ``str``
:return: A rule.
:rtype: :py:class:`maflib.core.Rule`

"""
def body(task):
ft = filetype
if ft == 'auto':
ft = os.path.splitext(task.inputs[0].abspath())[1][1:]
ft = task.parameter.get('filetype', 'auto')

res = _decompress(
task.inputs[0].abspath(), task.outputs[0].abspath(), ft)
if not res:
raise Exception(
"Filetype %s is not supported in decompress." % ft)
if ft == 'auto':
ft = os.path.splitext(task.inputs[0].abspath())[1][1:]

return maflib.core.Rule(fun=body, dependson=[decompress, filetype])
res = _decompress(
task.inputs[0].abspath(), task.outputs[0].abspath(), ft)
if not res:
raise Exception(
"Filetype %s is not supported in decompress." % ft)


def max(key):
@maflib.util.json_aggregator
def max(values, outpath, parameter):
"""Creates an aggregator to select the max value of given key.

The created aggregator chooses the result with the maximum value of
``key``, and writes the JSON object to the output node.

:param key: A key to be used for selection of maximum value.
:type key: ``str``
:return: An aggregator.
:rtype: :py:class:`maflib.core.Rule`
.. describe:: Task parameters

- key (``str``): (**required**) A key to be used for selection of maximum value.

"""
@maflib.util.json_aggregator
def body(values, outpath, parameter):
if len(values) == 0:
return json.dumps({})
key = parameter['key']

max_value = values[0][key]
argmax = values[0]
for value in values[1:]:
if max_value < value[key]:
max_value = value[key]
argmax = value
return argmax
if len(values) == 0:
return json.dumps({})

return maflib.core.Rule(fun=body, dependson=[max, key])
max_value = values[0][key]
argmax = values[0]
for value in values[1:]:
if max_value < value[key]:
max_value = value[key]
argmax = value
return argmax


def min(key):
@maflib.util.json_aggregator
def min(values, outpath, parameter):
"""Creates an aggregator to select the minimum value of given key.

The created aggregator chooses the result with the minimum value of
``key``, and writes the JSON object to the output node.

:param key: A key to be used for selection of minimum value.
:type key: ``str``
:return: An aggregator.
:rtype: :py:class:`maflib.core.Rule`
.. describe:: Task parameters

"""
@maflib.util.json_aggregator
def body(values, outpath, parameter):
if len(values) == 0:
return json.dumps({})
- key (``str``): (**required**) A key to be used for selection of minimum value.

min_value = values[0][key]
argmin = values[0]
for value in values[1:]:
if min_value > value[key]:
min_value = value[key]
argmin = value
return argmin
"""
key = parameter['key']

if len(values) == 0:
return json.dumps({})

return maflib.core.Rule(fun=body, dependson=[min, key])
min_value = values[0][key]
argmin = values[0]
for value in values[1:]:
if min_value > value[key]:
min_value = value[key]
argmin = value
return argmin


@maflib.util.json_aggregator
Expand All @@ -167,6 +160,7 @@ def average(values, output, parameter):
return scheme


@maflib.util.rule
def convert_libsvm_accuracy(task):
"""Rule that converts message output by svm-predict into json file.

Expand All @@ -183,6 +177,7 @@ def convert_libsvm_accuracy(task):
task.outputs[0].write(json.dumps(j))


@maflib.util.rule
def create_label_result_libsvm(task):
"""TODO(noji) write document."""
predict_f = task.inputs[0].abspath()
Expand All @@ -200,6 +195,7 @@ def create_label_result_libsvm(task):
task.outputs[0].write(json.dumps(instances))


@maflib.util.rule
def calculate_stats_multiclass_classification(task):
"""Calculates various performance measures for multi-class classification.

Expand Down Expand Up @@ -341,7 +337,8 @@ def _macro_average(values):
task.outputs[0].write(json.dumps(results))


def segment_by_line(num_folds, parameter_name='fold'):
@maflib.util.rule
def segment_by_line(task):
"""Creates a rule that splits a line-by-line dataset to the k-th fold train
and validation subsets for n-fold cross validation.

Expand All @@ -360,66 +357,79 @@ def segment_by_line(num_folds, parameter_name='fold'):
parameter name is specified by ``parameter_name``. The index must be a
non-negative integer less than ``num_folds``.

:param num_folds: Number of folds for splitting. Inverse of this value is
the ratio of validation set size compared to the input dataset size. As
noted above, the fold parameter must be less than ``num_folds``.
:param parameter_name: Name of the parameter indicating the number of
folds.
:return: A rule.
:rtype: ``function``
.. describe:: Task parameters

- num_folds: (**required**) Number of folds for splitting. Inverse of this
value is the ratio of validation set size compared to the input dataset
size. As noted above, the fold parameter must be less than
``num_folds``.
- parameter_name: Name of the parameter indicating the number of folds.

"""
def body(task):
source = open(task.inputs[0].abspath())
num_lines = 0
for line in source: num_lines += 1
source.seek(0)

base = num_lines / num_folds
n = int(task.env[parameter_name])
test_begin = base * n
test_end = base * (n + 1)

with open(task.outputs[0].abspath(), 'w') as train,\
open(task.outputs[1].abspath(), 'w') as test:
i = 0
for line in source:
if i < test_begin or i >= test_end:
# in train
train.write(line)
else:
test.write(line)
i += 1
source.close()
return body

def segment_without_label_bias(weights, extract_label=(lambda line: line[:line.find(' ')])):
"""Segments an example per line data into k-fold where k is the length of param weights.
num_folds = task.parameter['num_folds']
parameter_name = task.parameter.get('parameter_name', 'fold')

source = open(task.inputs[0].abspath())
num_lines = 0
for line in source: num_lines += 1
source.seek(0)

base = num_lines / num_folds
n = int(task.env[parameter_name])
test_begin = base * n
test_end = base * (n + 1)

with open(task.outputs[0].abspath(), 'w') as train,\
open(task.outputs[1].abspath(), 'w') as test:
i = 0
for line in source:
if i < test_begin or i >= test_end:
# in train
train.write(line)
else:
test.write(line)
i += 1
source.close()

@maflib.util.rule
def segment_without_label_bias(task):
"""Segments an example per line data into k-fold where k is the length of
param weights.

This method consider the label-bias when segmentation:
In machine learning experiments, we often want to prepare training or testing examples
in equal proportions for each label for the correct evaluation.
``weights`` specifies the proportion of examples in the k-th fold for each label.
In machine learning experiments, we often want to prepare training or
testing examples in equal proportions for each label for the correct
evaluation. ``weights`` specifies the proportion of examples in the k-th
fold for each label.

A typical usage of this task is as follows:

.. code-block:: py

exp(source='news20.scale',
target='train dev test',
rule=segment_without_label_bias([0.8, 0.1, 0.1]))
rule=segment_without_label_bias(weights=[0.8, 0.1, 0.1]))

This exp segment data news20.scale into 3-fold for train/develop/test.
For each label, train contains 80% of the examples of that label, while dev/test contains
10% of examples of the one.
For each label, train contains 80% of the examples of that label, while
dev/test contains 10% of examples of the one.

The input is assumed to be the format of an example per line, such as libsvm
or vowpal format. The param ``extract_label`` specifies the way to extract
the label from each line, so you can handle other format by customizing this
function as far as it follows the one example per line format.

The input is assumed to be the format of an example per line, such as libsvm or vowpal format.
The param ``extract_label`` specifies the way to extract the label from each line, so you can handle other format by customizing this function as far as it follows the one example per line format.
.. describe:: Task parameters

:param weights: list of floats specifing the weight by which data are segmented
:param extract_label: function extracting the label from an input line
- weights: (**required**) list of floats specifing the weight by which data
are segmented
- extract_label:
:param weights: function extracting the label from an input line
(default = lambda line: line[:line.find(' ')])

"""
weights = task.parameter['weights']
extract_label = task.parameter.get('extract_label', (lambda line: line[:line.find(' ')]))

def _segment_data_with_weights(data):
normalized = map(lambda w: w / sum(weights), weights)
Expand All @@ -432,20 +442,19 @@ def _segment_data_with_weights(data):
endpoints = [0] + list(map(lambda w: int(len(data) * w), accumulate))
return [data[endpoints[i]:endpoints[i + 1]] for i in range(len(endpoints) - 1)]

def body(task):
if len(weights) != len(task.outputs):
raise maflib.core.InvalidMafArgumentException("lengths of weights must be the same as the number of target")

label2examples = collections.defaultdict(list)
for line in open(task.inputs[0].abspath()): label2examples[extract_label(line)].append(line)
label2segmented_examples = dict([(k, _segment_data_with_weights(v)) \
for k, v in label2examples.items()])
for i, o in enumerate(task.outputs):
with open(o.abspath(), 'w') as f:
for examples in label2segmented_examples.values():
for line in examples[i]: f.write(line)
return 0
return maflib.core.Rule(body, dependson=[segment_without_label_bias])
if len(weights) != len(task.outputs):
raise maflib.core.InvalidMafArgumentException("lengths of weights must be the same as the number of target")

label2examples = collections.defaultdict(list)
for line in open(task.inputs[0].abspath()): label2examples[extract_label(line)].append(line)
label2segmented_examples = dict([(k, _segment_data_with_weights(v)) \
for k, v in label2examples.items()])
for i, o in enumerate(task.outputs):
with open(o.abspath(), 'w') as f:
for examples in label2segmented_examples.values():
for line in examples[i]: f.write(line)
return 0


def _decompress(srcpath, dstpath, filetype):
if filetype == 'bz2':
Expand Down
7 changes: 4 additions & 3 deletions samples/liblinear/wscript
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ def build(exp):
website = 'http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/'

exp(target='news20.scale',
rule=maflib.rules.download(website+'/news20.scale.bz2', 'bz2'))
rule=maflib.rules.download(url=website+'/news20.scale.bz2', decompress_as='bz2'))

exp(target='news20.t.scale',
rule=maflib.rules.download(website+'/news20.t.scale.bz2', 'bz2'))
rule=maflib.rules.download(url=website+'/news20.t.scale.bz2', decompress_as='bz2'))

# Train with various parameters.
exp(source='news20.scale',
Expand All @@ -71,7 +72,7 @@ def build(exp):
exp(source='result',
target='max_accuracy',
aggregate_by='B',
rule=maflib.rules.max('accuracy'))
rule=maflib.rules.max(key='accuracy'))

# Plot a line chart.
exp(source='max_accuracy',
Expand Down
Loading