Our company's activities extend far beyond game development. Alongside it, we run dozens of internal projects, and the Player Relationship Management Platform (PRMP) is one of the most ambitious of them.

The Player Relationship Management Platform (PRMP) is a dedicated system that analyzes large volumes of data in real time to personalize interaction with the player: recommendations are delivered to the user based on the context of their recent play experience.

PRMP lets our players get more enjoyment out of the game, improves their user experience, and spares them from irrelevant advertising and promo messages.

Architecture of PRMP
[Architecture diagram: Player Relationship Management Platform in Wargaming — data collection and analysis]

The Player Relationship Management Platform can be loosely divided into several components: RAW Data Collection, WG HUB, and the Business Rule Engine. Their architecture is shown in the diagram above.
In this article we describe the adapters for data collection and analysis; the other components of the system will be covered in detail in future publications.


Data is collected through a common bus, for which Kafka is used. All World of Tanks Blitz subsystems write logs in an agreed format to the bus in real time. For the subsystems that cannot do so due to technical constraints, we wrote adapters that collect the logs and forward them to Kafka. In particular, our stack includes adapters for MySQL, PostgreSQL and RabbitMQ, as well as an adapter that loads historical records from the DWH through the Hive JDBC interface. Each of them exports metrics on processing rate and lag behind the source to JMX; Grafana is used to visualize this data, and Zabbix to alert on problems. All adapters are developed as standalone applications in Java 8 and Scala.
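
As an illustration, here is a minimal sketch (not the production adapter) of how such a standalone adapter might forward log records to the Kafka bus and expose its lag behind the source as a JMX gauge that Grafana and Zabbix can then read. The topic name, the JMX object name and the LagTrackerMXBean interface are assumptions made for the example.

import java.lang.management.ManagementFactory
import java.util.Properties
import javax.management.ObjectName
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// JMX gauge: how far the adapter lags behind its source, in seconds
trait LagTrackerMXBean { def getLagSeconds: Long }

class LagTracker extends LagTrackerMXBean {
  @volatile var lastSourceTimestampMs: Long = System.currentTimeMillis()
  override def getLagSeconds: Long = (System.currentTimeMillis() - lastSourceTimestampMs) / 1000
}

class BusForwarder(bootstrapServers: String, topic: String) {
  private val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  private val producer = new KafkaProducer[String, String](props)

  // Register the lag gauge so it is visible over JMX
  private val lagTracker = new LagTracker
  ManagementFactory.getPlatformMBeanServer
    .registerMBean(lagTracker, new ObjectName("prmp.adapter:type=LagTracker"))

  // Called by the source-specific read loop for every extracted log record
  def forward(key: String, payload: String, sourceTimestampMs: Long): Unit = {
    producer.send(new ProducerRecord[String, String](topic, key, payload))
    lagTracker.lastSourceTimestampMs = sourceTimestampMs
  }
}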

The adapter for MySQL and PostgreSQL
It is based on Tungsten Replicator, for which we wrote a producer that writes to Kafka. We use replication because it is a reliable way to acquire data without putting additional load on the source database server.

The current pipeline in Tungsten looks as follows:

replicator.pipelines=slave
replicator.pipeline.slave=d-binlog-to-q,q-to-kafka
replicator.pipeline.slave.stores=parallel-queue
replicator.pipeline.slave.services=datasource
replicator.pipeline.slave.syncTHLWithExtractor=false

replicator.stage.d-binlog-to-q=com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.d-binlog-to-q.extractor=dbms
replicator.stage.d-binlog-to-q.applier=parallel-q-applier
replicator.stage.d-binlog-to-q.filters=replicate,colnames,schemachange
replicator.stage.d-binlog-to-q.blockCommitRowCount=${replicator.global.buffer.size}

replicator.stage.q-to-kafka=com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.q-to-kafka.extractor=parallel-q-extractor
replicator.stage.q-to-kafka.applier=asynckafka
replicator.stage.q-to-kafka.taskCount=${replicator.global.apply.channels}
replicator.stage.q-to-kafka.blockCommitRowCount=${replicator.global.buffer.size}


where asynckafka is a module that we wrote ourselves.

asynckafka receives data from the previous stage and writes it to Kafka. The last written offset is stored in ZooKeeper, which is always available alongside Kafka. Tungsten itself can persist this data to a file or to MySQL, but that is not reliable enough if the host running the adapter is lost. In our case, after a crash the module reads the offset and binlog processing resumes from the last value that was written to Kafka.

Writing to Kafka

override def commit(): Unit = {
  try {
    import scala.collection.JavaConversions._
    val msgs : java.util.concurrent.ConcurrentLinkedQueue[(String,String,String,Option[Callback])] = new java.util.concurrent.ConcurrentLinkedQueue[(String,String,String,Option[Callback])]()
    data.foreach(e => {
      msgs.addAll(ruleProcessor.get.processToMsg(e._1, e._2).map(e => (e._1, e._2, e._3, None)))
    })
    kafkaSender.get.send(msgs.toSeq:_*)
  } catch {
    case kpe: KafkaProducerException => {
      logger.error(kpe.getMessage, kpe)
      throw new ReplicatorException(kpe);
    }
  }
 
  lastHeader.map(saveLastHeader(_))
  resetEventsToSend()
 
}
 

Saving the offset

def saveLastHeader(header: ReplDBMSHeader): Unit = {
  zkCurator.map {
    zk =>
      try {
        val dhd = DbmsHeaderData(
          header.getSeqno,
          header.getFragno,
          header.getLastFrag,
          header.getSourceId,
          header.getEpochNumber,
          header.getEventId,
          header.getShardId,
          header.getExtractedTstamp.getTime,
          header.getAppliedLatency,
          if (null == header.getUpdateTstamp) {
            0
          } else {
            header.getUpdateTstamp.getTime
          },
          if (null == header.getTaskId) {
            0
          } else {
            header.getTaskId
          })
        logger.info("{}", writePretty(dhd))
        zk.setData().forPath(getZkDirectoryPath(context), writePretty(dhd).getBytes("utf8"))
      } catch {

        case t: Throwable => logger.error("error while safe last header to zk", t)
      }
  }
}


Restoring the offset

override def getLastEvent: ReplDBMSHeader = {
  lastHeader.getOrElse {
    var result = new ReplDBMSHeaderData(0, 0, false, "", 0, "", "", new Timestamp(System.currentTimeMillis()), 0)
    zkCurator.map {
      zk =>
        try {
          val json = new String(zk.getData().forPath(getZkDirectoryPath(context)), "utf8")
          logger.info("found previous header {}", json)
          val headerDto = read[DbmsHeaderData](json)
          result = new ReplDBMSHeaderData(headerDto.seqno, headerDto.fragno, headerDto.lastFrag, headerDto.sourceId, headerDto.epochNumber, headerDto.eventId, headerDto.shardId, new Timestamp(headerDto.extractedTstamp), headerDto.appliedLatency, new Timestamp(headerDto.updateTstamp), headerDto.taskId)
        } catch {

          case t: Throwable => logger.error("error while safe last header to zk", t)
        }
    }
    result
  }
}
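
The snippets above also rely on a few pieces that are not shown in the article. A hedged sketch of what they might look like: a DbmsHeaderData case class whose fields mirror the ReplDBMSHeader accessors used in saveLastHeader, and a Curator client for the ZooKeeper ensemble; the field types and the ZkOffsetStore helper are assumptions.

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry

// Offset metadata persisted to ZooKeeper as JSON
case class DbmsHeaderData(
  seqno: Long,
  fragno: Short,
  lastFrag: Boolean,
  sourceId: String,
  epochNumber: Long,
  eventId: String,
  shardId: String,
  extractedTstamp: Long,
  appliedLatency: Long,
  updateTstamp: Long,
  taskId: Int)

object ZkOffsetStore {
  // Curator client pointing at the ZooKeeper ensemble that Kafka already uses
  def createClient(connectString: String): CuratorFramework = {
    val client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3))
    client.start()
    client
  }
}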


The adapter for RabbitMQ
A fairly simple adapter that moves data from one queue to another. Records are transferred to Kafka one at a time, after which an acknowledgement is sent back to RabbitMQ. The service guarantees at-least-once delivery of every message; deduplication happens on the data processing side.
    RabbitMQConsumerCallback callback = new RabbitMQConsumerCallback() {
          @Override
          public void apply(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { // callback invoked when data arrives from RabbitMQ
 
              String routingKey = envelope.getRoutingKey();
 
              Tuple3<String, String, String> routingExpr = routingExprMap.get(routingKey); // look up the Kafka topic and partitioning key in the config based on the incoming routingKey
              if (routingExpr == null)
                  throw new RuntimeException("No mapping for routing key " + routingKey);
 
              String expr = routingExpr._1(),
                      topic = Objects.firstNonNull(routingExpr._2(), kafkaProducerMainTopic),
                      sourceDoc = routingExpr._3();
 
              Object data = rabbitMQConsumerSerializer.deserialize(body); // deserialize the incoming message; the deserializer is specified in the config
              RabbitMQMessageEnvelope msgEnvelope = new RabbitMQMessageEnvelope(envelope, properties, data, sourceDoc); // build the outgoing message in the agreed format
 
              byte[] key = getValueByExpression(data, expr).getBytes();
              byte[] msg = kafkaProducerSerializer.serialize(msgEnvelope);
 
              kafkaProducer.addMsg(topic, key, msg, envelope.getDeliveryTag()); // send the message to Kafka
 
              try {
                  checkForSendBatch();
              } catch (IOException e) {
                  this.errBack(e);
              }
          }
 
          @Override
          public void errBack(Exception e) {
              logger.error("{}", e.fillInStackTrace());
              close();
          }
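
The batch acknowledgement logic behind checkForSendBatch is not shown; below is a hedged sketch (class and parameter names are illustrative) of the pattern the adapter relies on: messages are first handed to Kafka and only then acknowledged back to RabbitMQ with multiple = true, which yields at-least-once delivery and is why duplicates must be removed downstream.

import com.rabbitmq.client.Channel

class BatchAcker(channel: Channel, batchSize: Int) {
  private var pending = 0
  private var lastDeliveryTag = 0L

  // Called after a message has been handed to the Kafka producer
  def recordSent(deliveryTag: Long): Unit = {
    pending += 1
    lastDeliveryTag = deliveryTag
    if (pending >= batchSize) flush()
  }

  // Ack everything up to the last delivery tag in a single call.
  // If the process crashes before this point, RabbitMQ redelivers the unacked
  // messages, which is why deduplication happens on the consumer side.
  def flush(): Unit = {
    if (pending > 0) {
      channel.basicAck(lastDeliveryTag, true) // multiple = true
      pending = 0
    }
  }
}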



The adapter for DWH
When we need to process historical data, we turn to the DWH. The storage is built on Hadoop technologies, so we use Hive or Impala for data access. To make the loading interface more universal, we implemented it over JDBC. The main difficulty of working with the DWH is that its data is normalized, so assembling a complete document requires joining several tables.

What we have as input:
• the data of the required tables is partitioned by date;
• the period for which we want to load data is known;
• the grouping key of the document is known for each table.

To group the tables:
• we use Spark SQL DataFrames;
• we loop over the dates in the given range;
• several DataFrames are joined by the grouping key into a single document and written to Kafka using Spark.

Example of a data source configured via a properties file:
hdfs_kafka.dataframe.df1.uri="jdbc:hive2://[HiveUri]:10000/test;user=hdfs" // JDBC URI

hdfs_kafka.dataframe.df1.sql=select * from test.log_arenas_p1_v1 where dt='%s' // SQL statement with a '%s' date placeholder
hdfs_kafka.dataframe.df1.keyField=arena_id

hdfs_kafka.dataframe.df1.outKeyField=arena_id // specifies which field of the DataFrame the key is taken from

hdfs_kafka.dataframe.df1.tableName=test.log_arenas_p1_v

hdfs_kafka.dataframe.df2.uri="jdbc:hive2://[HiveUri]:10000/test;user=hdfs"

hdfs_kafka.dataframe.df2.sql=select * from test.log_arenas_members where dt='%s'
hdfs_kafka.dataframe.df2.keyField=arena_id

hdfs_kafka.dataframe.df2.outKeyField=arena_id // field used as the key when writing to Kafka

hdfs_kafka.dataframe.df2.tableName=test.log_arenas_members_p1_v // table name, included in the message body


In this example we build two DataFrames.
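
The configuration model implied by these properties and by the main loop below could look roughly like this (a sketch; the field names follow the dfconf.* accessors used in the code):

// One entry per hdfs_kafka.dataframe.<name>.* block in the properties file
case class DataFrameConf(
  uri: String,          // JDBC connection string
  sql: String,          // query with a '%s' date placeholder
  keyField: String,     // grouping key of the document
  outKeyField: String,  // field used as the Kafka message key
  tableName: String)    // goes into the message body

case class Configuration(
  dateFormat: String,   // e.g. "yyyy-MM-dd"
  from: String,         // hdfs_kafka.from
  to: String,           // hdfs_kafka.to
  dataframes: Seq[DataFrameConf])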

The application calculates the number of days between the dates specified in the configuration file and runs a loop over them:
hdfs_kafka.from=2015-06-25
hdfs_kafka.to=2015-06-26

// Get the list of dates for which the SQL statements from the DataFrame configuration are executed
val dates = Utils.getRange(configuration.dateFormat, configuration.from, configuration.to)

// Main application loop
dates.map(date => {

  // Build a DataFrame for every configured source and the current date
  val dataFrames = configuration.dataframes.map(dfconf => {
    val df = executeJdbc(sqlContext, Utils.makeQuery(dfconf.sql, date), dfconf.uri)
    (dfconf, df)
  })

  // Obtain RDD[(Key, Row)] using keyBy on the keyField from the configuration
  val keysExtracted = dataFrames.map(e => {
    dataFrameProcessor.extractKey(e._2.rdd, e._1.keyField, e._1.tableName)
  })

  // Union all DataFrames into one
  val grouped = keysExtracted.reduce(_.union(_)).map(e => (e._1, Seq(e._2)))

  // Group the Rows by key
  val byKey = grouped.reduceByKey(_ ++ _)

  // Process and send the messages
  dataFrameProcessor.applySeq(byKey)
})
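
The helpers Utils.getRange and Utils.makeQuery are not shown in the article; here is a minimal sketch of how they might be implemented, assuming dates are formatted as in the hdfs_kafka.from/to settings above.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Utils {
  // Every date between `from` and `to` inclusive, formatted with the given pattern,
  // e.g. getRange("yyyy-MM-dd", "2015-06-25", "2015-06-26") => Seq("2015-06-25", "2015-06-26")
  def getRange(dateFormat: String, from: String, to: String): Seq[String] = {
    val fmt = DateTimeFormatter.ofPattern(dateFormat)
    val (start, end) = (LocalDate.parse(from, fmt), LocalDate.parse(to, fmt))
    Iterator.iterate(start)(_.plusDays(1)).takeWhile(!_.isAfter(end)).map(_.format(fmt)).toSeq
  }

  // Substitute the date into the '%s' placeholder of the configured SQL statement
  def makeQuery(sql: String, date: String): String = String.format(sql, date)
}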



We will describe how the collected information is processed, as well as the other PRMP components, in the next post. If you have any questions about the technologies described here, feel free to ask them in the comments.

This article is a translation of the original post at habrahabr.ru/post/273607/