Skip to content

Utility Functions

General Utilities

binom_0(n, p)

Mirrors scipy binom.pmf as used in code

Source code in titan/utils.py
def binom_0(n: int, p: float):
    """
    Probability of zero successes in `n` trials that each succeed with
    probability `p`, i.e. the binomial pmf evaluated at k=0.

    args:
        n: number of trials
        p: per-trial success probability

    returns:
        `(1 - p)` raised to the power `n`
    """
    failure_prob = 1 - p
    return failure_prob ** n

connected_components(graph)

Get connected components in graph

Parameters:

Name Type Description Default
graph

the model's underlying graph

required

Returns:

Type Description

list of connected components

Source code in titan/utils.py
def connected_components(graph):
    """
    Split a graph into its connected components, largest first.

    args:
        graph: the model's underlying graph

    returns:
        list with one subgraph of `graph` per connected component, ordered
        by descending size
    """
    components = [
        graph.subgraph(node_set) for node_set in nx.connected_components(graph)
    ]
    # list.sort is stable, so equally sized components keep discovery order
    components.sort(key=len, reverse=True)
    return components

get_check_rand_int(seed)

Check the value passed of a seed, make sure it's an int, if 0, get a random seed

Parameters:

Name Type Description Default
seed int

integer to check or replace with a seed

required

Returns:

Type Description
int

validated seed

Source code in titan/utils.py
def get_check_rand_int(seed: int) -> int:
    """
    Validate a random seed, replacing 0 with a freshly drawn seed.

    args:
        seed: integer to check or replace with a seed

    returns:
        `seed` itself when it is greater than 0, otherwise (for 0) a random
        integer between 1 and 1000000 inclusive

    raises:
        ValueError: if `seed` is not exactly of type `int` or is negative
    """
    # exact type check on purpose: subclasses such as bool are rejected too
    is_plain_int = type(seed) is int
    if not is_plain_int or seed < 0:
        raise ValueError("Random seed must be positive integer")

    if seed == 0:
        return random.randint(1, 1000000)

    return seed

get_cumulative_bin(rand_gen, bin_def)

Get the bin key given cumulative bins. A probability is selected at random, then the bins' prob values are accumulated in order (e.g. for bin 2, the prob of bin 1 plus the prob of bin 2); the first bin whose cumulative prob is greater than or equal to the selected probability is returned.

Parameters:

Name Type Description Default
rand_gen

random number generator

required
bin_def ObjMap

ObjMap containing the bins

required

Returns:

Type Description
int

integer key of the matched bin (or last bin if no matches)

Source code in titan/utils.py
def get_cumulative_bin(rand_gen, bin_def: ObjMap) -> int:
    """
    Get the bin key given cumulative bins.

    A single value is drawn from `rand_gen.random()`.  The bins are then
    walked in iteration order while their `prob` values are summed (e.g. for
    bin 2, the prob of bin 1 plus the prob of bin 2).  The key of the first
    bin whose running total is greater than or equal to the drawn value is
    returned.  If the running total never reaches the drawn value, the key of
    the last bin is returned.

    args:
        rand_gen: random number generator (must provide `random()`)
        bin_def: ObjMap containing the bins; every value needs a `prob`

    returns:
        integer key of the matched bin (or last bin if no matches)

    raises:
        ValueError: if `bin_def` contains no bins
    """
    rand_val = rand_gen.random()

    # sentinel lets us detect "no bins at all" without relying on len()/bool()
    unset = object()
    bin_key = unset
    cumulative_prob = 0.0
    for bin_key, fields in bin_def.items():
        cumulative_prob += fields.prob
        if rand_val <= cumulative_prob:
            break

    if bin_key is unset:
        raise ValueError("bin_def must contain at least one bin")

    return bin_key

get_independent_bin(rand_gen, bin_def)

Get the bin key given independent bins. A probability is selected at random, then each bin's prob is compared to it; the first bin whose prob is greater than or equal to the selected probability is returned.

Parameters:

Name Type Description Default
rand_gen

A random number generator

required
bin_def ObjMap

The ObjMap containing the bins

required

Returns:

Type Description
int

The integer key of the matched bin (or last bin if no matches)

