Thursday, 17 November 2016

Apache Kafka Streams and JCache with BlazingCache

Developing and setting up a distributed stream processing pipeline is quite simple with Apache Kafka Streams: the Kafka Streams runtime scales automatically, with no changes to your code.

Sometimes you have to enrich your data with values taken from external resources such as web services, databases or paid services, and those calls slow down the whole processing pipeline.

Consider, for instance, looking up geolocation information for IP addresses while processing a stream of hits on a web site: for each IP address you have to call an external service and wait for the response.

In these cases a cache may help, and you would probably start with the JCache (JSR 107) API. In an auto-scaling architecture, however, you are likely to need a distributed cache.
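
Before any distribution comes into play, the look-aside idea itself can be sketched in a few lines. This is only an illustration: a plain ConcurrentHashMap stands in for a JCache Cache, and the CachedLocator class and its slowGeoLookup method are hypothetical names, not part of any library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class LookAsideSketch {

    /** Hypothetical helper: answers from a local map and calls the "service" only on a miss. */
    static class CachedLocator {
        // Stand-in for a JCache Cache<String, String>; purely local to this JVM.
        private final Map<String, String> cache = new ConcurrentHashMap<>();
        // Counts how many lookups actually reach the (pretend) external service.
        private final AtomicInteger externalCalls = new AtomicInteger();

        /** Pretend external geolocation service: in real life this is the slow, possibly paid call. */
        private String slowGeoLookup(String ip) {
            externalCalls.incrementAndGet();
            return "location-of-" + ip;
        }

        /** Look-aside: the service is invoked only when the IP is not cached yet. */
        String locate(String ip) {
            return cache.computeIfAbsent(ip, this::slowGeoLookup);
        }

        int externalCalls() {
            return externalCalls.get();
        }
    }

    public static void main(String[] args) {
        CachedLocator locator = new CachedLocator();
        String[] hits = {"10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.1", "10.0.0.2"};
        for (String ip : hits) {
            locator.locate(ip);
        }
        // prints "5 hits, 2 external calls"
        System.out.println(hits.length + " hits, " + locator.externalCalls() + " external calls");
    }
}
```

With a single JVM this is all you need. As soon as several instances run side by side, each one holds its own map and nothing keeps them consistent, which is the gap a distributed cache fills.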

The Apache Kafka ecosystem (along with HBase, Hadoop, MapReduce...) is based on ZooKeeper, and BlazingCache can be the perfect companion for Kafka Streams applications because it uses ZooKeeper as its coordination service. Furthermore, BlazingCache provides a unique architecture for this kind of deployment, made of lightweight processing nodes.

In BlazingCache, cached data is retained only locally to the JVM actually using it, with the platform adding the little bit of coordination needed to guarantee consistency between cache instances. Thus, there is no need for multicast or other advanced discovery/peer-to-peer services, which are often difficult to set up in this changing world of containers and cloud-based services.
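
To make the idea of "local data plus a little coordination" more concrete, here is a toy, single-threaded model. It is not BlazingCache's API or its actual protocol: the Coordinator and Node classes are hypothetical, and the sketch assumes the simplest possible rule, namely that a write on one node makes every other node drop its local copy of that key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LocalCacheCoordinationSketch {

    /** Hypothetical coordinator: it stores no values, it only remembers which node holds which key. */
    static class Coordinator {
        private final Map<String, Set<Node>> holders = new HashMap<>();

        void registerHolder(String key, Node node) {
            holders.computeIfAbsent(key, k -> new HashSet<>()).add(node);
        }

        /** Asks every node except the writer to drop its local copy of the key. */
        void invalidate(String key, Node source) {
            Set<Node> nodes = holders.get(key);
            if (nodes == null) {
                return;
            }
            for (Node node : nodes) {
                if (node != source) {
                    node.evictLocal(key);
                }
            }
            // Only the writer (if it was a holder) may still have the key.
            nodes.removeIf(node -> node != source);
        }
    }

    /** Hypothetical processing node: values live only in its own local map. */
    static class Node {
        private final Map<String, String> local = new HashMap<>();
        private final Coordinator coordinator;

        Node(Coordinator coordinator) {
            this.coordinator = coordinator;
        }

        String get(String key) {
            return local.get(key);
        }

        void put(String key, String value) {
            coordinator.invalidate(key, this);   // other nodes drop their stale copy
            local.put(key, value);
            coordinator.registerHolder(key, this);
        }

        void evictLocal(String key) {
            local.remove(key);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        Node nodeA = new Node(coordinator);
        Node nodeB = new Node(coordinator);

        nodeA.put("10.0.0.1", "Rome");
        nodeB.put("10.0.0.1", "Milan");   // nodeA's copy is now stale and gets evicted

        System.out.println("nodeA sees: " + nodeA.get("10.0.0.1")); // prints "nodeA sees: null"
        System.out.println("nodeB sees: " + nodeB.get("10.0.0.1")); // prints "nodeB sees: Milan"
    }
}
```

The point of the toy model is that the coordinator only exchanges small notifications about keys, while the values themselves never leave the node that uses them.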

BlazingCache has been designed to work very well in environments where data is strongly tied to location, that is, where it is very likely that your cached data is partitioned among processing nodes.
This is very common in multi-tenant applications, and many Kafka Streams applications tend to partition data across nodes, since the normal load distribution is bound to Kafka partitioning.
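
The effect of partitioning on purely local caches can be illustrated with a small simulation. It assumes that records are keyed by the same value used as the cache key (for example the client IP), and it uses String.hashCode as a stand-in for Kafka's real default partitioner, which hashes the serialized key with murmur2. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionLocalitySketch {

    /** One empty local cache (modelled as a set of known keys) per node. */
    private static List<Set<String>> newLocalCaches(int nodes) {
        List<Set<String>> caches = new ArrayList<>();
        for (int i = 0; i < nodes; i++) {
            caches.add(new HashSet<>());
        }
        return caches;
    }

    /** Records with the same key always land on the same node. */
    static int missesWithKeyPartitioning(List<String> keys, int nodes) {
        List<Set<String>> caches = newLocalCaches(nodes);
        int misses = 0;
        for (String key : keys) {
            // Assumption: simplified partitioner, not Kafka's murmur2-based one.
            int node = Math.floorMod(key.hashCode(), nodes);
            if (caches.get(node).add(key)) {
                misses++; // first time this node sees the key
            }
        }
        return misses;
    }

    /** Records are spread over the nodes regardless of their key. */
    static int missesWithRoundRobin(List<String> keys, int nodes) {
        List<Set<String>> caches = newLocalCaches(nodes);
        int misses = 0;
        for (int i = 0; i < keys.size(); i++) {
            if (caches.get(i % nodes).add(keys.get(i))) {
                misses++;
            }
        }
        return misses;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "a", "b", "c");
        // prints "key partitioning: 3 misses, round robin: 6 misses"
        System.out.println("key partitioning: " + missesWithKeyPartitioning(keys, 2)
                + " misses, round robin: " + missesWithRoundRobin(keys, 2) + " misses");
    }
}
```

When the same key always reaches the same node, each value is loaded once in the whole cluster, so a cache that keeps data local to the JVM using it loses very little compared with a fully shared one.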

This simple example, contributed by Nicolò Boschi (https://github.com/nicoloboschi/kafkastreamsblazingcache), is a good starting point for understanding how simple it is to use a distributed JCache provider with Kafka Streams.

// Note: cacheConfiguration, stringSerde, logLineSerde, props, input_topic and
// output_topic are not shown in this excerpt and must be defined beforehand.
CachingProvider cachingProvider = Caching.getCachingProvider();
try (javax.cache.CacheManager cacheManager = cachingProvider.getCacheManager(
    cachingProvider.getDefaultURI(),
    cachingProvider.getDefaultClassLoader(),
    cacheConfiguration)) {
    try (Cache<String, String> cache
        = cacheManager.createCache("ip-location-cache", new MutableConfiguration<>())) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> startLines = builder.stream(stringSerde, stringSerde, input_topic);
        MockGeoLocationService geoLocationService = new MockGeoLocationService();
        KStream<String, LogLine> finalLines = startLines
            // Parse each raw log line into a LogLine object
            .mapValues((value) -> {
                return LogLine.parse(value);
            })
            // Enrich each LogLine with the location of its IP address
            .mapValues((LogLine logLine) -> {
                String ip = logLine.getIp();
                if (ip != null) {
                    // Look in the cache first; call the external service only on a miss
                    String location = cache.get(ip);
                    if (location == null) {
                        location = geoLocationService.findLocation(ip);
                        cache.put(ip, location);
                    }
                    logLine.setLocation(location);
                }
                return logLine;
            });
        finalLines.to(stringSerde, logLineSerde, output_topic);
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        // Demo only: let the topology run for five seconds, then shut it down
        Thread.sleep(5000L);
        streams.close();
    }
}

Some useful links:
- BlazingCache docs: https://blazingcache.readme.io/