In [None]:
# Process Mining Assignment with PM4Py
# Author: Xixi Lu
# Course: PhD Open at University of Warsaw
# Date: April 25th, 2025

# Welcome!

In this assignment, you will learn how to work with **PM4Py**, a leading Python library for process mining.
The library is publicly available on https://github.com/process-intelligence-solutions/pm4py

You will apply key concepts you learned during the PhD Open lectures, such as:

- Event Logs
- Process Discovery
- Directly-Follows Graphs (DFG)
- Inductive Miner
- Filters
- Your Own Discovery Algorithm (!!)


# Part 1: Setup and Imports

In [None]:
# Install the pm4py library if you have not install it before.
# Please use the version 2.7.x.

!pip install pm4py==2.7.15

In [None]:
# Install Graphviz if you have not installed it before.
# Graphviz is used to render (visualize) the process models.
# NOTE(review): `apt-get` only exists on Debian/Ubuntu-based systems (presumably
# this targets Google Colab — confirm); elsewhere install Graphviz manually.

# https://pypi.python.org/pypi/pydot
!apt-get -qq install -y graphviz && pip install pydot

In [None]:
# Print the installed pm4py version; the expected output is 2.7.15.
import pm4py
print(pm4py.__version__)

In [None]:
# For importing logs
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.util import dataframe_utils
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.util import constants

# For DFG miner
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery

# For visualizing DFGs
from pm4py.visualization.dfg import visualizer as dfg_visualization
from IPython.display import SVG, display

# For Inductive Miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.objects.conversion.process_tree import converter as pt_converter

# For visualizing Petri nets
from pm4py.visualization.petri_net import visualizer as pn_visualizer

# For evaluating the quality of the discovered process model
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay
from pm4py.algo.evaluation.replay_fitness.variants import token_replay as token_replay_fitness
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness_evaluator
from pm4py.algo.evaluation.precision import algorithm as precision_evaluator

# For filtering the logs
from pm4py.algo.filtering.log.variants import variants_filter

# Other helpful imports
import pandas as pd
import numpy as np
import pydot

# Part 2: Load an Event Log


Load the given event log and convert the timestamps to the correct type.

In [None]:
# Read the CSV file and parse the timestamp column into real datetime values.
df = pd.read_csv('./Road_Traffic_Fine_Management_Process.csv').assign(
    **{'Complete Timestamp': lambda frame: pd.to_datetime(frame['Complete Timestamp'])}
)

# Continue on a copy so that the frame loaded from disk stays available as `df`.
log_csv = df.copy()
log_csv.head()

Compute some statistics of the log:
- Number of events,
- Number of cases,
- Number of resources,
- Number of unique activities that occurred


In [None]:
# Basic statistics of the raw log.
# `nunique()` ignores missing values (NaN), so events without a recorded
# resource are not counted as one additional "resource".
print('Number of events:', len(log_csv))
print('Number of cases:', log_csv['Case ID'].nunique())
print('Number of resources:', log_csv['Resource'].nunique())
print('Number of set of activities:', log_csv['Activity'].nunique())

# First and last timestamp in the log; Series.min()/max() skip missing values (NaT).
print(log_csv['Complete Timestamp'].min())
print(log_csv['Complete Timestamp'].max())

Convert the csv into an event log:

In [None]:

# Map the CSV column names onto the default attribute keys used by pm4py
pm4py_column_names = {
    'Case ID': 'case:concept:name',          # default case key in pm4py
    'Activity': 'concept:name',              # default activity key in pm4py
    'Complete Timestamp': 'time:timestamp',  # default timestamp key in pm4py
    'Resource': 'org:resource',              # default resource key in pm4py
}
log_csv = log_csv.rename(columns=pm4py_column_names)

# Turn the dataframe into a pm4py event log object, telling the converter
# which column identifies the case
conversion_parameters = {
    log_converter.Variants.TO_EVENT_LOG.value.Parameters.CASE_ID_KEY: 'case:concept:name'
}
event_log = log_converter.apply(log_csv, parameters=conversion_parameters)

