An Approach to Achieve Scalability and Availability of Data Stores

 

The explosion of the web, and of social networks and ecommerce applications in particular, has brought with it an explosion in the sheer volume of data we must deal with. The web has become so ubiquitous that it is used by everyone, from the scientists who exchanged scientific documents over it in the 1990s to five-year-olds today exchanging emoticons about kittens. Hence the need for scalability: the ability of a system, network, or process to be enlarged in order to accommodate that data growth. The web has also brought the world closer together, which means there is no such thing as “down time” anymore. Business hours are 24/7, with buyers shopping in disparate time zones, so high availability of the data stores becomes a necessity. This blog post lays out a course of action for achieving scalability and availability in data stores.

This article covers the following methods for providing scalable and highly available data stores for applications.

  • Scalability: a distributed system with self-service scaling capability
    • Data capacity analysis
    • Review of data access patterns
    • Different techniques for sharding
    • Self-service scaling capability
  • Availability: physical deployment, rigorous operational procedures, and application resiliency
    • Multiple data center deployment
    • Self-healing tools
    • Well-defined DR tiering, RTO, RPO, and SOPs
    • Application resiliency for data stores

Scalability

With the advent of the web, and especially of Web 2.0 sites where millions of users may both read and write data, the scalability of simple database operations has become more important. There are two ways to scale a system: vertically and horizontally. This post focuses on horizontal scalability, where both the data and the load of simple operations are distributed (sharded) over many servers that do not share RAM, CPU, or disk. Although in some implementations disk and storage can be shared, auto-scaling becomes a challenge in such cases.

diagram abstractly illustrating scalability measures. Image by freeimageslive.co.uk – freebie.photography

The following measures should be considered as mandatory methods in building a scalable data store.

  • Data capacity analysis: It is very important to understand the application’s requirements in terms of peak and average transactions per second, peak number of queries, payload size, expected throughput, and backup requirements. This drives the data store’s scalability design: how many physical servers are needed, and the hardware configuration of the data store with respect to memory footprint, disk size, CPU cores, I/O throughput, and other resources.

  • Review data access patterns: The simplest way to scale an application is to start by looking at its access patterns. Given the nature of distributed systems, every real-time query to the data store must carry the access key, to avoid a scatter-gather problem across different servers. Data must be aligned by the access key in each shard of the distributed data store. In many applications, there can be more than one access key. For example, in an ecommerce application, data retrieval can be by Product ID or by User ID. In such cases, the options are to either store the data redundantly, aligned by both keys, or store the data with a reference key, depending upon the application’s requirements.

  • Different techniques for sharding: There are different ways to shard the data in a distributed data store. Two of the common mechanisms are function-based sharding and lookup-based sharding; a minimal sketch of both follows this list. Function-based sharding refers to a sharding scheme where a deterministic function is applied to the key to determine its shard. In this case, the shard key should exist in each entity stored in the distributed data store for efficient retrieval. In addition, if the shard key is not random, it can cause hot spots in the system. Lookup-based sharding refers to a lookup table that maps ranges of the key (start and end) to shards. Clients can cache the lookup table to avoid a single point of failure. Many NoSQL databases implement one of these techniques to achieve scalability.

  • Self-service scaling capability: Self-service scaling, or auto-scaling, can work as a jewel in the scalable system crown. Data stores are designed and architected to provide enough capacity to scale up front, but rapid elasticity and cloud services can enable vertical and horizontal scaling in the true sense. Self-service vertical scaling enables the addition of resources to an existing node to increase its capacity, while self-service horizontal scaling enables the addition or removal of nodes in the distributed data store via “scale-up” or “scale-down” functionality.
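
To make the two sharding techniques concrete, here is a minimal Python sketch. The hash function, shard count, and key ranges are illustrative assumptions, not part of any particular data store.

```python
import bisect
import hashlib

# --- Function-based sharding: a deterministic function of the key picks the shard.
NUM_SHARDS = 8  # assumed shard count, for illustration only

def shard_by_function(access_key: str) -> int:
    """Apply a deterministic hash (MD5 here, purely illustrative) and take it modulo the shard count."""
    digest = hashlib.md5(access_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SHARDS

# --- Lookup-based sharding: a cacheable table maps key ranges to shards.
# Each entry is (end_of_range_exclusive, shard_id); the ranges are made up.
LOOKUP_TABLE = [(1_000_000, 0), (2_000_000, 1), (3_000_000, 2)]

def shard_by_lookup(numeric_key: int) -> int:
    """Binary-search the cached range table to find the shard that owns the key."""
    range_ends = [end for end, _ in LOOKUP_TABLE]
    idx = bisect.bisect_right(range_ends, numeric_key)
    if idx >= len(LOOKUP_TABLE):
        raise KeyError(f"key {numeric_key} is outside all known ranges")
    return LOOKUP_TABLE[idx][1]

print(shard_by_function("user-42"))  # deterministic shard id in [0, NUM_SHARDS)
print(shard_by_lookup(1_500_000))    # 1, because the key falls in the second range
```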

Availability

Data stores need to be highly available for read and write operations. Availability refers to a system or component being continuously operational for a desired length of time. Below are some of the methods to ensure that the right architectural patterns, physical deployment, and rigorous operational procedures are in place for a highly available data store.

diagram of the four availability methods discussed in this blog post

  • Multiple data center deployment: Distributed data stores must be deployed across different data centers with redundant replicas for disaster recovery. The geographical locations of the data centers should be chosen carefully to avoid network latency across the nodes. The ideal approach is to distribute the primary nodes equally amongst the data centers, along with local and remote replicas in each data center. Distributed data stores inherently reduce the downtime footprint by the sharding factor. In addition, equal distribution of nodes across data centers means that only 1/nth of the data becomes unavailable in the event of a complete data center shutdown.

  • Self-healing tools: Efficient monitoring and self-healing tools must be in place to monitor the heartbeat of the nodes in the distributed data store. In case of failures, these tools should not only monitor but also provide a way to bring the failed component back up, or a mechanism to promote its most recent replica as the next primary. This self-healing mechanism should be applied cautiously, according to the application’s requirements. Some write-intensive applications cannot afford inconsistent data, which changes the role of the self-healing tools to monitoring and alerting the application for on-demand healing instead.

  • Well-defined DR tiering, RTO, RPO, and SOPs: Rigorous operational procedures can push the availability number (the ratio of the expected uptime of a system to the sum of its expected uptime and downtime) higher; for example, 99.99% availability allows roughly 52 minutes of downtime per year. Disaster recovery (DR) tiers must be well defined for any large-scale enterprise, with an associated expected downtime for each tier. The Recovery Time Objective (RTO) and Recovery Point Objective (RPO) should be well tested in a simulated production environment to predict any loss in availability. Well-written standard operating procedures (SOPs) are proven saviors in a crisis, especially in a large enterprise, where Operations can follow them to recover the system as quickly as possible.

  • Application resiliency for data stores: Hardware fails, but systems must not die. Application resiliency is the ability of an application to react to problems in one of its components and still provide the best possible service. There are multiple ways an application can achieve high availability for read and write database operations. Application resiliency for reads enables the application to read from a replica if the primary fails (a minimal sketch follows this list). Resiliency can also be a built-in feature of the distributed data store, as in many of the NoSQL databases. When the newly inserted data has no affinity with existing data, a round-robin insertion approach can be taken, where new inserts are written to a node other than the primary when the primary is unavailable. Conversely, when the newly inserted data does have affinity with existing data, the approach is driven primarily by the application’s consistency requirements.
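
As a minimal illustration of the read-resiliency idea above, the Python sketch below tries the primary first and then falls back to replicas. The client call and node names are hypothetical stand-ins for whatever driver the application actually uses.

```python
class NodeUnavailable(Exception):
    """Raised by the (hypothetical) data-store client when a node cannot serve a request."""

def query_node(node: str, query: str):
    """Placeholder for a real data-store client call; assumed to raise NodeUnavailable on failure."""
    raise NotImplementedError("wire this up to the real client library")

def resilient_read(query: str, primary: str, replicas: list):
    """Try the primary first, then each replica in order, returning the first successful result."""
    for node in [primary, *replicas]:
        try:
            return query_node(node, query)
        except NodeUnavailable:
            continue  # this node is down; try the next one
    raise RuntimeError("all nodes were unavailable for the read")

# Example (hypothetical node names):
# resilient_read("SELECT ...", primary="db-primary", replicas=["db-replica-1", "db-replica-2"])
```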

The key takeaway is that building a scalable and highly available data store requires a systematic approach to implementing the methods described in this post. This list of methods is mandatory but not exhaustive, and more methods can be added to it as needed. Plan to grow BIG and aim to be 24/7 UP, and with the proper scalability and availability measures in place, the sky is the limit.

References

Image by freeimageslive.co.uk – freebie.photography

Rheos

 

Data IS the next currency.  The increased demand for real-time data across almost every business and technology platform has changed the world we live in.  It is no different at eBay.

About two years ago, I was thrilled when I was asked to lead a development team to build a real-time data platform at eBay using Kafka. Initially, it was just for our Oracle change stream. In late 2015, we decided to expand it to a fully managed, secure, and easy-to-use real-time data platform, known as Rheos. The goal of Rheos is to provide a near real-time buyer experience, seller insights, and a data-driven commerce business at eBay.

While Kafka has given us core capabilities in stream processing, managing large, distributed, highly available, real-time data pipelines running on the cloud across security zones and data centers is hard without automation and core services. Hence, Rheos was built to provide the necessary life-cycle management, monitoring, well-architected standards, and ecosystem for real-time streaming data pipelines. Currently, the pipelines consist of Kafka, Storm, and stream processing applications. Both shared and non-shared data streams can run on these pipelines.

By the end of 2016, nearly 100 billion messages flowed through the pipelines in Rheos daily. In 2017, Rheos is expected to handle 15 times the current traffic.

So, how did we get there?

Concepts

At a very high level, Rheos has these concepts:

  • Data taxonomy is a well-defined convention that classifies and catalogs events into proper namespaces for organization, ease of discovery, and management.
  • Category is a top-level component in a namespace for a given stream type, for example, monitoring events, click stream events, business events, and so on.
  • Stream captures the logical data flow that leads to a consumable data point in Kafka. The data flow may cut across one or more data points and stream processing units.
  • Domain represents a shard or a group of related topics for a given stream type. Topics in the group are subject to a set of control parameters such as max partitions, max replica, max data retention period, max topic count, and service level agreement, just as examples.
  • Namespace is used to classify the different data streams in Rheos. A namespace is composed of category, stream, and domain.

Automation

Lifecycle Management Service

Lifecycle Management Service is a cloud service that provisions and provides full lifecycle management (LCM) for Zookeeper, Kafka, Storm, and MirrorMaker clusters. It is built on a modular architecture with pluggable extensions and frameworks. This combination allows it to create and perform LCM on a stream pipeline running on any cloud platform (such as OpenStack, AWS, or Google Cloud). The Lifecycle Management Service allows you to provision a cluster, flex it up or down, or replace a bad node in a cluster. In addition to its CLI, it is equipped with a RESTful API that allows the Rheos Management Service (see Core Service below) to perform simple operations on a guest instance. For example, the Management Service can do a rolling restart on a troubled Kafka cluster via the Lifecycle Manager API.
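
As a sketch of how a caller might drive such an operation over REST, the snippet below posts a rolling-restart request with the widely used Python requests library. The endpoint path and payload are hypothetical; the real Rheos LCM API is internal and not documented here.

```python
import requests

# Hypothetical base URL and action payload; not the actual Rheos API.
LCM_BASE = "https://lifecycle-mgmt.example.com/v1"

def rolling_restart(cluster_id: str) -> dict:
    """Ask the Lifecycle Management Service to run a rolling restart on a Kafka cluster (illustrative only)."""
    resp = requests.post(
        f"{LCM_BASE}/clusters/{cluster_id}/actions",
        json={"action": "rolling-restart"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
```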

Lifecycle Management Service’s architectural building blocks consist of these components:

  • API Server (REST and CLI) — a thin layer that parses, validates, and forwards requests to Task Manager
  • Task Manager (RPC) — a stateful service that creates and executes orchestration workflows on a cluster of nodes
  • Conductor — a component that is responsible for receiving heartbeat information from the guest instances
  • Guest Agent — A lightweight agent that runs on the guest instance; responsible for executing a command from the Task Manager on the instance as well as sending heartbeat metrics to the Conductor
  • Message Queue — a scoped, controlled, and secured way for the communication between the API Server, Task Manager, Conductor, and the Guest Agent

The pluggable extension includes these functions:

  • Workflow
  • Monitoring and metrics emitter and aggregator
  • Authentication and authorization
  • Configuration management
  • IaaS (the underlying compute, storage, resource management, etc.)

Core Service

Rheos core service consists of the following components: Kafka Proxy Server, Schema Registry Service, Stream Metadata Service, and Management Service. The following picture captures how these components interact with each other.

Rheos Kafka Proxy Server

One of Rheos’ key objectives is to provide a single point of access to the data streams for the producers and consumers without hard-coding the actual broker names. This allows any open-source Kafka connector, framework, or Kafka client written in any programming language to seamlessly produce or consume in Rheos.

To do this, we created the Rheos Kafka Proxy Server, which speaks the Kafka TCP protocol so that it can intercept the initial connection requests from clients. Upon receiving an initial connection request, the Proxy Server identifies which Kafka cluster the topic resides on via the Rheos Metadata Service (described below). The actual broker CNAMEs are then returned to the client so that it can complete the final connection handshake with the brokers.

In addition, the Rheos Kafka Proxy Server allows Operations to easily replace a bad node or move a topic from one Kafka cluster to another with very little to no impact on the clients.
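
From a client’s point of view, the proxy is just another bootstrap address. The sketch below shows what producing through such a proxy might look like with the open-source kafka-python client; the proxy hostname and topic name are made-up placeholders.

```python
from kafka import KafkaProducer  # pip install kafka-python

# The client only needs the proxy CNAME (hypothetical here); the proxy works out
# which Kafka cluster owns the topic and returns the real broker CNAMEs.
producer = KafkaProducer(bootstrap_servers="rheos-proxy.example.com:9092")

producer.send("behavior.clickstream.events", value=b'{"item_id": 123}')
producer.flush()
producer.close()
```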

Schema Registry Service

To promote data hygiene in Rheos and ease of use for both stream producers and consumers, each event in Rheos must be identifiable by an Avro schema. Rheos has built a Schema Registry Service based on the confluent.io Schema Registry. This service hosts data format definitions and provides schema versioning and serialization information for each event type. In addition, Rheos users can view, insert, and update the schemas in the registry.
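
For a feel of what an Avro-described event looks like, here is a hedged sketch that defines a hypothetical schema and serializes a record with the open-source fastavro library; the field names are invented, and the interaction with the Rheos registry itself is not shown.

```python
import io
from fastavro import parse_schema, schemaless_writer  # pip install fastavro

# A hypothetical event schema; real Rheos schemas are managed by the Schema Registry Service.
schema = parse_schema({
    "type": "record",
    "name": "ItemViewedEvent",
    "namespace": "behavior.clickstream",
    "fields": [
        {"name": "item_id", "type": "long"},
        {"name": "user_guid", "type": "string"},
        {"name": "event_ts", "type": "long"},
    ],
})

buf = io.BytesIO()
schemaless_writer(buf, schema, {"item_id": 123, "user_guid": "abc", "event_ts": 1484000000000})
payload = buf.getvalue()  # Avro-encoded bytes, ready to publish to Kafka
```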

Rheos Metadata Service

Stream Metadata Service provides a system of record for each stream and the associated producer(s) and consumer(s) that are known to the system. Prior to producing to or consuming from a stream, one must “register” the Kafka topic along with the associated schema, stream producer, and consumer with the Metadata Service. With this, Kafka topics and broker lists, along with the associated schemas, can easily be discovered or browsed via the Rheos REST API or Portal. More importantly, no hard-coding of broker names in the client code! In addition, the Metadata Service also makes it possible for our Management Service and Health Check System to seamlessly monitor, alert, and perform life cycle management operations on streams and the infrastructure that the streams run on.

The recorded information includes the following items:

  • The physical (cluster) location of a topic or a stream processing job/topology
  • Data durability, retention policy, partition, producer, and consumer information
  • Source and target data mirroring information
  • Default configuration for Zookeeper, Kafka, and Storm
  • Topic schema information
  • And more

Management Service

Rheos performs stream, producer, and consumer life cycle management operations with a set of predefined Standard Operating Procedures (SOPs) in the Management Service. Each SOP has a series of steps that can be performed on a guest instance via the Lifecycle Management Service. For example, Operations can initiate a rolling restart of a Kafka cluster using one of the SOPs.

Health Check System

This service monitors the health of each asset (for example, a Kafka, Zookeeper, or MirrorMaker node) that is provisioned through the Lifecycle Management Service in these aspects:

  • Node state (up or down)
  • Cluster health
  • Producer traffic, consumer lags, or data loss

It periodically samples data from Kafka topics and performs consumer lag checks and end-to-end latency checks via the Management Service. Upon detecting an anomaly or error, the service generates an alert via email and/or to eBay Operations. In addition, the Health Check Service records a consumer’s current offset with a timestamp in both the primary and the secondary Kafka clusters.
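
A consumer-lag check along these lines can be sketched with the open-source kafka-python client, where lag is the broker’s log-end offset minus the consumer group’s committed offset; the bootstrap address, group, and topic below are placeholders.

```python
from kafka import KafkaConsumer, TopicPartition  # pip install kafka-python

def consumer_lag(bootstrap: str, group_id: str, topic: str, partition: int) -> int:
    """Lag = latest offset in the partition minus the group's last committed offset."""
    consumer = KafkaConsumer(bootstrap_servers=bootstrap, group_id=group_id,
                             enable_auto_commit=False)
    tp = TopicPartition(topic, partition)
    latest = consumer.end_offsets([tp])[tp]   # log-end offset reported by the broker
    committed = consumer.committed(tp) or 0   # None if the group has never committed
    consumer.close()
    return latest - committed

# lag = consumer_lag("rheos-proxy.example.com:9092", "my-group", "behavior.clickstream.events", 0)
```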

Producer traffic

Producer traffic is closely monitored and can be viewed on the Rheos Portal. To provide a quick visual of a producer’s traffic trend or pattern, the current traffic volume of a stream domain (that is, a topic group with selected or all partitions) is overlaid on top of the previous day’s traffic pattern. This way, one can quickly detect whether there is an anomaly in the current traffic.

End-to-end latency

A question everyone wants answered is the end-to-end data latency, or consumer lag, in a stream pipeline. The Rheos Health Check System reports a stream domain’s end-to-end latency by measuring two periods of time (a measurement sketch follows the list):

  • From when an event is published to Kafka to the time when the event is consumed by a consumer
  • From when an event is published to Kafka to the time when the broker writes to disk
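
One simple way to approximate the first of these measurements is to compare the consumer’s wall clock with the timestamp Kafka attaches to every record (producer create time by default, broker append time if the topic is configured for it). Below is a hedged sketch using kafka-python; the topic and bootstrap address are placeholders.

```python
import time
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "behavior.clickstream.events",                     # placeholder topic
    bootstrap_servers="rheos-proxy.example.com:9092",  # placeholder proxy address
    auto_offset_reset="latest",
)

for record in consumer:
    # record.timestamp is milliseconds since the epoch; whether it is create time or
    # broker append time depends on the topic's message.timestamp.type setting.
    latency_ms = int(time.time() * 1000) - record.timestamp
    print(f"partition={record.partition} offset={record.offset} latency≈{latency_ms} ms")
```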

Stream consistency check

To quickly remediate a problem in a stream, the Health Check System proactively monitors the set of in-sync replicas (ISR) for a given topic in a stream. In addition, it also ensures that the stream that the topic goes through is consistent across one or more Kafka clusters.

Node status

Last but not least, our Health Check System also monitors the state of each node in Rheos. At a high level, it provides a quick overview of cluster health by checking these conditions (a minimal reachability sketch follows the list):

  • Whether a node is reachable or not
  • Whether the primary workload (broker, Zookeeper, etc.) is running on a reachable node
  • Whether a randomly selected node in a cluster can properly fulfil a request or not
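
The first of these checks boils down to a TCP reachability probe, sketched below; the hostnames and ports are placeholders.

```python
import socket

def node_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the node's service port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example probes against hypothetical broker and Zookeeper nodes:
# node_reachable("kafka-broker-1.example.com", 9092)
# node_reachable("zookeeper-1.example.com", 2181)
```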

Rheos Mirroring Service

In addition to Kafka’s cluster replication, the Rheos Mirroring Service provides high data availability and integrity by mirroring data from a source cluster to one or more target clusters. Built around Kafka’s MirrorMaker, the service is used to set up MirrorMaker instances and mirror a group of topics from one cluster to another via a REST API. Through the API, one can start and stop the mirroring of a topic group.

Rheos Mirroring Service consists of these key components:

  • Asset Agent is co-located on a mirroring compute node and responsible for reporting heartbeat metrics to a State Store.
  • Mirror Manager is a REST service that starts and stops the mirroring of a topic group. It is equipped with the intelligence to properly distribute the MirrorMaker instances across the cluster based on a distribution strategy.
  • Configurator is an Ansible playbook that resides on each MirrorMaker node. It is responsible for these functions:
    • Creating the required Kafka producer/consumer properties for a topic group
    • Creating the required directory structure for the instance along with the supervisor configuration
    • Starting or stopping the MirrorMaker instance based on the given source to target mirroring configuration
  • Mirror Bootstrap is a thin Java wrapper that registers and deregisters the MirrorMaker instance in the State Store prior to interacting with the underlying MirrorMaker instance. This allows us to capture the physical and the logical data mirroring activities.

Using the Mirroring Service to achieve high availability

As shown below, data can be mirrored from one region or availability zone to one or more other regions or availability zones for high availability. To do that, MirrorMaker instances are set up in the target locations to consume data from a source cluster and subsequently publish to the target clusters.

Using the Mirroring Service to move data across security zones

In addition, Data Mirroring is used to provide data movement from one security zone to another. As shown below, MirrorMaker instances are set up in the target security zone to consume data from the source security zone over a TLS connection and subsequently publish the received data to the target clusters.

How to access Kafka securely?

To acquire a broker connection, a Rheos client must be authenticated by the eBay Identity Service via the Kafka SASL mechanism. Upon authentication, the client is then further authorized through Kafka’s default pluggable Authorizer via Zookeeper.

In some cases, such as moving data across security zones, TLS is also enabled at the connection level.
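
The post does not spell out which SASL mechanism the eBay Identity Service plugs into, so the snippet below is only a generic illustration of a SASL-over-TLS Kafka client configuration using kafka-python; the mechanism, hostnames, and credentials are all assumptions.

```python
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "behavior.clickstream.events",                     # placeholder topic
    bootstrap_servers="rheos-proxy.example.com:9093",  # placeholder TLS listener
    security_protocol="SASL_SSL",      # SASL authentication carried over a TLS connection
    sasl_mechanism="PLAIN",            # assumed mechanism; the real deployment may differ
    sasl_plain_username="my-service-account",
    sasl_plain_password="not-a-real-secret",  # fetched from a secret store in practice
    ssl_cafile="/path/to/ca.pem",
)
```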

Conclusion

Rheos has opened a new chapter in many aspects at eBay. With Rheos, eBay data can now be securely extracted and moved from a data store, application, or other source to one or more locations in near real time. Stream processing has opened up new possibilities for eBay’s businesses: fraud detection, monitoring, analytics, and more.

Coding Kata Month

 

I’m very lucky to be working at eBay with some of the most talented people I know. More fortunate still perhaps that they indulge me in my regular experiments in making our department a better place to work. I’ve been thinking about what I perceive as deficiencies in coding kata and talking to one of my colleagues at EPD (European Product Development) about this since maybe midyear 2016. As we talked about the similarities and differences in martial arts and coding kata, we began to explore what we might do in order to shift the needle on current coding kata practice.

To that end, we kicked off ‘Kata Month’ in December. It was very much an exploratory exercise to see what would happen if we solved the same kata every day for a month. Rather than do a kata until it was ‘solved’, what if we practiced it daily, with a view to deliberately practicing elements of coding? Truth be told, it very nearly did not happen, and I owe thanks to my manager Paul Hammond, who pushed me to kick off the exercise despite my not being completely prepared. My tendency is to over-engineer, and given the various pressures of our day to day, I’d likely have delayed until January or February to try and have everything as I wanted it. As it turned out, we had enough in place, and so with pretty much zero notice, I sent out the following email in the first week of December:

Hi all,

For the next four weeks in the London office, we’ll be holding Coding Kata Month. Each day between 11 – 12, you’ll have one hour in which to participate. (Instructions below for week 1)

In martial arts, constant, deliberate practice of fundamentals is key to attaining mastery. In Kendo, there are 10 kata (interestingly, they are done in pairs) — effectively 20 movements to learn. When I first started kendo, the kata were the ‘boring’ bits that I had to do in order to do the fun stuff (beating someone with a stick). The more I did them though, the more I realised there was a richness in them that I hadn’t seen (or had wilfully ignored). Yes, the movements are choreographed, but an understanding of the fundamentals ingrained in them is crucial. There is correctness of physical form, but also distance, timing, and things that are more difficult to perceive without practice — reading your opponent, their movement, their breathing, gauging their readiness.

Deliberate practice to improve these fundamentals is key. The same is true for any skill, be it a musical instrument, carpentry, ballet, or programming. For the next month, we’re going to delve into deliberate practice for programming through kata.
Monday to Thursday are kata days (implementation).
Friday will be for code review/debrief — an opportunity for people to talk about what they learned.

Instructions:
Each day between 11:00 – 12:00 sharp
Complete the Harry Potter coding kata within the constraints set for that day/week.

  • Each time you begin, start from scratch.
    1. Go to our GitHub kata repository.
    2. Create a new repo named day1-<my initials>[-<my pair's initials>].
    3. Clone your new repo.
    4. Open your IDE of choice and create new project in your new repo.
    5. Code…
  • Commit after each Red/Green/Refactor cycle.
  • At the conclusion of the kata:
    • Include a text file listing the participants.
    • Record any thoughts you think are relevant: learnings, assumptions, gripes, ideas, notes for next time, etc.
    • Commit the above notes along with your code.

Week 1 – Individual Practice
Mon –> Thursday — Code solutions
Choose your language — you will be sticking with this language for a while, so choose carefully!
Repeat the kata each day.
Use the same language, same IDE.
Friday –> Code review (group)
On Friday, we’ll get together as a group and talk about what we learned and look at some different examples of your solutions.

Weeks 2–4 will change things up a little. Here’s a taste of what is to come:
Week 2 — Pairing
Week 3 — Design variation and mobbing
Week 4 — Open

Honestly, I was a little taken aback at how enthusiastically the initiative was picked up by the teams. I figured they might get a kick out of it, but they grabbed the idea and ran with it. They talked about it over lunch, they talked about it across teams. After a long and challenging year, it was great to see the crew jumping in with so much energy.

I dived in with equal enthusiasm. Honestly, I’d not coded in anger in well over a year, and I was painfully rusty. On day one, I realised how much I’d forgotten about TDD and got an embarrassingly small amount of code written. On day two, I sort of hit my groove and worked out where I wanted to go with a solution. On day three, I’d nailed a working solution to the problem, and by day four, I knocked it out in about 20 minutes and started looking at how to evolve the data structures I’d chosen to make my solution extensible. I was feeling pretty good about myself.

I sat down to pair with one of our programmers in week 2. At the end of the first session I had the humbling experience of seeing just how much I had to learn about TDD (not to mention intentional programming and various design patterns). The other thing it did was make me realise just how rich this area of kata could be. Having an interesting problem to solve was one thing, but putting together a repeatable solution that incorporates a contextually appropriate use of both fundamental and advanced programming skills has so much potential.

I won’t give you a detailed rundown of the entire month; suffice it to say there were some interesting things to come out of it. Some of them code-related, some not.

For example, we stipulated one hour for kata between 11 and 12 (just before most people go to lunch). After a couple of weeks, the consensus was that this was quite disruptive to the day overall. The teams had standup in the morning, then a small amount of time to work before kata started, then lunch, and then the afternoon. Productivity-wise, there was a general feeling that half the day was gone before any project work got done. For future iterations of kata month, we’ll kick off the day with kata. If nothing else, at least that way folks are starting the day writing code — something that you don’t always get to do despite best intentions.

Another interesting thing that came out of our Friday review sessions was that some people were bored after ‘solving’ the kata. This was what I really wanted to address — that kata are not a thing to be ‘solved’, but a way to practice fundamentals. To some extent this was helped by the variety from week to week (individual, pairing, mobbing, etc.), but we also discussed using the time to work on weak points or selecting a different approach to solving the problem or even making more effective use of the IDE to do some of the heavy lifting. In hindsight, this might have been different if I’d spent more time setting the scene at the beginning, explaining how kata work in martial arts and what I was expecting. It also helped reinforce to me the importance of having a repeatable solution in place. Having a repeatable solution takes the ‘solving’ part out of the equation and lets you focus on practice of implementing a solution (more on that in a future post).

At the end of the month, I ran a retro and put out a survey to the participants. I’d like to share some of the responses.

What were your major take-aways from Kata Month?

responses to What were your major take-aways from Kata Month?

What changes would you like to see for the next time we run this exercise?

responses to What changes would you like to see for the next time we run this exercise?

It was interesting to see the various viewpoints of the people that participated, what their preconceptions and assumptions were, and how they changed over time. As far as our Friday sessions went, they were quite unstructured and in hindsight we could have made a lot more of them. We looked through some code, but with the exception of week 3 where we did an impromptu mobbing session, we didn’t really demo any writing of code. Given my views on kata as a visual teaching and learning aid, that feels like an opportunity missed.

Setting expectations early on was also a recurring theme. I think there is a place for some amount of ritual to designate the mental shift required for working on kata. It need not be elaborate, but something that puts the practitioner in the mindset of deliberate practice. In that way, the goal is clear — execute the kata in order to practice your fundamentals.

We talked also about the fact that this was a ‘greenfields’ kata and that it might be useful to try to do a kata along similar lines that was refactoring existing code that had issues of varying kinds. There are refactoring kata out there, but I quite like the idea of having kata that exist in pairs to exercise similar principles in both greenfields and brownfields situations, possibly even having kata whose solution works for one situation but needs refactoring to suit another. There are subtly different skills involved in selecting a particular design pattern to implement a solution versus recognising when existing code should be refactored to use that pattern.

Since kata month finished, I’ve put together a small working group of interested folks with the aim of putting together some kata of our own. We’re working to that end now, to come up with a problem and a solution that is representative of the skills required by an EPD programmer. My intention, once we have something that works for us, is to then share those with the wider world. In the meantime, there is no shortage of kata ‘problems’ out there, but very few of them are accompanied with a solution. About the only one that springs to mind is Bob Martin’s Bowling Kata. I think there is certainly scope for other existing kata to similarly have repeatable solutions designed for them — not simply ‘solved’, but achieving a repeatable solution deliberately designed to exercise fundamentals and good design principles in context.