-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathaccess_token.py
More file actions
206 lines (183 loc) · 7.77 KB
/
access_token.py
File metadata and controls
206 lines (183 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import base64
import hashlib
import json
import os
import pathlib
import requests
from api_common import ApiCommon
from oauth_scope import Scope
from user_auth import get_auth_code
class AccessToken(ApiCommon):
def __init__(self, api_config, name=None):
if name:
self.name = name
else:
self.name = "access_token"
self.api_config = api_config
self.path = pathlib.Path(api_config.oauth_token_dir) / (self.name + ".json")
# use the recommended authorization approach
auth = api_config.app_id + ":" + api_config.app_secret
b64auth = base64.b64encode(auth.encode("ascii")).decode("ascii")
self.auth_headers = {"Authorization": "Basic " + b64auth}
def fetch(self, scopes=None, client_credentials=False):
"""
This method tries to make it as easy as possible for a developer
to start using an OAuth access token. It fetches the access token
by trying all supported methods, in this order:
1. Get from the process environment variable that is the UPPER CASE
version of the self.name attribute. This method is intended as
a quick hack for developers.
2. Read the access_token and (if available) the refresh_token from
the file at the path specified by joining the configured
OAuth token directory, the self.name attribute, and the '.json'
file extension.
3. Execute the OAuth 2.0 request flow using the default browser
and local redirect.
"""
try:
self.from_environment()
return
except Exception:
print(f"reading {self.name} from environment failed, trying read")
try:
self.read()
return
except Exception:
print(f"reading {self.name} failed, trying oauth")
self.oauth(scopes=scopes, client_credentials=client_credentials)
def from_environment(self):
"""
Easiest path for using an access token: get it from the
process environment. Note that the environment variable name
is the UPPER CASE of the self.name instance attribute.
"""
self.access_token = os.environ[self.name.upper()]
self.refresh_token = None
def read(self):
"""
Get the access token from the file at self.path.
"""
with open(self.path, "r") as jsonfile:
data = json.load(jsonfile)
self.name = data.get("name") or "access_token"
self.access_token = data["access_token"]
self.refresh_token = data.get("refresh_token")
self.scopes = data.get("scopes")
print(f"read {self.name} from {self.path}")
def write(self):
"""
Store the access token in the file at self.path.
"""
with open(self.path, "w") as jsonfile:
# make credential-bearing file as secure as possible
if "chmod" in dir(os):
os.chmod(jsonfile.fileno(), 0o600)
# write the information to the file
json.dump(
{
"name": self.name,
"access_token": self.access_token,
"refresh_token": self.refresh_token,
"scopes": self.scopes,
},
jsonfile,
indent=2,
)
def header(self, headers={}):
headers["Authorization"] = "Bearer " + self.access_token
return headers
def hashed(self):
"""
Print the access token in a human-readable format that does not reveal
the actual access credential. The purpose of this method is for a developer
to verify that the access token has changed after a refresh.
"""
return hashlib.sha256(self.access_token.encode()).hexdigest()
def hashed_refresh_token(self):
"""
Print the refresh token in a human-readable format that does not reveal
the actual access credential. The purpose of this method is for a developer
to verify when the refresh token changes.
"""
if not self.refresh_token:
raise RuntimeError("AccessToken does not have a refresh token")
return hashlib.sha256(self.refresh_token.encode()).hexdigest()
def _get_user_post_data(self, scopes):
"""
When requesting an OAuth token for a user, the protocol requires going
through the process of getting an auth_code. This process allows the
user to approve the scopes requested by the application.
"""
print("getting auth_code...")
auth_code = get_auth_code(self.api_config, scopes=scopes)
print(f"exchanging auth_code for {self.name}...")
# Generate POST data to exchange the auth_code (obtained by
# a redirect from the browser) for the access_token and a
# refresh_token.
return {
"code": auth_code,
"redirect_uri": self.api_config.redirect_uri,
"grant_type": "authorization_code",
}
def _get_client_post_data(self, scopes):
"""
When requesting an OAuth token for the client, no auth_code is required
because the user is the same as the owner of the client.
"""
print("getting access token using client credentials...")
return {
"grant_type": "client_credentials",
"scope": ",".join(list(map(lambda scope: scope.value, scopes))),
}
def oauth(self, scopes=None, client_credentials=False):
"""
Execute the OAuth 2.0 process for obtaining an access token.
For more information, see IETF RFC 6749: https://tools.ietf.org/html/rfc6749
and https://developers.pinterest.com/docs/getting-started/authentication-and-scopes/
""" # noqa: E501 because the long URL is okay
if not scopes:
scopes = [Scope.READ_USERS, Scope.READ_PINS, Scope.READ_BOARDS]
print(
"OAuth scopes required. Setting to default: "
f"{','.join(list(map(lambda scope: scope.value, scopes)))}"
)
post_data = (
client_credentials
and self._get_client_post_data(scopes)
or self._get_user_post_data(scopes)
)
if self.api_config.verbosity >= 2:
print("POST", self.api_config.api_uri + "/v5/oauth/token")
if self.api_config.verbosity >= 3:
self.api_config.credentials_warning()
print(post_data)
response = requests.post(
self.api_config.api_uri + "/v5/oauth/token",
headers=self.auth_headers,
data=post_data,
)
unpacked = self.unpack(response)
print("scope: " + unpacked["scope"])
self.access_token = unpacked["access_token"]
self.refresh_token = unpacked.get("refresh_token")
self.scopes = unpacked["scope"]
def refresh(self, continuous=False):
print(f"refreshing {self.name}...")
post_data = {"grant_type": "refresh_token", "refresh_token": self.refresh_token}
if continuous:
post_data["refresh_on"] = True
if self.api_config.verbosity >= 2:
print("POST", self.api_config.api_uri + "/v5/oauth/token")
if self.api_config.verbosity >= 3:
self.api_config.credentials_warning()
print(post_data)
response = requests.post(
self.api_config.api_uri + "/v5/oauth/token",
headers=self.auth_headers,
data=post_data,
)
unpacked = self.unpack(response)
self.access_token = unpacked["access_token"]
# save refresh token if it was also refreshed
if "refresh_token" in unpacked:
self.refresh_token = unpacked["refresh_token"]