Source code in titan/utils.py
def get_independent_bin(rand_gen, bin_def: ObjMap) -> int:
    """
    Get the bin key given independent bins.

    A single value is drawn from `rand_gen.random()`.  The bins are walked in
    iteration order and the key of the first bin whose own `prob` is greater
    than or equal to the drawn value is returned.  If no bin qualifies, the
    key of the last bin is returned.

    args:
        rand_gen: A random number generator (must provide `random()`)
        bin_def: The ObjMap containing the bins; every value needs a `prob`

    returns:
        The integer key of the matched bin (or last bin if no matches)

    raises:
        ValueError: if `bin_def` contains no bins
    """
    rand_val = rand_gen.random()

    # sentinel lets us detect "no bins at all" without relying on len()/bool()
    unset = object()
    bin_key = unset
    for bin_key, fields in bin_def.items():
        if rand_val <= fields.prob:
            break

    if bin_key is unset:
        raise ValueError("bin_def must contain at least one bin")

    return bin_key

get_param_from_path(params, param_path, delimiter)

Given a params object and a delimited path, get the leaf of the params tree and the last key to access it

Source code in titan/utils.py
def get_param_from_path(params: ObjMap, param_path: str, delimiter: str):
    """
    Walk a delimited path through the params tree and stop one level above
    the final key.

    Each path segment is first tried as-is; if that lookup raises KeyError
    the segment is retried as an integer.

    args:
        params: the params tree to walk
        param_path: keys joined by `delimiter`
        delimiter: separator used in `param_path`

    returns:
        tuple of (container holding the final key, the final key as a string)
    """
    *parent_keys, leaf_key = param_path.split(delimiter)

    node = params
    for key in parent_keys:
        try:
            node = node[key]
        except KeyError:
            node = node[int(key)]

    return node, leaf_key

grid_file_to_edge_yml(file_path, outfile_path, diagonal_neighbors=False)

Read a csv describing the layout of locations and write the results to a yml file describing the location edges, which can then be used in the params [location.edges].

Sample csv:

location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_2,
location_2,location_2,

Would generate the edges:

  • location_1, location_3
  • location_1, location_2
  • location_2, location_3
  • location_3, location_4

If diagonal_neighbors were True, the edge [location_2, location_4] would also be returned.

Parameters:

Name Type Description Default
file_path str

path to a csv file which contains a layout for the locations in the model.

required
outfile_path str

path where the resulting yml file should be saved

required
diagonal_neighbors bool

whether diagonally adjacent cells should be considered neighbors [default false]

False
Source code in titan/utils.py
def grid_file_to_edge_yml(
    file_path: str, outfile_path: str, diagonal_neighbors: bool = False
) -> None:
    """
    Convert a csv location layout into a yml file of location edges, suitable
    for use in the params [location.edges].

    Sample csv:
    ```
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_2,
    location_2,location_2,
    ```

    Would generate the edges:
    * location_1, location_3
    * location_1, location_2
    * location_2, location_3
    * location_3, location_4

    If `diagonal_neighbors` were True, the edge [location_2, location_4] would also be returned.

    The edges are computed by `grid_file_to_edges` and written under a
    top-level `edges` key.

    args:
        file_path: path to a csv file which contains a layout for the locations in the model.
        outfile_path: path where the resulting yml file should be saved
        diagonal_neighbors: whether diagonally adjacent cells should be considered neighbors [default false]
    """
    # build the document first so the output file is only opened on success
    document = {
        "edges": grid_file_to_edges(file_path, diagonal_neighbors=diagonal_neighbors)
    }

    with open(outfile_path, "w") as yml_file:
        yaml.dump(document, yml_file)

grid_file_to_edges(file_path, diagonal_neighbors=False)

Read a csv describing the layout of locations and return a dictionary describing the location edges, which can then be used in the params [location.edges].

Sample csv:

location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_2,
location_2,location_2,

Would generate the edges:

  • location_1, location_3
  • location_1, location_2
  • location_2, location_3
  • location_3, location_4

If diagonal_neighbors were True, the edge [location_2, location_4] would also be returned.

Parameters:

Name Type Description Default
file_path str

path to a csv file which contains a layout for the locations in the model.

