Source code for pm4py.visualization.dfg.util.dfg_gviz

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''

import tempfile
from copy import copy
import sys

from graphviz import Digraph
from pm4py.util import constants
from typing import Dict, List, Tuple
from collections import defaultdict, deque
from pm4py.util.vis_utils import (
    human_readable_stat,
    get_arc_penwidth,
    get_trans_freq_color,
    value_to_color,
)


def get_activities_color(activities_count):
    """
    Compute the fill color of each activity from its count

    Parameters
    -----------
    activities_count
        Mapping associating each activity to its count in the log

    Returns
    -----------
    activities_color
        Mapping associating each activity to the color computed by
        get_trans_freq_color from its count and the overall min/max count
    """
    lowest, highest = get_min_max_value(activities_count)
    # every activity is colored relative to the same (lowest, highest) range
    return {
        activity: get_trans_freq_color(
            activities_count[activity], lowest, highest
        )
        for activity in activities_count
    }
def get_activities_color_serv_time(serv_time):
    """
    Compute the fill color of each activity from its service time

    Parameters
    ----------------
    serv_time
        Mapping associating each activity to its service time

    Returns
    ----------------
    act_color
        Mapping associating each activity to a "#FFxxxx" color string, where
        the green/blue channel decreases from 255 as the service time grows
        from the minimum towards the maximum
    """
    lowest, highest = get_min_max_value(serv_time)
    # the small epsilon keeps the division defined when all times are equal
    span = highest - lowest + 0.00001

    palette = {}
    for activity in serv_time:
        channel = int(255 - 100 * (serv_time[activity] - lowest) / span)
        channel_hex = hex(channel)[2:].upper()
        # red stays at full intensity; green and blue share the same value
        palette[activity] = "#" + "FF" + channel_hex + channel_hex
    return palette
def get_min_max_value(dfg):
    """
    Gets min and max value assigned to edges in DFG graph

    Parameters
    -----------
    dfg
        Directly follows graph (mapping from edge/activity to a numeric value)

    Returns
    -----------
    min_value
        Minimum value in directly follows graph
        (9999999999 when no value could be observed, e.g. for an empty graph)
    max_value
        Maximum value in directly follows graph
        (-1 when no value could be observed, e.g. for an empty graph)
    """
    # Infinite sentinels guarantee that any real value replaces them, so the
    # result is correct also for values above 9999999999 or below -1.
    min_value = float("inf")
    max_value = float("-inf")
    for key in dfg:
        value = dfg[key]
        if value < min_value:
            min_value = value
        if value > max_value:
            max_value = value

    # Nothing replaced the sentinel: keep the historical defaults so callers
    # that rely on the empty-graph result still receive the same pair.
    if min_value == float("inf"):
        min_value = 9999999999
    if max_value == float("-inf"):
        max_value = -1
    return min_value, max_value
def assign_penwidth_edges(dfg):
    """
    Compute the pen width of each edge of the directly-follows graph

    Parameters
    -----------
    dfg
        Directly-follows graph (mapping from edge to its value)

    Returns
    -----------
    penwidth
        Mapping from each edge to the string form of the width computed by
        get_arc_penwidth from the edge value and the overall min/max value
    """
    lowest, highest = get_min_max_value(dfg)
    # widths are stored as strings, ready to be used as GraphViz attributes
    return {
        edge: str(get_arc_penwidth(dfg[edge], lowest, highest))
        for edge in dfg
    }
