Demo entry 6658022

as

   

Submitted by anonymous on Nov 05, 2017 at 21:34
Language: Java. Code size: 1.4 kB.

/**
 * Supplies a stream topology that reads sensor records from
 * {@code Constants.SENSOR_RECORDS_TOPIC}, groups them by owner id and sensor type,
 * aggregates each group into a {@code SensorMetric} per time window, and posts every
 * resulting windowed metric to {@code Constants.API_ENDPOINT_MONITOR}.
 */
public class AggregatorTopologySupplier extends TopologySupplier {

	/**
	 * Size of each aggregation window, handed unchanged to {@code TimeWindows.of}.
	 * NOTE(review): presumably milliseconds, as expected by {@code TimeWindows.of} - confirm.
	 */
	private final long windowSize;

	/**
	 * Creates a supplier whose topology aggregates over windows of the given size.
	 *
	 * @param windowSize size of each aggregation window; must be greater than zero
	 * @throws IllegalArgumentException if {@code windowSize} is zero or negative
	 */
	public AggregatorTopologySupplier(long windowSize) {
		// Fail fast: a non-positive size cannot describe a window, and rejecting it here
		// reports the mistake at construction time instead of when the topology is built.
		if (windowSize <= 0) {
			throw new IllegalArgumentException("windowSize must be greater than zero, got " + windowSize);
		}
		this.windowSize = windowSize;
	}

	/**
	 * Builds the aggregation topology.
	 *
	 * @return a builder holding the source stream, the windowed aggregation backed by the
	 *         {@code Constants.SENSOR_AGGREGATIONS_STORE} store, and the forwarding step
	 */
	public TopologyBuilder get() {
		final SensorAggregationKeySerde aggregationKeySerde = new SensorAggregationKeySerde();
		final SensorMetricSerde metricSerde = new SensorMetricSerde();
		final SensorRecordSerde recordSerde = new SensorRecordSerde();

		final KStreamBuilder builder = new KStreamBuilder();
		final KStream<String, SensorRecord> input = builder.stream(Constants.SENSOR_RECORDS_TOPIC);

		// Re-key every record by (owner id, sensor type) so that records sharing both
		// values end up in the same group.
		final KGroupedStream<SensorAggregationKey, SensorRecord> grouped = input.groupBy(
				(key, value) -> new SensorAggregationKey(value.getOwnerId(), value.getType()), aggregationKeySerde,
				recordSerde);

		// Each (key, window) pair starts from a fresh SensorMetric and is updated with
		// every record that falls into that window.
		final KTable<Windowed<SensorAggregationKey>, SensorMetric> aggregation = grouped.aggregate(SensorMetric::new,
				(key, value, metric) -> metric.update(value), TimeWindows.of(windowSize), metricSerde,
				Constants.SENSOR_AGGREGATIONS_STORE);

		aggregation.foreach((windowedKey, metric) -> {
			final WindowedMetric windowedMetric = new WindowedMetric(windowedKey.key(), metric, windowSize,
					windowedKey.window().start());
			// TODO: Handle failures here to avoid losing data. The value returned by
			// asBinaryAsync() is discarded, so a failed request currently goes unnoticed.
			UnirestWrapper.post(Constants.API_ENDPOINT_MONITOR).body(windowedMetric).asBinaryAsync();
		});

		return builder;
	}
}

This snippet took 0.01 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).