Hi everyone.
On January 4 a new version of Spark was released, so this is a good moment for a short introduction before using it in real projects. Spark runs on most operating systems and can be started in local mode even on an ordinary laptop. Given how simple the Spark setup is in this case, it would be a sin not to use its main features. In this article we will look at how to quickly set up, on a laptop, processing of a large file (larger than the computer's RAM) with ordinary SQL queries. This makes such queries accessible even to an unprepared user. Additionally connecting an IPython (Jupyter) notebook makes it possible to build complete reports. The article walks through a simple example of processing a file; other Python examples are here.

Input data: one or more files of several GB with structured data, and a laptop with less than 1 GB of free RAM. We need to obtain various analytical results from the files using SQL or similarly simple queries. Let's walk through an example where the files contain search query statistics for one month (the data in the screenshots is shown for illustration only and does not correspond to reality):
[screenshot: a sample of the monthly search query statistics file]

We need to get the distribution of the number of words per search query for queries on a certain topic, for example those containing the word "недвижимость" ("real estate"). In other words, in this example we simply filter the search queries, count the number of words in each query, group by word count and build the distribution:
[screenshot: distribution of the number of words per query]

Installing Spark in local mode is almost identical across the main operating systems and comes down to the following steps:
1. Download Spark (this example works with version 1.6) and unpack it into any folder.

2. Install Java (if it is not already installed)
— for Windows and Mac: download and install version 7 from java.com
— for Linux: $ sudo apt-get update and $ sudo apt-get install openjdk-7-jdk; you may also need to add the Java installation path to .bashrc: JAVA_HOME=/usr/lib/jvm/java-7-openjdk-i386
If Python is not installed, you can simply install Anaconda.

Start pySpark (you can also start spark-shell to work in Scala, Spark's native language): go into the unpacked Spark folder and run pyspark from the bin folder (an example: spark.apache.org/docs/latest/quick-start.html). On a successful start you will see:
[screenshot: pyspark startup banner]
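
The pyspark shell automatically creates a SparkContext and exposes it as the sc variable, which is used below. A quick way to check that everything is up (a minimal sketch; the exact version string depends on your installation):
>>> sc.version
u'1.6.0'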

Now we need to "prepare" our file for SQL queries (in Spark 1.6 you can run SQL queries directly on some file types, without creating a table). That is, we create a DataFrame (a DataFrame also has a lot of useful functions of its own) and then build a table for SQL queries from it:
1. Load the necessary libraries:
>>> from pyspark.sql import SQLContext, Row
>>> sqlContext = SQLContext(sc)


2. Load the source file into the text variable and look at its first line:
>>> text = sc.textFile('path to the file')
>>> text.first()
u'2015-09-01\tдень знаний\t101753'


In our file the fields within a line are separated by tabs. To split the lines into columns correctly we use map and split with the tab character as the delimiter: map(lambda l: l.split('\t')). From the result of the split we then pick the columns we need. For this task we need to know the number of words in each search query, so we take only the query itself (the query column) and the number of words in it (the wc column): map(lambda l: Row(query=l[1], wc=len(l[1].split(' ')))).

You can also take all of the columns of the table, so that any SQL query can be run against it later:
map(lambda l: Row(date=l[0], query=l[1], stat=l[2], wc=len(l[1].split(' '))))

Let's perform these operations in a single line:
>>> schema = text.map(lambda l: l.split('\t')).map(lambda l: Row(query=l[1], wc=len(l[1].split(' '))))
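
As a quick sanity check (the exact Row representation can differ slightly between versions), the first element should correspond to the first line of the file shown above:
>>> schema.first()
Row(query=u'день знаний', wc=2)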


Now we turn schema into a DataFrame, which provides many useful processing operations (examples: spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations):
>>> df = sqlContext.createDataFrame(schema)
>>> df.show()
+--------------------+---+
| query| wc|
+--------------------+---+
|день знаний...| 2|
| сбербанк онлайн| 2|
|эхо москвы слушать| 3|
...
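
For comparison, the same aggregation can also be expressed with the DataFrame API directly, without SQL; a minimal sketch assuming the Spark 1.6 column method like() (the filter mirrors the "недвижимость" example used below):
>>> df.filter(df['query'].like('%недвижимость%')).groupBy('wc').count().show()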


3. Register the DataFrame as a table so we can run SQL queries against it:
 >>> df.registerTempTable('queryTable')


4. Run an SQL query over the whole file and collect the result into the output variable:
>>> output = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable GROUP BY wc').collect()
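
collect() returns an ordinary Python list of Row objects, and GROUP BY does not guarantee any ordering, so it can be convenient (an optional extra step) to sort the result by word count before building the histogram:
>>> output = sorted(output, key=lambda row: row[0])  # row[0] is wc, row[1] is the count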


For a 2 GB file with 700 MB of free RAM this query took 9 minutes. The progress can be followed in log lines of the form (… of 53):
INFO TaskSetManager: Finished task 1.0 in stage 8.0 (TID 61) in 11244 ms on localhost (1/53)

We can add additional restrictions:
>>> outputRealty = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable WHERE query like "%недвижимость%" GROUP BY wc').collect()


All that remains is to draw a histogram of this distribution. For example, you can write the contents of output to the file output.txt and plot the distribution in Excel:

>>> with open('output.txt', 'w') as f:
...         f.write('wc \t count \n')
...         for line in output:
...             f.write(str(line[0]) + '\t' + str(line[1]) + '\n')
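
Alternatively, if an IPython (Jupyter) notebook is connected as mentioned at the beginning, the same distribution can be plotted right in the notebook; a minimal sketch using matplotlib (matplotlib is an assumption here, it is not part of Spark):
>>> import matplotlib.pyplot as plt
>>> wc = [row[0] for row in output]      # number of words in a query
>>> counts = [row[1] for row in output]  # how many queries have that word count
>>> plt.bar(wc, counts)
>>> plt.xlabel('words in query')
>>> plt.ylabel('number of queries')
>>> plt.show()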

This article is a translation of the original post at habrahabr.ru/post/274705/