astropy:docs

Source code for astropy.modeling.fitting

# Licensed under a 3-clause BSD style license - see LICENSE.rst

"""
This module provides wrappers, called Fitters, around some Numpy and Scipy
fitting functions. All Fitters take an instance of `~astropy.modeling.core.ParametricModel`
as input and define a ``__call__`` method which fits the model to the data and changes the
model's parameters attribute. The idea is to make this extensible and allow
users to easily add other fitters.

Linear fitting is done using Numpy's `~numpy.linalg.lstsq` function.
There are currently two non-linear fitters which use `~scipy.optimize.leastsq` and
`~scipy.optimize.slsqp` functions in scipy.optimize.\
"""

from __future__ import division

import abc
import numbers
import warnings

from functools import reduce

import numpy as np

from ..logger import log
from .utils import poly_map_domain
from ..utils.exceptions import AstropyUserWarning



__all__ = ['LinearLSQFitter', 'NonLinearLSQFitter', 'SLSQPFitter',
           'JointFitter', 'Fitter']



DEFAULT_MAXITER = 100
DEFAULT_EPS = np.sqrt(np.finfo(float).eps)
DEFAULT_MAX_BOUND = 10 ** 12
DEFAULT_MIN_BOUND = -10 ** 12


def _convert_input(x, y, z=None):
    x = np.asarray(x)
    y = np.asarray(y)
    if x.shape[0] != y.shape[0]:
        raise ValueError("x and y should have the same shape")
    if z is None:
        farg = (x, y)
    else:
        z = np.asarray(z)
        if x.shape != z.shape:
            raise ValueError("x, y and z should have the same shape")
        farg = (x, y, z)
    return farg


class ModelsError(Exception):
    """Base class for model exceptions"""


class ModelLinearityError(ModelsError):
    """
    Raised when a linear model is passed to a non-linear fitter and vice versa.
    """


class UnsupportedConstraintError(ModelsError, ValueError):
    """
    Raised when a fitter does not support a type of constraint.
    """


