Source code for run

"""
.. module:: run_mp
    :synopsis: Run the code, initialising everything and calling the sampler
.. moduleauthor:: Benjamin Audren <benjamin.audren@epfl.ch>

"""
from initialise import initialise
from data import Data
import io_mp
import sys
import warnings
import os
import re


[docs]def run(custom_command=''): """ Main call of the function It recovers the initialised instances of cosmo Class, :class:`Data` and the NameSpace containing the command line arguments, feeding into the sampler. Parameters ---------- custom_command: str allows for testing the code """ # Create all the instances of the needed classes to run the code. The safe # initialisation handles the errors. cosmo, data, command_line, success = safe_initialisation( custom_command) # If success is False, it means either that the initialisation was not # successful, or that it was simply an analysis call. The run should # stop if not success: return # Once that the initialisation phase is done, one can import the # sampler import sampler # Generic sampler call sampler.run(cosmo, data, command_line) return
[docs]def mpi_run(custom_command=""): """ Launch a simple MPI run, with no communication of covariance matrix Each process will make sure to initialise the folder if needed. Then and only then, it will send the signal to its next in line to proceed. This allows for initialisation over an arbitrary cluster geometry (you can have a single node with many cores, and all the chains living there, or many nodes with few cores). The speed loss due to the time spend checking if the folder is created should be negligible when running decently sized chains. Each process will send the number that it found to be the first available to its friends, so that the gathering of information post-run is made easier. If a chain number is specified, this will be used as the first number, and then incremented afterwards with the rank of the process. """ from mpi4py import MPI comm = MPI.COMM_WORLD nprocs = comm.Get_size() rank = comm.Get_rank() success = True folder = '' # If the process is not the zeroth one, then wait for a signal from your # n-1 before initializing the folder if rank != 0: status = comm.recv(source=rank-1, tag=1) folder = comm.recv(source=rank-1, tag=2) if status == 'failed': success = False else: number = status if success: if not custom_command: custom_command = " ".join(sys.argv[1:]) if rank != 0: custom_command += " --chain-number %s" % str(int(number)+1) # First check if the folder is there already_sent = False if rank != 0 and rank < nprocs-1: status = int(number)+1 if Data.folder_is_initialised(folder): comm.send(status, dest=rank+1, tag=1) comm.send(folder, dest=rank+1, tag=2) already_sent = True # Then, properly initialise cosmo, data, command_line, success = safe_initialisation( custom_command, comm, nprocs) # The first initialisation should check a few more things if rank == 0: # Check that the run asked is compatible with mpirun and prepare. if command_line.subparser_name == 'info': warnings.warn( "Analyzing the chains is not supported in mpirun" " so this will run on one core only.") status = 'failed' elif command_line.method == "MH": regexp = re.match(".*__(\w*).txt", data.out_name) suffix = regexp.groups()[0] status = suffix elif command_line.method == "NS": status = 1 else: warnings.warn( "The method '%s' is not supported"%(command_line.method) + " in mpirun so this will run on one core only.") status = 'failed' folder = data.out_name elif rank < nprocs-1: status = int(number)+1 # Send an "OK" signal to the next in line, giving the its own chain # number for the other to add 1 if rank < nprocs-1: if not already_sent: comm.send(status, dest=rank+1, tag=1) comm.send(folder, dest=rank+1, tag=2) else: if rank < nprocs-1: comm.send('failed', dest=rank+1, tag=1) comm.send('', dest=rank+1, tag=2) if success: import sampler sampler.run(cosmo, data, command_line) return
[docs]def mock_update_run(custom_command=""): """ Tentative covmat update run Not reachable yet by any option. """ from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() # store the command_line space if not custom_command: custom_command = " ".join(sys.argv[1:]) # Do a first run mpi_run(custom_command) # Compute the covariance matrix if rank == 0: info_command = from_run_to_info(custom_command) initialise(info_command) # Make sure that the covariance matrix used is the one just computed with_covmat = add_covariance_matrix(custom_command) mpi_run(with_covmat) return
[docs]def safe_initialisation(custom_command="", comm=None, nprocs=1): """ Wrapper around the init function to handle errors KeyWord Arguments ----------------- custom_command : str testing purposes comm : MPI.Intracomm object that helps communicating between the processes nprocs : int number of processes """ try: cosmo, data, command_line, success = initialise(custom_command) except io_mp.ConfigurationError as message: if comm: for index in range(1, nprocs): comm.send('failed', dest=index, tag=1) print str(message) raise io_mp.ConfigurationError( "The initialisation was not successful, resulting in a " "potentially half created `log.param`. Please see the " "above error message. If you run the exact same command, it" " will not work. You should solve the problem, and try again.") except KeyError as e: if comm: for index in range(1, nprocs): comm.send('failed', dest=index, tag=1) raise io_mp.ConfigurationError( "You are running in a folder that was created following " "a non-successful initialisation (wrong parameter name, " "wrong likelihood, etc...). If you have solved the issue, you " "should remove completely the output folder, and try again." + " Alternatively, there could be a problem with "+e.message) return cosmo, data, command_line, success
[docs]def from_run_to_info(command): """ Translate a command corresponding to a run into one for analysis """ original = command.split() new = ['info'] # Recover the folder index = original.index('-o') folder = original[index+1] # Recover chain number index = original.index('-N') number = original[index+1] # Recover all relevant chains chains = [os.path.join(folder, elem) for elem in os.listdir(folder) if elem.find(number) != -1] new.extend(chains) # Do not plot the pdf new.append("--noplot") return " ".join(new)
[docs]def add_covariance_matrix(command): """ Make sure that the command uses the covariance matrix from the folder """ original = command.split() # recover the folder index = original.index('-o') folder = original[index+1] # get the name of the folder name = folder.split(os.path.sep)[-1] covname = os.path.join( folder, name+'.covmat') bfname = os.path.join( folder, name+'.bestfit') if '-c' in original: index = original.index('-c') original[index+1] = covname else: original.extend(['-c', covname]) if '-b' in original: index = original.index('-b') original[index+1] = bfname elif '--bestfit' in original: index = original.index('--bestfit') original[index+1] = bfname else: original.extend(['-b', bfname]) # Change the number of steps asked index = original.index('-N') number = list(str(original[index+1])) number[0] = str(int(number[0])+1) newnumber = "".join(number) original[index+1] = str(newnumber) return " ".join(original)