Source code for modules.logProcessing

from datetime import datetime
import pandas
from multiprocessing.queues import Queue
import os
import utils.utils

try:
    from pm4py.objects.conversion.log import factory as conversion_factory
    from pm4py.objects.log.importer.xes import factory as xes_importer
    from pm4py.objects.log.exporter.xes import factory as xes_exporter
except ImportError as e:
    print("[PROCESS MINING] Process mining analysis has been disabled because 'pm4py' module is not installed."
          "See https://github.com/bpm-diag/smartRPA#1-pm4py")
    print(e)


def handle_log(status_queue: Queue, file_extension: str, filename: str,
               filepath: list, save_path: str, RPA_log_path: str):
    """
    Process event log.

    For each log:

    * import log into pandas dataframe
    * rename columns to match XES standard (concept:name, time:timestamp, org:resource)
    * generate caseIDs from timestamp
    * insert case:creator and lifecycle:transition columns

    Then all the processed event logs are merged into one.
    A dataframe is created from the merged event logs and will be used in the rest of the process.
    The dataframe is also exported in XES.

    :param status_queue: queue to print values in GUI
    :param file_extension: extension of input event log (either CSV or XES)
    :param filename: name of input log
    :param filepath: list of paths of input logs
    :param save_path: path where to save log
    :param RPA_log_path: path of RPA folder
    :return: processed event log
    """

    if file_extension == ".csv":

        def createCaseID(ts):
            try:
                # caseID = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f").strftime('%m%d%H%M%S%f')  # [:-3]
                caseID = datetime.fromisoformat(ts).strftime('%m%d%H%M%S%f')
                return caseID
            except Exception:
                caseID = datetime.strptime(
                    ts, "%Y-%m-%d %H:%M:%S:%f").strftime('%m%d%H%M%S%f')  # [:-3]
                return caseID

        # combine multiple csv into one and then export it to xes
        csv_to_combine = list()
        for i, csv_path in enumerate(filepath):
            # load csv in pandas dataframe,
            # rename columns to match xes standard,
            # remove rows that don't have timestamp,
            # replace null values with empty string,
            # sort by timestamp
            try:
                df = pandas \
                    .read_csv(csv_path, encoding='utf-8-sig') \
                    .rename(columns={'event_type': 'concept:name',
                                     'timestamp': 'time:timestamp',
                                     'user': 'org:resource'}) \
                    .dropna(subset=["time:timestamp"]) \
                    .fillna('') \
                    .sort_values(by='time:timestamp')
            except pandas.errors.ParserError:
                df = pandas \
                    .read_csv(csv_path, encoding='utf-8-sig', sep=';') \
                    .rename(columns={'event_type': 'concept:name',
                                     'timestamp': 'time:timestamp',
                                     'user': 'org:resource'}) \
                    .dropna(subset=["time:timestamp"]) \
                    .fillna('') \
                    .sort_values(by='time:timestamp')

            # Each csv should have a separate case ID, so I insert a column to the left of each csv and assign
            # number i. When I convert the combined csv to xes, all the rows with the same number will belong to a
            # single trace, so I will have i traces.
            # convert timestamp to ISO format
            # try:
            #     df['time:timestamp'] = df['time:timestamp'] \
            #         .apply((lambda ts: datetime.strptime(ts, "%Y-%m-%d %H:%M:%S:%f").isoformat()))
            # except ValueError:
            #     pass

            try:
                # insert this column to create a unique trace for each csv
                df.insert(0, 'case:concept:name',
                          createCaseID(df['time:timestamp'][0]))
            except ValueError:
                # column already present, replace case id values so they are sequential
                pass
            try:
                # insert this column to create a unique trace for each csv
                df.insert(1, 'case:creator', 'SmartRPA by marco2012')
            except ValueError:
                # column already present
                pass
            try:
                df.insert(2, 'lifecycle:transition', 'complete')
            except ValueError:
                # column already present
                pass

            csv_to_combine.append(df)

        # dataframe of combined csv, sorted by timestamp
        combined_csv = pandas.concat(csv_to_combine)

        # remove rows containing path of temporary files
        combined_csv = combined_csv[~combined_csv['event_src_path'].str.contains(
            r'~.*\.tmp|\.tmp.*~')]

        # convert case id to string
        # combined_csv['case:concept:name'] = combined_csv['case:concept:name'].astype(str)

        # insert index for each row
        # combined_csv.insert(0, 'row_index', range(0, len(combined_csv)))
        # dataframe = combined_csv

        # calculate csv path
        combined_csv_path = os.path.join(
            RPA_log_path, f'{filename}_combined.csv')

        # save dataframe as csv
        combined_csv.to_csv(combined_csv_path, index=False, encoding='utf-8-sig')

        # convert csv to xes
        log = conversion_factory.apply(combined_csv)

        # sort by timestamp
        # log = sorting.sort_timestamp(log)

        # export log to xes
        xes_path = os.path.join(
            save_path, utils.utils.EVENT_LOG_FOLDER, f'{filename}.xes')
        xes_exporter.export_log(log, xes_path)

        # timestamp in xes file must have attribute date, not string
        utils.utils.fixTimestampFieldXES(xes_path)

        status_queue.put(f"[PROCESS MINING] Working directory is {save_path}")
        status_queue.put("[PROCESS MINING] Generated XES file")

        return combined_csv, log

    elif file_extension == ".xes":
        log = xes_importer.import_log(filepath)
        return None, log

    else:
        status_queue.put(
            "[PROCESS MINING] Input file must be either .csv or .xes")
        return False
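

Below is a minimal usage sketch, not part of the original module: it assumes the SmartRPA project layout (so that modules.logProcessing and utils.utils are importable and the event-log folder under save_path exists), and all file names, folder paths and labels in it are hypothetical placeholders.

# Minimal usage sketch (assumption: run from the SmartRPA project root; all paths below are hypothetical)
from multiprocessing import Queue

from modules.logProcessing import handle_log

if __name__ == "__main__":
    status_queue = Queue()

    # hypothetical CSV event logs previously recorded by SmartRPA
    csv_logs = ["logs/run1.csv", "logs/run2.csv"]

    dataframe, xes_log = handle_log(
        status_queue,      # queue handle_log uses to report progress to the GUI
        ".csv",            # extension of the input logs
        "example_run",     # base name for the combined CSV and exported XES
        csv_logs,          # list of input CSV paths
        "output",          # save_path: folder expected to contain the event-log subfolder
        "output/RPA",      # RPA_log_path: folder where the combined CSV is written
    )

    # print the progress messages that handle_log pushed onto the queue
    while not status_queue.empty():
        print(status_queue.get())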