Skip to content

tools.py

check_dir(path, complete_path=None)

Checks if a directory exists, if not recursively creates it.

Parameters:

Name Type Description Default
path str

Path to directory.

required
complete_path str

Should not be set by user, original path to be printed if an error is encountered.

None

Exceptions:

Type Description
OSError

Unable to create directory.

Source code in comorbuss/tools.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def check_dir(path, complete_path=None):
    """Ensure a directory exists, recursively creating missing parents.

    Args:
        path (str): Path to directory.
        complete_path (str): Should not be set by user, original path to be
            printed if an error is encountered.

    Raises:
        OSError: Unable to create directory.
    """
    complete_path = complete_path or path
    if os.path.isdir(path):
        return
    parent, leaf = os.path.split(path)
    # An empty tail means we have recursed all the way up without finding
    # an existing directory to build from.
    if leaf == "":
        raise OSError("Unable to create '{}' directory.".format(complete_path))
    check_dir(parent, complete_path)
    os.mkdir(path)

e_dist(a, b, metric='euclidean')

Distance calculation for 1D, 2D and 3D points using einsum : a, b - list, tuple, array in 1, 2 or 3D form : metric - euclidean ('e', 'eu'...), sqeuclidean ('s', 'sq'...), :-----------------------------------------------------------------------

Source code in comorbuss/tools.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def e_dist(a, b, metric="euclidean"):
    """Distance calculation for 1D, 2D and 3D points using einsum.

    Args:
        a, b: list, tuple or array in 1, 2 or 3D form.
        metric: euclidean ('e', 'eu'...) or sqeuclidean ('s', 'sq'...).
    """
    pts_a = np.asarray(a)
    pts_b = np.atleast_2d(b)
    # Reshape so broadcasting produces one pairwise difference per (a, b) pair.
    if pts_a.ndim == 1:
        pts_a = pts_a.reshape(1, 1, pts_a.shape[0])
    elif pts_a.ndim >= 2:
        pts_a = pts_a.reshape(np.prod(pts_a.shape[:-1]), 1, pts_a.shape[-1])
    if pts_b.ndim > 2:
        pts_b = pts_b.reshape(np.prod(pts_b.shape[:-1]), pts_b.shape[-1])
    delta = pts_a - pts_b
    # Sum of squared component differences for every pair.
    sq_dist = np.einsum("ijk, ijk->ij", delta, delta)
    # Any metric starting with 'e' means euclidean; otherwise keep squared.
    if metric[:1] == "e":
        sq_dist = np.sqrt(sq_dist)
    return np.squeeze(sq_dist)

efficacy_calc(cases_vacc, not_cases_vacc, cases_not_vacc, not_cases_not_vacc, return_limits=False)

Calculates efficacy of a vaccine from the number of cases and not cases.

Parameters:

Name Type Description Default
cases_vacc int

Cases in the vaccinated population.

required
not_cases_vacc int

Not cases in the vaccinated population.

required
cases_not_vacc int

Cases in the not vaccinated population.

required
not_cases_not_vacc int

Not cases in the not vaccinated population.

required
return_limits bool

Return also upper and lower limits of efficacy.

False

Returns:

Type Description
float

float: Estimated vaccine efficacy (1 − relative risk); with return_limits, also the confidence-interval limits.

