Practical NoSQL resilience design pattern for the enterprise


Building applications resilient to infrastructure failure is essential for systems that run in a distributed environment, such as NoSQL databases. For example, failure can come from compute resources such as nodes, local/remote clusters, network switches, or the entire data center. On occasion, NoSQL nodes or clusters may be marked down by Operations to perform administrative tasks, such as upgrading software or adding capacity. In this blog, you can find information on how to build resilient NoSQL applications using design patterns suited to your needs.

Note that the resilience design pattern described in this blog is complementary to other patterns, such as the Hystrix library. The difference is that it focuses on the following abstractions:

  • NoSQL database architecture and capability abstraction, such as replication strategy, durability, and high availability
  • Infrastructure and environment abstraction, such as standard minimal deployment, disaster recovery, and continuous operation
  • Application abstraction, such as workload, performance, and access patterns

Why resiliency design patterns for NoSQL databases?

Enabled by more efficient distributed consensus protocols, such as Paxos and Raft, and new thinking, such as flexible database schema, NoSQL databases have grown out of early adoption and are gaining popularity among mainstream companies. Although many NoSQL vendors tout their products as having unparalleled web-level scalability and capabilities such as auto-failover, not all are created equal in terms of availability, consistency, durability and recoverability. This is especially true if you are running mission-critical applications using heterogeneous NoSQL systems that include key-value (KV) pairs, column families (CF), and document and graph databases across a geographically dispersed environment.

It becomes imperative that a well-run enterprise optimizes its management to achieve the highest operational efficiency and availability. To accomplish this, an enterprise should not only empower application developers with well-defined NoSQL database resilience design patterns so that they can be relieved from preventable infrastructure failure, but it should also provide a guaranteed-availability Service Level Agreement (SLA) on these resilience design patterns.

NoSQL resiliency design pattern consideration

Given the enormous complexity associated with integrating an enterprise-class infrastructure, such as eBay's, developing a meaningful and usable NoSQL resilience pattern involves more than one dimension. It comprises a multitude of coherent and intricately interconnected cogs such as these:

  • Use case qualification
  • Application persistence error handling
  • Technology stack and engineering framework
  • Data center infrastructure (for example, public/private/hybrid cloud) configuration and setup
  • NoSQL database-agnostic and -specific resilience abstraction
  • Operation and management best practice and standard operation procedure (SOP)

Since NoSQL databases are not panaceas, ill-suited applications can disrupt operations and cause friction between stakeholders within the organization. The first step toward defining a meaningful NoSQL database resilience pattern is to ensure that only qualified use cases will use it. With that, we recommend the following guidelines when qualifying new NoSQL use cases.

NoSQL qualifying guidelines

NoSQL resiliency design pattern approach

One main objective of a NoSQL resilience design pattern is that it should support a wide range of use cases across different NoSQL databases. To achieve this objective, we devised the following steps in our approach:

  1. Identify a meaningful NoSQL database architectural abstraction based on the CAP theorem, ACID/BASE properties, and performance characteristics.
  2. Categorize and define types of different resilience patterns based on workload, performance, and consistency properties that are meaningful to applications.
  3. Define a standardized minimal NoSQL deployment pattern for common small-to-medium, non-mission critical use cases.
  4. Define an enhanced design pattern to support mission-critical use cases that require high availability, consistency, durability, scalability, and performance.
  5. Define other design patterns to support non-conforming use cases, for example, standalone without disaster recovery (DR), and application sharding.

Given that JSON has become the de facto standard data format for web-centric e-commerce businesses like eBay, this blog will use two of the top document-centric NoSQL databases (MongoDB and Couchbase) to illustrate our proposed resilience design pattern.

DB comparison

Although a tutorial on MongoDB and Couchbase is beyond the scope of this blog, the high-level comparison between them in Table 1 helps illustrate their differences. It also helps explain how these differences influence respective resilience design patterns for each product.

Table 1. Comparison of MongoDB and Couchbase

                MongoDB                        Couchbase
Name            Replica set/sharded cluster    “Cluster”*
Architecture    Master-slave                   Peer-to-peer
Replication     Master to slave (or primary    Local cluster: primary to replica nodes
                to secondary) nodes            Multiple clusters: bi- or uni-directional
                                               Cross Data Center Replication (XDCR)
Sharding        Optional                       Built-in
Quorum read     Yes                            No
Quorum write    Yes                            No

* Throughout this blog post, for convenience we designate one Couchbase cluster per data center even though multiple clusters can be set up per data center. However, nodes in the same Couchbase cluster do not span across multiple data centers.

From the comparison above, we observe the following characteristics when designing a resilience pattern:

  • NoSQL databases tend to simplify their resilience pattern if they are based on peer-to-peer architecture.
  • NoSQL databases tend to complicate their resilience pattern if they lack quorum read/write, an important capability to support global read and write consistency.

Lastly, we found it helpful to organize resilience design patterns into different categories, as shown in Table 2. For brevity, we will focus the examples on the following three categories: workload, durability, and application sharding.

Table 2. Categories of Resilience Design Patterns

Category                          Pattern
Workload                          General purpose mixed read and write
Performance                       High performance read and/or write
Durability                        100% durability
                                  High local and/or cross data center durability
High availability (HA)            High availability local read and write
                                  High availability multi-data center read and write
High read and write consistency   High local data center read and write consistency
                                  High multi-data center read and write consistency
Others                            Administration, backup and restore, application sharding …

Standard minimal deployment

As mentioned earlier, one key objective of NoSQL resilience design patterns is to relieve applications and Operations of preventable infrastructure failure. It is our opinion that, regardless of the size or importance of applications, enterprise-class NoSQL clusters should be able to handle node, cluster, site, or network partition failure gracefully with built-in preventive measures. Also, they should be able to perform disaster recovery (DR) within reasonable bounds. This assumption motivates the “standard minimal deployments” described below.

MongoDB standard minimal deployment

According to our experience managing some of the largest 24×7 heterogeneous NoSQL clusters in the world, we recommend the following “standard minimal deployment” best practice for enterprise-class MongoDB deployments:

  • Whenever possible, nodes in the same MongoDB replica set will be provisioned in different fault domains to minimize any availability risk associated with node and network failure.
  • Standard MongoDB deployment (for example, type 1 in Table 3) always includes a third data center with two or more secondary nodes. For applications that do not have traffic in a third data center, light-weight arbiter nodes should be used for cross-data center quorum voting during site or total data center failure.
  • With few exceptions (for example, type 2 below), a MongoDB replica set can be deployed in only one data center if applications do not require remote DR capability.

Table 3. MongoDB Standard Minimal Deployments

Deployment type 1: Standard deployment with built-in DR capability
  Data Center 1: Minimum three nodes, which include one primary and two secondary nodes.
  Data Center 2: Minimum two secondary nodes.
  Data Center 3: Minimum two secondary or arbiter nodes. This is to ensure that, should any one data center experience total failure, the replica set can still hold a quorum vote for a new primary node.
  Description: In selected high-traffic or important use cases, additional secondary nodes may be added in each data center. The purpose is to increase application read resilience so that surviving nodes won’t be overwhelmed if one or more secondary nodes fail in any data center.

Deployment type 2: Special qualified use case without DR
  Data Center 1: Three nodes (one primary, two secondary nodes).
  Data Center 2: None.
  Data Center 3: None.
  Description: This deployment pattern is for use cases that do not require remote DR since applications can rebuild the entire dataset from an external data source. Furthermore, the lifespan of data can be relatively short-lived and may expire in a matter of minutes, if not seconds. In selected high-traffic or important use cases, additional secondary nodes may be added to the replica set to help increase resilience.
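
To see why the standard deployment spreads voting nodes over three data centers, the sketch below checks whether the surviving nodes still form a majority after one data center is lost. The 3/2/2 node counts come from Table 3; the function name and the two-data-center layout used for contrast are illustrative assumptions, and the model assumes every node is a voting member.

```python
def survives_dc_loss(voters_per_dc):
    """For each data center, report whether the remaining voters can still
    elect a primary if that data center is lost. Electing a primary needs a
    majority of ALL voting members, not just of the survivors."""
    total = sum(voters_per_dc.values())
    majority = total // 2 + 1
    return {dc: (total - count) >= majority for dc, count in voters_per_dc.items()}


standard = {"DC1": 3, "DC2": 2, "DC3": 2}  # type 1 in Table 3
two_dcs = {"DC1": 3, "DC2": 2}             # hypothetical layout without a third data center

print(survives_dc_loss(standard))
print(survives_dc_loss(two_dcs))
```

With seven voters the majority is four, so losing any single data center still leaves enough voters. In the hypothetical two-data-center layout, losing DC1 leaves two of five voters and no primary can be elected.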

The following diagram illustrates the MongoDB “standard minimal deployment” pattern.

Mongo deploy

During primary node failover, one of the available secondary nodes in either data center will be elected as the new primary node, as shown in the following diagram.

Mongo before failover

During site or data center failure, one of the available secondary nodes in other data centers will be elected as the new primary node, as shown in the following diagram.

Mongo after failover

Couchbase standard minimal deployment

With peer-to-peer architecture, Couchbase’s “standard minimal deployment” does not require a third data center since it does not involve quorum voting when failing over to a new primary node. Arguably, increasing the minimum from four nodes to five, six, or more per data center can help strengthen operational viability should two or more nodes in the same data center fail at the same time (itself an unlikely scenario if they were provisioned in different fault domains in the first place). With that, we feel that a minimum of four nodes per data center is sufficient for most Couchbase use cases to start with.

Table 4. Couchbase Standard Minimal Deployments

                                                     Couchbase Replica Set Nodes Configuration
Deployment type                                      Data Center 1   Data Center 2   Data Center 3 (optional)   Description
1  Standard deployment with built-in DR capability   4+ nodes        4+ nodes        4+ nodes                   See description above
2  Special qualified use case without DR             4+ nodes        -               -                          Same as MongoDB

The following diagram illustrates the Couchbase “standard minimal deployment” pattern where each data center/cluster has two copies of the same document (for example, P1 being the primary copy and R1 the eventually consistent replica copy).

Couchbase deploy

During automatic node failover, the Couchbase client SDK in applications will detect node failure and receive an updated failover cluster map with a topology containing the new location of replica documents that have been promoted to primary. This is shown in the following diagram.

Couchbase failover

Although Couchbase supports bidirectional Cross Data Center Replication (XDCR) between geographically dispersed clusters, its current implementation does not offer automatic cluster failover. To address this limitation, Couchbase will support a new feature called Multi-Cluster Awareness (MCA) in a future release (tentatively v4.x) to provide this capability, as shown in the following diagram.

Couchbase MCA

High-level MongoDB and Couchbase resilience capability comparison

Before we talk about the details of an individual resilience design pattern for each product, it helps to understand the capability of MongoDB and Couchbase in terms of high availability, consistency, durability, and DR across local and multiple data centers. The following table highlights their differences.

Table 5. Comparison of MongoDB and Couchbase Resilience Capabilities

NoSQL database   Data center   High availability            High consistency   High durability          DR
MongoDB          Local DC      No for write, yes for read   Yes                Yes                      No
MongoDB          Multi DC      No for write, yes for read   Yes                Yes                      Yes
Couchbase        Local DC      No for write, yes for read   Yes                Yes                      No
Couchbase        Multi DC      Yes with MCA*                No                 Yes with MCA and TbCR*   Yes

* MCA (Multi-Cluster Awareness) and TbCR (Timestamp-based Conflict Resolution) will be available in a future Couchbase v4.x release.

From the above comparison, we observe one similarity between MongoDB and Couchbase: neither supports high-availability writes. This is because both products limit their writes to a single primary node only. Nonetheless, Couchbase does plan to support high-availability writes for multiple data centers through its future Multi-Cluster Awareness feature, which will help alleviate this limitation. This is the reason why Couchbase’s standard minimal deployment pattern requires at minimum two data centers/clusters.

MongoDB resilience design pattern examples

Before we show MongoDB’s resilience design pattern examples, it is important to understand how data loss can happen in MongoDB if applications do not use a proper resilience design pattern.

Non-synchronous writes

Let’s assume that you have a MongoDB replica set comprised of three nodes. The following operations illustrate how data loss can happen:

  • First, the application writes five documents and receives write confirmation from the primary node only.
  • The first three documents are replicated successfully to the second node (a secondary), but only the first two documents are replicated to the third node (also a secondary).

    Mongo Non-Synchronous Writes A

  • The primary node fails before all five documents reach both secondary nodes.

    Mongo Non-Synchronous Writes B

  • Quorum voting elects the second node as the new primary node because it received the third document, that is, it is more up to date than the third node.

    Mongo Non-Synchronous Writes C

  • The original primary node steps down to be a secondary and rolls back its fourth and fifth documents, since they didn’t reach the other two nodes.

    Mongo Non-Synchronous Writes D

  • In effect, the application loses the fourth and fifth documents even though it received confirmation from the original primary node. This type of data loss (non-synchronous write) occurs because the application didn’t apply the “quorum write” option, which waits for confirmation from a majority of nodes in the same replica set.
  • In addition to “quorum write”, one should also enable MongoDB’s write-ahead logging journal file to further increase primary node write durability.
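
The scenario above can be replayed with a small simulation. This is a simplified model, not MongoDB code: documents are numbered in write order, replication is assumed to be in order, and all names are hypothetical. A document that is "not acknowledged" under the majority setting stands for a write the application is still waiting on, so the application never treats it as safely stored.

```python
def acknowledged_then_lost(replicated, write_concern):
    """replicated maps node name -> how many documents that node holds.
    Returns the documents that were acknowledged to the application but are
    lost once the primary fails and the freshest secondary takes over."""
    secondaries = {name: n for name, n in replicated.items() if name != "primary"}
    majority = len(replicated) // 2 + 1
    needed = majority if write_concern == "majority" else 1

    acknowledged = []
    for doc in range(1, replicated["primary"] + 1):
        # The primary always has the document; count secondaries that have it too.
        copies = 1 + sum(1 for n in secondaries.values() if n >= doc)
        if copies >= needed:
            acknowledged.append(doc)

    # The most up-to-date secondary becomes the new primary.
    surviving = max(secondaries.values())
    return [doc for doc in acknowledged if doc > surviving]


state = {"primary": 5, "secondary_a": 3, "secondary_b": 2}
print(acknowledged_then_lost(state, "primary_only"))
print(acknowledged_then_lost(state, "majority"))
```

With confirmation from the primary only, documents 4 and 5 are acknowledged and then lost. With a majority requirement, only documents 1 to 3 are acknowledged, and all of them survive the failover.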

Since non-synchronous write is just one type of issue that can happen to applications, it is not sufficient to develop a pattern just to solve it. Having various add-on patterns, like synchronous write and others, on top of the MongoDB standard minimal deployment patterns helps enrich the overall NoSQL database resilience capabilities.

MongoDB write durability pattern

To achieve MongoDB replica-set-wide write durability, an application should use MongoDB’s WriteConcern = Majority option, where a write waits for confirmation from a majority of nodes across multiple data centers. During primary node failover, one of the secondary nodes in that majority, which has the latest committed write, will be elected as the new primary node.

One caveat of this pattern is that one should weigh the pros and cons associated with waiting for confirmation from a cross-data center majority of nodes, since it may not be suitable for applications that require low latency.

MongoDB Write Durability Pattern

MongoDB read intensive pattern

This pattern is for applications that require high availability reads. With MongoDB’s master-slave replication architecture and ability to scale up to 49 secondary nodes across multiple data centers, it inherently supports highly available reads, provided that the healthy secondary nodes in each data center can handle the read traffic if one or more nodes fail. Note that even though the current MongoDB replica set can scale up to 50 nodes, only 7 nodes can vote during primary node failover. The following diagram illustrates this capability.

MongoDB Read Intensive Pattern

MongoDB extreme high write pattern

Use cases requiring extreme high writes can use MongoDB’s optional sharding since it allows horizontal write scaling across multiple replica sets (shards). Each shard (or replica set) stores part of the entire dataset as defined by the shard key of the collection. With a well-designed shard key, MongoDB automatically distributes and balances read/write queries to designated shards. Although MongoDB sharding provides horizontal scale-out write capability, this pattern adds complexity and overhead and should be used with caution:

  • An application should use sharding with an appropriate shard key from the start instead of migrating from a single replica set as an afterthought. This is because migrating a single replica set to a sharded cluster is a major undertaking from both development and operations perspectives.
  • We recommend starting with a predefined number of shards capable of handling capacity and traffic in the foreseeable future. This helps eliminate the overhead associated with rebalancing when adding new shards.
  • Note that MongoDB automatic shard balancing may introduce spikes during chunk migration and can potentially impact performance.
  • Developers need to understand the behavior and limitations of mongos, a software router process, on queries that do not include the shard key and therefore become scatter-gather operations.
  • Developers should weigh the pros and cons of the overhead associated with running mongos as part of application servers vs. in separate nodes.
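
The difference between a targeted query and a scatter-gather query can be sketched as follows. This is a toy router under stated assumptions, not how mongos is implemented: the shard names, the `user_id` shard key, and the hash-modulo placement are all hypothetical stand-ins for MongoDB's chunk-based routing.

```python
import hashlib

SHARDS = ["shard0", "shard1", "shard2"]  # hypothetical shard names


def shard_for(key):
    """Map a shard-key value to one shard using a stable hash."""
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]


def route(query, shard_key="user_id"):
    """Return the list of shards a query has to visit."""
    if shard_key in query:
        return [shard_for(query[shard_key])]  # targeted: exactly one shard
    return list(SHARDS)                       # scatter-gather: every shard


print(route({"user_id": 42}))
print(route({"status": "open"}))
```

A query that carries the shard key touches one shard, while a query without it fans out to all of them, which is why the cost of scatter-gather grows with the number of shards.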

The following diagram illustrates this capability.

MongoDB Extreme High Write Pattern

Couchbase resilience design pattern examples

Although Couchbase does not support write-ahead logging or quorum write, it achieves high durability through the following mechanisms:

  • Local cluster durability: ReplicateTo and PersistTo functions
  • Multi-cluster durability (to be released in a future v4.x release):
    • Multi-Cluster Awareness (MCA): Without requiring application logic, this feature allows application developers and DBAs to define rules on how applications should behave in the event of complete data center/cluster failover.
    • Timestamp-based Conflict Resolution (TbCR): Based on a server-synced clock, this feature provides Last-Write-Wins (LWW) resolution of update conflicts in bidirectional replication to ensure correct cross-data center/cluster write durability.
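
As a rough illustration of the last-write-wins idea, the sketch below picks a winner between two conflicting revisions of the same document. It is not Couchbase's implementation: the `Revision` type, its fields, and the tie-break on cluster name are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Revision:
    value: str
    timestamp_ms: int  # assumed to come from clocks kept in sync across clusters
    cluster: str


def resolve_lww(a, b):
    """Keep the revision with the later timestamp. Ties are broken by cluster
    name (an assumption) so both clusters independently pick the same winner."""
    return max(a, b, key=lambda r: (r.timestamp_ms, r.cluster))


dc1 = Revision("cart with 2 items", 1_000, "dc1")
dc2 = Revision("cart with 3 items", 1_250, "dc2")
print(resolve_lww(dc1, dc2).value)
```

Because the result does not depend on argument order, both sides of a bidirectional replication link converge on the same document. The approach only works as well as the clock synchronization it relies on.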

Couchbase local cluster write durability pattern

For local cluster durability, Couchbase provides the following two functions through its client SDK:

  • ReplicateTo: This function makes a write wait until the document has been successfully replicated in memory to its copies in the local cluster. However, it does not guarantee that the write is persisted on disk, which may result in data loss if both primary and replica nodes fail before that happens.
  • PersistTo: For increased durability, applications can use this function so that writes will not only be successfully replicated in memory but also persisted on disk in the local cluster.

Note that even with the second PersistTo function, the current Couchbase version still does not guarantee writes to be successfully replicated to remote data centers/clusters. (This capability is explained in the next section, “Couchbase Multi-cluster write durability pattern.”) The following operations illustrate how both functions work.

  1. Assume that the Couchbase topology contains four nodes per data center/cluster with each storing two copies of the same document replicated through XDCR between clusters.

    Couchbase Local Cluster Write Durability Pattern A

  2. The application in Data Center 1 writes document P1 to node N1.

    Couchbase Local Cluster Write Durability Pattern B

  3. Before P1 is replicated to the replica node in the local cluster or to the remote data center/cluster, node N1 fails, and as a result the application suffers data loss.

    Couchbase Local Cluster Write Durability Pattern C

  4. Before P1 reaches the remote data center/cluster, even though P1 has been replicated successfully in memory to the local cluster replica node N4, if both N1 and N4 nodes fail, the application still suffers data loss.

    Couchbase Local Cluster Write Durability Pattern D

  5. Using the ReplicateTo function can circumvent the failure described in step 3, and using the PersistTo function can circumvent the failure described in step 4, as shown in the following figure.

    Couchbase Local Cluster Write Durability Pattern E

  6. Lastly, for multi-data center/cluster durability, use the design pattern described in “Couchbase multi-cluster write durability pattern.”
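
The failure cases in steps 3 to 5 can be summarized in a toy model. It is a sketch only and does not call the Couchbase SDK: node names follow the steps above, and it assumes that a copy on disk can be recovered after a node failure while a copy held only in memory cannot.

```python
def write_survives(copies, failed_nodes):
    """copies maps node -> "memory" or "disk", showing where the write landed.
    The write survives if some copy is on a healthy node or already on disk."""
    return any(
        where == "disk" or node not in failed_nodes
        for node, where in copies.items()
    )


# Step 3: N1 fails before P1 is replicated anywhere.
print(write_survives({"N1": "memory"}, {"N1"}))
# ReplicateTo: the write waited for an in-memory copy on replica node N4.
print(write_survives({"N1": "memory", "N4": "memory"}, {"N1"}))
# Step 4: both N1 and N4 fail while P1 is only in memory.
print(write_survives({"N1": "memory", "N4": "memory"}, {"N1", "N4"}))
# PersistTo: the write waited until it was on disk on both nodes.
print(write_survives({"N1": "disk", "N4": "disk"}, {"N1", "N4"}))
```

The four calls print False, True, False, True: replication in memory protects against a single node failure, and persistence on disk protects against losing both the primary and replica nodes.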

Couchbase multi-cluster write durability pattern

With its Multi-Cluster Awareness and Timestamp-based Conflict Resolution features, Couchbase supports multi-cluster durability as shown below.

Couchbase Multi Cluster Write Durability Pattern A

In the absence of write-ahead logging or quorum write, and even though Couchbase provides sufficient support for local and multi-cluster durability, one should still ask this question: what is the likelihood that all primary and replica nodes fail in multiple data centers or even worse that all data centers fail completely at the same time? These two unlikely failure scenarios are shown in the following two diagrams. We feel that the odds are next to zero if one follows this proposed Couchbase durability design pattern.

Couchbase Multi Cluster Write Durability Pattern B                         Couchbase Multi Cluster Write Durability Pattern C

Couchbase read/write intensive scalability pattern

With its peer-to-peer architecture and XDCR’s bidirectional multi-cluster/data center replication capability, Couchbase affords users the flexibility to provision clusters with different sizes and shapes tailored for specific traffic/capacity and usage patterns. This flexibility is shown in the following diagram. On the other hand, the cluster-sizing calculation exercise can become complicated. This is especially true if it involves Multi-Dimensional Scaling.

Couchbase Read:Write Intensive Scalability Pattern

Other NoSQL design pattern examples

NoSQL DB-agnostic application sharding pattern

The motivation behind this design pattern is that almost all large-scale mission-critical use cases require high availability. In our experience, no matter which NoSQL database you use or how comprehensive the best practices and SOPs you follow, sometimes simple mundane maintenance tasks can jeopardize the overall database availability. The larger the database, the more severe the damage it may suffer. This design pattern offers an alternative solution using a divide-and-conquer approach, that is, by reducing the NoSQL database cluster to a small and manageable size through the following mechanisms:

  • Applications shard their data using modulo, hash, round robin, or any other suitable algorithm.
  • The DBA and Operations provide a standard-size NoSQL cluster to host each application-level shard.
  • If needed, each application-level shard can further use built-in NoSQL vendor product sharding if it is available.

Using MongoDB as an example, the following diagram illustrates this design pattern. One caveat associated with this pattern is that it requires a middle-tier data access layer to help direct traffic, an effort that must not be underestimated.
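
A minimal sketch of such a data access layer is shown below, assuming hash-plus-modulo sharding on a user ID. The cluster names and the `DataAccessLayer` class are hypothetical, and plain dictionaries stand in for connections to real clusters.

```python
import zlib

# Hypothetical cluster names; each would be a standard-size NoSQL cluster.
CLUSTERS = ["cluster-0", "cluster-1", "cluster-2", "cluster-3"]


def cluster_for(user_id):
    """Stable hash plus modulo. zlib.crc32 is used because Python's built-in
    hash() for strings can differ from one process to the next."""
    return CLUSTERS[zlib.crc32(user_id.encode("utf-8")) % len(CLUSTERS)]


class DataAccessLayer:
    """Directs each read and write to the cluster that owns the key."""

    def __init__(self):
        self.stores = {name: {} for name in CLUSTERS}

    def put(self, user_id, document):
        self.stores[cluster_for(user_id)][user_id] = document

    def get(self, user_id):
        return self.stores[cluster_for(user_id)].get(user_id)


dal = DataAccessLayer()
dal.put("user-42", {"items": 3})
print(cluster_for("user-42"), dal.get("user-42"))
```

Plain modulo keeps the routing trivial, but changing the number of clusters remaps most keys, so the cluster count is best fixed up front or replaced with a scheme that tolerates resizing.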

NoSQL DB Agnostic Application Sharding Pattern

Future work and direction

In this blog, we described the motivation, consideration, and approach behind our proposed NoSQL resilience design pattern. We gave an overview of the key differences between MongoDB and Couchbase in the context of resilience patterns. We also walked through three NoSQL resilience design pattern examples and a DB-agnostic application sharding pattern. In conclusion, we would like to suggest the following future work and direction on this topic:

  • Provide end-to-end integration of proven NoSQL design patterns with application frameworks and also cloud provisioning and management infrastructure.
  • Formalize the above NoSQL design patterns as officially supported products rather than just engineering patterns.
  • Add other types of resilience patterns, such as high consistency.
  • Add support for other NoSQL databases, for example, Cassandra.
  • Collaborate with NoSQL vendors and develop new resilience patterns for new features and capabilities.

The graphics in “Non-synchronous writes” are reproduced with the kind permission of Todd Dampier from his presentation “Rock-Solid Mongo Ops”.

How eBay’s Shopping Cart used compression techniques to solve network I/O bottlenecks



eBay’s data storage related to Shopping Cart information relies on two different data stores. There’s a MongoDB store that stores the entire cart contents for a user’s cart, and there’s an Oracle store that stores only the cart summary and basic cart details but has enough hints to recompute the full cart if needed. The Oracle store is used to overcome the persistence and consistency challenges (if any) associated with storing data in MongoDB. It’s easier to think of the MongoDB layer as a “cache” and the Oracle store as the persistent copy. If there’s a cache miss (that is, missing data in MongoDB), the services fall back to recover the data from Oracle and make further downstream calls to recompute the cart.

All MongoDB carts are in JSON format, while the Oracle carts are stored in JSON format in a BLOB column. (These Oracle carts are user-sharded and are used only for OLTP reasons. All other filtering and querying to peek into cart data happens via a data warehouse and denormalized access patterns.)

Figure 1. Storage architecture

While this is not a discussion on the choice of technology (Oracle vs. MongoDB vs. any other database), we are hoping that our experience gives folks an insight into what it takes to identify, debate, solve, and (most importantly) roll out fixes rapidly to a site that handles hundreds of millions of calls across tens of millions of users each day.

Problem statement

Late in 2016, the Shopping Cart service started experiencing a high number of misses at its caching layer, and at around the same time, we started getting alerts from eBay’s Ops team about our MongoDB replicas failing to catch up in a decent amount of time. (MongoDB runs in a master-slave configuration and this description is worth a read if you are unfamiliar with the setup.) The setup, which was seemingly working “fine” for the past few years, had suddenly started to hit bottlenecks that were unrelated to any infrastructure or code change. Upon further investigation, this was narrowed down to MongoDB’s oplog hitting network I/O limits. You can think of the oplog as similar to Oracle’s redo logs, except that it also has the critical purpose of making sure that the MongoDB read replicas are refreshed as close to real time as possible. Due to this, the read replicas for a GET Cart operation, for example, were returning stale carts, which were rejected as a “cache” miss because of the incorrect (stale) version that was stamped on the cart. This meant that we were doing a lot more recomputes of users’ carts by falling back on the Oracle cart and using that to call downstream systems to formulate the full cart from the basic summary.

Obviously, this was a bad thing since we were making our users wait more than normal. (Cache misses are bad! Recomputes are bad!)


Before we go into specific solutions, we want to call out the numerous other conversations and options that were worth a mention but not selected into the final bucket. For example, once the replicas were not catching up, we ran trials on “hiding” some of the replicas to try and allow them to catch up faster without taking user traffic. We also played around with timeouts and periodic replica reboots, but none of them seemed to cause the issues to die down.

Option 1: sharding (MongoDB)

The first obvious option was to split our JSON data into shardable sections. (Note that the entire cart was stored as a single JSON object.) This allows specific sections (only) to be written to disk and into a separate MongoDB cluster, which in turn reduces the number of writes and I/Os sent across the wire into the single master instance. The challenge with this approach was the re-write of the existing business logic to now understand a seemingly new schema.

Option 2: selective writes

This option would use MongoDB’s $set operator to update only the specific values that change on each update. While this works in theory, the kind of updates happening on the cart JSON object would involve domino-effect updates, which would pretty much trigger a change across numerous sections of the JSON. Keeping this aside, there was no assurance from the MongoDB folks that this would reduce the amount of oplog data being written. Their concern was that, given the large number of selective updates within a document, it might trigger an update of the entire document, thus not helping.
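
To make the domino effect concrete, the sketch below compares two versions of a made-up cart and builds the update document that a selective write would send. The cart fields and the `changed_paths` helper are hypothetical; only the `$set` operator with dotted field paths is MongoDB's own syntax.

```python
def changed_paths(old, new, prefix=""):
    """Return {dotted.path: new_value} for every leaf that differs in `new`.
    Removed keys and list diffs are ignored to keep the sketch short."""
    changes = {}
    for key, value in new.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict) and isinstance(old.get(key), dict):
            changes.update(changed_paths(old[key], value, path + "."))
        elif key not in old or old[key] != value:
            changes[path] = value
    return changes


old_cart = {
    "items": {"A1": {"qty": 1, "price": 10.0}},
    "totals": {"subtotal": 10.0, "tax": 0.8, "grand": 10.8},
    "version": 7,
}
new_cart = {
    "items": {"A1": {"qty": 2, "price": 10.0}},
    "totals": {"subtotal": 20.0, "tax": 1.6, "grand": 21.6},
    "version": 8,
}

update = {"$set": changed_paths(old_cart, new_cart)}
print(update)
```

Changing a single quantity already touches five fields in this tiny example, because totals and the version stamp change with it. On a real cart the ripple would spread across many more sections.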

Option 3: client-side compressed writes and reads

Given the desire to get our situation fixed quickly, without rewriting the business logic, compression seemed like another logical option to consider. Reducing the amount of data coming into the master should correspondingly reduce the amount of data flowing into the oplog. However, this would convert the data into a binary format and would need clients to understand compressed reads and writes.
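
The idea can be sketched in a few lines. The example uses Python's standard gzip module for brevity rather than the Java codecs evaluated later in this post, and the cart contents are made up.

```python
import gzip
import json


def compress_cart(cart):
    """Serialize a cart to JSON and gzip it. The result is opaque binary."""
    return gzip.compress(json.dumps(cart).encode("utf-8"))


def decompress_cart(blob):
    """Reverse of compress_cart: gunzip and parse the JSON."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))


cart = {
    "user": "u123",
    "items": [{"sku": f"SKU-{i}", "qty": 1, "title": "Sample item"} for i in range(200)],
}
raw = json.dumps(cart).encode("utf-8")
blob = compress_cart(cart)
print(len(raw), len(blob))
```

The client pays CPU time on every read and write, and in exchange fewer bytes travel to the master and into the oplog. The stored value is no longer readable JSON, so any consumer has to go through the same decompression step.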

Our choice

Considering various options and the timings for them, we chose Option 3 as the fastest, cleanest, and the most likely to succeed at first try. The biggest assumption was that the CPU hit taken by the clients on compression and decompression would be well worth the savings in terms of the latency on the MongoDB side. One advantage we had is that we had already embraced SOA, and our service encapsulated all the calls in and out of the MongoDB cluster, so it was only our service (“client” to MongoDB) that needed to change.

Goals and considerations

Before we started on something as fundamental as this, we had to get our goals right. Here were our simple premises.

  • Allow carts to be compressed and persisted into MongoDB (no change of the data store at this time).
  • Allow a choice of compression codecs and seamless shifting between reading with one codec and persisting/writing with another, in case there’s ever a need to change because of performance reasons.
  • Allow reading of old, new, and intermediate carts, that is, forward and backward compatibility.
    • Old carts can be read by new clients and should work.
    • New carts can be read by old clients and should work.
    • Old carts can be read and persisted as new carts.
    • New carts can be read and persisted as old carts.
  • Implementation should allow for uncompressed and compressed carts to be stored at the same time (not necessarily for the same exact cart).
  • Ultimately, get rid of raw uncompressed storage in favor of compressed carts.
  • Make sure that there are no use cases needing real-time query of JSON data in the MongoDB store. (The new data will be compressed binary and won’t be query-able).
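
One way to meet the codec-switching and mixed-storage goals is to stamp each stored document with the codec that produced it. The sketch below is an assumption about how that could look, not the layout the Shopping Cart service actually used: the `codec` and `payload` field names are hypothetical, and zlib stands in for a second codec. It covers the reader and writer in a new client only.

```python
import gzip
import json
import zlib

# name -> (compress, decompress); ZLIB is a stand-in for a second codec.
CODECS = {
    "GZIP": (gzip.compress, gzip.decompress),
    "ZLIB": (zlib.compress, zlib.decompress),
}


def to_document(cart, codec=None):
    """Build the stored document. codec=None keeps the old raw-JSON layout."""
    if codec is None:
        return {"_id": cart["id"], "cart": cart}
    compress, _ = CODECS[codec]
    payload = compress(json.dumps(cart).encode("utf-8"))
    return {"_id": cart["id"], "codec": codec, "payload": payload}


def from_document(doc):
    """Read any layout: an old uncompressed document or any known codec."""
    if "codec" not in doc:
        return doc["cart"]
    _, decompress = CODECS[doc["codec"]]
    return json.loads(decompress(doc["payload"]).decode("utf-8"))


cart = {"id": "cart-1", "items": ["A1", "B2"]}
stored = to_document(cart, codec="GZIP")         # written with one codec
rewritten = to_document(from_document(stored), codec="ZLIB")  # persisted with another
print(from_document(rewritten) == cart)
```

Because the reader dispatches on the stamp, compressed and uncompressed carts can coexist, and the write codec can change without a bulk migration.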

After a quick round of deliberation, we moved to the next phase of evaluation.

Codecs, codecs everywhere

For our choice of codecs, we had four well-known Java implementations we could choose from:

  • SNAPPY: Google-contributed compressor/de-compressor and heavily used in many Google products, externally and internally. The source code for the SNAPPY codec is available on GitHub.
  • LZ4_HIGH: Open-source software, BSD 2-clause license, running in high-compression mode and requiring the size of the uncompressed data to be specified for decompression to work. The source code for the LZ4 codec is available on GitHub.
  • LZ4_FAST: Same as above, just running in fast compression mode.
  • GZIP: the Java-provided implementation

We wrote a benchmark that ran sample cart data from production (all sizes in bytes) through these libraries to pick our initial algorithm. The tests were run multiple times to confirm that the results (a sample is shown below) are consistent and not skewed by CPU or I/O spikes on the local machine.

Running test for: SNAPPY
Orig_size    Compressed   Ratio    Comp_time   Decom_time    Total_time
687          598          1.149    188.062ms   1.683ms       189.745ms
786          672          1.170    0.079ms     0.193ms       0.272ms
848          695          1.220    0.081ms     0.227ms       0.308ms
15022        3903         3.849    0.296ms     3.181ms       3.477ms
17845        4112         4.340    0.358ms     2.268ms       2.627ms
45419        9441         4.811    0.775ms     5.999ms       6.774ms
55415        10340        5.359    0.860ms     3.851ms       4.711ms
125835       21338        5.897    1.012ms     8.834ms       9.845ms
1429259      179838       7.947    6.248ms     14.742ms      20.990ms
4498990      625338       7.194    19.339ms    64.469ms      83.808ms
Running test for: LZ4_FAST
Orig_size    Compressed   Ratio    Comp_time   Decom_time    Total_time
687          591          1.162    0.091ms     16.615ms      16.705ms
786          658          1.195    0.020ms     26.348ms      26.368ms
848          685          1.238    0.026ms     15.140ms      15.166ms
15022        3539         4.245    0.065ms     17.934ms      17.999ms
17845        3712         4.807    0.096ms     16.895ms      16.991ms
45419        6224         7.297    0.197ms     18.445ms      18.642ms
55415        6172         8.978    0.164ms     17.282ms      17.445ms
125835       11830        10.637   0.538ms     16.239ms      16.777ms
1429259      49364        28.953   4.201ms     21.064ms      25.265ms
4498990      167400       26.876   14.094ms    26.003ms      40.097ms
Running test for: LZ4_HIGH
Orig_size    Compressed   Ratio    Comp_time   Decom_time    Total_time
687          589          1.166    0.149ms     0.032ms       0.181ms
786          648          1.213    0.064ms     0.012ms       0.076ms
848          676          1.254    0.066ms     0.013ms       0.079ms
15022        3270         4.594    0.232ms     0.041ms       0.273ms
17845        3349         5.328    0.244ms     0.045ms       0.290ms
45419        5192         8.748    0.587ms     0.154ms       0.741ms
55415        5130         10.802   0.666ms     0.102ms       0.769ms
125835       8413         14.957   1.776ms     0.403ms       2.179ms
1429259      17955        79.602   12.251ms    3.162ms       15.413ms
4498990      60096        74.863   35.819ms    8.585ms       44.404ms
Running test for: GZIP
Orig_size    Compressed   Ratio    Comp_time   Decom_time    Total_time
687          447          1.537    0.939ms     0.636ms       1.575ms
786          489          1.607    0.138ms     0.103ms       0.240ms
848          514          1.650    0.099ms     0.109ms       0.207ms
15022        2579         5.825    0.502ms     0.400ms       0.902ms
17845        2659         6.711    0.596ms     0.508ms       1.104ms
45419        4265         10.649   1.209ms     0.755ms       1.964ms
55415        4324         12.816   1.301ms     0.775ms       2.076ms
125835       7529         16.713   3.108ms     1.651ms       4.760ms
1429259      23469        60.900   26.322ms    37.250ms      63.572ms
4498990      69053        65.153   153.870ms   103.974ms     257.844ms

Here are some observations:

  • SNAPPY seems to suffer from a slow start. The time taken for the first run is always notoriously high. It also has not-so-great compression ratios. We were surprised that SNAPPY performed so badly given how much talk it gets.
  • LZ4_FAST decompression times are almost constant, independent of size. It also shows not-so-great compression ratios though.
  • LZ4_HIGH provides great times for both compression and decompression and gets a bit slower at high data sizes.
  • GZIP seems to get worse at high data sizes.

Given these results, LZ4_HIGH seems to be the best codec for the following reasons.

  • No slow start issues or performance quirks observed
  • Linear time growth with data sizes
  • Excellent overall performance for small to medium cart sizes (well into the 99th percentile cart size) and reasonable performance at very large cart sizes

However, there’s one caveat. Decompression for LZ4_HIGH expects the output buffer size to be specified in the API call. The memory pre-allocation is likely what enables the faster decompression. It’s a price to be paid for the benefits, and we felt the benefits were large enough to account for it in the final design. So while the decision was clear, the implementation was designed to keep all four codecs available as choices, so that we could shift seamlessly from one codec to another (one of our goals as mentioned previously) if a future need arises.

Payload design

A sample payload of today’s shopping cart looks like this. The cart variable is the only interesting piece in the payload.

"_id" : ObjectId("560ae017a054fc715524e27a"),
"user" : "9999999999",
"site" : 0,
"computeMethod" : "CCS_V4.0.0",
"cart" : "...JSON cart object...",
"lastUpdatedDate" : ISODate("2016-09-03T00:47:44.406Z")

Our new payload with support for compressed data looks like this. The cart variable remains unchanged, but the new elements, initially derived from the cart variable, provide support for the Compression feature.

"_id" : ObjectId("560ae017a054fc715524e27a"),
"user" : "9999999999",
"site" : 0,
"computeMethod" : "CCS_V4.0.0",
"cart" : "...JSON cart object...",
"compressedData" : {
    "compressedCart" : "...Compressed cart object...",
    "compressionMetadata" : {
        "codec" : "LZ4_HIGH",
        "compressedSize" : 3095,
        "uncompressedSize" : 6485
    }
},
"lastUpdatedDate" : ISODate("2016-09-03T00:47:44.406Z")

Each of the fields in the compressedData container is described below:

  • "compressedData" — The container that stores the compressed cart and metadata about the cart itself that will be used for compression/decompression.
  • "compressedCart" : "...Compressed cart object..." — The compressed data for the cart field in the upper-level container.
  • "compressionMetadata" — The subcontainer that holds the metadata required for decompression and hints for certain codecs (for example, LZ4_HIGH, which needs the destination size (uncompressedSize) to work).
  • "codec" : "LZ4_HIGH" — Stored when compression runs and used when decompression runs.
  • "compressedSize" : 3095 — Stored when compression runs and used when decompression runs (used by LZ4_HIGH only, but we do not differentiate)
  • "uncompressedSize" : 6485 — Stored when compression runs and used when decompression runs (used by LZ4_HIGH only, but we do not differentiate)

Note that the sizes of the compressed and decompressed data are also stored every time although they are only really used when the codec is LZ4_HIGH. While all of this helped with the need to seamlessly switch between codecs, it also acted as a great source of statistical information for compression metrics for us.
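The write and read paths around this payload can be sketched roughly as follows. This is a minimal illustration in Python, not the service's actual code: the function names and the codec registry are hypothetical, and zlib stands in for the real codecs only because it ships with the standard library.

```python
import json
import zlib

# Hypothetical codec registry: name -> (compress, decompress).
# zlib is a stand-in; LZ4 or Snappy bindings would plug in the same way.
CODECS = {
    "ZLIB": (zlib.compress, zlib.decompress),
}


def to_compressed_data(cart_json: str, codec: str) -> dict:
    """Build the compressedData container for one cart."""
    compress, _ = CODECS[codec]
    raw = cart_json.encode("utf-8")
    packed = compress(raw)
    return {
        "compressedCart": packed,
        "compressionMetadata": {
            "codec": codec,
            "compressedSize": len(packed),
            "uncompressedSize": len(raw),
        },
    }


def read_cart(compressed_data: dict) -> str:
    """Decompress with the codec named in the metadata and check both sizes."""
    meta = compressed_data["compressionMetadata"]
    packed = compressed_data["compressedCart"]
    if len(packed) != meta["compressedSize"]:
        raise ValueError("compressed size does not match metadata")
    _, decompress = CODECS[meta["codec"]]
    raw = decompress(packed)
    if len(raw) != meta["uncompressedSize"]:
        raise ValueError("uncompressed size does not match metadata")
    return raw.decode("utf-8")


# Made-up cart content, used only to exercise the round trip.
cart = json.dumps({"items": [{"sku": "A1", "qty": 2}] * 50})
doc = to_compressed_data(cart, "ZLIB")
restored = read_cart(doc)
```

Because the codec name travels with each document, a reader never needs to know which codec is currently configured for writes, which is what makes switching codecs possible without a migration.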

Config-driven approach (versus experimentation)

Once the code rolled out to production, we had two approaches towards experimentation to make sure our implementation works seamlessly. The first was to go with an A/B test, the typical experimentation paradigm, and verify performance via standard reporting that’s already in place. The other option was via back-end config-driven testing. We chose the latter since we were confident that we could figure out issues with pure back-end metrics and have enough logging in place to identify issues. For example, we could make sure that the compressed and decompressed data matched the size stored in the metadata (else log a critical error). We also had alerting built into place that would give us an immediate read if any codec was mis-configured or failed to compress or decompress at run time.

To add to it all, eBay’s services follow the 12-factor approach for the most part. The one factor we focus on here is the config-driven approach that enables us to play around with different codec settings.

In summary, the whole rollout and validation process looked like this:

  1. Configure builds with compression feature = OFF as default.
  2. Roll out to all boxes, with a relaxed rollout template, making sure that there are no inadvertent impacts caused by the feature even when it is turned off. Bake for a few hours.
  3. Pick one random box and turn on compression config for that box ONLY in backwards-compatible mode. This mostly verifies compression, and very seldom is decompression expected to be triggered unless the same single box reads the previously compressed cart.
  4. Turn on the compression config for a few more boxes until all boxes compress but no one reads the compressed data yet. This temporarily increases traffic even further and worsens the current situation, but it is a necessary evil.
  5. Turn on reading decompression config for one box. Make sure that this box is able to read the compressed field and use ONLY the compressed field.
  6. Like before, turn on reading decompression config for multiple boxes and then for all boxes. Verify across all boxes.

Finally, here are the different configs we used:

  • compressionEnabled = true/false — The master switch that controls whether this feature is enabled or not.
  • compressionCodec = {SNAPPY, LZ4_HIGH, LZ4_FAST, GZIP} — One of the four choices that will be enabled, with LZ4_HIGH being the default choice.
  • WRITE_MODE = One of three modes.
    • NO_COMPRESS — Writes will not write the compressed data fields.
    • DUAL — Writes will write the compressed data fields and the uncompressed/regular cart.
    • COMPRESS_ONLY — Writes will write only the compressed data fields and null out the uncompressed/regular cart field.
  • READ_MODE = One of three modes.
    • NO_COMPRESS — Reads will read from the regular cart fields.
    • VERIFY_COMPRESS — Reads will read from both the fields, use the regular cart field, but verify that the compression data is being decompressed correctly. Think of this as “audit” mode.
    • COMPRESS_ONLY — Reads will directly read the cart from the compressed data fields, decompress it, and use it.

Note that not all mode combinations are valid. For example, WRITE_MODE = COMPRESS_ONLY and READ_MODE = NO_COMPRESS is invalid, and care should be taken to avoid that combination.
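One way to guard against invalid combinations is a start-up check on the config. The sketch below is illustrative only. The rule it encodes (every field a read mode relies on must be produced by the write mode) is an assumption that generalizes the single invalid example given above. The enum and function names are hypothetical, and the check ignores carts already sitting in the store from earlier write modes.

```python
from enum import Enum


class WriteMode(Enum):
    NO_COMPRESS = "NO_COMPRESS"
    DUAL = "DUAL"
    COMPRESS_ONLY = "COMPRESS_ONLY"


class ReadMode(Enum):
    NO_COMPRESS = "NO_COMPRESS"
    VERIFY_COMPRESS = "VERIFY_COMPRESS"
    COMPRESS_ONLY = "COMPRESS_ONLY"


def is_valid_combination(write_mode: WriteMode, read_mode: ReadMode) -> bool:
    """Assumed rule: reads may only rely on fields that writes populate."""
    writes_plain = write_mode in (WriteMode.NO_COMPRESS, WriteMode.DUAL)
    writes_compressed = write_mode in (WriteMode.DUAL, WriteMode.COMPRESS_ONLY)
    needs_plain = read_mode in (ReadMode.NO_COMPRESS, ReadMode.VERIFY_COMPRESS)
    needs_compressed = read_mode in (ReadMode.VERIFY_COMPRESS, ReadMode.COMPRESS_ONLY)
    return (writes_plain or not needs_plain) and (writes_compressed or not needs_compressed)


# Enumerate the combinations this rule would reject.
invalid = [
    (w.name, r.name)
    for w in WriteMode
    for r in ReadMode
    if not is_valid_combination(w, r)
]
```

Under this rule, DUAL writes are compatible with every read mode, which matches the rollout order described above: turn on dual writes everywhere first, then move readers over.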

It might look like overkill, but the key goal of keeping the site up and running at all costs with zero impact was on our minds all the time. We felt that the critical factor was to have all the different controls at our disposal for any eventuality.

Everything is backwards compatible. Backwards compatibility is everything.

We can’t emphasize this enough. For any site that cares about its users, backward compatibility is a critical consideration for any new feature that is rolled out. Our goal was no different, and the diagram in Figure 2 captures some of the complexity we faced.

A few things to keep in mind:

  • Do not delete fields that are being deprecated. We added the compressedData field as a new one to our existing JSON structure.
  • Separate out the code paths cleanly and as close to the source of the changing data as possible. This practice almost always allows a better code-deprecation option in the future.
  • Backward compatibility is critical even while new code is still being rolled out. Do not assume that there will be a 30-minute “incompatibility window” and that everything will be fine after that. You never know when a rollout will simply stall and leave you stuck in limbo for much longer.

For example, Figure 2 shows how our code paths looked while we were in the midst of rollout and both the old and new services were taking traffic. By separating out the logic right at the DAO/DO layer, the rest of the business logic continued as if nothing changed.

Figure 2. Code paths


After a successful rollout, we saw some magical numbers. Our oplog write rates, which had been almost 150GB/hour, were down to about 11GB/hour, a drop of more than 90% (roughly 13 times less)! The average object size of the documents, which had been hovering around 32KB, was down to 5KB, a drop of about 84% (roughly 6 times smaller)! In fact, we even saw some improvements in our eventual service response times as well. So overall we achieved what we wanted with ZERO production issues, and all our bets and analysis paid fantastic dividends at the end, benefiting our customers.

Figure 3. Before-and-after average object sizes.

Figure 3 shows a screenshot from MongoDB’s Ops Manager UI tool that highlights the slight increase (when we were doing dual writes of compressed and uncompressed data) and the dramatic drop in the average object size from 32KB down to 5KB after we turned the feature on fully.

Figure 4. Before-and-after oplog throughput

Figure 4 shows another screenshot from MongoDB’s Ops Manager UI tool showing the slight increase and the dramatic drop in the oplog throughput after the feature was turned on fully.

Post-release, we also went ahead and removed the “dead code,” which was basically the old logic that worked only on uncompressed data. Over a period of time, as older carts keep getting refreshed, there will never even be a hint that we had uncompressed data at some point in time! So how did the machines running the (de-)compression do? Not too bad at all. There was hardly any noticeable change in CPU and JVM heap usage before and after the code rollout.

Finally, here are some other scatter graphs for a random hour’s worth of data showing how the compression did in the Production environment. These graphs provide many other interesting observations, but those are left as an exercise for the reader.

Figure 5. A scatter plot showing compression achieved for the uncompressed data plotted against the original (uncompressed) data

Figure 6. A scatter plot showing the behavior of read times (that is, decompression) versus the size of the compressed data

Figure 7. A scatter plot showing the behavior of write times (that is, compression) versus the size of uncompressed data



About three years ago, the team joined eBay, and we started working on eBay Insights with a mission to bring powerful analytics to eBay sellers. Later, seller analytics became a key component of Seller Hub. Seller Hub is eBay’s one-stop portal, providing a unified platform for millions of eBay professional sellers. It is the hub where sellers get access to all the summaries, activities, tools, and insights about their businesses on eBay.

Seller analytics in Seller Hub is powered by Portico — a data store that we built in house to support interactive, sub-second, complex aggregations on top of hundreds of billions of listing, purchase, behavioral activity tracking, and other kinds of records, with dozens of trillions of data elements, covering several years of listing history. Portico currently handles millions of queries daily, providing market GMV, traffic reports, competitor analysis, and recommendations for unlikely-to-sell items through the Performance and Growth tabs in Seller Hub.

Implementing such a data store from scratch is a major undertaking, but after evaluating a dozen open-source and commercial solutions, we decided to go for it. As we describe Portico’s architecture below, we’ll point out some of the key considerations that led us to building this system ourselves.

Generally, there are two approaches to big data analytics:

  • OLAP-type systems, where aggregates are precomputed into “cubes” of various granularities (for example, Apache Kylin)
  • Brute-force scanning over data organized in columnar format (for example, Apache Parquet and Druid)

Our problem clearly called for the second, more flexible (albeit more complex) approach. Some important use cases, for example, comparing historical performance of a seller’s listing to its competitors, go well beyond manipulations with precomputed aggregates. Also, we needed the ability to drill down into the original un-aggregated records.

Going into the columnar world, we had to discard job-oriented solutions targeted at “internal” analytics (most are Hadoop/HDFS-based), as we needed a highly available system that scales to millions of users with hundreds of concurrent queries and also provides consistent sub-second performance. We also wanted a solution that keeps most of the data in memory, as reading 100s of MB of data from disk on each query won’t scale with a large number of queries.

So we were after a highly available distributed (many terabytes) in-memory columnar store. This has been done before. One inspiration came from the excellent Google PowerDrill paper, and another was Druid, mentioned above.

Data organization

Two main optimization ideas are behind columnar organization:

  • Columnar data allows much better compression than row data (since column values tend to be similar, and many columns in analytics datasets tend to be sparse).
  • Analytics queries usually touch a small subset of columns from a dataset, so the scanned data can be limited just to the columns that are needed.

The next critical optimization question is how to organize related records in the columnar dataset so as to minimize the amount of data scanned by a typical query. The efficiency of brute-force scanning can be expressed as:

E = N_{used} / N_{scanned}

where N_{used} is the number of records that went into computing the result, and N_{scanned} is the total number of records that were evaluated. So, if the whole dataset is broken into many fragments (partitions), we want to minimize the number of scanned fragments by putting records that are likely to be used together into the same fragment and by providing a way to eliminate (prune) unneeded fragments prior to scanning.
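To make the effect of pruning concrete, here is a tiny worked example in Python. It takes efficiency to be the ratio of used to scanned records, N_{used} / N_{scanned}. The fragment names and record counts are made up purely for illustration.

```python
def scan_efficiency(n_used: int, n_scanned: int) -> float:
    """Fraction of scanned records that contributed to the result."""
    return n_used / n_scanned if n_scanned else 0.0


# Hypothetical fragments and their record counts.
fragments = {"routers": 1000, "jeans": 4000, "phones": 5000}

# A query whose 500 relevant records all live in the "routers" fragment.
n_used = 500

# Without pruning, every fragment is scanned.
full_scan = scan_efficiency(n_used, sum(fragments.values()))

# With pruning, only the one fragment that can contain matches is scanned.
pruned_scan = scan_efficiency(n_used, fragments["routers"])
```

In this toy case pruning raises the efficiency tenfold, from 0.05 to 0.5, without changing the query result.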

For eBay seller analytics use cases, there are three such groupings.

Grouping                                    Number of groups        Group size (up to)
1. Records related to the same listing      billions                few thousand
2. Listings belonging to the same seller    millions                millions
3. Listings in the same leaf category       hundreds of thousands   tens of millions

Examples of listing categories are Wireless Routers or Women’s Jeans.

Notice that items 2 and 3 are somewhat at odds: if data is broken into partitions by seller (some hash of seller ID, to have a reasonable number of partitions), each partition would contain listings belonging to many different categories, and if data is broken by leaf category, each partition would contain listings from many different sellers.

An important observation is that sellers naturally focus on a few categories, so even very large sellers with millions of listings tend to be present in a relatively small number of categories. In fact, over 98% of eBay sellers are present in fewer than 100 leaf categories. Excluding 85% of eBay sellers who listed under 20 leaf categories from the sample set, over 86% of the remaining eBay sellers are present in fewer than 100 leaf categories. This observation led us to the following organization of our dataset:

  • All records belonging to the same listing are kept together.
  • Fragments are partitioned by leaf category.
  • Large leaf categories are subpartitioned by seller ID hash, so the number of listings in each subpartition is kept at around 1 million. (One million is a very rough target, as there are sellers having millions of listings in the same category, but it still works pretty well for the majority of sellers/categories).
  • Within each subpartition, data is broken into bite-size “pages” with a fixed number of listings (currently 50,000).
  • Records within each subpartition are sorted by listing ID (across all pages).
  • For category pruning, we keep a mapping from seller ID to the list of all categories where the seller ever listed.


The above data organization efficiently handles many kinds of queries/aggregations:

  • Looking up all listing data for a given listing ID (with known seller and category) requires a single page scan: since data within each subpartition is sorted by listing ID, we can locate the page containing the listing by looking at min/max listing ID values stored in page metadata.
  • Aggregating listings in a given category (or a small set of categories) requires scanning just the pages belonging to these categories. Further page pruning can be achieved by evaluating page metadata.
  • Aggregating listings for a given seller is similar to the above: we first look up all categories where the seller is present and prune subpartitions using the seller ID.
  • A small percentage of listings change categories over the lifetime. We keep records for these listings separately in each category and combine them at query time.
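The single-page lookup by listing ID can be illustrated with a binary search over page metadata. This is a sketch under assumptions: the metadata is reduced to hypothetical (min ID, max ID) pairs, and the pages of a subpartition are assumed to be sorted and non-overlapping, as the organization above implies.

```python
from bisect import bisect_left

# Hypothetical page metadata for one subpartition: (min_listing_id, max_listing_id).
pages = [(100, 199), (205, 340), (350, 480)]
max_ids = [max_id for _, max_id in pages]


def find_page(listing_id: int):
    """Return the index of the only page that can hold listing_id, or None."""
    i = bisect_left(max_ids, listing_id)  # first page whose max ID >= listing_id
    if i < len(pages) and pages[i][0] <= listing_id:
        return i
    return None  # the ID falls into a gap or past the last page


hit = find_page(205)
gap = find_page(200)
past_end = find_page(500)
```

Only the page returned by the lookup has to be scanned. A result of None means that no page in the subpartition needs to be touched at all.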

But this versatility comes at a cost: as new data is added daily to our dataset, it sprinkles all over the place, as new records are added to existing listings, and new listings are inserted at random positions within pages (eBay listing IDs aren’t strictly incremental), so no part of our dataset can be considered immutable. Effectively, the whole dataset has to be rebuilt daily from scratch. This differs radically from Druid’s approach, where data is organized as a time series, and historical fragments are immutable.

Still, for us the gains in runtime performance and scalability outweigh the fixed (although high) cost of rebuilding the whole dataset. As an additional benefit, we’ve got daily snapshots of our dataset, so we can safely roll back and re-ingest new data to recover from data loss or corruption caused by bugs or data quality issues. Finally, rebuilding the whole dataset makes “schema migration” (introduction of new columns and new kinds of listing entities) almost a routine operation.

Representing hierarchical data in columnar format

As mentioned above, listing data consists of many different kinds of records (entities), and resembles a hierarchical structure:


With data organized in columns, a mechanism is needed to align columns to the same logical record during scanning and also to recursively navigate columns in child records. To align columns at the same hierarchy level, we keep track of the currently scanned row position and, separately, of the last accessed position for every column referenced by the query (a query may access different columns on each row iteration). On each column access, we compare the column position to the current row position and skip over a range of column values (if necessary) to align that column to the current row. This alignment logic adds a few CPU cycles on each column access, which results in a material overhead when scanning billions of values, but it simplifies the query API greatly.


To align child entities, we introduce a special hidden “cardinality” column for each child containing the number of child records corresponding to the parent record. Incrementing the parent row position by 1 results in incrementing the child row position by the corresponding value contained in the cardinality column:


With the above table, we present the columnar dataset in the query API as an iterable collection of rows that is conceptually familiar to every developer (in pseudo-code):

for (listing <- listings)
  if (listing.sellerId == 123)
    for (attribute <- listing.attributes)
      for (value <- attribute.values)

The root (listing) iterator goes over all root records, and child iterators automatically terminate at the last child record for the currently iterated parent.
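The role of the hidden cardinality column can be illustrated with a short Python sketch. It is not Portico's iterator implementation: the column contents are made up, and each child slice is materialized as a list for readability rather than streamed.

```python
def iterate_with_children(parent_values, cardinality, child_values):
    """Yield (parent, children) pairs by walking the cardinality column.

    cardinality[i] is the number of child records that belong to parent i,
    so advancing the parent by one row advances the child position by that count.
    """
    child_pos = 0
    for parent, count in zip(parent_values, cardinality):
        yield parent, child_values[child_pos:child_pos + count]
        child_pos += count


# Made-up columns: three listings with 2, 0 and 1 attributes respectively.
listing_ids = [11, 12, 13]
attribute_cardinality = [2, 0, 1]
attribute_names = ["color", "size", "brand"]

rows = list(iterate_with_children(listing_ids, attribute_cardinality, attribute_names))
```

The child iterator for listing 12 is empty because its cardinality is 0, and the iterator for listing 13 starts exactly where the one for listing 11 stopped.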

Data types

Analytics datasets contain mostly numeric data, or data that can be mapped to numeric types. In fact, most of the common types can be mapped to 32-bit integers:

  • 64-bit integers (longs) map to two 32-bit integers for the low and high 32-bit halves
  • Booleans (flags) map to 0 or 1 (compression described below takes care of unused bits)
  • Floating-point numbers can be represented as one or two 32-bit integers depending on the required precision
  • Dates map to the number of days from an arbitrary “base” date (we’ve picked 1/1/2000).
  • Time of day (with seconds precision) maps to the number of seconds from midnight
  • Currency amounts map to integer amounts in the lowest denomination (for example, US dollar amounts are expressed in cents)

In cases when more than one 32-bit integer is needed for type mapping, we use multiple underlying integer columns to represent a single column in the dataset.

Type conversions are expensive when scanning billions of values, so it’s important that basic comparison operations can be performed on unconverted values (that is, the mappings are order-preserving).
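A few of these mappings are sketched below in Python. The function names are hypothetical. The split of a long into a signed high word and an unsigned low word is one possible order-preserving choice and is an assumption, since the exact encoding is not described here.

```python
from datetime import date
from decimal import Decimal

BASE_DATE = date(2000, 1, 1)  # the arbitrary base date mentioned above


def encode_date(d: date) -> int:
    """Days elapsed since the base date."""
    return (d - BASE_DATE).days


def encode_time_of_day(hours: int, minutes: int, seconds: int) -> int:
    """Seconds elapsed since midnight."""
    return hours * 3600 + minutes * 60 + seconds


def encode_usd(amount: str) -> int:
    """US dollar amount as an integer number of cents (Decimal avoids float rounding)."""
    return int(Decimal(amount) * 100)


def split_long(value: int):
    """Split a signed 64-bit value into (signed high word, unsigned low word).

    Comparing the pairs element by element gives the same order as comparing
    the original values, so no conversion back is needed for comparisons.
    """
    return value >> 32, value & 0xFFFFFFFF


longs = [2**40 + 5, -1, 2**32, 0, -(2**40), 1]
order_preserved = sorted(longs, key=split_long) == sorted(longs)
```

Every mapping here is monotonic, so a filter such as "start date on or after a given day" can be evaluated directly on the encoded integers.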

We also use dictionaries for columns containing a relatively small number of distinct values, with each value converted to its index in the dictionary. Dictionaries work for any data type (for example, strings), including integers, in cases when a significant portion of distinct values are large by absolute value. Smaller numbers, like dictionary indexes, take less space when encoded/compressed. Dictionary indexes can be selected to preserve ordering, for example, lexicographic ordering for strings.

Another benefit of dictionaries is that the number of occurrences of each distinct value can be stored along with the value in page metadata. This helps with page pruning (for example, skipping pages not containing listings of a specific type) and allows the collection of statistics about encoded data without actually scanning it (for example, what’s the percentage of listings of a given type in a category).
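An order-preserving dictionary with per-value occurrence counts might look like the following sketch. The names and the sample listing types are made up, and the metadata is reduced to two plain lists.

```python
from collections import Counter


def build_dictionary(values):
    """Return (dictionary, encoded column, occurrence counts).

    Sorting the distinct values makes the index order match the value order,
    so comparisons can be done on the indexes alone.
    """
    counts = Counter(values)
    dictionary = sorted(counts)
    index_of = {value: i for i, value in enumerate(dictionary)}
    encoded = [index_of[value] for value in values]
    occurrences = [counts[value] for value in dictionary]
    return dictionary, encoded, occurrences


# Made-up column of listing types for one page.
listing_types = ["FixedPrice", "Auction", "FixedPrice", "FixedPrice", "Auction"]
dictionary, encoded, occurrences = build_dictionary(listing_types)

# Page pruning: a value that is absent from the dictionary cannot occur in the page.
can_skip_page = "Classified" not in dictionary

# Statistics without scanning: share of auction listings in the page.
auction_share = occurrences[dictionary.index("Auction")] / len(listing_types)
```

Both the pruning decision and the statistic are answered from metadata alone. The encoded column is never read.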

We also support optional Bloom filters in metadata on columns that may be used for page pruning. For example, a Bloom filter on seller ID allows skipping pages that do not contain listings for specified sellers.
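For readers unfamiliar with Bloom filters, here is a minimal Python sketch of the idea. It is an illustration only: the class, its parameters, and the SHA-256 based hashing are assumptions chosen for brevity, not the filter used in Portico.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive num_hashes bit positions by salting the key with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


# Build a filter over the (made-up) seller IDs present in one page.
page_sellers = [123, 456, 789]
seller_filter = BloomFilter()
for seller_id in page_sellers:
    seller_filter.add(seller_id)


def page_can_be_skipped(bloom: BloomFilter, wanted_sellers) -> bool:
    """A page is skippable only if the filter rules out every requested seller."""
    return not any(bloom.might_contain(s) for s in wanted_sellers)
```

A negative answer from the filter is definitive, so skipping is always safe. A positive answer only means that the page has to be scanned to find out.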

Columnar compression

For integer compression, we’ve developed a generic algorithm (IntRunLengthCodec) combining bit packing with Run-Length Encoding (RLE) and optional dictionary encoding. It detects whether RLE or bit packing is preferable for a block of numbers, and uses a heuristic based on the percentage of distinct large values to decide whether to use a dictionary. Apache Parquet uses a similar approach combining bit packing and RLE.

During decoding, IntRunLengthCodec supports skipping over ranges of values, which is significantly faster than iterating over them and improves the performance of column alignment.
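The choice between RLE and bit packing for a block can be illustrated with a simple size estimate. The sketch below is not IntRunLengthCodec: the cost model (a fixed number of bits per run versus the minimum bit width per value) and all names are assumptions, and only non-negative integers are handled.

```python
from itertools import groupby


def rle_runs(values):
    """Collapse consecutive equal values into (value, run_length) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


def bit_width(values) -> int:
    """Bits needed for the largest value in a block of non-negative integers."""
    return max(1, max(values).bit_length())


def choose_encoding(values, bits_per_run: int = 64) -> str:
    """Pick whichever encoding has the smaller estimated size for this block."""
    rle_bits = len(rle_runs(values)) * bits_per_run
    packed_bits = len(values) * bit_width(values)
    return "RLE" if rle_bits < packed_bits else "BITPACK"


def skip_runs(runs, n: int):
    """Skip the first n values of an RLE block without visiting each value."""
    remaining = []
    for value, length in runs:
        if n >= length:
            n -= length  # the whole run is skipped in one step
        else:
            remaining.append((value, length - n))
            n = 0
    return remaining


constant_block = [7] * 1000       # one long run: RLE wins
varied_block = list(range(16))    # no repeats: bit packing wins
```

The skip function hints at why skipping is cheaper than iterating: a run of any length is consumed in one step.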

For strings that do not fit into a dictionary, we use LZ4 compression in conjunction with IntRunLengthCodec to store string lengths. (We started with Snappy, but LZ4 turned out to be a bit faster with similar compression ratios.) Both Snappy and LZ4 operate on blocks, allowing skipping over ranges of bytes for column alignment. Thus string columns are represented by two underlying columns, one with LZ4 compressed strings written back to back and another with the corresponding string lengths.

Compression ratios are highly dependent on the data being compressed. On average we achieve compression ratio of ~30 for the whole dataset, bringing the compressed size down to several terabytes.

Page structure

We keep encoded column data separately from metadata that contains encoded column lengths and codec configurations. Metadata also contains dictionaries, serialized Bloom filters, and column statistics (min/max values, number of empty values, uncompressed data size, etc.).

On disk, column data for a page is written to a single binary file with columns written sequentially back to back. Metadata is written as a (compressed) JSON file. At run time, column data is memory mapped from the binary file, and metadata is loaded into an on-heap structure.


Pages belonging to the same subpartition are kept together as a “page set”. Page sets are dealt with atomically to ensure data consistency at subpartition level. When a newer version of subpartition page set is available, it’s loaded in memory, and the page set reference is swapped before unloading the older version.

At the time of page set loading, we also precompute frequently used aggregates (for example, daily GMV) and compact cross-references (for example, “relisting chains” where the same item was listed multiple times by a seller) across all pages. This data is stored on the heap together with the page set and can be used by queries.

We also provide an LRU cache at page set level for query results, so frequently executed queries can bypass page scanning on repeated executions with same parameters.
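A least-recently-used cache of this kind can be sketched in a few lines of Python. The class and the key format are hypothetical and say nothing about how Portico keys or sizes its cache.

```python
from collections import OrderedDict


class QueryResultCache:
    """Small LRU cache: the least recently used entry is evicted first."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get_or_compute(self, key, compute):
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            return self.entries[key]
        result = compute()  # cache miss: run the page scan
        self.entries[key] = result
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # drop the least recently used entry
        return result


# Usage with a counter standing in for an expensive page scan.
scans = []


def run_scan():
    scans.append(1)
    return 42


cache = QueryResultCache(capacity=2)
first = cache.get_or_compute(("gmv", "seller=123"), run_scan)
second = cache.get_or_compute(("gmv", "seller=123"), run_scan)
```

The second call returns the cached value, so the scan runs only once for repeated executions with the same parameters.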

Schema API

Portico is implemented in Scala, and we leverage Scala DSL capabilities to provide friendly schema and query APIs.

The schema for a dataset is defined by extending the Schema class. (Code snippets are simplified, and class and method names are shortened for clarity):

class ListingSchema extends Schema("listings") {
  // columns
  val listingId = $long(key = true)
  val sellerId = $int(filtered = true)
  val status = $int()
  val price = $currency()

  // children
  val attributes = new AttributesSchema
}

class AttributeValuesSchema extends Schema {
  val value = $stringLz4(key = true)
}

class AttributesSchema extends Schema {
  val name = $stringDictionary(key = true)
  val values = new AttributeValuesSchema
}

The methods starting with '$' define columns along with configuration. (The '$' prefix was chosen to avoid name collisions in subclasses). In the above code, key = true designates a column as a part of the schema key. Key columns are used when merging incremental data into a page (see “Data ingestion” below). Specifying filtered = true enables Bloom filter creation. Column and child names are captured from Schema class field names via reflection. Column specifications are encapsulated in the ColumnSpecSet class and can be obtained from a schema instance.

At runtime, an instance of a ColumnSet representing a page (with references to memory-mapped column data and metadata) can be bound to an instance of Schema and scanned as follows:

val page: ColumnSet = ...
val listings: Schema = ...
val attributes: Schema = listings.attributes
val values: Schema = attributes.values

while (listings.$hasMore) {
  val listingId = listings.listingId.longValue
  val sellerId = listings.sellerId.intValue
  while (attributes.$hasMore) {
    val name = attributes.name.stringValue
    while (values.$hasMore) {
      val value = values.value.stringValue
    }
  }
}

Care is taken to ensure that column-access methods compile into Java primitives, so there is no primitive boxing and unboxing during scanning.

Column values can be converted into the target type during scanning. (Type annotation is added for clarity.)

val listingStartDate: LocalDate = listings.startDate.convertedValue

Column metadata is available directly from fields after binding, for example:

val sellerIdStats = listings.sellerId.stats

To support schema migration, binding resolves conflicts between columns and children in the schema and in the bound page:

  • Columns are matched by name.
  • Columns found in the page but missing in the schema are dropped.
  • Columns found in the schema but missing in the page are substituted with columns containing type-specific empty values (0 for integers).
  • If column type in the schema differs from column type in the page, automatic type conversion is applied (at scan time).
  • Similarly, children are matched by name with non-matching children dropped or defaulted to all empty columns.

The above behavior can be overridden with an optional SchemaAdaptor passed into the bind() method. SchemaAdaptor allows custom type conversions, renaming columns and children, and also defaulting missing columns via a computation against other columns.
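The default binding rules listed above can be illustrated with a minimal sketch. This is not Portico's implementation: BindingSketch, ColType, Resolution, and resolve are hypothetical names, and columns are modeled as a plain name-to-type map rather than real ColumnSpecSet instances.

```scala
object BindingSketch {
  // Hypothetical, simplified column types.
  sealed trait ColType
  case object IntType extends ColType
  case object LongType extends ColType
  case object StringType extends ColType

  // How one schema column is served after binding.
  sealed trait Resolution
  case object Direct extends Resolution                         // same name, same type
  final case class Converted(from: ColType) extends Resolution  // same name, type differs
  case object Defaulted extends Resolution                      // missing in page: empty values

  // Resolve each schema column against the page's columns, matching by name.
  // Page columns that the schema does not mention are never looked up,
  // which corresponds to dropping them.
  def resolve(schema: Map[String, ColType], page: Map[String, ColType]): Map[String, Resolution] =
    schema.map { case (name, wanted) =>
      val resolution: Resolution = page.get(name) match {
        case Some(actual) if actual == wanted => Direct
        case Some(actual)                     => Converted(actual)
        case None                             => Defaulted
      }
      name -> resolution
    }
}
```

In this sketch, a page column that is absent from the schema never appears in the result, while a schema column absent from the page resolves to Defaulted.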

Adding data

Schema iteration behavior over hierarchical records is defined in the RecordSet trait that Schema implements. RecordSet, along with ColumnSpecSet (obtained from a schema or created separately), serves as the input to ColumnSetBuilder, which constructs a sequence of ColumnSets (pages) with a fixed number of root records.


There are two other important implementations of RecordSet:

  • StichedRecordSet takes a sequence of RecordSets and presents them as a single RecordSet.
  • MergedRecordSet takes a RecordSet and a hierarchical collection of iterators over sorted incoming records (for example, parsed from CSV), and presents them as a single merged RecordSet. MergedRecordSet uses key fields to decide whether a record needs to be inserted or overwritten and to preserve the ordering of records throughout the hierarchy.

Thus, to add new data to a subpartition comprised of a sequence of pages (ColumnSets), we go through the following steps:

  1. Bind existing pages to Schema (with optional SchemaAdaptor for schema migrations) to produce a sequence of RecordSets.
  2. Construct a single StichedRecordSet from the above sequence of RecordSets.
  3. Construct MergedRecordSet from StichedRecordSet with the incoming data.
  4. Pass MergedRecordSet to ColumnSetBuilder to produce the resulting sequence of pages.


The above process relies on the fact that all records in the original ColumnSets and in the incoming data are sorted, and does not materialize intermediate RecordSets in memory (records are streamed), so it can be performed on inputs of arbitrary size.
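The streaming nature of this process can be sketched with plain Scala iterators. The sketch below is a flat (non-hierarchical) simplification under stated assumptions: Rec, stitch, merge, and paginate are hypothetical stand-ins for RecordSet, StichedRecordSet, MergedRecordSet, and ColumnSetBuilder, and a single Long key replaces the schema key columns.

```scala
object MergeSketch {
  final case class Rec(key: Long, value: String)

  // Stitching: present several sorted record streams (one per page) as one stream.
  // Pages in a subpartition are assumed to be ordered relative to each other,
  // so concatenation preserves the overall key order.
  def stitch(pages: Seq[Iterator[Rec]]): Iterator[Rec] =
    pages.iterator.flatMap(page => page)

  // Merging: combine two key-sorted streams lazily. On equal keys the incoming
  // record overwrites the existing one; otherwise the smaller key is emitted first.
  def merge(existing: Iterator[Rec], incoming: Iterator[Rec]): Iterator[Rec] =
    new Iterator[Rec] {
      private val a = existing.buffered
      private val b = incoming.buffered
      def hasNext: Boolean = a.hasNext || b.hasNext
      def next(): Rec =
        if (!b.hasNext) a.next()
        else if (!a.hasNext) b.next()
        else if (a.head.key < b.head.key) a.next()
        else if (a.head.key > b.head.key) b.next()
        else { a.next(); b.next() } // same key: drop the old record, keep the incoming one
    }

  // Page building: cut the merged stream into pages with a fixed number of records.
  def paginate(records: Iterator[Rec], pageSize: Int): Iterator[Seq[Rec]] =
    records.grouped(pageSize).map(_.toSeq)
}
```

Because every step consumes and produces iterators, only one page's worth of records needs to be held in memory at a time, which is the property the paragraph above relies on.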

Data distribution and coordination

A Portico environment consists of one or more clusters. Each cluster contains all subpartitions comprising a dataset, distributed across many nodes. Increasing the number of clusters increases redundancy and capacity (query throughput). The number of nodes within each cluster is determined by the total heap and mapped memory needed to hold the dataset. Since memory-mapped files are paged from disk by the operating system, memory oversubscription is possible; however, we try to keep oversubscription low to avoid excessive paging.

All nodes in an environment know about all other nodes in all clusters, so a single query can be distributed across the whole environment. We use ZooKeeper for cluster membership and consistent hashing (based on the Ketama algorithm) to minimize subpartition moves during rebalancing when nodes join or leave clusters. During rebalancing, nodes keep their previously assigned subpartitions, so queries can still be routed to clusters that are being rebalanced, which improves environment availability and capacity.
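To show why consistent hashing keeps subpartition moves small, here is a minimal Ketama-style hash ring. It is a simplified sketch, not the Ketama algorithm itself or Portico's code: RingSketch, the node names, the 160 virtual points per node, and the use of the first 8 bytes of an MD5 digest as the ring position are all assumptions made for illustration.

```scala
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.TreeMap

object RingSketch {
  // Hash a string to a ring position using the first 8 bytes of its MD5 digest.
  def position(s: String): Long =
    ByteBuffer.wrap(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))).getLong()

  // Build a ring with several virtual points per node to even out the distribution.
  def ring(nodes: Seq[String], pointsPerNode: Int = 160): TreeMap[Long, String] = {
    val r = new TreeMap[Long, String]()
    for (n <- nodes; i <- 0 until pointsPerNode) r.put(position(s"$n#$i"), n)
    r
  }

  // A key is owned by the first node point at or after its position, wrapping around.
  def owner(r: TreeMap[Long, String], key: String): String = {
    val e = r.ceilingEntry(position(key))
    if (e != null) e.getValue() else r.firstEntry().getValue()
  }
}
```

When a node joins, the only keys that change owner are those whose position now falls just before one of the new node's points, so every moved subpartition lands on the new node and all other assignments stay put.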

Partitions that are frequently accessed due to seasonal or geographical access patterns can be dynamically configured with additional replicas to improve performance and availability.

With that, we achieve high availability and consistency of Portico environments; that is, we guarantee that a successful query execution is performed exactly once against each requested partition/subpartition. Separation (by seller) and atomicity of subpartitions ensure that each listing is also evaluated exactly once.

In addition to cluster nodes, the Portico environment contains “reference” nodes that serve partition mappings used to limit the number of partitions to which a query is dispatched (pruning); in our case, this is a mapping from seller ID to all leaf categories where the seller is present, as discussed in “Data organization.”

Portico environments run in eBay’s internal OpenStack cloud (Connected Commerce Cloud or C3), and datasets are persisted in OpenStack cloud storage (Swift).


Query execution

We leverage the map/reduce paradigm for query execution. Native Portico queries are written by extending the Query trait parameterized by query result type R:

trait Query[R] {
  def init(ctx: QueryContext, parameters: Option[JSON]): Unit
  def acceptPartitionKey(key: String, ...): Boolean
  def execute(page: ColumnSet, key: String, ...): Option[R]
  def merge(result1: R, result2: R): R
  def onSubpartitionResult(result: R, key: String, ...): Option[R]
  def onPartitionResult(result: R, key: String, ...): Option[R]
  def onQueryResult(result: R, key: String): Option[R]
}

The init() method binds context and execution parameters to the query. Context provides access to the execution framework, and can be used to look up locally provided services or to execute subqueries.

The execute() method scans a page and optionally produces a result. A query needs to define how results are combined in the merge() method. The execution framework invokes the execute() method on each page for each partition or subpartition requested by the query. The merge() method is called recursively on the produced results, first with all results in a subpartition, then with all subpartition results in a partition, and finally with all partition results. The on...Result() callbacks can optionally be implemented by a query to apply special processing at these milestones during query execution.
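The three-level merge order described above can be sketched as follows. This is an illustrative, single-process simplification: MiniQuery is a hypothetical cut-down version of the Query trait (only execute and merge), Page is a stand-in for ColumnSet, and the nested Map models partitions, subpartitions, and pages.

```scala
object QuerySketch {
  type Page = Seq[Long] // hypothetical stand-in for a ColumnSet

  trait MiniQuery[R] {
    def execute(page: Page): Option[R]
    def merge(result1: R, result2: R): R
  }

  // Merge a collection of optional results, skipping those that produced nothing.
  private def mergeAll[R](q: MiniQuery[R], results: Iterable[Option[R]]): Option[R] =
    results.flatten.reduceOption((a, b) => q.merge(a, b))

  // dataset: partition key -> subpartition key -> pages
  def run[R](q: MiniQuery[R], dataset: Map[String, Map[String, Seq[Page]]]): Option[R] = {
    val partitionResults = dataset.values.map { subpartitions =>
      // First merge page results within each subpartition...
      val subResults = subpartitions.values.map(pages => mergeAll(q, pages.map(q.execute)))
      // ...then merge subpartition results within the partition.
      mergeAll(q, subResults)
    }
    // Finally merge all partition results into the query result.
    mergeAll(q, partitionResults)
  }

  // Example query: count values above a threshold.
  final class CountAbove(threshold: Long) extends MiniQuery[Long] {
    def execute(page: Page): Option[Long] = {
      val n = page.count(_ > threshold)
      if (n > 0) Some(n.toLong) else None
    }
    def merge(result1: Long, result2: Long): Long = result1 + result2
  }
}
```

Because merge() is applied pairwise at every level, it should be associative so that the grouping of pages into subpartitions and partitions does not change the final result.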


We provide two specializations of the Query trait:

  • SchemaQuery, where scanning occurs over a pre-bound schema
  • SqlQuery, supporting ad-hoc SQL execution (using Apache Calcite)

During query execution, the framework collects statistics on how many values were scanned and skipped for each column in each subpartition. These statistics are sent back to the client along with the query result, and can be used for query tuning.

Clients submit queries to random nodes in a Portico environment over HTTP (all nodes are behind an HTTP load balancer). A named SchemaQuery can be submitted either as a precompiled JAR archive or as Scala source text, in which case the receiving node compiles the source into a JAR. JAR archives are distributed and stored on disk across clusters during execution, so subsequent query executions (with different parameters) don’t require JAR/source transfers. To isolate Portico clients, nodes use separate class loaders for each client, and a query with the same name can be resubmitted/recompiled multiple times.

The receiving node computes the distribution of subpartitions requested by the query across nodes in the environment and drops subpartitions that are not accepted by the query (pruning by seller ID hash). If multiple nodes (in different clusters or replicas within a cluster) contain the same requested subpartition, a heuristic is applied to balance the expected number of scanned records on each node and the distribution of requests across replicas. The query is then dispatched in parallel to the selected nodes via the HTTP API described above. Node-local query execution is also performed in parallel, with multiple pages scanned concurrently, using all available CPUs. After all local results are merged into a single result, it’s serialized back to the calling node, and then, after merging all node results, back to the client.
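The text does not specify the balancing heuristic, so the following is only one plausible sketch of the idea: a greedy assignment that processes the largest subpartitions first and gives each to the replica node with the least expected work so far. DispatchSketch, Subpartition, and assign are hypothetical names, and expectedRecords is an assumed per-subpartition estimate.

```scala
object DispatchSketch {
  final case class Subpartition(key: String, expectedRecords: Long, replicas: Seq[String])

  // Greedy assignment: take the largest subpartitions first and give each one
  // to the replica node that currently has the least expected work.
  // Assumes every subpartition has at least one replica.
  def assign(subpartitions: Seq[Subpartition]): Map[String, String] = {
    val load = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    subpartitions.sortBy(-_.expectedRecords).map { sp =>
      // The node name breaks ties so the outcome is deterministic.
      val node = sp.replicas.minBy(n => (load(n), n))
      load(node) += sp.expectedRecords
      sp.key -> node
    }.toMap
  }
}
```

With three subpartitions of 100, 60, and 50 expected records replicated on two nodes, this sketch places the first on one node and the other two on the second, giving loads of 100 and 110.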

Data ingestion

We merge new data into the Portico dataset daily. The new data comes from Data Warehouse and data science feeds as separate flat CSV files for each entity comprising a listing. A single “ingest” server is responsible for feed preparation. Each feed is broken into fragments by leaf category ID and sorted by the corresponding key fields. Incoming feeds are tracked in a MySQL database, and fragments are stored in Swift. After all fragments are ready, the ingest server triggers the start of the merge process in a Portico environment via the HTTP API.

Nodes download incoming feed fragments from Swift for subpartitions they are responsible for and merge them into the corresponding page data, as described in “Adding data.” With multiple clusters, the node responsible for merging each subpartition is selected deterministically using hashes of cluster states and subpartition key.

Pages for merged subpartitions are uploaded to Swift, and updated across clusters in the environment via a node-to-node HTTP API with fallback to Swift. The ingest server monitors the overall progress of merging, and once merging is completed, it writes a build manifest to Swift, indicating a complete build. The ingest server also runs a report query to collect statistics (data sizes, record counts, etc.) for each partition in the new build, and persists this information in MySQL. This data is used to analyze sizing trends.

A Portico environment can process queries during data ingestion; however, capacity during ingestion is degraded because CPU time is spent on merging. We perform ingestion in a pre-production Portico environment and distribute new dataset builds into production environments in different availability zones (data centers) via the HTTP API, with fallback to Swift.


Near-real-time data

With several years of historical data, Portico supports a variety of analytics use cases; however, the historical data in Portico is at least one day old, as new data is merged daily. This gap is covered by near-real-time (NRT) data that is ingested from Kafka streams containing new and updated listings, purchases, small-window behavioral aggregates, etc. NRT data also temporarily covers historical data in case of delays in data warehouse extracts or historical data ingestion.

We chose to colocate NRT data for subpartitions with historical data on the same nodes. This helps in combining NRT and historical data for subpartitions in one pass in queries, reducing network trips and result sizes.

Like historical data, NRT data is broken into sequences of pages (page sets); however, page size is determined by a time window (for example, one hour) instead of the number of records. NRT page sets are grouped by day and dropped as new daily historical data arrives.

Unlike with historical data, we keep different entities separate with flat schemas, so scanning NRT data requires a separate pass for each entity. Still, we guarantee that each record key occurs at most once within a page, and that all records in a page are ordered. Deduplication and ordering happen in the “current” page, which is kept on the heap and to which new data is appended.
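One simple way to get both guarantees in an on-heap page is to keep records in a sorted map keyed by record key. The sketch below illustrates that idea only; NrtSketch, Event, and CurrentPage are hypothetical names, and the actual Portico data structure is not described in the text.

```scala
import scala.collection.immutable.TreeMap

object NrtSketch {
  final case class Event(key: Long, timestampMs: Long, payload: String)

  // The "current" page: a sorted map provides ordering, and keying by record key
  // means a later update replaces the earlier one (deduplication).
  final class CurrentPage(val windowStartMs: Long, windowMs: Long) {
    private var records = TreeMap.empty[Long, Event]

    // True if the event falls inside this page's time window.
    def accepts(e: Event): Boolean =
      e.timestampMs >= windowStartMs && e.timestampMs < windowStartMs + windowMs

    def append(e: Event): Unit = { records = records.updated(e.key, e) }

    // Sealing yields the ordered, deduplicated records of the page.
    def seal(): Vector[Event] = records.values.toVector
  }
}
```

A caller would check accepts() before append() and start a new page once an event falls outside the current time window.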


Seller Hub analytics performance

With Seller Hub expanding to the Big 4 markets, daily web traffic exceeded 4 million requests by November 2016, with an average of 400,000 unique visits. Of these requests, 99.47% achieved sub-second latency, and all were run on the fly on a dataset of over 100 TB, compressed to 3.6 TB and memory-mapped. With over three years of behavioral impression data and over four years of listing data, eBay’s sellers are able, for the first time, to perform real-time analytics on their listing impressions, sales conversion rates, and price trends, and on those of their competitors.


To achieve this performance, Seller Hub’s queries are optimized to take advantage of several features that were designed for best performance and optimum capacity utilization.

As described in “Data organization,” most of our sellers tend to focus their businesses on a small subset of trending categories such as Fashion, Electronics, and DVDs, and trending categories tend to change with seasonality. These large leaf categories are sub-partitioned by seller ID hash and have most of their traffic balanced across the clusters. Also, from analyzing the access pattern of Seller Hub’s sellers and their geographic time zones, Portico is able to dynamically configure additional replica sets for these trending categories to further increase query performance.

Seller-centric queries make extensive use of the Bloom filters and column statistics described in “Page Structure” to prune pages that do not contain listings in the specified date ranges for that seller. For category-level aggregation queries, such as finding the last 30- and 365-day markets’ GMV, results are pre-computed once during the loading of subpartitions and cached for the day until the next daily build, as described in “Data ingestion.” Lastly, over 80% of queries use the default 30-day time window, so caching the result for this frequently accessed date range in the query cache, once computed, greatly improves average performance.
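The page-pruning step can be sketched as a check against per-page metadata. This is a minimal illustration under stated assumptions: PruneSketch, Bloom, PageMeta, and mustScan are hypothetical names, dates are modeled as epoch days, and the tiny Bloom filter below (seeded MurmurHash3 over a BitSet) stands in for whatever filter Portico actually builds.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

object PruneSketch {
  // A tiny Bloom filter over seller IDs: no false negatives, occasional false positives.
  final class Bloom(bits: Int, hashes: Int) {
    private val set = new BitSet(bits)
    private def index(value: Long, seed: Int): Int =
      Math.floorMod(MurmurHash3.stringHash(value.toString, seed), bits)
    def add(value: Long): Unit =
      (0 until hashes).foreach(i => set.set(index(value, i)))
    def mightContain(value: Long): Boolean =
      (0 until hashes).forall(i => set.get(index(value, i)))
  }

  // Per-page metadata: min/max of the date column (as epoch days) and a seller filter.
  final case class PageMeta(minDay: Long, maxDay: Long, sellers: Bloom)

  // A page needs scanning only if its date range overlaps the query range
  // and the Bloom filter does not rule the seller out.
  def mustScan(p: PageMeta, sellerId: Long, fromDay: Long, toDay: Long): Boolean =
    p.maxDay >= fromDay && p.minDay <= toDay && p.sellers.mightContain(sellerId)
}
```

Both checks read only metadata, so a pruned page's column data is never touched; a Bloom filter false positive costs an unnecessary scan but never a missed result.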

Next steps

  • The Portico team is still working on exposing NRT data in Seller Hub.
  • We would like to improve Calcite integration, primarily around projection and filter optimizations.
  • We are following the Apache Arrow project and considering it as a potential foundation for native replacement of codecs and schema API.
