-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_emails_no_linkedin.py
More file actions
157 lines (129 loc) · 4.99 KB
/
send_emails_no_linkedin.py
File metadata and controls
157 lines (129 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
"""
Send AI-customized emails without LinkedIn scraping.
"""
import asyncio
import os
import aiohttp
from datetime import datetime, timedelta
import pytz
from dotenv import load_dotenv
from ai_email_generator import AIEmailGenerator
load_dotenv()
class CCAPIClient:
def __init__(self):
self.api_key = os.getenv("CCAI_API_KEY")
self.client_id = os.getenv("CCAI_CLIENT_ID")
self.email_url = os.getenv("CCAI_EMAIL_URL")
if not self.api_key or not self.client_id:
raise ValueError("CCAI_API_KEY and CCAI_CLIENT_ID must be set in .env file")
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Accept": "*/*",
"clientId": self.client_id,
"accountId": "1223",
}
async def send_test_email(self, first_name, last_name, to_email, subject, message):
try:
pst = pytz.timezone("America/Los_Angeles")
now = datetime.now(pst)
scheduled_time = now + timedelta(minutes=2)
campaign = {
"subject": subject,
"title": f"Test AI Email - {first_name}",
"message": message,
"senderEmail": os.getenv("SENDER_EMAIL", "andreas@allcode.com"),
"replyEmail": os.getenv("SENDER_EMAIL", "andreas@allcode.com"),
"senderName": os.getenv("SENDER_NAME", "Andreas Garcia"),
"scheduledTimestamp": scheduled_time.isoformat(),
"scheduledTimezone": "America/Los_Angeles",
"accounts": [
{
"firstName": first_name,
"lastName": last_name,
"email": to_email,
"phone": "",
}
],
"campaignType": "EMAIL",
"addToList": "noList",
"contactInput": "accounts",
"fromType": "single",
"senders": [],
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.email_url}/api/v1/campaigns",
headers=self.headers,
json=campaign,
) as response:
result = await response.json()
return {
"success": response.status in [200, 201],
"status_code": response.status,
"response": result,
}
except Exception as e:
return {"success": False, "error": str(e)}
async def send_emails_no_linkedin():
"""Send AI emails without LinkedIn scraping."""
test_contacts = [
{
"name": "Andreas Garcia",
"email": "andreas@allcode.com",
"company": "AllCode",
"title": "Software Developer",
},
{
"name": "Joel Garcia",
"email": "joel@allcode.com",
"company": "AllCode",
"title": "CEO & Founder",
},
]
print("🤖 Sending AI emails without LinkedIn scraping...")
print("=" * 50)
ai_generator = AIEmailGenerator()
ccai_client = CCAPIClient()
for contact in test_contacts:
print(f"\n📤 Processing {contact['name']} ({contact['email']})")
try:
# Create basic profile info without LinkedIn
profile_info = f"""Name: {contact["name"]}
Role: {contact["title"]} at {contact["company"]}"""
# Generate AI email
first_name = contact["name"].split()[0]
ai_email = await ai_generator.generate_ai_email(profile_info, first_name)
# Parse subject and content
lines = ai_email.split("\n")
subject = lines[0].replace("Subject: ", "")
body_start = 1
while body_start < len(lines) and not lines[body_start].strip():
body_start += 1
content = "\n".join(lines[body_start:])
# Parse name
name_parts = contact["name"].split()
first_name = name_parts[0]
last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""
# Send email
result = await ccai_client.send_test_email(
first_name=first_name,
last_name=last_name,
to_email=contact["email"],
subject=subject,
message=content,
)
if result["success"]:
print("✅ AI email sent successfully")
print(f"📧 Subject: {subject}")
else:
print(f"❌ Email failed: {result}")
except Exception as e:
print(f"❌ Error: {e}")
await asyncio.sleep(2)
print("\n🎉 Email campaign completed!")
if __name__ == "__main__":
print("🤖 AI Email System - No LinkedIn Required")
print("=" * 40)
asyncio.run(send_emails_no_linkedin())