mirror of
https://github.com/janeczku/calibre-web
synced 2025-11-07 10:43:55 +00:00
Refactored code
Testrun
This commit is contained in:
219
cps/oauth_bb.py
219
cps/oauth_bb.py
@@ -96,7 +96,113 @@ def logout_oauth_user():
|
||||
session.pop(str(oauth_key) + '_oauth_user_id')
|
||||
|
||||
|
||||
if ub.oauth_support:
|
||||
def oauth_update_token(provider_id, token, provider_user_id):
|
||||
session[provider_id + "_oauth_user_id"] = provider_user_id
|
||||
session[provider_id + "_oauth_token"] = token
|
||||
|
||||
# Find this OAuth token in the database, or create it
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.one()
|
||||
# update token
|
||||
oauth_entry.token = token
|
||||
except NoResultFound:
|
||||
oauth_entry = ub.OAuth(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
token=token,
|
||||
)
|
||||
ub.session.add(oauth_entry)
|
||||
ub.session_commit()
|
||||
|
||||
# Disable Flask-Dance's default behavior for saving the OAuth token
|
||||
# Value differrs depending on flask-dance version
|
||||
return backend_resultcode
|
||||
|
||||
|
||||
def bind_oauth_or_register(provider_id, provider_user_id, redirect_url, provider_name):
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.first()
|
||||
# already bind with user, just login
|
||||
if oauth_entry.user:
|
||||
login_user(oauth_entry.user)
|
||||
log.debug(u"You are now logged in as: '%s'", oauth_entry.user.nickname)
|
||||
flash(_(u"you are now logged in as: '%(nickname)s'", nickname= oauth_entry.user.nickname),
|
||||
category="success")
|
||||
return redirect(url_for('web.index'))
|
||||
else:
|
||||
# bind to current user
|
||||
if current_user and current_user.is_authenticated:
|
||||
oauth_entry.user = current_user
|
||||
try:
|
||||
ub.session.add(oauth_entry)
|
||||
ub.session.commit()
|
||||
flash(_(u"Link to %(oauth)s Succeeded", oauth=provider_name), category="success")
|
||||
return redirect(url_for('web.profile'))
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
ub.session.rollback()
|
||||
else:
|
||||
flash(_(u"Login failed, No User Linked With OAuth Account"), category="error")
|
||||
log.info('Login failed, No User Linked With OAuth Account')
|
||||
return redirect(url_for('web.login'))
|
||||
# return redirect(url_for('web.login'))
|
||||
# if config.config_public_reg:
|
||||
# return redirect(url_for('web.register'))
|
||||
# else:
|
||||
# flash(_(u"Public registration is not enabled"), category="error")
|
||||
# return redirect(url_for(redirect_url))
|
||||
except (NoResultFound, AttributeError):
|
||||
return redirect(url_for(redirect_url))
|
||||
|
||||
|
||||
def get_oauth_status():
|
||||
status = []
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
user_id=current_user.id,
|
||||
)
|
||||
try:
|
||||
oauths = query.all()
|
||||
for oauth_entry in oauths:
|
||||
status.append(int(oauth_entry.provider))
|
||||
return status
|
||||
except NoResultFound:
|
||||
return None
|
||||
|
||||
|
||||
def unlink_oauth(provider):
|
||||
if request.host_url + 'me' != request.referrer:
|
||||
pass
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider,
|
||||
user_id=current_user.id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.one()
|
||||
if current_user and current_user.is_authenticated:
|
||||
oauth_entry.user = current_user
|
||||
try:
|
||||
ub.session.delete(oauth_entry)
|
||||
ub.session.commit()
|
||||
logout_oauth_user()
|
||||
flash(_(u"Unlink to %(oauth)s Succeeded", oauth=oauth_check[provider]), category="success")
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
ub.session.rollback()
|
||||
flash(_(u"Unlink to %(oauth)s Failed", oauth=oauth_check[provider]), category="error")
|
||||
except NoResultFound:
|
||||
log.warning("oauth %s for user %d not found", provider, current_user.id)
|
||||
flash(_(u"Not Linked to %(oauth)s", oauth=provider), category="error")
|
||||
return redirect(url_for('web.profile'))
|
||||
|
||||
def generate_oauth_blueprints():
|
||||
oauthblueprints = []
|
||||
if not ub.session.query(ub.OAuthProvider).count():
|
||||
for provider in ("github", "google"):
|
||||
@@ -141,8 +247,12 @@ if ub.oauth_support:
|
||||
app.register_blueprint(blueprint, url_prefix="/login")
|
||||
if element['active']:
|
||||
register_oauth_blueprint(element['id'], element['provider_name'])
|
||||
return oauthblueprints
|
||||
|
||||
|
||||
if ub.oauth_support:
|
||||
oauthblueprints = generate_oauth_blueprints()
|
||||
|
||||
@oauth_authorized.connect_via(oauthblueprints[0]['blueprint'])
|
||||
def github_logged_in(blueprint, token):
|
||||
if not token:
|
||||
@@ -175,113 +285,6 @@ if ub.oauth_support:
|
||||
return oauth_update_token(str(oauthblueprints[1]['id']), token, google_user_id)
|
||||
|
||||
|
||||
def oauth_update_token(provider_id, token, provider_user_id):
|
||||
session[provider_id + "_oauth_user_id"] = provider_user_id
|
||||
session[provider_id + "_oauth_token"] = token
|
||||
|
||||
# Find this OAuth token in the database, or create it
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.one()
|
||||
# update token
|
||||
oauth_entry.token = token
|
||||
except NoResultFound:
|
||||
oauth_entry = ub.OAuth(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
token=token,
|
||||
)
|
||||
ub.session.add(oauth_entry)
|
||||
ub.session_commit()
|
||||
|
||||
# Disable Flask-Dance's default behavior for saving the OAuth token
|
||||
# Value differrs depending on flask-dance version
|
||||
return backend_resultcode
|
||||
|
||||
|
||||
def bind_oauth_or_register(provider_id, provider_user_id, redirect_url, provider_name):
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider_id,
|
||||
provider_user_id=provider_user_id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.first()
|
||||
# already bind with user, just login
|
||||
if oauth_entry.user:
|
||||
login_user(oauth_entry.user)
|
||||
log.debug(u"You are now logged in as: '%s'", oauth_entry.user.nickname)
|
||||
flash(_(u"you are now logged in as: '%(nickname)s'", nickname= oauth_entry.user.nickname),
|
||||
category="success")
|
||||
return redirect(url_for('web.index'))
|
||||
else:
|
||||
# bind to current user
|
||||
if current_user and current_user.is_authenticated:
|
||||
oauth_entry.user = current_user
|
||||
try:
|
||||
ub.session.add(oauth_entry)
|
||||
ub.session.commit()
|
||||
flash(_(u"Link to %(oauth)s Succeeded", oauth=provider_name), category="success")
|
||||
return redirect(url_for('web.profile'))
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
ub.session.rollback()
|
||||
else:
|
||||
flash(_(u"Login failed, No User Linked With OAuth Account"), category="error")
|
||||
log.info('Login failed, No User Linked With OAuth Account')
|
||||
return redirect(url_for('web.login'))
|
||||
# return redirect(url_for('web.login'))
|
||||
# if config.config_public_reg:
|
||||
# return redirect(url_for('web.register'))
|
||||
# else:
|
||||
# flash(_(u"Public registration is not enabled"), category="error")
|
||||
# return redirect(url_for(redirect_url))
|
||||
except (NoResultFound, AttributeError):
|
||||
return redirect(url_for(redirect_url))
|
||||
|
||||
|
||||
def get_oauth_status():
|
||||
status = []
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
user_id=current_user.id,
|
||||
)
|
||||
try:
|
||||
oauths = query.all()
|
||||
for oauth_entry in oauths:
|
||||
status.append(int(oauth_entry.provider))
|
||||
return status
|
||||
except NoResultFound:
|
||||
return None
|
||||
|
||||
|
||||
def unlink_oauth(provider):
|
||||
if request.host_url + 'me' != request.referrer:
|
||||
pass
|
||||
query = ub.session.query(ub.OAuth).filter_by(
|
||||
provider=provider,
|
||||
user_id=current_user.id,
|
||||
)
|
||||
try:
|
||||
oauth_entry = query.one()
|
||||
if current_user and current_user.is_authenticated:
|
||||
oauth_entry.user = current_user
|
||||
try:
|
||||
ub.session.delete(oauth_entry)
|
||||
ub.session.commit()
|
||||
logout_oauth_user()
|
||||
flash(_(u"Unlink to %(oauth)s Succeeded", oauth=oauth_check[provider]), category="success")
|
||||
except Exception as e:
|
||||
log.debug_or_exception(e)
|
||||
ub.session.rollback()
|
||||
flash(_(u"Unlink to %(oauth)s Failed", oauth=oauth_check[provider]), category="error")
|
||||
except NoResultFound:
|
||||
log.warning("oauth %s for user %d not found", provider, current_user.id)
|
||||
flash(_(u"Not Linked to %(oauth)s", oauth=provider), category="error")
|
||||
return redirect(url_for('web.profile'))
|
||||
|
||||
|
||||
# notify on OAuth provider error
|
||||
@oauth_error.connect_via(oauthblueprints[0]['blueprint'])
|
||||
def github_error(blueprint, error, error_description=None, error_uri=None):
|
||||
|
||||
Reference in New Issue
Block a user