Source code in comorbuss/tools.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def efficacy_calc(
    cases_vacc: int,
    not_cases_vacc: int,
    cases_not_vacc: int,
    not_cases_not_vacc: int,
    return_limits: bool = False,
) -> float:
    """Calculates efficacy of a vaccine from the number of cases and not cases.

    Efficacy is 1 - RR, where RR is the relative risk: the attack rate in the
    vaccinated population divided by the attack rate in the unvaccinated one.

    Args:
        cases_vacc (int): Cases in the vaccinated population.
        not_cases_vacc (int): Not cases in the vaccinated population.
        cases_not_vacc (int): Cases in the not vaccinated population.
        not_cases_not_vacc (int): Not cases in the not vaccinated population.
        return_limits (bool): Return also upper and lower limits of efficacy.

    Returns:
        float: Estimated efficacy. When ``return_limits`` is True, a tuple
            ``(efficacy, (upper_limit, lower_limit))`` where the limits are
            derived from the 95% confidence interval of log(RR).
    """
    # Attack rates in each arm.
    rv = cases_vacc / (cases_vacc + not_cases_vacc)
    rnv = cases_not_vacc / (cases_not_vacc + not_cases_not_vacc)
    rr = rv / rnv
    efficacy = 1 - rr
    if not return_limits:
        return efficacy
    # Log-normal 95% CI for RR: log(rr) +/- 1.96 * SE; the standard error was
    # previously computed twice — hoisted here.
    se = np.sqrt(((1 - rv) / cases_vacc) + ((1 - rnv) / cases_not_vacc))
    rr_lower = np.exp(np.log(rr) - 1.96 * se)
    rr_upper = np.exp(np.log(rr) + 1.96 * se)
    # Note the order: 1 - rr_lower is the *upper* efficacy bound.
    efficacy_limits = (1 - rr_lower, 1 - rr_upper)
    return efficacy, efficacy_limits

filter_parse(f, comm, allowed_filters=[], comm_str='self.comm')

Get a tuple of comparative and boolean operations and transform it into an expression that can be evaluated.

Parameters:

Name Type Description Default
f tuple

Tuple with the operations.

required
comm community

Community object of the simulation.

required
allowed_filters str

