-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
227 lines (196 loc) · 9 KB
/
main.py
File metadata and controls
227 lines (196 loc) · 9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from transformers import AutoModelForCausalLM, AutoTokenizer
from openai import OpenAI
from transformers.utils import get_json_schema
from atproto import Client, models, AtUri
from time import sleep
import sqlite3
import os
from atproto_client.exceptions import InvokeTimeoutError
# Path of the SQLite file that records which notification CIDs were handled.
DB_PATH = "notifications.db"
def init_database():
    """Create the ``replied_notifications`` table if it does not exist yet.

    Opens the SQLite file at ``DB_PATH`` (creating it when missing) and makes
    sure the table used to remember handled notifications is present.  Calling
    this more than once is harmless because of ``IF NOT EXISTS``.

    Raises:
        sqlite3.Error: if the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS replied_notifications (
                cid TEXT PRIMARY KEY,
                uri TEXT,
                replied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Always release the file handle, even when the statement fails.
        conn.close()
def has_replied_to_cid(cid):
    """Return True when ``cid`` is already stored in ``replied_notifications``.

    Args:
        cid: Content identifier of the notification to look up.

    Returns:
        bool: True if a row with this ``cid`` exists, False otherwise.

    Raises:
        sqlite3.Error: if the lookup fails (for example, the table is missing).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            'SELECT cid FROM replied_notifications WHERE cid = ?',
            (cid,),
        ).fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return row is not None
