Population
Population
The Population
class is used to represent the population of agents the model is running on. On construction, it stochastically creates the population described in the params
. At its core, it is a graph with nodes (all_agents
) and edges (relationships
), it can be formally backed by a NetworkX graph by enabling the graph in the prams file. This allows for some graph-specific logic to be applied throughout the running of the model (e.g. trimming components, writing network statistics).
__init__(self, params, id=None)
special
Initialize Population object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
parse_params.ObjMap |
Model parameters |
required |
id |
Optional[str] |
8 character identifier for a model |
None |
Source code in titan/population.py
def __init__(self, params: "parse_params.ObjMap", id: Optional[str] = None):
"""
Initialize Population object.
args:
params : Model parameters
id: 8 character identifier for a model
"""
if id is None:
self.id = nanoid.generate(size=8)
else:
self.id = id
utils.set_up_logging(params)
logging.info(f"Population ID: {self.id}")
self.pop_seed = utils.get_check_rand_int(params.model.seed.ppl)
# Init RNG for population creation to pop_seed
self.pop_random = random.Random(self.pop_seed)
self.np_random = np.random.default_rng(self.pop_seed)
self.enable_graph = params.model.network.enable
self.components: List = []
if self.enable_graph:
self.graph = nx.Graph()
else:
self.graph = None
self.params = params
# set up the in-scope exposures
self.exposures = [
exposure
for exposure in exposures.BaseExposure.__subclasses__()
if self.params.exposures[exposure.name]
]
# initialize the class level items
for exposure in self.exposures:
exposure.init_class(params)
# set up the in-scope features
self.features = [
feature
for feature in features.BaseFeature.__subclasses__()
if self.params.features[feature.name]
]
# initialize the class level items
for feature in self.features:
feature.init_class(params)
# set up the population's locations and edges
self.geography = location.Geography(params)
# All agent set list
self.all_agents = ag.AgentSet("AllAgents")
# pwid agents (performance for partnering)
self.pwid_agents = ag.AgentSet("PWID", parent=self.all_agents)
# agents who can take on a partner
self.partnerable_agents: Dict[str, Set["ag.Agent"]] = {}
for bond_type in self.params.classes.bond_types.keys():
self.partnerable_agents[bond_type] = set()
# who can sleep with whom
self.sex_partners: Dict[str, Set["ag.Agent"]] = {}
for sex_type in self.params.classes.sex_types.keys():
self.sex_partners[sex_type] = set()
self.relationships: Set["ag.Relationship"] = set()
# find average partnership durations
self.mean_rel_duration: Dict[str, Dict] = partnering.get_mean_rel_duration(
self.params
)
logging.info(" Creating agents")
# for each location in the population, create agents per that location's demographics
init_time = -1 * self.params.model.time.burn_steps
for loc in self.geography.locations.values():
for race in params.classes.races:
for i in range(
round(
params.model.num_pop
* loc.ppl
* loc.params.demographics[race].ppl
)
):
if self.all_agents.num_members() >= self.params.model.num_pop:
logging.warning(
"WARNING: not adding agent to population - too many agents"
)
break
agent = self.create_agent(loc, race, init_time)
self.add_agent(agent)
# initialize relationships
logging.info(" Creating Relationships")
self.update_partner_assignments(0)
add_agent(self, agent)
Adds an agent to the population
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent |
ag.Agent |
The agent to be added |
required |
Source code in titan/population.py
def add_agent(self, agent: "ag.Agent"):
"""
Adds an agent to the population
args:
agent : The agent to be added
"""
# Add to all agent set
self.all_agents.add_agent(agent)
if agent.drug_type == "Inj":
self.pwid_agents.add_agent(agent)
# who can sleep with this agent
for sex_type in self.params.classes.sex_types[agent.sex_type].sleeps_with:
self.sex_partners[sex_type].add(agent)
if self.enable_graph:
self.graph.add_node(agent)
add_relationship(self, rel)
Add a new relationship to the population.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rel |
ag.Relationship |
The Relationship to be added |
required |
Source code in titan/population.py
def add_relationship(self, rel: "ag.Relationship"):
"""
Add a new relationship to the population.
args:
rel : The Relationship to be added
"""
self.relationships.add(rel)
if self.enable_graph:
self.graph.add_edge(rel.agent1, rel.agent2, type=rel.bond_type)
connected_components(self)
Get connected components in graph (if enabled)
Returns:
Type | Description |
---|---|
List |
list of connected components |
Source code in titan/population.py
def connected_components(self) -> List:
"""
Get connected components in graph (if enabled)
returns:
list of connected components
"""
if self.enable_graph:
return self.components
else:
raise ValueError(
"Can't get connected_components, population doesn't have graph enabled."
)
create_agent(self, loc, race, time, sex_type=None, drug_type=None, age=None)
Create a new agent with randomly assigned attributes according to population demographics [params.demographics]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
loc |
location.Location |
location the agent will live in |
required |
race |
str |
race of the new agent |
required |
time |
int |
current time step of the model |
required |
sex_type |
Optional[str] |
sex_type of the new agent |
None |
drug_type |
Optional[str] |
drug_type of the new agent |
None |
age |
Optional[int] |
age of the new agent |
None |
Returns:
Type | Description |
---|---|
ag.Agent |
a new agent |
Source code in titan/population.py
def create_agent(
self,
loc: "location.Location",
race: str,
time: int,
sex_type: Optional[str] = None,
drug_type: Optional[str] = None,
age: Optional[int] = None,
) -> "ag.Agent":
"""
Create a new agent with randomly assigned attributes according to population
demographics [params.demographics]
args:
loc: location the agent will live in
race: race of the new agent
time: current time step of the model
sex_type: sex_type of the new agent
drug_type: drug_type of the new agent
age: age of the new agent
returns:
a new agent
"""
if not sex_type:
sex_type = utils.safe_random_choice(
loc.pop_weights[race]["values"],
self.pop_random,
weights=loc.pop_weights[race]["weights"],
)
# no choice available
if sex_type is None:
raise ValueError("Agent must have sex type")
# Determine drugtype
if not drug_type:
drug_type = utils.safe_random_choice(
loc.drug_weights[race][sex_type]["values"],
self.pop_random,
weights=loc.drug_weights[race][sex_type]["weights"],
)
# no choice available
if drug_type is None:
raise ValueError("Agent must have drug type")
if not age:
age = self.get_age(loc, race)
agent = ag.Agent(sex_type, age, race, drug_type, loc)
sex_role = utils.safe_random_choice(
loc.role_weights[race][sex_type]["values"],
self.pop_random,
weights=loc.role_weights[race][sex_type]["weights"],
)
if sex_role is None:
raise ValueError("Agent must have sex role")
else:
agent.sex_role = sex_role
agent_params = (
agent.location.params.demographics[race]
.sex_type[sex_type]
.drug_type[drug_type]
)
for exposure in self.exposures:
agent_feature = getattr(agent, exposure.name)
agent_feature.init_agent(self, time)
for bond, bond_def in loc.params.classes.bond_types.items():
agent.partners[bond] = set()
dist_info = agent_params.num_partners[bond]
agent.mean_num_partners[bond] = ceil(
utils.safe_dist(dist_info, self.np_random)
* utils.safe_divide(
agent.location.params.calibration.sex.partner,
self.mean_rel_duration[bond][race],
)
)
# so not zero if added mid-year
agent.target_partners[bond] = agent.mean_num_partners[bond]
if "injection" in bond_def.acts_allowed:
assert agent.drug_type == "Inj" or agent.mean_num_partners[bond] == 0
if agent.target_partners[bond] > 0:
self.partnerable_agents[bond].add(agent)
for feature in self.features:
agent_feature = getattr(agent, feature.name)
agent_feature.init_agent(self, time)
return agent
get_age(self, loc, race)
Given the population characteristics, get a random age to assign to an agent given the race of that agent
Parameters:
Name | Type | Description | Default |
---|---|---|---|
race |
str |
race of the agent whose age is being generated |
required |
Returns:
Type | Description |
---|---|
int |
age and the bin the age came from |
Source code in titan/population.py
def get_age(self, loc: "location.Location", race: str) -> int:
"""
Given the population characteristics, get a random age to assign to an agent given the race of that agent
args:
race : race of the agent whose age is being generated
returns:
age and the bin the age came from
"""
bins = loc.params.demographics[race].age
i = utils.get_independent_bin(self.pop_random, bins)
age = self.pop_random.randrange(bins[i].min, bins[i].max)
return age
migrate(self)
Have agents migrate between locations with probabilities defined in location.migration.matrix_file
.
Source code in titan/population.py
def migrate(self):
"""
Have agents migrate between locations with probabilities defined in `location.migration.matrix_file`.
"""
m_attr = self.params.location.migration.attribute
for a in self.all_agents:
m_param = a.location.migration_weights
if self.pop_random.random() < m_param["prob"]:
new_loc = utils.safe_random_choice(
m_param["values"],
self.pop_random,
weights=m_param["weights"],
)
if m_attr == "name":
a.location = self.geography.locations[new_loc]
elif m_attr == "category":
a.location = utils.safe_random_choice(
self.geography.categories[new_loc], self.pop_random
)
remove_agent(self, agent)
Remove an agent from the population.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent |
ag.Agent |
Agent to remove |
required |
Source code in titan/population.py
def remove_agent(self, agent: "ag.Agent"):
"""
Remove an agent from the population.
args:
agent : Agent to remove
"""
for rel in copy(agent.relationships):
rel.progress(force=True)
self.remove_relationship(rel)
self.all_agents.remove_agent(agent)
for partner_type in self.sex_partners:
if agent in self.sex_partners[partner_type]:
self.sex_partners[partner_type].remove(agent)
for exposure in self.exposures:
agent_attr = getattr(agent, exposure.name)
if agent_attr.active:
exposure.remove_agent(agent)
for feature in self.features:
agent_attr = getattr(agent, feature.name)
if agent_attr.active:
feature.remove_agent(agent)
if self.enable_graph:
self.graph.remove_node(agent)
for bond in self.partnerable_agents.values():
if agent in bond:
bond.remove(agent)
# mark agent component as -1 (no component)
agent.component = "-1"
remove_relationship(self, rel)
Remove a relationship from the population.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rel |
ag.Relationship |
Relationship to remove |
required |
Source code in titan/population.py
def remove_relationship(self, rel: "ag.Relationship"):
"""
Remove a relationship from the population.
args:
rel : Relationship to remove
"""
self.relationships.remove(rel)
# without this relationship, are agents partnerable again?
self.update_partnerability(rel.agent1)
self.update_partnerability(rel.agent2)
if self.enable_graph:
self.graph.remove_edge(rel.agent1, rel.agent2)
trim_graph(self)
Initialize network with graph-based algorithm for relationship adding/pruning
Source code in titan/population.py
def trim_graph(self):
"""
Initialize network with graph-based algorithm for relationship
adding/pruning
"""
if self.params.model.network.type == "comp_size":
def trim_component(component, max_size):
for agent in component.nodes:
if (
self.pop_random.random()
< self.params.calibration.network.trim.prob
):
for rel in copy(agent.relationships):
if len(agent.relationships) == 1:
break # Make sure that agents stay part of the
# network by keeping one bond
rel.progress(force=True)
self.remove_relationship(rel)
# recurse on new sub-components
sub_comps = utils.connected_components(component)
for sub_comp in sub_comps:
if sub_comp.number_of_nodes() > max_size:
trim_component(component, max_size)
else:
break
components = self.connected_components()
for comp in components:
if (
comp.number_of_nodes()
> self.params.model.network.component_size.max
):
logging.info(f"TOO BIG {comp} {comp.number_of_nodes()}")
trim_component(comp, self.params.model.network.component_size.max)
logging.info(f" Total agents in graph: {self.graph.number_of_nodes()}")
update_agent_components(self)
Update the component IDs associated with each agent based on the current state of the graph
Source code in titan/population.py
def update_agent_components(self):
"""
Update the component IDs associated with each agent based on the current state of the graph
"""
if self.enable_graph:
self.components = utils.connected_components(self.graph)
for id, component in enumerate(self.components):
for agent in component.nodes:
agent.component = str(id)
self.params.classes.components = list(
map(str, range(-1, len(self.components)))
)
update_agent_partners(self, agent, bond_type, components)
Finds and bonds new partner. Creates relationship object for partnership, calcs partnership duration, adds it to the population, and adds to networkX graph if self.enable_graph is set True.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent |
ag.Agent |
Agent that is seeking a new partner |
required |
bond_type |
str |
What type of bond the agent is seeking to make |
required |
Returns:
Type | Description |
---|---|
bool |
True if no match was found for agent (used for retries) |
Source code in titan/population.py
def update_agent_partners(
self, agent: "ag.Agent", bond_type: str, components: List
) -> bool:
"""
Finds and bonds new partner. Creates relationship object for partnership,
calcs partnership duration, adds it to the population, and adds to networkX graph if self.enable_graph
is set True.
args:
agent: Agent that is seeking a new partner
bond_type: What type of bond the agent is seeking to make
returns:
True if no match was found for agent (used for retries)
"""
partnerable_agents = self.partnerable_agents[bond_type]
if (
self.pop_random.random()
< self.params.partnership.network.same_component.prob
and agent.has_partners()
):
# find agent's component
agent_component: Set["ag.Agent"] = set()
for comp in components:
if agent in comp:
agent_component = comp
break
partnerable_agents = partnerable_agents & agent_component
partner = partnering.select_partner(
agent,
partnerable_agents,
self.sex_partners,
self.pwid_agents,
self.params,
self.pop_random,
bond_type,
)
no_match = True
if partner:
race = utils.safe_random_choice([agent.race, partner.race], self.pop_random)
duration = partnering.get_partnership_duration(
agent.location.params, self.np_random, bond_type, race
)
relationship = ag.Relationship(
agent, partner, duration, bond_type=bond_type
)
self.add_relationship(relationship)
# can partner still partner?
if len(partner.partners[bond_type]) > (
partner.target_partners[bond_type]
* self.params.calibration.partnership.buffer
):
self.partnerable_agents[bond_type].remove(partner)
no_match = False
return no_match
update_partner_assignments(self, t)
Determines which agents will seek new partners from All_agentSet. Calls update_agent_partners for any agents that desire partners.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
t |
int |
current time step of the model |
required |
Source code in titan/population.py
def update_partner_assignments(self, t: int):
"""
Determines which agents will seek new partners from All_agentSet.
Calls update_agent_partners for any agents that desire partners.
args:
t: current time step of the model
"""
# update agent targets annually
if t % self.params.model.time.steps_per_year == 0:
self.update_partner_targets()
if self.enable_graph:
network_components = [set(g.nodes()) for g in self.components]
else:
network_components = []
# Now create partnerships until available partnerships are out
for bond in self.params.classes.bond_types:
eligible_agents = deque(
[
a
for a in self.all_agents
if len(a.partners[bond]) < a.target_partners[bond]
]
)
attempts = {a: 0 for a in eligible_agents}
while eligible_agents:
agent = eligible_agents.popleft()
if len(agent.partners[bond]) < agent.target_partners[bond]:
# no match
if self.update_agent_partners(agent, bond, network_components):
attempts[agent] += 1
# add agent back to eligible pool
if (
len(agent.partners[bond]) < agent.target_partners[bond]
and attempts[agent]
< self.params.calibration.partnership.break_point
):
eligible_agents.append(agent)
if self.enable_graph:
self.trim_graph()
self.update_agent_components()
update_partner_targets(self)
Update the target number of partners for each agent and bond type
Source code in titan/population.py
def update_partner_targets(self):
"""
Update the target number of partners for each agent and bond type
"""
for a in self.all_agents:
for bond in self.params.classes.bond_types:
a.target_partners[bond] = poisson(
self.np_random, a.mean_num_partners[bond]
)
self.update_partnerability(a)
update_partnerability(self, a)
Update whether each agent in the population is currently able to form new relationships for each bond type
Source code in titan/population.py
def update_partnerability(self, a):
"""
Update whether each agent in the population is currently able to form new relationships for each bond type
"""
for bond in self.params.classes.bond_types.keys():
if a in self.partnerable_agents[bond]:
if len(a.partners[bond]) > (
a.target_partners[bond] * self.params.calibration.partnership.buffer
):
self.partnerable_agents[bond].remove(a)
elif len(a.partners[bond]) < (
a.target_partners[bond] * self.params.calibration.partnership.buffer
):
self.partnerable_agents[bond].add(a)
Population Reading & Writing
Released in v1.1.0
Populations can be saved to file so that they can be analysed in detail or re-used in a future run. run_titan.py
allows this using the --savepop [path]
option to save the population to the path, and the --poppath [path]
option loads the population at the path. The population is saved after creation, but before the model has run.
Saving the Population
The population is represented as a series of csv files that save the attributes for the core entities (agents, relationships at this time). The population can be saved with only core
attributes (e.g. race, sex_type, hiv) or with intervention
attributes (e.g. prep, vaccinated) as well. intervention
attributes are less likely to work as intended across versions of the model.
titan.population_io.write(pop, dir, compress=True)
Write a non-empty Population to file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pop |
Population |
a non-empty agent population |
required |
dir |
str |
path to directory where files should be written |
required |
compress |
bool |
whether to compress and archive the csv |
True |
Returns:
Type | Description |
---|---|
str |
path, or archive name if compress is true |
Source code in titan/population_io.py
def write(pop: Population, dir: str, compress: bool = True) -> str:
"""
Write a non-empty Population to file.
args:
pop: a non-empty agent population
dir: path to directory where files should be written
compress: whether to compress and archive the csv
returns:
path, or archive name if compress is true
"""
assert len(pop.relationships) > 0, "Can't write empty population"
utils.set_up_logging(pop.params)
# open agent file
agent_file = os.path.join(dir, f"{pop.id}_agents.csv")
a = next(iter(pop.all_agents))
# get all attributes
agent_attrs = [k for k in a.__dict__.keys() if k not in agent_exclude_attrs]
write_class_file(agent_file, pop.all_agents, agent_attrs)
extra_files = []
# write agent extras (features, exposures) to their own files
def write_extra_class(extra_attrs, extra_type):
for extra in extra_attrs:
extra_obj = getattr(a, extra)
extra_attrs = list(extra_obj.__dict__.keys())
extra_file = os.path.join(dir, f"{pop.id}_{extra_type}_{extra}.csv")
extra_files.append(extra_file)
write_extra_class_file(extra_file, pop.all_agents, extra, extra_attrs)
write_extra_class(agent_feature_attrs, "feat")
write_extra_class(agent_exposure_attrs, "exposure")
# open relationship file
rel_file = os.path.join(dir, f"{pop.id}_relationships.csv")
r = next(iter(pop.relationships))
rel_attrs = list(r.__dict__.keys())
write_class_file(rel_file, pop.relationships, rel_attrs)
if compress:
archive_name = make_archive(
os.path.join(dir, f"{pop.id}_pop"), "gztar", root_dir=dir, base_dir="."
)
os.remove(agent_file)
os.remove(rel_file)
for f in extra_files:
os.remove(f)
return archive_name
else:
return dir
Reading in/using a Saved Population
A population can be created from the files saved, however, there is no validation done to ensure the params used when running on this population make sense/match what was originally used when creating it. Some things may validly change (e.g. interventions, reports needed, seeds), but others may result in strange behavior if changed (e.g. race distribution, what classes are in use).
titan.population_io.read(params, path)
Read a population from file and return a Population instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
ObjMap |
the parameters used for creating this popultation |
required |
path |
str |
path where [id]_agents.csv and [id]_relationships.csv are or tar.gz file containing population |
required |
Returns:
Type | Description |
---|---|
Population |
the re-constituted population |
Source code in titan/population_io.py
def read(params: ObjMap, path: str) -> Population:
"""
Read a population from file and return a Population instance
args:
params: the parameters used for creating this popultation
path: path where [id]_agents.csv and [id]_relationships.csv are or tar.gz file
containing population
returns:
the re-constituted population
"""
if os.path.isfile(path):
dir = mkdtemp()
unpack_archive(path, dir)
path = dir
agent_file = glob.glob(os.path.join(path, "*_agents.csv"))[0]
rel_file = glob.glob(os.path.join(path, "*_relationships.csv"))[0]
feat_files = glob.glob(os.path.join(path, "*_feat_*.csv"))
exposure_files = glob.glob(os.path.join(path, "*_exposure_*.csv"))
assert os.path.isfile(agent_file), f"can't find agents.csv in {dir}"
assert os.path.isfile(rel_file), f"can't find relationships.csv in {dir}"
_, agent_filename = os.path.split(agent_file)
id = agent_filename[:8]
# create feature dict
agent_extras: Dict[str, Dict] = {}
def update_agent_extras(files, extra_type):
pattern = re.compile(f"^.*_{extra_type}_(.*)\\.csv$")
for file in files:
m = pattern.match(file)
if m is not None:
extra = m.group(1)
agent_extras[extra] = {}
with open(file, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
agent_extras[extra][int(row["agent"])] = row
update_agent_extras(feat_files, "feat")
update_agent_extras(exposure_files, "exposure")
# don't create any agents on init
params.model.num_pop = 0
pop = Population(params, id=id)
# re-create all agents and add to population
with open(agent_file, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
a = create_agent(
row,
params.classes.bond_types.keys(),
pop.geography.locations,
agent_extras,
)
pop.add_agent(a)
# update num_pop to actual population
params.model.num_pop = pop.all_agents.num_members()
# re-create all relationships and add to population
with open(rel_file, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
r = create_relationship(row, pop)
pop.add_relationship(r)
pop.update_agent_components()
return pop