List with allowed custom filters ("days", "tracing", "workers, "service", "diagnostic" or "vaccination").

[]

Returns:

Type Description
str, bool

Expression that can be evaluated, and True if the parse was successful.

Source code in comorbuss/tools.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def filter_parse(f, comm, allowed_filters=[], comm_str="self.comm"):
    """Get a tuple of comparative and boolean operations and transform it into
    an expression string that can later be evaluated with ``eval``.

    Custom filters are tuples keyed by their first (or, for "isin", middle)
    string element: ("days", n), ("tracing", sub_filter, days),
    ("workers", service, worker_name), ("service", name),
    ("vaccination", name), ("vaccination_attr", name, attr),
    ("diagnostic", name), ("module", name) and (attr, "isin", values).
    Anything else is treated as nested comparative/boolean operations and
    parsed recursively.

    Args:
        f (tuple): Tuple with the operations.
        comm (community): Community object of the simulation.
        allowed_filters (list): List with allowed custom filters ("days",
            "tracing", "workers", "service", "diagnostic" or "vaccination").
            NOTE(review): mutable default argument; harmless only while it is
            never mutated inside this function — confirm.
        comm_str (str): Prefix used for attribute references in the generated
            expression. Defaults to "self.comm".

    Returns:
        str, bool: Expression that can be evaluated, and True if the parse was
            successful.
    """
    out = ""
    ok = True
    # Check if filter is days: compares time since quarantine to a threshold.
    if type(f) is tuple and len(f) == 2 and type(f[0]) is str:
        if f[0].lower() == "days":
            if "days" in allowed_filters:
                try:
                    return (
                        "self.clk.time_since(self.pop.time_quarantined)>={}".format(
                            comm.clk.days_to_time(float(f[1]))
                        ),
                        ok,
                    )
                except:
                    pass
            else:
                comm.event_log("Days can not be used in this filter.", S.MSG_WRNG)
                ok = False
    # Check if filter is tracing: parses the sub-filter and enables contact
    # tracing with a buffer long enough for the requested window.
    if type(f) is tuple and len(f) == 3 and type(f[0]) is str:
        if f[0].lower() == "tracing":
            if "tracing" in allowed_filters:
                parsed_str, ok = filter_parse(
                    f[1], comm, allowed_filters, comm_str=comm_str
                )
                tracing_lenght = comm.clk.days_to_time(float(f[2]))
                tracing_steps = comm.clk.time_to_steps(tracing_lenght)
                comm.pop.enable_tracing(buffer_length=tracing_steps + 2)
                try:
                    return (
                        "self.pop.trace_particles({}, {})".format(
                            parsed_str, tracing_lenght
                        ),
                        ok,
                    )
                except:
                    pass
            else:
                comm.event_log("Tracing can not be used in this filter.", S.MSG_WRNG)
                ok = False
    # Check if filter is workers: resolves a worker name within a service to
    # the worker's id.
    if type(f) is tuple and len(f) == 3 and type(f[0]) is str:
        if f[0].lower() == "workers":
            if "workers" in allowed_filters:
                try:
                    for wrkr in comm.srv[f[1]].workers_parameters:
                        if wrkr["name"] == f[2]:
                            return wrkr["id"], ok
                except:
                    pass
            else:
                comm.event_log("Workers can not be used in this filter.", S.MSG_WRNG)
                ok = False
    # Check if filter is service: resolves a service name to its id.
    if type(f) is tuple and len(f) == 2 and type(f[0]) is str:
        if f[0].lower() == "service":
            if "service" in allowed_filters:
                try:
                    return comm.srv[f[1]].id, ok
                except:
                    pass
            else:
                comm.event_log("Service can not be used in this filter.", S.MSG_WRNG)
                ok = False
    # Check if filter is a vaccination: resolves a vaccination name to its id
    # (only when the vaccinations module is loaded).
    if type(f) is tuple and len(f) == 2 and type(f[0]) is str:
        if f[0].lower() == "vaccination":
            if "vaccination" in allowed_filters and "vaccinations" in comm.modules:
                try:
                    return comm.vaccinations[f[1]].id, ok
                except KeyError:
                    pass
            else:
                comm.event_log(
                    "Vaccination can not be used in this filter.", S.MSG_WRNG
                )
                ok = False
    # Check if filter is a vaccination_attribute: validates the attribute via
    # eval before emitting the attribute-access expression.
    if type(f) is tuple and len(f) == 3 and type(f[0]) is str:
        if f[0].lower() == "vaccination_attr":
            if "vaccination_attr" in allowed_filters and hasattr(comm, "vaccinations"):
                try:
                    eval("comm.vaccinations[{}].{}".format(f[1], f[2]))
                    return "{}.vaccinations[{}].{}".format(comm_str, f[1], f[2]), ok
                except KeyError:
                    pass
            else:
                comm.event_log(
                    "Vaccination can not be used in this filter.", S.MSG_WRNG
                )
                ok = False
    # Check if filter is a diagnostic: resolves a diagnostic name (case
    # insensitive) to its id.
    if type(f) is tuple and len(f) == 2 and type(f[0]) is str:
        if f[0].lower() == "diagnostic":
            if "diagnostic" in allowed_filters:
                for diag in comm.diagnostics:
                    if diag.name.lower() == f[1].lower():
                        return diag.id, ok
    # Check if filter is a module reference (note: the warning text mentions
    # "Diagnostic" — looks like a copy/paste slip, confirm before relying on it).
    if type(f) is tuple and len(f) == 2 and type(f[0]) is str:
        if f[0].lower() == "module":
            if "module" in allowed_filters:
                if hasattr(comm, f[1]):
                    return "{}.{}".format(comm_str, f[1]), ok
            else:
                comm.event_log("Diagnostic can not be used in this filter.", S.MSG_WRNG)
                ok = False
    # Check if filter is isin: membership test of a population or community
    # attribute against a list of values.
    if type(f) is tuple and len(f) == 3 and type(f[1]) is str:
        if f[1].lower() == "isin":
            try:
                eval("comm.pop.{}".format(f[0]))
                return "np.isin({}.pop.{}, {})".format(comm_str, f[0], list(f[2])), ok
            except:
                try:
                    eval("comm.{}".format(f[0]))
                    return "np.isin({}.{}, {})".format(comm_str, f[0], list(f[2])), ok
                except:
                    pass
    # If neither, treat as comparative and boolean operations: recurse into
    # nested tuples/lists and rewrite bare attribute names.
    for item in f:
        if type(item) is tuple or type(item) is list:
            parsed_str, ok = filter_parse(
                item, comm, allowed_filters, comm_str=comm_str
            )
            out += "({})".format(parsed_str)
        else:
            # Test if current element can be interpreted as an population attribute
            try:
                eval("comm.pop.{}".format(item))
                item = "{}.pop.{}".format(comm_str, item)
            except:
                # Test if current element can be interpreted as a community attribute
                try:
                    eval("comm.{}".format(item))
                    item = "{}.{}".format(comm_str, item)
                except:
                    pass
            out += str(item)
    return out, ok

function(f)

Checks if f is callable, if true returns it, else raise TypeError.

Parameters:

Name Type Description Default
f callable

To be tested

required
Source code in comorbuss/tools.py
67
68
69
70
71
72
73
74
75
76
def function(f):
    """Return ``f`` unchanged if it is callable, else raise TypeError.

    Args:
        f (callable): To be tested.

    Raises:
        TypeError: If ``f`` is not callable.
    """
    if not callable(f):
        raise TypeError("{} is not callable.".format(f))
    return f

get_enc_pct_matrix(tracing, Nparticles, time_series=[], query_label=0, time_slice=slice(1, None, None))

This function returns a percentage, for each entry of the matrix. The entry (i, j) represents the percentage of the time steps for which the particle i encounters particle j, if the time series match the query_label

Source code in comorbuss/tools.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def get_enc_pct_matrix(
    tracing, Nparticles, time_series=[], query_label=0, time_slice=slice(1, None)
):
    """This function returns a percentage, for each entry of the matrix. The
    entry (i, j) represents the percentage of the time steps for which the
    particle i encounters particle j, if the time series match the query_label.

    Args:
        tracing: Per-step encounter data; each entry maps a particle id to the
            array of particle ids it encountered at that step — presumably, as
            indexed below; confirm against ``check_time_series``.
        Nparticles (int): Number of particles (output matrix dimension).
        time_series: Per-step, per-particle labels; normalized by
            ``check_time_series``.
        query_label: Label an encounter's endpoint must carry to be counted.
        time_slice (slice): Subset of time steps to consider.

    Returns:
        (scipy.sparse.csr_matrix, int): The percentage matrix and the number
            of stored (row, col) encounter entries.
    """
    time_series = check_time_series(tracing, time_series, Nparticles)
    rows = []
    cols = []
    slice_size = 0
    step_indices = np.arange(len(tracing))
    # Loop over time steps (renamed loop variable: the original shadowed the
    # builtin ``dict``).
    for step, encounters in zip(step_indices[time_slice], tracing[time_slice]):
        slice_size += 1
        for key in encounters:
            key_values = encounters[key]
            # Count the encounter when either endpoint matches the label.
            mask_add = time_series[step, key_values] == query_label
            mask_add = mask_add | (time_series[step, key] == query_label)
            for i, v in enumerate(mask_add):
                if v:
                    rows.append(key)
                    cols.append(key_values[i])
    # Each stored entry contributes 1/slice_size; duplicate (row, col) pairs
    # are summed by the sparse constructor, yielding a per-pair percentage.
    data = np.full(len(rows), 1.0 / slice_size)
    return csr_matrix((data, (rows, cols)), shape=(Nparticles, Nparticles)), len(rows)

get_encounters_array_age_group(tracing, ages, ageGroups=array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,17, 18, 19, 20]), dt=1, slice_rng=slice(1, None, None))

