Практика SPARK

Содержание

Слайд 2

DataFrame

DataFrame

Слайд 3

DATAFRAME

DataFrame – распределенный набор данных сгруппированный по именованным столбцам.
DataFrame – объект SPARK,

DATAFRAME DataFrame – распределенный набор данных сгруппированный по именованным столбцам. DataFrame –
позволяющий работать с набором данных в табличном представлении.

Полезные ссылки:
pyspark.sql module — PySpark 2.4.8 documentation (apache.org)
https://sparkbyexamples.com/

Слайд 4

СОЗДАНИЕ

emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \

СОЗДАНИЕ emp = [(1,"Smith",-1,"2018","10","M",3000), \ (2,"Rose",1,"2010","20","M",4000), \ (3,"Williams",1,"2010","10","M",1000), \ (4,"Jones",2,"2005","10","F",2000), \ (5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(3)

Слайд 5

СОЗДАНИЕ
df_customers=spark.read.csv('/tmp/spark_practics/data/customers', header=True, sep=',')
df_billings=spark.read.csv('/tmp/spark_practics/data/billings', header=True, sep=',')
df_customers.printSchema()
df_customers.show(3)
df_billings.printSchema()
df_billings.show(3)

СОЗДАНИЕ df_customers=spark.read.csv('/tmp/spark_practics/data/customers', header=True, sep=',') df_billings=spark.read.csv('/tmp/spark_practics/data/billings', header=True, sep=',') df_customers.printSchema() df_customers.show(3) df_billings.printSchema() df_billings.show(3)

Слайд 6

ОБРАЩЕНИЕ К СТОЛБЦАМ

dataframe_name.column_name
df_customers = df_customers.withColumnRenamed("customer_id", "id")
df_billings.filter(df_billings.amount < 5000).join(df_customers, df_customers.id == df_billings.customer_id) \
.groupBy(df_customers.country).agg({"amount":

ОБРАЩЕНИЕ К СТОЛБЦАМ dataframe_name.column_name df_customers = df_customers.withColumnRenamed("customer_id", "id") df_billings.filter(df_billings.amount .groupBy(df_customers.country).agg({"amount": "avg", "balance":
"avg", "balance": "max", "customer_id": "count"}).show(3)
2) “column_name”
df_billings.filter("amount < 5000").where("balance > 0").show(3)
3) col(“column_name”)
from pyspark.sql import functions as f
df_billings.filter(f.col("amount") < 5000).where(f.col("balance") > 0).show(3)

Слайд 7

ОПЕРАЦИИ C DATAFRAME

Выбор столбцов
df_billings.select('customer_id', 'balance').show(3)
2) Переименование столбцов
df_billings.withColumnRenamed('customer_id', 'id').show(3)
3) Добавление новых столбцов
df_billings.withColumn("cat", f.col('balance')+500).show(3)
4)

ОПЕРАЦИИ C DATAFRAME Выбор столбцов df_billings.select('customer_id', 'balance').show(3) 2) Переименование столбцов df_billings.withColumnRenamed('customer_id', 'id').show(3)
Агрегатные ф-ии
df_customers = df_customers.withColumnRenamed("customer_id", "id")
df_billings.filter(df_billings.amount < 5000).join(df_customers, df_customers.id == df_billings.customer_id) \
.groupBy(df_customers.country)\
.agg(f.avg("amount").alias("avg_am"),\
f.max("balance").alias("max_bal"),\
#fergergetgrtg
f.count("customer_id").alias("c"))\
.where(f.col("c")>7000).show(3)

Слайд 8

