-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
117 lines (91 loc) · 3.06 KB
/
main.py
File metadata and controls
117 lines (91 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from model import ClientData
from rocket import send_verification_email
import secrets
import random
import jinja2
from pathlib import Path
from dotenv import load_dotenv
from fastapi.middleware.cors import CORSMiddleware
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig, MessageType
import os
load_dotenv()
# Create templates directory if it doesn't exist
TEMPLATES_DIR = Path("templates")
TEMPLATES_DIR.mkdir(exist_ok=True)
# Setup Jinja2 template environment
template_env = jinja2.Environment(
loader=jinja2.FileSystemLoader('templates')
)
app = FastAPI()
#Email Service depencey
def render_template(template_name: str, **kwargs) -> str:
template = template_env.get_template(template_name)
return template.render(**kwargs)
load_dotenv()
# Create templates directory if it doesn't exist
TEMPLATES_DIR = Path("templates")
TEMPLATES_DIR.mkdir(exist_ok=True)
# Setup Jinja2 template environment
template_env = jinja2.Environment(
loader=jinja2.FileSystemLoader('templates')
)
config = ConnectionConfig(
MAIL_USERNAME=os.getenv("MAIL_USERNAME"),
MAIL_PASSWORD=os.getenv("MAIL_PASSWORD"),
MAIL_FROM=os.getenv("MAIL_FROM"),
MAIL_PORT=int(os.getenv("MAIL_PORT")),
MAIL_SERVER=os.getenv("MAIL_SERVER"),
MAIL_STARTTLS=True,
MAIL_SSL_TLS=False,
USE_CREDENTIALS=True,
TEMPLATE_FOLDER=str(TEMPLATES_DIR)
)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def rocket_code():
# for i in range(2):
token =random.randint(1000, 9999)
return token
def rocket_url():
url_token = f"https://rocket-emailer.onrender.com/{secrets.token_hex(16)}"
return url_token
def render_template(template_name: str, **kwargs) -> str:
template = template_env.get_template(template_name)
return template.render(**kwargs)
async def contact_us(email: str, message: str, subject: str = "Rocket Account Activation", token: str = rocket_code(),url_token :str = rocket_url()): # type: ignore
html_content = render_template(
"rocket_feed.html",
message=message,
token = token,
url_token = url_token
)
message = MessageSchema(
subject=subject,
recipients=[email],
body=html_content,
subtype=MessageType.html,
attachments=[
{
"file": "assets/rocket.png",
"headers": {"Content-ID": "<company_logo>"},
"mime_type": "image/jpg",
"mime_subtype": "jpg",
}
]
)
fm = FastMail(config) # checks for the email send configurations and associated images / info
await fm.send_message(message)
@app.post("/root")
async def root(client_data: ClientData):
try:
await contact_us(client_data.client_email, client_data.client_message)
return {"message": "Email has been sent"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))