-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path: sqlconsumer.py
More file actions
198 lines (170 loc) · 5.72 KB
/
sqlconsumer.py
File metadata and controls
198 lines (170 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#sqlconsumer.py
"""
Consumes basic packets and stores them in a SQL database
Currently SQLite3 only
"""
from aprsconsumer import Consumer
from aprspacket import AprsFrame
import datetime
from sqlite3 import dbapi2 as dba
import logging,sys
# Module-wide logger; the bare-name aliases below keep call sites terse
# (the rest of this file calls debug(...)/info(...) directly).
logger = logging.getLogger('MyLogger')
debug=logger.debug
info=logger.info
exception=logger.exception
##TODO: This is sqlite3 specific
def adapt_datetime(ts):
return time.mktime(ts.timetuple()) + float(str(ts.microsecond)[:3])/1000
dba.register_adapter(datetime.datetime, adapt_datetime)
class Main(Consumer):
    """
    Consumer that persists APRS packet reports to a SQL database
    (currently SQLite3 only, via the sqlite3 dbapi2 module).
    """
    def __init__(self,parameters,name,connString=':memory:'):
        Consumer.__init__(self,parameters,name)
        # BUG FIX: the original silently discarded connString; keep it as a
        # fallback connection string for __connect().
        self.connString = connString
    def _runFirst(self):
        """Open the DB connection and optionally rebuild the schema."""
        debug('Connecting to DB')
        self.__connect()
        # BUG FIX: ini-file values typically arrive as strings, and the
        # original `== 1` comparison is False for '1'; comparing via str()
        # accepts both an integer 1 and the string '1'.
        if str(self.parameters.build_db) == '1':
            self._buildDB()
    def consume(self,packet):
        """
        Store one packet report, skipping or replacing duplicates.

        A duplicate is a row with the same sourceAddress and information;
        locally-heard reports take precedence over gated ones.
        """
        debug('Storing Packet')
        cur=self.dbConn.cursor()
        cur.execute("""select * from Reports
                    where sourceAddress=? and information=?
                    """,(str(packet.source),packet.information)
                    )
        storedReport=cur.fetchone()
        ##TODO: add time/dupe filter
        if storedReport:
            # BUG FIX: default sqlite3 rows are plain tuples, so the original
            # `storedReport.heardLocal` raised AttributeError; __connect now
            # installs dba.Row so columns are addressable by name.
            if storedReport['heardLocal']:
                debug('dupe report previously heard locally')
                return
            if packet.heardLocal:
                debug('Replace dupe with local')
                self.__updateReport(packet)
        else:
            debug('insert new packet report')
            self.__insertReport(packet)
    def __updateReport(self,packet):
        ##TODO: update query
        pass
    def __insertReport(self,packet):
        """Insert a new Reports row from the packet's fields."""
        sql="""insert into Reports (
                receivedTime
                ,aprsString
                ,sourcePort
                ,heardLocal
                ,hasLocation
                ,sourceAddress
                ,destinationAddress
                ,digipeaters
                ,information
                ,symbolTable
                ,symbolCharacter
                ,symbolOverlay
                ,latitude
                ,longitude
                ,elevation
                )
                values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """
        vals=( str(packet.receivedTime)
              ,str(packet)
              ,packet.sourcePort
              ,packet.heardLocal
              ,packet.payload.hasLocation
              ,str(packet.source)
              ,str(packet.destination)
              ,str(packet.digipeaters)
              ,packet.information
              ,packet.payload.symbolTable
              ,packet.payload.symbolCharacter
              ,packet.payload.symbolOverlay
              ,packet.payload.latitude
              ,packet.payload.longitude
              ,packet.payload.elevation
              )
        self.dbConn.execute(sql,vals)
        self.dbConn.commit()
        debug('SQL report insert successfull')
    def __connect(self):
        ##TODO: error trap db connection
        # Prefer the configured connection string; fall back to the
        # constructor argument (which the original ignored entirely).
        connString = getattr(self.parameters, 'connection_string', self.connString)
        self.dbConn=dba.connect(connString)
        # Name-addressable rows; consume() relies on this for the dupe check.
        self.dbConn.row_factory = dba.Row
        #sql="""SELECT load_extension('bin/libspatialite-2.dll')"""
        #self.dbConn.execute(sql)
    def __disconnect(self):
        """Close the database connection."""
        self.dbConn.close()
    def _buildDB(self):
        """
        Creates the tables necessary for tracking APRS stations.
        Drops and recreates the Reports table and its indexes.
        """
        info('Rebuilding DB')
        #drop the table first
        self.dbConn.execute('drop table if exists Reports')
        sql="""create table Reports (
                receivedTime date not null
                ,aprsString text(200) not null
                ,sourcePort text(50)
                ,heardLocal bit
                ,hasLocation bit
                ,sourceAddress text(10) not null
                ,destinationAddress text(10) not null
                ,digipeaters text(100) not null
                ,information text(256) not null
                ,symbolTable text(1)
                ,symbolCharacter text(1)
                ,symbolOverlay text(1)
                ,latitude real
                ,longitude real
                ,elevation real
                ,constraint pk_Reports_reportId
                    primary key (sourceAddress,receivedTime,information)
                )
            """
        # BUG FIX: the original used a bare `print sql` debug leftover;
        # route it through the module logger instead.
        debug(sql)
        self.dbConn.execute(sql)
##        sql="""
##            alter table Reports
##            add constraint pk_Reports_reportId
##                primary key (fromCall,fromSSID,receivedTime)
##            """
##        self.dbConn.execute(sql)
        sql="""
            create index idx_Reports_sourceAddress
            on Reports (sourceAddress)
            """
        self.dbConn.execute(sql)
        sql="""
            create index idx_Reports_Positions
            on Reports (
                sourceAddress,latitude,longitude,elevation
                )
            """
        self.dbConn.execute(sql)
        self.dbConn.commit()
        self.dbConn.execute('vacuum')
        info('DB rebuild complete')
def test():
    """
    Smoke test: parse two real APRS frames and store them in an in-memory
    SQLite database using the 'sqlite_1' section of aprsmonitor.ini.
    """
    import ConfigParser
    from main import ConfigSection
    dbConn=':memory:'
    iniFile='aprsmonitor.ini'
    cfg=ConfigParser.ConfigParser()
    cfg.read(iniFile)
    params=ConfigSection(cfg.items('sqlite_1'))
    print(cfg.sections())
    # BUG FIX: dbConn was defined but never used; pass the variable
    # through instead of repeating the ':memory:' literal.
    consumer=Main(params,'sqlite_1',dbConn)
    consumer._runFirst()
    consumer._buildDB()
    reports=("""JF3UYN>APU25N,TCPIP*,qAC,JG6YCL-JA:=3449.90N/13513.30E-PHG2450 Kita-Rokko Kobe WiRES6084 {UIV32N}"""
             ,"""JM6ISF>APU25N,JM6ISF-3*,TRACE3-2,qAR,JA6YWR:=3129.57N/13042.43EIJ-net 144.66MHz 9600bps I-gate {UIV32N}"""
             )
    packets=[]
    for report in reports:
        bp=AprsFrame(report)
        packets.append(bp)
    for packet in packets:
        print('Storing: \n%s::%s' % (packet.source,packet.information))
        consumer.consume(packet)
    #consumer._buildDB()
if __name__=='__main__':
    test()