现在单点登录已经是非常的普遍了, CAS(Central Authentication Service)统一认证服务也是属于我们必备的知识和技能,今天就来带大家学习如何接入统一认证服务。










__ init__.py:

代码语言:javascript复制""" flask_cas.__init__ """ import flask from flask import current_app # Find the stack on which we want to store the database connection. # Starting with Flask 0.9, the _app_ctx_stack is the correct one, # before that we need to use the _request_ctx_stack. try: from flask import _app_ctx_stack as stack except ImportError: from flask import _request_ctx_stack as stack from . import routing from functools import wraps class CAS(object): """ Required Configs: |Key | |----------------| |CAS_SERVER | |CAS_AFTER_LOGIN | Optional Configs: |Key | Default | |---------------------------|-----------------------| |CAS_TOKEN_SESSION_KEY | _CAS_TOKEN | |CAS_USERNAME_SESSION_KEY | CAS_USERNAME | |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES | |CAS_LOGIN_ROUTE | '/cas' | |CAS_LOGOUT_ROUTE | '/cas/logout' | |CAS_VALIDATE_ROUTE | '/cas/serviceValidate'| |CAS_AFTER_LOGOUT | None | """ def __init__(self, app=None, url_prefix=None): self._app = app if app is not None: self.init_app(app, url_prefix) def init_app(self, app, url_prefix=None): # Configuration defaults app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN') app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME') app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES') app.config.setdefault('CAS_LOGIN_ROUTE', '/cas') app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout') app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate') # Requires CAS 2.0 app.config.setdefault('CAS_AFTER_LOGOUT', None) # Register Blueprint app.register_blueprint(routing.blueprint, url_prefix=url_prefix) # Use the newstyle teardown_appcontext if it's available, # otherwise fall back to the request context if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: app.teardown_request(self.teardown) def teardown(self, exception): ctx = stack.top @property def app(self): return self._app or current_app @property def username(self): return flask.session.get( self.app.config['CAS_USERNAME_SESSION_KEY'], None) @property def attributes(self): return flask.session.get( self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None) @property def token(self): return flask.session.get( self.app.config['CAS_TOKEN_SESSION_KEY'], None) def login(): return flask.redirect(flask.url_for('cas.login', _external=True)) def logout(): return flask.redirect(flask.url_for('cas.logout', _external=True)) def login_required(function): @wraps(function) def wrap(*args, **kwargs): if 'CAS_USERNAME' not in flask.session: flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = ( flask.request.script_root + flask.request.full_path ) return login() else: return function(*args, **kwargs) return wrap



代码语言:javascript复制""" flask_cas.cas_urls Functions for creating urls to access CAS. """ try: from urllib import quote from urllib import urlencode from urlparse import urljoin except ImportError: from urllib.parse import quote from urllib.parse import urljoin from urllib.parse import urlencode def create_url(base, path=None, *query): """ Create a url. Creates a url by combining base, path, and the query's list of key/value pairs. Escaping is handled automatically. Any key/value pair with a value that is None is ignored. Keyword arguments: base -- The left most part of the url (ex. http://localhost:5000). path -- The path after the base (ex. /foo/bar). query -- A list of key value pairs (ex. [('key', 'value')]). Example usage: >>> create_url( ... 'http://localhost:5000', ... 'foo/bar', ... ('key1', 'value'), ... ('key2', None), # Will not include None ... ('url', 'http://example.com'), ... ) 'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com' """ url = base # Add the path to the url if it's not None. if path is not None: url = urljoin(url, quote(path)) # Remove key/value pairs with None values. query = filter(lambda pair: pair[1] is not None, query) # Add the query string to the url url = urljoin(url, '?{0}'.format(urlencode(list(query)))) return url def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None): """ Create a CAS login URL . Keyword arguments: cas_url -- The url to the CAS (ex. http://sso.pdx.edu) cas_route -- The route where the CAS lives on server (ex. /cas) service -- (ex. http://localhost:5000/login) renew -- "true" or "false" gateway -- "true" or "false" Example usage: >>> create_cas_login_url( ... 'http://sso.pdx.edu', ... '/cas', ... 'http://localhost:5000', ... ) 'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000' """ return create_url( cas_url, cas_route, ('service', service), ('renew', renew), ('gateway', gateway), ) def create_cas_logout_url(cas_url, cas_route, service=None): """ Create a CAS logout URL. Keyword arguments: cas_url -- The url to the CAS (ex. http://sso.pdx.edu) cas_route -- The route where the CAS lives on server (ex. /cas/logout) url -- (ex. http://localhost:5000/login) Example usage: >>> create_cas_logout_url( ... 'http://sso.pdx.edu', ... '/cas/logout', ... 'http://localhost:5000', ... ) 'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000' """ return create_url( cas_url, cas_route, ('service', service), ) def create_cas_validate_url(cas_url, cas_route, service, ticket, renew=None): """ Create a CAS validate URL. Keyword arguments: cas_url -- The url to the CAS (ex. http://sso.pdx.edu) cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate) service -- (ex. http://localhost:5000/login) ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas') renew -- "true" or "false" Example usage: >>> create_cas_validate_url( ... 'http://sso.pdx.edu', ... '/cas/serviceValidate', ... 'http://localhost:5000/login', ... 'ST-58274-x839euFek492ou832Eena7ee-cas' ... ) 'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas' """ return create_url( cas_url, cas_route, ('service', service), ('ticket', ticket), ('renew', renew), )