[docs]class Fitter(object): """ Base class for all fitters. The purpose of this class is to manage constraints. """ __metaclass__ = abc.ABCMeta # The base Fitter does not support any constraints by default; individual # fitters should explicitly set this list to the specific constraints # it supports # May contain any of 'bounds', 'eqcons', 'ineqcons', 'fixed', or 'tied' supported_constraints = [] def __init__(self): self._weights = None def _validate_constraints(self, model): message = '{0} cannot handle {{0}} constraints.'.format( self.__class__.__name__) if (any(model.fixed.values()) and 'fixed' not in self.supported_constraints): raise UnsupportedConstraintError( message.format('fixed parameter')) if (any(model.tied.values()) and 'tied' not in self.supported_constraints): raise UnsupportedConstraintError( message.format('tied parameter')) if (any([tuple(b) != (None, None) for b in model.bounds.values()]) and 'bounds' not in self.supported_constraints): raise UnsupportedConstraintError( message.format('bound parameter')) if model.eqcons and 'eqcons' not in self.supported_constraints: raise UnsupportedConstraintError(message.format('equality')) if (model.ineqcons and 'ineqcons' not in self.supported_constraints): raise UnsupportedConstraintError(message.format('inequality')) # TODO # @property # def covar(self): # return None @abc.abstractmethod def __call__(self): """ This method performs the actual fitting and modifies the parameter list of a model. Fitter subclasses should implement this method. """ raise NotImplementedError("Subclasses should implement this") def _fitter_to_model_params(self, model, fps): _fit_params, _fit_param_indices = model._model_to_fit_params() if any(model.fixed.values()) or any(model.tied.values()): model.parameters[_fit_param_indices] = fps for idx, name in enumerate(model.param_names): if model.tied[name] != False: value = model.tied[name](model) slice_ = model._param_metrics[name][0] model.parameters[slice_] = value elif any([tuple(b) != (None, None) for b in model.bounds.values()]): for name, par in zip(model.param_names, _fit_params): if model.bounds[name] != (None, None): b = model.bounds[name] if b[0] is not None: par = max(par, model.bounds[name][0]) if b[1] is not None: par = min(par, model.bounds[name][1]) setattr(model, name, par) else: model.parameters = fps
[docs]class LinearLSQFitter(Fitter): """ A class performing a linear least square fitting. Uses `numpy.linalg.lstsq` to do the fitting. Given a model and data, fits the model to the data and changes the model's parameters. Keeps a dictionary of auxiliary fitting information. Parameters ---------- model : an instance of `~astropy.modeling.core.ParametricModel` Raises ------ ModelLinearityError A nonlinear model is passed to a linear fitter """ supported_constraints = ['fixed'] def __init__(self): super(LinearLSQFitter, self).__init__() self.fit_info = {'residuals': None, 'rank': None, 'singular_values': None, 'params': None } @staticmethod def _deriv_with_constraints(model, param_indices, x=None, y=None): if y is None: d = np.array(model.deriv(x=x)) else: d = np.array(model.deriv(x=x, y=y)) if model.col_deriv: return d[param_indices] else: return d[:, param_indices] def _map_domain_window(self, model, x, y=None): """ Maps domain into window for a polynomial model which has these attributes. """ if y is None: if hasattr(model, 'domain') and model.domain is None: model.domain = [x.min(), x.max()] if hasattr(model, 'window') and model.window is None: model.window = [-1, 1] return poly_map_domain(x, model.domain, model.window) else: if hasattr(model, 'x_domain') and model.x_domain is None: model.x_domain = [x.min(), x.max()] if hasattr(model, 'y_domain') and model.y_domain is None: model.y_domain = [y.min(), y.max()] if hasattr(model, 'x_window') and model.x_window is None: model.x_window = [-1., 1.] if hasattr(model, 'y_window') and model.y_window is None: model.y_window = [-1., 1.] xnew = poly_map_domain(x, model.x_domain, model.x_window) ynew = poly_map_domain(y, model.y_domain, model.y_window) return xnew, ynew def __call__(self, model, x, y, z=None, weights=None, rcond=None): """ Fit data to this model. Parameters ---------- model : `ParametricModel` model to fit to x, y, z x : array input coordinates y : array input coordinates z : array (optional) input coordinates weights : array (optional) weights rcond : float, optional Cut-off ratio for small singular values of `a`. Singular values are set to zero if they are smaller than `rcond` times the largest singular value of `a`. Returns ------ model_copy : `ParametricModel` a copy of the input model with parameters set by the fitter """ if not model.fittable: raise ValueError("Model must be a subclass of ParametricModel") if not model.linear: raise ModelLinearityError('Model is not linear in parameters, ' 'linear fit methods should not be used.') self._validate_constraints(model) multiple = False model_copy = model.copy() _, fitparam_indices = model_copy._model_to_fit_params() self._weights = weights if model_copy.n_inputs == 2 and z is None: raise ValueError("Expected x, y and z for a 2 dimensional model.") farg = _convert_input(x, y, z) if len(farg) == 2: x, y = farg if y.ndim == 2: assert y.shape[1] == model_copy.param_dim, ( "Number of data sets (Y array is expected to equal " "the number of parameter sets") # map domain into window if hasattr(model_copy, 'domain'): x = self._map_domain_window(model_copy, x) if any(model_copy.fixed.values()): lhs = self._deriv_with_constraints(model_copy, fitparam_indices, x=x) else: lhs = model_copy.deriv(x=x) if len(y.shape) == 2: rhs = y multiple = y.shape[1] else: rhs = y else: x, y, z = farg if x.shape[-1] != z.shape[-1]: raise ValueError("x and z should have equal last dimensions") # map domain into window if hasattr(model_copy, 'x_domain'): x, y = self._map_domain_window(model_copy, x, y) if any(model_copy.fixed.values()): lhs = self._deriv_with_constraints(model_copy, fitparam_indices, x=x, y=y) else: lhs = model_copy.deriv(x=x, y=y) if len(z.shape) == 3: rhs = np.array([i.flatten() for i in z]).T multiple = z.shape[0] else: rhs = z.flatten() if weights is not None: weights = np.asarray(weights, dtype=np.float) if len(x) != len(weights): raise ValueError("x and weights should have the same length") if rhs.ndim == 2: lhs *= weights[:, np.newaxis] rhs *= weights[:, np.newaxis] else: lhs *= weights[:, np.newaxis] rhs *= weights if not multiple and model_copy.param_dim > 1: raise ValueError("Attempting to fit a 1D data set to a model " "with multiple parameter sets") if rcond is None: rcond = len(x) * np.finfo(x.dtype).eps scl = (lhs * lhs).sum(0) lacoef, resids, rank, sval = np.linalg.lstsq(lhs / scl, rhs, rcond) self.fit_info['residuals'] = resids self.fit_info['rank'] = rank self.fit_info['singular_values'] = sval # If y.n_inputs > model.n_inputs we are doing a simultanious 1D fitting # of several 1D arrays. Otherwise the model is 2D. # if y.n_inputs > self.model.n_inputs: if multiple and model_copy.param_dim != multiple: model_copy.param_dim = multiple # TODO: Changing the model's param_dim needs to be handled more # carefully; for now it's not actually allowed lacoef = (lacoef.T / scl).T self.fit_info['params'] = lacoef # TODO: Only Polynomial models currently have an _order attribute; # maybe change this to read isinstance(model, PolynomialBase) if hasattr(model_copy, '_order') and rank != model_copy._order: warnings.warn("The fit may be poorly conditioned\n", AstropyUserWarning) self._fitter_to_model_params(model_copy, lacoef.flatten()) return model_copy
[docs]class NonLinearLSQFitter(Fitter): """ A class performing non-linear least squares fitting using the Levenberg-Marquardt algorithm implemented in `scipy.optimize.leastsq`. Parameters ---------- model : a fittable `~astropy.modeling.core.ParametricModel` model to fit to data Raises ------ ModelLinearityError A linear model is passed to a nonlinear fitter """ supported_constraints = ['fixed', 'tied', 'bounds'] def __init__(self): self.fit_info = {'nfev': None, 'fvec': None, 'fjac': None, 'ipvt': None, 'qtf': None, 'message': None, 'ierr': None, 'status': None} super(NonLinearLSQFitter, self).__init__()
[docs] def errorfunc(self, fps, *args): model = args[0] self._fitter_to_model_params(model, fps) meas = args[-1] if self._weights is None: return np.ravel(model(*args[1 : -1]) - meas) else: return np.ravel(self._weights * (model(*args[1 : -1]) - meas)) # @property # def covar(self): # """ # Calculate the covariance matrix (doesn't take into account # constraints). # """ # NOTE: Not currently used; won't work with Fitter implementation where # models are not tied to fitters # n = len(self.model.parameters) # construct the permutation matrix # P = np.take(np.eye(n), self.fit_info['ipvt'] - 1, 0) # construct the R matrix as in JP = QR # r = np.triu(self.fit_info['fjac'].T[:n, :]) # r_pt = np.dot(r, P.T) # p_rt = np.dot(P, r.T) # try: # return np.dual.inv(np.dot(p_rt, r_pt)) # except: # log.info("Could not construct a covariance matrix") # return None
def __call__(self, model, x, y, z=None, weights=None, maxiter=DEFAULT_MAXITER, epsilon=DEFAULT_EPS, estimate_jacobian=False): """ Fit data to this model. Parameters ---------- model : `ParametricModel` model to fit to x, y, z x : array input coordinates y : array input coordinates z : array (optional) input coordinates weights : array (optional weights maxiter : int maximum number of iterations epsilon : float A suitable step length for the forward-difference approximation of the Jacobian (if model.fjac=None). If epsfcn is less than the machine precision, it is assumed that the relative errors in the functions are of the order of the machine precision. estimate_jacobian : bool If False (default) and if the model has a deriv method, it will be used. Otherwise the Jacobian will be estimated. If True, the Jacobian will be estimated in any case. Returns ------ model_copy : `ParametricModel` a copy of the input model with parameters set by the fitter """ if not model.fittable: raise ValueError("Model must be a subclass of ParametricModel") self._validate_constraints(model) from scipy import optimize model_copy = model.copy() farg = (model_copy, ) + _convert_input(x, y, z) self._weights = weights if model_copy.param_dim != 1: # only single data sets ca be fitted raise ValueError("NonLinearLSQFitter can only fit one " "data set at a time") if model_copy.deriv is None or estimate_jacobian: dfunc = None else: dfunc = self._wrap_deriv init_values, _ = model_copy._model_to_fit_params() fitparams, status, dinfo, mess, ierr = optimize.leastsq( self.errorfunc, init_values, args=farg, Dfun=dfunc, col_deriv=model_copy.col_deriv, maxfev=maxiter, epsfcn=epsilon, full_output=True) self._fitter_to_model_params(model_copy, fitparams) self.fit_info.update(dinfo) self.fit_info['status'] = status self.fit_info['message'] = mess self.fit_info['ierr'] = ierr if ierr not in [1, 2, 3, 4]: warnings.warn("The fit may be unsuccessful; check " "fit_info['message'] for more information.", AstropyUserWarning) return model_copy @staticmethod def _wrap_deriv(params, model, x, y, z=None): """ Wraps the method calculating the Jacobian of the function to account for model constraints. Currently the only fitter that uses a derivative is the `NonLinearLSQFitter`. This wrapper may need to be revised when other fitters using function derivative are added or when the statistic is separated from the fitting routines. `~scipy.optimize.leastsq` expects the function derivative to have the above signature (parlist, (argtuple)). In order to accomodate model constraints, instead of using p directly, we set the parameter list in this function. """ if any(model.fixed.values()) or any(model.tied.values()): if z is None: full_deriv = np.array(model.deriv(x, *model.parameters)) else: full_deriv = np.array(model.deriv(x, y, *model.parameters)) pars = [getattr(model, name) for name in model.param_names] fixed = [par.fixed for par in pars] tied = [par.tied for par in pars] tied = list(np.where([par.tied != False for par in pars], True, tied)) fix_and_tie = np.logical_or(fixed, tied) ind = np.logical_not(fix_and_tie) if not model.col_deriv: full_deriv = np.asarray(full_deriv).T residues = np.asarray(full_deriv[np.nonzero(ind)]) else: residues = full_deriv[np.nonzero(ind)] return [np.ravel(_) for _ in residues] else: if z is None: return model.deriv(x, *params) else: return [np.ravel(_) for _ in model.deriv(x, y, *params)]
[docs]class SLSQPFitter(Fitter): """ Sequential Least Squares Programming optimization algorithm. The algorithm is described in [1]_. It supports tied and fixed parameters, as well as bounded constraints. Uses `scipy.optimize.fmin_slsqp`. Raises ------ ModelLinearityError A linear model is passed to a nonlinear fitter References ---------- .. [1] http://www.netlib.org/toms/733 """ supported_constraints = ['bounds', 'eqcons', 'ineqcons', 'fixed', 'tied'] def __init__(self): super(SLSQPFitter, self).__init__() self.fit_info = { 'final_func_val': None, 'numiter': None, 'exit_mode': None, 'message': None }
[docs] def errorfunc(self, fps, *args): """ Compute the sum of the squared residuals Parameters ---------- fps : list parameters returned by the fitter args : list input coordinates """ model = args[0] meas = args[-1] self._fitter_to_model_params(model, fps) res = model(*args[1:-1]) - meas if self._weights is None: return np.sum(res ** 2) else: return np.sum(self._weights * res ** 2)
def __call__(self, model, x, y, z=None, weights=None, verblevel=0, maxiter=DEFAULT_MAXITER, epsilon=DEFAULT_EPS): """ Fit data to this model. Parameters ---------- model : `ParametricModel` model to fit to x, y, z x : array input coordinates y : array input coordinates z : array (optional) input coordinates weights : array (optional) weights verblevel : int 0-silent 1-print summary upon completion, 2-print summary after each iteration maxiter : int maximum number of iterations epsilon : float the step size for finite-difference derivative estimates Returns ------ model_copy : `ParametricModel` a copy of the input model with parameters set by the fitter """ if not model.fittable: raise ValueError("Model must be a subclass of ParametricModel") if model.linear: warnings.warn('Model is linear in parameters; ' 'consider using linear fitting methods.', AstropyUserWarning) self._validate_constraints(model) model_copy = model.copy() from scipy import optimize farg = _convert_input(x, y, z) farg = (model_copy, ) + farg self._weights = weights if model_copy.param_dim != 1: # for now only single data sets ca be fitted raise ValueError("NonLinearLSQFitter can only fit " "one data set at a time") p0, param_indices = model_copy._model_to_fit_params() pars = [getattr(model_copy, name) for name in model_copy.param_names] bounds = [par.bounds for par in pars if par.fixed != True and par.tied == False] bounds = np.asarray(bounds) for i in bounds: if i[0] is None: i[0] = DEFAULT_MIN_BOUND if i[1] is None: i[1] = DEFAULT_MAX_BOUND # older versions of scipy require this array to be float bounds = np.asarray(bounds, dtype=np.float) eqcons = np.array(model_copy.eqcons) ineqcons = np.array(model_copy.ineqcons) fitparams, final_func_val, numiter, exit_mode, mess = \ optimize.fmin_slsqp( self.errorfunc, p0, args=farg, disp=verblevel, full_output=1, bounds=bounds, eqcons=eqcons, ieqcons=ineqcons, iter=maxiter, acc=1.E-6, epsilon=DEFAULT_EPS) self._fitter_to_model_params(model_copy, fitparams) self.fit_info['final_func_val'] = final_func_val self.fit_info['numiter'] = numiter self.fit_info['exit_mode'] = exit_mode self.fit_info['message'] = mess if exit_mode != 0: warnings.warn("The fit may be unsuccessful; check " "fit_info['message'] for more information.", AstropyUserWarning) return model_copy
[docs]class JointFitter(object): """ Fit models which share a parameter. For example, fit two gaussians to two data sets but keep the FWHM the same. Parameters ---------- models : list a list of model instances jointparameters : list a list of joint parameters initvals : list a list of initial values """ def __init__(self, models, jointparameters, initvals): self.models = list(models) self.initvals = list(initvals) self.jointparams = jointparameters self._verify_input() for m in self.jointparams.keys(): m.set_joint_parameters(self.jointparams[m]) self.fitparams = self._model_to_fit_params() # a list of model.n_inputs self.modeldims = [m.n_inputs for m in self.models] # sum all model dimensions self.ndim = np.sum(self.modeldims) def _model_to_fit_params(self): fparams = [] fparams.extend(self.initvals) for model in self.models: params = [p.flatten() for p in model.parameters] for pname in model.joint: slc = model._param_metrics[pname][0] del params[slc] fparams.extend(params) return fparams
[docs] def errorfunc(self, fps, *args): """ fps : list the fitted parameters - result of an one iteration of the fitting algorithm args : dict tuple of measured and input coordinates args is always passed as a tuple from optimize.leastsq """ lstsqargs = list(args) fitted = [] fitparams = list(fps) numjp = len(self.initvals) # make a separate list of the joint fitted parameters jointfitparams = fitparams[:numjp] del fitparams[:numjp] for model in self.models: margs = lstsqargs[:model.n_inputs + 1] del lstsqargs[:model.n_inputs + 1] # separate each model separately fitted parameters numfp = len(model._parameters) - len(model.joint) mfparams = fitparams[:numfp] del fitparams[:numfp] # recreate the model parameters mparams = [] for pname in model.param_names: if pname in model.joint: index = model.joint.index(pname) # should do this with slices in case the # parameter is not a number mparams.extend([jointfitparams[index]]) else: slc = model._param_metrics[pname][0] plen = slc.stop - slc.start mparams.extend(mfparams[:plen]) del mfparams[:plen] modelfit = model.eval(margs[:-1], *mparams) fitted.extend(modelfit - margs[-1]) return np.ravel(fitted)
def _verify_input(self): assert(len(self.models) > 1) assert(len(self.jointparams.keys()) >= 2) for j in self.jointparams.keys(): assert(len(self.jointparams[j]) == len(self.initvals)) def __call__(self, *args): """ Fit data to these models keeping some of the pramaters common to the two models. """ from scipy import optimize assert(len(args) == reduce(lambda x, y: x + 1 + y + 1, self.modeldims)) self.fitparams[:], _ = optimize.leastsq(self.errorfunc, self.fitparams, args=args) fparams = self.fitparams[:] numjp = len(self.initvals) # make a separate list of the joint fitted parameters jointfitparams = fparams[:numjp] del fparams[:numjp] for model in self.models: # extract each model's fitted parameters numfp = len(model._parameters) - len(model.joint) mfparams = fparams[:numfp] del fparams[:numfp] # recreate the model parameters mparams = [] for pname in model.param_names: if pname in model.joint: index = model.joint.index(pname) # should do this with slices in case the parameter # is not a number mparams.extend([jointfitparams[index]]) else: slc = model._param_metrics[pname][0] plen = slc.stop - slc.start mparams.extend(mfparams[:plen]) del mfparams[:plen] model.parameters = np.array(mparams)

Page Contents