Source code for pm4py.objects.petri_net.utils.embeddings_similarity

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import random
from typing import List

import numpy as np
import networkx as nx
from pm4py.objects.petri_net.obj import PetriNet


###############################################################################
# 1 ⸺ Convert the Petri net to a plain (bidirectional) graph
###############################################################################

def _petri_to_nx(net) -> nx.Graph:
    """
    Returns an undirected NetworkX graph with node attribute ``type``
    ∈ {'place', 'transition'} and edge attribute ``flow`` (weight 1).
    """
    G = nx.Graph()
    # add nodes
    for p in net.places:
        G.add_node(f"P_{p.name}", type="place")
    for t in net.transitions:
        G.add_node(f"T_{t.name}", type="transition")
    # add arcs
    for arc in net.arcs:
        u, v = arc.source, arc.target
        uid = f"{'P' if u in net.places else 'T'}_{u.name}"
        vid = f"{'P' if v in net.places else 'T'}_{v.name}"
        G.add_edge(uid, vid, flow=1)
    return G


###############################################################################
# 2 ⸺ Node2Vec walk generation (simplified, unbiased)
###############################################################################

def _random_walk(G: nx.Graph, start: str, length: int) -> List[str]:
    walk = [start]
    for _ in range(length - 1):
        cur = walk[-1]
        neigh = list(G.neighbors(cur))
        if not neigh:
            break
        walk.append(random.choice(neigh))
    return walk


def _generate_walks(G: nx.Graph,
                    num_walks: int = 10,
                    walk_length: int = 40,
                    seed: int = 42) -> List[List[str]]:
    random.seed(seed)
    nodes = list(G.nodes())
    walks = []
    for _ in range(num_walks):
        random.shuffle(nodes)
        for n in nodes:
            walks.append(_random_walk(G, n, walk_length))
    return walks


###############################################################################
# 3 ⸺ Train Skip-gram (gensim Word2Vec)
###############################################################################

def _train_word2vec(walks: List[List[str]],
                    dimensions: int = 64,
                    window: int = 5,
                    workers: int = 1,
                    epochs: int = 10):
    from gensim.models import Word2Vec

    return Word2Vec(
        sentences=walks,
        vector_size=dimensions,
        window=window,
        min_count=0,
        sg=1,            # skip-gram
        workers=workers,
        epochs=epochs,
    )


###############################################################################
# 4 ⸺ Read-out: aggregate node embeddings into **one vector per net**
###############################################################################

def _readout(model,
             G: nx.Graph,
             mode: str = "mean",
             transition_only: bool = False) -> np.ndarray:
    """
    Parameters
    ----------
    mode : {'mean', 'sum', 'max'}
        How to pool the node embeddings.
    transition_only : bool
        If True, use only transition nodes (ignores places).
    """
    nodes = [
        n for n, data in G.nodes(data=True)
        if (not transition_only) or data["type"] == "transition"
    ]
    vecs = np.array([model.wv[n] for n in nodes])
    if mode == "sum":
        return vecs.sum(axis=0)
    if mode == "max":
        return vecs.max(axis=0)
    return vecs.mean(axis=0)      # default


###############################################################################
# 5 ⸺ Public helper
###############################################################################

[docs] def petri_net_embedding(net, dimensions: int = 64, num_walks: int = 10, walk_length: int = 40, window: int = 5, workers: int = 1, epochs: int = 10, readout: str = "mean", transition_only: bool = False, seed: int = 42) -> np.ndarray: """ Returns ------- np.ndarray (shape = [dimensions,]) """ G = _petri_to_nx(net) walks = _generate_walks(G, num_walks, walk_length, seed) w2v = _train_word2vec(walks, dimensions, window, workers, epochs) return _readout(w2v, G, readout, transition_only)
[docs] def cosine_similarity(a, b, eps=1e-10): a = np.asarray(a, dtype=float) b = np.asarray(b, dtype=float) num = np.dot(a, b) # dot product denom = np.linalg.norm(a) * np.linalg.norm(b) if denom < eps: # avoid division-by-zero return 0.0 # or raise an error / np.nan return num / denom
[docs] def apply(net1: PetriNet, net2: PetriNet) -> float: """ Computes the embeddings-based similarity between two Petri nets, based on the approach described in: Colonna, Juan G., et al. "Process mining embeddings: Learning vector representations for Petri nets." Intelligent Systems with Applications 23 (2024): 200423. Parameters ---------------- net1 Petri net net2 Second Petri net Returns ----------------- similarity_metric Cosine similarity between the two embeddings """ emb1 = petri_net_embedding(net1) emb2 = petri_net_embedding(net2) return cosine_similarity(emb1, emb2)