population.py

population

This class stores the attributes that characterize a population and the methods for its initialization and progression.

Attributes:

Name Type Description
Nparticles int

Number of particles in the simulation.

rng Generator

Random number generator.

random np.random

Stores the np.random generator, used exclusively with NetworkX.

disease disease

Object with disease information and methods.

homes dict

Dictionary with information on every home.

workersAvailable list

List with available workers.

max_encounters int

Maximum number of encounters used by the max_encounters filter.

in_lckd np.ndarray bool

Whether lockdown is turned on at each step.

tracing list

List with the tracing for every step; tracing is stored as a dict of lists.

tracing_infectious list

List with the tracing of infectious particles for every step; tracing is stored as a dict of lists.

infection_tree Graph

Graph with disease spread information.

Particles general attributes

The attributes below describe every particle in the current step.

Attributes:

Name Type Description
pid np.ndarray int

Particle id.

home_id np.ndarray int

Id of the home for each particle, in the list of all homes.

ages np.ndarray int

Age group for each particle.

placement np.ndarray int

Placement marker for each particle. Placement tells where the particle is in the simulation (eg.: home, environment, services).

activity np.ndarray int

Activity marker for each particle. Activity tells what the particle is doing (eg.: free, working, visiting).

has_tracing np.ndarray bool

Mask for the tracing capability of every particle.

workplace_id np.ndarray int

Id of the service where the particle works.

workplace_instance np.ndarray int

Instance of the service where the particle works.

workplace_room np.ndarray int

Room of the service where the particle works.

worker_type np.ndarray int

Type of worker for each particle.

Particles states attributes

Attributes:

Name Type Description
states np.ndarray int

State of the infection for every particle.

symptoms np.ndarray int

State of symptoms for every particle.

quarantine_states np.ndarray int

Quarantine states for every particle.

diag_states np.ndarray int

Last diagnostic state for every particle.

Particles infection information attributes

Attributes:

Name Type Description
inf_source np.ndarray int

Source of infection of each particle: pid of the particle responsible for infecting the corresponding particle (NO_INFEC -> not infected, -1 -> initially infected).

inf_placement np.ndarray int

Placement of particles at time of infection (NO_INFEC -> not infected).

inf_vector_sympt np.ndarray int

Symptomatic state of the infection vector at the time the infection occurred (NO_INFEC -> no infection).

time_exposed np.ndarray float

Time in which a particle becomes exposed.

time_infectious np.ndarray float

Time in which a particle becomes infectious.

time_recovered np.ndarray float

Time in which a particle becomes recovered.

time_quarantined np.ndarray float

Time in which a particle was quarantined.

time_traced np.ndarray float

Time in which a particle was traced.

Time series

Attributes:

Name Type Description
states_ts np.ndarray

Time series of states attribute (conditional to store_time_series parameter).

symptoms_ts np.ndarray

Time series of symptoms attribute (conditional to store_time_series parameter).

quarantine_states_ts np.ndarray

Time series of quarantine_states attribute (conditional to store_time_series parameter).

diag_states_ts np.ndarray

Time series of diag_states attribute (conditional to store_time_series parameter).

positions_ts np.ndarray

Time series of positions attribute (conditional to store_time_series parameter).

placement_ts np.ndarray

Time series of placement attribute (conditional to store_time_series parameter).

susceptible np.ndarray

Time series of the number of susceptible particles on each step.

exposed np.ndarray

Time series of the number of exposed particles on each step.

infectious np.ndarray

Time series of the number of infectious particles on each step.

recovered np.ndarray

Time series of the number of recovered particles on each step.

deceased np.ndarray

Time series of the number of deceased particles on each step.

R0t np.ndarray int

Cumulative mean of spreads over time, converging to the basic reproduction number (R0).

spreads np.ndarray int

At each time, number of particles infected by the particle with corresponding column index.

add_attribute(self, name, value, store=False, compress=False)

Adds an attribute to the population object.

Parameters:

Name Type Description Default
name str

Name of the attribute.

required
value Any

Initial value for the attribute.

required
store bool

Select to store the attribute at the end of the simulation. Defaults to False.

False
compress bool

Select if the attribute should be stored compressed or not. Defaults to False.

False
Source code in comorbuss/population.py
def add_attribute(self, name, value, store=False, compress=False):
    """Adds an attribute to the population object.

    Args:
        name (str): Name of the attribute.
        value (Any): Initial value for the attribute.
        store (bool, optional): Select to store the attribute at the end of the
            simulation. Defaults to False.
        compress (bool, optional): Select if the attribute should be stored
            compressed or not. Defaults to False.
    """
    setattr(self, name, value)
    if store:
        self.comm.add_to_store(name, "comm.pop.{}".format(name), compress)
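
The behavior of add_attribute can be illustrated in isolation. The sketch below is a minimal stand-in, not the package's implementation: StoreRegistry and MiniPopulation are hypothetical classes introduced only for this example, and the registry simply records what add_to_store receives.

```python
class StoreRegistry:
    """Hypothetical stand-in for the object behind self.comm."""

    def __init__(self):
        self.to_store = {}

    def add_to_store(self, name, path, compress):
        # Record where the attribute lives and whether to compress it.
        self.to_store[name] = (path, compress)


class MiniPopulation:
    """Hypothetical population holding only what add_attribute needs."""

    def __init__(self, comm):
        self.comm = comm

    def add_attribute(self, name, value, store=False, compress=False):
        # Same logic as the method documented above.
        setattr(self, name, value)
        if store:
            self.comm.add_to_store(name, "comm.pop.{}".format(name), compress)


registry = StoreRegistry()
pop = MiniPopulation(registry)
pop.add_attribute("vaccinated", [False, False, True], store=True)
pop.add_attribute("scratch", 0)  # set on the object, but never registered
```

Only attributes added with store=True are registered for storage; the others are plain instance attributes.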

add_net_generator(self, plc, net_generator)

Add a new encounters generator to be processed every step.

Parameters:

Name Type Description Default
plc int

Placement marker.

required
net_generator net_generator

An object with a gen method.

required
Source code in comorbuss/population.py
def add_net_generator(self, plc, net_generator):
    """Add a new encounters generator to be processed every step.

    Args:
        plc (int): Placement marker.
        net_generator (net_generator): An object with a gen method.
    """
    self.plc_contact_history[plc] = []
    self.encounters_generators.append((plc, net_generator))

allocate_ages(self, parameters)

Allocate ages to the population, giving priority so that kids don't live alone.

Parameters:

Name Type Description Default
homes list

List of the home ids for each particle.

required
parameters dict

Simulation parameters.

required

Returns:

Type Description
np.ndarray

Age group of each particle.

Source code in comorbuss/population.py
def allocate_ages(self, parameters):
    """Allocate ages to the population, giving priority so that kids don't live alone.

    Args:
        homes (list): List of the home ids for each particle.
        parameters (dict): Simulation parameters.

    Returns:
        np.ndarray: Age group of each particle.
    """
    nHomes = parameters["homesNumber"]
    ageGroups = parameters["ageGroups"]
    orderToAssign = parameters["ageAssignment"]
    homesId = np.arange(nHomes, dtype=int)
    unasignedParticles = np.copy(self.pid)
    unasignedPerHome = np.zeros(np.max(self.home_id) + 1, dtype=int)
    for i, v in zip(*np.unique(self.home_id, return_counts=True)):
        unasignedPerHome[i] = v
    ages = np.zeros(self.Nparticles, dtype=int)
    particlesPerGroup = np.rint(ageGroups * self.Nparticles).astype(int)
    for group in orderToAssign:
        for _ in range(0, particlesPerGroup[group]):
            if unasignedParticles.size:
                homeId = self.rng.choice(
                    homesId,
                    p=(unasignedPerHome / sum(unasignedPerHome)),
                    replace=False,
                )
                maksUnasignedInHome = (self.home_id == homeId) & np.isin(
                    self.pid, unasignedParticles
                )
                particleId = self.rng.choice(
                    self.pid[maksUnasignedInHome], replace=False
                )
                ages[particleId] = group
                unasignedParticles = unasignedParticles[
                    unasignedParticles != particleId
                ]
                unasignedPerHome[homeId] = unasignedPerHome[homeId] - 1
    return ages
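
Two steps of allocate_ages are easy to check on their own: the rounding that turns age fractions into particle counts, and the weighted draw of a home. The following is a small sketch with made-up inputs (the fractions, population size and per-home counts are assumptions for illustration only).

```python
import numpy as np

rng = np.random.default_rng(1)

# Hypothetical inputs: fraction of the population per age group.
age_groups = np.array([0.4, 0.4, 0.2])
n_particles = 4

# Same rounding as particlesPerGroup in the method above.
particles_per_group = np.rint(age_groups * n_particles).astype(int)
# Rounding can overshoot the population size, which is why the loop
# checks that unassigned particles remain before drawing.
overshoot = int(particles_per_group.sum()) - n_particles

# A home is drawn with probability proportional to its unassigned residents,
# so a home with no unassigned residents is never selected.
unassigned_per_home = np.array([3, 0, 1])
weights = unassigned_per_home / unassigned_per_home.sum()
home = int(rng.choice(np.arange(unassigned_per_home.size), p=weights))
```

Because homes with more unassigned residents are drawn more often, groups assigned early in ageAssignment tend to land in larger homes.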

count_cases(self)

Count the number of cases at the last completed step of the time evolution

Source code in comorbuss/population.py
def count_cases(self):
    """Count the number of cases at the last completed step of the time evolution"""
    # Counts the current number of particles in each state and stores the sum on
    # the respective attribute
    self.susceptible[self.clk.step] = np.count_nonzero(
        self.states == S.STATE_S, axis=0
    )
    self.exposed[self.clk.step] = np.count_nonzero(self.states == S.STATE_E, axis=0)
    self.infectious[self.clk.step] = np.count_nonzero(
        self.states == S.STATE_I, axis=0
    )
    self.recovered[self.clk.step] = np.count_nonzero(
        self.states == S.STATE_R, axis=0
    )
    self.deceased[self.clk.step] = np.count_nonzero(
        self.states == S.STATE_D, axis=0
    )

    self.pre_symptomatic[self.clk.step] = np.count_nonzero(
        self.symptoms == S.SYMPT_NYET, axis=0
    )
    self.asymptomatic[self.clk.step] = np.count_nonzero(
        self.symptoms == S.SYMPT_NO, axis=0
    )
    self.symptomatic[self.clk.step] = np.count_nonzero(
        self.symptoms == S.SYMPT_YES, axis=0
    )
    self.severe_symptomatic[self.clk.step] = np.count_nonzero(
        self.symptoms == S.SYMPT_SEVERE, axis=0
    )
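
The counting pattern used by count_cases can be sketched on a toy array. The state markers below are hypothetical placeholders; the real values are defined in the package's settings module (S).

```python
import numpy as np

# Hypothetical state markers, for illustration only.
STATE_S, STATE_E, STATE_I, STATE_R = 0, 1, 2, 3

states = np.array([STATE_S, STATE_I, STATE_I, STATE_R, STATE_S, STATE_E])

# One slot per simulation step, as in the time series attributes.
n_steps = 3
susceptible = np.zeros(n_steps, dtype=int)
infectious = np.zeros(n_steps, dtype=int)

step = 1  # plays the role of self.clk.step
susceptible[step] = np.count_nonzero(states == STATE_S)
infectious[step] = np.count_nonzero(states == STATE_I)
```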

count_per_age_group(self, group)

Counts the number of particles in an age group.

Parameters:

Name Type Description Default
group int

Id of the age group to be counted.

required

Returns:

Type Description
int

Number of particles in the given age group

Source code in comorbuss/population.py
def count_per_age_group(self, group):
    """Counts the number of particles in an age group

    Args:
        group (int): Id of the age group to be calculated

    Returns:
        int: Number of particles in the given age group
    """
    return self.ages[self.ages == group].size

enable_tracing(self, buffer_length=None)

Enables tracing. To be used only during initialization.

Parameters:

Name Type Description Default
buffer_length int

Number of steps of the required tracing. If None will store tracing for entire simulation. Defaults to None.

None
Source code in comorbuss/population.py
def enable_tracing(self, buffer_length=None):
    """Enables tracing. To be used only during initialization.

    Args:
        buffer_length (int, optional): Number of steps of the required tracing.
            If None will store tracing for entire simulation. Defaults to None.
    """
    if buffer_length is None:
        self.tracing_buffer_len = None
        self.update_tracing = self.update_tracing_norm
        self.get_tracing = self.get_tracing_norm
        self.gen_tracing = self.gen_tracing_all
        self.tracing = [{} for _ in range(self.comm.Nsteps)]
        self.tracing_infectious = [{} for _ in range(self.comm.Nsteps)]
    elif self.tracing_buffer_len is not None:
        self.update_tracing = self.update_tracing_buffer
        self.get_tracing = self.get_tracing_buff
        self.gen_tracing = self.gen_tracing_all
        if buffer_length >= self.tracing_buffer_len:
            self.tracing_buffer_len = buffer_length
            self.tracing = circularlist(size=self.tracing_buffer_len)
            self.tracing.append(({}, 0))
            self.tracing_infectious = circularlist(size=self.tracing_buffer_len)
            self.tracing_infectious.append(({}, 0))
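
The buffered mode keeps only the most recent steps of tracing. The circularlist class is not shown on this page, so the sketch below uses collections.deque with maxlen as an assumed analogue of a fixed-size circular buffer; it is not the package's container. The (dict, step) entry shape follows the initial append(({}, 0)) calls above.

```python
from collections import deque

buffer_len = 3  # plays the role of tracing_buffer_len

# A deque with maxlen silently drops the oldest entry when full.
tracing = deque(maxlen=buffer_len)
for step in range(5):
    # Each entry pairs a tracing dict with the step it belongs to.
    tracing.append(({step: [step + 1]}, step))

steps_kept = [s for _, s in tracing]
```

With a buffer of three entries and five steps appended, only the last three steps remain available for tracing queries.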

gen_adhesion_mask(self, percentage)

Create a boolean array indicating if particle of corresponding index is going to adhere to a given policy.

Parameters:

Name Type Description Default
percentage float

Fraction of the population to be chosen (it is multiplied by Nparticles, so 0.5 selects half of the particles).

required

Returns:

Type Description
np.ndarray

Array of booleans with the adhesion of each particle.

Source code in comorbuss/population.py
def gen_adhesion_mask(self, percentage):
    """Create a boolean array indicating if particle of corresponding index is
        going to adhere to a given policy.

    Args:
        percentage (float): Percentage of the population to be chosen.

    Returns:
        np.ndarray: Array of booleans with the adhesion of each particle.
    """
    chosen = self.rng.choice(
        self.pid, int(self.Nparticles * percentage), replace=False
    )
    return np.isin(self.pid, chosen)
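
The same selection can be reproduced outside the class. This is a minimal sketch assuming a NumPy Generator and a ten-particle population; the seed and sizes are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(42)
pid = np.arange(10)
percentage = 0.35  # a fraction in [0, 1]

# int() truncates, so 10 * 0.35 selects 3 particles, not 4.
chosen = rng.choice(pid, int(pid.size * percentage), replace=False)
adhesion_mask = np.isin(pid, chosen)
```

Sampling without replacement guarantees that exactly int(Nparticles * percentage) distinct particles adhere.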

gen_encounters_and_infections(self, attributes)

Generates encounters masks for each context.

Source code in comorbuss/population.py
def gen_encounters_and_infections(self, attributes):
    """Generates encounters masks for each context."""
    # Empty list to store encounters masks
    self.encounters_masks = []
    mask_alive = self.states != S.STATE_D
    contacts = 0
    self.mask_new_infected[:] = False
    counts_accumulator = np.zeros(4)

    for plc, net_generator in self.encounters_generators:
        mask_particles = (self.placement == plc) & mask_alive
        nparticles = np.count_nonzero(mask_particles)
        if nparticles > 1:
            if hasattr(net_generator, "mask_new_infections"):
                gen_new_inf = net_generator.mask_new_infections
            else:
                gen_new_inf = self.disease.mask_new_infections
            for nparticles, mask_particles, mask_encounters in net_generator.gen(
                mask_alive
            ):
                contacts += np.count_nonzero(mask_encounters) / 2
                counts_accumulator += self.process_encounters(
                    nparticles,
                    mask_particles,
                    mask_encounters,
                    plc,
                    net_generator.inf_prob_weight,
                    attributes,
                    gen_new_inf,
                )
            if hasattr(net_generator, "srvc"):
                net_generator.srvc.visitors_count[self.clk.step] = np.count_nonzero(
                    mask_particles
                    & ~np.isin(
                        self.pid,
                        np.concatenate(
                            (
                                net_generator.srvc.workers_ids,
                                net_generator.srvc.guests_ids,
                            )
                        ),
                    )
                )
        self.plc_contact_history[plc] = self.plc_contact_history[plc] + [contacts]

    return counts_accumulator

gen_homes(self)

Generates a dictionary with homes information.

Returns:

Type Description
dict

Dictionary with homes information.

Source code in comorbuss/population.py
def gen_homes(self):
    """Generates a dictionary with homes information.

    Returns:
        dict: Dictionary with homes information.
    """
    homes = {}
    for home, pid in zip(self.home_id, self.pid):
        if home not in homes:
            homes[home] = {"pids": []}
        homes[home]["pids"].append(pid)
    for home in homes.keys():
        homes[home]["pids"] = np.array(homes[home]["pids"])
        homes[home]["mask"] = np.isin(self.pid, homes[home]["pids"])
        homes[home]["nparticles"] = len(homes[home]["pids"])
        # n = len(homes[home]['pids'])
        # if n == 1:
        #     homes[home]['net'] = nx.Graph()
        #     homes[home]['net'].add_node(1)
        #     homes[home]['mask_encounters'] = np.zeros((1, 1), dtype=bool)
        # elif n > 1:
        #     homes[home]['net'] = nx.erdos_renyi_graph(n, self.home_net_par['mean_contacts']/(n-1), seed=self.random)
        #     homes[home]['mask_encounters'] = nx.to_numpy_matrix(homes[home]['net'], dtype=bool)
        # else:
        #     homes.pop('home', None)
    return dict(sorted(homes.items()))
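
The structure of the returned dictionary is easiest to see on a small input. The sketch below rebuilds it for six particles in three homes; the pid and home_id arrays are made up for the example, and setdefault is used as a compact equivalent of the membership check above.

```python
import numpy as np

pid = np.arange(6)
home_id = np.array([2, 0, 2, 1, 0, 2])

homes = {}
for home, p in zip(home_id, pid):
    # Create the entry on first sight of a home, then collect its residents.
    homes.setdefault(int(home), {"pids": []})["pids"].append(int(p))
for info in homes.values():
    info["pids"] = np.array(info["pids"])
    info["mask"] = np.isin(pid, info["pids"])  # population-wide boolean mask
    info["nparticles"] = len(info["pids"])
homes = dict(sorted(homes.items()))  # keys in ascending home id
```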

gen_tracing_all(mask_encounters, mask_particles, nparticles) staticmethod

Trace all encounters for each particle in the given context.

Returns:

Type Description
tuple

Lists of pids that each particle encountered, and the pids of the particles those lists belong to.

Source code in comorbuss/population.py
@staticmethod
@njit
def gen_tracing_all(mask_encounters, mask_particles, nparticles):
    """Trace all encounters for each particle in the given context.

    Returns:
        tuple: Lists of pids that each particle encountered, and the pids
            of the particles those lists belong to.
    """
    had_encounters = np.count_nonzero(mask_encounters, axis=1) > 0
    tracing = []
    tracing_pids = []
    ids = np.arange(nparticles)
    to_pid = np.where(mask_particles)[0]
    for i, pid in zip(ids[had_encounters], to_pid[had_encounters]):
        tracing.append([to_pid[i] for i in ids[mask_encounters[i, :]]])
        tracing_pids.append(pid)
    return tracing, tracing_pids
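
The mapping from context-local indices to population pids is the key step here. Below is a plain NumPy version of the same logic without the njit decorator, so it runs without Numba; the function name gen_tracing_all_py and the input masks are hypothetical.

```python
import numpy as np


def gen_tracing_all_py(mask_encounters, mask_particles, nparticles):
    """Pure-Python sketch of the tracing logic shown above."""
    had_encounters = np.count_nonzero(mask_encounters, axis=1) > 0
    ids = np.arange(nparticles)            # indices local to this context
    to_pid = np.where(mask_particles)[0]   # local index -> population pid
    tracing, tracing_pids = [], []
    for i, pid in zip(ids[had_encounters], to_pid[had_encounters]):
        tracing.append([int(to_pid[j]) for j in ids[mask_encounters[i, :]]])
        tracing_pids.append(int(pid))
    return tracing, tracing_pids


# Particles 1, 3 and 4 of a 5-particle population share this context.
mask_particles = np.array([False, True, False, True, True])
# Local particles 0 and 1 (pids 1 and 3) met; local particle 2 met no one.
mask_encounters = np.array(
    [
        [False, True, False],
        [True, False, False],
        [False, False, False],
    ]
)
tracing, tracing_pids = gen_tracing_all_py(mask_encounters, mask_particles, 3)
```

Particles without encounters do not appear in either list, so tracing and tracing_pids always have the same length.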

init_states(self, inf0, inf0_symp, homes_inf0_frac=0.5, homes_inf0_dict={})

Create array of initial states and initial symptoms

Parameters:

Name Type Description Default
states np.ndarray

array of states initialized with susceptible

required
symptoms np.ndarray

array of symptoms initialized with S.NO_INFEC

required
inf0 float

list of percentages of population in each state [S, E, I, R]

required
inf0_symp float

percentage of infectious population with each symptom [asymp, symp, severe]

required
Source code in comorbuss/population.py
def init_states(self, inf0, inf0_symp, homes_inf0_frac=0.5, homes_inf0_dict={}):
    """Create array of initial states and initial symptoms

    Args:
        states (np.ndarray): array of states initialized with susceptible
        symptoms (np.ndarray): array of symptoms initialized with S.NO_INFEC
        inf0 (float): list of percentages of population in each state [S, E, I, R]
        inf0_symp (float): percentage of infectious population with each symptom
            [asymp, symp, severe]
    """
    if homes_inf0_frac > 0 and homes_inf0_frac <= 1:
        if homes_inf0_dict == {}:
            self.event_log("Initialized states using init_states_homes", S.MSG_PRGS)
            return self.init_states_homes(inf0, inf0_symp, homes_inf0_frac)
        else:
            self.event_log("Initialized states using homes_inf0_dict", S.MSG_PRGS)
            return self.init_states_cond_prob(
                inf0, inf0_symp, homes_inf0_frac, homes_inf0_dict
            )
    else:
        return self.init_states_old(inf0, inf0_symp)

mark_for_new_position(self, to_change, placement, activity)

Mark particles for changes in position

Parameters:

Name Type Description Default
to_change np.ndarray

Mask or list of pids for particles to be changed

required
placement int

New placement

required
activity int

New activity

required
Source code in comorbuss/population.py
def mark_for_new_position(self, to_change, placement, activity):
    """Mark particles for changes in position

    Args:
        to_change (np.ndarray): Mask or list of pids for particles to be changed
        placement (int): New placement
        activity (int): New activity
    """
    self.placement[to_change] = placement
    self.activity[to_change] = activity
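
Because the method relies on NumPy indexing, to_change may be either a boolean mask or a list of pids. A short sketch with hypothetical placement and activity markers (the real ones are defined in S) shows both forms.

```python
import numpy as np

# Hypothetical markers, for illustration only.
PLC_HOME, PLC_ENV = 0, 1
ACT_FREE, ACT_WORK = 0, 1

placement = np.full(5, PLC_HOME)
activity = np.full(5, ACT_FREE)


def mark_for_new_position(to_change, new_placement, new_activity):
    # Same two assignments as the method above, on module-level arrays.
    placement[to_change] = new_placement
    activity[to_change] = new_activity


# Boolean mask: moves particles 0 and 4.
mark_for_new_position(np.array([True, False, False, False, True]), PLC_ENV, ACT_WORK)
# List of pids: moves particle 2 only.
mark_for_new_position([2], PLC_ENV, ACT_FREE)
```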

mean_time_in_state(self, state)

Calculates the mean time for all population in a given state.

Parameters:

Name Type Description Default
state int

Id of the state; available states: S.STATE_S = 0, S.STATE_E = 1 and S.STATE_I = 2.

required

Returns:

Type Description
float

Mean time in the given state.

Source code in comorbuss/population.py
def mean_time_in_state(self, state):
    """Calculates the mean time for all population in a given state.

    Args:
        state (int): id of the state, available states: S.STATE_S: 0,
            S.STATE_E = 1 and S.STATE_I: 2

    Returns:
        float: Mean time in the given state.
    """
    if state == S.STATE_S:
        time = self.time_exposed
    elif state == S.STATE_E:
        time = self.time_infectious - self.time_exposed
    elif state == S.STATE_I:
        time = self.time_recovered - self.time_infectious
    else:
        print(
            "Invalid state, mean only available to S.STATE_S, S.STATE_E and S.STATE_I."
        )
        return 0
    maskNotInf = (time > S.ninf) & (time < S.inf)
    return np.mean(time[maskNotInf])
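
The final mask discards particles that never completed the transition. The sketch below assumes that unset times are stored as infinity; INF and NINF are hypothetical stand-ins for S.inf and S.ninf, whose actual values are not shown on this page.

```python
import numpy as np

INF, NINF = np.inf, -np.inf  # assumed sentinel values

# Particle 1 never became infectious; particle 2 was never exposed.
time_exposed = np.array([1.0, 2.0, INF, 0.5])
time_infectious = np.array([4.0, INF, INF, 2.5])

with np.errstate(invalid="ignore"):
    # inf - inf yields nan, and comparisons with nan are False.
    time_in_e = time_infectious - time_exposed
    mask_finite = (time_in_e > NINF) & (time_in_e < INF)

mean_e = float(np.mean(time_in_e[mask_finite]))
```

Only particles 0 and 3 contribute, with 3.0 and 2.0 time units in the exposed state.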

process_encounters(self, nparticles, mask_particles, mask_encounters, plc, plc_inf_prob_weight, attributes, gen_new_inf)

Process encounters from an encounters mask.

Parameters:

Name Type Description Default
nparticles int

Number of particles in this context.

required
mask_particles np.ndarray

Mask of the particles in this context.

required
mask_encounters np.ndarray

Mask of nparticles x nparticles size with the encounters in this context.

required
plc int

Placement marker.

required
plc_inf_prob_weight float

Infection weight of the placement.

required
attributes dict

Attributes dict to be stored in the infection tree.

required
gen_new_inf function

Function to generate new infections.

required

Returns:

Type Description
[int, int, int, int]

Counts of encounters, encounters with infectious, possible infections and new infections.

Source code in comorbuss/population.py
def process_encounters(
    self,
    nparticles,
    mask_particles,
    mask_encounters,
    plc,
    plc_inf_prob_weight,
    attributes,
    gen_new_inf,
):
    """Process encounters from an encounters mask.

    Args:
        nparticles (int): Number of particles in this context.
        mask_particles (np.ndarray): Mask of the particles in this context.
        mask_encounters (np.ndarray): Mask of nparticles x nparticles size
            with the encounters in this context.
        plc (int): Placement marker.
        plc_inf_prob_weight (float): Infection weight of the placement.
        attributes (dict): Attributes dict to be stored in the infection tree.
        gen_new_inf (function): Function to generate new infections.

    Returns:
        [int, int, int, int]: Counts of encounters, encounters with infectious,
            possible infections and new infections.
    """
    mask_new_infected = np.zeros(nparticles, dtype=bool)

    # Get infectious and susceptible particles in this context
    mask_infectious = self.states[mask_particles] == S.STATE_I
    mask_susceptible = self.states[mask_particles] == S.STATE_S
    # Apply filters
    mask_encounters = self.encounters_filter(mask_encounters, mask_particles)
    self.update_tracing(
        self.tracing, self.gen_tracing(mask_encounters, mask_particles, nparticles)
    )
    ninfectious = int(np.count_nonzero(mask_infectious))
    possible_infections = 0
    encounters_infectious = 0
    new_infections = 0
    if ninfectious > 0:
        # Get a relation between index in this context and pids in the population
        to_pid = self.pid[mask_particles]
        # Get only encounters generated by infectious particles
        mask_encounters_infectious = mask_encounters * mask_infectious.reshape(
            (nparticles, 1)
        )
        # Get only encounters to susceptible particles
        possibly_infected = (
            mask_susceptible.reshape((1, nparticles)) * mask_encounters_infectious
        )
        # Trace encounters I -> S
        self.update_tracing(
            self.tracing_infectious,
            self.gen_tracing(possibly_infected, mask_particles, nparticles),
        )
        # Get new infections in this context from disease
        mask_new_infections = gen_new_inf(
            possibly_infected,
            mask_particles,
            nparticles,
            self.disease.inf_probability[self.clk.step],
            self.disease.inf_susceptibility,
            plc_inf_prob_weight,
            self.disease.randgendis.uniform(
                0.0, 1.0, size=(nparticles, nparticles)
            ),
        )
        # Check for infection caused by multiple agents
        mask_have_multiple_infecters = (
            np.count_nonzero(mask_new_infections, axis=0) > 1
        )
        ids = np.arange(nparticles)
        for i in ids[mask_have_multiple_infecters]:
            chosen_source = self.rng.choice(ids[mask_new_infections[:, i]])
            mask_new_infections[:, i] = False
            mask_new_infections[chosen_source, i] = True
        # Generate the mask of new infections on this context
        mask_new_infected = np.any(mask_new_infections, axis=0)
        ninf = np.count_nonzero(mask_new_infected)
        # Stores important information
        if ninf > 0:
            source, infected = mask_new_infections.nonzero()
            new_infections = ninf
            self.inf_source[to_pid[infected]] = to_pid[source]
            self.infection_tree.add_edges_from(
                zip(to_pid[source], to_pid[infected])
            )
            nx.set_edge_attributes(
                self.infection_tree,
                {
                    edge: {
                        "[{}]infection_time".format(self.clk.step): self.clk.time
                    }
                    for edge in zip(to_pid[source], to_pid[infected])
                },
            )
            for i in range(len(infected)):
                attributes[to_pid[infected[i]]].update(
                    {
                        "inf_source": to_pid[source[i]],
                        "time_exposed": self.clk.time,
                        "inf_placement": self.placement[to_pid[infected[i]]],
                        "vector_symptoms": self.symptoms[to_pid[source[i]]],
                    }
                )
            self.mask_new_infected[mask_particles] = mask_new_infected
            self.spreads[self.clk.step, mask_particles] = np.count_nonzero(
                mask_new_infections, axis=1
            )
        possible_infections = np.count_nonzero(possibly_infected)
        encounters_infectious = np.count_nonzero(mask_encounters_infectious)
        self.challenges[self.clk.step, mask_particles] = np.count_nonzero(
            possibly_infected, axis=0
        )
    encounters = np.count_nonzero(mask_encounters) / 2

    return [encounters, encounters_infectious, possible_infections, new_infections]
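
Two steps of process_encounters deserve a closer look: filtering the encounters matrix down to infectious -> susceptible pairs, and resolving targets infected by more than one source. The sketch below uses hypothetical state markers and assumes, for simplicity, that every possible infection succeeds; in the real method that decision is made by gen_new_inf.

```python
import numpy as np

STATE_S, STATE_I = 0, 2  # hypothetical markers
rng = np.random.default_rng(0)

states = np.array([STATE_I, STATE_S, STATE_S, STATE_I])
nparticles = states.size
# Symmetric encounters: 0-1, 1-3 and 2-3.
mask_encounters = np.array(
    [
        [False, True, False, False],
        [True, False, False, True],
        [False, False, False, True],
        [False, True, True, False],
    ]
)
mask_infectious = states == STATE_I
mask_susceptible = states == STATE_S

# Rows are sources: keep only rows of infectious particles.
mask_encounters_infectious = mask_encounters * mask_infectious.reshape((nparticles, 1))
# Columns are targets: keep only columns of susceptible particles.
possibly_infected = mask_susceptible.reshape((1, nparticles)) * mask_encounters_infectious

# Assumption for this sketch: every possible infection happens.
mask_new_infections = possibly_infected.copy()

# Particle 1 has two infecters (0 and 3); keep a single random source.
ids = np.arange(nparticles)
multiple = np.count_nonzero(mask_new_infections, axis=0) > 1
for i in ids[multiple]:
    chosen_source = rng.choice(ids[mask_new_infections[:, i]])
    mask_new_infections[:, i] = False
    mask_new_infections[chosen_source, i] = True

mask_new_infected = np.any(mask_new_infections, axis=0)
```

After the loop, each column holds at most one True entry, so every newly infected particle has exactly one recorded source.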

to_exposed(self, attributes)

Do states transitions: Susceptible --> Exposed.

Parameters:

Name Type Description Default
attributes dict

Attributes dict to be stored in the infection tree.

required
Source code in comorbuss/population.py
def to_exposed(self, attributes):
    """Do states transitions: Susceptible --> Exposed.

    Args:
        attributes (dict): Attributes dict to be stored in the infection tree."""
    # Execute before infection functions
    for f in self.before_infection_functions:
        f()

    # Iterate on all contexts encounters data
    counts = self.gen_encounters_and_infections(attributes)

    # Stores important information
    self.infections_cumulative += counts[3]
    self.time_exposed[self.mask_new_infected] = self.clk.time
    self.states[self.mask_new_infected] = S.STATE_E
    self.inf_placement[self.mask_new_infected] = self.placement[
        self.mask_new_infected
    ]
    self.inf_vector_sympt[self.mask_new_infected] = self.symptoms[
        self.inf_source[self.mask_new_infected]
    ]
    self.count_cases()
    self.possible_infections[self.clk.step] = counts[2]
    self.encounters[self.clk.step] = counts[0]
    self.encounters_infectious[self.clk.step] = counts[1]

to_infectious(self, attributes)

Do states transitions: Exposed --> Infectious.

Parameters:

Name Type Description Default
attributes dict

Attributes dict to be stored in the infection tree.

required
Source code in comorbuss/population.py
def to_infectious(self, attributes):
    """Do states transitions: Exposed --> Infectious.

    Args:
        attributes (dict): Attributes dict to be stored in the infection tree."""
    # Check if any particle already exposed becomes an infectious one
    mask_become_infectious = self.disease.mask_to_infectious(self.states)
    self.states[mask_become_infectious] = S.STATE_I
    self.time_infectious[mask_become_infectious] = self.clk.time
    self.symptoms[mask_become_infectious] = S.SYMPT_NYET
    for p in self.pid[mask_become_infectious]:
        attributes[p].update({"time_infectious": self.clk.time})

to_recovered(self, attributes)

Do states transitions: {Symptomatic, Asymptomatic} --> Recovered or deceased.

Parameters:

Name Type Description Default
attributes dict

Attributes dict to be stored in the infection tree.

required
Source code in comorbuss/population.py
def to_recovered(self, attributes):
    """Do states transitions: {Symptomatic, Asymptomatic} --> Recovered or deceased.

    Args:
        attributes (dict): Attributes dict to be stored in the infection tree."""
    # Check if any infected particle recovered in the last time step
    mask_recover, mask_deceased = self.disease.mask_to_recovered(
        self.states, self.symptoms
    )
    self.maskNotYetSympRecovered = (self.symptoms == S.SYMPT_NYET) & mask_recover
    self.states[mask_recover] = S.STATE_R
    self.states[mask_deceased] = S.STATE_D
    self.mark_for_new_position(mask_deceased, S.PLC_CEMT, S.ACT_deceased)
    mask_recover_n_deceased = mask_recover | mask_deceased
    self.symptoms[mask_recover_n_deceased] = (
        10 * self.symptoms[mask_recover_n_deceased]
    )
    self.symptoms[self.maskNotYetSympRecovered] = S.NO_INFEC
    self.time_recovered[mask_recover_n_deceased] = self.clk.time
    for p in self.pid[mask_deceased]:
        attributes[p].update({"time_deceased": self.clk.time})
    for p in self.pid[mask_recover]:
        attributes[p].update({"time_recovered": self.clk.time})

    # Stores important information
    self.deceased_cumulative += np.count_nonzero(mask_deceased)

to_symptoms(self, attributes)

Do states transitions: Infectious --> {Symptomatic, Asymptomatic}.

Parameters:

Name Type Description Default
attributes dict

Attributes dict to be stored in the infection tree.

required
Source code in comorbuss/population.py
def to_symptoms(self, attributes):
    """Do states transitions: Infectious --> {Symptomatic, Asymptomatic}.

    Args:
        attributes (dict): Attributes dict to be stored in the infection tree."""
    # Check whether, after the appropriate number of days, an infectious particle develops symptoms
    (
        mask_symptomatic,
        mask_asymptomatic,
        mask_severe,
    ) = self.disease.mask_to_symptoms(
        self.states, self.symptoms, self.time_infectious
    )
    self.symptoms[mask_symptomatic] = S.SYMPT_YES
    self.symptoms[mask_asymptomatic] = S.SYMPT_NO
    self.symptoms[mask_severe] = S.SYMPT_SEVERE
    self.time_activated[
        mask_symptomatic | mask_asymptomatic | mask_severe
    ] = self.clk.time
    for p in self.pid[mask_symptomatic]:
        attributes[p].update(
            {"time_activated": self.clk.time, "symptoms": S.SYMPT_YES}
        )
    for p in self.pid[mask_asymptomatic]:
        attributes[p].update(
            {"time_activated": self.clk.time, "symptoms": S.SYMPT_NO}
        )
    for p in self.pid[mask_severe]:
        attributes[p].update(
            {"time_activated": self.clk.time, "symptoms": S.SYMPT_SEVERE}
        )

    # Update the cumulative counters
    self.severe_cumulative += np.count_nonzero(mask_severe)
    self.symptoms_cumulative += np.count_nonzero(mask_symptomatic | mask_severe)
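
When cumulative counters are built from several masks, the choice between union and intersection matters. This section does not state whether the masks returned by `mask_to_symptoms` are mutually exclusive. The sketch below assumes they are, with hypothetical mask values, and compares the two counts.

```python
import numpy as np

# Hypothetical masks for five particles; each particle is in at most one mask.
mask_symptomatic = np.array([True, False, False, True, False])
mask_asymptomatic = np.array([False, True, False, False, False])
mask_severe = np.array([False, False, True, False, False])

# Union: particles that developed symptoms of any severity this step.
n_with_symptoms = int(np.count_nonzero(mask_symptomatic | mask_severe))

# Intersection: particles present in both masks at once.
n_in_both = int(np.count_nonzero(mask_symptomatic & mask_severe))
```

Under the disjointness assumption the intersection is always empty, so only the union yields a count of newly symptomatic particles.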

trace_particles(self, mask_origin, back_time)

Use tracing information to identify particles that had contact with particles in a mask.

Parameters:

Name Type Description Default
mask_origin np.ndarray

Mask of the particles whose contacts are searched for.

required
back_time float

Time window to search, counted back from the current step.

required

Returns:

Type Description
np.ndarray

Mask with traced particles.

Source code in comorbuss/population.py
def trace_particles(self, mask_origin, back_time):
    """Use tracing information to identify particles that had contact with particles in a mask.

    Args:
        mask_origin (np.ndarray): Mask of the particles whose contacts are searched for.
        back_time (float): Time window to search, counted back from the current step.

    Returns:
        np.ndarray: Mask with traced particles.
    """
    mask_tracing = mask_origin & self.has_tracing
    if np.sum(mask_tracing) > 0:
        traced_particles = []
        trace_back_step = np.max(
            [0, self.clk.time_to_steps(self.clk.time - back_time)]
        )
        for i in range(trace_back_step, self.clk.step):
            for p in self.pid[mask_tracing]:
                traced_particles.append(self.get_tracing(i, p))
        traced_particles = np.unique(np.concatenate(traced_particles, axis=0))
        mask_with_tracing = np.isin(traced_particles, self.pid[self.has_tracing])
        mask_traced = np.isin(self.pid, traced_particles[mask_with_tracing])
    else:
        mask_traced = mask_tracing
    return mask_traced
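
The tracing logic above can be tried out on a toy data set. In this sketch the `tracing` store, the `trace` function and all values are hypothetical stand-ins for `self.get_tracing` and the clock conversion, which are not shown in this section. It also guards against an empty time window, a case in which `np.concatenate` raises on an empty list.

```python
import numpy as np

# Hypothetical per-step store: tracing[step][pid] -> ids met during that step.
tracing = [
    {0: [1], 1: [0], 2: [], 3: []},
    {0: [2], 1: [], 2: [0], 3: []},
    {0: [], 1: [3], 2: [], 3: [1]},
]
pid = np.arange(4)
has_tracing = np.array([True, True, False, True])  # particle 2 cannot be traced


def trace(mask_origin, first_step, last_step):
    """Return a mask of traceable particles met by the origin particles."""
    mask_tracing = mask_origin & has_tracing
    if not mask_tracing.any():
        return mask_tracing  # all False: no origin particle has tracing
    contacts = [
        np.asarray(tracing[step][int(p)], dtype=int)
        for step in range(first_step, last_step)
        for p in pid[mask_tracing]
    ]
    if not contacts:  # empty window: nothing to concatenate
        return np.zeros_like(mask_tracing)
    traced = np.unique(np.concatenate(contacts))
    # Keep only contacts that themselves have tracing enabled.
    traced = traced[np.isin(traced, pid[has_tracing])]
    return np.isin(pid, traced)


mask_origin = np.array([True, False, False, False])
mask_traced = trace(mask_origin, 0, 3)
```

Particle 0 met particles 1 and 2 within the window, but only particle 1 has tracing enabled, so it is the only one flagged in `mask_traced`.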

update_inf_tree_attributes(self, mask, attribute, value)

Stores information in the infection tree.

Parameters:

Name Type Description Default
mask np.ndarray

Mask with particles to update.

required
attribute str

Name of the attribute to be stored.

required
value np.ndarray

Data to be stored.

required
Source code in comorbuss/population.py
def update_inf_tree_attributes(self, mask, attribute, value):
    """Stores information in the infection tree.

    Args:
        mask (np.ndarray): Mask with particles to update.
        attribute (str): Name of the attribute to be stored.
        value (np.ndarray): Data to be stored.
    """
    attributes = {}
    attribute = "[{}]{}".format(self.clk.step, attribute)
    for p in self.pid[mask]:
        attributes[p] = {attribute: value}
    nx.set_node_attributes(self.infection_tree, attributes)
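
The method builds a dict of dicts keyed by particle id, which is the shape `nx.set_node_attributes` accepts when no attribute name is passed. The sketch below reproduces only that mapping, without NetworkX, so the step-prefixed key is easy to inspect. The step number, the attribute name and the value are hypothetical.

```python
import numpy as np

pid = np.arange(5)
mask = np.array([False, True, False, True, False])
step = 7               # stands in for self.clk.step (hypothetical value)
value = "quarantined"  # hypothetical payload

# Prefix the attribute name with the step so each step gets its own key.
key = "[{}]{}".format(step, "status")

# Every selected particle receives the same `value` object.
attributes = {int(p): {key: value} for p in pid[mask]}
```

Because the step number is part of the key, storing the same attribute at a later step adds a new entry to the node instead of overwriting the earlier one. Note that `value` is stored whole for each selected particle; it is not indexed per particle.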

update_states(self)

Updates the disease state of every particle at the current time.

Source code in comorbuss/population.py
def update_states(self):
    """Updates the disease state of every particle at the current time."""
    # Dictionary to store infection tree attributes
    attributes = dict(self.empty_attributes)
    self.to_recovered(attributes)
    self.to_infectious(attributes)
    self.to_symptoms(attributes)
    self.to_exposed(attributes)
    nx.set_node_attributes(self.infection_tree, attributes)
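
One detail worth keeping in mind when reading `update_states`: `dict(...)` makes a shallow copy. The definition of `self.empty_attributes` is not shown in this section. Assuming it maps each particle id to an empty dict (a hypothetical layout), the sketch below shows how a shallow copy and a deep copy behave when the inner dicts are updated in place.

```python
import copy

# Hypothetical template: one empty attribute dict per particle id.
empty_attributes = {0: {}, 1: {}}

# dict(...) copies the outer mapping only; the inner dicts are shared.
shallow = dict(empty_attributes)
shallow[0].update({"time_recovered": 3.0})
template_touched = empty_attributes[0] == {"time_recovered": 3.0}

# A deep copy gives the caller its own inner dicts.
empty_attributes = {0: {}, 1: {}}
deep = copy.deepcopy(empty_attributes)
deep[0].update({"time_recovered": 3.0})
template_untouched = empty_attributes[0] == {}
```

With the shallow copy the update is visible through the template as well, whereas the deep copy leaves the template empty. Whether this matters for comorbuss depends on how `empty_attributes` is actually built and reset.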