Compute the statistics again to check that the conversion was performed correctly.

In [None]:
# Cases: the length of the event log
num_cases = len(event_log)

# Events: add up the lengths of all traces
num_events = sum(len(trace) for trace in event_log)

# Distinct activities: collect the activity name of every event in a set
# ('concept:name' is the default activity key in pm4py)
all_activities = {
    event['concept:name']
    for trace in event_log
    for event in trace
}
num_activities = len(all_activities)

print(f"Number of cases: {num_cases}")
print(f"Number of events: {num_events}")
print(f"Number of distinct activities: {num_activities}")

# Part 3: Implement a DFG Discovery



## **[TODO] Task:** Implement DFG discovery manually.
Use the event log to compute a dictionary with all directly-follows relations and their frequencies.

In [None]:
# TODO: Implement the DFG Discovery

def compute_dfg(log):
    """Compute the directly-follows graph (DFG) of an event log.

    Currently a stub: it returns an empty dictionary until the TODO below
    is implemented.

    Args:
        log: the event log to analyse (called below with ``event_log``).

    Returns:
        dict: all directly-follows relations together with their frequencies.
        NOTE(review): to be comparable with PM4Py's built-in DFG, the keys are
        presumably ``(activity, next_activity)`` tuples — confirm.
    """
    dfg = dict()
    # TODO: for every trace, count each pair of consecutive activities
    #       (the activity of an event is stored under 'concept:name').
    return dfg

my_dfg = compute_dfg(event_log)
print(my_dfg)


## **[TODO] Task:** Compare your DFG with PM4Py's built-in DFG discovery:

In [None]:

# TODO: discover the DFG with PM4Py's built-in DFG miner (imported above as
#       `dfg_discovery`) and compare the result with `my_dfg`.
# NOTE: `dfg` must be assigned here; otherwise the print below (and the
#       visualization in the next cell) fails with a NameError.

# dfg = ...
print(dfg)

## Visualize the DFG

In [None]:

# Build the Graphviz object for the DFG, requesting SVG output
dfg_vis_parameters = {"format": "svg"}
gviz = dfg_visualization.apply(dfg, parameters=dfg_vis_parameters)

# Shrink the rendering so that it fits into the notebook output
gviz.graph_attr['scale'] = '0.5'  # scale down the image (smaller number -> smaller SVG)
gviz.graph_attr['dpi'] = '50'     # dpi lower = lighter image

# Render to SVG and show it inline
svg_markup = gviz.pipe(format='svg')
display(SVG(svg_markup))

# Part 4: Inductive Miner

## **[TODO] Task:** Apply PM4Py's **Inductive Miner** and visualize the resulting process model (Petri Net).

In [None]:
# TODO: Discover a process tree using Inductive Miner
#       (the miner is imported above as `inductive_miner`; replace the `...`
#       placeholder with the discovery call on `event_log`).
process_tree = ...

# Convert the process tree to a Petri net; the converter also returns the
# initial and final marking, which are needed for visualization and evaluation.
net, initial_marking, final_marking = pt_converter.apply(process_tree)


In [None]:
# Visualize the discovered model: build the Graphviz object for the Petri net
# (with its initial and final marking) and display it.
gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)

In [None]:
# TODO: Try out another variant of the Inductive Miner,
#       e.g., inductive_miner.Variants.IMd (replace the `...` placeholder).
process_tree2 = ...


# Convert the second tree to a Petri net and display it, as done for the first model.
net2, im2, fm2 = pt_converter.apply(process_tree2)
gviz2 = pn_visualizer.apply(net2, im2, fm2)
pn_visualizer.view(gviz2)

# Part 5: Evaluate the model qualities

