Getting started
Installing the library
To use the Beamline framework in your Java Maven project it is necessary to include, in the pom.xml file, the package repository:

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

as well as the actual dependency:

<dependency>
    <groupId>com.github.beamline</groupId>
    <artifactId>framework</artifactId>
    <version>x.y.z</version>
</dependency>
Hello world stream mining
The following code represents a minimal running example that, once placed in the main method of a Java class, should provide a basic understanding of the concepts:
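A sketch of such an example is reported below. It is reconstructed from the step-by-step description that follows; DiscoveryMiner, setMinDependency, ProcessMap, generateDot, and exportToSvg stand for the hypothetical miner and its output and are assumptions, not part of the core framework:

```java
import java.io.File;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import beamline.events.BEvent;
import beamline.sources.XesLogSource;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // step 1: configuration of the event source (a static IEEE XES file)
        XesLogSource source = new XesLogSource("log.xes");

        // step 2: configuration of the (hypothetical) miner
        DiscoveryMiner miner = new DiscoveryMiner();
        miner.setMinDependency(0.3);

        // step 3: definition of the chain of operations on the stream
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
            .addSource(source)                        // connect the source
            .keyBy(BEvent::getProcessName)            // events of the same process together
            .flatMap(miner)                           // mine the events
            .addSink(new SinkFunction<ProcessMap>() { // save the SVG map to file
                public void invoke(ProcessMap value, Context context) throws Exception {
                    value.generateDot().exportToSvg(new File("output.svg"));
                }
            });

        // step 4: execution of the pipeline
        env.execute();
    }
}
```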
In step 1 the stream source is configured; in this specific case, the stream is defined as coming from a static IEEE XES file. In step 2, a hypothetical miner is created and configured using custom methods (such as the setMinDependency method). Step 3 consists of the definition of the chain of operations to be performed on each event of the stream. In this case, after the source is connected (addSource), we inform Flink that events can be distributed, but all those that belong to the same process should be treated together (keyBy); then the events are flatMapped by the miner, meaning that not every event will result in a mining result; and finally a sink is connected to save the SVG map to file (addSink). In step 4, the defined pipeline is finally executed.
Basic concepts
In this section the basic concepts of the Beamline framework are presented.
Streaming dataflow
Each application based on Apache Flink relies on the concept of streaming dataflow. A streaming dataflow consists of the basic transformations applied to each event, from its origin (called source) until the end (called sink). In between, different operators can be chained together in order to transform the data according to the requirements. Once this pipeline of operations is defined, it can be deployed and Apache Flink will take care of the actual execution, including parallelizing possible operations and distributing the data across the network.
Events
While Apache Flink can be designed to transmit any type of event, the Beamline framework comes with its own definition of event, called BEvent. Some of its methods are highlighted below:
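A signatures-only sketch of the accessors described below (the exact signatures in the framework may differ slightly):

```java
public class BEvent implements Serializable, Comparable<BEvent> {
    // the three attribute maps
    public Map<String, Serializable> getProcessAttributes();
    public Map<String, Serializable> getTraceAttributes();
    public Map<String, Serializable> getEventAttributes();

    // convenience accessors
    public String getProcessName(); // name of the process
    public String getTraceName();   // name of the trace (i.e., the case id)
    public String getEventName();   // name of the activity
    public Date getEventTime();     // timestamp of the event
}
```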
Essentially, a Beamline event consists of 3 maps of attributes, referring to the process, to the trace, and to the event itself. While it is possible to set all attributes individually, some convenience methods are provided as well, such as getTraceName, which returns the name of the trace (i.e., the case id). Internally, a BEvent stores the basic information using the same attribute names as those provided by the standard extensions of OpenXES. Additionally, setters for attributes defined in the context of OpenXES are provided too, thus offering some level of interoperability between the two platforms.
Comparison with OpenXES
While using OpenXES was considered, it was decided that it is better to have a dedicated definition of event which embeds all information. This is due to the fact that, in stream processing, each event is an atomic, independent unit: it is not really possible to have collections of traces or collections of events belonging to the same process execution.
Sources
In the context of Beamline it is possible to define sources that create any possible type of event. The framework comes with some sources already defined for the generation of BEvents. The base class of all sources is called BeamlineAbstractSource, which implements a RichSourceFunction. In Apache Flink, a "rich" function is a function that can access the distributed state and thus become stateful.
Sources already implemented are XesLogSource, MQTTXesSource, CSVLogSource, and StringTestSource. A XesLogSource creates a source from a static log (useful for testing purposes). An MQTTXesSource generates a source from an MQTT-XES stream. A CSVLogSource reads events from a text file, and a StringTestSource allows the definition of a simple log directly in its constructor (useful for testing purposes).
The class diagram of the observable sources available in Beamline Framework is reported below:
In order to use any source, it is possible to provide it to the addSource method:
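For example, assuming a source constructed as described in the following subsections:

```java
// source is any BeamlineAbstractSource (e.g., a XesLogSource)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .addSource(source)
    .keyBy(BEvent::getProcessName)
    // ... rest of the dataflow ...
    ;
```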
Details on XesLogSource
Emits all events from an XES event log. Example usage:
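A sketch of the constructor taking an OpenXES log already in memory:

```java
XLog log = ... // an OpenXES event log obtained elsewhere
XesLogSource source = new XesLogSource(log);
```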
Or, alternatively, the path to the log file can be provided directly to the constructor.
Details on CSVLogSource
Emits all events from a CSV file; the column numbers for the case id and the activity name must be provided in the constructor. Example usage:
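A sketch of both constructors; the ParserConfiguration helper and its withSeparator method are assumptions about the Beamline API:

```java
// case id in column 0, activity name in column 1
CSVLogSource source = new CSVLogSource("filename.csv", 0, 1);

// same, with a custom field separator
CSVLogSource sourceWithSeparator = new CSVLogSource(
        "filename.csv", 0, 1,
        new CSVLogSource.ParserConfiguration().withSeparator('|'));
```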
Additional configuration parameters, such as the separator, can be provided as well.
Details on MQTTXesSource
Forwards all events on an MQTT broker respecting the MQTT-XES protocol. Example usage:
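A sketch of the constructor; the broker address, base topic, and process name below are placeholder values:

```java
// broker URI, MQTT-XES base topic, and name of the process to monitor
MQTTXesSource source = new MQTTXesSource("tcp://localhost:1883", "topicBase", "processName");
```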
Details on StringTestSource
Source that considers each trace as a string provided in the constructor and each event as one character of the string. Example usage:
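A sketch with two traces, chosen to match the event and trace counts mentioned below:

```java
// two traces, <A,B,C> and <A,C,B,D>: 7 events in total
StringTestSource source = new StringTestSource("ABC", "ACBD");
```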
This source is going to emit 7 events as part of 2 traces.
Filters
The filter operators, in Apache Flink, do not change the type of the stream but filter the events, so that only those passing a predicate test are kept. Beamline comes with some filters already implemented, which can be used as follows:
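A sketch of a pipeline with a filter inserted after the keying step (the source and miner are placeholders):

```java
env
    .addSource(source)
    .keyBy(BEvent::getProcessName)
    .filter(new RetainActivitiesFilter("A", "B", "C"))
    .flatMap(miner)
    // ... rest of the dataflow ...
    ;
```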
Here, a filter is specified so that only events referring to activities A, B, and C are retained (while all others are discarded).
Filters can operate on event attributes or trace attributes and the following are currently available:
Details on RetainOnEventAttributeEqualityFilter
Retains events based on the equality of an event attribute. Example:
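A sketch of its usage; the attribute name and value below are placeholders:

```java
// keep only events whose attribute "attribute-name" equals "value"
env
    .addSource(source)
    .filter(new RetainOnEventAttributeEqualityFilter<String>("attribute-name", "value"));
```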
Details on ExcludeOnEventAttributeEqualityFilter
Excludes events based on the equality of an event attribute.
Details on RetainOnCaseAttributeEqualityFilter
Retains events based on the equality of a trace attribute.
Details on ExcludeOnCaseAttributeEqualityFilter
Excludes events based on the equality of a trace attribute.
Details on RetainActivitiesFilter
Retains activities based on their name (concept:name).
Details on ExcludeActivitiesFilter
Excludes activities based on their name (concept:name).
Please note that filters can be chained together in order to achieve the desired result.
Mining algorithms
A mining algorithm is a flatMapper consuming the events generated by a source. All mining algorithms must extend the abstract class StreamMiningAlgorithm. This class is structured as follows:
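A signatures-only sketch of the class, consistent with the description below (the exact declaration in the framework may differ):

```java
public abstract class StreamMiningAlgorithm<T>
        extends RichFlatMapFunction<BEvent, T> {

    // number of events processed so far
    public long getProcessedEvents();

    // processes one event, returning a result or null
    public abstract T ingest(BEvent event);
}
```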
The generic type T refers to the type of the generated output (i.e., the result of the mining algorithm). The only abstract method that a mining algorithm needs to implement is ingest(BEvent event) : T, which receives an event as actual parameter and returns either the result of ingesting that event or the special value null. If null is returned, nothing is propagated down the pipeline; for example, it might not be interesting to mine a model for each observed event, but only every 100 events (and this is the reason for having a flatMap). The other method offered is getProcessedEvents() : long, which returns the number of events processed so far.
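The null-based emission contract can be illustrated with a plain-Java toy; EveryNthMiner is a made-up class (not the Beamline API) that mimics an ingest method emitting a result only every few events:

```java
// Toy illustration of the ingest(...) contract: a "miner" that produces
// a result only every refreshRate events and returns null otherwise,
// so that nothing is propagated down the pipeline in between.
class EveryNthMiner {
    private long processedEvents = 0;
    private final int refreshRate;

    EveryNthMiner(int refreshRate) {
        this.refreshRate = refreshRate;
    }

    public long getProcessedEvents() {
        return processedEvents;
    }

    // mimics ingest(BEvent event) : T
    public String ingest(String event) {
        processedEvents++;
        if (processedEvents % refreshRate == 0) {
            return "model after " + processedEvents + " events";
        }
        return null; // nothing is emitted for this event
    }
}
```

With a refresh rate of 100, this produces one result per 100 ingested events, exactly the behaviour described above.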
Since a StreamMiningAlgorithm is a "rich" function, it is possible to have access to the state information. Additionally, since this operator might be distributed, it is necessary to apply it to a keyed stream. A key can be used to split the stream into independent "branches" that can be processed in parallel by different instances of the operators occurring afterwards. It is therefore extremely important to choose wisely how to key a stream: instances of the same operator applied to different "branches" (obtained by keying the stream) cannot communicate with each other. Examples of keys in different contexts:
- If the goal is to perform control-flow discovery, it is probably necessary to key the stream based on the process name (using keyBy(BEvent::getProcessName)): all events that belong to the same process should be considered by the same instance of the mining algorithm in order to extract that process;
- If the goal is to perform conformance checking, it is probably enough to key the stream based on the process instance (a.k.a. the trace name or case id; using keyBy(BEvent::getTraceName)): in a streaming context, each trace is independent from the others with respect to calculating its conformance, and hence there is no need to share information regarding the whole process.
In the core of the Beamline library there is only one mining algorithm implemented (though others are available as additional dependencies):
Details on InfiniteSizeDirectlyFollowsMapper
An algorithm that transforms each pair of consecutive events appearing in the same case into a directly-follows relation (generating an object of type DirectlyFollowsRelation). This mapper is called infinite because its memory footprint grows with the number of case ids. The mapper produces results as DirectlyFollowsRelations.
An example of how the algorithm can be used is the following:
BeamlineAbstractSource source = ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .addSource(source)
    .keyBy(BEvent::getProcessName)
    .flatMap(new InfiniteSizeDirectlyFollowsMapper())
    .addSink(new SinkFunction<DirectlyFollowsRelation>() {
        public void invoke(DirectlyFollowsRelation value, Context context) throws Exception {
            System.out.println(value.getFrom() + " -> " + value.getTo());
        }
    });
env.execute();
Responses
Responses are produced by miners as events are processed. Currently, Beamline provides an empty Response class, which can be extended with custom behavior, as well as a GraphvizResponse abstract class for Graphviz graphical representations, and some others. On all Response objects it is possible to invoke the getProcessedEvents() method, which indicates how many events were processed to produce that response. Hence, this is the hierarchy of results:
An example of a way to consume these results is reported in the following code:
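A sketch of such a consumer; DiscoveryMiner, generateDot, and exportToSvg are assumptions (the explanation below introduces DiscoveryMiner as a hypothetical miner):

```java
// hypothetical miner producing results of (sub)type GraphvizResponse
DiscoveryMiner miner = new DiscoveryMiner();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .addSource(source)
    .keyBy(BEvent::getProcessName)
    .flatMap(miner)
    .addSink(new SinkFunction<GraphvizResponse>() {
        public void invoke(GraphvizResponse value, Context context) throws Exception {
            // how many events contributed to this response
            System.out.println("Events processed: " + value.getProcessedEvents());
            // export the Graphviz representation as SVG
            value.generateDot().exportToSvg(new File("output.svg"));
        }
    });
env.execute();
```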
In this code, we assume the existence of a miner called DiscoveryMiner which produces output of (sub)type GraphvizResponse.
Citation
Please, cite this work as:
- Andrea Burattin. "Beamline: A comprehensive toolkit for research and development of streaming process mining". In Software Impacts, vol. 17 (2023).