'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Optional, Dict, Any
from pm4py.objects.ocel import constants
from pm4py.objects.ocel.obj import OCEL
from pm4py.util import exec_utils, pandas_utils
import pandas as pd
from pm4py.objects.ocel.util import ocel_consistency
from pm4py.objects.ocel.util import filtering_utils
from pm4py.objects.ocel.validation import ocel20_rel_validation
from pm4py.util import constants as pm4_constants
from pm4py.objects.log.util import dataframe_utils
import warnings
class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`."""

    # Names given to the columns of the dataframes that make up the
    # imported object-centric event log.
    EVENT_ID = constants.PARAM_EVENT_ID
    EVENT_ACTIVITY = constants.PARAM_EVENT_ACTIVITY
    EVENT_TIMESTAMP = constants.PARAM_EVENT_TIMESTAMP
    OBJECT_ID = constants.PARAM_OBJECT_ID
    OBJECT_TYPE = constants.PARAM_OBJECT_TYPE
    # Name of the temporary column used to keep the original row order
    # as a tie-breaker when sorting by timestamp.
    INTERNAL_INDEX = constants.PARAM_INTERNAL_INDEX
    QUALIFIER = constants.PARAM_QUALIFIER
    CHANGED_FIELD = constants.PARAM_CHNGD_FIELD
    # Name of the helper column holding the per-object row counter.
    CUMCOUNT = "cumcount"
    # Whether the database is validated before it is imported.
    VALIDATION = "validation"
    # Whether unsatisfied validation constraints raise an exception.
    EXCEPT_IF_INVALID = "except_if_invalid"
def _quote_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQLite identifier."""
    # Embedded double quotes are escaped by doubling them, so the value can
    # never terminate the identifier and inject additional SQL.
    return '"' + name.replace('"', '""') + '"'


def _read_type_tables(conn, table_prefix, type_names, type_map, renamed_columns):
    """Read one table per type and return the list of renamed dataframes."""
    frames = []
    for type_name in type_names:
        table_name = table_prefix + type_map[type_name]
        frame = pd.read_sql(
            "SELECT * FROM " + _quote_identifier(table_name), conn
        )
        frames.append(frame.rename(columns=renamed_columns))
    return frames


def _sort_chronologically(df, timestamp_column, index_column):
    """Sort ``df`` by timestamp, using the current index as tie-breaker."""
    df[index_column] = df.index
    df = df.sort_values([timestamp_column, index_column])
    del df[index_column]
    return df


def apply(file_path: str, parameters: Optional[Dict[Any, Any]] = None):
    """
    Imports an OCEL 2.0 object-centric event log from a SQLite database.

    The tables ``event``, ``object``, ``event_map_type``,
    ``object_map_type``, ``event_object``, ``object_object`` and one
    ``event_<type>`` / ``object_<type>`` table per mapped type are read.

    Parameters
    -----------------
    file_path
        Path to the SQLite database
    parameters
        Parameters of the importer, including:
        - Parameters.VALIDATION => validate the database before importing
          (default: True)
        - Parameters.EXCEPT_IF_INVALID => raise an exception if the
          validation reports unsatisfied constraints (default: False)
        - Parameters.EVENT_ID, Parameters.EVENT_ACTIVITY,
          Parameters.EVENT_TIMESTAMP, Parameters.OBJECT_ID,
          Parameters.OBJECT_TYPE, Parameters.QUALIFIER,
          Parameters.CHANGED_FIELD => names of the columns of the
          resulting dataframes
        - Parameters.INTERNAL_INDEX => name of the temporary column used
          to keep a stable order while sorting
        - Parameters.CUMCOUNT => name of the per-object row counter column
          (default: "@@cumcount")

    Returns
    -----------------
    ocel
        Object-centric event log

    Raises
    -----------------
    Exception
        If validation is enabled, constraints are unsatisfied and
        Parameters.EXCEPT_IF_INVALID is set.
    """
    if parameters is None:
        parameters = {}

    import sqlite3

    validation = exec_utils.get_param_value(
        Parameters.VALIDATION, parameters, True
    )
    except_if_invalid = exec_utils.get_param_value(
        Parameters.EXCEPT_IF_INVALID, parameters, False
    )

    event_id = exec_utils.get_param_value(
        Parameters.EVENT_ID, parameters, constants.DEFAULT_EVENT_ID
    )
    event_activity = exec_utils.get_param_value(
        Parameters.EVENT_ACTIVITY, parameters, constants.DEFAULT_EVENT_ACTIVITY
    )
    event_timestamp = exec_utils.get_param_value(
        Parameters.EVENT_TIMESTAMP,
        parameters,
        constants.DEFAULT_EVENT_TIMESTAMP,
    )
    object_id = exec_utils.get_param_value(
        Parameters.OBJECT_ID, parameters, constants.DEFAULT_OBJECT_ID
    )
    object_type = exec_utils.get_param_value(
        Parameters.OBJECT_TYPE, parameters, constants.DEFAULT_OBJECT_TYPE
    )
    internal_index = exec_utils.get_param_value(
        Parameters.INTERNAL_INDEX, parameters, constants.DEFAULT_INTERNAL_INDEX
    )
    qualifier_field = exec_utils.get_param_value(
        Parameters.QUALIFIER, parameters, constants.DEFAULT_QUALIFIER
    )
    changed_field = exec_utils.get_param_value(
        Parameters.CHANGED_FIELD, parameters, constants.DEFAULT_CHNGD_FIELD
    )
    cumcount_field = exec_utils.get_param_value(
        Parameters.CUMCOUNT, parameters, "@@cumcount"
    )

    if validation:
        _satisfied, unsatisfied = ocel20_rel_validation.apply(file_path)
        if unsatisfied:
            if pm4_constants.SHOW_INTERNAL_WARNINGS:
                warnings.warn(
                    "There are unsatisfied OCEL 2.0 constraints in the given relational database: "
                    + str(unsatisfied)
                )
            if except_if_invalid:
                raise Exception("OCEL 2.0 validation failed.")

    # Every database read happens inside the try block so that the
    # connection is released even if a table is missing or malformed.
    conn = sqlite3.connect(file_path)
    try:
        events_table = pd.read_sql("SELECT * FROM event", conn)
        objects_table = pd.read_sql("SELECT * FROM object", conn)
        event_map_table = pd.read_sql("SELECT * FROM event_map_type", conn)
        object_map_table = pd.read_sql("SELECT * FROM object_map_type", conn)

        etypes = sorted(
            pandas_utils.format_unique(events_table["ocel_type"].unique())
        )
        otypes = sorted(
            pandas_utils.format_unique(objects_table["ocel_type"].unique())
        )

        # Type name -> suffix of the table storing the items of that type.
        events_type_map = dict(
            zip(event_map_table["ocel_type"], event_map_table["ocel_type_map"])
        )
        objects_type_map = dict(
            zip(
                object_map_table["ocel_type"],
                object_map_table["ocel_type_map"],
            )
        )

        event_frames = _read_type_tables(
            conn,
            "event_",
            etypes,
            events_type_map,
            {"ocel_id": event_id, "ocel_time": event_timestamp},
        )
        object_frames = _read_type_tables(
            conn,
            "object_",
            otypes,
            objects_type_map,
            {"ocel_id": object_id, "ocel_time": event_timestamp},
        )

        E2O = pd.read_sql("SELECT * FROM event_object", conn)
        O2O = pd.read_sql("SELECT * FROM object_object", conn)
    finally:
        conn.close()

    # Identifier -> type lookups, used to annotate the other dataframes.
    events_id_type = dict(zip(events_table["ocel_id"], events_table["ocel_type"]))
    objects_id_type = dict(
        zip(objects_table["ocel_id"], objects_table["ocel_type"])
    )

    event_types_coll = pandas_utils.concat(event_frames)
    event_types_coll[event_activity] = event_types_coll[event_id].map(
        events_id_type
    )
    event_types_coll = dataframe_utils.convert_timestamp_columns_in_df(
        event_types_coll,
        timest_format=pm4_constants.DEFAULT_TIMESTAMP_PARSE_FORMAT,
        timest_columns=[event_timestamp],
    )

    object_types_coll = pandas_utils.concat(object_frames)
    object_types_coll[object_type] = object_types_coll[object_id].map(
        objects_id_type
    )
    object_types_coll = object_types_coll.rename(
        columns={"ocel_changed_field": changed_field}
    )

    # Event identifier -> (already converted) timestamp.
    events_timestamp = dict(
        zip(event_types_coll[event_id], event_types_coll[event_timestamp])
    )

    # Position of each row among the rows of the same object.
    object_types_coll[cumcount_field] = object_types_coll.groupby(
        object_id
    ).cumcount()

    if changed_field in object_types_coll:
        # Rows without a changed field describe the objects themselves;
        # the remaining rows describe changes of attribute values.
        is_base_row = object_types_coll[changed_field].isna()
        objects = object_types_coll[is_base_row]
        object_changes = object_types_coll[~is_base_row]
        if len(objects) == 0:
            # Fallback: the first row of every object is its definition.
            objects = object_types_coll[object_types_coll[cumcount_field] == 0]
            object_changes = object_types_coll[
                object_types_coll[cumcount_field] > 0
            ]
        if len(object_changes) == 0:
            object_changes = None
        objects = objects.drop(columns=[changed_field])
    else:
        objects = object_types_coll
        object_changes = None

    if event_timestamp in objects:
        objects = objects.drop(columns=[event_timestamp])
    objects = objects.drop(columns=[cumcount_field])

    E2O = E2O.rename(
        columns={
            "ocel_event_id": event_id,
            "ocel_object_id": object_id,
            "ocel_qualifier": qualifier_field,
        }
    )
    E2O[event_activity] = E2O[event_id].map(events_id_type)
    E2O[event_timestamp] = E2O[event_id].map(events_timestamp)
    E2O[object_type] = E2O[object_id].map(objects_id_type)

    O2O = O2O.rename(
        columns={
            "ocel_source_id": object_id,
            "ocel_target_id": object_id + "_2",
            "ocel_qualifier": qualifier_field,
        }
    )
    if len(O2O) == 0:
        O2O = None

    event_types_coll = _sort_chronologically(
        event_types_coll, event_timestamp, internal_index
    )
    E2O = _sort_chronologically(E2O, event_timestamp, internal_index)

    if object_changes is not None:
        object_changes = dataframe_utils.convert_timestamp_columns_in_df(
            object_changes,
            timest_format=pm4_constants.DEFAULT_TIMESTAMP_PARSE_FORMAT,
            timest_columns=[event_timestamp],
        )
        object_changes = _sort_chronologically(
            object_changes, event_timestamp, internal_index
        )

    ocel = OCEL(
        events=event_types_coll,
        objects=objects,
        relations=E2O,
        object_changes=object_changes,
        o2o=O2O,
        parameters=parameters,
    )
    ocel = ocel_consistency.apply(ocel, parameters=parameters)
    ocel = filtering_utils.propagate_relations_filtering(
        ocel, parameters=parameters
    )

    return ocel