required
diagonal_neighbors bool

whether diagonally adjacent cells should be considered neighbors [default false]

False

Returns:

Type Description
Dict

A dictionary with generated edge names to locations

Source code in titan/utils.py
def grid_file_to_edges(file_path: str, diagonal_neighbors: bool = False) -> Dict:
    """
    Read a csv describing the layout of locations and return a dictionary describing the location edges, which can then be used in the params [location.edges].

    Sample csv:
    ```
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_2,
    location_2,location_2,
    ```

    Would generate the edges:
    * location_1, location_3
    * location_1, location_2
    * location_2, location_3
    * location_3, location_4

    If `diagonal_neighbors` were True, the edge [location_2, location_4] would also be returned.

    Every non-empty cell is compared with the cell below it and the cell to
    its right (plus the down-right, up-right and down-left cells when
    `diagonal_neighbors` is set).  Cells in the last row and last column are
    included, and neighbors that fall outside the grid (including past the
    end of a short row) are skipped.  Whether a pair is actually recorded is
    decided by `add_edge`.

    args:
        file_path: path to a csv file which contains a layout for the locations in the model.
        diagonal_neighbors: whether diagonally adjacent cells should be considered neighbors [default false]

    returns:
        A dictionary with generated edge names (`edge_1`, `edge_2`, ...) to
        locations (`location_1`/`location_2`); edges are numbered in sorted
        order so the result is reproducible between runs
    """
    # read in the grid
    with open(file_path, newline="") as f:
        grid = list(csv.reader(f))

    # (row offset, column offset) of the neighbors checked for each cell
    offsets = [(1, 0), (0, 1)]
    if diagonal_neighbors:
        offsets += [(1, 1), (-1, 1), (1, -1)]

    # generate edge pairs
    edges: Set[Tuple[str, str]] = set()
    for i, row in enumerate(grid):
        for j, loc in enumerate(row):
            if loc == "":
                continue

            for row_step, col_step in offsets:
                ni, nj = i + row_step, j + col_step
                # bounds are checked per row so ragged csv rows are tolerated
                if 0 <= ni < len(grid) and 0 <= nj < len(grid[ni]):
                    add_edge(edges, loc, grid[ni][nj])

    # sort before numbering: set iteration order is not stable between runs
    res = {}
    for index, edge in enumerate(sorted(edges), start=1):
        res[f"edge_{index}"] = {"location_1": edge[0], "location_2": edge[1]}

    return res

memo(f)

Decorator to memoize a function (caches results given args, only use if deterministic)

Source code in titan/utils.py
def memo(f):
    """
    Decorator that memoizes a function on its positional arguments.

    Results are stored in a dict keyed by the argument tuple, so arguments
    must be hashable and the function should be deterministic.  Keyword
    arguments are not supported by the wrapper.
    """
    results = {}

    @wraps(f)
    def memoized(*args):
        if args in results:
            return results[args]

        outcome = f(*args)
        results[args] = outcome
        return outcome

    return memoized

override_param(params, param_path, value, delimiter='|')

Given the params and a parameter path in the format prep|cap, change the current value to new value

Source code in titan/utils.py
def override_param(params: ObjMap, param_path: str, value, delimiter="|"):
    """
    Replace the value found at a delimited parameter path (e.g. prep|cap)
    with `value`, logging the old and new values.

    The final key is tried as a string first and, if that raises KeyError,
    as an integer.
    """
    override_item, last_key = get_param_from_path(params, param_path, delimiter)

    try:
        old_val = override_item[last_key]
    except KeyError:
        # fall back to an integer key for the leaf
        last_key = int(last_key)
        old_val = override_item[last_key]

    logging.info(f"overriding - {param_path}: {old_val} => {value}")

    override_item[last_key] = value

safe_dist(dist_info, rand_gen)

Draw a value from a distribution as defined in dist_info.

Parameters:

Name Type Description Default
dist_info ObjMap

a definition of a distribution to use [params.classes.distributions]

required
rand_gen

random number generator

required

Returns:

Type Description
Union[int, float]

a value drawn from the distribution

