Developers Club geek daily blog

3 years, 3 months ago
Note of the translator: On Habré and in ours told you about the company developer of technologies for online video broadcasts of BAM, and today we present to yours adapted translation of a note how the technical command of the strimingovy Spotify service was engaged in scaling of Apache Storm.

As Spotify scales Apache Storm

We in Spotify created several pipelines of tasks for work with analytics in real time which are used in targeted advertizing, service of the recommendation of music and data visualization. Each of these pipelines working in real time uses Apache Storm connected with different systems, for example, Kafka, Cassandra, Zookeeper, and also other sources and drains. At an application creation which number of active users exceeds 50 million worldwide it is necessary to remember constantly scalability to provide high availability and good system performance.

What is scalability

Scalability is a capability of the software to support performance up to standard in the conditions of the increasing operational load by means of simple adding of resources. But to receive such capability, it is necessary to make slightly more, than just to increase resources and to configure performance. At software development it is necessary to take such moments as quality, ease of service and operational indicators into account. When we create the application, we lay down the following conditions:

Important conditions of scalability

  • The software has to be high-quality and have reliable architecture;
  • The software has to be designed so that it could be let out easily, to accompany easily and to debug easily;
  • To master additional loading, software performance needs to be increased by linear adding of resources.

Scalability in Storm

What is included by pipelines when scaling in Storm? I will in detail consider it on the example of our pipeline of personalisation working in real time, and we will sort different aspects of scalability.

As Spotify scales Apache Storm

In our pipeline of personalisation there is Kafka cluster with topics for different types of events, for example, for the end of reproduction of a song or demonstration of advertizing. Our topology is signed by Storm for different user events, supplying them with metadata (for example, a song genre) which are read out from Cassandra, then groups their [events] in users and calculates custom attributes by means of combinations of algorithms of aggregation and a derivation. After these attributes of the user register in Cassandra which, in turn, is caused by the separate backend-services personalizing the user experience.

Design and quality

Expanding functionality of our pipeline of personalisation, we noticed that from the point of view of setup of performance and debugging of an event stream, topology begin to look very difficult. However tests showed rather big efficiency of our code therefore we quickly carried out refactoring to simplify topology.

Architecture of topology

Having passed through all cycle of transformation of one difficult topology in slightly small and simple, we learned for ourselves several lessons:

  • Create small logically clear topology for different flows of works;
  • Advance repeated use of a code by means of the general libraries, but not the general topology;
  • Be convinced that methods are easily tested;
  • Parallelize slow operations of input-output, use batch processing.


We developed our pipelines on Java, and for check of business logic in different computing "bolts" (processors of trains) used JUnit. We carried out continuous tests in the course of simulation of work of a cluster, using dependence of backtype.storm.testing.

Convenience of service

We made several modifications to increase convenience of service, that is that it was possible to unroll with ease software on new hosts of our cluster and to watch over their "health".


We took out settings of parallelism of "bolts", parameters of end points of a drain and source and topology performance parameters in the config file, thereby having had an opportunity to change settings, without changing at the same time a code. Now it became simple to do little incremental changes and to watch how it affects system operation.

Visualization of metrics

We have an opportunity to browse topology metrics in the form of diagrams which reflect "health" of system and help to reveal problems. We were convinced that we monitor the most high-level metrics (on the picture below) generalizing system status differently it is possible to get confused easily in a large number of diagrams when it becomes unclear what of characteristics is important.

As Spotify scales Apache Storm

Expansion of topology

As Spotify scales Apache Storm

All calculations in our pipeline of personalisation are independent, and in our strategy of expansion we use event handling duplication that in the course of transition to the new version not to lose any message. Such approach works not in all cases, for example, it will not be suitable for transactional calculations. On the chart above stages of expansion are placed in a chronological order from left to right, and t1 … represent to t8 different time spans.

