forked from CapivaraProjects/repository
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDiseaseRepository.py
More file actions
112 lines (104 loc) · 4.04 KB
/
DiseaseRepository.py
File metadata and controls
112 lines (104 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from database.Disease import Disease as DiseaseDB
from models.Disease import Disease
from models.Plant import Plant
from repository.base import Base
from sqlalchemy import and_
class DiseaseRepository(Base):
    """
    Disease repository, dedicated to all persistence operations related to
    diseases (create, update, delete, search and lookup by id).

    Every public method opens its own session through
    ``self.session_factory()`` and closes it before returning, whether the
    operation succeeded or raised.
    """

    def __init__(self,
                 dbuser="",
                 dbpass="",
                 dbhost="",
                 port="",
                 dbname=""):
        """
        (str, str, str, str, str) -> None

        Forward the connection settings unchanged to the Base repository.
        """
        super().__init__(dbuser, dbpass, dbhost, port, dbname)

    @staticmethod
    def _to_model(diseaseDB):
        """
        (DiseaseDB) -> (Disease)

        Build a Disease model (with its nested Plant) from a database row.
        Must be called while the row is still attached to an open session,
        because it reads the ``plant`` relationship.
        """
        plant = Plant(diseaseDB.plant.id,
                      diseaseDB.plant.scientificName,
                      diseaseDB.plant.commonName)
        return Disease(diseaseDB.id,
                       plant,
                       diseaseDB.scientificName,
                       diseaseDB.commonName)

    def create(self, disease=None):
        """
        (Disease) -> (Disease)

        Persist ``disease`` and return a Disease model built from the
        stored row. When ``disease`` is omitted, a fresh ``Disease()`` is
        used.
        """
        # A None sentinel avoids sharing one Disease instance, created at
        # definition time, between every call that omits the argument.
        if disease is None:
            disease = Disease()
        session = self.session_factory()
        try:
            diseaseDB = DiseaseDB(disease=disease)
            session.add(diseaseDB)
            session.flush()
            session.refresh(diseaseDB)
            session.commit()
            # Convert before the session is closed: the row must still be
            # attached for its attributes to be readable.
            return self._to_model(diseaseDB)
        finally:
            session.close()

    def update(self, disease=None):
        """
        (Disease) -> (Disease)

        Update the row whose id equals ``disease.id`` with the fields of
        ``disease`` that differ from the stored values (plant id,
        scientific name, common name) and return the resulting Disease.

        No existence check is made: if no row matches ``disease.id`` the
        attribute access on the missing row raises AttributeError.
        """
        if disease is None:
            disease = Disease()
        session = self.session_factory()
        try:
            diseaseDB = session.query(DiseaseDB).filter_by(
                id=disease.id).first()
            # Collect only the columns whose value actually changed.
            changes = {}
            if diseaseDB.plant.id != disease.plant.id:
                changes['idPlant'] = disease.plant.id
            if diseaseDB.scientificName != disease.scientificName:
                changes['scientificName'] = disease.scientificName
            if diseaseDB.commonName != disease.commonName:
                changes['commonName'] = disease.commonName
            if changes:
                session.query(DiseaseDB).filter_by(
                    id=disease.id).update(changes)
                session.commit()
                # Reload so the returned model reflects the stored values.
                session.refresh(diseaseDB)
            return self._to_model(diseaseDB)
        finally:
            session.close()

    def delete(self, disease=None):
        """
        (Disease) -> (Boolean)

        Delete the row whose id equals ``disease.id``. Return True when no
        row with that id remains after the commit, False otherwise.
        """
        if disease is None:
            disease = Disease()
        session = self.session_factory()
        try:
            diseaseDB = session.query(DiseaseDB).filter_by(
                id=disease.id).first()
            session.delete(diseaseDB)
            session.commit()
            # Verify with the id supplied by the caller (the same value the
            # row was looked up by) rather than reading an attribute of the
            # just-deleted instance.
            remaining = session.query(DiseaseDB).filter_by(
                id=disease.id).count()
            return not remaining
        finally:
            session.close()

    def search(self, disease=None, pageSize=10, offset=0):
        """
        (Disease, pageSize, offset) -> {'total': int, 'content':[Disease]}

        Return one page of diseases whose scientific name AND common name
        both contain the corresponding values of ``disease`` (SQL LIKE
        with surrounding wildcards).

        ``offset`` is the number of matching rows to skip and ``pageSize``
        the maximum number of rows returned. ``total`` is the number of
        matching rows regardless of paging.
        """
        if disease is None:
            disease = Disease()
        session = self.session_factory()
        try:
            query = session.query(DiseaseDB).filter(and_(
                DiseaseDB.scientificName.like(
                    '%' + disease.scientificName + '%'),
                DiseaseDB.commonName.like(
                    '%' + disease.commonName + '%')))
            # slice() takes (start, stop), so the stop index must be
            # offset + pageSize to return a full page when offset > 0.
            content = query.slice(offset, offset + pageSize).all()
            total = query.count()
            diseases = [self._to_model(diseaseDB) for diseaseDB in content]
            return {'total': total, 'content': diseases}
        finally:
            session.close()

    def searchByID(self, id):
        """
        (Int) -> (Disease)

        Return the Disease stored under primary key ``id``.

        No existence check is made: an unknown id surfaces as
        AttributeError when the missing row is converted.
        """
        session = self.session_factory()
        try:
            diseaseDB = session.query(DiseaseDB).get(id)
            return self._to_model(diseaseDB)
        finally:
            session.close()