Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ That job will generate 86400 data points, which will be aggregated to 43200 exam
Once that is done, change the information in the [config.py](ml_pipeline_examples/sin_wave_example/config.py) to match your local env.
Run the command with the virtual-env activated:
```
python ml_pipeline_examples/sin_wave_example/timeseries_local_simple_data.py
python ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py
```

This will output a serving_model_dir and a tf_transform_graph under the location you specified for ```PIPELINE_ROOT``` in the config.py file. With this you can now follow the rest of the steps outlined in Option 1 but using your own model.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Text
import tensorflow as tf
import tensorflow_transform as tft
import timeseries.encoder_decoder.encoder_decoder_model as encoder_decoder_model
import ml_pipeline.timeseries.encoder_decoder.encoder_decoder_model as encoder_decoder_model

from tfx.components.trainer.executor import TrainerFnArgs

Expand Down Expand Up @@ -69,7 +69,7 @@ def serve_tf_examples_fn(serialized_tf_examples):
serialized_tf_examples, feature_spec)
transformed_features = create_training_data(
model.tft_layer(parsed_features))
return model(transformed_features)
return model(transformed_features), transformed_features

return serve_tf_examples_fn

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,11 @@ def __init__(self, config: Dict[Text, Any], batching_size: int = 1000):
self.batching_size = batching_size

def setup(self):
# TODO switch to shared.py
self.transform_output = tft.TFTransformOutput(self.tf_transform_graph_dir)
self.tft_layer = self.transform_output.transform_features_layer()
# self.tft_layer = self.transform_output.transform_features_layer()

def start_bundle(self):
self.batch: [WindowedValue] = []

def finish_bundle(self):
for prediction in self.process_result(self.batch):
yield prediction

def process(
self,
element: prediction_log_pb2.PredictionLog,
window=beam.DoFn.WindowParam,
timestamp=beam.DoFn.TimestampParam):
if len(element.predict_log.request.inputs['examples'].string_val) > 1:
raise Exception("Only support single input string.")

if len(self.batch) > self.batching_size:
for k in self.process_result(self.batch):
yield k
self.batch.clear()
else:
self.batch.append(WindowedValue(element, timestamp, [window]))

def process_result(self, element: [WindowedValue]):
def process(self, element: prediction_log_pb2.PredictionLog):
"""
An input example has shape: [timesteps, all_features]; all_features is
not always equal to the features used in the model.
Expand All @@ -79,18 +58,22 @@ def process_result(self, element: [WindowedValue]):
There are also Metadata fields which provide context

"""
element_value = [k.value for k in element]

if len(element.predict_log.request.inputs['examples'].string_val) > 1:
raise Exception("Only support single input string.")

processed_inputs = []
request_inputs = []
request_inputs_tft = []
request_outputs = []

for k in element_value:
request_inputs.append(
k.predict_log.request.inputs['examples'].string_val[0])
request_outputs.append(k.predict_log.response.outputs['output_0'])
request_inputs.append(
element.predict_log.request.inputs['examples'].string_val[0])
request_outputs.append(element.predict_log.response.outputs['output_0'])
request_inputs_tft.append(element.predict_log.response.outputs['output_1'])

# The output of tf.io.parse_example is a set of feature tensors which
# have shape for non Metadata of [batch,
# have shape for non-metadata values of [batch,
# timestep]