Source code in titan/utils.py
def safe_dist(dist_info: ObjMap, rand_gen) -> Union[int, float]:
    """
    Draw one value from the distribution described by `dist_info`.

    Each entry of `dist_info.vars` is converted with `parse_var` and the
    results are passed positionally to the distribution function obtained
    from `get_dist` for `dist_info.dist_type`.

    args:
        dist_info: a definition of a distribution to use [params.classes.distributions]
        rand_gen: random number generator

    returns:
        a value drawn from the distribution; when the draw has an `__iter__`
        attribute its first element is returned instead
    """
    dist_args = [
        parse_var(var.value, var.value_type) for var in dist_info.vars.values()
    ]

    draw = get_dist(rand_gen, dist_info.dist_type)(*dist_args)

    # some distributions hand back a sequence; unwrap its first element
    return draw[0] if hasattr(draw, "__iter__") else draw

safe_divide(numerator, denominator)

Divide two numbers, but default 0 if denominator is 0, otherwise divide as normal.

Parameters:

Name Type Description Default
numerator int

number being divided

required
denominator int

number doing the dividing

required

Returns:

Type Description
float

resulting number

Source code in titan/utils.py
def safe_divide(numerator: int, denominator: int) -> float:
    """
    Float division that yields 0.0 instead of failing on a zero denominator.

    args:
        numerator: number being divided
        denominator: number doing the dividing

    returns:
        `numerator / denominator` as a float, or 0.0 if `denominator` is 0
    """
    return 0.0 if denominator == 0 else 1.0 * numerator / denominator

safe_random_choice(seq, rand_gen, weights=None)

Return None or a random choice from a collection of items

Parameters:

Name Type Description Default
seq

collection to select a random item from

required
rand_gen

random number generator

required
weights

an optional collection of weights to use instead of a uniform distribution

None

Returns:

Type Description

an item, or None if the collection is empty

Source code in titan/utils.py
def safe_random_choice(seq, rand_gen, weights=None):
    """
    Pick one item at random from a collection, tolerating empty input.

    args:
        seq: collection to select a random item from (sets are converted to
            a tuple first)
        rand_gen: random number generator
        weights: an optional collection of weights to use instead of a uniform distribution

    returns:
        an item, or `None` if the collection is empty
    """
    if not seq:
        return None

    items = tuple(seq) if isinstance(seq, set) else seq
    count = len(items)

    # shortcuts that avoid the heavier `choices` call (for performance)
    if count == 1:
        return items[0]

    if count == 2 and weights is None:
        index = 0 if rand_gen.random() <= 0.5 else 1
        return items[index]

    return rand_gen.choices(items, weights=weights)[0]

safe_random_int(start, stop, rand_gen)

Return an integer between [start, stop)

Parameters:

Name Type Description Default
start int

start value

required
stop int

stop value

required
rand_gen

random number generator

required

Returns:

Type Description
int

a random integer in [start, stop)

Source code in titan/utils.py
def safe_random_int(start: int, stop: int, rand_gen) -> int:
    """
    Return an integer between [start, stop)

    The value is obtained by scaling a single `rand_gen.random()` draw onto
    the requested range and rounding down.

    args:
        start: start value
        stop: stop value
        rand_gen: random number generator

    returns:
        the drawn integer
    """
    span = stop - start
    return floor(rand_gen.random() * span + start)

safe_shuffle(seq, rand_gen)

Return a shuffled sequence, or an empty list if the collection is empty

Parameters:

Name Type Description Default
seq Collection[~T]

collection to shuffle

required
rand_gen

random number generator

required

Returns:

Type Description
Iterable[~T]

shuffled sequence, or an empty list if empty

Source code in titan/utils.py
def safe_shuffle(seq: Collection[T], rand_gen) -> Iterable[T]:
    """
    Shuffle a collection, tolerating empty input.

    Sets are copied into a list before shuffling; any other non-empty
    collection is shuffled in place with `rand_gen.shuffle` and that same
    object is returned.

    args:
        seq: collection to shuffle
        rand_gen: random number generator

    returns:
        shuffled sequence, or an empty list if `seq` is empty
    """
    if not seq:
        return []

    shuffled = list(seq) if isinstance(seq, set) else seq
    rand_gen.shuffle(shuffled)
    return shuffled

