'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from pm4py.objects.log.obj import EventLog, EventStream
import pandas as pd
from typing import Optional, Dict, Any, Collection, Union
from pm4py.util import exec_utils, constants, xes_constants, pandas_utils
from pm4py.objects.ocel.obj import OCEL
from pm4py.objects.ocel import constants as ocel_constants
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.ocel.util import ocel_consistency
from copy import copy
import math
[docs]
class Parameters(Enum):
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
CASE_ATTRIBUTE_PREFIX = constants.PARAMETER_KEY_CASE_ATTRIBUTE_PRFIX
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
TARGET_OBJECT_TYPE = "target_object_type"
TARGET_OBJECT_TYPE_2 = "target_object_type_2"
LEFT_INDEX = "left_index"
RIGHT_INDEX = "right_index"
DIRECTION = "direction"
def __postprocess_stream(list_events):
"""
Postprocess the list of events of the stream in order to make sure
that there are no NaN/NaT values
Parameters
-------------
list_events
List of events
Returns
-------------
list_events
Postprocessed stream
"""
for event in list_events:
event_keys = list(event.keys())
for k in event_keys:
typ_k = type(event[k])
if typ_k is pd._libs.tslibs.nattype.NaTType:
del event[k]
continue
elif (typ_k is float or typ_k is int) and math.isnan(event[k]):
del event[k]
continue
elif event[k] is None:
del event[k]
continue
return list_events
[docs]
def from_traditional_log(
log: EventLog, parameters: Optional[Dict[Any, Any]] = None
) -> OCEL:
"""
Transforms an EventLog to an OCEL
Parameters
-----------------
log
Event log
parameters
Parameters of the algorithm, including:
- Parameters.TARGET_OBJECT_TYPE => the name of the object type to which the cases should be mapped
- Parameters.ACTIVITY_KEY => the attribute to use as activity
- Parameters.TIMESTAMP_KEY => the attribute to use as timestamp
- Parameters.CASE_ID_KEY => the attribute to use as case identifier
Returns
-----------------
ocel
OCEL (equivalent to the provided event log)
"""
if parameters is None:
parameters = {}
log = log_converter.apply(
log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters
)
target_object_type = exec_utils.get_param_value(
Parameters.TARGET_OBJECT_TYPE, parameters, "OTYPE"
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, xes_constants.DEFAULT_TRACEID_KEY
)
events = []
objects = []
relations = []
ev_count = 0
for trace in log:
case_id = trace.attributes[case_id_key]
obj = {
ocel_constants.DEFAULT_OBJECT_ID: case_id,
ocel_constants.DEFAULT_OBJECT_TYPE: target_object_type,
}
for attr in trace.attributes:
if attr != case_id_key:
obj[attr] = trace.attributes[attr]
objects.append(obj)
for ev in trace:
ev_count = ev_count + 1
activity = ev[activity_key]
timestamp = ev[timestamp_key]
eve = {
ocel_constants.DEFAULT_EVENT_ID: str(ev_count),
ocel_constants.DEFAULT_EVENT_ACTIVITY: activity,
ocel_constants.DEFAULT_EVENT_TIMESTAMP: timestamp,
}
for attr in ev:
if attr not in [activity, timestamp]:
eve[attr] = ev[attr]
events.append(eve)
relations.append(
{
ocel_constants.DEFAULT_EVENT_ID: str(ev_count),
ocel_constants.DEFAULT_EVENT_ACTIVITY: activity,
ocel_constants.DEFAULT_EVENT_TIMESTAMP: timestamp,
ocel_constants.DEFAULT_OBJECT_ID: case_id,
ocel_constants.DEFAULT_OBJECT_TYPE: target_object_type,
}
)
events = pandas_utils.instantiate_dataframe(events)
objects = pandas_utils.instantiate_dataframe(objects)
relations = pandas_utils.instantiate_dataframe(relations)
return OCEL(events=events, objects=objects, relations=relations)
def __get_events_dataframe(
df: pd.DataFrame,
activity_key: str,
timestamp_key: str,
case_id_key: str,
case_attribute_prefix: str,
events_prefix="E",
) -> pd.DataFrame:
"""
Internal method to get the events dataframe out of a traditional log stored as Pandas dataframe
"""
columns = {case_id_key}.union(
set(x for x in df.columns if not x.startswith(case_attribute_prefix))
)
columns = list(columns)
df = df[columns]
df = df.rename(
columns={
activity_key: ocel_constants.DEFAULT_EVENT_ACTIVITY,
timestamp_key: ocel_constants.DEFAULT_EVENT_TIMESTAMP,
case_id_key: ocel_constants.DEFAULT_OBJECT_ID,
}
)
df[ocel_constants.DEFAULT_EVENT_ID] = events_prefix + df.index.astype(str)
return df
def __get_objects_dataframe(
df: pd.DataFrame,
case_id_key: str,
case_attribute_prefix: str,
target_object_type: str,
) -> pd.DataFrame:
"""
Internal method to get the objects dataframe out of a traditional log stored as Pandas dataframe
"""
columns = {x for x in df.columns if x.startswith(case_attribute_prefix)}
columns = list(columns)
df = df[columns]
df = df.rename(columns={case_id_key: ocel_constants.DEFAULT_OBJECT_ID})
df = df.groupby(ocel_constants.DEFAULT_OBJECT_ID).first().reset_index()
df[ocel_constants.DEFAULT_OBJECT_TYPE] = target_object_type
return df
def __get_relations_from_events(
events: pd.DataFrame, target_object_type: str
) -> pd.DataFrame:
"""
Internal method to get the relations dataframe out of a traditional log stored as Pandas dataframe
"""
relations = events[
[
ocel_constants.DEFAULT_EVENT_ACTIVITY,
ocel_constants.DEFAULT_EVENT_TIMESTAMP,
ocel_constants.DEFAULT_OBJECT_ID,
ocel_constants.DEFAULT_EVENT_ID,
]
]
relations[ocel_constants.DEFAULT_OBJECT_TYPE] = target_object_type
return relations
[docs]
def from_traditional_pandas(
df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
) -> OCEL:
"""
Transforms a dataframe to an OCEL
Parameters
-----------------
df
Pandas dataframe
parameters
Parameters of the algorithm, including:
- Parameters.TARGET_OBJECT_TYPE => the name of the object type to which the cases should be mapped
- Parameters.ACTIVITY_KEY => the attribute to use as activity
- Parameters.TIMESTAMP_KEY => the attribute to use as timestamp
- Parameters.CASE_ID_KEY => the attribute to use as case identifier
- Parameters.CASE_ATTRIBUTE_PREFIX => the prefix identifying the attributes at the case level
Returns
-----------------
ocel
OCEL (equivalent to the provided event log)
"""
if parameters is None:
parameters = {}
target_object_type = exec_utils.get_param_value(
Parameters.TARGET_OBJECT_TYPE, parameters, "OTYPE"
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
case_attribute_prefix = exec_utils.get_param_value(
Parameters.CASE_ATTRIBUTE_PREFIX,
parameters,
constants.CASE_ATTRIBUTE_PREFIX,
)
events = __get_events_dataframe(
df, activity_key, timestamp_key, case_id_key, case_attribute_prefix
)
objects = __get_objects_dataframe(
df, case_id_key, case_attribute_prefix, target_object_type
)
relations = __get_relations_from_events(events, target_object_type)
del events[ocel_constants.DEFAULT_OBJECT_ID]
events = events.sort_values(
[
ocel_constants.DEFAULT_EVENT_TIMESTAMP,
ocel_constants.DEFAULT_EVENT_ID,
]
)
relations = relations.sort_values(
[
ocel_constants.DEFAULT_EVENT_TIMESTAMP,
ocel_constants.DEFAULT_EVENT_ID,
]
)
return OCEL(events=events, objects=objects, relations=relations)
[docs]
def from_interleavings(
df1: pd.DataFrame,
df2: pd.DataFrame,
interleavings: pd.DataFrame,
parameters: Optional[Dict[Any, Any]] = None,
) -> OCEL:
"""
Transforms a couple of dataframes, along with the interleavings between them, to an OCEL
Parameters
-----------------
df1
First of the two dataframes
df2
Second of the two dataframes
interleavings
Interleavings dataframe
parameters
Parameters of the algorithm, including:
- Parameters.ACTIVITY_KEY => the attribute to use as activity
- Parameters.TIMESTAMP_KEY => the attribute to use as timestamp
- Parameters.CASE_ID_KEY => the attribute to use as case identifier
- Parameters.CASE_ATTRIBUTE_PREFIX => the prefix identifying the attributes at the case level
- Parameters.TARGET_OBJECT_TYPE => the name of the object type to which the cases of the first log should be mapped
- Parameters.TARGET_OBJECT_TYPE_2 => the name of the object type to which the cases of the second log should be mapped
- Parameters.LEFT_INDEX => the index column of the events of the first dataframe, in the interleavings dataframe
- Parameters.RIGHT_INDEX => the index column of the events of the second dataframe, in the interleavings
dataframe.
- Parameters.DIRECTION => the direction of the interleavings (LR or RL)
Returns
-----------------
ocel
OCEL (equivalent to the provided event log)
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
case_attribute_prefix = exec_utils.get_param_value(
Parameters.CASE_ATTRIBUTE_PREFIX,
parameters,
constants.CASE_ATTRIBUTE_PREFIX,
)
target_object_type = exec_utils.get_param_value(
Parameters.TARGET_OBJECT_TYPE, parameters, "OTYPE"
)
target_object_type_2 = exec_utils.get_param_value(
Parameters.TARGET_OBJECT_TYPE_2, parameters, "OTYPE2"
)
left_index = exec_utils.get_param_value(
Parameters.LEFT_INDEX, parameters, "@@left_index"
)
right_index = exec_utils.get_param_value(
Parameters.RIGHT_INDEX, parameters, "@@right_index"
)
direction = exec_utils.get_param_value(
Parameters.DIRECTION, parameters, "@@direction"
)
events1 = __get_events_dataframe(
df1,
activity_key,
timestamp_key,
case_id_key,
case_attribute_prefix,
events_prefix="E1_",
)
objects1 = __get_objects_dataframe(
df1, case_id_key, case_attribute_prefix, target_object_type
)
relations1 = __get_relations_from_events(events1, target_object_type)
relations1_minimal = relations1[
{
ocel_constants.DEFAULT_EVENT_ID,
ocel_constants.DEFAULT_OBJECT_ID,
ocel_constants.DEFAULT_OBJECT_TYPE,
}
]
events2 = __get_events_dataframe(
df2,
activity_key,
timestamp_key,
case_id_key,
case_attribute_prefix,
events_prefix="E2_",
)
objects2 = __get_objects_dataframe(
df2, case_id_key, case_attribute_prefix, target_object_type_2
)
relations2 = __get_relations_from_events(events2, target_object_type_2)
relations2_minimal = relations2[
{
ocel_constants.DEFAULT_EVENT_ID,
ocel_constants.DEFAULT_OBJECT_ID,
ocel_constants.DEFAULT_OBJECT_TYPE,
}
]
interleavings[left_index] = "E1_" + interleavings[left_index].astype(
int
).astype(str)
interleavings[right_index] = "E2_" + interleavings[right_index].astype(
int
).astype(str)
interleavings_lr = interleavings[interleavings[direction] == "LR"][
[left_index, right_index]
]
interleavings_rl = interleavings[interleavings[direction] == "RL"][
[left_index, right_index]
]
relations3 = events1.merge(
interleavings_lr,
left_on=ocel_constants.DEFAULT_EVENT_ID,
right_on=left_index,
)
relations3 = relations3.merge(
relations2_minimal,
left_on=right_index,
right_on=ocel_constants.DEFAULT_EVENT_ID,
suffixes=("", "_@#@#RIGHT"),
)
relations3[ocel_constants.DEFAULT_OBJECT_ID] = relations3[
ocel_constants.DEFAULT_OBJECT_ID + "_@#@#RIGHT"
]
relations3[ocel_constants.DEFAULT_OBJECT_TYPE] = target_object_type_2
relations4 = events2.merge(
interleavings_rl,
left_on=ocel_constants.DEFAULT_EVENT_ID,
right_on=right_index,
)
relations4 = relations4.merge(
relations1_minimal,
left_on=left_index,
right_on=ocel_constants.DEFAULT_EVENT_ID,
suffixes=("", "_@#@#LEFT"),
)
relations4[ocel_constants.DEFAULT_OBJECT_ID] = relations4[
ocel_constants.DEFAULT_OBJECT_ID + "_@#@#LEFT"
]
relations4[ocel_constants.DEFAULT_OBJECT_TYPE] = target_object_type
del events1[ocel_constants.DEFAULT_OBJECT_ID]
del events2[ocel_constants.DEFAULT_OBJECT_ID]
events = pandas_utils.concat([events1, events2])
objects = pandas_utils.concat([objects1, objects2])
relations = pandas_utils.concat(
[relations1, relations2, relations3, relations4]
)
events = events.sort_values(
[
ocel_constants.DEFAULT_EVENT_TIMESTAMP,
ocel_constants.DEFAULT_EVENT_ID,
]
)
relations = relations.sort_values(
[
ocel_constants.DEFAULT_EVENT_TIMESTAMP,
ocel_constants.DEFAULT_EVENT_ID,
]
)
return OCEL(events=events, objects=objects, relations=relations)
[docs]
def log_to_ocel_multiple_obj_types(
log_obj: Union[EventLog, EventStream, pd.DataFrame],
activity_column: str,
timestamp_column: str,
obj_types: Collection[str],
obj_separator: str = " AND ",
additional_event_attributes: Optional[Collection[str]] = None,
additional_object_attributes: Optional[Dict[str, Collection[str]]] = None,
) -> OCEL:
"""
Converts an event log to an object-centric event log with one or more than one
object types.
Parameters
---------------
log_obj
Log object
activity_column
Activity column
timestamp_column
Timestamp column
object_types
List of columns to consider as object types
obj_separator
Separator between different objects in the same column
additional_event_attributes
Additional attributes to be considered as event attributes in the OCEL
additional_object_attributes
Additional attributes per object type to be considered as object attributes in the OCEL
(dictionary in which object types are associated to their attributes, i.e.,
{"order": ["quantity", "cost"], "invoice": ["date", "due date"]})
Returns
----------------
ocel
Object-centric event log
"""
log_obj = log_converter.apply(
log_obj, variant=log_converter.Variants.TO_DATA_FRAME
)
if additional_event_attributes is None:
additional_event_attributes = {}
if additional_object_attributes is None:
additional_object_attributes = {}
events = []
objects = []
relations = []
obj_ids = set()
stream = log_obj.to_dict("records")
stream = __postprocess_stream(stream)
for index, eve in enumerate(stream):
ocel_eve = {
ocel_constants.DEFAULT_EVENT_ID: str(index),
ocel_constants.DEFAULT_EVENT_ACTIVITY: eve[activity_column],
ocel_constants.DEFAULT_EVENT_TIMESTAMP: eve[timestamp_column],
}
for attr in additional_event_attributes:
if attr in eve:
ocel_eve[attr] = eve[attr]
events.append(ocel_eve)
for ot in obj_types:
try:
objs = eve[ot].split(obj_separator)
for obj in objs:
if len(obj.strip()) > 0:
if obj not in obj_ids:
obj_ids.add(obj)
obj_instance = {
ocel_constants.DEFAULT_OBJECT_ID: obj,
ocel_constants.DEFAULT_OBJECT_TYPE: ot,
}
if ot in additional_object_attributes:
for objattname in additional_object_attributes[
ot
]:
if objattname in eve:
objattvalue = eve[objattname]
obj_instance[objattname] = objattvalue
objects.append(obj_instance)
rel = copy(ocel_eve)
rel[ocel_constants.DEFAULT_OBJECT_ID] = obj
rel[ocel_constants.DEFAULT_OBJECT_TYPE] = ot
relations.append(rel)
except BaseException:
pass
events = pandas_utils.instantiate_dataframe(events)
objects = pandas_utils.instantiate_dataframe(objects)
relations = pandas_utils.instantiate_dataframe(relations)
relations = relations.drop_duplicates(
subset=[
ocel_constants.DEFAULT_EVENT_ID,
ocel_constants.DEFAULT_OBJECT_ID,
]
)
ocel = OCEL(events=events, objects=objects, relations=relations)
ocel = ocel_consistency.apply(ocel)
return ocel