This function returns an array whose entry i is the average number of encounters that a particle in the i-th age group has with another particle along a day

Source code in comorbuss/tools.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def get_encounters_array_age_group(
    tracing, ages, ageGroups=S.ALL_AGES, dt=1, slice_rng=slice(1, None)
):
    """This function returns an array whose entry i is the average number of
    encounters that a particle in the i-th age group has with another particle
    along a day.

    Returns:
        (numpy.ndarray, numpy.ndarray): Per-group daily encounter averages and
            the age-group encounter matrix (normalized in place below).
    """
    n_particles = len(ages)
    enc_pct_matrix, _ = get_enc_pct_matrix(tracing, n_particles, [], 0, slice_rng)
    mtx_ageG = get_encounters_matrix_age_group(enc_pct_matrix, ages, enc_groups := ageGroups)
    arr_ageG = np.zeros(len(ageGroups))
    group_counts = np.zeros(len(ageGroups))
    for group in ageGroups:
        group_counts[group] = np.count_nonzero(ages == group)
        if group_counts[group] > 0:
            # Average encounters per particle of this group, per day
            # (dt is presumably the step length in hours — confirm).
            arr_ageG[group] = np.sum(mtx_ageG[group, :]) / (
                dt * group_counts[group] / 24
            )
    # Normalize the matrix entries by population size and day length.
    for r in ageGroups:
        for c in ageGroups:
            if mtx_ageG[r, c] > 0:
                mtx_ageG[r, c] /= dt * len(ages) / 24
    return arr_ageG, mtx_ageG