def sort_dfg_reachability(
    dfg: List[Tuple[str, str]],
    start_activities_to_include: List[str],
    end_activities_to_include: List[str],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """
    Sort the edges of the directly-follows graph based on reachability
    principles (start activities are put at the beginning, end activities
    at the end)

    Parameters
    ----------------
    dfg
        List of edges of the directly-follows graph (without
        frequency/performance annotation)
    start_activities_to_include
        Start activities
    end_activities_to_include
        End activities

    Returns
    ----------------
    sorted_activities
        Activities of the graph sorted by their distance from the start
        activities (ties broken by activity name); activities that cannot be
        reached from any start activity come last
    sorted_edges
        Edges sorted by reachability: first the edges leaving a start
        activity, then the remaining edges, then the edges entering an end
        activity; inside each group by source distance, target distance and
        activity names
    """
    # collect the activities and the successors of each activity in one pass
    activities_dfg = set()
    adjacency_list = defaultdict(list)
    for source, target in dfg:
        activities_dfg.add(source)
        activities_dfg.add(target)
        adjacency_list[source].append(target)

    # start activities are at distance 0, everything else is unreached (inf)
    distance = {activity: float("inf") for activity in activities_dfg}
    for activity in start_activities_to_include:
        distance[activity] = 0

    # breadth-first search from the start activities; an activity is
    # re-queued only when a strictly shorter path to it has been found
    queue = deque(start_activities_to_include)
    while queue:
        current = queue.popleft()
        candidate = distance[current] + 1
        for neighbor in adjacency_list.get(current, ()):
            if distance[neighbor] > candidate:
                distance[neighbor] = candidate
                queue.append(neighbor)

    # sets make the membership tests in the sort key O(1) per edge
    start_set = set(start_activities_to_include)
    end_set = set(end_activities_to_include)

    def edge_priority(edge):
        u, v = edge
        # an edge leaving a start activity wins over one entering an end
        # activity when both conditions hold
        if u in start_set:
            group = 0
        elif v in end_set:
            group = 2
        else:
            group = 1
        return (group, distance[u], distance[v], u, v)

    sorted_edges = sorted(dfg, key=edge_priority)
    sorted_activities = sorted(activities_dfg, key=lambda x: (distance[x], x))

    return sorted_activities, sorted_edges
def graphviz_visualization(
    activities_count,
    dfg,
    image_format="png",
    measure="frequency",
    max_no_of_edges_in_diagram=100000,
    start_activities=None,
    end_activities=None,
    serv_time=None,
    font_size="12",
    bgcolor=constants.DEFAULT_BGCOLOR,
    rankdir=constants.DEFAULT_RANKDIR_GVIZ,
    enable_graph_title: bool = constants.DEFAULT_ENABLE_GRAPH_TITLES,
    graph_title: str = "Directly-Follows Graph",
):
    """
    Do GraphViz visualization of a DFG graph

    Parameters
    -----------
    activities_count
        Count of attributes in the log (may include attributes that are not
        in the DFG graph)
    dfg
        DFG graph (mapping from edge to its value); it is not modified, the
        trimming to max_no_of_edges_in_diagram is done on a filtered copy
    image_format
        GraphViz should be represented in this format
    measure
        Describes which measure is assigned to edges in directly follows
        graph (frequency/performance)
    max_no_of_edges_in_diagram
        Maximum number of edges in the diagram allowed for visualization
    start_activities
        Start activities of the log
    end_activities
        End activities of the log
    serv_time
        For each activity, the service time in the log
        (None is treated as an empty dictionary)
    font_size
        Size of the text on the activities/edges
    bgcolor
        Background color of the visualization (i.e., 'transparent',
        'white', ...)
    rankdir
        Direction of the graph ("LR" for left-to-right; "TB" for
        top-to-bottom)
    enable_graph_title
        Enables the visualization of a graph's title
    graph_title
        Graph title to display (if enable_graph_title)

    Returns
    -----------
    viz
        Digraph object
    """
    if start_activities is None:
        start_activities = []
    if end_activities is None:
        end_activities = []
    if serv_time is None:
        # avoids a TypeError when a non-frequency measure is requested
        # without service times
        serv_time = {}

    # only the unique ".gv" name is needed; the handle is closed right away
    filename = tempfile.NamedTemporaryFile(suffix=".gv")
    filename.close()

    viz = Digraph(
        "",
        filename=filename.name,
        engine="dot",
        graph_attr={"bgcolor": bgcolor, "rankdir": rankdir},
    )

    if enable_graph_title:
        viz.attr(
            label='<<FONT POINT-SIZE="'
            + str(2 * int(font_size))
            + '">'
            + graph_title
            + "</FONT>>",
            labelloc="top",
        )

    # first, remove edges in diagram that exceeds the maximum number of edges
    # in the diagram
    if len(dfg) > max_no_of_edges_in_diagram:
        # more fine grained sorting to avoid that edges that are below the
        # threshold are undeterministically removed
        ranked_edges = sorted(
            dfg,
            key=lambda edge: (dfg[edge], edge[0], edge[1]),
            reverse=True,
        )
        allowed_edges = set(ranked_edges[0:max_no_of_edges_in_diagram])
        # work on a filtered copy so that the caller's graph stays intact
        dfg = {
            edge: value for edge, value in dfg.items() if edge in allowed_edges
        }

    # assign attributes color
    if measure == "frequency":
        activities_color = get_activities_color(activities_count)
    else:
        activities_color = get_activities_color_serv_time(serv_time)

    # represent nodes
    viz.attr("node", shape="box")

    # sorted, so that nodes are added in the same order to the graph
    known_activities = set(activities_count)
    activities_to_include = sorted(known_activities)

    start_activities_to_include = [
        act for act in start_activities if act in known_activities
    ]
    end_activities_to_include = [
        act for act in end_activities if act in known_activities
    ]

    # calculate edges penwidth (artificial start/end edges included, so that
    # they are scaled on the same range as the regular edges)
    # NOTE(review): the indexing below requires start_activities and
    # end_activities to be mappings; a non-empty list would raise TypeError
    # here, although the label code further down tolerates non-dict values.
    # Confirm the intended input types.
    ext_dfg = copy(dfg)
    for sact in start_activities_to_include:
        ext_dfg[(constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, sact)] = (
            start_activities[sact]
        )
    for eact in end_activities_to_include:
        ext_dfg[(eact, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)] = (
            end_activities[eact]
        )

    # the defaults keep a graph without edges from raising ValueError; the
    # bounds are only used to color the edges, so they are unused in that case
    dfg_values = dfg.values()
    min_dfg_value = min(dfg_values, default=0)
    max_dfg_value = max(dfg_values, default=0)

    penwidth = assign_penwidth_edges(ext_dfg)

    dfg_edges = sorted(dfg)
    if start_activities_to_include and end_activities_to_include:
        activities_to_include, dfg_edges = sort_dfg_reachability(
            dfg_edges, start_activities_to_include, end_activities_to_include
        )

    activities_map = {}
    for act in activities_to_include:
        node_id = str(hash(act))
        if "frequency" in measure and act in activities_count:
            viz.node(
                node_id,
                act + " (" + str(activities_count[act]) + ")",
                style="filled",
                fillcolor=activities_color[act],
                fontsize=font_size,
            )
        elif (
            "performance" in measure
            and act in serv_time
            and serv_time[act] >= 0
        ):
            viz.node(
                node_id,
                act + " (" + human_readable_stat(serv_time[act]) + ")",
                fontsize=font_size,
                style="filled",
                fillcolor=activities_color[act],
            )
        else:
            viz.node(node_id, act, fontsize=font_size)
        activities_map[act] = node_id

    # represent edges
    for edge in dfg_edges:
        if "frequency" in measure or "cost" in measure:
            label = str(dfg[edge])
        else:
            label = human_readable_stat(dfg[edge])

        color = None
        if "performance" in measure:
            color = value_to_color(dfg[edge], min_dfg_value, max_dfg_value)

        viz.edge(
            str(hash(edge[0])),
            str(hash(edge[1])),
            label=label,
            penwidth=str(penwidth[edge]),
            fontsize=font_size,
            color=color,
        )

    if start_activities_to_include:
        viz.node("@@startnode", "<&#9679;>", shape="circle", fontsize="34")
        for act in start_activities_to_include:
            label = (
                str(start_activities[act])
                if isinstance(start_activities, dict)
                and measure == "frequency"
                else ""
            )
            viz.edge(
                "@@startnode",
                activities_map[act],
                label=label,
                fontsize=font_size,
                penwidth=str(
                    penwidth[
                        (constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, act)
                    ]
                ),
            )

    if end_activities_to_include:
        # <&#9632;>
        viz.node(
            "@@endnode", "<&#9632;>", shape="doublecircle", fontsize="32"
        )
        for act in end_activities_to_include:
            label = (
                str(end_activities[act])
                if isinstance(end_activities, dict)
                and measure == "frequency"
                else ""
            )
            viz.edge(
                activities_map[act],
                "@@endnode",
                label=label,
                fontsize=font_size,
                penwidth=str(
                    penwidth[(act, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)]
                ),
            )

    viz.attr(overlap="false")
    viz.attr(fontsize="11")

    # "html" output is produced from GraphViz's "plain-ext" format
    viz.format = image_format.replace("html", "plain-ext")

    return viz