Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit 8d06aa2

Browse files
committed
Fixed python implementation for Apache Apex
1 parent 712027a commit 8d06aa2

67 files changed

Lines changed: 4814 additions & 24 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ nb-configuration.xml
1212
hadoop.log
1313
site/
1414
.checkstyle
15+
*.pyc
16+
*.out
17+
examples/python/output

docs/python/main.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#Developing Streaming Application in Python#
2+
3+
Currently we have exposed basic support for Stateless Support.
4+
5+
##Requirements:##
6+
* Python 2.7
7+
* py4j
8+
Please install py4j on your machine.
9+
```
10+
pip install py4j
11+
```
12+
13+
14+
Once you have pulled Apache Malhar project, go to python project and follow next steps:
15+
16+
* Compile all projects under Apache Malhar and make sure you have hadoop installed on local node.
17+
* Once compilation finish, go to python/script directory and launch ./pyshell
18+
* This will launch python shell. Now you can develop your application using python shell.
19+
20+
You can write simpler application using available apis as well provide custom functions written in python.
21+
22+
```
23+
24+
def filter_function(a):
25+
input_data=a.split(',')
26+
if float(input_data[2])> 100:
27+
return True
28+
return False
29+
30+
from pyapex import createApp
31+
a=createApp('python_app').from_kafka09('localhost:2181','test_topic') \
32+
.filter('filter_operator',filter_function) \
33+
.to_console(name='endConsole') \
34+
.launch(False)
35+
```
36+
37+
38+
Note: Currently developer need to ensure that required python dependencies are installed on Hadoop cluster.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
Hadoop is the Elephant King!
2+
A yellow and elegant thing.
3+
He never forgets
4+
Useful data, or lets
5+
An extraneous element cling!
6+
A wonderful king is Hadoop.
7+
The elephant plays well with Sqoop.
8+
But what helps him to thrive
9+
Are Impala, and Hive,
10+
And HDFS in the group.
11+
Hadoop is an elegant fellow.
12+
An elephant gentle and mellow.
13+
He never gets mad,
14+
Or does anything bad,
15+
Because, at his core, he is yellow.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
symbol,timestamp,open
2+
MSFT,2014-10-31T00:00:00-04:00,46.31618
3+
MSFT,2014-11-03T00:00:00-05:00,46.26685
4+
MSFT,2014-11-04T00:00:00-05:00,46.6714
5+
MSFT,2014-11-05T00:00:00-05:00,47.16475
6+
MSFT,2014-11-06T00:00:00-05:00,47.22396
7+
MSFT,2014-11-07T00:00:00-05:00,48.26987
8+
MSFT,2014-11-10T00:00:00-05:00,48.04292
9+
MSFT,2014-11-11T00:00:00-05:00,48.2008
10+
MSFT,2014-11-12T00:00:00-05:00,47.91465
11+
MSFT,2014-11-13T00:00:00-05:00,48.16133
12+
MSFT,2014-11-14T00:00:00-05:00,49.07897
13+
MSFT,2014-11-17T00:00:00-05:00,48.75336
14+
MSFT,2014-11-18T00:00:00-05:00,48.78283
15+
MSFT,2014-11-19T00:00:00-05:00,48.31615
16+
MSFT,2014-11-20T00:00:00-05:00,47.66082
17+
MSFT,2014-11-21T00:00:00-05:00,48.67361
18+
MSFT,2014-11-24T00:00:00-05:00,47.65089
19+
MSFT,2014-11-25T00:00:00-05:00,47.32322
20+
MSFT,2014-11-26T00:00:00-05:00,47.15442
21+
MSFT,2014-11-28T00:00:00-05:00,47.61117
22+
MSFT,2014-12-01T00:00:00-05:00,47.54167
23+
MSFT,2014-12-02T00:00:00-05:00,48.49488
24+
MSFT,2014-12-03T00:00:00-05:00,48.09771
25+
MSFT,2014-12-04T00:00:00-05:00,48.04806
26+
MSFT,2014-12-05T00:00:00-05:00,48.47502
27+
MSFT,2014-12-08T00:00:00-05:00,47.97855
28+
MSFT,2014-12-09T00:00:00-05:00,46.77711
29+
MSFT,2014-12-10T00:00:00-05:00,47.24379
30+
MSFT,2014-12-11T00:00:00-05:00,46.74732
31+
MSFT,2014-12-12T00:00:00-05:00,46.35014
32+
MSFT,2014-12-15T00:00:00-05:00,46.86647
33+
MSFT,2014-12-16T00:00:00-05:00,45.57566
34+
MSFT,2014-12-17T00:00:00-05:00,44.73166
35+
MSFT,2014-12-18T00:00:00-05:00,46.25085
36+
MSFT,2014-12-19T00:00:00-05:00,47.27357
37+
MSFT,2014-12-22T00:00:00-05:00,47.44237
38+
MSFT,2014-12-23T00:00:00-05:00,48.0282
39+
MSFT,2014-12-24T00:00:00-05:00,48.2963
40+
MSFT,2014-12-26T00:00:00-05:00,48.06792
41+
MSFT,2014-12-29T00:00:00-05:00,47.36294
42+
MSFT,2014-12-30T00:00:00-05:00,47.10477
43+
MSFT,2014-12-31T00:00:00-05:00,46.39979
44+
MSFT,2015-01-02T00:00:00-05:00,46.33028
45+
MSFT,2015-01-05T00:00:00-05:00,46.04234
46+
MSFT,2015-01-06T00:00:00-05:00,46.05227
47+
MSFT,2015-01-07T00:00:00-05:00,45.65509
48+
MSFT,2015-01-08T00:00:00-05:00,46.41965
49+
MSFT,2015-01-09T00:00:00-05:00,47.27357
50+
MSFT,2015-01-12T00:00:00-05:00,47.08492
51+
MSFT,2015-01-13T00:00:00-05:00,46.6381
52+
MSFT,2015-01-14T00:00:00-05:00,45.63523
53+
MSFT,2015-01-15T00:00:00-05:00,45.8934
54+
MSFT,2015-01-16T00:00:00-05:00,44.98983
55+
MSFT,2015-01-20T00:00:00-05:00,45.97283
56+
MSFT,2015-01-21T00:00:00-05:00,45.61537
57+
MSFT,2015-01-22T00:00:00-05:00,46.16149
58+
MSFT,2015-01-23T00:00:00-05:00,47.02534
59+
MSFT,2015-01-26T00:00:00-05:00,46.66788
60+
MSFT,2015-01-27T00:00:00-05:00,42.6465
61+
MSFT,2015-01-28T00:00:00-05:00,42.43799
62+
MSFT,2015-01-29T00:00:00-05:00,40.64078
63+
MSFT,2015-01-30T00:00:00-05:00,41.25639
64+
MSFT,2015-02-02T00:00:00-05:00,40.30318
65+
MSFT,2015-02-03T00:00:00-05:00,41.33583
66+
MSFT,2015-02-04T00:00:00-05:00,41.64364
67+
MSFT,2015-02-05T00:00:00-05:00,41.92166
68+
MSFT,2015-02-06T00:00:00-05:00,42.37841
69+
MSFT,2015-02-09T00:00:00-05:00,41.94152
70+
MSFT,2015-02-10T00:00:00-05:00,42.43799
71+
MSFT,2015-02-11T00:00:00-05:00,42.34863
72+
MSFT,2015-02-12T00:00:00-05:00,42.38834
73+
MSFT,2015-02-13T00:00:00-05:00,43.07346
74+
MSFT,2015-02-17T00:00:00-05:00,43.97
75+
MSFT,2015-02-18T00:00:00-05:00,43.63
76+
MSFT,2015-02-19T00:00:00-05:00,43.27
77+
MSFT,2015-02-20T00:00:00-05:00,43.5
78+
MSFT,2015-02-23T00:00:00-05:00,43.7
79+
MSFT,2015-02-24T00:00:00-05:00,44.15
80+
MSFT,2015-02-25T00:00:00-05:00,43.95
81+
MSFT,2015-02-26T00:00:00-05:00,43.99
82+
MSFT,2015-02-27T00:00:00-05:00,44.14
83+
MSFT,2015-03-02T00:00:00-05:00,43.67
84+
MSFT,2015-03-03T00:00:00-05:00,43.56
85+
MSFT,2015-03-04T00:00:00-05:00,43.01
86+
MSFT,2015-03-05T00:00:00-05:00,43.07
87+
MSFT,2015-03-06T00:00:00-05:00,43
88+
MSFT,2015-03-09T00:00:00-04:00,42.19
89+
MSFT,2015-03-10T00:00:00-04:00,42.35
90+
MSFT,2015-03-11T00:00:00-04:00,42.32
91+
MSFT,2015-03-12T00:00:00-04:00,41.33
92+
MSFT,2015-03-13T00:00:00-04:00,40.7
93+
MSFT,2015-03-16T00:00:00-04:00,41.47
94+
MSFT,2015-03-17T00:00:00-04:00,41.37
95+
MSFT,2015-03-18T00:00:00-04:00,41.43
96+
MSFT,2015-03-19T00:00:00-04:00,42.25

