logo

PySpark SQL

Apache Spark е най-успешният софтуер на Apache Software Foundation и е предназначен за бързи изчисления. Няколко индустрии използват Apache Spark, за да намерят своите решения. PySpark SQL е модул в Spark, който интегрира релационна обработка с API за функционално програмиране на Spark. Можем да извлечем данните, като използваме SQL език за заявки. Можем да използваме заявките като SQL езика.

Ако имате основно разбиране за RDBMS, PySpark SQL ще бъде лесен за използване, където можете да разширите ограничението на традиционната релационна обработка на данни. Spark също поддържа Hive Query Language, но има ограничения на базата данни Hive. Spark SQL е разработен, за да премахне недостатъците на базата данни Hive. Нека да разгледаме следните недостатъци на Hive:

Недостатъци на Hive

  • Не може да възобнови обработката, което означава, че ако изпълнението е неуспешно по средата на работния поток, не можете да възобновите от мястото, където е блокирало.
  • Не можем да изтрием шифрованите бази данни в каскада, когато кошчето е активирано. Това води до грешка при изпълнение. За премахване на такъв тип база данни, потребителите трябва да използват опцията Purge.
  • Специалните заявки се изпълняват с помощта на MapReduce, който се стартира от Hive, но когато анализираме базата данни със среден размер, това забавя производителността.
  • Hive не поддържа операцията за актуализиране или изтриване.
  • Тя е ограничена до поддръжката на подзаявки.

Тези недостатъци са причините да се разработи Apache SQL.

PySpark SQL Кратко въведение

PySpark поддържа интегрирана релационна обработка с функционалното програмиране на Spark. Той осигурява поддръжка за различните източници на данни, за да направи възможно вплитането на SQL заявки с кодови трансформации, като по този начин се получава много мощен инструмент.

PySpark SQL установява връзката между RDD и релационната таблица. Той осигурява много по-тясна интеграция между релационна и процедурна обработка чрез декларативен Dataframe API, който е интегриран с кода на Spark.

Използвайки SQL, той може да бъде лесно достъпен за повече потребители и да подобри оптимизацията за текущите. Той също така поддържа широка гама от източници на данни и алгоритми в Big-data.

Функция на PySpark SQL

Характеристиките на PySpark SQL са дадени по-долу:

10 от 100

1) Достъп до последователни данни

Той осигурява последователен достъп до данни, което означава, че SQL поддържа споделен начин за достъп до различни източници на данни като Hive, Avro, Parquet, JSON и JDBC. Той играе важна роля за настаняването на всички съществуващи потребители в Spark SQL.

2) Включване със Spark

PySpark SQL заявките са интегрирани с програмите на Spark. Можем да използваме заявките вътре в програмите на Spark.

Едно от най-големите му предимства е, че разработчиците не трябва ръчно да управляват отказ на състояние или да поддържат приложението в синхрон с пакетни задания.

3) Стандартна свързаност

Той осигурява връзка чрез JDBC или ODBC и тези два са индустриалните стандарти за свързаност за инструменти за бизнес разузнаване.

4) Дефинирани от потребителя функции

PySpark SQL има комбинирана езикова дефинирана от потребителя функция (UDF). UDF се използва за дефиниране на нова функция, базирана на колони, която разширява речника на DSL на Spark SQL за трансформиране на DataFrame.

5) Съвместимост с Hive

PySpark SQL изпълнява немодифицирани Hive заявки върху текущи данни. Позволява пълна съвместимост с текущите данни на Hive.

PySpark SQL модул

Някои важни класове на Spark SQL и DataFrames са следните:

    pyspark.sql.SparkSession:Той представлява основната входна точка за DataFrame и SQL функционалност.pyspark.sql.DataFrame:Той представлява разпределена колекция от данни, групирани в именувани колони.pyspark.sql.Колона:Той представлява израз на колона в a DataFrame. pyspark.sql.Ред:Той представлява ред от данни в a DataFrame. pyspark.sql.GroupedData:Методи за агрегиране, върнати от DataFrame.groupBy(). pyspark.sql.DataFrameNaFunctions:Той представлява методи за обработка на липсващи данни (нулеви стойности).pyspark.sql.DataFrameStatFunctions:Той представлява методи за статистическа функционалност.pysark.sql.functions:Представлява списък с вградени функции, достъпни за DataFrame. pyspark.sql.types:Той представлява списък от налични типове данни.pyspark.sql.Прозорец:Използва се за работа с функции на Window.

Разгледайте следния пример на PySpark SQL.

сортиране на селекция java
 import findspark findspark.init() import pyspark # only run after findspark.init() from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.sql('''select 'spark' as hello ''') df.show() 

Изход:

 +-----+ |hello| +-----+ |spark| +-----+ 

Обяснение на кода:

В горния код ние импортирахме findspark модул и т.нар findspark.init() конструктор; след това импортирахме модула SparkSession, за да създадем spark сесия.

от pyspark.sql импортирайте SparkSession

Spark сесия може да се използва за създаване на Dataset и DataFrame API. SparkSession може също да се използва за създаване на DataFrame, регистриране на DataFrame като таблица, изпълнение на SQL над таблици, кеширане на таблица и четене на паркетен файл.

създател на класове

Той е създател на Spark Session.

getOrCreate()

Използва се за получаване на съществуващ SparkSession, или ако няма съществуващ, създайте нов въз основа на опциите, зададени в конструктора.