Our strategy of expansion demands that the cluster of Storm had sufficient performance for simultaneous management of two working personalisation topology. In t1 timepoint the cluster works with the version of topology v1. When the v2 version is ready, we start it in a cluster. In t4 timepoint the cluster works with two versions at the same time. Each topology uses a unique identifier of groipId in Kafka to guarantee delivery of all messages to both versions. At this moment all messages are processed twice, but it is not terrible as calculations of an idempotentna. In t5 timepoint we disconnect v1, and it ceases to receive messages from Kafka. After we monitor change of metrics of v2 and we watch, whether all as it should be. If all is good, then we completely "kill" the v1 version, and in t8 timepoint the cluster functions only on the v2 version. However if in t7 timepoint we notice any problem on diagrams, then we return v1 in work. Here we "kill" the v2 version, thus, returning to starting point. Having an opportunity it is safe to return to an initial status, we can often roll away little changes and minimize risks.

Monitoring and warnings of errors

We monitor indicators of a cluster, topology, sources, drains, and the system warns us about violations only in high-level metrics, otherwise it is possible to be tired of excessively large number of warnings quickly.


Over time we study problem places, errors of system and we configure some elements to reach desirable performance. Receipt of worthy performance requires worthy hardware.


Right at the beginning we started our topology in the general cluster of Storm, but faced resource starvation because of their load soon. In this regard we started an independent cluster of Storm that was quite simple. Our present cluster processes more than 3 billion events a day and has 6 hosts with 24 kernels (2 flows on a kernel) and 32 Gbytes of memory. Even we will be able to work with such small cluster for a long time as even close did not approach a threshold of use of resources, despite two versions of topology of personalisation which are at the same time working in the course of development. In the future we want to start Storm on YARN on our cluster of Hadoop more rationally to use resources and to achieve great opportunities for scaling.

Flow capacity and latency

To receive the desirable flow capacity and latency, we need to configure settings of a source and drain. We also configured a caching, parallel processing and parallel access. All this is in detail described below.

Setup of a source and drain

Kafka setup

  • We configured rebalancing.max.tries, to minimize errors of balancing of loading in Kafka;
  • We use different group id for each KafkaSpout in each version to provide redundancy of message handling during transitions to new versions of topology.

Cassandra setup

  • We use different tables for different TTL. Also exposed gc_grace_period = 0 to turn off reading with recovery for records with TTL as they are not necessary to us;
  • We used strategy of consolidation of the tables connected date for new data;
  • We control number of open connections between topology of Storm and Cassandra;
  • We configured Snitch to provide proper routing of requests.

Problems with parallelism

OutputCollector in Storm is not potokobezopasny, that is it cannot be used safely at the same time by several flows, such as the callback-functions using Future-objects for asynchronous processing. To secure requests for creation/confirmation of processing of trains of Storm and their transfer to the processor, we use java.util.concurrent.ConcurrentLinkedQueue.

Parallelism setup

At setup of parallelism of our topology we derived inspiration from the presentation of Storm from the Strata 2014 conference. We followed here such principle which perfectly worked:

  • 1 worker on a topology node;
  • 1 executor on a kernel for processing of problems of the CPU;
  • 1-10 executor'ov on a kernel for processing of problems of input-output;
  • Look what tasks can be parallelized, and distribute resources between tasks as it is possible more carefully parallelizing the slowest of them (fast parallelize to a lesser extent).

Caching for processors ("bolts")

To provide calculation of attributes of users by processors, we had to select between an external and internal caching of memory. We preferred an internal caching as the external caching demands carrying out network operations of input-output that leads to increase of latency and adds one more point of failure. However the internal cache not persistent also has the delimited memory size. The question of persistence disturbed us a little, but over restrictions of memory it was necessary to work. We stopped the choice on the cache with a certain storage time implemented by means of Guava. To control its size, we can set quantity of the stored elements and time of their storage. It gave us the chance to optimum use memory in our cluster.

The integrated approach to scaling helped us to add new features for our constantly increasing active user base, having at the same time saved high availability of the pipelines Storm.

This article is a translation of the original post at
If you have any questions regarding the material covered in the article above, please, contact the original author of the post.
If you have any complaints about this article or you want this article to be deleted, please, drop an email here:

We believe that the knowledge, which is available at the most popular Russian IT blog, should be accessed by everyone, even though it is poorly translated.
Shared knowledge makes the world better.
Best wishes.

comments powered by Disqus