get_encounters_matrix_age_group(matrix, ages, ageGroups)

This function returns a matrix mtx_ageG which concentrates the entries of the provided matrix according to the age groups in ages

Source code in comorbuss/tools.py
419
420
421
422
423
424
425
426
427
def get_encounters_matrix_age_group(matrix, ages, ageGroups):
    """This function returns a matrix mtx_ageG which concentrates the entries
    of the provided matrix according to the age groups in ages.

    Args:
        matrix: Sparse particle-by-particle matrix (anything with ``tocoo``).
        ages: Per-particle age-group index.
        ageGroups: Sequence of age groups (only its length is used here).

    Returns:
        numpy.ndarray: Dense (len(ageGroups), len(ageGroups)) aggregate.
    """
    n_groups = len(ageGroups)
    mtx_ageG = np.zeros((n_groups, n_groups))
    coo = matrix.tocoo()
    # Accumulate every stored entry into its (age[row], age[col]) bucket.
    for r, c, v in zip(coo.row, coo.col, coo.data):
        mtx_ageG[ages[r], ages[c]] += v
    return mtx_ageG

in_list_of_dicts(list, item, key='name', return_dict=False)

Checks if an item is contained in a list of dicts.

Parameters:

Name Type Description Default
list list

List to search.

required
item any

Item to be searched.

required
key str

Key to search for item. If None will search on all keys of the dictionaries. Defaults to 'name'.

'name'
return_dict bool

Returns dict instead of True. Defaults to False.

False

Returns:

Type Description
bool

True if item is found

Source code in comorbuss/tools.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def in_list_of_dicts(list, item, key="name", return_dict=False):
    """Checks if an item is contained in a list of dicts.

    Args:
        list (list): List to search.
        item (any): Item to be searched.
        key (str, optional): Key to search for item. If None will search on all
            keys of the dictionaries. Defaults to 'name'.
        return_dict (bool): Returns dict instead of True. Defaults to False.

    Returns:
        bool: True (or the matching dict when ``return_dict`` is set) if item
            is found; False otherwise.
    """
    for entry in list:
        # Bug fix: the original reused ``key`` as the inner loop variable, so
        # with key=None the parameter was clobbered by the first dict's keys
        # and later dicts were searched on the wrong key (often raising
        # KeyError). A distinct loop variable preserves the parameter.
        search_keys = [key] if key is not None else entry.keys()
        for k in search_keys:
            if entry[k] == item:
                if return_dict:
                    return entry
                else:
                    return True
    return False

load_demographics(city, state, parameters, filename='/home/nano/Apps/anaconda3/lib/python3.7/site-packages/COMORBUSS-1.0.0-py3.7.egg/data/demographic_data_brazil.csv')

Loads population data and basic services information from database.

Parameters:

Name Type Description Default
city str

Name of the city.

required
state str

Two characters state abbreviation.

required
parameters dict

Parameters dictionary

required
filename str

File with the csv database. Defaults to DEMO_DATA_FILE.

