# Source code for ofxclient.config

from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import with_statement
try:
    # python 3
    from configparser import ConfigParser
except ImportError:
    # python 2
    from ConfigParser import ConfigParser
try:
    import keyring
    keyring.get_password('is-backend', 'configured?')
    KEYRING_AVAILABLE = True
except RuntimeError:
    # no keyring backend found
    KEYRING_AVAILABLE = False
except ImportError:
    KEYRING_AVAILABLE = False
import os
import os.path
import sys

from ofxclient.account import Account

# Default location of the config file: "ofxclient.ini" in the user's home
# directory.  DEFAULT_CONFIG falls back to None if the path cannot be built.
try:
    DEFAULT_CONFIG = os.path.expanduser(os.path.join('~', 'ofxclient.ini'))
except Exception:
    # Narrowed from a bare "except:" so that KeyboardInterrupt and SystemExit
    # raised during import are not silently swallowed.
    DEFAULT_CONFIG = None


class SecurableConfigParser(ConfigParser):
    """:py:class:`ConfigParser` subclass that knows how to store options
    marked as secure into the OS specific keyring/keychain.

    To mark an option as secure, the caller must call
    'set_secure' at least one time for the particular option and
    from then on it will be seen as secure and will be stored / retrieved
    from the keychain.

    Secure values are kept in memory until :py:meth:`write` is called; only
    then are they pushed to (or deleted from) the keyring.  The config file
    itself only ever contains a placeholder for a secured option.

    If no keyring is available, 'set_secure' stores the value in the parser
    as a normal (plain text) option.

    Example::

        from ofxclient.config import SecurableConfigParser

        # password will not be saved in the config file

        c = SecurableConfigParser()
        c.add_section('Info')
        c.set('Info','username','bill')
        c.set_secure('Info','password','s3cre7')
        with open('config.ini','w') as fp:
            c.write(fp)
    """

    # Value written to the config file in place of a secured option.
    _secure_placeholder = '%{secured}'

    def __init__(self, keyring_name='ofxclient',
                 keyring_available=KEYRING_AVAILABLE, **kwargs):
        """Create the parser.

        :param keyring_name: service name used for all keyring calls
        :type keyring_name: string
        :param keyring_available: whether secure options may use the keyring
        :type keyring_available: boolean
        :param kwargs: accepted but not used by this class
        """
        # Explicit base-class calls (rather than super()) are kept because
        # the python 2 ConfigParser is handled by a separate branch here.
        if sys.version_info >= (3,):
            # python 3: disable interpolation so the '%{secured}'
            # placeholder is returned verbatim
            ConfigParser.__init__(self, interpolation=None)
        else:
            # python 2
            ConfigParser.__init__(self)
        self.keyring_name = keyring_name
        self.keyring_available = keyring_available
        # Pending keyring operations, keyed by keyring key:
        #   {key: ('set', value)} or {key: ('delete', None)}
        self._unsaved = {}

    @staticmethod
    def _keyring_key(section, option):
        """Build the keyring key (section + option) for a secured option."""
        return "%s%s" % (section, option)

    def is_secure_option(self, section, option):
        """Test an option to see if it is secured or not.

        :param section: section id
        :type section: string
        :param option: option name
        :type option: string
        :rtype: boolean -- True if the section and option exist and the
            stored value is the secure placeholder, False otherwise.
        """
        if not self.has_section(section):
            return False
        if not self.has_option(section, option):
            return False
        # Read through the base class so we see the stored placeholder and
        # not the resolved secret.
        stored = ConfigParser.get(self, section, option)
        return stored == self._secure_placeholder

    def has_secure_option(self, section, option):
        """See is_secure_option"""
        return self.is_secure_option(section, option)

    def items(self, section):
        """Get all items for a section. Subclassed, to ensure secure
        items come back with the unencrypted data.

        :param section: section id
        :type section: string
        :rtype: list of (option, value) tuples
        """
        items = []
        for k, v in ConfigParser.items(self, section):
            if self.is_secure_option(section, k):
                # swap the placeholder for the real value
                v = self.get(section, k)
            items.append((k, v))
        return items

    def secure_items(self, section):
        """Like items() but only return secure items.

        :param section: section id
        :type section: string
        """
        return [x for x in self.items(section)
                if self.is_secure_option(section, x[0])]

    def set(self, section, option, value):
        """Set an option value. Knows how to set options properly marked
        as secure."""
        if self.is_secure_option(section, option):
            self.set_secure(section, option, value)
        else:
            ConfigParser.set(self, section, option, value)

    def set_secure(self, section, option, value):
        """Set an option and mark it as secure.

        Any subsequent uses of 'set' or 'get' will also
        now know that this option is secure as well.

        When the keyring is not available the value is stored in the parser
        unchanged.
        """
        if self.keyring_available:
            s_option = self._keyring_key(section, option)
            # Remember the real value until write(); the parser itself only
            # holds the placeholder.
            self._unsaved[s_option] = ('set', value)
            value = self._secure_placeholder
        ConfigParser.set(self, section, option, value)

    def get(self, section, option, *args, **kwargs):
        """Get option value from section. If an option is secure,
        populates the plain text.

        Extra positional and keyword arguments are forwarded to
        ConfigParser.get() for options that are not secured.  Accepting
        keyword arguments is required on python 3, where the standard
        library passes ``raw``/``vars``/``fallback`` by keyword (for example
        from ``getint`` and ``getboolean``).
        """
        if self.is_secure_option(section, option) and self.keyring_available:
            s_option = self._keyring_key(section, option)
            pending = self._unsaved.get(s_option)
            if pending is not None and pending[0] == 'set':
                # value was set but not yet written to the keyring
                return pending[1]
            return keyring.get_password(self.keyring_name, s_option)
        return ConfigParser.get(self, section, option, *args, **kwargs)

    def remove_option(self, section, option):
        """Removes the option from ConfigParser as well as
        the secure storage backend.

        The keyring entry is only deleted on the next call to write().

        :returns: the result of ConfigParser.remove_option()
        """
        if self.is_secure_option(section, option) and self.keyring_available:
            s_option = self._keyring_key(section, option)
            self._unsaved[s_option] = ('delete', None)
        return ConfigParser.remove_option(self, section, option)

    def write(self, *args, **kwargs):
        """See ConfigParser.write().  Also writes secure items to keystore."""
        ConfigParser.write(self, *args, **kwargs)
        if self.keyring_available:
            for key, thing in self._unsaved.items():
                action = thing[0]
                value = thing[1]
                if action == 'set':
                    keyring.set_password(self.keyring_name, key, value)
                elif action == 'delete':
                    try:
                        keyring.delete_password(self.keyring_name, key)
                    except Exception:
                        # Best effort: a failed delete must not abort the
                        # save.  Narrowed from a bare "except:" so that
                        # KeyboardInterrupt/SystemExit still propagate.
                        pass
        self._unsaved = {}