examples/python/stock_filter.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
import os
21+
dir_path = os.path.dirname(os.path.realpath(__file__))
22+
print dir_path
23+
input_data=dir_path+"/"+"/resources/stock_data.csv"
24+
data = []
25+
with open( input_data, "r") as outfile:
26+
outfile.readline()
27+
for line in outfile:
28+
data.append(line)
29+
30+
from pyapex import createApp
31+
32+
def filter_func(a):
33+
input_data=a.split(",")
34+
if float(input_data[2])> 45:
35+
return True
36+
return False
37+
38+
39+
from pyapex import createApp
40+
a=createApp('python_app').from_data(data) \
41+
.filter('filter_operator',filter_func) \
42+
.to_console(name='endConsole') \
43+
.launch(False)
44+

examples/python/word_count.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
import os
21+
dir_path = os.path.dirname(os.path.realpath(__file__))
22+
print dir_path
23+
input_data=dir_path+"/"+"/resources/hadoop_word_count.txt"
24+
data = []
25+
with open( input_data, "r") as outfile:
26+
outfile.readline()
27+
for line in outfile:
28+
for d in line.split(' '):
29+
if len(d):
30+
data.append(d)
31+
32+
33+
from pyapex import createApp
34+
from pyapex.functions.window import TriggerType,Trigger,TriggerOption
35+
36+
t=TriggerOption.at_watermark()
37+
t.firingOnlyUpdatedPanes()
38+
t.accumulatingFiredPanes()
39+
t.withEarlyFiringsAtEvery(count=4)
40+
41+
42+
from pyapex import createApp
43+
a=createApp('reduce_app2').from_data(data) \
44+
.window(window='TIME', duration=110, trigger=t,allowed_lateness=100) \
45+
.countByKey("countByKey") \
46+
.to_console(name='endConsole') \
47+
.launch(False)

