forked from voidleko/TwitchGraphQLScripts
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfollowing.py
More file actions
94 lines (79 loc) · 2.08 KB
/
following.py
File metadata and controls
94 lines (79 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import requests
import json
import sys
from typing import Optional, List
from dataclasses import dataclass
from datetime import datetime
from data import FollowerData
# GraphQL endpoint that get_following() POSTs its queries to.
API_URL = "https://gql.twitch.tv/gql"
# Value sent in the "Client-ID" request header by get_following().
API_CLIENT_ID = "kd1unb4b3q4t58fwlpcbzcbnm76a8fp"
# GraphQL query sent as the 'query' field of the request body.
# Variables: $id / $login select the user, $first is the page size
# (defaults to 100 in the query itself), $after is the pagination cursor.
# Selected fields: the follows' totalCount, pageInfo.hasNextPage, and for each
# edge its cursor, followedAt and the node's login / createdAt.
# get_following() only supplies 'login' and 'after'.
FOLLOWING_REQUEST_BODY = """
query fetchUser($id: ID, $login: String, $first: Int = 100, $after: Cursor) {
user(id: $id, login: $login, lookupType: ALL) {
follows(first: $first, after: $after) {
totalCount
pageInfo {
hasNextPage
}
edges {
cursor
followedAt
node {
login
createdAt
}
}
}
}
}
"""
def _parse_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp string from the API into a ``datetime``.

    ``datetime.fromisoformat`` only accepts a trailing ``Z`` (UTC designator)
    from Python 3.11 onwards, so it is rewritten to the equivalent ``+00:00``
    offset first. Non-string input is passed through unchanged so that
    ``fromisoformat`` raises its usual ``TypeError``.
    """
    if isinstance(value, str) and value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def get_following(user: str, timeout: float = 30.0) -> Optional[List[FollowerData]]:
    """Fetch every follow edge of ``user``, following pagination cursors.

    Pages are requested one after another from ``API_URL`` using
    ``FOLLOWING_REQUEST_BODY``; the cursor of the last edge seen is sent as
    ``after`` for the next page. Progress and failure messages are printed
    to stdout.

    Args:
        user: Login name passed as the ``login`` query variable.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with one ``FollowerData`` per edge whose ``node`` is not null
        (null nodes are skipped as deleted accounts), or ``None`` when a
        request fails, returns a non-200 status, the user is missing, or the
        response does not have the expected shape.
    """
    result = []
    cursor = None
    pagenum = 0
    # The context manager releases pooled connections on every exit path.
    with requests.Session() as session:
        while True:
            pagenum += 1
            print(f"[LOG] loading following, page {pagenum}")
            try:
                response = session.post(
                    url=API_URL,
                    json={
                        'query': FOLLOWING_REQUEST_BODY,
                        'variables': {
                            'login': user,
                            'after': cursor,
                        },
                    },
                    headers={
                        "Client-ID": API_CLIENT_ID,
                    },
                    # Without a timeout requests may block indefinitely.
                    timeout=timeout,
                )
            except requests.RequestException as exc:
                # Network-level failures follow the same "return None"
                # convention as HTTP and format failures below.
                print(f"Failed request: {exc}")
                return None

            if response.status_code != 200:
                print(f"Failed request: exit code = {response.status_code}")
                print(response.text)
                return None

            try:
                data = json.loads(response.text)
                user_data = data['data']['user']
                if user_data is None:
                    # NOTE(review): presumably the API returns a null user
                    # for an unknown login - confirm against the API.
                    print(f"Failed: user not found: {user}")
                    return None
                follows = user_data['follows']
                edges = follows['edges']
                for edge in edges:
                    # Track the cursor even for skipped edges so that the
                    # next page starts after the last edge of this one.
                    cursor = edge['cursor']
                    node = edge['node']
                    if node is None:
                        # Deleted account
                        continue
                    result.append(FollowerData(
                        name=node['login'],
                        created_at=_parse_timestamp(node['createdAt']),
                        followed_at=_parse_timestamp(edge['followedAt']),
                    ))
                has_next_page = follows['pageInfo']['hasNextPage']
            except (KeyError, TypeError, ValueError):
                # Missing keys, unexpected null/containers, malformed JSON
                # or unparsable timestamps.
                print("Failed: invalid response format")
                print(response.text)
                return None

            # An empty page cannot advance the cursor; stop instead of
            # requesting the same page forever.
            if not has_next_page or not edges:
                break
    return result
# Script entry point: expects exactly one argument, the user login to look up.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Invalid run format")
        sys.exit(1)
    following = get_following(sys.argv[1])
    if following is None:
        # get_following() has already printed the reason; unpacking None
        # into print() would raise TypeError, so exit with a failure status.
        sys.exit(1)
    print(*following, sep="\n")