代码语言:javascript复制import flask from xmltodict import parse from flask import current_app from .cas_urls import create_cas_login_url from .cas_urls import create_cas_logout_url from .cas_urls import create_cas_validate_url try: from urllib import urlopen except ImportError: from urllib.request import urlopen blueprint = flask.Blueprint('cas', __name__) @blueprint.route('/login/') def login(): """ This route has two purposes. First, it is used by the user to login. Second, it is used by the CAS to respond with the `ticket` after the user logs in successfully. When the user accesses this url, they are redirected to the CAS to login. If the login was successful, the CAS will respond to this route with the ticket in the url. The ticket is then validated. If validation was successful the logged in username is saved in the user's session under the key `CAS_USERNAME_SESSION_KEY` and the user's attributes are saved under the key 'CAS_USERNAME_ATTRIBUTE_KEY' """ cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY'] redirect_url = create_cas_login_url( current_app.config['CAS_SERVER'], current_app.config['CAS_LOGIN_ROUTE'], flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True)) if 'ticket' in flask.request.args: flask.session[cas_token_session_key] = flask.request.args['ticket'] if cas_token_session_key in flask.session: if validate(flask.session[cas_token_session_key]): if 'CAS_AFTER_LOGIN_SESSION_URL' in flask.session: redirect_url = flask.session.pop('CAS_AFTER_LOGIN_SESSION_URL') elif flask.request.args.get('origin'): redirect_url = flask.request.args['origin'] else: redirect_url = flask.url_for( current_app.config['CAS_AFTER_LOGIN']) else: del flask.session[cas_token_session_key] current_app.logger.debug('Redirecting to: {0}'.format(redirect_url)) return flask.redirect(redirect_url) @blueprint.route('/logout/') def logout(): """ When the user accesses this route they are logged out. """ cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY'] cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY'] if cas_username_session_key in flask.session: del flask.session[cas_username_session_key] if cas_attributes_session_key in flask.session: del flask.session[cas_attributes_session_key] if(current_app.config['CAS_AFTER_LOGOUT'] is not None): redirect_url = create_cas_logout_url( current_app.config['CAS_SERVER'], current_app.config['CAS_LOGOUT_ROUTE'], current_app.config['CAS_AFTER_LOGOUT']) else: redirect_url = create_cas_logout_url( current_app.config['CAS_SERVER'], current_app.config['CAS_LOGOUT_ROUTE']) current_app.logger.debug('Redirecting to: {0}'.format(redirect_url)) return flask.redirect(redirect_url) def validate(ticket): """ Will attempt to validate the ticket. If validation fails, then False is returned. If validation is successful, then True is returned and the validated username is saved in the session under the key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'. """ cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY'] cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY'] current_app.logger.debug("validating token {0}".format(ticket)) cas_validate_url = create_cas_validate_url( current_app.config['CAS_SERVER'], current_app.config['CAS_VALIDATE_ROUTE'], flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True), ticket) current_app.logger.debug("Making GET request to {0}".format( cas_validate_url)) xml_from_dict = {} isValid = False try: xmldump = urlopen(cas_validate_url).read().strip().decode('utf8', 'ignore') xml_from_dict = parse(xmldump) isValid = True if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] else False except ValueError: current_app.logger.error("CAS returned unexpected result") if isValid: current_app.logger.debug("valid") xml_from_dict = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"] username = xml_from_dict["cas:user"] flask.session[cas_username_session_key] = username if "cas:attributes" in xml_from_dict: attributes = xml_from_dict["cas:attributes"] if "cas:memberOf" in attributes: if not isinstance(attributes["cas:memberOf"], list): attributes["cas:memberOf"] = attributes["cas:memberOf"].lstrip('[').rstrip(']').split(',') for group_number in range(0, len(attributes['cas:memberOf'])): attributes['cas:memberOf'][group_number] = attributes['cas:memberOf'][group_number].lstrip(' ').rstrip(' ') else: for index in range(0, len(attributes['cas:memberOf'])): attributes["cas:memberOf"][index] = attributes["cas:memberOf"][index].lstrip('[').rstrip(']').split(',') for group_number in range(0, len(attributes['cas:memberOf'][index])): attributes['cas:memberOf'][index][group_number] = attributes['cas:memberOf'][index][group_number].lstrip(' ').rstrip(' ') flask.session[cas_attributes_session_key] = attributes else: current_app.logger.debug("invalid") return isValid



项目的基本情况给大家介绍了,那么怎么去使用它呢? 我建议首先看一下readme文件,但是readme文件中的使用方法可能大部分人会报错,所以这里我提供一下之前写的一个demo,能够打开即用。


代码语言:javascript复制import flask from flask import Flask from flask_cas import CAS from flask_cas import login_required app = Flask(__name__) cas = CAS(app) # 此处填写server端地址 app.config['CAS_SERVER'] = 'https://server.cas.com:8443/cas' # 填写cas登录后跳转的路由 app.config['CAS_AFTER_LOGIN'] = 'login' app.config['SECRET_KEY'] ='dasdasdasdasdada123123423423aASDAS' # 全局取消证书验证 import ssl ssl._create_default_https_context = ssl._create_unverified_context @app.route('/') @login_required def login(): return flask.render_template( 'demo.html', username=cas.username, ) if __name__ == '__main__': app.run("app1.cas.com", 9100, debug=True) # 此处填写客户端路由


代码语言:javascript复制 Title {{username}}