library/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
<version>3.8.0-SNAPSHOT</version>
3030
</parent>
3131

32+
33+
3234
<artifactId>malhar-library</artifactId>
3335
<packaging>jar</packaging>
3436

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package com.datatorrent.lib.io.fs;
20+
21+
import java.util.List;
22+
23+
import com.datatorrent.api.Context;
24+
import com.datatorrent.api.DefaultOutputPort;
25+
import com.datatorrent.api.InputOperator;
26+
27+
public class InMemoryDataInputOperator<T> implements InputOperator
28+
{
29+
private List<T> inputData = null;
30+
private boolean emissionCompleted = false;
31+
public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
32+
33+
public InMemoryDataInputOperator()
34+
{
35+
inputData = null;
36+
}
37+
38+
public InMemoryDataInputOperator(List<T> data)
39+
{
40+
inputData = data;
41+
}
42+
43+
@Override
44+
public void emitTuples()
45+
{
46+
if (emissionCompleted) {
47+
return;
48+
}
49+
for (T data : inputData) {
50+
outputPort.emit(data);
51+
}
52+
emissionCompleted = true;
53+
}
54+
55+
@Override
56+
public void beginWindow(long l)
57+
{
58+
}
59+
60+
@Override
61+
public void endWindow()
62+
{
63+
}
64+
65+
@Override
66+
public void setup(Context.OperatorContext context)
67+
{
68+
}
69+
70+
@Override
71+
public void teardown()
72+
{
73+
}
74+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.apex.malhar.lib.window.accumulation;
20+
21+
import org.apache.apex.malhar.lib.window.Accumulation;
22+
23+
public interface Reduce<T> extends Accumulation<T,T,T>
24+
{
25+
public T reduce(T input1, T input2);
26+
}

library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818
*/
1919
package org.apache.apex.malhar.lib.window.accumulation;
2020

21-
import org.apache.apex.malhar.lib.window.Accumulation;
2221

2322
/**
2423
* An easy to use reduce Accumulation
2524
* @param <INPUT>
2625
*
2726
* @since 3.5.0
2827
*/
29-
public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
28+
public abstract class ReduceFn<INPUT> implements Reduce<INPUT>
3029
{
3130
@Override
3231
public INPUT defaultAccumulatedValue()

0 commit comments

Comments
 (0)