In [None]:
def evaluate_model(event_log, net, im, fm):
    """Evaluate fitness and precision of a Petri net against an event log.

    Four measurements are taken, in this order: token-based fitness,
    token-based precision, alignment-based fitness and alignment-based
    precision.

    Args:
        event_log: the event log the model is compared with.
        net: the Petri net to evaluate.
        im: the initial marking of the net.
        fm: the final marking of the net.

    Returns:
        dict: the results under the keys "token_fitness", "token_precision",
        "alignment_fitness" and "alignment_precision".
    """
    model = (net, im, fm)

    # Replay the log on the model with token-based replay; the replay result
    # is the input of the token-based fitness evaluation.
    replayed_traces = token_replay.apply(event_log, *model)

    return {
        # Token-based fitness
        "token_fitness": token_replay_fitness.evaluate(replayed_traces),
        # Token-based precision
        "token_precision": precision_evaluator.apply(
            event_log, *model,
            variant=precision_evaluator.Variants.ETCONFORMANCE_TOKEN,
        ),
        # Alignment-based fitness
        "alignment_fitness": replay_fitness_evaluator.apply(
            event_log, *model,
            variant=replay_fitness_evaluator.Variants.ALIGNMENT_BASED,
        ),
        # Alignment-based precision
        "alignment_precision": precision_evaluator.apply(
            event_log, *model,
            variant=precision_evaluator.Variants.ALIGN_ETCONFORMANCE,
        ),
    }

def _format_metric(value):
    """Format a metric with four decimals; non-numeric values are shown as text."""
    try:
        return f"{value:.4f}"
    except (TypeError, ValueError):
        # A placeholder such as 'N/A' (str) or None does not accept the
        # '.4f' format specification, so fall back to its plain text form.
        return str(value)


def print_result(metrics):
    """Print the evaluation metrics, one per line.

    Args:
        metrics: dictionary with the keys 'token_fitness' and
            'alignment_fitness' (mappings that may contain
            'average_trace_fitness' and 'log_fitness') and the keys
            'token_precision' and 'alignment_precision' (numbers).

    Numbers are printed with four decimals. A fitness value missing from its
    mapping is printed as 'N/A' instead of raising a formatting error.

    Raises:
        KeyError: if one of the four top-level keys is missing.
    """
    token_fitness = metrics['token_fitness']
    alignment_fitness = metrics['alignment_fitness']

    print(f"Token Fitness (avg trace fitness): {_format_metric(token_fitness.get('average_trace_fitness', 'N/A'))}")
    print(f"Token Fitness (log_fitness): {_format_metric(token_fitness.get('log_fitness', 'N/A'))}")
    print(f"Token Precision: {_format_metric(metrics['token_precision'])}")
    print(f"Alignment Fitness (avg trace fitness): {_format_metric(alignment_fitness.get('average_trace_fitness', 'N/A'))}")
    print(f"Alignment Fitness (log_fitness): {_format_metric(alignment_fitness.get('log_fitness', 'N/A'))}")
    print(f"Alignment Precision: {_format_metric(metrics['alignment_precision'])}")


# Evaluate both discovered models (first the default Inductive Miner model,
# then the second variant) and print their metrics one after the other.
for model in ((net, initial_marking, final_marking), (net2, im2, fm2)):
    res = evaluate_model(event_log, *model)
    print_result(res)

# Part 6: Filtering


## **[TODO] Task:** Apply a filter to remove infrequent traces before applying process discovery.

Evaluate the model discovered using the filtered log.

Discuss the results. Is the model more precise? Or is it more fitting?

In [None]:
# TODO: Find the most frequent variants and build a log without the infrequent traces
#       (a variants filter is imported above as `variants_filter`).
filtered_log = ...


In [None]:
# TODO: Rediscover a process model using Inductive Miner on the filtered log
# NOTE: this reassigns `process_tree`, replacing the tree discovered in Part 4.
process_tree = ...
f_net, f_im, f_fm = pt_converter.apply(process_tree)