'/home/nano/Apps/anaconda3/lib/python3.7/site-packages/COMORBUSS-1.0.0-py3.7.egg/data/demographic_data_brazil.csv'
Source code in comorbuss/tools.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
def load_demographics(city, state, parameters, filename=S.DEMO_DATA_FILE):
    """Loads population data and basic services information from database.

    Args:
        city (str): Name of the city.
        state (str): Two characters state abbreviation.
        parameters (dict): Parameters dictionary, updated in place with the
            loaded demographic values and returned.
        filename (str, optional): File with the csv database. Defaults to DEMO_DATA_FILE.

    Returns:
        dict: The (possibly updated) parameters dictionary.
    """
    # Keep the original capitalization for display; match in upper case.
    city_name = city
    city = city.upper()
    state = state.upper()
    loaded = False
    try:
        data = pd.read_csv(filename)
        loaded = True
    except Exception:
        print(
            "["
            + S.MSG_TEXTS[S.MSG_WRNG]
            + "] Coudn't open demographic database from file: \""
            + filename
            + '". Proceeding with default demographic data'
        )
    if loaded:
        # NOTE(review): ``str.match`` is a regex *prefix* match, so "SANTO"
        # also matches "SANTOS" — confirm this is intended.
        filter_city = data["CITY"].str.match(city)
        filter_state = data["STATE"].str.match(state)
        line = data[filter_city & filter_state]
        if len(line) == 1:
            parameters["city_name"] = city_name
            parameters["city_area"] = float(line["AREA_URB"].iloc[0])
            parameters["population_ages"] = [
                int(line["0 a 4 anos"].iloc[0]),  # 0-4 yo
                int(line["5 a 9 anos"].iloc[0]),  # 5-9 yo
                int(line["10 a 14 anos"].iloc[0]),  # 10-14 yo
                int(line["15 a 19 anos"].iloc[0]),  # 15-19 yo
                int(line["20 a 24 anos"].iloc[0]),  # 20-24 yo
                int(line["25 a 29 anos"].iloc[0]),  # 25-29 yo
                int(line["30 a 34 anos"].iloc[0]),  # 30-34 yo
                int(line["35 a 39 anos"].iloc[0]),  # 35-39 yo
                int(line["40 a 44 anos"].iloc[0]),  # 40-44 yo
                int(line["45 a 49 anos"].iloc[0]),  # 45-49 yo
                int(line["50 a 54 anos"].iloc[0]),  # 50-54 yo
                int(line["55 a 59 anos"].iloc[0]),  # 55-59 yo
                int(line["60 a 64 anos"].iloc[0]),  # 60-64 yo
                int(line["65 a 69 anos"].iloc[0]),  # 65-69 yo
                int(line["70 a 74 anos"].iloc[0]),  # 70-74 yo
                int(line["75 a 79 anos"].iloc[0]),  # 75-79 yo
                int(line["80 a 84 anos"].iloc[0]),  # 80-84 yo
                int(line["85 a 89 anos"].iloc[0]),  # 85-89 yo
                int(line["90 a 94 anos"].iloc[0]),  # 90-94 yo
                int(line["95 a 99 anos"].iloc[0]),  # 95-99 yo
                int(line["100 anos ou mais"].iloc[0]),  # 100+ yo
            ]
            parameters["persons_per_home"] = float(
                line["Mean number of persons per house"].iloc[0]
            )
            parameters["number_of_markets"] = int(line["Number of markets"].iloc[0])
            parameters["number_of_hospitals"] = int(line["Number of hospitals"].iloc[0])
            parameters["number_of_schools"] = int(line["Number of schools"].iloc[0])
            parameters["number_of_restaurants"] = int(
                line["Number of restaurants"].iloc[0]
            )
            print(
                "["
                + S.MSG_TEXTS[S.MSG_PRGS]
                + "] Loaded demographic data for "
                + city
                + "-"
                + state
            )
        # Bug fix: was ``elif len(line > 1):`` which compared the DataFrame
        # itself to 1 (TypeError on string columns in modern pandas).
        elif len(line) > 1:
            print(
                "["
                + S.MSG_TEXTS[S.MSG_WRNG]
                + "] More than one city found in demographic database. Proceeding with default demographic data"
            )
        else:
            print(
                "["
                + S.MSG_TEXTS[S.MSG_WRNG]
                + "] City not found in demographic database, check name and state. Proceeding with default demographic data"
            )
    return parameters

load_hdf5(filename='', keep_open=False, version_check=True)

Loads simulation parameters and results from an hdf5 file.

Parameters:

Name Type Description Default
filename str

Filename to load, if not given a prompt window will be shown to the user to select the file.

''

Returns:

Type Description
dict, string

Loaded data and filename for the loaded file.

