Wikipedia edits
All edit actions happening on Wikipedia are recorded and made available as a stream of data (see https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams for further details). A possible way to process mine this stream of edits is to consider the page being edited as the instance of the editing process (i.e., the case identifier) and the type of the edit action as the activity name.
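For reference, a simplified excerpt of an event from the recentchange stream is shown below (field values are illustrative, and real events carry many more fields). The wiki, title, and type fields are the ones used in the code that follows:

```json
{
  "wiki": "enwiki",
  "title": "Process mining",
  "type": "edit"
}
```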
To achieve this goal we can write a new BeamlineAbstractSource that consumes the stream of edits and produces a stream of BEvents that can then be forwarded to one of the miners. We first define the set of wikis we want to filter on (in this case we focus on edits happening on the English version of Wikipedia, i.e., enwiki) and then write the code that transforms the JSON objects produced by the Wikipedia stream into a stream of BEvents:
Client client = ClientBuilder.newClient();
WebTarget target = client.target("https://stream.wikimedia.org/v2/stream/recentchange");
SseEventSource source = SseEventSource.target(target)
    .reconnectingEvery(5, TimeUnit.SECONDS)
    .build();
source.register(new Consumer<InboundSseEvent>() {
    @Override
    public void accept(InboundSseEvent t) {
        String data = t.readData();
        if (data != null) {
            JSONObject obj = new JSONObject(data);
            String processName = obj.getString("wiki");
            String caseId = obj.getString("title");
            String activityName = obj.getString("type");
            if (processesToStream.contains(processName)) {
                // prepare the actual event
                try {
                    buffer.add(BEvent.create(processName, caseId, activityName));
                } catch (EventException e) {
                    e.printStackTrace();
                }
            }
        }
    }
});
source.open();
This code can be wrapped in a thread that runs continuously, storing each event in a buffer for later dispatching:
public class WikipediaEditSource extends BeamlineAbstractSource {

    private static final long serialVersionUID = 608025607423103621L;
    private static List<String> processesToStream = Arrays.asList("enwiki");

    @Override
    public void run(SourceContext<BEvent> ctx) throws Exception {
        // thread-safe queue, since the buffer is shared between the
        // SSE consumer thread and this source
        Queue<BEvent> buffer = new ConcurrentLinkedQueue<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // code from previous listing
                // ...
            }
        }).start();

        while (isRunning()) {
            while (isRunning() && buffer.isEmpty()) {
                Thread.sleep(100L);
            }
            if (isRunning()) {
                synchronized (ctx.getCheckpointLock()) {
                    BEvent e = buffer.poll();
                    ctx.collect(e);
                }
            }
        }
    }
}
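The handoff between the SSE callback (producer) and the source loop (consumer) can be sketched in isolation. This is a minimal illustration of the same pattern, not part of Beamline; the class and variable names are made up for the example:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class HandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        // thread-safe buffer shared between producer and consumer
        Queue<String> buffer = new ConcurrentLinkedQueue<>();

        // producer thread: stands in for the SSE consumer filling the buffer
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                buffer.add("event-" + i);
            }
        });
        producer.start();

        // consumer loop: stands in for the run() method draining the buffer
        int collected = 0;
        while (collected < 3) {
            while (buffer.isEmpty()) {
                Thread.sleep(10L);
            }
            String e = buffer.poll();
            System.out.println("collected " + e);
            collected++;
        }
    }
}
```

Polling with a short sleep keeps the example simple; a blocking queue would avoid the busy wait, but the structure above mirrors the source code shown earlier.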
A simple consumer, in this case the Trivial discovery miner, can then be attached to the source with:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .addSource(new WikipediaEditSource())
    .keyBy(BEvent::getProcessName)
    .flatMap(new DirectlyFollowsDependencyDiscoveryMiner()
        .setModelRefreshRate(10)
        .setMinDependency(0))
    .addSink(new SinkFunction<ProcessMap>() {
        @Override
        public void invoke(ProcessMap value, Context context) throws Exception {
            value.generateDot().exportToSvg(new File("src/main/resources/output/output.svg"));
        }
    });
env.execute();
After running the system for a couple of minutes, the following map was produced:
The complete code of this example is available in the GitHub repository https://github.com/beamline/examples/tree/master/src/main/java/beamline/examples/wikipedia.