# Evaluate the model (token-based + alignment-based) against the ORIGINAL log
# (i.e., not the filtered log).
evaluation = evaluate_model(event_log, f_net, f_im, f_fm)
print_result(evaluation)

## **[TODO] Task:** Apply a filter to remove infrequent activities before applying process discovery.


Evaluate the model discovered using the filtered log.

Discuss the results. Is the model more precise? Or is it more fitting?

# Part 7: Your Own Discovery Algorithm or Log Preprocessing Approach to Improve the Discovered Model (!!)

## **[TODO] Challenge:** Invent and implement a simple custom process discovery algorithm.

You can decide to return a Petri Net (see an [example](https://github.com/process-intelligence-solutions/pm4py/blob/release/examples/petri_manual_generation.py)) or a Process Tree (recommended, see an [example](https://github.com/process-intelligence-solutions/pm4py/blob/release/examples/tree_manual_generation.py))

Other ideas:
- Reproduce or modify Inductive Miner (see [IM source code](https://github.com/process-intelligence-solutions/pm4py/tree/release/pm4py/algo/discovery/inductive))
- Trace model
- Heuristic Miner
- Split Miner

Explain your idea briefly and code it below.


In [None]:

# TODO: code your discovery algorithm that returns a process tree
def your_miner_pt(log):
    """Custom discovery algorithm returning a process tree (to be implemented).

    NOTE: as written, ``_ptree`` is never assigned, so calling this function
    raises a NameError until the body is filled in.

    Args:
        log: the event log to discover a model from.

    Returns:
        The discovered process tree.
    """
    # An example of how you can create a process tree
    # (needs: from pm4py.objects.process_tree.obj import ProcessTree, Operator)
    # root = ProcessTree(operator=Operator.SEQUENCE)
    # leaf_A = ProcessTree(label="A", parent=root)
    # leaf_B = ProcessTree(label="B", parent=root)

    # root.children.append(leaf_A)
    # root.children.append(leaf_B)
    # _ptree = root
    return _ptree


# TODO: OR code your discovery algorithm that returns a Petri net
def your_miner_pn(log):
    """Custom discovery algorithm returning a Petri net (to be implemented).

    NOTE: as written, ``_net``, ``_im`` and ``_fm`` are never assigned, so
    calling this function raises a NameError until the body is filled in.

    Args:
        log: the event log to discover a model from.

    Returns:
        tuple: the Petri net, its initial marking and its final marking.
    """
    # An example of how you can create a Petri net
    # (needs: from pm4py.objects.petri_net.obj import PetriNet, Marking
    #         from pm4py.objects.petri_net.utils import petri_utils)
    # _net = PetriNet("test")
    # source = PetriNet.Place("source")
    # sink = PetriNet.Place("sink")
    # t1 = PetriNet.Transition("fire", "fire")
    # _net.places.add(source)
    # _net.places.add(sink)
    # _net.transitions.add(t1)
    # petri_utils.add_arc_from_to(source, t1, _net)
    # petri_utils.add_arc_from_to(t1, sink, _net)

    # A marking maps places to token counts: one token in the source place
    # initially, one token in the sink place at the end.
    # _im = Marking()
    # _im[source] = 1
    # _fm = Marking()
    # _fm[sink] = 1

    return _net, _im, _fm



# SO... (option 1: your miner returns a process tree, which is converted to a Petri net)
# NOTE: keep only the option you implemented; if both run, option 2 overwrites
#       the `your_net`, `your_im` and `your_fm` produced by option 1.

your_ptree = your_miner_pt(event_log)
your_net, your_im, your_fm = pt_converter.apply(your_ptree)

# OR... (option 2: your miner returns a Petri net with its markings directly)

your_net, your_im, your_fm = your_miner_pn(event_log)

## **[TODO] TASK:** evaluate your algorithm

Use the function evaluate_model() to evaluate your model.
Discuss your results.

In [None]:

# Evaluate your discovered model
your_result = evaluate_model(event_log, your_net, your_im, your_fm)

print_result(your_result)