batched_example = tf.io.parse_example(
Expand All @@ -99,7 +82,9 @@ def process_result(self, element: [WindowedValue]):
# The tft layer gives us two labels 'FLOAT32' and 'LABEL' which have
# shape [batch, timestep, model_features]

inputs = self.tft_layer(batched_example)
# inputs = self.tft_layer(batched_example)
# TODO make work with batches
inputs = request_inputs_tft

# Determine which of the features was used in the model
feature_labels = timeseries_transform_utils.create_feature_list_from_list(
Expand All @@ -115,7 +100,9 @@ def process_result(self, element: [WindowedValue]):
batched_example['METADATA_SPAN_END_TS']).numpy()

batch_pos = 0
for batch_input in inputs['LABEL'].numpy():
for batch_input in inputs:
# TODO Currently only supports model inference with batch size 1
batch_input = tf.make_ndarray(batch_input)[0]
# Get the Metadata from the original request
span_start_timestamp = datetime.fromtimestamp(
metadata_span_start_timestamp[batch_pos][0] / 1000)
Expand Down Expand Up @@ -150,9 +137,9 @@ def process_result(self, element: [WindowedValue]):
label = (feature_labels[model_feature_pos])

# The num of features should == number of results
if len(feature_labels) != len(last_timestep_input):
if len(feature_labels) != len(last_timestep_output):
raise ValueError(f'Features list {feature_labels} in config is '
f'len {len(feature_labels)} which '
f'length {len(feature_labels)} which '
f'does not match output length '
f'{len(last_timestep_output)} '
f' This normally is a result of using a configuration '
Expand Down Expand Up @@ -198,8 +185,9 @@ class CheckAnomalous(beam.DoFn):
"""

# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
def __init__(self, threshold: float = 0.05):
def __init__(self, output_false: bool = True, threshold: float = 0.05):
beam.DoFn.__init__(self)
self.output_false = output_false
self.threshold = threshold

def process(self, element: Dict[Text, Any], *unused_args, **unused_kwargs):
Expand All @@ -208,12 +196,17 @@ def process(self, element: Dict[Text, Any], *unused_args, **unused_kwargs):
'span_end_timestamp': element['span_end_timestamp']
}

anomaly_found = False

for key, value in element['feature_results'].items():
input_value = value['input_value']
output_value = value['output_value']
diff = abs(input_value - output_value)
value.update({'diff': diff})
if not key.endswith('-TIMESTAMP'):
value.update({'anomaly': diff > self.threshold})
if diff > self.threshold:
value.update({'anomaly': True})
anomaly_found = True
result.update({key: value})
yield result
if anomaly_found:
yield result
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
import numpy
from datetime import datetime
import tensorflow as tf
import timeseries.encoder_decoder.encoder_decoder_preprocessing as encoder_decoder_preprocessing
import ml_pipeline.timeseries.encoder_decoder.encoder_decoder_preprocessing as encoder_decoder_preprocessing
from scipy.stats import stats
from tensorflow_transform.beam import tft_unit
from tensorflow_transform.beam import impl as beam_impl
from timeseries.utils import timeseries_transform_utils as ts_utils

from ml_pipeline.timeseries.utils import timeseries_transform_utils as ts_utils


class BeamImplTest(tft_unit.TransformTestCase):
def setUp(self):
tf.compat.v1.logging.info(
'Starting test case: %s', self._testMethodName)
'Starting test case: %s', self._testMethodName)

self._context = beam_impl.Context(use_deep_copy_optimization=True)
self._context.__enter__()
Expand All @@ -43,15 +42,15 @@ def _SkipIfExternalEnvironmentAnd(self, predicate, reason):

def testBasicType(self):
config = {
'timesteps': 3,
'time_features': [],
'features': ['a'],
'enable_timestamp_features': False
'timesteps': 3,
'time_features': [],
'features': ['a'],
'enable_timestamp_features': False
}

input_data = [{'a': [1000.0, 2000.0, 3000.0]}]
input_metadata = tft_unit.metadata_from_feature_spec(
{'a': tf.io.VarLenFeature(tf.float32)})
{'a': tf.io.VarLenFeature(tf.float32)})

output = [[1000], [2000], [3000]]

Expand All @@ -60,37 +59,37 @@ def testBasicType(self):
expected_data = [{'Float32': output, 'LABEL': output}]

expected_metadata = tft_unit.metadata_from_feature_spec({
'Float32': tf.io.FixedLenFeature([config['timesteps'], 1],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 1],
tf.float32)
'Float32': tf.io.FixedLenFeature([config['timesteps'], 1],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 1],
tf.float32)
})

preprocessing_fn = functools.partial(
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)

self.assertAnalyzeAndTransformResults(
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)

def testMixedType(self):
config = {
'timesteps': 3,
'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'],
'features': ['a', 'b'],
'enable_timestamp_features': False
'timesteps': 3,
'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'],
'features': ['a', 'b'],
'enable_timestamp_features': False
}

input_data = [{
'a': [1000.0, 2000.0, 3000.0], 'b': [3000, 2000, 1000]
'a': [1000.0, 2000.0, 3000.0], 'b': [3000, 2000, 1000]
}]
input_metadata = tft_unit.metadata_from_feature_spec({
'a': tf.io.VarLenFeature(tf.float32),
'b': tf.io.VarLenFeature(tf.int64)
'a': tf.io.VarLenFeature(tf.float32),
'b': tf.io.VarLenFeature(tf.int64)
})

output = [[1000.0, 3000.0], [2000.0, 2000.0], [3000.0, 1000.0]]
Expand All @@ -100,43 +99,43 @@ def testMixedType(self):
expected_data = [{'Float32': output, 'LABEL': output}]

expected_metadata = tft_unit.metadata_from_feature_spec({
'Float32': tf.io.FixedLenFeature([config['timesteps'], 2],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 2],
tf.float32)
'Float32': tf.io.FixedLenFeature([config['timesteps'], 2],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 2],
tf.float32)
})

preprocessing_fn = functools.partial(
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)

self.assertAnalyzeAndTransformResults(
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)

def testWithTimeStamps(self):

config = {
'timesteps': 2,
'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'],
'features': ['float32', 'foo_TIMESTAMP'],
'enable_timestamp_features': True
'timesteps': 2,
'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'],
'features': ['float32', 'foo_TIMESTAMP'],
'enable_timestamp_features': True
}

# The values will need to be different enough for the zscore not to nan
timestamp_1 = int(datetime(2000, 1, 1, 0, 0, 0).timestamp())
timestamp_2 = int(datetime(2001, 6, 15, 12, 30, 30).timestamp())

input_data = [{
'float32': [1000.0, 2000.0],
'foo_TIMESTAMP': [timestamp_1 * 1000, timestamp_2 * 1000]
'float32': [1000.0, 2000.0],
'foo_TIMESTAMP': [timestamp_1 * 1000, timestamp_2 * 1000]
}]
input_metadata = tft_unit.metadata_from_feature_spec({
'float32': tf.io.VarLenFeature(tf.float32),
'foo_TIMESTAMP': tf.io.VarLenFeature(tf.int64)
'float32': tf.io.VarLenFeature(tf.float32),
'foo_TIMESTAMP': tf.io.VarLenFeature(tf.int64)
})

output_timestep_1 = self.create_transform_output(timestamp_1)
Expand All @@ -160,22 +159,22 @@ def testWithTimeStamps(self):
expected_data = [{'Float32': output, 'LABEL': output}]

expected_metadata = tft_unit.metadata_from_feature_spec({
'Float32': tf.io.FixedLenFeature([config['timesteps'], 11],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 11],
tf.float32)
'Float32': tf.io.FixedLenFeature([config['timesteps'], 11],
tf.float32),
'LABEL': tf.io.FixedLenFeature([config['timesteps'], 11],
tf.float32)
})

preprocessing_fn = functools.partial(
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)
encoder_decoder_preprocessing.preprocessing_fn,
custom_config=config)

self.assertAnalyzeAndTransformResults(
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)
input_data,
input_metadata,
preprocessing_fn,
expected_data,
expected_metadata)

def create_transform_output(self, timestamp: int) -> [float]:
# Needs to be in lexical order
Expand All @@ -195,16 +194,16 @@ def create_transform_output(self, timestamp: int) -> [float]:
cos_year = math.cos(timestamp * (2.0 * math.pi / ts_utils.YEAR))

return [
cos_day,
cos_hour,
cos_min,
cos_month,
cos_year,
sin_day,
sin_hour,
sin_min,
sin_month,
sin_year
cos_day,
cos_hour,
cos_min,
cos_month,
cos_year,
sin_day,
sin_hour,
sin_min,
sin_month,
sin_year
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def run(args, pipeline_args):
'model_config': config.MODEL_CONFIG
}))
| beam.ParDo(
process_encdec_inf_rtn.CheckAnomalous(threshold=0.7))
process_encdec_inf_rtn.CheckAnomalous(output_false=False, threshold=0.7))
| beam.ParDo(print))


Expand Down
Loading