scale_param(params, param_path, scalar, delimiter='|')

Given the params and a parameter path in the format prep|cap, scale the current value by the scalar

Source code in titan/utils.py
def scale_param(params: ObjMap, param_path: str, scalar: float, delimiter="|"):
    """
    Given the params and a parameter path in the format prep|cap, scale the
    current value by the scalar

    As in `override_param`, the final key is tried as a string first and, if
    that raises KeyError, as an integer, so integer-keyed leaves can be
    scaled too.

    args:
        params: the params tree to modify in place
        param_path: keys joined by `delimiter`
        scalar: factor the current value is multiplied by
        delimiter: separator used in `param_path`

    raises:
        KeyError: if the final key is found neither as given nor as an integer
    """
    scaling_item, last_key = get_param_from_path(params, param_path, delimiter)

    try:
        old_val = scaling_item[last_key]
    except KeyError as missing:
        try:
            last_key = int(last_key)
        except ValueError:
            # not an integer-like key: surface the original KeyError unchanged
            raise missing from None
        old_val = scaling_item[last_key]

    logging.info(f"scaling - {param_path}: {old_val} => {old_val * scalar}")
    scaling_item[last_key] = old_val * scalar

total_probability(p, num_acts)

Given a per act probability and a number of acts, return the total probability.

Parameters:

Name Type Description Default
p float

the per act probability

required
num_acts int

the number of acts

required

Returns:

Type Description
float

the total probability

Source code in titan/utils.py
def total_probability(p: float, num_acts: int) -> float:
    """
    Combine a per act probability over a number of acts.

    args:
        p: the per act probability
        num_acts: the number of acts

    returns:
        `p` for exactly one act, `1 - binom_0(num_acts, p)` for more than one
        act, and 0.0 otherwise
    """
    if num_acts == 1:
        return p

    if not (num_acts >= 1):
        return 0.0

    return 1.0 - binom_0(num_acts, p)

Params

ObjMap

A dictionary-like class which allows accessing members either using standard dictionary notation or dots. Note the hash function is hard-coded - beware.

__hash__(self) special

Return hash(self).

Source code in titan/parse_params.py
def __hash__(self):
    return 1234567890

check_params(params)

Consistency checks for param populations

Source code in titan/parse_params.py
def check_params(params: ObjMap):
    """
    Consistency checks for param populations

    Each of the following totals must equal 1 (absolute tolerance 0.001):

    * `ppl` over the races listed in `params.classes.races`
    * per race, `ppl` over its sex types that appear in `params.classes.sex_types`
    * per race and sex type, `ppl` over its drug types that appear in
      `params.classes.drug_types`
    * `ppl` over `params.classes.locations`
    * the `partner_values` of every entry in `params.assort_mix`

    args:
        params: the parsed model parameters to validate

    raises:
        AssertionError: if any of the totals above does not add to 1
    """

    def require_total_of_one(total, message):
        # Raised explicitly instead of via `assert` so the validation is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not math.isclose(total, 1, abs_tol=0.001):
            raise AssertionError(message)

    race_pop = 0
    for race in params.classes.races:
        r_dems = params.demographics[race]
        race_pop += r_dems.ppl

        sex_type_pop = 0
        for st, st_dems in r_dems.sex_type.items():
            if st in list(params.classes.sex_types.keys()):
                sex_type_pop += st_dems.ppl

            # drug types are checked for every sex type listed under the race
            drug_type_pop = sum(
                dt_dems.ppl
                for dt, dt_dems in st_dems.drug_type.items()
                if dt in list(params.classes.drug_types)
            )
            require_total_of_one(
                drug_type_pop,
                f"ppl of {race}'s {st}'s drug_types must add to 1. Currently adding to {drug_type_pop}",
            )

        require_total_of_one(
            sex_type_pop,
            f"ppl of {race}'s sex_types must add to 1. Currently adding to {sex_type_pop}",
        )

    require_total_of_one(race_pop, "ppl of races must add to 1")

    loc_pop = sum(location.ppl for location in params.classes.locations.values())
    require_total_of_one(loc_pop, "ppl of locations must add to 1")

    for param, assort in params.assort_mix.items():
        assort_value = sum(assort.partner_values.values())
        require_total_of_one(
            assort_value,
            f"assort values must add to 1, not {assort_value} in {param}",
        )

