diff --git a/timeseries-streaming/timeseries-python-applications/README.MD b/timeseries-streaming/timeseries-python-applications/README.MD index 6e1339c8..16695528 100644 --- a/timeseries-streaming/timeseries-python-applications/README.MD +++ b/timeseries-streaming/timeseries-python-applications/README.MD @@ -75,7 +75,7 @@ That job will generate 86400 data points, which will be aggregated to 43200 exam Once that is done, change the information in the [config.py](ml_pipeline_examples/sin_wave_example/config.py) to match your local env. Run the command with the virtual-env activated: ``` -python ml_pipeline_examples/sin_wave_example/timeseries_local_simple_data.py +python ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py ``` This will output a serving_model_dir and a tf_transform_graph under the location you specified for ```PIPELINE_ROOT``` in the config.py file. With this you can now follow the rest of the steps outlines in Option 1 but using your own model. diff --git a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/encoder_decoder_run_fn.py b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/encoder_decoder_run_fn.py index 11dae284..7a289105 100644 --- a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/encoder_decoder_run_fn.py +++ b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/encoder_decoder_run_fn.py @@ -16,7 +16,7 @@ from typing import Text import tensorflow as tf import tensorflow_transform as tft -import timeseries.encoder_decoder.encoder_decoder_model as encoder_decoder_model +import ml_pipeline.timeseries.encoder_decoder.encoder_decoder_model as encoder_decoder_model from tfx.components.trainer.executor import TrainerFnArgs @@ -69,7 +69,7 @@ def serve_tf_examples_fn(serialized_tf_examples): serialized_tf_examples, feature_spec) transformed_features = 
create_training_data( model.tft_layer(parsed_features)) - return model(transformed_features) + return model(transformed_features), transformed_features return serve_tf_examples_fn diff --git a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/transforms/process_encdec_inf_rtn.py b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/transforms/process_encdec_inf_rtn.py index ffe0aafe..bea89970 100644 --- a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/transforms/process_encdec_inf_rtn.py +++ b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/encoder_decoder/transforms/process_encdec_inf_rtn.py @@ -42,32 +42,11 @@ def __init__(self, config: Dict[Text, Any], batching_size: int = 1000): self.batching_size = batching_size def setup(self): + # TODO switch to shared.py self.transform_output = tft.TFTransformOutput(self.tf_transform_graph_dir) - self.tft_layer = self.transform_output.transform_features_layer() + # self.tft_layer = self.transform_output.transform_features_layer() - def start_bundle(self): - self.batch: [WindowedValue] = [] - - def finish_bundle(self): - for prediction in self.process_result(self.batch): - yield prediction - - def process( - self, - element: prediction_log_pb2.PredictionLog, - window=beam.DoFn.WindowParam, - timestamp=beam.DoFn.TimestampParam): - if len(element.predict_log.request.inputs['examples'].string_val) > 1: - raise Exception("Only support single input string.") - - if len(self.batch) > self.batching_size: - for k in self.process_result(self.batch): - yield k - self.batch.clear() - else: - self.batch.append(WindowedValue(element, timestamp, [window])) - - def process_result(self, element: [WindowedValue]): + def process(self, element: prediction_log_pb2.PredictionLog): """ A input example has shape : [timesteps, all_features] all_features is not always == to features used in model. 
@@ -79,18 +58,22 @@ def process_result(self, element: [WindowedValue]): There are also Metadata fields which provide context """ - element_value = [k.value for k in element] + + if len(element.predict_log.request.inputs['examples'].string_val) > 1: + raise Exception("Only support single input string.") + processed_inputs = [] request_inputs = [] + request_inputs_tft = [] request_outputs = [] - for k in element_value: - request_inputs.append( - k.predict_log.request.inputs['examples'].string_val[0]) - request_outputs.append(k.predict_log.response.outputs['output_0']) + request_inputs.append( + element.predict_log.request.inputs['examples'].string_val[0]) + request_outputs.append(element.predict_log.response.outputs['output_0']) + request_inputs_tft.append(element.predict_log.response.outputs['output_1']) # The output of tf.io.parse_example is a set of feature tensors which - # have shape for non Metadata of [batch, + # have shape for non-metadata values of [batch, # timestep] batched_example = tf.io.parse_example( @@ -99,7 +82,9 @@ def process_result(self, element: [WindowedValue]): # The tft layer gives us two labels 'FLOAT32' and 'LABEL' which have # shape [batch, timestep, model_features] - inputs = self.tft_layer(batched_example) + # inputs = self.tft_layer(batched_example) + # TODO make work with batches + inputs = request_inputs_tft # Determine which of the features was used in the model feature_labels = timeseries_transform_utils.create_feature_list_from_list( @@ -115,7 +100,9 @@ def process_result(self, element: [WindowedValue]): batched_example['METADATA_SPAN_END_TS']).numpy() batch_pos = 0 - for batch_input in inputs['LABEL'].numpy(): + for batch_input in inputs: + # TODO Currently only supports model inference of batch size 1 + batch_input = tf.make_ndarray(batch_input)[0] # Get the Metadata from the original request span_start_timestamp = datetime.fromtimestamp( metadata_span_start_timestamp[batch_pos][0] / 1000) @@ -150,9 +137,9 @@ def process_result(self,
element: [WindowedValue]): label = (feature_labels[model_feature_pos]) # The num of features should == number of results - if len(feature_labels) != len(last_timestep_input): + if len(feature_labels) != len(last_timestep_output): raise ValueError(f'Features list {feature_labels} in config is ' - f'len {len(feature_labels)} which ' + f'length {len(feature_labels)} which ' f'does not match output length ' f'{len(last_timestep_output)} ' f' This normally is a result of using a configuration ' @@ -198,8 +185,9 @@ class CheckAnomalous(beam.DoFn): """ # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3. - def __init__(self, threshold: float = 0.05): + def __init__(self, output_false: bool = True, threshold: float = 0.05): beam.DoFn.__init__(self) + self.output_false = output_false self.threshold = threshold def process(self, element: Dict[Text, Any], *unused_args, **unused_kwargs): @@ -208,12 +196,17 @@ def process(self, element: Dict[Text, Any], *unused_args, **unused_kwargs): 'span_end_timestamp': element['span_end_timestamp'] } + anomaly_found = False + for key, value in element['feature_results'].items(): input_value = value['input_value'] output_value = value['output_value'] diff = abs(input_value - output_value) value.update({'diff': diff}) if not key.endswith('-TIMESTAMP'): - value.update({'anomaly': diff > self.threshold}) + if diff > self.threshold: + value.update({'anomaly': True}) + anomaly_found = True result.update({key: value}) - yield result + if anomaly_found: + yield result diff --git a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/utils/timeseries_transform_utils_test.py b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/utils/timeseries_transform_utils_test.py index 20e7c933..f03c6a2e 100644 --- a/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/utils/timeseries_transform_utils_test.py +++ 
b/timeseries-streaming/timeseries-python-applications/ml_pipeline/timeseries/utils/timeseries_transform_utils_test.py @@ -18,18 +18,17 @@ import numpy from datetime import datetime import tensorflow as tf -import timeseries.encoder_decoder.encoder_decoder_preprocessing as encoder_decoder_preprocessing +import ml_pipeline.timeseries.encoder_decoder.encoder_decoder_preprocessing as encoder_decoder_preprocessing from scipy.stats import stats from tensorflow_transform.beam import tft_unit from tensorflow_transform.beam import impl as beam_impl -from timeseries.utils import timeseries_transform_utils as ts_utils - +from ml_pipeline.timeseries.utils import timeseries_transform_utils as ts_utils class BeamImplTest(tft_unit.TransformTestCase): def setUp(self): tf.compat.v1.logging.info( - 'Starting test case: %s', self._testMethodName) + 'Starting test case: %s', self._testMethodName) self._context = beam_impl.Context(use_deep_copy_optimization=True) self._context.__enter__() @@ -43,15 +42,15 @@ def _SkipIfExternalEnvironmentAnd(self, predicate, reason): def testBasicType(self): config = { - 'timesteps': 3, - 'time_features': [], - 'features': ['a'], - 'enable_timestamp_features': False + 'timesteps': 3, + 'time_features': [], + 'features': ['a'], + 'enable_timestamp_features': False } input_data = [{'a': [1000.0, 2000.0, 3000.0]}] input_metadata = tft_unit.metadata_from_feature_spec( - {'a': tf.io.VarLenFeature(tf.float32)}) + {'a': tf.io.VarLenFeature(tf.float32)}) output = [[1000], [2000], [3000]] @@ -60,37 +59,37 @@ def testBasicType(self): expected_data = [{'Float32': output, 'LABEL': output}] expected_metadata = tft_unit.metadata_from_feature_spec({ - 'Float32': tf.io.FixedLenFeature([config['timesteps'], 1], - tf.float32), - 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 1], - tf.float32) + 'Float32': tf.io.FixedLenFeature([config['timesteps'], 1], + tf.float32), + 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 1], + tf.float32) }) preprocessing_fn = 
functools.partial( - encoder_decoder_preprocessing.preprocessing_fn, - custom_config=config) + encoder_decoder_preprocessing.preprocessing_fn, + custom_config=config) self.assertAnalyzeAndTransformResults( - input_data, - input_metadata, - preprocessing_fn, - expected_data, - expected_metadata) + input_data, + input_metadata, + preprocessing_fn, + expected_data, + expected_metadata) def testMixedType(self): config = { - 'timesteps': 3, - 'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'], - 'features': ['a', 'b'], - 'enable_timestamp_features': False + 'timesteps': 3, + 'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'], + 'features': ['a', 'b'], + 'enable_timestamp_features': False } input_data = [{ - 'a': [1000.0, 2000.0, 3000.0], 'b': [3000, 2000, 1000] + 'a': [1000.0, 2000.0, 3000.0], 'b': [3000, 2000, 1000] }] input_metadata = tft_unit.metadata_from_feature_spec({ - 'a': tf.io.VarLenFeature(tf.float32), - 'b': tf.io.VarLenFeature(tf.int64) + 'a': tf.io.VarLenFeature(tf.float32), + 'b': tf.io.VarLenFeature(tf.int64) }) output = [[1000.0, 3000.0], [2000.0, 2000.0], [3000.0, 1000.0]] @@ -100,30 +99,30 @@ def testMixedType(self): expected_data = [{'Float32': output, 'LABEL': output}] expected_metadata = tft_unit.metadata_from_feature_spec({ - 'Float32': tf.io.FixedLenFeature([config['timesteps'], 2], - tf.float32), - 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 2], - tf.float32) + 'Float32': tf.io.FixedLenFeature([config['timesteps'], 2], + tf.float32), + 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 2], + tf.float32) }) preprocessing_fn = functools.partial( - encoder_decoder_preprocessing.preprocessing_fn, - custom_config=config) + encoder_decoder_preprocessing.preprocessing_fn, + custom_config=config) self.assertAnalyzeAndTransformResults( - input_data, - input_metadata, - preprocessing_fn, - expected_data, - expected_metadata) + input_data, + input_metadata, + preprocessing_fn, + expected_data, + expected_metadata) def 
testWithTimeStamps(self): config = { - 'timesteps': 2, - 'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'], - 'features': ['float32', 'foo_TIMESTAMP'], - 'enable_timestamp_features': True + 'timesteps': 2, + 'time_features': ['MINUTE', 'MONTH', 'HOUR', 'DAY', 'YEAR'], + 'features': ['float32', 'foo_TIMESTAMP'], + 'enable_timestamp_features': True } # The values will need to be different enough for the zscore not to nan @@ -131,12 +130,12 @@ def testWithTimeStamps(self): timestamp_2 = int(datetime(2001, 6, 15, 12, 30, 30).timestamp()) input_data = [{ - 'float32': [1000.0, 2000.0], - 'foo_TIMESTAMP': [timestamp_1 * 1000, timestamp_2 * 1000] + 'float32': [1000.0, 2000.0], + 'foo_TIMESTAMP': [timestamp_1 * 1000, timestamp_2 * 1000] }] input_metadata = tft_unit.metadata_from_feature_spec({ - 'float32': tf.io.VarLenFeature(tf.float32), - 'foo_TIMESTAMP': tf.io.VarLenFeature(tf.int64) + 'float32': tf.io.VarLenFeature(tf.float32), + 'foo_TIMESTAMP': tf.io.VarLenFeature(tf.int64) }) output_timestep_1 = self.create_transform_output(timestamp_1) @@ -160,22 +159,22 @@ def testWithTimeStamps(self): expected_data = [{'Float32': output, 'LABEL': output}] expected_metadata = tft_unit.metadata_from_feature_spec({ - 'Float32': tf.io.FixedLenFeature([config['timesteps'], 11], - tf.float32), - 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 11], - tf.float32) + 'Float32': tf.io.FixedLenFeature([config['timesteps'], 11], + tf.float32), + 'LABEL': tf.io.FixedLenFeature([config['timesteps'], 11], + tf.float32) }) preprocessing_fn = functools.partial( - encoder_decoder_preprocessing.preprocessing_fn, - custom_config=config) + encoder_decoder_preprocessing.preprocessing_fn, + custom_config=config) self.assertAnalyzeAndTransformResults( - input_data, - input_metadata, - preprocessing_fn, - expected_data, - expected_metadata) + input_data, + input_metadata, + preprocessing_fn, + expected_data, + expected_metadata) def create_transform_output(self, timestamp: int) -> [float]: # 
Needs to be in lexical order @@ -195,16 +194,16 @@ def create_transform_output(self, timestamp: int) -> [float]: cos_year = math.cos(timestamp * (2.0 * math.pi / ts_utils.YEAR)) return [ - cos_day, - cos_hour, - cos_min, - cos_month, - cos_year, - sin_day, - sin_hour, - sin_min, - sin_month, - sin_year + cos_day, + cos_hour, + cos_min, + cos_month, + cos_year, + sin_day, + sin_hour, + sin_min, + sin_month, + sin_year ] diff --git a/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/inference/stream_inference.py b/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/inference/stream_inference.py index 0b688f66..7b619a12 100644 --- a/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/inference/stream_inference.py +++ b/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/inference/stream_inference.py @@ -60,7 +60,7 @@ def run(args, pipeline_args): 'model_config': config.MODEL_CONFIG })) | beam.ParDo( - process_encdec_inf_rtn.CheckAnomalous(threshold=0.7)) + process_encdec_inf_rtn.CheckAnomalous(output_false=False, threshold=0.7)) | beam.ParDo(print)) diff --git a/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py b/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py index 504b100c..80b8fad2 100644 --- a/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py +++ b/timeseries-streaming/timeseries-python-applications/ml_pipeline_examples/sin_wave_example/training/timeseries_local_sin_wave.py @@ -19,12 +19,12 @@ from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner from tfx.proto import trainer_pb2 -import timeseries.pipeline_templates.timeseries_pipeline as pipeline +import 
ml_pipeline.timeseries.pipeline_templates.timeseries_pipeline as pipeline from tfx.orchestration import metadata from absl import logging import ml_pipeline_examples.sin_wave_example.config as config -from timeseries.utils import timeseries_transform_utils +from ml_pipeline.timeseries.utils import timeseries_transform_utils def run(): @@ -39,9 +39,9 @@ def run(): tfx_pipeline = pipeline.create_pipeline( pipeline_name=config.PIPELINE_NAME, enable_cache=False, - run_fn='timeseries.encoder_decoder.encoder_decoder_run_fn.run_fn', + run_fn='ml_pipeline.timeseries.encoder_decoder.encoder_decoder_run_fn.run_fn', preprocessing_fn= - 'timeseries.encoder_decoder.encoder_decoder_preprocessing.preprocessing_fn', + 'ml_pipeline.timeseries.encoder_decoder.encoder_decoder_preprocessing.preprocessing_fn', data_path=config.SYNTHETIC_DATASET['local-raw'], pipeline_root=config.LOCAL_PIPELINE_ROOT, serving_model_dir=join(config.LOCAL_PIPELINE_ROOT, os.pathsep),