-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdb_utils.py
More file actions
50 lines (42 loc) · 1.34 KB
/
db_utils.py
File metadata and controls
50 lines (42 loc) · 1.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from mongoengine import connect, Document, StringField, DictField
import os
from dotenv import load_dotenv
load_dotenv()
MONGO_USER = os.getenv("MONGO_USER")
MONGO_PASS = os.getenv("MONGO_PASS")
MONGO_HOST = os.getenv("MONGO_HOST", "localhost")
MONGO_PORT = int(os.getenv("MONGO_PORT", "27017"))
MONGO_DB = os.getenv("MONGO_DB", "bugbounty")
# Connect using mongoengine
connect(
db=MONGO_DB,
host=MONGO_HOST,
port=MONGO_PORT,
username=MONGO_USER,
password=MONGO_PASS,
)
class Program(Document):
platform = StringField(required=True)
handle = StringField(required=True)
name = StringField()
data = DictField()
meta = {
'collection': 'programs',
'indexes': [
{'fields': ('platform', 'handle'), 'unique': True}
]
}
# Functions using mongoengine ORM
def insert_or_update_program(platform, handle, name, data):
Program.objects(platform=platform, handle=handle).update_one(
set__name=name,
set__data=data,
upsert=True
)
def get_program(platform, handle):
prog = Program.objects(platform=platform, handle=handle).first()
return prog.data if prog else None
def get_all_handles(platform):
return [p.handle for p in Program.objects(platform=platform)]
def delete_program(platform, handle):
Program.objects(platform=platform, handle=handle).delete()