SPARK SQL
df_customers.createOrReplaceTempView('cust')
df = spark.sql('select country, min(id), count(id) from cust group by country

SPARK SQL df_customers.createOrReplaceTempView('cust') df = spark.sql('select country, min(id), count(id) from cust group
having count(id)<1450')
df.show(3)

Слайд 9

Создание DataFrame из данных разных форматов

Создание DataFrame из данных разных форматов

Слайд 10

CSV

DF=spark.read.csv('hdfs://cluster/user/hdfs/test/example.csv‘, schema=None, sep = '\t',
encoding=‘UTF-8’, escape=‘\’, comment=‘--’, header=‘True’, inferSchema=‘False’,
ignoreLeadingWhiteSpace=‘True’, ignoreTrailingWhiteSpace=‘True’,

CSV DF=spark.read.csv('hdfs://cluster/user/hdfs/test/example.csv‘, schema=None, sep = '\t', encoding=‘UTF-8’, escape=‘\’, comment=‘--’, header=‘True’, inferSchema=‘False’, ignoreLeadingWhiteSpace=‘True’,
nullValue=‘Null’,
nanValue=‘Nan’, positiveInf=‘-oo’, negativeInf=‘+oo’, dateFormat=‘yyyy-MM-dd HH:mm:ss’,
timestampFormat=‘yyyy-MM-dd'T'HH:mm:ss.SSSZZ’, maxColumns= 100, maxCharsPerColumn=1000,
maxMalformedLogPerPartition=1, mode=‘PERMISSIVE’)

Слайд 11

CSV

Параметры:
path – строка или список строк, указывающие, где лежат файлы.
schema – опционально,

CSV Параметры: path – строка или список строк, указывающие, где лежат файлы.
схема которая описывает данные в файле.
esep – символ, который описывает разделитель в файле. По умолчанию, ‘,’.
encoding – указывает из какой кодировки надо декодировать файл. По умолчанию, ‘UTF-8’.
escape – символ, используемый для экранирования кавычек внутри уже заключенного в кавычки значения. По умолчанию '\'.
comment – символ, указывающий начало строки комментария, которые не требуется загружать, по умолчанию выключено.
header – логическое значение, указывающее использовать первую строку в качестве названий столбцов или нет. По умолчанию ‘False’.
inferSchema – логическое значение, указывающее автоматическое получение схемы из данных, требует дополнительного сканирования всего файла. По умолчанию, ‘False’.
ignoreLeadingWhiteSpace – определяет пропускать или нет пробелы перед значениями. По умолчанию, ‘False’.
ignoreTrailingWhiteSpace – определяет пропускать или нет пробелы после значений. По умолчанию, ‘False’.

Слайд 12

CSV

Параметры:
nullValue – устанавливает строку, которая обозначает значение null. По умолчанию пустая строка.
nanValue

CSV Параметры: nullValue – устанавливает строку, которая обозначает значение null. По умолчанию
– устанавливает строку, которая обозначает нечисловое значение. По умолчанию ‘NaN’.
positiveInf – устанавливает строку, которая обозначает плюс бесконечность. По умолчанию ‘Inf’.
negativeInf – устанавливает строку, которая обозначает минус бесконечность. По умолчанию ‘Inf’.
dateFormat – устанавливает формат даты в соответствии форматам java.text.SimpleDateFormat, поля со значениями соответствующие этому формату загружаются с типом дата. По умолчанию, ‘yyyy-MM-dd’.
timestampFormat – устанавливает формат timestamp в соответствии форматам java.text.SimpleDateFormat, поля со значениями соответствующие этому формату загружаются с типом timestamp . По умолчанию, ‘yyyy-MM-dd'T'HH:mm:ss.SSSZZ’.
maxColumns – указывает максимальное возможное количество столбцов, которое может иметь файл, по умолчанию 20480.
maxCharsPerColumn – указывает максимальное возможное количество символов у столбцов, которое может иметь файл, по умолчанию , -1 (неограничено).
maxMalformedLogPerPartition – указывает максимальное количество не соответствующие схеме строк, которые будут записаны в лог, остальные проигнорированы.

Слайд 13

CSV

Параметры:
mode – указывает реакцию на ошибки при парсинге строк. По умолчанию, ‘PERMISSIVE’.
PERMISSIVE

CSV Параметры: mode – указывает реакцию на ошибки при парсинге строк. По
: устанавливает значения в null в строках, которые не смог распарсить. В случае использования пользовательской схемы, устанавливает в null все значения, которые не соответствуют схеме.
DROPMALFORMED : игнорирует строки, которые не смог распарсить.
FAILFAST : вызывает исключение.

Слайд 14

JSON

DF=spark.read.json('//home/user/test/example.json‘, schema=None,
prefersDecimal=‘True’, allowComments=‘False’, allowUnquotedFieldNames=‘True’,
allowSingleQuotes=‘False’, allowNumericLeadingZero=‘True’,
allowBackslashEscapingAnyCharacter=‘True’, mode=None, dateFormat=None, timestampFormat=None)

JSON DF=spark.read.json('//home/user/test/example.json‘, schema=None, prefersDecimal=‘True’, allowComments=‘False’, allowUnquotedFieldNames=‘True’, allowSingleQuotes=‘False’, allowNumericLeadingZero=‘True’, allowBackslashEscapingAnyCharacter=‘True’, mode=None, dateFormat=None, timestampFormat=None)

Слайд 15

JSON

Параметры:
path – строка или список строк, указывающие, где лежат файлы.
schema – опционально,

JSON Параметры: path – строка или список строк, указывающие, где лежат файлы.
схема которая описывает данные в файле.
mode – указывает реакцию на ошибки при парсинге строк. По умолчанию, ‘PERMISSIVE’.
PERMISSIVE : устанавливает значения в null в строках, которые не смог распарсить. В случае использования пользовательской схемы, устанавливает в null все значения, которые не соответствуют схеме.
DROPMALFORMED : игнорирует строки, которые не смог распарсить.
FAILFAST : вызывает исключение.
dateFormat – устанавливает формат даты в соответствии форматам java.text.SimpleDateFormat, поля со значениями соответствующие этому формату загружаются с типом дата. По умолчанию, ‘yyyy-MM-dd’.
timestampFormat – устанавливает формат timestamp в соответствии форматам java.text.SimpleDateFormat, поля со значениями соответствующие этому формату загружаются с типом timestamp . По умолчанию, ‘yyyy-MM-dd'T'HH:mm:ss.SSSZZ’.

Слайд 16

JSON

Параметры:
primitivesAsString – устанавливает загружать все примитивы, как строки. По умолчанию, false.
prefersDecimal –

JSON Параметры: primitivesAsString – устанавливает загружать все примитивы, как строки. По умолчанию,
устанавливает загружать все числа с дробной частью, как decimal (дробная часть фиксированная), иначе, как вещественное число (число с плавающей точкой). По умолчанию, false.
allowComments – устанавливает игнорировать комментарии в стиле Java/C++. По умолчанию, false.
allowUnquotedFieldNames – разрешает наименования полей без кавычек. По умолчанию, false.
allowSingleQuotes – разрешает апострофы в дополнении к двойным кавычкам. По умолчанию, false.
allowNumericLeadingZero – разрешает лидирующие нули в числах (например, 00012). По умолчанию, false.
allowBackslashEscapingAnyCharacter – разрешает использование обратной черты в качестве символа экранирования. По умолчанию, false.

Слайд 17

PARQUET

DF=spark.read.parquet('//home/user/test/exampleparquet’)
Параметры:
path – строка или список строк, указывающие, где лежат данные.

PARQUET DF=spark.read.parquet('//home/user/test/exampleparquet’) Параметры: path – строка или список строк, указывающие, где лежат данные.

Слайд 18

ORC

DF=spark.read.orc('//home/user/test/exampleorc’)
Параметры:
path – строка, указывающая, где лежат данные.

ORC DF=spark.read.orc('//home/user/test/exampleorc’) Параметры: path – строка, указывающая, где лежат данные.

Слайд 19

TEXT

DF=spark.read.text('//home/user/test/example.txt’)
Параметры:
path – строка или список строк, указывающие, где лежат данные.

TEXT DF=spark.read.text('//home/user/test/example.txt’) Параметры: path – строка или список строк, указывающие, где лежат данные.

Слайд 20

HIVE

DF=spark.read.table(tableName = ‘test.table’)
или
DF=spark.sql(sqlQuery = ‘select * from test.tavble’)
Параметры:
tableName – строка название таблицы
или
sqlQuery

HIVE DF=spark.read.table(tableName = ‘test.table’) или DF=spark.sql(sqlQuery = ‘select * from test.tavble’) Параметры:
– sql запрос

Слайд 21

JDBC

DF=spark.read.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, column=‘date_from’, lowerBound=None, upperBound=None, numPartitions=10, predicates=None, properties={ ‘user’ : ‘SYSTEM’, ‘password’

JDBC DF=spark.read.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, column=‘date_from’, lowerBound=None, upperBound=None, numPartitions=10, predicates=None, properties={ ‘user’ : ‘SYSTEM’,
: ‘mypassword’ })
Примеры строк подключения:

Слайд 22

JDBC

Параметры:
url – JDBC URL
table – название таблицы
column – наименование целочисленного поля,

JDBC Параметры: url – JDBC URL table – название таблицы column –
который будет использовать для партицирования; если параметр установлен, то numPartitions, lowerBound (inclusive), and upperBound (exclusive) устанавливают ограничения на набор данных в таблице, по которым формируется DataFrame.
lowerBound – минимальное значение поля, по которому формируются партиции.
upperBound – максимальное значение поля, по которому формируются партиции.
numPartitions – количество партиций, которое может быть создано (при параллельной обработке большое количество партиций может уронить источник данных).
predicates – список выражений, которые будут применены в условиях WHERE, каждое выражение определяет одну партицию.
properties – дополнительные аргументы при подключении к jdbc источнику. Например { ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ }

Слайд 23

AVRO

Работа с avro форматом не включена в поставку spark, поэтому поддержку необходимо

AVRO Работа с avro форматом не включена в поставку spark, поэтому поддержку
дополнительно подключать.
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.1 ...
После этого этот формат будет доступен в методе:
DF=spark.read.load('hdfs://cluster/user/hdfs/test/exampleavro‘, format=‘avro’, schema=None, **options)
Для чтения avro-файлов – параметр format=‘avro’

Слайд 24

AVRO, LOAD()

Параметры:
path – строка или список строк, указывающие, где лежат файлы.
format –

AVRO, LOAD() Параметры: path – строка или список строк, указывающие, где лежат
Строка указывающая на формат файла (avro/csv/json/parquet/orc…). Значение по умолчанию ‘parquet’.
schema – опционально, схема которая описывает данные в файле или DDL форматированная строка, например ‘col0 INT, col1 DOUBLE’.
options – опции, которые можно задать при вызове соответствующего формату метода (например, load(test.csv, format=‘csv’, sep=‘\t’))

Слайд 25

Создание RDD (sequence)

Создание RDD (sequence)

Слайд 26

SEQUENCE

Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать с

SEQUENCE Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать
ними присутствует у RDD, более низкоуровневой структуры данных нежели DataFrame.
rdd = spark.sparkContext.sequenceFile('path')
path – путь до файла

Слайд 27

Сохранение DataFrame в различные форматы данных

Сохранение DataFrame в различные форматы данных

Слайд 28

CSV
DF=spark.write.csv('hdfs://cluster/user/hdfs/test/example.csv‘, mode=‘overwrite’,
compression= snappy, sep=‘|’, escape=None, header=‘True’, nullValue=‘null’,
escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None,

CSV DF=spark.write.csv('hdfs://cluster/user/hdfs/test/example.csv‘, mode=‘overwrite’, compression= snappy, sep=‘|’, escape=None, header=‘True’, nullValue=‘null’, escapeQuotes=None, quoteAll=None, dateFormat=None,

ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None,
encoding=None, emptyValue=None)’)

Слайд 29

CSV

Параметры:
path – по какому пути сохранять файл.
mode – определяет поведение, если файлы

CSV Параметры: path – по какому пути сохранять файл. mode – определяет
с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
compression – кодек сжатия при сохранении файла (none, bzip2, gzip, lz4, snappy and deflate).
sep – символ, который описывает разделитель в файле. По умолчанию, ‘,’.
header – логическое значение, указывающее записывать названия столбцов в первую строку или нет. По умолчанию ‘False’.
encoding – указывает в какой кодировке сохранять файл. По умолчанию, ‘UTF-8’.
nullValue – устанавливает строку, которая обозначает значение null. По умолчанию пустая строка.
emptyValue – указывает строку, которая будет использоваться при записи пустых значений. По умолчанию пустая строка.

Слайд 30

CSV

Параметры:
escape – символ, используемый для экранирования кавычек внутри уже заключенного в кавычки

CSV Параметры: escape – символ, используемый для экранирования кавычек внутри уже заключенного
значения. По умолчанию '\'.
escapeQuotes – флаг, включающий обрамление в кавычки всех значений, внутри которых есть кавычки. По умолчанию true, (все кавычки в значениях экранируются).
quoteAll – включает обрамление в кавычки всех значений. По умолчанию, false (все кавычки в значениях экранируются).
dateFormat – устанавливает формат даты в соответствии форматам java.text.SimpleDateFormat. По умолчанию, ‘yyyy-MM-dd’.
timestampFormat – устанавливает формат timestamp в соответствии форматам java.text.SimpleDateFormat. По умолчанию, ‘yyyy-MM-dd'T'HH:mm:ss.SSSZZ’.
ignoreLeadingWhiteSpace – определяет пропускать или нет пробелы перед значениями. По умолчанию, ‘True’.
ignoreTrailingWhiteSpace – определяет пропускать или нет пробелы после значений. По умолчанию, ‘True’.
charToEscapeQuoteEscaping –символ, используемый для экранирования, если символ экранирования являются кавычки. По умолчанию используется

Слайд 31

JSON
DF=spark.write.json('hdfs://cluster/user/hdfs/test/examplejson‘, mode=‘overwrite’,
compression= snappy, dateFormat=None, timestampFormat=None, lineSep=None,
encoding=None)’)

JSON DF=spark.write.json('hdfs://cluster/user/hdfs/test/examplejson‘, mode=‘overwrite’, compression= snappy, dateFormat=None, timestampFormat=None, lineSep=None, encoding=None)’)

Слайд 32

JSON

Параметры:
path – по какому пути сохранять файл.
mode – определяет поведение, если файлы

JSON Параметры: path – по какому пути сохранять файл. mode – определяет
с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
compression – кодек сжатия при сохранении файла (none, bzip2, gzip, lz4, snappy and deflate).
lineSep – символ, который описывает разделитель строк в файле. По умолчанию, ‘\n’.
encoding – указывает в какой кодировке сохранять файл. По умолчанию, ‘UTF-8’.
dateFormat – устанавливает формат даты в соответствии форматам java.text.SimpleDateFormat. По умолчанию, ‘yyyy-MM-dd’.
timestampFormat – устанавливает формат timestamp в соответствии форматам java.text.SimpleDateFormat. По умолчанию, ‘yyyy-MM-dd'T'HH:mm:ss.SSSZZ’.

Слайд 33

PARQUET
DF=spark.write.parquet('hdfs://cluster/user/hdfs/test/exampleparq‘, mode=‘overwrite’,
partitionBy=‘None’, compression= snappy)’)

PARQUET DF=spark.write.parquet('hdfs://cluster/user/hdfs/test/exampleparq‘, mode=‘overwrite’, partitionBy=‘None’, compression= snappy)’)

Слайд 34

PARQUET

Параметры:
path – по какому пути сохранять файл.
mode – определяет поведение, если файлы

PARQUET Параметры: path – по какому пути сохранять файл. mode – определяет
с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
compression – кодек сжатия при сохранении файла (none, bzip2, gzip, lz4, snappy and deflate).
partitionBy – названия столбцов, по которым создавать партиции.

Слайд 35

ORC
DF=spark.write.orc('hdfs://cluster/user/hdfs/test/exampleorc‘, mode=‘overwrite’,
partitionBy=‘None’, compression= snappy)’)

ORC DF=spark.write.orc('hdfs://cluster/user/hdfs/test/exampleorc‘, mode=‘overwrite’, partitionBy=‘None’, compression= snappy)’)

Слайд 36

ORC

Параметры:
path – по какому пути сохранять файл.
mode – определяет поведение, если файлы

ORC Параметры: path – по какому пути сохранять файл. mode – определяет
с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
compression – кодек сжатия при сохранении файла (none, bzip2, gzip, lz4, snappy and deflate).
partitionBy – названия столбцов, по которым создавать партиции.

Слайд 37

TEXT

DF=spark.write.text('//home/user/test/example.txt’, compression=‘gzip’, lineSep=‘\n\r’)
Параметры:
path – по какому пути сохранять файл.
mode – определяет поведение,

TEXT DF=spark.write.text('//home/user/test/example.txt’, compression=‘gzip’, lineSep=‘\n\r’) Параметры: path – по какому пути сохранять файл.
если файлы с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
compression – кодек сжатия при сохранении файла (none, bzip2, gzip, lz4, snappy and deflate).
lineSep – символ, который описывает разделитель строк в файле. По умолчанию, ‘\n’.

Слайд 38

HIVE

DF=spark.write.saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
Параметры:
name – имя таблицы
format – в каком формате

HIVE DF=spark.write.saveAsTable(name, format=None, mode=None, partitionBy=None, **options) Параметры: name – имя таблицы format
будет создаваться файл (parquet/csv…)
mode – определяет поведение, если файлы с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
partitionBy – названия полей, по которым будет партицирование
…другие опции, которые относятся к форматам файлов.

Слайд 39

JDBC

DF=spark.write.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, properties={ ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ })

Параметры:
url – JDBC

JDBC DF=spark.write.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, properties={ ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ }) Параметры:
URL
table – название таблицы
properties – дополнительные аргументы при подключении к jdbc источнику. Например { ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ }

Например:

Слайд 40

AVRO

Работа с avro форматом не включена в поставку spark, поэтому поддержку необходимо

AVRO Работа с avro форматом не включена в поставку spark, поэтому поддержку
дополнительно подключать.
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.1 ...
После этого этот формат будет доступен в методе:
DF=spark.write.save('hdfs://cluster/user/hdfs/test/exampleavro‘, format=‘avro’, mode=‘overwrite’, partitionBy=‘None’, **options)
Для чтения avro-файлов – параметр format=‘avro’

Слайд 41

AVRO, SAVE()

Параметры:
path – строка или список строк, указывающие, где лежат файлы.
format –

AVRO, SAVE() Параметры: path – строка или список строк, указывающие, где лежат
Строка указывающая на формат файла (avro/csv/json/parquet/orc…). Значение по умолчанию ‘parquet’.
mode – определяет поведение, если файлы с таким именем уже созданы, по умолчанию «error».
append: Добавляет содержимое DataFrame к существующим данным.
overwrite: Перезаписывает существующие файлы.
ignore: Отменяет сохранение.
error или errorifexists : Генерирует исключение.
partitionBy – названия столбцов, по которым создавать партиции.
options – опции, которые можно задать при вызове соответствующего формату метода (например, load(test.csv, format=‘csv’, sep=‘\t’))

Слайд 42

Сохранеие RDD (sequence)

Сохранеие RDD (sequence)

Слайд 43

SEQUENCE

Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать с

SEQUENCE Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать
ними присутствует у RDD, более низкоуровневой структуры данных нежели DataFrame.
rdd.saveAsSequenceFile(‘path')
path – путь до файла
Имя файла: Практика-SPARK.pptx
Количество просмотров: 47
Количество скачиваний: 0