import os
import subprocess
import tempfile
import uuid
import zlib
from enum import Enum
from math import log10
from typing import Any, Dict, Optional, Union

import pandas as pd
from pm4py.util import exec_utils, constants
[docs]
class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`.

    Each member's value is the plain-string form of the key.
    """

    # Output image format handed to Graphviz (``-T<format>``).
    FORMAT = "format"

    # Size of each activity box and width of the connecting edges.
    NODE_HEIGHT = "node_height"
    NODE_WIDTH = "node_width"
    EDGE_PENWIDTH = "edge_penwidth"

    # How many of the most frequent variants are drawn.
    MAX_VARIANTS = "max_variants"

    # "start", "end", or the name of an activity used to align the variants.
    ALIGNMENT_CRITERIA = "alignment_criteria"

    # Bounds of the horizontal gap between two consecutive activities,
    # and the factor that scales layout units to drawing coordinates.
    MIN_HORIZONTAL_DISTANCE = "min_horizontal_distance"
    MAX_HORIZONTAL_DISTANCE = "max_horizontal_distance"
    LAYOUT_EXT_MULTIPLIER = "layout_ext_multiplier"

    # NOTE(review): not read by ``apply`` in this module.
    SHOW_LEGEND = "show_legend"

    # Whether a title is drawn, and its text.
    ENABLE_GRAPH_TITLE = "enable_graph_title"
    GRAPH_TITLE = "graph_title"
[docs]
def apply(
    variants_df: pd.DataFrame,
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> str:
    """Render the most frequent variants as horizontal timelines with Graphviz.

    Every variant is drawn on its own row; consecutive activities are joined
    by an edge labelled with the flow time, and the horizontal gap between
    them grows logarithmically with that flow time.

    Parameters
    ----------
    variants_df
        One row per transition. The columns read are ``@@variant_column``,
        ``@@variant_count``, ``@@index_in_trace``, ``@@flow_time``,
        ``concept:name`` (source activity) and ``concept:name_2`` (target
        activity). Flow times are labelled as seconds/minutes/hours/days
        using 60 / 3600 / 86400 as thresholds.
    parameters
        Optional settings keyed by :class:`Parameters` members:
        ``FORMAT`` (default ``"png"``), ``NODE_HEIGHT`` / ``NODE_WIDTH``
        (default ``0.85``), ``EDGE_PENWIDTH`` (default ``1.0``),
        ``MAX_VARIANTS`` (default ``5``), ``ALIGNMENT_CRITERIA``
        (``"start"``, ``"end"`` or an activity name; default ``"start"``),
        ``MIN_HORIZONTAL_DISTANCE`` (default ``1.5``),
        ``MAX_HORIZONTAL_DISTANCE`` (default ``4.5``),
        ``LAYOUT_EXT_MULTIPLIER`` (default ``75``), ``ENABLE_GRAPH_TITLE``
        and ``GRAPH_TITLE``.

    Returns
    -------
    str
        Path of the temporary image file written by ``neato``.

    Raises
    ------
    ValueError
        If ``ALIGNMENT_CRITERIA`` names an activity that is missing from one
        of the selected variants.
    RuntimeError
        If ``neato`` cannot be started or produces no image.
    """
    if parameters is None:
        parameters = {}

    # Extract parameters
    image_format = exec_utils.get_param_value(Parameters.FORMAT, parameters, "png")
    node_height = exec_utils.get_param_value(Parameters.NODE_HEIGHT, parameters, 0.85)
    node_width = exec_utils.get_param_value(Parameters.NODE_WIDTH, parameters, 0.85)
    edge_penwidth = exec_utils.get_param_value(Parameters.EDGE_PENWIDTH, parameters, 1.0)
    max_variants = exec_utils.get_param_value(Parameters.MAX_VARIANTS, parameters, 5)
    alignment_criteria = exec_utils.get_param_value(Parameters.ALIGNMENT_CRITERIA, parameters, "start")
    min_horizontal_distance = exec_utils.get_param_value(Parameters.MIN_HORIZONTAL_DISTANCE, parameters, 1.5)
    max_horizontal_distance = exec_utils.get_param_value(Parameters.MAX_HORIZONTAL_DISTANCE, parameters, 4.5)
    layout_ext_multiplier = exec_utils.get_param_value(Parameters.LAYOUT_EXT_MULTIPLIER, parameters, 75)
    enable_graph_title = exec_utils.get_param_value(
        Parameters.ENABLE_GRAPH_TITLE, parameters, constants.DEFAULT_ENABLE_GRAPH_TITLES
    )
    graph_title = exec_utils.get_param_value(
        Parameters.GRAPH_TITLE, parameters, "Process Variants Paths and Durations"
    )

    # Required column names
    variant_column = "@@variant_column"
    variant_count = "@@variant_count"
    index_column = "@@index_in_trace"
    flow_time_column = "@@flow_time"
    activity_key = "concept:name"
    activity_key_2 = "concept:name_2"

    # Sort variants from most frequent to least frequent and pick top N
    unique_variants = variants_df[[variant_column, variant_count]].drop_duplicates()
    unique_variants = unique_variants.sort_values(variant_count, ascending=False)
    top_variants = unique_variants.head(max_variants)[variant_column].tolist()
    filtered_df = variants_df[variants_df[variant_column].isin(top_variants)]

    # Case count per variant; setdefault keeps the first (largest) count if a
    # variant appears with several counts.
    variant_counts = {}
    for variant, count in zip(unique_variants[variant_column], unique_variants[variant_count]):
        variant_counts.setdefault(variant, count)

    # Materialise each variant's ordered transitions once; both the layout
    # pass and the edge pass below iterate over them.
    transitions = {}
    for variant in top_variants:
        vdf = filtered_df[filtered_df[variant_column] == variant].sort_values(index_column)
        transitions[variant] = list(
            zip(vdf[activity_key], vdf[activity_key_2], vdf[flow_time_column], vdf[index_column])
        )

    # For distance normalization
    max_flow_time = filtered_df[flow_time_column].max()

    # Assign each variant a color that is the same on every run
    variant_colors = {variant: _stable_color(variant) for variant in top_variants}

    # Build the GraphViz lines
    lines = ["graph G {"]
    if enable_graph_title:
        # NOTE: the title is inserted verbatim into an HTML-like label.
        lines.append(
            f' label=<<FONT POINT-SIZE="20">{graph_title}</FONT>>;'
            ' labelloc="top";'
        )
    lines.append(' layout=neato;')
    lines.append(' splines=true;')

    # Assign y-coordinates: the most frequent variant gets the top row
    total_variants = len(top_variants)
    variant_y_pos = {}
    variant_node_positions = {}
    for i, variant in enumerate(top_variants):
        variant_y_pos[variant] = (total_variants - i) * layout_ext_multiplier
        variant_node_positions[variant] = {}

    # Calculate x positions starting from 0 for all variants
    for variant in top_variants:
        positions = variant_node_positions[variant]
        for src_activity, tgt_activity, flow_time, src_idx in transitions[variant]:
            if src_activity not in positions:
                if src_idx != 0:
                    # Source was never placed (should not happen with sorted data)
                    continue
                positions[src_activity] = 0
            distance = _flow_time_to_distance(
                flow_time, max_flow_time, min_horizontal_distance, max_horizontal_distance
            )
            positions[tgt_activity] = positions[src_activity] + distance * layout_ext_multiplier

    # Apply shifts based on alignment criteria ("start" needs no shift)
    if alignment_criteria == "end":
        for variant in top_variants:
            positions = variant_node_positions[variant]
            # default=0 keeps a variant without placed nodes from crashing max()
            shift = -max(positions.values(), default=0)
            for activity in positions:
                positions[activity] += shift
    elif alignment_criteria != "start":
        # alignment_criteria is the name of some activity
        variants_without_activity = [
            variant for variant in top_variants
            if alignment_criteria not in variant_node_positions[variant]
        ]
        if variants_without_activity:
            missing_variants = ", ".join(str(v) for v in variants_without_activity)
            raise ValueError(
                f"Alignment activity '{alignment_criteria}' not found in variants: {missing_variants}. "
                "All variants must contain the alignment activity."
            )
        for variant in top_variants:
            positions = variant_node_positions[variant]
            shift = -positions[alignment_criteria]
            for activity in positions:
                positions[activity] += shift

    # Create the row label for each variant
    for i, variant in enumerate(top_variants):
        label_text = f"Variant\n{i + 1}\n({variant_counts[variant]} cases)"
        label_node_id = f"label_{uuid.uuid4().hex[:12]}"
        lines.append(
            f' {label_node_id} [label="{label_text}", shape=none, '
            f'fontsize="10pt", pos="-60,{variant_y_pos[variant]}!", fixedsize=true];'
        )

    # Create actual activity nodes and edges
    for variant in top_variants:
        y_pos = variant_y_pos[variant]
        color = variant_colors[variant]
        activity_node_ids = {}

        # Nodes
        for activity, x_pos in variant_node_positions[variant].items():
            node_id = f"n{uuid.uuid4().hex[:12]}"
            activity_node_ids[activity] = node_id
            label = _escape_dot_label(activity)
            lines.append(
                f' {node_id} [label="{label}", shape=box, style="filled,rounded", '
                f'fillcolor="{color}", width={node_width}, height={node_height}, '
                f'pos="{x_pos},{y_pos}!", fontsize="8pt", fixedsize=true];'
            )

        # Edges
        for src, tgt, ftime, _ in transitions[variant]:
            src_id = activity_node_ids.get(src)
            tgt_id = activity_node_ids.get(tgt)
            if src_id is None or tgt_id is None:
                # The layout pass skipped this transition, so there is no node to attach to.
                continue
            lines.append(
                f' {src_id} -- {tgt_id} [label="{_format_duration(ftime)}", fontsize="7pt", '
                f'color="{color}", penwidth={edge_penwidth}];'
            )
    lines.append("}")

    # Write the .gv file; delete=False reserves the name until we remove it.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".gv", delete=False, encoding="utf-8"
    ) as gv_file:
        gv_file.write("\n".join(lines))
        gv_path = gv_file.name
    with tempfile.NamedTemporaryFile(suffix=f".{image_format}", delete=False) as img_file:
        image_path = img_file.name

    # Use neato -n2 to respect exact coordinates. An argument list (no shell)
    # keeps the caller-supplied format from being interpreted by a shell.
    command = ["neato", "-n2", f"-T{image_format}", "-o", image_path, gv_path]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as exc:
        _remove_quietly(image_path)
        raise RuntimeError(
            "Could not execute Graphviz 'neato'; is Graphviz installed and on the PATH?"
        ) from exc
    finally:
        # The caller only receives the image path, so the .gv file is ours to clean up.
        _remove_quietly(gv_path)

    if not os.path.exists(image_path) or os.path.getsize(image_path) == 0:
        _remove_quietly(image_path)
        raise RuntimeError(
            f"Graphviz 'neato' exited with code {completed.returncode} and produced no image."
        )
    return image_path


def _stable_color(variant) -> str:
    """Return a ``#rrggbb`` color derived from a process-independent hash of *variant*."""
    # The builtin hash() of a str is salted per process, so it would give
    # different colors on every run; CRC-32 is deterministic.
    digest = zlib.crc32(str(variant).encode("utf-8")) & 0xFFFFFF
    return f"#{digest:06x}"


def _flow_time_to_distance(flow_time, max_flow_time, min_distance, max_distance):
    """Map a flow time to a horizontal gap on a log scale between the two bounds."""
    # Non-positive durations (and a non-positive maximum, which would divide
    # by zero) fall back to the minimum gap instead of reaching log10.
    if flow_time <= 0 or max_flow_time <= 0:
        return min_distance
    norm_time = min(1.0, log10(1 + flow_time) / log10(1 + max_flow_time))
    return min_distance + norm_time * (max_distance - min_distance)


def _format_duration(seconds) -> str:
    """Format a duration in seconds with the largest fitting unit (s, m, h, d)."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    if seconds < 86400:
        return f"{seconds / 3600:.1f}h"
    return f"{seconds / 86400:.1f}d"


def _escape_dot_label(activity) -> str:
    """Turn an activity name into text that is safe inside a double-quoted DOT label."""
    escaped = str(activity).replace("\\", "\\\\").replace('"', '\\"')
    # Spaces become a literal backslash-n so Graphviz wraps the label.
    return escaped.replace(" ", "\\n")


def _remove_quietly(path) -> None:
    """Delete *path*, ignoring the case where it is already gone or not removable."""
    try:
        os.remove(path)
    except OSError:
        pass