class OfxConfig(object):
    """Default config file handler for other tools to use.

    This can read and write from the default config which is
    $USERS_HOME/ofxclient.ini

    :param file_name: absolute path to a config file (optional)
    :type file_name: string or None

    Example usage::

        from ofxclient.config import OfxConfig
        from ofxclient import Account

        a = Account()

        c = OfxConfig(file_name='/tmp/new.ini')
        c.add_account(a)
        c.save()

        account_list = c.accounts()
        one_account  = c.account( a.local_id() )
    """

    def __init__(self, file_name=None):
        """Load ``file_name``, or the default config when none is given.

        :raises ValueError: if no file name is given and there is no default
        """
        # Flattened option names that encrypt_account() marks as secure.
        self.secured_field_names = [
            'institution.username',
            'institution.password'
        ]

        path = file_name or DEFAULT_CONFIG
        if path is None:
            raise ValueError('file_name is required')
        self._load(path)

    def reload(self):
        """Reload the config file from disk"""
        return self._load()

    def accounts(self):
        """List of configured :py:class:`ofxclient.Account` objects,
        one per section of the config file"""
        found = []
        for section in self.parser.sections():
            found.append(self._section_to_account(section))
        return found

    def encrypted_accounts(self):
        """Accounts for which every secured field is encrypted"""
        selected = []
        for acct in self.accounts():
            if self.is_encrypted_account(acct.local_id()):
                selected.append(acct)
        return selected

    def unencrypted_accounts(self):
        """Accounts for which at least one secured field is not encrypted"""
        selected = []
        for acct in self.accounts():
            if not self.is_encrypted_account(acct.local_id()):
                selected.append(acct)
        return selected

    def account(self, id):
        """Get :py:class:`ofxclient.Account` by section id,
        or None when there is no such section"""
        if not self.parser.has_section(id):
            return None
        return self._section_to_account(id)

    def add_account(self, account):
        """Add Account to config (does not save)"""
        flat = flatten_dict(account.serialize())
        section_id = flat['local_id']

        if not self.parser.has_section(section_id):
            self.parser.add_section(section_id)

        # sorted so options are always set in the same order
        for option in sorted(flat):
            self.parser.set(section_id, option, flat[option])

        self.encrypt_account(id=section_id)
        return self

    def encrypt_account(self, id):
        """Make sure that certain fields are encrypted."""
        for field in self.secured_field_names:
            current = self.parser.get(id, field)
            self.parser.set_secure(id, field, current)
        return self

    def is_encrypted_account(self, id):
        """Are all fields for the account id encrypted?"""
        return all(
            self.parser.is_secure_option(id, field)
            for field in self.secured_field_names
        )

    def remove_account(self, id):
        """Remove Account from config (does not save).

        Returns True if the section existed and was removed, else False.
        """
        if not self.parser.has_section(id):
            return False
        self.parser.remove_section(id)
        return True

    def save(self):
        """Save changes to config file"""
        with open(self.file_name, 'w') as handle:
            self.parser.write(handle)
        return self

    def _load(self, file_name=None):
        """(Re)build ``self.parser`` from ``file_name`` (or the file that
        was loaded last), creating an empty file first if it is missing."""
        self.parser = None

        target = file_name or self.file_name

        if not os.path.exists(target):
            # "touch" the file so it can be opened for reading below
            with open(target, 'a'):
                os.utime(target, None)

        self.file_name = target

        parser = SecurableConfigParser()
        with open(self.file_name) as handle:
            # python 3 offers read_file(); python 2 only has readfp()
            if hasattr(parser, 'read_file'):
                reader = parser.read_file
            else:
                reader = parser.readfp
            reader(handle)

        self.parser = parser
        return self

    def _section_to_account(self, section):
        """Build an Account from the (flattened) options of a section."""
        flat = dict(self.parser.items(section))
        return Account.deserialize(unflatten_dict(flat))
