Saturday, April 6, 2019

BlobIt - a Scalable Binary Large Object Storage


I am really excited to introduce BlobIt, a brand new distributed object storage system.
A distributed object storage system is a network service that allows you to store raw data (like files, but without a filesystem).
This kind of service does not need a shared medium like an expensive shared disk array or a SAN: the system replicates data over several machines.
These machines are usually spread across several racks, and a single cluster may even span more than one datacenter.

BlobIt is mostly an application layer on top of Apache BookKeeper, which provides powerful primitives to store data in large clusters at Internet scale.
BookKeeper is able to handle large numbers of machines, spread across several racks and even datacenters, and BlobIt exploits all of its features.

BlobIt is the data storage layer of EmailSuccess, a powerful MTA (Mail Transfer Agent), but it can be used by any Java application that needs a solution for distributed storage.

How does it work?

BlobIt stores Blobs (Binary Large Objects) on Bookies; a Bookie is the 'BookKeeper server' part of the system.



In BookKeeper, data is organized in the form of immutable streams of raw data called ledgers; each ledger is identified by a 64-bit positive integer.

BookKeeper does most of the hard work:
- replication
- consistency
- server discovery
- failure handling
- an efficient network protocol
- very efficient data storage
- separation of the write path from the read path
- an in-memory read cache on the server, providing fast access to recently written data
- the ability to spread data across multiple disks per machine (both for the journal and for data)

See the BookKeeper documentation for a better understanding of its architecture, benefits and possible configurations.

BlobIt introduces a layer of abstraction over BookKeeper: the client deals with Buckets, which are partitions of the whole data space, and Objects, which are like files in a traditional filesystem.

Each write returns an ObjectId, a 'smart' ID that lets the reader access data by issuing network calls directly to the Bookies: the id itself contains all of the information needed to locate the data (ledger id, position, total size, number of parts).
So there is no 'BlobIt server': the decentralized architecture of BookKeeper is fully exploited and there is no need for a central server to proxy requests.
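
To give a feel for what such a 'smart' id looks like, here is a hypothetical, simplified sketch in Java: the field layout and the Base64 encoding are made up for illustration and are not the actual BlobIt ObjectId format.

import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical self-describing id: everything a reader needs to fetch the data is packed in the id itself.
public final class SmartObjectId {
    final long ledgerId;     // BookKeeper ledger which holds the data
    final long firstEntryId; // position of the first entry inside the ledger
    final long totalSize;    // total size of the object in bytes
    final int numParts;      // number of entries the object was split into

    SmartObjectId(long ledgerId, long firstEntryId, long totalSize, int numParts) {
        this.ledgerId = ledgerId;
        this.firstEntryId = firstEntryId;
        this.totalSize = totalSize;
        this.numParts = numParts;
    }

    // encode the four fields into an opaque string which can be handed to any reader
    String encode() {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 8 + 8 + 4);
        buffer.putLong(ledgerId).putLong(firstEntryId).putLong(totalSize).putInt(numParts);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    static SmartObjectId decode(String id) {
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getUrlDecoder().decode(id));
        return new SmartObjectId(buffer.getLong(), buffer.getLong(), buffer.getLong(), buffer.getInt());
    }
}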

We also need a metadata service, and currently we are using HerdDB, which is in turn a distributed database built upon Apache BookKeeper.

The HerdDB architecture is fully decentralized: tables are grouped into tablespaces, and each tablespace has one or more replicas and is independent from the others.
HerdDB stores its write-ahead log on BookKeeper, so the metadata write path also flows through Bookies and leverages the BookKeeper architecture.

BlobIt creates a HerdDB tablespace per bucket to store its metadata.
Usually you co-locate the leader of the tablespace that holds the metadata for a specific bucket near the clients that write data to that bucket, and you keep replicas on another rack (readers do not need the metadata service).

BookKeeper and HerdDB rely heavily on Apache ZooKeeper to store low level metadata and to implement service discovery.


Stay tuned!

This week we are releasing version 0.2.0 of BlobIt; the project is growing rapidly and we are going to implement a lot of new features soon.

If you are interested in this project, join us on GitHub or ping me directly on LinkedIn.


Tuesday, April 11, 2017

Majordodo and fine grained resource allocation

Majordodo is a distributed resource manager; its primary purpose is to manage thousands of concurrent tasks in a cluster of Java Virtual Machines.
It has been designed with multi-tenancy as the primary goal, that is, to provide a sophisticated facility for allocating resources by distributing tasks all over the cluster.

A typical Majordodo cluster is made of these major components:
- A ZooKeeper ensemble which is used for service discovery and leader election
- A BookKeeper ensemble which provides a distributed write-ahead commit log
- A set of Brokers, which actually allocate resources and handle the state of each task
- A set of Workers, which run the tasks and share their local resources
- A set of clients, which submit tasks to the Brokers



Resources

Usually a Majordodo cluster runs many different types of tasks (CPU-intensive, RAM-bound, or with temporary local disk usage requirements) on very heterogeneous machines: slow/fast disks, more/less RAM, many/few CPUs, different network connectivity and so on.

Apart from these host-specific resources, we usually have to deal with datasources: for instance, on each JVM we have a single connection pool per datasource, and on each SQL database (PostgreSQL) server there is a global limit on concurrent connections to respect; the same applies to HBase regions.

The main idea is to allocate tasks so as to leverage most of the resources on each specific machine, while respecting global and local limits on local and remote resource usage.
All of this is to be seen in the light of multi-tenancy, that is, every 'logical' user of the system has a limited quantity of resources to use in a given time interval: this way you can have users with guaranteed resources, and you can be sure that a single user will not saturate the cluster (or a single resource) with its own tasks.

Configuring real-time resource usage constraints with Majordodo

Each user of Majordodo is usually bound to a set of resources, for instance its tasks will work on a specific database or SAN, but this binding can change at any time, due to external resource managers which decide to fail over a DBMS to other machines, or SLA managers which request that tasks for some kind of user be routed to 'faster' machines.

The primary settings for Majordodo configuration are:
- the user-resource mapping function on the brokers
- the configuration of each worker

You have to provide a function which tells the Broker which resources a task would use if it were executed immediately: this way, even if the actual resources dedicated to a user change after the submission of a task, we can still count on their correct allocation.
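
As a rough illustration, such a mapping function could look like the sketch below; the interface name, the task type and the resource identifiers are hypothetical, not the actual Majordodo API.

import java.util.HashMap;
import java.util.Map;

// Illustrative only (not the actual Majordodo API): given the submitting user and the task type,
// return the resources the task would consume if executed right now, with the number of units for each.
interface TaskResourcesMapper {
    Map<String, Integer> resourcesForTask(String userId, String taskType);
}

class TenantAwareMapper implements TaskResourcesMapper {
    // userId -> datasource currently assigned to that tenant; can be updated at runtime
    private final Map<String, String> tenantDatabase = new HashMap<>();

    @Override
    public Map<String, Integer> resourcesForTask(String userId, String taskType) {
        Map<String, Integer> resources = new HashMap<>();
        // one connection from the pool of the tenant's current database
        resources.put("db:" + tenantDatabase.getOrDefault(userId, "default-db"), 1);
        if ("send-email".equals(taskType)) {
            resources.put("smtp-relay", 1); // tasks of this type also take an SMTP relay slot
        }
        return resources;
    }
}

Because the function is evaluated when the task is about to run, re-pointing a tenant to another database in the map is enough to make already-submitted tasks consume the right resource.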

On each Worker you are going to configure:
- the maximum number of concurrent tasks overall
- the maximum number of concurrent tasks for each task type
- a priority for each group of users
- a description of the local resources and the maximum number of tasks using them

The Worker-side configuration is also dynamic and can be changed at runtime without a restart (which would otherwise require failing over all the tasks running on that Worker).

One note: all of the dynamic reconfiguration features are currently available when using 'embedded Majordodo': you run the Broker and the Worker inside JVMs which supply all of the callbacks to the system.
 


Friday, December 30, 2016

Introducing HerdDB a distributed JVM-embeddable database built on Apache BookKeeper

At Diennea we developed EmailSuccess, a powerful MTA (Mail Transfer Agent), designed to deliver millions of email messages per hour to inboxes all around the world.
We have very particular requirements for the database layer of EmailSuccess; we want a system which:
  • is ultra fast for writes
  • can be replicated among tens of machines, without any shared medium or SAN
  • can handle thousands of tables
  • can handle multi-table transactions
  • can handle multi-table queries
  • scales horizontally with the number of tables, simply by adding machines
  • can optionally run inside the same JVM process which runs the main service
  • can be controlled using the SQL language and the JDBC API
  • can support indexing of data
We have been using Apache HBase for a long time for our internal business, but HBase (and other big-data engines) does not satisfy these requirements.
So we designed a new key-value database fast enough to handle the write load of the system, and then we added an SQL planner and a JDBC driver.
We already have great experience with the Apache BookKeeper and Apache ZooKeeper projects, as we use them to build sophisticated distributed services, for instance Majordodo (which is open source; you can find it on GitHub and Maven Central), so we decided to use BookKeeper as the write-ahead transaction log and ZooKeeper for group membership and coordination.

HerdDB overview

From the API point of view you can see HerdDB as a traditional SQL-based database: you issue CREATE TABLE, SELECT, INSERT, JOIN... statements and the system does what you expect.
But internally it works as a key-value engine: accessing data by the primary key is as fast as possible, both for reads and for writes. In fact the primary user of HerdDB, EmailSuccess, uses it to store the state of every single email message.
On top of the key-value core we added the ability to run scans, multi-row updates, aggregate functions and so on; this way you can use it like any other SQL database.
The main unit of work for a HerdDB cluster is the tablespace, which is a group of tables. In the context of a tablespace you can run transactions, joins and subqueries which span multiple tables.
In a cluster, a leader node is designated for each tablespace by the administrator (with some auto-healing and automatic leader reassignment in case of failure), and all the transactions on its tables are run on that node.
This system scales well when there are many tablespaces, because the load can be spread among all the machines in the cluster.
Indexes are supported using an implementation of the Block Range Index pattern (BRIN indexes), adapted to the way HerdDB stores data.
The database can be accessed from outside the process using TLS, and authentication is performed with SASL, using Kerberos or the simpler DIGEST-MD5 mechanism.
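
Since HerdDB speaks SQL over JDBC, a client can use the plain java.sql API. The sketch below reuses the jdbc:herddb:server:host:port URL format and the table from the Docker section further down; the host, port and credentials are assumptions to adapt to your own setup.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class HerdDbJdbcExample {
    public static void main(String[] args) throws Exception {
        // the herddb-jdbc driver must be on the classpath (discovered via JDBC 4 auto-loading);
        // user/password below are assumptions, use the credentials configured on your server
        try (Connection con = DriverManager.getConnection("jdbc:herddb:server:localhost:7000", "sa", "hdb");
             Statement st = con.createStatement()) {
            st.executeUpdate("CREATE TABLE mytable (mykey string primary key, myvalue int)");
            try (PreparedStatement ps = con.prepareStatement("INSERT INTO mytable(mykey, myvalue) VALUES(?, ?)")) {
                ps.setString(1, "a");
                ps.setInt(2, 1);
                ps.executeUpdate();
            }
            try (ResultSet rs = st.executeQuery("SELECT mykey, myvalue FROM mytable")) {
                while (rs.next()) {
                    System.out.println(rs.getString("mykey") + " -> " + rs.getInt("myvalue"));
                }
            }
        }
    }
}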

The write path

The most critical path for data access in HerdDB is the write path: in particular, the INSERT and UPDATE-by-PK data manipulation statements are the most important for us, together with GET-by-PK.
The leader of the tablespace keeps in memory a hash table which holds all the PKs for a table, together with an in-memory buffer which contains all the dirty and/or recently accessed records.
When an INSERT reaches the server, the write is first appended to the log, then the map of valid PKs is updated and the new record is stored in the memory buffer.
If an UPDATE is issued on the same PK (and this is our primary use case) the update is performed directly in memory, without touching the 'data pages' on disk; we only write to the log in order to achieve durability.
If a GET comes in for the same PK, we can read the 'dirty' record directly from the buffer.
After a configurable timeout, or when the system is running out of memory, a checkpoint is performed and buffers are flushed to disk, creating immutable data pages. So usually all the work happens in memory: writes are appended serially to the transaction log, and when flushing to disk complete data pages are written, without ever modifying existing files.
This kind of write pattern suits our use case very well: data files are always written or read in their entirety, making the most of OS caches.
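
To make the flow above concrete, here is a deliberately simplified toy model of the pattern. This is not HerdDB code: the real engine also deals with transactions, page management and recovery.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the write path described above (not HerdDB code): every mutation is appended to
// the log first, then applied in memory; GET-by-PK is served from the buffer while the record is dirty.
public class WritePathSketch {
    private final List<String> log = new ArrayList<>();               // stand-in for the BookKeeper WAL
    private final Map<String, byte[]> dirtyRecords = new HashMap<>(); // in-memory buffer of dirty records
    private final Map<String, Boolean> knownKeys = new HashMap<>();   // the table of valid PKs

    public void insert(String pk, byte[] value) {
        log.add("INSERT " + pk);     // 1. durability: append to the log
        knownKeys.put(pk, true);     // 2. register the new PK
        dirtyRecords.put(pk, value); // 3. keep the record in the memory buffer
    }

    public void update(String pk, byte[] value) {
        log.add("UPDATE " + pk);     // only the log is written; data pages are never modified in place
        dirtyRecords.put(pk, value);
    }

    public byte[] get(String pk) {
        byte[] dirty = dirtyRecords.get(pk);
        return dirty != null ? dirty : loadFromImmutableDataPage(pk); // fast path: record still in memory
    }

    public void checkpoint() {
        // flush the buffer to new immutable data pages, then the log can be trimmed (omitted)
        dirtyRecords.clear();
    }

    private byte[] loadFromImmutableDataPage(String pk) {
        return null; // omitted: read the whole immutable page from disk
    }

    public static void main(String[] args) {
        WritePathSketch table = new WritePathSketch();
        table.insert("msg-1", "QUEUED".getBytes(StandardCharsets.UTF_8));
        table.update("msg-1", "SENT".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(table.get("msg-1"), StandardCharsets.UTF_8)); // prints SENT
    }
}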

Replication and Apache BookKeeper

HerdDB leverages Apache BookKeeper's ability to provide a distributed write-ahead log: when a node is running as leader it writes each state change to BookKeeper, working as a replicated state machine.
Some features:
  • each write is guaranteed to be 'durable' after the ack from BookKeeper
  • each replica is guaranteed to read only entries for which the ack has been received by the writer (the Last-Add-Confirmed protocol)
  • each ledger (the basic storage unit of BookKeeper) can be written only once
For each tablespace you can add a virtually unlimited number of replicas: each 'replica' node 'tails' the transaction log and replays each data manipulation on its local copy of the data.
If a 'new leader' takes over, BookKeeper will fence out the 'old leader', preventing any further writes to the ledger; this way the old leader cannot carry on its activity and change its local state, which guarantees that every node converges to the same consistent view of the system.
A very good explanation on how this can be done is provided in the BookKeeper tutorial.
Apache BookKeeper servers, called Bookies, can be run standalone, but the preferred way is to run them inside the same JVM as the database, leveraging the ability to talk to the Bookie without going through the network stack.
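
The sketch below shows the underlying primitives at work with the plain BookKeeper client API: a 'leader' appends state changes to a ledger and a 'replica' reads them back up to the Last-Add-Confirmed. The ZooKeeper address, ensemble sizes and payloads are assumptions; HerdDB's actual log layer is of course more elaborate.

import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;

public class WalOnBookKeeperExample {
    public static void main(String[] args) throws Exception {
        byte[] pwd = "secret".getBytes(StandardCharsets.UTF_8);
        // connect to the BookKeeper cluster through ZooKeeper (address is an assumption)
        BookKeeper bk = new BookKeeper("localhost:2181");

        // the leader creates a ledger replicated on 3 Bookies, each entry acked by at least 2 of them
        LedgerHandle wal = bk.createLedger(3, 3, 2, DigestType.CRC32, pwd);
        // addEntry() returns only after the ack quorum has been reached: the write is durable
        long entryId = wal.addEntry("UPDATE msg-1 -> SENT".getBytes(StandardCharsets.UTF_8));
        System.out.println("durable at ledger " + wal.getId() + ", entry " + entryId);
        wal.close();

        // a replica opens the same ledger and replays the entries up to the Last-Add-Confirmed
        LedgerHandle reader = bk.openLedger(wal.getId(), DigestType.CRC32, pwd);
        Enumeration<LedgerEntry> entries = reader.readEntries(0, reader.getLastAddConfirmed());
        while (entries.hasMoreElements()) {
            System.out.println(new String(entries.nextElement().getEntry(), StandardCharsets.UTF_8));
        }
        reader.close();
        bk.close();
    }
}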

Getting started using Docker

If you want to run HerdDB on your Docker-enabled laptop, just run:
docker run --rm -it  -e server.port=7000 -p 7000:7000 \
             -e server.advertised.host=$(hostname) \
             --name herddb eolivelli/herd:latest
 
The above command will run HerdDB in foreground mode.
In order to launch the CLI from another terminal use:
docker exec -it herddb /bin/bash bin/herddb-cli.sh \
             -x jdbc:herddb:server:0.0.0.0:7000 -sc

Then you can issue SQL queries:
herd: SELECT * FROM systables
herd: CREATE TABLE mytable (mykey string primary key, myvalue int) 
herd: INSERT INTO mytable VALUES('a',1)
herd: SELECT * FROM mytable
 

Follow the instructions at https://github.com/diennea/herddb/tree/master/herddb-docker
You can find basic documentation on the GitHub wiki.

Status of the project

HerdDB is Apache 2.0 licensed. At the time of writing this post we are still in ALPHA, but you can already start playing with HerdDB by downloading it from GitHub and building it locally.
We are going to publish artifacts on Maven Central when we release the first stable version.
Feel free to report bugs on the GitHub issue tracker and to file pull requests contributing improvements or new use cases.
We will soon set up mailing lists and the official reference documentation.

Thursday, November 17, 2016

Apache Kafka Streams and JCache with BlazingCache

Developing and setting up a distributed stream processing pipeline is quite simple using Apache Kafka Streams; in fact, the Kafka Streams runtime scales automatically without changes to your code.

Sometimes you have to enrich your data with values taken from external resources, like web services, databases or paid services, which is going to slow down your whole processing pipeline.

Consider for instance the case of looking up geolocation information for IP addresses while processing a stream of hits on a web site: for each IP address you have to call an external service and wait for a response.

In these cases a cache may help, and you would probably start by using the JCache (JSR 107) API; however, in an auto-scaling architecture you are going to need a distributed cache solution.

The Apache Kafka ecosystem (along with HBase, Hadoop, MapReduce...) is based on ZooKeeper, and BlazingCache can be the perfect companion for Kafka Streams applications, as BlazingCache uses ZooKeeper as its coordination service. Furthermore, BlazingCache provides an architecture well suited to this kind of lightweight processing node.

In BlazingCache, cached data is retained only locally in the JVM actually using it, with the platform adding the little bit of coordination needed to guarantee consistency between cache instances. Thus, there is no need for multicast or other advanced discovery/peer-to-peer services, which are often difficult to set up in this changing world of containers and cloud-based services.

BlazingCache has been designed to work very well in environments in which data is strongly sensitive to location, where it is very likely that your cached data is partitioned among processing nodes.
This is very common in multi-tenant applications, and several Kafka Streams applications tend to partition data across nodes, since the normal load distribution is bound to Kafka partitioning.

This simple example, contributed by Nicolò Boschi (https://github.com/nicoloboschi/kafkastreamsblazingcache), is a good starting point to understand how simple it is to use a distributed JCache provider with Kafka Streams.

// obtain the JCache provider (BlazingCache) and create a cache manager
CachingProvider cachingProvider = Caching.getCachingProvider();
try (javax.cache.CacheManager cacheManager = cachingProvider.getCacheManager(
    cachingProvider.getDefaultURI(),
    cachingProvider.getDefaultClassLoader(),
    cacheConfiguration)) {
    // the cache holds ip -> location mappings, kept consistent among the stream processing nodes
    try (Cache<String, String> cache
        = cacheManager.createCache("ip-location-cache", new MutableConfiguration<>())) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> startLines = builder.stream(stringSerde, stringSerde, input_topic);
        MockGeoLocationService geoLocationService = new MockGeoLocationService();
        KStream<String, LogLine> finalLines = startLines
            .mapValues((value) -> {
                return LogLine.parse(value);
            })
            .mapValues((LogLine logLine) -> {
                String ip = logLine.getIp();
                if (ip != null) {
                    // cache-aside lookup: call the (slow) external service only on a cache miss
                    String location = cache.get(ip);
                    if (location == null) {
                        location = geoLocationService.findLocation(ip);
                        cache.put(ip, location);
                    }
                    logLine.setLocation(location);
                }
                return logLine;
            });
        finalLines.to(stringSerde, logLineSerde, output_topic);
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Thread.sleep(5000L);
        streams.close();
    }
}

Some useful links:
- BlazingCache docs: https://blazingcache.readme.io/

Monday, May 16, 2016

BlazingCache 1.8.0 is out


BlazingCache is a very "simple" but powerful distributed cache, designed with the goal of coordinating many local caches in a cluster of cooperating JVMs.

At Diennea we have lots of JVMs hosting our customers’ tasks, with each customer's load being assigned to only a small portion of the whole cluster (see Majordodo for instance). In this scenario data locality is fundamental, so it is better to cache data as near as possible to the code processing it.

BlazingCache aims at coordinating many local caches, ensuring that every change to data is propagated to every other node which holds that data.

A single BlazingCache coordinator server can handle hundreds of clients with a minimal setup.

What's new in 1.8.0?


With the 1.8.0 release, BlazingCache introduces the ability to keep direct references to plain local Java objects.
 
In its initial design, BlazingCache only stored byte[] values, and the client always had to serialize/deserialize its own model objects.
The original motivation for storing only byte[] was that this way the client is always forced to clone cached objects, avoiding side effects due to modifications of shared objects.

The key to this new feature is the new pair of putObject/fetchObject methods:


try (CacheClient client1 = CacheClientBuilder
        .newBuilder()
        .mode(CacheClientBuilder.Mode.LOCAL)
        .build()) {
    client1.start();
    client1.waitForConnection(10000);

    MyBean myBean = new MyBean();
    // putObject keeps a direct reference to the object, no serialization to byte[] happens
    client1.putObject("key", myBean, 6000);

    MyBean myBean2 = client1.fetchObject("key");

    // fetchObject returns the very same object instance
    assertSame(myBean, myBean2);
}


Check out the documentation at https://blazingcache.readme.io/


Wednesday, March 2, 2016

Majordodo - a Distributed Resource Manager built on top of Apache BookKeeper

At Diennea we offer a Software-as-a-Service platform for building complex direct digital marketing applications, especially for delivering email and text messages.

One of our primary business requirements is the ability to give many users access to shared resources while granting each of them a configurable amount of resources (multi-tenancy), in terms of CPU, RAM, SQL/HBase databases and (distributed) filesystem-like storage.

Other existing projects like Apache Hadoop YARN or Apache Mesos do not provide a fine-grained way to allocate resources to tenants. Majordodo has been designed to deal with thousands of users requesting the execution of micro tasks: it is just like having a very big distributed thread pool with complex resource allocation facilities, which can be reconfigured at runtime. The guaranteed service level can be changed at runtime even for tasks which have already been submitted to the system.

Majordodo tasks can be very simple, such as sending a single email message, or long-running batch operations which keep running for hours.

When a task is assigned to a specific machine (a 'Worker' in Majordodo terms) the Broker follows its execution, monitors it and, in case of machine failure, fails the execution over to another Worker.

Majordodo has been designed to deal with Worker machines which can fail at any time, which is a fundamental aspect of elastic deployments: to this end, tasks are simply resubmitted to other machines in a transparent way, according to the service level configuration.

Majordodo clients submit tasks to a Broker service using a simple HTTP JSON-based API supporting transactions and the 'slots' facility.

Workers discover the current leader Broker and keep one and only one active TCP connection to it. The Broker-to-Worker and Broker-to-Broker protocol has been designed to support asynchronous messaging, and one connection per Worker is enough for task state management. The networking stack scales well up to hundreds of Workers with minimal overhead on the Broker (thanks to Netty).

Majordodo is built upon Apache BookKeeper and Apache ZooKeeper, leveraging these powerful systems to implement replication and to face all the usual distributed computing issues.

Majordodo and ZooKeeper

Majordodo clients use ZooKeeper to discover the active Brokers on the network.

On the Broker side, Majordodo uses ZooKeeper for several purposes: leader election, advertising the presence of services on the network, and keeping metadata about BookKeeper ledgers. BookKeeper in turn uses ZooKeeper for Bookie discovery and for ledger metadata storage.

Among all the Brokers one is elected 'leader': clients can connect to any Broker, but only the leader can change the 'status' of the system, for instance by accepting task submissions and handling Worker connections.
ZooKeeper is used to manage a shared view of the list of BookKeeper ledgers: the leader Broker creates new ledgers and drops unused ones, keeping the list of current ledgers on ZooKeeper.
ZooKeeper allows the Broker to manage this kind of metadata safely, using CAS (compare-and-set) operations: when updating the ledger list, the Broker can issue a conditional modification which fails if another Broker has taken control.
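
In terms of the plain ZooKeeper API, the conditional update looks roughly like this; the znode path and the payload handling are made up for illustration and do not reflect Majordodo's real metadata layout.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class LedgersListCasExample {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        String path = "/majordodo/ledgerslist"; // hypothetical znode holding the list of ledgers
        Stat stat = new Stat();
        byte[] current = zk.getData(path, false, stat); // read the list and remember its version
        byte[] updated = updateList(current);           // add/drop ledger ids (omitted)
        try {
            // conditional write: fails if another Broker modified the znode in the meantime
            zk.setData(path, updated, stat.getVersion());
        } catch (KeeperException.BadVersionException anotherBrokerTookControl) {
            // we lost the race (or the leadership): give up and re-read the shared state
        }
        zk.close();
    }

    private static byte[] updateList(byte[] current) {
        return current; // omitted: serialize the new list of ledger ids
    }
}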

Majordodo and BookKeeper

Apache BookKeeper is a replicated log service which allows Majordodo to implement a distributed commit log with a shared-nothing architecture: no shared disk or database is needed to make all the Brokers share the same view of the global status of the system.
The basic unit of work is the ledger, an ordered sequence of log entries, each entry being identified by a sequence number.
BookKeeper is ideal for replicating the state of the Brokers: the leader Broker keeps a global view of the status of the system in memory and logs every change to a ledger.
BookKeeper is used as a write-ahead commit log, that is, every change to the status is first written to the log and then applied to the in-memory status. The other Brokers (we call them 'followers') tail the log and apply each change to their own copy of the status.
A very good explanation on how this can be done is provided in the BookKeeper tutorial.


Another interesting feature of BookKeeper is that ledgers can only be written once: if another client opens a ledger for reading, it can automatically 'fence' the writer so that no more writes are allowed on that ledger.
In the case of a leadership change, for instance after a temporary network failure, the 'old' leader Broker is no longer able to log entries and thus cannot 'change' the global status of the system in memory.
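
Here is a minimal sketch of the fencing behaviour with the plain BookKeeper client API (ZooKeeper address, ensemble sizes and payloads are assumptions): when the new leader opens the ledger for recovery, any further addEntry() from the old leader fails.

import java.nio.charset.StandardCharsets;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;

public class FencingExample {
    public static void main(String[] args) throws Exception {
        byte[] pwd = "secret".getBytes(StandardCharsets.UTF_8);
        BookKeeper oldLeader = new BookKeeper("localhost:2181"); // ZooKeeper address is an assumption
        BookKeeper newLeader = new BookKeeper("localhost:2181");

        // the current leader Broker logs status changes to its ledger
        LedgerHandle writer = oldLeader.createLedger(3, 3, 2, DigestType.CRC32, pwd);
        writer.addEntry("task 1 assigned to worker-a".getBytes(StandardCharsets.UTF_8));

        // a new leader takes over: openLedger() recovers the ledger AND fences it
        LedgerHandle recovered = newLeader.openLedger(writer.getId(), DigestType.CRC32, pwd);
        System.out.println("recovered entries up to " + recovered.getLastAddConfirmed());

        try {
            // the old leader can no longer append, so it cannot change the shared status any more
            writer.addEntry("task 2 assigned to worker-b".getBytes(StandardCharsets.UTF_8));
        } catch (BKException.BKLedgerFencedException expected) {
            System.out.println("old leader has been fenced out");
        }

        recovered.close();
        newLeader.close();
        oldLeader.close();
    }
}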

A shared-nothing architecture

The only shared structures between Brokers are the ZooKeeper filesystem and the BookKeeper ledgers, but logs cannot be retained forever; accordingly, each Broker must periodically take a snapshot of its own in-memory view of the status and persist it to disk, both to recover quickly and to let BookKeeper release space and resources.

When a Broker boots it loads a consistent snapshot of the status of the system at a given point in time and then starts replaying the log from the point (ledger offset) at which the snapshot was taken. If no local snapshot is available, the booting Broker discovers an active Broker on the network and downloads a valid snapshot from it.

Since in Majordodo there is no shared disk or storage service, the deletion of ledgers must be coordinated in some way.
We delete old ledgers after a configurable amount of time, for instance when a ledger is no longer in use and was created more than 48 hours in the past. If a 'follower' Broker remains offline for more than 48 hours, at boot time it needs to find another Broker on the network and download a snapshot, otherwise the boot will fail.