Source code in comorbuss/tools.py
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
def load_hdf5(filename="", keep_open=False, version_check=True):
    """Loads simulation parameters and results from an hdf5 file.

    Args:
        filename (str, optional): Filename to load, if not given a prompt window
            will be shown to the user to select the file.
        keep_open (bool, optional): If True, return the open h5dict.File object
            instead of converting it to a dict (caller must close it).
        version_check (bool, optional): If True, reject files whose stored
            "version" is not listed in S.COMPATIBLE_HDF5.

    Returns:
        dict, string: Loaded data and filename for the loaded file.
    """
    gotData = False
    retry = False
    # If filename is not given ask user to select a file
    if filename == "":
        filename = ask_for_file()
    data = dict()
    while not gotData:
        # Try to open the file and load data
        try:
            results = h5dict.File(filename, "r")
            if version_check:
                try:
                    if not results["version"] in S.COMPATIBLE_HDF5:
                        # error_message is read by the except handler below.
                        error_message = "Selected file is from COMORBUSS {} and not compatible with current version of COMORBUSS.".format(
                            results["version"]
                        )
                        raise RuntimeError("Wrong hdf5 version.")
                except KeyError:
                    # Pre-v0.3.0 files have no "version" key at all.
                    error_message = "Selected file is from an old version of COMORBUSS (pre v0.3.0) and not compatible with current version of COMORBUSS."
                    raise
            if not keep_open:
                data = results.to_dict()
                results.close()
            gotData = True
        # If file can't be opened ask user again for file
        except:
            gotData = False
            # Walk the traceback so ``line`` holds the last formatted frame;
            # printing is disabled.
            for line in traceback.format_exception(*sys.exc_info()):
                # print(line)
                pass
            # NOTE(review): error_message (and even ``line``) may be unbound
            # here when the failure was not a version problem — the nested
            # try/except papers over the NameError; confirm intent.
            try:
                retry = ask_retry(error_message)
            except:
                retry = ask_retry(line)
            if retry:
                filename = ask_for_file()
        # if user clicks cancel exits without data
        if not retry:
            if not gotData:
                results = {}
            break
    if keep_open:
        return results, filename
    else:
        return data, filename

normalize(l)

Normalizes a list of floats.

Parameters:

Name Type Description Default
list list

List of floats.

required

Returns:

Type Description
list

Normalized list.

Source code in comorbuss/tools.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def normalize(l):
    """Normalizes a list of floats so the result sums to one.

    Args:
        l (list): List of floats.

    Returns:
        numpy.ndarray: Normalized list.
    """
    total = np.sum(l)
    return np.array([value / total for value in l])

plot_enc_pct_matrix(tracing, time_series=[], query_label=0, slice_rng=slice(1, None, None), size=(5, 4), title='')

This function returns a probability matrix. The entry (i, j) of the matrix is the percentage of time steps in slice_rng for which: 1-there are encounters between particles i and j, and 2-the time_series for the step where the encounter ocurred matches the query_label. This matrix can be interpreted as the probability that one particle encounters another when time_series matches query_label

Source code in comorbuss/tools.py
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
def plot_enc_pct_matrix(
    tracing,
    time_series=[],
    query_label=0,
    slice_rng=slice(1, None),
    size=(5, 4),
    title="",
    Nparticles=None,
):
    """This function returns a probability matrix. The entry (i, j) of the matrix
    is the percentage of time steps in slice_rng for which: 1-there are
    encounters between particles i and j, and 2-the time_series for the step
    where the encounter ocurred matches the query_label. This matrix can be
    interpreted as the probability that one particle encounters another when
    time_series matches query_label.

    Args:
        tracing: Per-step encounter data (see ``get_enc_pct_matrix``).
        time_series: Per-step, per-particle labels.
        query_label: Label to match.
        slice_rng (slice): Time steps to consider.
        size (tuple): Figure size in inches.
        title (str): Plot title; defaults to "Query Label <label>".
        Nparticles (int, optional): Number of particles; inferred from
            ``time_series`` when omitted.
    """
    if title == "":
        title = "Query Label {:}".format(query_label)
    # Bug fix: get_enc_pct_matrix(tracing, Nparticles, ...) requires the
    # particle count as its second argument; the original call omitted it, so
    # every argument shifted into the wrong parameter. Infer the count from
    # the time series when not given (assumes rows are steps and columns are
    # particles — TODO confirm against check_time_series).
    if Nparticles is None:
        Nparticles = np.shape(time_series)[1]
    enc_pct_matrix, _ = get_enc_pct_matrix(
        tracing, Nparticles, time_series, query_label, slice_rng
    )
    coo_mtx = enc_pct_matrix.tocoo()
    plt.figure(figsize=size)
    plt.title(title)
    ax = plt.gca()
    ax.set_xlim(0, coo_mtx.shape[0])
    ax.set_ylim(0, coo_mtx.shape[1])
    sc = plt.scatter(coo_mtx.row, coo_mtx.col, c=coo_mtx.data, s=1)
    plt.colorbar(sc)
    plt.tight_layout(pad=0.5)
    plt.show()
    return enc_pct_matrix