create_params(setting_name, param_path, outdir, error_on_unused=False)

Entry function - given the path to the setting, params, output directory and whether or not to use the base setting. Parse and create a params (ObjMap) object.

Parameters:

Name Type Description Default
setting_name Optional[str]

path to a settings file or directory or None

required
param_path str

path to parameter file or directory

required
outdir str

path to directory where computed params will be saved

required
error_on_unused bool

throw a hard error if there are unused parameters, otherwise warnings are only printed

False

Returns:

Type Description
ObjMap

computed/validated model parameters with defaults filled in where needed

Source code in titan/parse_params.py
def create_params(
    setting_name: Optional[str],
    param_path: str,
    outdir: str,
    error_on_unused: bool = False,
) -> ObjMap:
    """
    Entry function - build the model params (ObjMap) from an optional setting
    plus a parameter file or directory.

    The parameter definitions and the built-in settings are looked up in the
    `params` and `settings` directories next to this source file.  The merged
    result is written to `params.yml` in `outdir`, validated with
    `check_params`, and, if location migration is enabled, the migration
    probabilities file is copied into `outdir` as `migration_probs.csv`.

    args:
        setting_name: path to a settings file or directory or `None`
        param_path: path to parameter file or directory
        outdir: path to directory where computed params will be saved
        error_on_unused: throw a hard error if there are unused parameters, otherwise warnings are only printed

    returns:
        computed/validated model parameters with defaults filled in where needed
    """
    # locate this module on disk; definitions and settings live beside it
    source_file = getsourcefile(create_params)
    if source_file is None:
        raise Exception("can't find where I am in the code?")
    parent = Path(source_file).resolve().parent

    param_defs = os.path.join(parent, "params")

    # a setting (if any) comes first so the params can override it
    param_paths = []
    if setting_name is not None:
        known_settings = os.listdir(os.path.join(parent, "settings"))
        if setting_name in known_settings:
            # name of a built-in setting
            param_paths.append(os.path.join(parent, "settings", setting_name))
        else:
            # anything else is passed through as a path
            param_paths.append(setting_name)
    param_paths.append(param_path)

    parsed = ObjMap(
        paraml.create_params(
            param_defs,
            *param_paths,
            out_path=os.path.join(outdir, "params.yml"),
            error_on_unused=error_on_unused,
        )
    )
    check_params(parsed)

    # keep a copy of the migration file alongside the computed params
    migration = parsed.location.migration
    if migration.enabled:
        shutil.copy(
            migration.probs_file, os.path.join(outdir, "migration_probs.csv")
        )

    return parsed

Probability Distributions

pert(np_random, low, peak, high, temperature)

A pert distribution, inspired by tensorflow

arguments must be so that:

  • low < peak < high
  • temperature > 0

The support is [low, high]. The peak must fit in that interval: low < peak < high. The temperature is a positive parameter that controls the shape of the distribution. Higher values yield a sharper peak.

Parameters:

Name Type Description Default
np_random

random number generator (used to get beta)

required
low

distribution low value

required
peak

modal point in distribution

required
high

distribution high value

required
temperature

scaling factor

required
Source code in titan/distributions.py
def pert(np_random, low, peak, high, temperature):
    """
    Draw from a pert distribution, inspired by [tensorflow](https://github.com/tensorflow/probability/blob/c833ee5cd9f60f3257366b25447b9e50210b0590/tensorflow_probability/python/distributions/pert.py#L137)

    The draw is a beta variate rescaled from `[0, 1]` onto `[low, high]`.

    arguments must be so that:

    * low < peak < high
    * temperature > 0

    The `temperature` is a positive parameter that controls the shape of the
    distribution. Higher values yield a sharper peak.

    args:
        np_random: random number generator (used to get beta)
        low: distribution low value
        peak: modal point in distribution
        high: distribution high value
        temperature: scaling factor

    returns:
        a value in the support `[low, high]`
    """
    assert low < peak < high
    assert temperature > 0

    span = high - low
    # the two beta shape parameters, weighted by where the peak sits
    shape_a = 1.0 + temperature * (peak - low) / span
    shape_b = 1.0 + temperature * (high - peak) / span

    unit_draw = np_random.beta(shape_a, shape_b)
    return low + span * unit_draw

