-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_oauth2.py
More file actions
89 lines (65 loc) · 3.02 KB
/
test_oauth2.py
File metadata and controls
89 lines (65 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import webbrowser
import requests
from requests_oauthlib import OAuth2Session
import os
import json
import argh
def get_authorization_url(base_url):
return f'{base_url.rstrip("/")}/oauth2/authorize'
def get_token_url(base_url):
return f'{base_url.rstrip("/")}/oauth2/access_token'
def get_authorized_session(session, base_url, client_id, client_secret):
authorization_url, state = session.authorization_url(get_authorization_url(base_url))
print('Go to', authorization_url)
oauth_code = input('Enter Code: ')
access_token_response = requests.post(get_token_url(base_url), data={
'client_id': client_id,
'client_secret': client_secret,
'code': oauth_code,
'grant_type': 'authorization_code'
})
if access_token_response.status_code == 200:
token = access_token_response.json()
print('User successfully authorized, with token:', token)
with open('access_token.json', 'w') as stored_access_token:
stored_access_token.write(json.dumps(token))
return OAuth2Session(client_id=client_id, token=token)
else:
print(access_token_response.text)
def do_oauth2(client_id, client_secret, redirect_uri, base_url, force_refresh=False, reauthenticate=False):
base_url = base_url.rstrip('/')
try:
with open('access_token.json', 'r') as stored_access_token:
token = json.loads(stored_access_token.read())
except (FileNotFoundError, ValueError):
token = None
if token and not reauthenticate:
projectplace = OAuth2Session(client_id, token=token)
else:
projectplace = get_authorized_session(
OAuth2Session(client_id, redirect_uri=redirect_uri), base_url, client_id, client_secret
)
print('Calling with token', projectplace.token)
response = projectplace.get(f'{base_url}/1/user/me/profile')
if response.status_code == 401 or force_refresh:
print('Unauthorized, access token may have expired - attempting to refresh token.')
refresh_response = requests.post(f'{base_url}/oauth2/access_token', {
'client_id': client_id,
'client_secret': client_secret,
'refresh_token': projectplace.token[u'refresh_token'],
'grant_type': 'refresh_token'
})
if refresh_response.status_code == 200:
token = refresh_response.json()
with open('access_token.json', 'w') as stored_access_token:
stored_access_token.write(json.dumps(token))
projectplace = OAuth2Session(client_id=client_id, token=token)
print('%s: Refreshing token worked, new access token:', token)
else:
print('%s: Refreshing failed, response =', refresh_response.text)
print('Deleting expired access token. Try again!')
os.unlink('access_token.json')
if response.status_code == 200:
print('200 OK Successfully fetched profile belonging to', response.json()['sort_name'])
if __name__ == '__main__':
argh.dispatch_command(do_oauth2)