def unflatten_dict(dict, prefix=None, separator='.'):
    """Turn a flat mapping with separator-joined keys into nested dicts.

    Example: ``{'a.b': 1, 'c': 2}`` becomes ``{'a': {'b': 1}, 'c': 2}``.

    :param dict: flat mapping whose keys are strings
    :param prefix: not used by this function
    :param separator: string that separates the levels inside a key
    :returns: a new (nested) dict
    """
    ret = {}
    for k, v in dict.items():
        key_parts = k.split(separator)
        # Walk down (creating as needed) one dict per leading key part,
        # then store the value under the last part.
        node = ret
        for part in key_parts[:-1]:
            node = node.setdefault(part, {})
        node[key_parts[-1]] = v
    return ret


def flatten_dict(dict_, prefix=None, separator='.'):
    """Turn nested dicts into one flat dict with separator-joined keys.

    Example: ``{'a': {'b': 1}, 'c': 2}`` becomes ``{'a.b': 1, 'c': 2}``.

    :param dict_: mapping whose values may themselves be dicts
    :param prefix: key prefix for every entry (used by the recursion)
    :param separator: string placed between the levels of a key
    :returns: a new flat dict
    """
    ret = {}
    for k, v in dict_.items():
        if prefix:
            flat_key = separator.join([prefix, k])
        else:
            flat_key = k
        if isinstance(v, dict):
            # Pass the separator down: previously nested levels silently
            # fell back to the default '.'.
            ret.update(flatten_dict(v, prefix=flat_key, separator=separator))
        else:
            ret[flat_key] = v
    return ret