Малко други методи

Няколко метода на PySpark SQL са следните:

1. име на приложение (име)

Използва се за задаване на името на приложението, което ще се показва в уеб интерфейса на Spark. Параметърът име приема името на параметъра.

2. config(key=None, value = None, conf = None)

Използва се за задаване на опция за конфигурация. Опциите, зададени чрез този метод, се разпространяват автоматично и към двете SparkConf и SparkSession конфигурация на.

 from pyspark.conf import SparkConfSparkSession.builder.config(conf=SparkConf()) 

Параметри:

    ключ-Низ от име на ключ на свойство на конфигурация.стойност-Той представлява стойността на свойство на конфигурация.conf -Екземпляр на SparkConf.

3. майстор (майстор)

Той задава главния url адрес на spark, към който да се свържете, като например „local“ за локално изпълнение, „local[4]“ за локално изпълнение с 4 ядра.

Параметри:

    господар:url за spark master.

4. SparkSession.каталог

Това е интерфейс, който потребителят може да създава, пуска, променя или прави заявки към основната база данни, таблици, функции и т.н.

5. SparkSession.conf

трета нормална форма

Това е интерфейс за конфигурация по време на изпълнение за spark. Това е интерфейсът, чрез който потребителят може да получи и зададе всички конфигурации на Spark и Hadoop, които са подходящи за Spark SQL.

клас pyspark.sql.DataFrame

Това е разпределена колекция от данни, групирани в именувани колони. DataFrame е подобна на релационната таблица в Spark SQL, може да бъде създадена с помощта на различни функции в SQLContext.

 student = sqlContext.read.csv('...') 

След създаването на рамка с данни можем да я манипулираме с помощта на няколко специфични за домейна езика (DSL), които са предварително дефинирани функции на DataFrame. Помислете за следния пример.

 # To create DataFrame using SQLContext student = sqlContext.read.parquet('...') department = sqlContext.read.parquet('...') student.filter(marks > 55).join(department, student.student_Id == department.id)  .groupBy(student.name, 'gender').({'name': 'student_Id', 'mark': 'department'}) 

Нека разгледаме следния пример:

Запитване с помощта на Spark SQL

В следния код първо създаваме DataFrame и изпълняваме SQL заявките, за да извлечем данните. Разгледайте следния код:

 from pyspark.sql import * #Create DataFrame songdf = spark.read.csv(r'C:UsersDEVANSH SHARMA	op50.csv', inferSchema = True, header = True) #Perform SQL queries songdf.select('Genre').show() songdf.filter(songdf['Genre']=='pop').show() 

Изход:

 +----------------+ | Genre| +----------------+ | canadian pop| | reggaeton flow| | dance pop| | pop| | dfw rap| | pop| | trap music| | pop| | country rap| | electropop| | reggaeton| | dance pop| | pop| | panamanian pop| |canadian hip hop| | dance pop| | latin| | dfw rap| |canadian hip hop| | escape room| +----------------+ only showing top 20 rows +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name| Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 6|I Don't Care (wit...| Ed Sheeran| pop| 102| 68| 80| -5| 9| 84| 220| 9| 4| 84| | 8| How Do You Sleep?| Sam Smith| pop| 111| 68| 48| -5| 8| 35| 202| 15| 9| 90| | 13| Someone You Loved|Lewis Capaldi| pop| 110| 41| 50| -6| 11| 45| 182| 75| 3| 88| | 38|Antisocial (with ...| Ed Sheeran| pop| 152| 82| 72| -5| 36| 91| 162| 13| 5| 87| | 44| Talk| Khalid| pop| 136| 40| 90| -9| 6| 35| 198| 5| 13| 84| | 50|Cross Me (feat. C...| Ed Sheeran| pop| 95| 79| 75| -6| 7| 61| 206| 21| 12| 82| +---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ 

Използване на функцията groupBy().

Функцията groupBy() събира подобни данни за категория.

 songdf.groupBy('Genre').count().show() 

Изход:

 +----------------+-----+ | Genre|count| +----------------+-----+ | boy band| 1| | electropop| 2| | pop| 7| | brostep| 2| | big room| 1| | pop house| 1| | australian pop| 1| | edm| 3| | r&b en espanol| 1| | dance pop| 8| | reggaeton| 2| | canadian pop| 2| | trap music| 1| | escape room| 1| | reggaeton flow| 2| | panamanian pop| 2| | atl hip hop| 1| | country rap| 2| |canadian hip hop| 3| | dfw rap| 2| +----------------+-----+ 

разпределение (брой дялове, *колове)

The разпространение () връща нов DataFrame, който е израз за разделяне. Тази функция приема два параметъра numpartitions и *кол. The numpartitions параметърът указва целевия брой колони.

 song_spotify.repartition(10).rdd.getNumPartitions() data = song_spotify.union(song_spotify).repartition('Energy') data.show(5) 

Изход:

 +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ |_c0| Track.Name|Artist.Name| Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| | 17| LA CANCI?N| J Balvin| latin| 176| 65| 75| -6| 11| 43| 243| 15| 32| 90| | 4|Beautiful People ...| Ed Sheeran| pop| 93| 65| 64| -8| 8| 55| 198| 12| 19| 86| | 5|Goodbyes (Feat. Y...|Post Malone|dfw rap| 150| 65| 58| -4| 11| 18| 175| 45| 7| 94| +---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+ only showing top 5 rows