Demo entry 6658022



Submitted by anonymous on Nov 05, 2017 at 21:34
Language: Java. Code size: 1.4 kB.

public class AggregatorTopologySupplier extends TopologySupplier {

	private long windowSize;

	public AggregatorTopologySupplier(long windowSize) {
		this.windowSize = windowSize;

	public TopologyBuilder get() {
		final SensorAggregationKeySerde aggregationKeySerde = new SensorAggregationKeySerde();
		final SensorMetricSerde metricSerde = new SensorMetricSerde();
		final SensorRecordSerde recordSerde = new SensorRecordSerde();

		KStreamBuilder builder = new KStreamBuilder();
		KStream<String, SensorRecord> input =;

		KGroupedStream<SensorAggregationKey, SensorRecord> grouped = input.groupBy(
				(key, value) -> new SensorAggregationKey(value.getOwnerId(), value.getType()), aggregationKeySerde,

		KTable<Windowed<SensorAggregationKey>, SensorMetric> aggregation = grouped.aggregate(SensorMetric::new,
				(key, value, metric) -> metric.update(value), TimeWindows.of(windowSize), metricSerde,

		aggregation.foreach((windowedKey, metric) -> {
			WindowedMetric windowedMetric = new WindowedMetric(windowedKey.key(), metric, windowSize,
			// TODO: Handle failures here to avoid losing data.;

		return builder;

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).