Sometimes you have to enrich your data with values taken from external resources, such as web services, databases, or paid services, and these lookups slow down your whole processing pipeline.
Consider, for instance, looking up GeoLocation information for IP addresses while processing a stream of hits on a web site: for each IP address you have to call an external service and wait for a response.
In these cases a cache may help, and you would probably start with the JCache (JSR-107) API; however, in an auto-scalable architecture you are going to need a distributed cache solution.
The Apache Kafka ecosystem (along with HBase, Hadoop, MapReduce...) is based on ZooKeeper, and BlazingCache is a natural companion for Kafka Streams applications because it uses ZooKeeper as its coordination service. Furthermore, BlazingCache provides an architecture well suited to clusters of lightweight processing nodes.
In BlazingCache, cached data is kept locally in the JVM that actually uses it, and the platform adds just the coordination needed to guarantee consistency between cache instances. Thus, there is no need for multicast or other advanced discovery/peer-to-peer services, which are often difficult to set up in today's world of containers and cloud-based services.
BlazingCache has been designed to work very well in environments where data locality matters, that is, where your cached data is very likely partitioned among processing nodes.
This is very common in multi-tenant applications, and many Kafka Streams applications naturally spread data across nodes, since load distribution follows Kafka's topic partitioning.
This simple example, contributed by Nicolò Boschi (https://github.com/nicoloboschi/kafkastreamsblazingcache), is a good starting point to understand how easy it is to use a distributed JCache provider with Kafka Streams.
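The snippet below reuses a cacheConfiguration object built elsewhere in the example: a java.util.Properties instance that points the BlazingCache JCache provider at your ZooKeeper ensemble. As a rough sketch only (the property keys and values here are assumptions, verify them against the BlazingCache documentation for your version), it might look like this:

// Sketch only: property keys are assumptions, check the BlazingCache docs
Properties cacheConfiguration = new Properties();
cacheConfiguration.put("blazingcache.mode", "clustered");
cacheConfiguration.put("blazingcache.zookeeper.connectstring", "localhost:2181");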
// cacheConfiguration, the serdes, the topic names and the Kafka Streams props
// are defined elsewhere in the full example
CachingProvider cachingProvider = Caching.getCachingProvider();
try (javax.cache.CacheManager cacheManager = cachingProvider.getCacheManager(
        cachingProvider.getDefaultURI(),
        cachingProvider.getDefaultClassLoader(),
        cacheConfiguration)) {
    try (Cache<String, String> cache
            = cacheManager.createCache("ip-location-cache", new MutableConfiguration<>())) {

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> startLines = builder.stream(stringSerde, stringSerde, input_topic);
        MockGeoLocationService geoLocationService = new MockGeoLocationService();

        KStream<String, LogLine> finalLines = startLines
                // parse each raw line into a LogLine
                .mapValues((value) -> LogLine.parse(value))
                // enrich each LogLine with the GeoLocation of its IP address
                .mapValues((LogLine logLine) -> {
                    String ip = logLine.getIp();
                    if (ip != null) {
                        // cache-aside: call the external service only on a cache miss
                        String location = cache.get(ip);
                        if (location == null) {
                            location = geoLocationService.findLocation(ip);
                            cache.put(ip, location);
                        }
                        logLine.setLocation(location);
                    }
                    return logLine;
                });
        finalLines.to(stringSerde, logLineSerde, output_topic);

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Thread.sleep(5000L);
        streams.close();
    }
}
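The snippet also references a few objects defined in the full example: the serdes, the topic names, and the Kafka Streams props. A minimal sketch of how they might look follows; the LogLineSerializer and LogLineDeserializer classes and the topic names are hypothetical placeholders (the real example ships its own), while the StreamsConfig keys are the standard Kafka Streams ones used with the pre-1.0 KStreamBuilder API shown above.

// Plain String serde from the Kafka clients library
Serde<String> stringSerde = Serdes.String();
// Hypothetical serde for the LogLine value type
Serde<LogLine> logLineSerde = Serdes.serdeFrom(new LogLineSerializer(), new LogLineDeserializer());

String input_topic = "access-log";            // placeholder topic name
String output_topic = "access-log-enriched";  // placeholder topic name

// Standard Kafka Streams configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "geo-enricher");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");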
Some useful links:
- BlazingCache docs: https://blazingcache.readme.io/
- A tutorial about Apache Kafka Streams: https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/