Utility Functions
General Utilities
binom_0(n, p)
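Mirrors scipy's binom.pmf as used in the code: returns the probability of zero successes in n independent trials, each with success probability p.
Source code in titan/utils.py
def binom_0(n: int, p: float):
    """
    Mirrors scipy's binom.pmf as used in the code: returns the probability of
    zero successes in n independent trials, each with success probability p.
    """
    return (1 - p) ** n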
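The expression `(1 - p) ** n` is the binomial probability mass at zero successes. A minimal sketch that checks this identity using only the standard library; the helper `binom_pmf` is hypothetical, written here for illustration, and is not part of titan:

```python
from math import comb, isclose


def binom_0(n: int, p: float) -> float:
    # same expression as in titan/utils.py
    return (1 - p) ** n


def binom_pmf(k: int, n: int, p: float) -> float:
    # hypothetical reference helper: the general binomial pmf
    return comb(n, k) * p**k * (1 - p) ** (n - k)


n, p = 5, 0.2
zero_successes = binom_0(n, p)
reference = binom_pmf(0, n, p)
print(isclose(zero_successes, reference))
```

For k = 0 the binomial coefficient and the `p**k` factor are both 1, so only `(1 - p) ** n` remains.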
connected_components(graph)
Get the connected components of the graph as subgraphs, sorted from largest to smallest.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph | | the model's underlying graph | required |
Returns:
Type | Description |
---|---|
| | list of connected components (as subgraphs), largest first |
Source code in titan/utils.py
def connected_components(graph):
    """
    Get the connected components of the graph as subgraphs, sorted from
    largest to smallest.
    args:
        graph: the model's underlying graph
    returns:
        list of connected components (as subgraphs), largest first
    """
    return sorted(
        list(graph.subgraph(c) for c in nx.connected_components(graph)),
        key=len,
        reverse=True,
    )
get_check_rand_int(seed)
Validate a seed. The seed must be a non-negative int; if it is 0, a random seed between 1 and 1,000,000 is drawn instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
seed | int | integer to check or replace with a seed | required |
Returns:
Type | Description |
---|---|
int | validated seed |
Source code in titan/utils.py
def get_check_rand_int(seed: int) -> int:
    """
    Validate a seed. The seed must be a non-negative int; if it is 0, a random
    seed between 1 and 1,000,000 is drawn instead.
    args:
        seed: integer to check or replace with a seed
    returns:
        validated seed
    """
    if type(seed) is not int or seed < 0:
        raise ValueError("Random seed must be positive integer")
    elif seed == 0:
        return random.randint(1, 1000000)
    else:
        return seed
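A short usage sketch of the three branches. The function body is copied from the listing above so the example runs on its own; the variable names are illustrative only:

```python
import random


def get_check_rand_int(seed: int) -> int:
    # copy of the function above so the example is self-contained
    if type(seed) is not int or seed < 0:
        raise ValueError("Random seed must be positive integer")
    elif seed == 0:
        return random.randint(1, 1000000)
    else:
        return seed


fixed = get_check_rand_int(42)     # non-zero seeds pass through unchanged
generated = get_check_rand_int(0)  # 0 requests a freshly drawn seed

try:
    get_check_rand_int(-1)         # negative values are rejected
    rejected = False
except ValueError:
    rejected = True
```

Because the check uses `type(seed) is not int`, values such as `1.0` or `True` are also rejected, even though they compare equal to integers.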
get_cumulative_bin(rand_gen, bin_def)
Get the bin key given cumulative bins. A probability is selected at random, then each bin's cumulative prob (e.g. for bin 2, the prob of bin 1 plus the prob of bin 2) is compared to it. The first bin whose cumulative prob is greater than or equal to the random value is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rand_gen | | random number generator | required |
bin_def | ObjMap | ObjMap containing the bins | required |
Returns:
Type | Description |
---|---|
int | integer key of the matched bin (or last bin if no matches) |
Source code in titan/utils.py
def get_cumulative_bin(rand_gen, bin_def: ObjMap) -> int:
    """
    Get the bin key given cumulative bins. A probability is selected at random, then each bin's cumulative `prob` (e.g. for bin 2, the prob of bin 1 plus the prob of bin 2) is compared to it. The first bin whose cumulative `prob` is greater than or equal to the random value is returned.
    args:
        rand_gen: random number generator
        bin_def: ObjMap containing the bins
    returns:
        integer key of the matched bin (or last bin if no matches)
    """
    rand_val = rand_gen.random()
    p = 0.0
    for bin, fields in bin_def.items():
        p += fields.prob
        if rand_val <= p:
            break
    return bin
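To make the cumulative comparison concrete, here is a minimal sketch. It assumes a plain dict of `SimpleNamespace` objects can stand in for an ObjMap, and uses a hypothetical `FixedRandom` class in place of a real generator so each draw is known in advance:

```python
from types import SimpleNamespace


class FixedRandom:
    """Hypothetical stand-in for the random number generator; always returns `value`."""

    def __init__(self, value: float):
        self.value = value

    def random(self) -> float:
        return self.value


def get_cumulative_bin(rand_gen, bin_def) -> int:
    # same logic as the listing above, copied so the example is self-contained
    rand_val = rand_gen.random()
    p = 0.0
    for bin, fields in bin_def.items():
        p += fields.prob
        if rand_val <= p:
            break
    return bin


# assumed bin definition: a plain dict stands in for an ObjMap
bins = {
    1: SimpleNamespace(prob=0.2),
    2: SimpleNamespace(prob=0.3),
    3: SimpleNamespace(prob=0.5),
}

low = get_cumulative_bin(FixedRandom(0.10), bins)   # 0.10 <= 0.2, so bin 1
mid = get_cumulative_bin(FixedRandom(0.45), bins)   # 0.2 < 0.45 <= 0.5, so bin 2
high = get_cumulative_bin(FixedRandom(0.99), bins)  # 0.5 < 0.99 <= 1.0, so bin 3
```

When the probabilities sum to 1, each bin is selected with its own `prob`. If they sum to less than 1, the last bin also absorbs the remainder.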
get_independent_bin(rand_gen, bin_def)
Get the bin key given independent bins. A probability is selected at random, then each bin's prob is compared to it. The first bin whose prob is greater than or equal to the random value is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rand_gen | | A random number generator | required |
bin_def | ObjMap | The ObjMap containing the bins | required |
Returns:
Type | Description |
---|---|
int | The integer key of the matched bin (or last bin if no matches) |
Source code in titan/utils.py
def get_independent_bin(rand_gen, bin_def: ObjMap) -> int:
    """
    Get the bin key given independent bins. A probability is selected at random, then each bin's `prob` is compared to it. The first bin whose `prob` is greater than or equal to the random value is returned.
    args:
        rand_gen: A random number generator
        bin_def: The ObjMap containing the bins
    returns:
        The integer key of the matched bin (or last bin if no matches)
    """
    rand_val = rand_gen.random()
    for bin, fields in bin_def.items():
        if rand_val <= fields.prob:
            break
    return bin
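The difference from the cumulative version is that each bin's `prob` is tested on its own, in iteration order. A minimal sketch under the same assumptions as before (a plain dict of `SimpleNamespace` objects standing in for an ObjMap, and a hypothetical `FixedRandom` generator):

```python
from types import SimpleNamespace


class FixedRandom:
    """Hypothetical stand-in for the random number generator; always returns `value`."""

    def __init__(self, value: float):
        self.value = value

    def random(self) -> float:
        return self.value


def get_independent_bin(rand_gen, bin_def) -> int:
    # same logic as the listing above, copied so the example is self-contained
    rand_val = rand_gen.random()
    for bin, fields in bin_def.items():
        if rand_val <= fields.prob:
            break
    return bin


# assumed bin definition; the probabilities do not need to sum to 1
bins = {
    1: SimpleNamespace(prob=0.1),
    2: SimpleNamespace(prob=0.4),
    3: SimpleNamespace(prob=0.2),
}

first = get_independent_bin(FixedRandom(0.05), bins)     # 0.05 <= 0.1, so bin 1
second = get_independent_bin(FixedRandom(0.30), bins)    # skips bin 1, 0.30 <= 0.4, so bin 2
fallback = get_independent_bin(FixedRandom(0.90), bins)  # nothing matches, last bin returned
```

Since the loop stops at the first match, the order of the bins affects the result.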
get_param_from_path(params, param_path, delimiter)
Given a params object and a delimited path, get the leaf of the params tree and the last key to access it
Source code in titan/utils.py
def get_param_from_path(params: ObjMap, param_path: str, delimiter: str):
    """
    Given a params object and a delimited path, get the leaf of the params tree
    and the last key to access it
    """
    path = param_path.split(delimiter)
    path_params = params
    for p in path[:-1]:
        try:
            path_params = path_params[p]
        except KeyError:
            path_params = path_params[int(p)]
    return path_params, path[-1]
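A small sketch of what is returned, assuming a nested plain dict in place of an ObjMap. The parameter names in `params` are made up for illustration:

```python
def get_param_from_path(params, param_path: str, delimiter: str):
    # copied from the listing above so the example is self-contained
    path = param_path.split(delimiter)
    path_params = params
    for p in path[:-1]:
        try:
            path_params = path_params[p]
        except KeyError:
            path_params = path_params[int(p)]
    return path_params, path[-1]


# hypothetical parameter tree
params = {"prep": {"cap": 0.1, "target": 0.5}, "bins": {1: {"prob": 0.3}}}

# the parent container of the leaf and the final key are returned
leaf, key = get_param_from_path(params, "prep|cap", "|")

# intermediate keys that fail as strings are retried as integers
int_leaf, int_key = get_param_from_path(params, "bins|1|prob", "|")
```

Returning the parent container rather than the value itself is what lets callers assign a new value back into the tree.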
grid_file_to_edge_yml(file_path, outfile_path, diagonal_neighbors=False)
Read a csv describing the layout of locations and write the results to a yml file describing the location edges, which can then be used in the params [location.edges].
Sample csv:
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_2,
location_2,location_2,
Would generate the edges:
* location_1, location_3
* location_1, location_2
* location_2, location_3
* location_3, location_4
If diagonal_neighbors were True, the edge [location_2, location_4] would also be returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | path to a csv file which contains a layout for the locations in the model. | required |
outfile_path | str | path where the resulting yml file should be saved | required |
diagonal_neighbors | bool | whether diagonally adjacent cells should be considered neighbors [default false] | False |
Source code in titan/utils.py
def grid_file_to_edge_yml(
    file_path: str, outfile_path: str, diagonal_neighbors: bool = False
) -> None:
    """
    Read a csv describing the layout of locations and write the results to a yml file describing the location edges, which can then be used in the params [location.edges].
    Sample csv:
    ```
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_2,
    location_2,location_2,
    ```
    Would generate the edges:
    * location_1, location_3
    * location_1, location_2
    * location_2, location_3
    * location_3, location_4
    If `diagonal_neighbors` were True, the edge [location_2, location_4] would also be returned.
    args:
        file_path: path to a csv file which contains a layout for the locations in the model.
        outfile_path: path where the resulting yml file should be saved
        diagonal_neighbors: whether diagonally adjacent cells should be considered neighbors [default false]
    """
    edges = grid_file_to_edges(file_path, diagonal_neighbors=diagonal_neighbors)
    with open(outfile_path, "w") as f:
        yaml.dump({"edges": edges}, f)
grid_file_to_edges(file_path, diagonal_neighbors=False)
Read a csv describing the layout of locations and return a dictionary describing the location edges, which can then be used in the params [location.edges].
Sample csv:
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_3,location_4
location_1,location_2,
location_2,location_2,
Would generate the edges:
* location_1, location_3
* location_1, location_2
* location_2, location_3
* location_3, location_4
If diagonal_neighbors were True, the edge [location_2, location_4] would also be returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | path to a csv file which contains a layout for the locations in the model. | required |
diagonal_neighbors | bool | whether diagonally adjacent cells should be considered neighbors [default false] | False |
Returns:
Type | Description |
---|---|
Dict | A dictionary mapping generated edge names (edge_1, edge_2, ...) to the two locations each edge connects |
Source code in titan/utils.py
def grid_file_to_edges(file_path: str, diagonal_neighbors: bool = False) -> Dict:
    """
    Read a csv describing the layout of locations and return a dictionary describing the location edges, which can then be used in the params [location.edges].
    Sample csv:
    ```
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_3,location_4
    location_1,location_2,
    location_2,location_2,
    ```
    Would generate the edges:
    * location_1, location_3
    * location_1, location_2
    * location_2, location_3
    * location_3, location_4
    If `diagonal_neighbors` were True, the edge [location_2, location_4] would also be returned.
    args:
        file_path: path to a csv file which contains a layout for the locations in the model.
        diagonal_neighbors: whether diagonally adjacent cells should be considered neighbors [default false]
    returns:
        A dictionary with generated edge names to locations
    """
    # read in the grid
    grid = []
    with open(file_path, newline="") as f:
        reader = csv.reader(f)
        for row in reader:
            grid.append(row)
    # generate edge pairs
    edges: Set[Tuple[str, str]] = set()
    for i in range(len(grid) - 1):
        for j in range(len(grid[0]) - 1):
            loc = grid[i][j]
            if loc == "":
                continue
            add_edge(edges, loc, grid[i + 1][j])
            add_edge(edges, loc, grid[i][j + 1])
            if diagonal_neighbors:
                add_edge(edges, loc, grid[i + 1][j + 1])
                if i >= 1:
                    add_edge(edges, loc, grid[i - 1][j + 1])
                if j >= 1:
                    add_edge(edges, loc, grid[i + 1][j - 1])
    res = {}
    for (i, edge) in enumerate(edges):
        res[f"edge_{i+1}"] = {"location_1": edge[0], "location_2": edge[1]}
    return res
memo(f)
Decorator to memoize a function (caches results given args, only use if deterministic)
Source code in titan/utils.py
def memo(f):
    """
    Decorator to memoize a function
    (caches results given args, only use if deterministic)
    """
    cache = {}
    @wraps(f)
    def wrap(*arg):
        if arg not in cache:
            cache[arg] = f(*arg)
        return cache[arg]
    return wrap
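A brief usage sketch showing that a repeated call is served from the cache. The decorated function `square` and the `calls` list are hypothetical, added only to make the caching observable:

```python
from functools import wraps


def memo(f):
    # copied from the listing above so the example is self-contained
    cache = {}

    @wraps(f)
    def wrap(*arg):
        if arg not in cache:
            cache[arg] = f(*arg)
        return cache[arg]

    return wrap


calls = []  # records every time the underlying function actually runs


@memo
def square(x):
    calls.append(x)
    return x * x


first = square(4)   # computed and cached
second = square(4)  # served from the cache, square() is not re-run
third = square(5)   # new argument, computed again
```

Because the cache key is the tuple of positional arguments, every argument must be hashable, and the wrapper as written does not accept keyword arguments.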
override_param(params, param_path, value, delimiter='|')
Given the params and a parameter path in the format prep|cap, change the current value to the new value
Source code in titan/utils.py
def override_param(params: ObjMap, param_path: str, value, delimiter="|"):
    """
    Given the params and a parameter path in the format prep|cap, change the
    current value to the new value
    """
    override_item, last_key = get_param_from_path(params, param_path, delimiter)
    try:
        old_val = override_item[last_key]
    except KeyError:
        last_key = int(last_key)
        old_val = override_item[last_key]
    logging.info(f"overriding - {param_path}: {old_val} => {value}")
    override_item[last_key] = value
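A minimal sketch of overriding values in place, assuming a nested plain dict in place of an ObjMap. The parameter names and values are invented for illustration, and both functions are copied from the listings above so the example runs on its own:

```python
import logging


def get_param_from_path(params, param_path, delimiter):
    # copied from the listing above
    path = param_path.split(delimiter)
    path_params = params
    for p in path[:-1]:
        try:
            path_params = path_params[p]
        except KeyError:
            path_params = path_params[int(p)]
    return path_params, path[-1]


def override_param(params, param_path, value, delimiter="|"):
    # copied from the listing above
    override_item, last_key = get_param_from_path(params, param_path, delimiter)
    try:
        old_val = override_item[last_key]
    except KeyError:
        last_key = int(last_key)
        old_val = override_item[last_key]
    logging.info(f"overriding - {param_path}: {old_val} => {value}")
    override_item[last_key] = value


# hypothetical parameter tree
params = {"prep": {"cap": 0.1}, "bins": {1: {"prob": 0.3}, 2: {"prob": 0.7}}}

override_param(params, "prep|cap", 0.25)     # string keys all the way down
override_param(params, "bins|2|prob", 0.6)   # "2" is retried as the integer key 2
override_param(params, "bins.1", {"prob": 0.4}, delimiter=".")  # custom delimiter, integer last key
```

The params object is mutated in place and nothing is returned. A path whose last key does not already exist raises an error rather than creating a new entry.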
safe_dist(dist_info, rand_gen)
Draw a value from a distribution as defined in dist_info.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dist_info | ObjMap | a definition of a distribution to use [params.classes.distributions] | required |
rand_gen | | random number generator | required |
Returns:
Type | Description |
---|---|
Union[int, float] | a value drawn from the distribution |
Source code in titan/utils.py
def safe_dist(dist_info: ObjMap, rand_gen) -> Union[int, float]:
    """
    Draw a value from a distribution as defined in `dist_info`.
    args:
        dist_info: a definition of a distribution to use [params.classes.distributions]
        rand_gen: random number generator
    returns:
        a value drawn from the distribution
    """
    # gather arguments
    args = []
    for d in dist_info.vars.values():
        args.append(parse_var(d.value, d.value_type))
    dist = get_dist(rand_gen, dist_info.dist_type)
    value = dist(*args)
    if hasattr(value, "__iter__"):  # check if value is any type of sequence
        return value[0]
    else:
        return value
safe_divide(numerator, denominator)
Divide two numbers, returning 0 if the denominator is 0.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
numerator | int | number being divided | required |
denominator | int | number doing the dividing | required |
Returns:
Type | Description |
---|---|
float | resulting number |
Source code in titan/utils.py
def safe_divide(numerator: int, denominator: int) -> float:
    """
    Divide two numbers, returning 0 if the denominator is 0.
    args:
        numerator: number being divided
        denominator: number doing the dividing
    returns:
        resulting number
    """
    if denominator == 0:
        return 0.0
    else:
        return 1.0 * numerator / denominator
safe_random_choice(seq, rand_gen, weights=None)
Return a random choice from a collection of items, or None if the collection is empty
Parameters:
Name | Type | Description | Default |
---|---|---|---|
seq | | collection to select a random item from | required |
rand_gen | | random number generator | required |
weights | | an optional collection of weights to use instead of a uniform distribution | None |
Returns:
Type | Description |
---|---|
| | an item, or None if the collection is empty |
Source code in titan/utils.py
def safe_random_choice(seq, rand_gen, weights=None):
    """
    Return a random choice from a collection of items, or None if the collection is empty
    args:
        seq: collection to select a random item from
        rand_gen: random number generator
        weights: an optional collection of weights to use instead of a uniform distribution
    returns:
        an item, or `None` if the collection is empty
    """
    if not seq:
        return None
    if isinstance(seq, set):
        seq = tuple(seq)
    # don't call out to random choices if we don't need to (for performance)
    if len(seq) == 1:
        return seq[0]
    elif len(seq) == 2 and weights is None:
        return seq[0] if rand_gen.random() <= 0.5 else seq[1]
    choices = rand_gen.choices(seq, weights=weights)
    return choices[0]
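A usage sketch covering the main branches, assuming the standard library's `random.Random` is an acceptable generator here (it provides both `random()` and `choices()`). The inputs are illustrative:

```python
import random


def safe_random_choice(seq, rand_gen, weights=None):
    # copied from the listing above so the example is self-contained
    if not seq:
        return None
    if isinstance(seq, set):
        seq = tuple(seq)
    if len(seq) == 1:
        return seq[0]
    elif len(seq) == 2 and weights is None:
        return seq[0] if rand_gen.random() <= 0.5 else seq[1]
    choices = rand_gen.choices(seq, weights=weights)
    return choices[0]


rng = random.Random(0)

empty = safe_random_choice([], rng)                   # empty collection gives None
single = safe_random_choice(["a"], rng)               # one item: returned without drawing
from_set = safe_random_choice({"x", "y", "z"}, rng)   # sets are converted to a tuple first
weighted = safe_random_choice(["a", "b", "c"], rng, weights=[0, 0, 1])  # all weight on "c"
```

One thing to keep in mind: a set of strings has no stable iteration order across Python processes, so passing a set may give different picks from run to run even with a fixed seed.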
safe_random_int(start, stop, rand_gen)
Return a random integer in the half-open interval [start, stop)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start | int | start value (inclusive) | required |
stop | int | stop value (exclusive) | required |
rand_gen | | random number generator | required |
Returns:
Type | Description |
---|---|
int | a random integer in [start, stop) |
Source code in titan/utils.py
def safe_random_int(start: int, stop: int, rand_gen) -> int:
    """
    Return a random integer in the half-open interval [start, stop)
    args:
        start: start value (inclusive)
        stop: stop value (exclusive)
        rand_gen: random number generator
    returns:
        a random integer in [start, stop)
    """
    return floor(rand_gen.random() * (stop - start) + start)
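A quick sketch that illustrates the half-open interval, assuming `random.Random` as the generator (its `random()` returns values in [0, 1), so `stop` itself is never produced):

```python
import random
from math import floor


def safe_random_int(start: int, stop: int, rand_gen) -> int:
    # copied from the listing above so the example is self-contained
    return floor(rand_gen.random() * (stop - start) + start)


rng = random.Random(1)

# draw many values from [3, 6) and collect the distinct results
draws = [safe_random_int(3, 6, rng) for _ in range(1000)]
observed = set(draws)
print(sorted(observed))
```

Scaling a uniform draw by `stop - start`, shifting by `start`, and flooring gives each integer in the range the same probability.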
safe_shuffle(seq, rand_gen)
Return a shuffled sequence, or an empty list if the collection is empty. Sets are copied to a list first; other sequences are shuffled in place.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
seq | Collection[~T] | collection to shuffle | required |
rand_gen | | random number generator | required |
Returns:
Type | Description |
---|---|
Iterable[~T] | shuffled sequence, or an empty list if the collection is empty |
Source code in titan/utils.py
def safe_shuffle(seq: Collection[T], rand_gen) -> Iterable[T]:
    """
    Return a shuffled sequence, or an empty list if the collection is empty.
    Sets are copied to a list first; other sequences are shuffled in place.
    args:
        seq: collection to shuffle
        rand_gen: random number generator
    returns:
        shuffled sequence, or an empty list if the collection is empty
    """
    if seq:
        if isinstance(seq, set):
            seq = list(seq)
        rand_gen.shuffle(seq)
        return seq
    else:
        return []
scale_param(params, param_path, scalar, delimiter='|')
Given the params and a parameter path in the format prep|cap, scale the current value by the scalar
Source code in titan/utils.py
def scale_param(params: ObjMap, param_path: str, scalar: float, delimiter="|"):
    """
    Given the params and a parameter path in the format prep|cap, scale the
    current value by the scalar
    """
    scaling_item, last_key = get_param_from_path(params, param_path, delimiter)
    old_val = scaling_item[last_key]
    logging.info(f"scaling - {param_path}: {old_val} => {old_val * scalar}")
    scaling_item[last_key] = old_val * scalar
total_probability(p, num_acts)
Given a per act probability and a number of acts, return the total probability of at least one occurrence across all acts, i.e. 1 - (1 - p)^num_acts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p | float | the per act probability | required |
num_acts | int | the number of acts | required |
Returns:
Type | Description |
---|---|
float | the total probability |
Source code in titan/utils.py
def total_probability(p: float, num_acts: int) -> float:
    """
    Given a per act probability and a number of acts, return the total
    probability of at least one occurrence across all acts,
    i.e. 1 - (1 - p)^num_acts.
    args:
        p: the per act probability
        num_acts: the number of acts
    returns:
        the total probability
    """
    if num_acts == 1:
        return p
    elif num_acts >= 1:
        return 1.0 - binom_0(num_acts, p)
    else:
        return 0.0
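A worked example of the three branches. Both functions are copied from the listings in this section so the snippet runs on its own; the probability 0.1 is an arbitrary illustrative value:

```python
def binom_0(n: int, p: float) -> float:
    # probability of zero occurrences in n acts
    return (1 - p) ** n


def total_probability(p: float, num_acts: int) -> float:
    # copied from the listing above
    if num_acts == 1:
        return p
    elif num_acts >= 1:
        return 1.0 - binom_0(num_acts, p)
    else:
        return 0.0


single = total_probability(0.1, 1)  # one act: the per act probability itself
three = total_probability(0.1, 3)   # 1 - 0.9**3, roughly 0.271
none = total_probability(0.1, 0)    # no acts: nothing can happen
```

The total is the complement of "no occurrence in any act", which is why it grows more slowly than simply multiplying `p` by the number of acts.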
Params
ObjMap
A dictionary-like class which allows accessing members either using standard dictionary notation or dots. Note the hash function is hard-coded - beware.
__hash__(self)
special
Return hash(self).
Source code in titan/parse_params.py
def __hash__(self):
    return 1234567890
check_params(params)
Consistency checks for param populations
Source code in titan/parse_params.py
def check_params(params: ObjMap):
    """
    Consistency checks for param populations
    """
    race_pop = 0
    for race in params.classes.races:
        r_dems = params.demographics[race]
        race_pop += r_dems.ppl
        sex_type_pop = 0
        for st, st_dems in r_dems.sex_type.items():
            if st in list(params.classes.sex_types.keys()):
                sex_type_pop += st_dems.ppl
            drug_type_pop = 0
            for dt, dt_dems in st_dems.drug_type.items():
                if dt in list(params.classes.drug_types):
                    drug_type_pop += dt_dems.ppl
            assert math.isclose(
                drug_type_pop, 1, abs_tol=0.001
            ), f"ppl of {race}'s {st}'s drug_types must add to 1. Currently adding to {drug_type_pop}"
        assert math.isclose(
            sex_type_pop, 1, abs_tol=0.001
        ), f"ppl of {race}'s sex_types must add to 1. Currently adding to {sex_type_pop}"
    assert math.isclose(race_pop, 1, abs_tol=0.001), "ppl of races must add to 1"
    loc_pop = 0
    for location in params.classes.locations.values():
        loc_pop += location.ppl
    assert math.isclose(loc_pop, 1, abs_tol=0.001), "ppl of locations must add to 1"
    for param, assort in params.assort_mix.items():
        assort_value = 0
        for ptnr_value in assort.partner_values.values():
            assort_value += ptnr_value
        assert math.isclose(
            assort_value, 1, abs_tol=0.001
        ), f"assort values must add to 1, not {assort_value} in {param}"
create_params(setting_name, param_path, outdir, error_on_unused=False)
Entry function. Given a setting, the path to the params, an output directory, and whether unused parameters should raise an error, parse and create a params (ObjMap) object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
setting_name | Optional[str] | path to a settings file or directory or None | required |
param_path | str | path to parameter file or directory | required |
outdir | str | path to directory where computed params will be saved | required |
error_on_unused | bool | throw a hard error if there are unused parameters, otherwise warnings are only printed | False |
Returns:
Type | Description |
---|---|
ObjMap | computed/validated model parameters with defaults filled in where needed |
Source code in titan/parse_params.py
def create_params(
    setting_name: Optional[str],
    param_path: str,
    outdir: str,
    error_on_unused: bool = False,
) -> ObjMap:
    """
    Entry function. Given a setting, the path to the params, an output directory,
    and whether unused parameters should raise an error, parse and create a
    params (ObjMap) object.
    args:
        setting_name: path to a settings file or directory or `None`
        param_path: path to parameter file or directory
        outdir: path to directory where computed params will be saved
        error_on_unused: throw a hard error if there are unused parameters, otherwise warnings are only printed
    returns:
        computed/validated model parameters with defaults filled in where needed
    """
    # find defs, where we are in the code for settings and base
    filename = getsourcefile(create_params)  # what is the sourcefile for this function
    if filename is not None:
        parent = Path(filename).resolve().parent
    else:
        raise Exception("can't find where I am in the code?")
    param_defs = os.path.join(parent, "params")
    param_paths = []
    # merge setting and params
    if setting_name is not None:
        # check if it's a known setting or pass it through as a path
        if setting_name in os.listdir(os.path.join(parent, "settings")):
            param_paths.append(os.path.join(parent, "settings", setting_name))
        else:
            param_paths.append(setting_name)
    param_paths.append(param_path)
    parsed = paraml.create_params(
        param_defs,
        *param_paths,
        out_path=os.path.join(outdir, "params.yml"),
        error_on_unused=error_on_unused,
    )
    parsed = ObjMap(parsed)
    check_params(parsed)
    # copy migration file if enabled
    if parsed.location.migration.enabled:
        shutil.copy(
            parsed.location.migration.probs_file,
            os.path.join(outdir, "migration_probs.csv"),
        )
    return parsed
Probability Distributions
pert(np_random, low, peak, high, temperature)
A PERT distribution, inspired by the tensorflow implementation.
Arguments must satisfy:
- low < peak < high
- temperature > 0
The support is [low, high]. The peak must fit in that interval: low < peak < high. The temperature is a positive parameter that controls the shape of the distribution. Higher values yield a sharper peak.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
np_random | | random number generator (used to get beta) | required |
low | | distribution low value | required |
peak | | modal point in distribution | required |
high | | distribution high value | required |
temperature | | scaling factor | required |
Source code in titan/distributions.py
def pert(np_random, low, peak, high, temperature):
    """
    A pert distribution, inspired by [tensorflow](https://github.com/tensorflow/probability/blob/c833ee5cd9f60f3257366b25447b9e50210b0590/tensorflow_probability/python/distributions/pert.py#L137)
    arguments must be so that:
    * low < peak < high
    * temperature > 0
    The support is `[low, high]`. The `peak` must fit in that interval:
    `low < peak < high`. The `temperature` is a positive parameter that
    controls the shape of the distribution. Higher values yield a sharper peak.
    args:
        np_random: random number generator (used to get beta)
        low: distribution low value
        peak: modal point in distribution
        high: distribution high value
        temperature: scaling factor
    """
    assert low < peak < high
    assert temperature > 0
    scale = high - low
    alpha = 1.0 + temperature * (peak - low) / scale
    beta = 1.0 + temperature * (high - peak) / scale
    return low + scale * np_random.beta(alpha, beta)
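To see the shape parameters at work, the sketch below draws samples and compares their mean with the value implied by the formulas above, (low + temperature * peak + high) / (temperature + 2). It swaps in the standard library for NumPy: `BetaAdapter` is a hypothetical wrapper that exposes a `beta` method backed by `random.Random.betavariate`, and the numeric inputs are arbitrary:

```python
import random


class BetaAdapter:
    """Hypothetical adapter: a NumPy-style `beta` method backed by the standard library."""

    def __init__(self, seed: int):
        self._rng = random.Random(seed)

    def beta(self, a: float, b: float) -> float:
        return self._rng.betavariate(a, b)


def pert(np_random, low, peak, high, temperature):
    # copied from the listing above so the example is self-contained
    assert low < peak < high
    assert temperature > 0
    scale = high - low
    alpha = 1.0 + temperature * (peak - low) / scale
    beta = 1.0 + temperature * (high - peak) / scale
    return low + scale * np_random.beta(alpha, beta)


rng = BetaAdapter(seed=7)
low, peak, high, temperature = 0.0, 2.0, 10.0, 4.0

samples = [pert(rng, low, peak, high, temperature) for _ in range(2000)]
sample_mean = sum(samples) / len(samples)

# mean of the scaled beta distribution, derived from alpha / (alpha + beta)
expected_mean = (low + temperature * peak + high) / (temperature + 2)
```

With these inputs the expected mean is 3.0, and the sample mean should land close to it. Every sample stays inside [low, high] because the beta draw lies in [0, 1].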
poisson(np_rand, mu)
Mirrors scipy's poisson.rvs function as used in the code. Returns 0 when mu is negative.
Source code in titan/distributions.py
def poisson(np_rand, mu: float):
    """
    Mirrors scipy's poisson.rvs function as used in the code.
    Returns 0 when mu is negative.
    """
    if mu < 0:
        return 0
    return np_rand.poisson(mu)
set_value(np_random, value)
A distribution that always returns the value passed
Parameters:
Name | Type | Description | Default |
---|---|---|---|
np_random | | random number generator (to conform to distribution interface) | required |
value | | value to return | required |
Source code in titan/distributions.py
def set_value(np_random, value):
    """
    A distribution that always returns the value passed
    args:
        np_random: random number generator (to conform to distribution interface)
        value: value to return
    """
    return value
weibull_modified(np_random, shape, scale)
Modified version of numpy's (single parameter) weibull distribution to use the 2-parameter weibull.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
np_random | | random number generator | required |
shape | | weibull shape parameter | required |
scale | | weibull scale parameter | required |
Source code in titan/distributions.py
def weibull_modified(np_random, shape, scale):
    """
    Modified version of numpy's (single parameter) weibull distribution to use the 2-parameter weibull.
    args:
        np_random: random number generator
        shape: weibull shape parameter
        scale: weibull scale parameter
    """
    random_number = np_random.random()
    return scale * (-log(1 - random_number)) ** (1 / shape)
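The return expression is inverse transform sampling: for a uniform draw U on [0, 1), `scale * (-log(1 - U)) ** (1 / shape)` follows a two-parameter Weibull distribution. A minimal check of that claim compares the sample median with the Weibull median, `scale * log(2) ** (1 / shape)`. It assumes `random.Random` as the generator, since only a `random()` method is needed, and the shape and scale values are arbitrary:

```python
import random
from math import log


def weibull_modified(np_random, shape, scale):
    # copied from the listing above so the example is self-contained
    random_number = np_random.random()
    return scale * (-log(1 - random_number)) ** (1 / shape)


rng = random.Random(3)
shape, scale = 2.0, 5.0

# an odd sample count makes the middle element the sample median
samples = sorted(weibull_modified(rng, shape, scale) for _ in range(5001))
sample_median = samples[len(samples) // 2]

# closed-form median of a Weibull(shape, scale) distribution
theoretical_median = scale * log(2) ** (1 / shape)
```

The theoretical median here is about 4.16, and the sample median should fall close to it.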
Complex Probabilities
get_death_rate(hiv, aids, drug_type, sex_type, haart_adh, race, location, steps_per_year, exit_class)
Find the death rate of an agent given a set of attributes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hiv | bool | whether the agent is HIV+ | required |
aids | bool | whether the agent has AIDS | required |
drug_type | str | the agent's drug type, which determines whether the PWID base death rate or the base one is used | required |
sex_type | str | the sex type of the agent | required |
haart_adh | bool | whether the agent is HAART adherent | required |
race | str | the race of the agent | required |
location | Location | agent's location | required |
steps_per_year | int | the number of model steps in a year | required |
exit_class | str | the exit class to access in agent params | required |
Returns:
Type | Description |
---|---|
float | the probability of an agent with these characteristics dying in a given time step |
Source code in titan/probabilities.py
@utils.memo
def get_death_rate(
    hiv: bool,
    aids: bool,
    drug_type: str,
    sex_type: str,
    haart_adh: bool,
    race: str,
    location: Location,
    steps_per_year: int,
    exit_class: str,
) -> float:
    """
    Find the death rate of an agent given a set of attributes.
    args:
        hiv: whether the agent is HIV+
        aids: whether the agent has AIDS
        drug_type: the agent's drug type, which determines whether the PWID base death rate or the base one is used
        sex_type: the sex type of the agent
        haart_adh: whether the agent is HAART adherent
        race: the race of the agent
        location: agent's location
        steps_per_year: the number of model steps in a year
        exit_class: the exit class to access in agent params
    returns:
        the probability of an agent with these characteristics dying in a given time step
    """
    param = location.params.demographics
    death_param = param[race].sex_type[sex_type].drug_type[drug_type].exit[exit_class]
    p = death_param.base
    if aids:
        p *= death_param.aids
    elif hiv:
        if haart_adh:
            p *= death_param.haart_adherent
        else:
            p *= death_param.hiv
    # convert from deaths per 1,000 person-years to a per-person, per-time-step probability
    return p / (1000 * steps_per_year)
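The final line converts a rate per 1,000 person-years into a per-step probability. A worked sketch of just that arithmetic follows; `per_step_death_probability` is a hypothetical helper, not part of titan, and the rate and multiplier values are invented for illustration:

```python
def per_step_death_probability(
    base: float, steps_per_year: int, multiplier: float = 1.0
) -> float:
    # hypothetical helper mirroring the final arithmetic of get_death_rate:
    # scale the base rate by a status multiplier, then convert from
    # "per 1,000 person-years" to "per person per time step"
    return base * multiplier / (1000 * steps_per_year)


# assumed inputs: 12 deaths per 1,000 person-years, monthly time steps
monthly = per_step_death_probability(base=12.0, steps_per_year=12)

# the same base rate with an assumed multiplier of 3 (e.g. an AIDS multiplier)
with_multiplier = per_step_death_probability(12.0, 12, multiplier=3.0)
```

With these numbers the per-step probabilities are roughly 0.001 and 0.003. The `@utils.memo` decorator on the real function means each distinct combination of arguments is computed once and then reused.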