# Source code for io_mp

"""
Input-Output handling

Handles all the input/output of the code (at least most of it). If something is
printed that does not satisfy you (number of decimals, for instance, in the
output files), you only have to find the called function and change a number.

Whenever the arguments of a function are only :code:`command_line` or
:code:`data`, they are not documented again, since their meaning is clear.
If a function takes additional arguments, those are detailed.

This module also defines a new class :class:`File`, that extends
:py:class:`file`, which provides a tail function. It is used in
:func:`sampler.read_args_from_chain`.

Finally, the way the error messages are displayed is set here, along with the
ascii-art for the exclamation mark sign.
"""

import os
import re  # Module to handle regular expressions
from datetime import date
import fcntl
import textwrap  # used to format the error messages

# Ascii art for error display. Each severity maps to the prefixes put in
# front of the successive lines of a message; the last (blank) entry is the
# one reused when a message has more lines than the art.
START_LINE = {
    'error': [r' /|\   ',
              r'/_o_\  ',
              r'       '],
    'warning': [r' /!\ ',
                r'     '],
    'info': [r' /!\ ',
             r'     '],
}

STANDARD_LENGTH = 80  # standard, increase if you have a big screen


def log_parameters(data, command_line):
    """
    Start the log.param file

    Creates (or overwrites) `log.param` inside :code:`command_line.folder`.
    The file begins with a header line holding the cosmological module name,
    its version, and the git branch and hash stored on :code:`data`; the
    input parameter file :code:`command_line.param` is then copied verbatim
    after it.

    """
    log_path = os.path.join(command_line.folder, 'log.param')
    with open(log_path, 'w') as log:
        header = "#-----{0} {1} (branch: {2}, hash: {3})-----\n\n".format(
            data.cosmological_module_name, data.version,
            data.git_branch, data.git_version)
        log.write(header)
        # Verbatim copy of the user's parameter file
        with open(command_line.param, 'r') as param_file:
            log.writelines(param_file)
def log_likelihood_parameters(likelihood, command_line):
    """
    Write down the interpreted .data file of the input likelihood to log.param

    Appends to `log.param` (in :code:`command_line.folder`) a section header
    carrying :code:`likelihood.name`, followed by one
    :code:`<name>.<key> = <value>` line per entry of
    :code:`likelihood.dictionary`. String values are wrapped in single
    quotes, every other value is written through its default string
    conversion.

    .. warning::

        Since version 2.0.2, the lines are not copied verbatim, they are first
        interpreted, then copied. This allows for overriding of parameters from
        the input.param file.

    """
    log_path = os.path.join(command_line.folder, 'log.param')
    with open(log_path, 'a') as log:
        log.write("\n\n#-----Likelihood-{0}-----\n".format(likelihood.name))
        # items() is available on both Python 2 and Python 3 dictionaries,
        # unlike iteritems()
        for key, value in likelihood.dictionary.items():
            if isinstance(value, str):
                # Quote strings so that the logged line reads back as a string
                log.write("%s.%s = '%s'\n" % (likelihood.name, key, value))
            else:
                log.write("%s.%s = %s\n" % (likelihood.name, key, value))
def log_cosmo_arguments(data, command_line):
    """
    Write down the `cosmo_arguments` used to log.param

    Third function called when writing log.param. It is understood here that
    all the other parameters for the cosmological modules are set to their
    default value directly in the program.

    It is written as an update for the dictionary cosmo_arguments (i.e. as
    :code:`dict.update()` and not as :code:`dict =`) in order not to erase
    previously initialized data.

    Nothing is written when :code:`data.cosmo_arguments` is empty.

    """
    if not data.cosmo_arguments:
        return
    # The context manager closes the log even if one of the writes fails
    log_path = os.path.join(command_line.folder, 'log.param')
    with open(log_path, 'a') as log:
        log.write('\n\n#-----------Cosmological-arguments---------\n')
        log.write('data.cosmo_arguments.update({0})\n'.format(
            data.cosmo_arguments))
def log_default_configuration(data, command_line):
    """
    Log the .conf file to log.param

    Fourth and last function called when writing log.param. Only useful if you
    have several versions of your cosmological code installed in different
    locations, or different versions of Clik. But, as you never know what might
    go wrong, it is logged everytime !

    Each entry of the :code:`data.path` dictionary is appended to `log.param`
    as a :code:`data.path['<key>'] = '<value>'` line.

    TODO: should the root be still logged? (@packaging)

    """
    # The context manager closes the log even if one of the writes fails
    log_path = os.path.join(command_line.folder, 'log.param')
    with open(log_path, 'a') as log:
        log.write('\n\n#--------Default-Configuration------\n')
        # items() is available on both Python 2 and Python 3 dictionaries,
        # unlike iteritems()
        for key, value in data.path.items():
            log.write("data.path['{0}']\t= '{1}'\n".format(key, value))
