Skip to content

Integration with other Libraries

River

River (https://riverml.xyz/) is a library to build online machine learning models. Such models operate on data streams. River includes several online machine learning algorithms that can be used for several tasks, including classification, regression, anomaly detection, time series forecasting, etc. The ideology behind River is to be a generic machine learning which allows to perform these tasks in a streaming manner. Indeed, many batch machine learning algorithms have online equivalents. Note that River also supports some more basic tasks. For instance, you might just want to calculate a running average of a data stream.

It is possible to integrate pyBeamline's result into River to leverage its ML capabilities. For example, let's say we want to use concept drift detection using the ADWIN algorithm. In particular, we are interested in computing if the frequency of the directly follows relation BC changes over time. To accomplish this task, let's first build a log where we artificially inject two of such drifts:

import random

log_original = ["ABCD"]*10000 + ["ACBD"]*500
random.shuffle(log_original)

log_after_drift = ["ABCD"]*500 + ["ACBD"]*10000
random.shuffle(log_after_drift)

log_with_drift = log_source(log_original + log_after_drift + log_original)
In this case, we built two logs (log_original and log_after_drift) which include the same process variants but that differ in the number of occurrences. Finally, we construct our pyBeamline log source log_with_drift by concatenating log_original + log_after_drift + log_original.

After that we can use the capabilities of pyBeamline and reactivex to construct a pipeline that produce a sequence of frequencies corresponding to the frequency of directly follows relation BC in window with length 40 (which is chosen as all our traces have length 4). Also note that we leverage the fact that in all our events when B and C appear they are always in the same trace (because of how log_source generates the observable). We will later define a function check_for_drift:

from reactivex import operators as ops

log_with_drift.pipe(
  RxOperator(ops.buffer_with_count(40)),
  RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
      ops.pairwise(),
      ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
      ops.count()
      )
  )),
).sink(print_sink())

After this we can define our function for drift detection and collection of points and drift indexes using:

import reactivex

from pybeamline.stream.rx_operator import RxOperator
from pybeamline.stream.base_sink import BaseSink
from typing import Optional, List
from pybeamline.stream.base_map import BaseMap
from river import drift
from reactivex import operators as ops

class CheckForDrift(BaseMap[int, int]):
  def __init__(self):
    self.drift_detector = drift.ADWIN()
    self.drifts = []
    self.index = 0

  def transform(self, value: int) -> Optional[List[int]]:
    self.drift_detector.update(value)
    self.index += 1
    if self.drift_detector.drift_detected:
      self.drifts.append(self.index)
    return [value]

class CollectSink(BaseSink[int]):
  def __init__(self):
    self.data = []

  def consume(self, item: int) -> None:
    self.data.append(item)


drift_detector = CheckForDrift()
collector = CollectSink()

log_with_drift.pipe(RxOperator(ops.buffer_with_count(40)),
  RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
      ops.pairwise(),
      ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
      ops.count()
      )
  )),
  drift_detector
).sink(collector)
With this class available, CheckForDrift can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following:

For a complete working example, see https://github.com/beamline/pybeamline/blob/master/tutorial.ipynb.