-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathext.py
355 lines (284 loc) · 12.3 KB
/
ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2018 CERN.
# Copyright (C) 2022 RERO.
# Copyright (C) 2023-2025 Graz University of Technology.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Invenio module that implements OAuth 2 server."""
import os
import warnings
import importlib_metadata
import oauthlib.common as oauthlib_commmon
import six
from flask import abort, request
from flask_login import current_user
from flask_menu import current_menu
from flask_oauthlib.contrib.oauth2 import bind_cache_grant
from invenio_i18n import LazyString
from invenio_i18n import lazy_gettext as _
from invenio_rest.csrf import csrf
from werkzeug.utils import cached_property, import_string
from . import config
from .models import OAuthUserProxy, Scope
from .provider import oauth2
class _OAuth2ServerState(object):
"""OAuth2 server state storing registered scopes."""
def __init__(self, app, entry_point_group=None):
"""Initialize state."""
self.app = app
self.scopes = {}
# Initialize OAuth2 provider
oauth2.init_app(app)
# Flask-OAuthlib does not support CACHE_REDIS_URL
if app.config["OAUTH2_CACHE_TYPE"] == "redis" and app.config.get(
"CACHE_REDIS_URL"
):
from redis import from_url as redis_from_url
app.config.setdefault(
"OAUTH2_CACHE_REDIS_HOST", redis_from_url(app.config["CACHE_REDIS_URL"])
)
# Configures an OAuth2Provider instance to use configured caching
# system to get and set the grant token.
bind_cache_grant(app, oauth2, lambda: OAuthUserProxy(current_user))
# Disables oauthlib's secure transport detection in in debug mode.
if app.debug or app.testing:
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
if entry_point_group:
self.load_entry_point_group(entry_point_group)
def scope_choices(self, exclude_internal=True):
"""Return list of scope choices.
:param exclude_internal: Exclude internal scopes or not.
(Default: ``True``)
:returns: A list of tuples (id, scope).
"""
return [
(k, scope)
for k, scope in sorted(self.scopes.items())
if not exclude_internal or not scope.is_internal
]
def register_scope(self, scope):
"""Register a scope.
:param scope: A :class:`invenio_oauth2server.models.Scope` instance.
"""
if not isinstance(scope, Scope):
raise TypeError("Invalid scope type.")
assert scope.id not in self.scopes
self.scopes[scope.id] = scope
def load_entry_point_group(self, entry_point_group):
"""Load actions from an entry point group.
:param entry_point_group: The entrypoint group name to load plugins.
"""
for ep in set(importlib_metadata.entry_points(group=entry_point_group)):
self.register_scope(ep.load())
def load_obj_or_import_string(self, value):
"""Import string or return object.
:params value: Import path or class object to instantiate.
:params default: Default object to return if the import fails.
:returns: The imported object.
"""
imp = self.app.config.get(value)
if isinstance(imp, six.string_types):
return import_string(imp)
elif imp:
return imp
@cached_property
def jwt_verification_factory(self):
"""Load default JWT verification factory."""
return self.load_obj_or_import_string("OAUTH2SERVER_JWT_VERIFICATION_FACTORY")
class InvenioOAuth2Server(object):
"""Invenio-OAuth2Server extension."""
def __init__(self, app=None, **kwargs):
"""Extension initialization.
:param app: An instance of :class:`flask.Flask`.
"""
if app:
self._state = self.init_app(app, **kwargs)
def init_app(self, app, entry_point_group="invenio_oauth2server.scopes", **kwargs):
"""Flask application initialization.
:param app: An instance of :class:`flask.Flask`.
:param entry_point_group: The entrypoint group name to load plugins.
(Default: ``'invenio_oauth2server.scopes'``)
"""
self.init_config(app)
state = _OAuth2ServerState(app, entry_point_group=entry_point_group)
app.extensions["invenio-oauth2server"] = state
return state
def init_config(self, app):
"""Initialize configuration.
:param app: An instance of :class:`flask.Flask`.
"""
app.config.setdefault(
"OAUTH2SERVER_BASE_TEMPLATE",
app.config.get("BASE_TEMPLATE", "invenio_oauth2server/base.html"),
)
app.config.setdefault(
"OAUTH2SERVER_COVER_TEMPLATE",
app.config.get("COVER_TEMPLATE", "invenio_oauth2server/base.html"),
)
app.config.setdefault(
"OAUTH2SERVER_SETTINGS_TEMPLATE",
app.config.get(
"SETTINGS_TEMPLATE", "invenio_oauth2server/settings/base.html"
),
)
for k in dir(config):
if k.startswith("OAUTH2SERVER_") or k.startswith("OAUTH2_"):
app.config.setdefault(k, getattr(config, k))
def __getattr__(self, name):
"""Proxy to state object."""
return getattr(self._state, name, None)
def _get_uri_from_request(request):
"""Get uri from request.
The uri returned from request.uri is not properly urlencoded
(sometimes it's partially urldecoded) This is a weird hack to get
werkzeug to return the proper urlencoded string uri
copy pasted code from flask-oauthlib-invenio
"""
uri = request.base_url
if request.query_string:
uri += "?" + request.query_string.decode("utf-8")
return uri
def extract_params():
"""Extract request params.
copy pasted code from flask-oauthlib-invenio
"""
uri = _get_uri_from_request(request)
http_method = request.method
headers = dict(request.headers)
if "wsgi.input" in headers:
del headers["wsgi.input"]
if "wsgi.errors" in headers:
del headers["wsgi.errors"]
# Werkzeug, and subsequently Flask provide a safe Authorization header
# parsing, so we just replace the Authorization header with the extraced
# info if it was successfully parsed.
if request.authorization:
headers["Authorization"] = str(request.authorization)
# request.form could create problems on file upload
body = {} # request.form.to_dict()
return uri, http_method, body, headers
def verify_request(scopes):
"""Verify request.
copy pasted code from flask-oauthlib-invenio
"""
uri, http_method, body, headers = extract_params()
try:
# compatibility to oauthlib
headers["Authorization"] = str(headers["Authorization"])
except KeyError:
pass
return oauth2.server.verify_request(uri, http_method, body, headers, scopes)
def verify_oauth_token_and_set_current_user():
"""Verify OAuth token and set current user on request stack.
This function should be used **only** on REST application.
.. code-block:: python
app.before_request(verify_oauth_token_and_set_current_user)
"""
# Since this function can be evoked multiple times
# we add a check to not run it if it has already run.
if hasattr(request, "oauth_verify_has_run"):
return
for func in oauth2._before_request_funcs:
func()
if not hasattr(request, "oauth") or not request.oauth:
scopes = []
try:
valid, req = verify_request(scopes)
except ValueError:
abort(400, "Error trying to decode a non urlencoded string.")
for func in oauth2._after_request_funcs:
valid, req = func(valid, req)
if valid:
request.oauth = req
if hasattr(request, "oauth"):
request.skip_csrf_check = True
request.oauth_verify_has_run = True
class InvenioOAuth2ServerREST(object):
"""Invenio-OAuth2Server REST extension."""
def __init__(self, app=None, **kwargs):
"""Extension initialization.
:param app: An instance of :class:`flask.Flask`.
"""
if app:
self.init_app(app, **kwargs)
def init_app(self, app, **kwargs):
"""Flask application initialization.
:param app: An instance of :class:`flask.Flask`.
"""
self.init_config(app)
allowed_urlencode_chars = app.config.get(
"OAUTH2SERVER_ALLOWED_URLENCODE_CHARACTERS"
)
if allowed_urlencode_chars:
InvenioOAuth2ServerREST.monkeypatch_oauthlib_urlencode_chars(
allowed_urlencode_chars
)
# add check to skip csrf validation if oauth request
csrf.before_csrf_protect(verify_oauth_token_and_set_current_user)
app.before_request(verify_oauth_token_and_set_current_user)
def init_config(self, app):
"""Initialize configuration."""
app.config.setdefault(
"OAUTH2SERVER_ALLOWED_URLENCODE_CHARACTERS",
getattr(config, "OAUTH2SERVER_ALLOWED_URLENCODE_CHARACTERS"),
)
@staticmethod
def monkeypatch_oauthlib_urlencode_chars(chars):
"""Monkeypatch OAuthlib set of "URL encoded"-safe characters.
.. note::
OAuthlib keeps a set of characters that it considers as valid
inside an URL-encoded query-string during parsing of requests. The
issue is that this set of characters wasn't designed to be
configurable since it should technically follow various RFC
specifications about URIs, like for example `RFC3986
<https://www.ietf.org/rfc/rfc3986.txt>`_. Many online services and
frameworks though have designed their APIs in ways that aim at
keeping things practical and readable to the API consumer, making
use of special characters to mark or seperate query-string
arguments. Such an example is the usage of embedded JSON strings
inside query-string arguments, which of course have to contain the
"colon" character (:) for key/value pair definitions.
Users of the OAuthlib library, in order to integrate with these
services and frameworks, end up either circumventing these "static"
restrictions of OAuthlib by pre-processing query-strings, or -in
search of a more permanent solution- directly make Pull Requests
to OAuthlib to include additional characters in the set, and
explain the logic behind their decision (one can witness these
efforts inside the git history of the source file that includes
this set of characters `here
<https://github.com/idan/oauthlib/commits/master/oauthlib/common.py>`_).
This kind of tactic leads easily to misconceptions about the
ability one has over the usage of specific features of services and
frameworks. In order to tackle this issue in Invenio-OAuth2Server,
we are monkey-patching this set of characters using a configuration
variable, so that usage of any special characters is a conscious
decision of the package user.
"""
modified_chars = set(chars)
always_safe = set(oauthlib_commmon.always_safe)
original_special_chars = oauthlib_commmon.urlencoded - always_safe
if modified_chars != original_special_chars:
warnings.warn(
'You are overriding the default OAuthlib "URL encoded" set of '
"valid characters. Make sure that the characters defined in "
"oauthlib.common.urlencoded are indeed limitting your needs.",
RuntimeWarning,
)
oauthlib_commmon.urlencoded = always_safe | modified_chars
def finalize_app(app):
"""Finalize app."""
icons = app.extensions["invenio-theme"].icons
current_menu.submenu("settings.applications").register(
endpoint="invenio_oauth2server_settings.index",
text=_(
"%(icon)s Applications",
icon=LazyString(lambda: f'<i class="{icons.codepen}"></i>'),
),
order=5,
active_when=lambda: request.endpoint.startswith(
"invenio_oauth2server_settings."
),
)