def save_replied_cid(cid, uri):
    """Record that the notification ``cid`` has been handled.

    The insert uses ``INSERT OR IGNORE``, so saving the same ``cid`` twice
    keeps the first row.  This is best-effort: database errors are printed
    and swallowed rather than raised.

    Args:
        cid: Content identifier of the notification (primary key).
        uri: URI of the post the notification refers to.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.execute(
                'INSERT OR IGNORE INTO replied_notifications (cid, uri) VALUES (?, ?)',
                (cid, uri),
            )
            conn.commit()
        finally:
            # Close the connection on the error path too, so a failing
            # insert does not leak a handle on every poll.
            conn.close()
        print(f"Saved CID to database: {cid}")
    except sqlite3.Error as e:
        print(f"Database error: {e}")
# Initialize database on startup
init_database()
# AT Protocol client used for every app.bsky call made in this file.
client = Client()
# Placeholder credentials: replace with the bot account's handle and app password.
handle_name = 'handle-here'
app_password = 'app_password-here'
# OpenAI-compatible client pointed at a local endpoint; you will most likely
# want to change base_url (and api_key) to match your own setup.
open_client = OpenAI(base_url="http://127.0.0.1:1235/v1", api_key="nothing")
# Log in as soon as the module is loaded; main() logs in again when it starts.
client.login(handle_name, app_password)
def check_replies(thread_post, handle_name):
    """Report whether ``handle_name`` has NOT yet replied in this thread.

    Looks only at the direct replies of ``thread_post.thread``.

    Args:
        thread_post: Thread response whose ``thread.replies`` (if any) are
            inspected.
        handle_name: Handle of the account whose reply is searched for.

    Returns:
        bool: False if one of the direct replies was written by
        ``handle_name``, True otherwise (including when there are no replies).
    """
    replies = getattr(thread_post.thread, 'replies', None) or []
    for reply in replies:
        # Some reply entries carry no ``post`` (e.g. blocked or missing
        # posts); those cannot be ours, so skip them instead of crashing.
        author = getattr(getattr(reply, 'post', None), 'author', None)
        if author is not None and author.handle == handle_name:
            print(f"Found our reply in thread: {author.handle}")
            return False
    return True
def name_of_user(thread_post):
    """Return the display name of the author of ``thread_post.thread.post``."""
    author = thread_post.thread.post.author
    return author.display_name
def text_of_parent_post(thread_post):
    """Return the text of the post directly above ``thread_post``, as a ``str``."""
    parent_record = thread_post.thread.parent.post.record
    return str(parent_record.text)
def get_chat_start(text, text2, text3):
    """Build the two-message chat used to answer a mention that has context.

    Args:
        text: The user's post; every ``'@' + handle_name`` occurrence is
            removed before it is used as the user message.
        text2: Accepted but not used by this function.
        text3: Text of the post above the user's post; quoted inside the
            system prompt.

    Returns:
        list[dict]: A system message followed by a user message.
    """
    system_prompt = (
        "You are a helpful assistant (you better not say user thinks). "
        "make everything you write be very short, one paragraph or less, "
        "don't mainsplain. "
        f"the post above the user is asking about: '{text3}'."
    )
    mention = '@' + handle_name
    user_prompt = str(text).replace(mention, '')
    system_message = {"role": "system", "content": system_prompt}
    user_message = {"role": "user", "content": user_prompt}
    return [system_message, user_message]
def get_chat_reply(text, text2, text3):
    """Build a three-message chat: system prompt, prior assistant turn, user turn.

    Args:
        text: The user's post; every ``'@' + handle_name`` occurrence is
            removed before it is used as the user message.
        text2: Accepted but not used by this function.
        text3: Content of the assistant message placed before the user turn
            (converted with ``str``).

    Returns:
        list[dict]: System, assistant and user messages, in that order.
    """
    system_prompt = (
        "You are a helpful assistant (you better not say user thinks). "
        "make everything you write be very short, one paragraph or less, "
        "don't mainsplain."
    )
    mention = '@' + handle_name
    messages = []
    messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "assistant", "content": str(text3)})
    messages.append({"role": "user", "content": str(text).replace(mention, '')})
    return messages
def get_chat_start_without_context(text):
    """Build the two-message chat for a post that has no surrounding context.

    Args:
        text: The user's post; every ``'@' + handle_name`` occurrence is
            removed before it is used as the user message.

    Returns:
        list[dict]: A system message followed by a user message.
    """
    system_prompt = (
        "You are a helpful assistant (you better not say user thinks). "
        "make everything you write be very short, one paragraph or less. "
        "don't mainsplain"
    )
    mention = '@' + handle_name
    user_prompt = str(text).replace(mention, '')
    return [
        dict(role="system", content=system_prompt),
        dict(role="user", content=user_prompt),
    ]
def get_chat_thread_history(og_uri, current_prompt):
    """Build a chat transcript by walking up the reply chain from ``og_uri``.

    Starting at ``og_uri`` the function repeatedly fetches the post thread,
    takes the text of the post above it and steps to that parent, until it
    reaches a post that is not a reply.  Collected texts are kept in
    chronological order between the system prompt and ``current_prompt``.

    Roles alternate by step: the 1st, 3rd, ... collected posts become
    ``"assistant"`` turns and the 2nd, 4th, ... become ``"user"`` turns.
    This assumes ``og_uri`` is a reply to one of this bot's posts and that
    the two parties strictly alternate -- TODO confirm for multi-user threads.

    Args:
        og_uri: URI of the post that triggered the reply.
        current_prompt: Text used for the final user message.

    Returns:
        list[dict]: The system message, the collected history, then the
        user message holding ``current_prompt``.
    """
    chat = [{
        "role": "system",
        "content": "You are a helpful assistant (you better not say user thinks). make everything you write be very short, one paragraph or less. don't mainsplain",
    }]
    uri = og_uri
    count = 1
    while True:
        # Earlier bot posts are "assistant" turns, matching get_chat_reply;
        # a second "system" message would be read as instructions.
        role = "user" if count % 2 == 0 else "assistant"
        thread_post = client.get_post_thread(uri)
        print(chat)
        try:
            uri = thread_post.thread.post.record.reply.parent.uri
        except AttributeError:
            # No reply reference (thread root) or the post is unavailable.
            break
        print(uri)
        try:
            text = thread_post.thread.parent.post.record.text
        except AttributeError:
            # NOTE(review): when the parent cannot be read, the text of the
            # first reply is used instead -- confirm this is intended.
            try:
                text = thread_post.thread.replies[0].post.record.text
            except (AttributeError, IndexError, TypeError):
                # Nothing readable here either; stop with what we have.
                break
        # Insert right after the system prompt so older posts come first.
        chat.insert(1, {"role": role, "content": text})
        count += 1
    # NOTE(review): the oldest collected message is discarded; confirm this
    # is desired.  The guard keeps the system prompt when nothing was
    # collected.
    if len(chat) > 1:
        chat.pop(1)
    chat.append({"role": "user", "content": current_prompt})
    return chat
def _build_chat(notification, thread_post):
    """Pick the chat builder that fits the notification and return its messages."""
    prompt = notification.record.text
    if notification.reason == 'reply':
        return get_chat_thread_history(notification.uri, prompt)
    try:
        parent_text = text_of_parent_post(thread_post)
    except AttributeError:
        # The mention is in a top-level post: there is no post above it.
        return get_chat_start_without_context(prompt)
    return get_chat_start(prompt, name_of_user(thread_post), parent_text)


def _reply_refs(notification):
    """Return the ReplyRef (parent and root) for answering ``notification``."""
    parent_ref = models.create_strong_ref(
        client.get_post(
            post_rkey=AtUri.from_str(notification.uri).rkey,
            cid=notification.cid,
            profile_identify=notification.author.did,
        )
    )
    # If the post is itself a reply, reuse the root it already points at so
    # the answer stays attached to the thread root however deep we are.
    # Otherwise the post is the root.
    reply = getattr(notification.record, 'reply', None)
    root_ref = reply.root if reply is not None else parent_ref
    return models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref)


def main():
    """Poll notifications forever and answer each new mention or reply.

    For every ``mention``/``reply`` notification whose CID is not yet in the
    database, and whose thread has no direct reply from ``handle_name``, a
    response is generated through ``open_client`` and posted as a reply
    (truncated to 300 characters).  The CID is stored afterwards either way.
    After each pass notifications are marked as seen and the loop sleeps for
    5 seconds.  On ``InvokeTimeoutError`` the client waits, logs in again and
    retries.  This function never returns.
    """
    client.login(handle_name, app_password)
    while True:
        try:
            last_seen_at = client.get_current_time_iso()
            responses = client.app.bsky.notification.list_notifications()
            for notification in responses.notifications:
                if notification.reason not in ('mention', 'reply'):
                    continue
                if has_replied_to_cid(notification.cid):
                    continue
                thread_post = client.get_post_thread(notification.uri)
                # Second line of defence: look for our reply in the thread.
                if check_replies(thread_post, handle_name):
                    response = open_client.responses.create(
                        model="granite-4.0-h-tiny",
                        input=_build_chat(notification, thread_post),
                        store=False,
                    )
                    client.send_post(
                        text=response.output_text[0:300],
                        reply_to=_reply_refs(notification),
                    )
                # Remember the CID whether we just replied or found an
                # earlier reply of ours in the thread.
                save_replied_cid(notification.cid, notification.uri)
            client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            print('Processed notifications')
            sleep(5)
        except InvokeTimeoutError:
            # Pause before reconnecting so repeated timeouts do not turn
            # into a tight login loop.
            sleep(5)
            client.login(handle_name, app_password)
if __name__ == "__main__":
    # main() loops forever, so nothing placed after this call would ever run.
    main()