Skip to content

Commit 9a4486e

Browse files
hitalinclaude
andcommitted
feat: ポーリングモードのノートキャプチャ対応
ポーリングモード時にsub_note/unsub_noteで指定されたノートの リアクション変更を定期的にバッチ取得して検出する。 - captured_notes: アカウントごとのキャプチャ中ノートIDセット - polling_loop: 2サイクルに1回、notes/showで個別取得(3並列) - リアクション差分検出: キャッシュと比較してreacted/unreactedイベントをemit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 553b77b commit 9a4486e

1 file changed

Lines changed: 117 additions & 3 deletions

File tree

src/streaming.rs

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, HashSet};
22
use std::sync::Arc;
33
use std::time::Duration;
44

@@ -192,6 +192,8 @@ pub struct StreamingManager {
192192
connections: Arc<Mutex<HashMap<String, ConnectionHandle>>>,
193193
poll_connections: Arc<Mutex<HashMap<String, PollingHandle>>>,
194194
subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
195+
/// Note IDs being captured per account (for polling mode note updates).
196+
captured_notes: Arc<RwLock<HashMap<String, HashSet<String>>>>,
195197
emitter: Arc<dyn FrontendEmitter>,
196198
event_bus: Arc<EventBus>,
197199
db: Arc<Database>,
@@ -208,6 +210,7 @@ impl StreamingManager {
208210
connections: Arc::new(Mutex::new(HashMap::new())),
209211
poll_connections: Arc::new(Mutex::new(HashMap::new())),
210212
subscriptions: Arc::new(RwLock::new(HashMap::new())),
213+
captured_notes: Arc::new(RwLock::new(HashMap::new())),
211214
emitter,
212215
event_bus,
213216
db,
@@ -381,6 +384,7 @@ impl StreamingManager {
381384
let event_bus = self.event_bus.clone();
382385
let db = self.db.clone();
383386
let api_client = self.api_client.clone();
387+
let captured_notes = self.captured_notes.clone();
384388

385389
let task = tokio::spawn(async move {
386390
polling_loop(
@@ -393,6 +397,7 @@ impl StreamingManager {
393397
token_owned,
394398
interval,
395399
subscriptions,
400+
captured_notes,
396401
cancel_rx,
397402
)
398403
.await;
@@ -586,26 +591,41 @@ impl StreamingManager {
586591
}
587592

588593
pub async fn sub_note(&self, account_id: &str, note_id: &str) -> Result<(), NoteDeckError> {
589-
// In polling mode, note capture is handled by polling loop (PR 3)
590-
// For now, only send to WebSocket if connected
594+
// WebSocket mode: send subNote command
591595
let conns = self.connections.lock().await;
592596
if let Some(handle) = conns.get(account_id) {
593597
return handle
594598
.cmd_tx
595599
.send(WsCommand::SubNote { id: note_id.to_string() })
596600
.map_err(|_| NoteDeckError::ConnectionClosed);
597601
}
602+
drop(conns);
603+
604+
// Polling mode: add to captured_notes set for batch polling
605+
let mut captured = self.captured_notes.write().await;
606+
captured
607+
.entry(account_id.to_string())
608+
.or_default()
609+
.insert(note_id.to_string());
598610
Ok(())
599611
}
600612

601613
pub async fn unsub_note(&self, account_id: &str, note_id: &str) -> Result<(), NoteDeckError> {
614+
// WebSocket mode
602615
let conns = self.connections.lock().await;
603616
if let Some(handle) = conns.get(account_id) {
604617
return handle
605618
.cmd_tx
606619
.send(WsCommand::UnsubNote { id: note_id.to_string() })
607620
.map_err(|_| NoteDeckError::ConnectionClosed);
608621
}
622+
drop(conns);
623+
624+
// Polling mode: remove from captured_notes
625+
let mut captured = self.captured_notes.write().await;
626+
if let Some(set) = captured.get_mut(account_id) {
627+
set.remove(note_id);
628+
}
609629
Ok(())
610630
}
611631

@@ -1112,10 +1132,14 @@ async fn polling_loop(
11121132
token: String,
11131133
interval: Duration,
11141134
subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
1135+
captured_notes: Arc<RwLock<HashMap<String, HashSet<String>>>>,
11151136
mut cancel_rx: tokio::sync::watch::Receiver<bool>,
11161137
) {
11171138
let mut sub_states: HashMap<String, PollSubState> = HashMap::new();
1139+
// Cached reaction counts for captured notes (for diff detection).
1140+
let mut note_reaction_cache: HashMap<String, HashMap<String, i64>> = HashMap::new();
11181141
let mut consecutive_failures: u64 = 0;
1142+
let mut poll_count: u64 = 0;
11191143

11201144
loop {
11211145
// Check cancellation
@@ -1199,6 +1223,96 @@ async fn polling_loop(
11991223
}
12001224
}
12011225

1226+
// Note capture: poll every 2nd cycle (2x interval)
1227+
poll_count += 1;
1228+
if poll_count % 2 == 0 {
1229+
let note_ids: Vec<String> = {
1230+
let captured = captured_notes.read().await;
1231+
captured
1232+
.get(&account_id)
1233+
.map(|s| s.iter().cloned().collect())
1234+
.unwrap_or_default()
1235+
};
1236+
1237+
// Fetch in batches of 3 to avoid overwhelming the server
1238+
for chunk in note_ids.chunks(3) {
1239+
let futures: Vec<_> = chunk
1240+
.iter()
1241+
.map(|note_id| {
1242+
let api = api_client.clone();
1243+
let host = host.clone();
1244+
let token = token.clone();
1245+
let account_id = account_id.clone();
1246+
let note_id = note_id.clone();
1247+
async move {
1248+
let result = api
1249+
.get_note(&host, &token, &account_id, &note_id)
1250+
.await;
1251+
(note_id, result)
1252+
}
1253+
})
1254+
.collect();
1255+
1256+
let results = futures_util::future::join_all(futures).await;
1257+
1258+
for (note_id, result) in results {
1259+
if let Ok(note) = result {
1260+
// Diff reactions against cache
1261+
let new_reactions = note.reactions.clone();
1262+
1263+
let old_reactions = note_reaction_cache
1264+
.get(&note_id)
1265+
.cloned()
1266+
.unwrap_or_default();
1267+
1268+
// Find new/increased reactions
1269+
for (reaction, &new_count) in &new_reactions {
1270+
let old_count = old_reactions.get(reaction).copied().unwrap_or(0);
1271+
if new_count > old_count {
1272+
let payload = StreamNoteCaptureEvent {
1273+
account_id: account_id.clone(),
1274+
note_id: note_id.clone(),
1275+
update_type: "reacted".to_string(),
1276+
body: json!({
1277+
"reaction": reaction,
1278+
"emoji": null,
1279+
"userId": null,
1280+
}),
1281+
};
1282+
emit_or_log!(
1283+
emitter,
1284+
"stream-note-capture-updated",
1285+
payload
1286+
);
1287+
}
1288+
}
1289+
1290+
// Find removed reactions
1291+
for (reaction, _) in &old_reactions {
1292+
if !new_reactions.contains_key(reaction) {
1293+
let payload = StreamNoteCaptureEvent {
1294+
account_id: account_id.clone(),
1295+
note_id: note_id.clone(),
1296+
update_type: "unreacted".to_string(),
1297+
body: json!({
1298+
"reaction": reaction,
1299+
"userId": null,
1300+
}),
1301+
};
1302+
emit_or_log!(
1303+
emitter,
1304+
"stream-note-capture-updated",
1305+
payload
1306+
);
1307+
}
1308+
}
1309+
1310+
note_reaction_cache.insert(note_id, new_reactions);
1311+
}
1312+
}
1313+
}
1314+
}
1315+
12021316
// Determine sleep duration
12031317
let sleep_duration = if poll_failed {
12041318
consecutive_failures += 1;

0 commit comments

Comments
 (0)