Содержание
- 2. DataFrame
- 3. DATAFRAME DataFrame – распределенный набор данных сгруппированный по именованным столбцам. DataFrame – объект SPARK, позволяющий работать
- 4. СОЗДАНИЕ emp = [(1,"Smith",-1,"2018","10","M",3000), \ (2,"Rose",1,"2010","20","M",4000), \ (3,"Williams",1,"2010","10","M",1000), \ (4,"Jones",2,"2005","10","F",2000), \ (5,"Brown",2,"2010","40","",-1), \ (6,"Brown",2,"2010","50","",-1) \ ]
- 5. СОЗДАНИЕ df_customers=spark.read.csv('/tmp/spark_practics/data/customers', header=True, sep=',') df_billings=spark.read.csv('/tmp/spark_practics/data/billings', header=True, sep=',') df_customers.printSchema() df_customers.show(3) df_billings.printSchema() df_billings.show(3)
- 6. ОБРАЩЕНИЕ К СТОЛБЦАМ dataframe_name.column_name df_customers = df_customers.withColumnRenamed("customer_id", "id") df_billings.filter(df_billings.amount .groupBy(df_customers.country).agg({"amount": "avg", "balance": "max", "customer_id": "count"}).show(3) 2)
- 7. ОПЕРАЦИИ C DATAFRAME Выбор столбцов df_billings.select('customer_id', 'balance').show(3) 2) Переименование столбцов df_billings.withColumnRenamed('customer_id', 'id').show(3) 3) Добавление новых столбцов
- 8. SPARK SQL df_customers.createOrReplaceTempView('cust') df = spark.sql('select country, min(id), count(id) from cust group by country having count(id)
- 9. Создание DataFrame из данных разных форматов
- 10. CSV DF=spark.read.csv('hdfs://cluster/user/hdfs/test/example.csv‘, schema=None, sep = '\t', encoding=‘UTF-8’, escape=‘\’, comment=‘--’, header=‘True’, inferSchema=‘False’, ignoreLeadingWhiteSpace=‘True’, ignoreTrailingWhiteSpace=‘True’, nullValue=‘Null’, nanValue=‘Nan’, positiveInf=‘-oo’,
- 11. CSV Параметры: path – строка или список строк, указывающие, где лежат файлы. schema – опционально, схема
- 12. CSV Параметры: nullValue – устанавливает строку, которая обозначает значение null. По умолчанию пустая строка. nanValue –
- 13. CSV Параметры: mode – указывает реакцию на ошибки при парсинге строк. По умолчанию, ‘PERMISSIVE’. PERMISSIVE :
- 14. JSON DF=spark.read.json('//home/user/test/example.json‘, schema=None, prefersDecimal=‘True’, allowComments=‘False’, allowUnquotedFieldNames=‘True’, allowSingleQuotes=‘False’, allowNumericLeadingZero=‘True’, allowBackslashEscapingAnyCharacter=‘True’, mode=None, dateFormat=None, timestampFormat=None)
- 15. JSON Параметры: path – строка или список строк, указывающие, где лежат файлы. schema – опционально, схема
- 16. JSON Параметры: primitivesAsString – устанавливает загружать все примитивы, как строки. По умолчанию, false. prefersDecimal – устанавливает
- 17. PARQUET DF=spark.read.parquet('//home/user/test/exampleparquet’) Параметры: path – строка или список строк, указывающие, где лежат данные.
- 18. ORC DF=spark.read.orc('//home/user/test/exampleorc’) Параметры: path – строка, указывающая, где лежат данные.
- 19. TEXT DF=spark.read.text('//home/user/test/example.txt’) Параметры: path – строка или список строк, указывающие, где лежат данные.
- 20. HIVE DF=spark.read.table(tableName = ‘test.table’) или DF=spark.sql(sqlQuery = ‘select * from test.tavble’) Параметры: tableName – строка название
- 21. JDBC DF=spark.read.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, column=‘date_from’, lowerBound=None, upperBound=None, numPartitions=10, predicates=None, properties={ ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ })
- 22. JDBC Параметры: url – JDBC URL table – название таблицы column – наименование целочисленного поля, который
- 23. AVRO Работа с avro форматом не включена в поставку spark, поэтому поддержку необходимо дополнительно подключать. ./bin/spark-submit
- 24. AVRO, LOAD() Параметры: path – строка или список строк, указывающие, где лежат файлы. format – Строка
- 25. Создание RDD (sequence)
- 26. SEQUENCE Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать с ними присутствует у
- 27. Сохранение DataFrame в различные форматы данных
- 28. CSV DF=spark.write.csv('hdfs://cluster/user/hdfs/test/example.csv‘, mode=‘overwrite’, compression= snappy, sep=‘|’, escape=None, header=‘True’, nullValue=‘null’, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None,
- 29. CSV Параметры: path – по какому пути сохранять файл. mode – определяет поведение, если файлы с
- 30. CSV Параметры: escape – символ, используемый для экранирования кавычек внутри уже заключенного в кавычки значения. По
- 31. JSON DF=spark.write.json('hdfs://cluster/user/hdfs/test/examplejson‘, mode=‘overwrite’, compression= snappy, dateFormat=None, timestampFormat=None, lineSep=None, encoding=None)’)
- 32. JSON Параметры: path – по какому пути сохранять файл. mode – определяет поведение, если файлы с
- 33. PARQUET DF=spark.write.parquet('hdfs://cluster/user/hdfs/test/exampleparq‘, mode=‘overwrite’, partitionBy=‘None’, compression= snappy)’)
- 34. PARQUET Параметры: path – по какому пути сохранять файл. mode – определяет поведение, если файлы с
- 35. ORC DF=spark.write.orc('hdfs://cluster/user/hdfs/test/exampleorc‘, mode=‘overwrite’, partitionBy=‘None’, compression= snappy)’)
- 36. ORC Параметры: path – по какому пути сохранять файл. mode – определяет поведение, если файлы с
- 37. TEXT DF=spark.write.text('//home/user/test/example.txt’, compression=‘gzip’, lineSep=‘\n\r’) Параметры: path – по какому пути сохранять файл. mode – определяет поведение,
- 38. HIVE DF=spark.write.saveAsTable(name, format=None, mode=None, partitionBy=None, **options) Параметры: name – имя таблицы format – в каком формате
- 39. JDBC DF=spark.write.jdbc(‘jdbc:postgresql://localhost:5432/db_test’, ‘table_test’, properties={ ‘user’ : ‘SYSTEM’, ‘password’ : ‘mypassword’ }) Параметры: url – JDBC URL
- 40. AVRO Работа с avro форматом не включена в поставку spark, поэтому поддержку необходимо дополнительно подключать. ./bin/spark-submit
- 41. AVRO, SAVE() Параметры: path – строка или список строк, указывающие, где лежат файлы. format – Строка
- 42. Сохранеие RDD (sequence)
- 43. SEQUENCE Sequence файлы используются для хранения пар “ключ-значение”, в pyspark возможность работать с ними присутствует у
- 45. Скачать презентацию