recursive_copy(object)

Makes a recursive copy of a nested dict or list object.

Parameters:

Name Type Description Default
object Union[dict, list]

Object to be copied.

required

Returns:

Type Description
Union[dict, list]

Union[dict, list]: Copy of the original object.

Source code in comorbuss/tools.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def recursive_copy(object: Union[dict, list]) -> Union[dict, list]:
    """Makes a recursive copy of a nested dict or list object.

    Args:
        object (Union[dict, list]): Object to be copied.

    Returns:
        Union[dict, list]: Copy of the original object.
    """
    # Exact type checks (not isinstance) so subclasses fall through to the
    # generic .copy() branch, matching the original contract.
    obj_type = type(object)
    if obj_type is dict:
        copied = {}
        for key, value in object.items():
            copied[key] = recursive_copy(value)
        return copied
    if obj_type is list:
        copied = []
        for value in object:
            copied.append(recursive_copy(value))
        return copied
    # Leaf value: shallow-copy when supported, otherwise return as-is
    # (covers immutables such as int, str, tuple).
    try:
        return object.copy()
    except AttributeError:
        return object

save_hdf5(comms, out_file, to_store=[], to_store_srvc=[], skip_defaults=False, try_append=False)

Store simulation parameters and results in hdf5 files.

Parameters:

Name Type Description Default
comms list

A list of comunities to be stored (all comunities must have the same parameters and vary only the seed).

required
out_file string

Filename to save data.

required
to_store list

Extras results to be stored, must follow TO_STORE format. Defaults to [].

[]
to_store_srvc list

Extra services data to be stored, must follow TO_STORE_SRVC format. Defaults to [].

[]
skip_defaults bool

Skip default to stores (TO_STORE and TO_STORE_SRVC). Defaults to False.

False
try_append bool

Try to append to an existing file (file must have the same parameters). Defaults to False.

False
Source code in comorbuss/tools.py
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
def save_hdf5(
    comms,
    out_file,
    to_store=[],
    to_store_srvc=[],
    skip_defaults=False,
    try_append=False,
):
    """Store simulation parameters and results in hdf5 files.

    Args:
        comms (list): A list of comunities to be stored (all comunities must have
            the same parameters and vary only the seed).
        out_file (string): Filename to save data.
        to_store (list, optional): Extras results to be stored, must follow
            `TO_STORE` format. Defaults to [].
        to_store_srvc (list, optional): Extra services data to be stored, must
            follow `TO_STORE_SRVC` format. Defaults to [].
        skip_defaults (bool, optional): Skip default to stores (`TO_STORE` and
            `TO_STORE_SRVC`). Defaults to False.
        try_append (bool, optional): Try to append to an existing file (file must have
            the same parameters). Defaults to False.
    """
    # If try_append check if file exists, if yes open it in append mode, else create the file
    if try_append and os.path.isfile(out_file):
        mode = "a"
        append = True
    else:
        mode = "w"
        append = False
    # Try to open the output file, retrying up to 10000 times — another
    # thread/process might be holding the same file. (The original
    # "initialized" count_retry by catching its own NameError; an explicit
    # counter is used instead.)
    count_retry = 0
    fopen = False
    while not fopen:
        try:
            out = h5dict.File(out_file, mode)
            fopen = True
        except Exception:
            count_retry += 1
            if count_retry >= 10000:
                raise
            time.sleep(0.1)
    if mode == "w":
        out["version"] = S.VERSION
    else:
        # Appending requires the existing file to come from the same version.
        if out["version"] != S.VERSION:
            print(
                "[Error] Trying to append to a hdf5 file with a different version, file version is {}, current version is {}.".format(
                    out["version"], S.VERSION
                )
            )
            out.close()
            return None
    cfilter = {"compression": "lzf"}
    load_from_comms(comms, out, to_store, to_store_srvc, skip_defaults, cfilter, append)
    out.close()