def refresh_file(data):
    """
    Flush the chain file to disk by closing and reopening it

    The handle stored in :code:`data.out` is closed, which writes out any
    buffered content, and a new handle on :code:`data.out_name` is opened in
    append mode and stored back in :code:`data.out`.

    """
    # NOTE(review): closing the handle presumably also drops any flock() lock
    # that was taken on it - confirm whether this matters to callers.
    stale_handle = data.out
    stale_handle.close()
    data.out = open(data.out_name, 'a')
def create_output_files(command_line, data):
    """
    Automatically create a new name for the chain.

    This routine takes care of organising the folder for you. It will
    automatically generate names for the new chains according to the date,
    number of points chosen.

    The file is named :code:`<date>_<number>__<suffix>.txt`. When
    :code:`command_line.chain_number` is given it is used as the suffix;
    otherwise the suffix is one more than the largest suffix already present
    in the folder for the same date and number, and is increased further
    until the file can be locked. The open handle and its path are stored in
    :code:`data.out` and :code:`data.out_name`. In case of a restart, the
    number of points of the restarted chain is added to
    :code:`command_line.N` and the content of the old chain is copied into
    the new file.

    .. warning::

        The way these names are generated (with the proper number of _, __, -,
        and their placement) is exploited in the rest of the code in various
        places. Please keep that in mind if ever you are in the mood of
        changing things here.

    """
    if command_line.restart is None:
        number = command_line.N
    else:
        # The restarted chain is called <date>_<N>__<suffix>.txt: recover its
        # N and add the number of new points to it
        previous_name = command_line.restart.split(os.path.sep)[-1]
        number = int(
            previous_name.split('__')[0].split('_')[1]) + command_line.N

    # output file
    outname_base = '{0}_{1}__'.format(date.today(), number)
    base_path = os.path.join(command_line.folder, outname_base)

    if command_line.chain_number is None:
        # Start one above the largest suffix already used in the folder
        suffix = 0
        for existing in os.listdir(command_line.folder):
            if outname_base in existing:
                suffix = max(
                    suffix, int(existing.split('__')[-1].split('.')[0]))
        suffix += 1
        while True:
            data.out_name = base_path + str(suffix) + '.txt'
            data.out = open(data.out_name, 'w')
            try:
                lock(data.out, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except LockError:
                # The file is held by another instance: release our handle
                # instead of leaking it, and try the next suffix
                data.out.close()
                suffix += 1
            else:
                break
    else:
        data.out_name = base_path + command_line.chain_number + '.txt'
        data.out = open(data.out_name, 'w')
    # Parenthesised single-argument print behaves the same on Python 2 and 3
    print('Creating %s\n' % data.out_name)

    # in case of a restart, copying the whole thing in the new file
    if command_line.restart is not None:
        with open(command_line.restart, 'r') as previous_chain:
            for line in previous_chain:
                data.out.write(line)
def get_tex_name(name, number=1):
    """
    Simplistic tex name transformer.

    Puts a backslash in front of the first occurrence of each known greek
    letter name, and wraps whatever follows the first underscore in curly
    brackets { } (further underscores become spaces). The scale is then
    merged into the name: omitted when it is 1, written as an integer prefix
    when strictly between 1 and 1000, and as a power of ten otherwise.

    .. note::

        This might easily fail on simple names, like `beta_plus_lambda`. In
        this case, please use an extra plot file with the command line option
        :code:`-extra plot_file`, or come up with a better function !

    .. note::

        This function returns immediately with the unmodified name if it
        already contains the LaTeX symbol for math, $.

    Parameters
    ----------
    name : str
        Input name

    Keyword Arguments
    -----------------
    number : float
        Scale

    """
    # A name that is already in math mode is left untouched
    if "$" in name:
        return name

    greek_letters = ('omega', 'tau', 'alpha', 'beta', 'delta', 'nu', 'Omega',
                     'Lambda', 'lambda', 'Delta', 'mu', 'sigma', 'gamma')
    for letter in greek_letters:
        position = name.find(letter)
        if position != -1:
            name = name[:position] + "\\" + name[position:]

    if '_' in name:
        pieces = name.split('_')
        subscript = ''.join(piece + ' ' for piece in pieces[1:])
        name = pieces[0] + '_{' + subscript + '}'

    if number == 1:
        return "${0}$".format(name)
    if 1 < number < 1000:
        return "$%0.d~%s$" % (number, name)

    # Any other scale: keep only the exponent of its scientific notation
    scientific = "$%0.e%s$" % (number, name)
    sign = '+'
    match = re.search(r'(?:\$[0-9]*e\+[0]*)([0-9]*)(.*)', scientific)
    if match is None:
        sign = '-'
        match = re.search(r'(?:\$[0-9]*e\-[0]*)([0-9]*)(.*)', scientific)
    exponent, remainder = match.groups()
    return '$10^{' + sign + exponent + '}' + remainder
def write_covariance_matrix(covariance_matrix, names, path):
    """
    Store the covariance matrix to a file

    The first line is a commented header listing the names, each right
    aligned on 16 characters. It is followed by one line per name holding the
    tab-terminated entries of the corresponding row, in scientific notation
    with five decimals.

    Parameters
    ----------
    covariance_matrix : 2D indexable
        Matrix read as :code:`covariance_matrix[i][j]`, for i and j below
        the number of names
    names : list of str
        Names written in the header; their number sets the size of the
        block of the matrix that is written
    path : str
        File to write, overwritten if it exists

    """
    size = len(names)
    with open(path, 'w') as cov:
        cov.write('# %s\n' % ', '.join(['%16s' % name for name in names]))
        for i in range(size):
            # The space flag reserves the sign column for every non-negative
            # entry, zero included, so that the columns stay aligned
            cov.write(''.join(
                '% .5e\t' % covariance_matrix[i][j] for j in range(size)))
            cov.write('\n')
def write_bestfit_file(bestfit, names, path):
    """
    Store the bestfit parameters to a file

    The first line is a commented header listing the names, each right
    aligned on 16 characters. The second line holds the tab-terminated
    bestfit values, in scientific notation with five decimals.

    Parameters
    ----------
    bestfit : indexable
        Bestfit values, read as :code:`bestfit[i]` for i below the number of
        names
    names : list of str
        Names written in the header
    path : str
        File to write, overwritten if it exists

    """
    with open(path, 'w') as bestfit_file:
        bestfit_file.write(
            '# %s\n' % ', '.join(['%16s' % name for name in names]))
        for index in range(len(names)):
            # The space flag reserves the sign column for every non-negative
            # value, zero included, so that the columns stay aligned
            bestfit_file.write('% .5e\t' % bestfit[index])
        bestfit_file.write('\n')
def pretty_print(string, status, return_string=False):
    """
    Return the string formatted according to its status

    The input is a potentially long message, describing the problem. It is
    cleaned from repeated whitespace, wrapped so that every line fits within
    `STANDARD_LENGTH` once the ascii-art of `START_LINE` for this status has
    been put in front of it, and then either printed or returned. For the
    'error' status, a blank line is added before the message.

    This function only formats the message, it does not stop the program.

    Parameters
    ----------
    string : str
        Message to display; a `\\n` inside it forces a line break
    status : str
        Key of `START_LINE`: 'error', 'warning' or 'info'

    Keyword Arguments
    -----------------
    return_string : bool
        If True, the formatted message is returned instead of being printed

    Returns
    -------
    str or None
        The formatted message, every line ending with a newline, when
        `return_string` is True; None otherwise

    """
    banner = START_LINE[status]
    length = STANDARD_LENGTH - len(banner[0])

    # Remove unwanted spaces (coming from carriage returns in the input
    # string), handle voluntary carriage returns specified with \n, and
    # recover the lines split at the correct length
    wrapped = []
    for paragraph in string.split('\n'):
        wrapped.extend(textwrap.wrap(' '.join(paragraph.split()), length))

    lines = []
    if status == 'error':
        # Add a blank line so that the error displays better
        lines.append('')
    # If more lines are needed than the ascii-art provides, its last (empty)
    # line is reused
    last = len(banner) - 1
    for index, line in enumerate(wrapped):
        lines.append(banner[min(index, last)] + line)

    if return_string:
        return ''.join(line + '\n' for line in lines)
    for line in lines:
        # Parenthesised single-argument print behaves the same on Python 2
        # and Python 3
        print(line)
    return None
def safe_exec(string):
    """
    Attempt at executing a string from file in a secure way

    The string is executed in a fresh global namespace whose `__builtins__`
    is an empty dictionary, so no builtin name is directly reachable from it.

    """
    # NOTE(review): emptying __builtins__ is generally not considered a
    # sufficient sandbox - only feed this function trusted input.
    restricted_globals = {'__builtins__': {}}
    exec(string, restricted_globals)
class File(file):
    """
    File object extended with an equivalent of the tail command (on linux).

    It will be used when starting from an existing chain, and avoids circling
    through an immense file.

    """

    def tail(self, lines_2find=1):
        """
        Imitates the classic tail command

        The file is scanned backwards from its end, in blocks of at most 1024
        bytes, until more than `lines_2find` newlines have been counted or
        the beginning of the file is reached. The scanned part is then read
        as lines, and the last `lines_2find` of them are returned as a list.

        """
        block_size = 1024
        self.seek(0, 2)  # go to end of file
        file_size = self.tell()
        newlines_seen = 0
        scanned = 0
        while newlines_seen < lines_2find + 1 and scanned < file_size:
            # Step back by one more block, without going past the start
            scanned += min(block_size, file_size - scanned)
            self.seek(-scanned, 2)
            newlines_seen += self.read(block_size).count('\n')
        # Read again everything that was scanned, this time line by line
        self.seek(-scanned, 2)
        return list(self.readlines())[-lines_2find:]
class LockError(Exception):
    """
    Error raised by :func:`lock` when a file could not be locked

    .. warning::

        in the process of being tested

    """
    # Error codes:
    LOCK_FAILED = 1
def lock(file, flags):
    """
    Lock a given file to prevent other instances of the code to write to the
    same file.

    Parameters
    ----------
    file : file object
        Open file, locked through its file descriptor
    flags : int
        Operation passed to :func:`fcntl.flock`, e.g.
        :code:`fcntl.LOCK_EX | fcntl.LOCK_NB`

    Raises
    ------
    LockError
        If the lock could not be taken; it carries the arguments of the
        underlying IOError

    .. warning::

        in the process of being tested

    """
    import fcntl
    try:
        fcntl.flock(file.fileno(), flags)
    except IOError as exc_value:
        # The exception code varies on different systems so we'll catch
        # every IO error. The arguments are forwarded through .args, because
        # exception instances cannot be unpacked directly on Python 3.
        raise LockError(*exc_value.args)
def unlock(file):
    """
    Release the lock held on a file.

    The unlock operation of :func:`fcntl.flock` is applied to the file
    descriptor of the given open file.

    .. warning::

        in the process of being tested

    """
    import fcntl
    descriptor = file.fileno()
    fcntl.flock(descriptor, fcntl.LOCK_UN)
def warning_message(message, *args):
    """
    Custom implementation of `showwarning` from :mod:`warnings`

    Only the first argument of the warning instance `message` is displayed,
    through :func:`pretty_print` with the "warning" status. The remaining
    positional arguments are accepted and ignored.

    """
    text = message.args[0]
    pretty_print(text, "warning")
class MyError(Exception):
    """
    Base class defining the general presentation of error messages

    Attributes set on every instance:

    * `message`: the text given at construction,
    * `name`: the class name with a space inserted before every capital
      letter but the first (`MyError` becomes `My Error`).

    """

    def __init__(self, message):
        """Reformat the name of the class for easier reading"""
        # Hand the message over to Exception as well, so that `args` is
        # filled in and instances can be copied and pickled
        Exception.__init__(self, message)
        self.message = message
        class_name = self.__class__.__name__
        # Extract the name, and add spaces between the capital letters
        self.name = ''.join(
            ' ' + letter if index > 0 and letter.isupper() else letter
            for index, letter in enumerate(class_name))

    def __str__(self):
        """Define the behaviour under the print statement"""
        return '\n\n' + self.name + ':' + pretty_print(
            self.message, "error", True)
class CosmologicalModuleError(MyError):
    """Raised for any problem linked to the cosmological module"""
class ConfigurationError(MyError):
    """Raised for missing files, libraries, etc..."""
class MissingLibraryError(MyError):
    """Raised for a missing Cosmo module, Planck, ..."""
class LikelihoodError(MyError):
    """Raised for problems when computing likelihood, missing nuisance, etc..."""
class FiducialModelWritten(MyError):
    """Raised to exit the code in case of writing a fiducial file"""
class AnalyzeError(MyError):
    """Raised when encountering a fatal mistake in analyzing chains"""