|
| 1 | +''' |
| 2 | + PM4Py – A Process Mining Library for Python |
| 3 | +Copyright (C) 2026 Process Intelligence Solutions UG (haftungsbeschränkt) |
| 4 | +
|
| 5 | +This program is free software: you can redistribute it and/or modify |
| 6 | +it under the terms of the GNU Affero General Public License as |
| 7 | +published by the Free Software Foundation, either version 3 of the |
| 8 | +License, or any later version. |
| 9 | +
|
| 10 | +This program is distributed in the hope that it will be useful, |
| 11 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | +GNU Affero General Public License for more details. |
| 14 | +
|
| 15 | +You should have received a copy of the GNU Affero General Public License |
| 16 | +along with this program. If not, see this software project's root or |
| 17 | +visit <https://www.gnu.org/licenses/>. |
| 18 | +
|
| 19 | +Website: https://processintelligence.solutions |
| 20 | +Contact: info@processintelligence.solutions |
| 21 | +''' |
| 22 | + |
| 23 | +# Author: Maximilian Josef Frank (https://orcid.org/0000-0002-0714-7748) |
| 24 | + |
| 25 | +import random |
| 26 | +import itertools |
| 27 | + |
| 28 | +# typing |
| 29 | +from collections.abc import Iterable |
| 30 | +InputMap = dict[str,list[frozenset]] |
| 31 | +OutputMap = dict[str,list[frozenset]] |
| 32 | +Individual = tuple[InputMap,OutputMap] |
| 33 | + |
| 34 | +class iset(frozenset): |
| 35 | + "Indexable frozenset printing as set, i.e. without `frozenset(…)`" |
| 36 | + def __repr__(self): |
| 37 | + return "{" + repr(sorted(self))[1:-1] + "}" |
| 38 | + |
| 39 | + @staticmethod |
| 40 | + def flat(item: Iterable) -> Self: |
| 41 | + return iset(itertools.chain(*item)) |
| 42 | + |
| 43 | +def rand_partition(pool: Iterable) -> list[set]: |
| 44 | + pool = set(pool) |
| 45 | + # also ensures no activity in two partitions |
| 46 | + # s. 4. Causal Matrix, Def. 4; https://doi.org/10.1007/11494744_5 |
| 47 | + partition = [] |
| 48 | + while pool: |
| 49 | + draw = iset(random.sample( |
| 50 | + tuple(pool), |
| 51 | + random.randint(1, len(pool)) |
| 52 | + )) |
| 53 | + partition.append(draw) |
| 54 | + pool -= draw |
| 55 | + return partition |
| 56 | + |
| 57 | +def get_src_sink_sets_for_wfnet(I: InputMap, O: OutputMap, T: list[str]) -> tuple[list[str],list[str]]: |
| 58 | + """Determines input set and output set, which need to be connected by a place to create a WF-net""" |
| 59 | + def add2graphs(graphs, t, nextT): |
| 60 | + # find graph |
| 61 | + graph = next((g for g in graphs if t in g), None) |
| 62 | + if graph is None: |
| 63 | + graph = [t] |
| 64 | + graphs.append(graph) |
| 65 | + # append plain/grouped T |
| 66 | + successors = [] |
| 67 | + for S in nextT[t]: |
| 68 | + if type(S) == str: |
| 69 | + successors.append(S) |
| 70 | + else: |
| 71 | + successors.extend(S) |
| 72 | + graph += successors |
| 73 | + # merge if end = start |
| 74 | + for tn in successors: |
| 75 | + for g2 in graphs[:]: # [:] = copy |
| 76 | + if g2[0] == tn and g2 != graph: |
| 77 | + graph += g2 |
| 78 | + graphs.remove(g2) |
| 79 | + break |
| 80 | + return graphs |
| 81 | + graphsI, graphsO = [], [] |
| 82 | + for t in T: |
| 83 | + graphsI = add2graphs(graphsI, t, I) |
| 84 | + graphsO = add2graphs(graphsO, t, O) |
| 85 | + first = [g[0] for g in graphsO] # ⋃first = reachable via O[∀t] |
| 86 | + last = [g[0] for g in graphsI] # ⋃first = reachable via I[∀t] |
| 87 | + return first, last |
0 commit comments