poisson(np_rand, mu)

Mirrors scipy poisson.rvs function as used in code

Source code in titan/distributions.py
def poisson(np_rand, mu: float):
    """
    Draw from a poisson distribution, mirroring scipy's poisson.rvs as used
    in code.

    args:
        np_rand: random number generator providing `poisson(mu)`
        mu: expected value of the distribution

    returns:
        0 if `mu` is negative (no draw is made), otherwise the generator's
        poisson draw
    """
    return 0 if mu < 0 else np_rand.poisson(mu)

set_value(np_random, value)

A distribution that always returns the value passed

Parameters:

Name Type Description Default
np_random

random number generator (to conform to distribution interface)

required
value

value to return

required
Source code in titan/distributions.py
def set_value(np_random, value):
    """
    Degenerate "distribution" that hands back `value` unchanged.

    args:
        np_random: random number generator; unused, accepted only so the
            signature matches the other distribution functions
        value: value to return

    returns:
        `value`
    """
    return value

weibull_modified(np_random, shape, scale)

Modified version of numpy's (single parameter) weibull distribution to use the 2-parameter weibull.

Parameters:

Name Type Description Default
np_random

random number generator

required
shape

weibull shape parameter

required
scale

weibull scale parameter

required
Source code in titan/distributions.py
def weibull_modified(np_random, shape, scale):
    """
    Draw from a 2-parameter weibull distribution (numpy's own weibull only
    takes the shape parameter).

    The draw is computed from a single `np_random.random()` value `u` as
    `scale * (-log(1 - u)) ** (1 / shape)`.

    args:
        np_random: random number generator
        shape: weibull shape parameter
        scale: weibull scale parameter

    returns:
        the drawn value
    """
    uniform_draw = np_random.random()
    exponent = 1 / shape
    return scale * (-log(1 - uniform_draw)) ** exponent

Complex Probabilities

get_death_rate(hiv, aids, drug_type, sex_type, haart_adh, race, location, steps_per_year, exit_class)

Find the death rate of an agent given a set of attributes.

Parameters:

Name Type Description Default
hiv bool

whether the agent is HIV+

required
aids bool

whether the agent has AIDS

required
drug_type str

whether the PWID base death rate should be used or the base one

required
haart_adh bool

whether an agent is haart adherent

required
race str

the race of the agent

required
location Location

agent's location

required
steps_per_year int

the number of model steps in a year

required
exit_class str

the exit class to access in agent params

required

Returns:

Type Description
float

the probability of an agent with these characteristics dying in a given time step

Source code in titan/probabilities.py
@utils.memo
def get_death_rate(
    hiv: bool,
    aids: bool,
    drug_type: str,
    sex_type: str,
    haart_adh: bool,
    race: str,
    location: Location,
    steps_per_year: int,
    exit_class: str,
) -> float:
    """
    Find the death rate of an agent given a set of attributes.

    The base rate is looked up in the location's demographics by race, sex
    type, drug type and exit class.  It is multiplied by the `aids` factor if
    the agent has AIDS; otherwise, if the agent is HIV+, by the
    `haart_adherent` or `hiv` factor depending on adherence.

    args:
        hiv: whether the agent is HIV+
        aids: whether the agent has AIDS
        drug_type: the agent's drug type, used to select the death rate params
        sex_type: the agent's sex type, used to select the death rate params
        haart_adh: whether an agent is haart adherent
        race: the race of the agent
        location: agent's location
        steps_per_year: the number of model steps in a year
        exit_class: the exit class to access in agent params

    returns:
        the probability of an agent with these characteristics dying in a given time step
    """
    demographics = location.params.demographics
    death_param = (
        demographics[race].sex_type[sex_type].drug_type[drug_type].exit[exit_class]
    )

    rate = death_param.base
    if aids:
        rate *= death_param.aids
    elif hiv:
        rate *= death_param.haart_adherent if haart_adh else death_param.hiv

    # the params are per 1000 person years; convert to per person per time step
    return rate / (1000 * steps_per_year)