Spark pt2
Category : Data Science Tag : gds bigdata
June 6, 2021, 3:08 p.m.

Short Description :
Spark part 2: basic syntax using PySpark on Databricks
Source : datak

<h2>About</h2><p>Part 1 explained the basics of Spark and its differences from and advantages over Hadoop. Part 2 covers basic syntax for using Spark from Python (PySpark) on the Databricks platform.</p><p><br></p><h3>Spark Platform and Databricks</h3><p>There are a couple of ways to set up Spark. The options below are listed from easiest to hardest:</p><ol><li>Cloud : Databricks (free tier available)</li><li>Cloud : AWS EMR (Elastic MapReduce) (no free option)</li><li>Cloud : AWS EC2 (may be covered by the free tier for the first year)</li><li>Local : VM (free)</li></ol><p><a href="https://databricks.com/" target="_blank">Databricks</a>&nbsp;was spun out of UC Berkeley by the same team that created Apache Spark, and it makes Spark easier to use. That ease of use is built on its Unified Analytics Platform.</p><p><br></p><h3>Syntax</h3><p>Import the required libraries and create a Spark session</p><pre># python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('&lt;your app name&gt;').getOrCreate()</pre><p>Data load : upload a local csv/json file to Databricks and read it into a DataFrame</p><p>See the screenshot below for uploading data. An additional reference is <a href="https://docs.databricks.com/data/data.html" target="_blank">here</a>.</p><p><img src="/media/django-summernote/2021-06-06/b10950fc-1f56-46bf-8514-eddddb3eb83c.png" style="width: 775px;"></p><pre># python
# load json data (the schema is inferred automatically for json)
df = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/&lt;your file name&gt;.json")

# load csv data
df = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/&lt;your file name&gt;.csv", inferSchema=True, header=True)</pre><p>For loading complex, nested json into a DataFrame, see this&nbsp;<a href="https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html" target="_blank">resource</a>&nbsp;from Databricks.</p><p><br></p><p>Show the data in a Spark DataFrame</p><pre># python
df.show()</pre><p>Head/Tail</p><pre># python
# first n rows as a list of Row objects
df.head(n)
# first Row of the n rows (a Row, not a list)
df.head(n)[0]
# value of the first column in the first Row (a scalar value)
df.head(n)[0][0]
# last n rows
df.tail(n)</pre><p>DataFrame stats</p><pre># python
df.describe().show()</pre><p>Column names</p><pre># python
# list of column names
df.columns</pre><p>Column schema types (check all columns)</p><pre># python
df.printSchema()</pre><p>Change the schema</p><pre># python
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# assume df includes 2 columns named 'age' (long) and 'name' (string)
data_schema = [StructField('age', IntegerType(), True),
               StructField('name', StringType(), True)]
final_struc = StructType(fields=data_schema)

# convert the age data type from the default long to integer
df = spark.read.json("dbfs:/FileStore/shared_uploads/tak@datak.biz/&lt;your file name&gt;.json", schema=final_struc)</pre><p>Check the type of a column object</p><pre># python
type(df['age'])</pre><p>Select columns</p><pre># python
# select a single column
df.select('col1')
# select multiple columns
df.select(['col1', 'col2'])</pre><p>Filter</p><pre># python
# single condition
df.filter(df['Close'] &lt; 500).show()
# multiple conditions : and
df.filter((df['Close'] &lt; 500) &amp; (df['Open'] &gt; 200)).show()
# multiple conditions : and not
df.filter((df['Close'] &lt; 200) &amp; ~(df['Open'] &gt; 200)).show()</pre>
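<p>As a side note, `filter()` also accepts a SQL-style expression string instead of column objects. A minimal sketch, assuming the same 'Close' and 'Open' columns as above:</p><pre># python
# the same filters written as SQL expression strings
df.filter("Close &lt; 500").show()
df.filter("Close &lt; 500 AND Open &gt; 200").show()</pre>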
<p>Collect</p><p>`.show()` only prints the DataFrame; it does not return a list or a value. Use `collect()` when you want to store the result (a list of Row objects) for the next step of processing</p><pre># python
new_df = df.&lt;your method chain&gt;.collect()</pre><p>asDict</p><p>Get the data of a Row as a value</p><pre># python
new_df = df.filter(df['Close'] &lt; 500).collect()
new_df_row = new_df[0]
new_df_row.asDict()['Close']</pre><p>Group by</p><pre># python
# sum
df.groupBy('&lt;column name&gt;').sum('&lt;numeric col&gt;').show()
# sum can be replaced with max, min, mean, count, etc.</pre><p>Aggregate</p><pre># python
# sum
df.agg({'&lt;numeric col&gt;': 'sum'}).show()
# sum can be replaced with max, min, mean, count, etc.</pre><p>Count distinct</p><pre># python
from pyspark.sql.functions import countDistinct

df.select(countDistinct('&lt;numeric col&gt;')).show()</pre><p>Missing Data : na drop</p><pre># python
# drop a row if it contains any null
df.na.drop().show()
# drop a row that has fewer than 2 non-null values
df.na.drop(thresh=2).show()
# drop a row if the user-specified column is null
df.na.drop(subset=['&lt;specific col name&gt;']).show()</pre><p>Missing Data : na fill</p><pre># python
# fill nulls in string columns with 'FILL VALUE'
df.na.fill('FILL VALUE').show()
# fill nulls in numeric columns with 0
df.na.fill(0).show()
# fill nulls in a user-specified column with a specific value
df.na.fill(&lt;whatever value&gt;, subset=['&lt;specific col name&gt;']).show()</pre><p>Dates and Timestamps</p><pre># python
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year,
                                   weekofyear, format_number, date_format)

# only show dayofmonth
df.select(dayofmonth(df['&lt;date column&gt;'])).show()
# dayofmonth() can be replaced with the other functions imported above</pre><p>Add a new column</p><pre># python
new_df = df.withColumn('&lt;new col name&gt;', &lt;expression for new col&gt;)
new_df.show()</pre><p>Change a column name</p><pre># python
new_df = df.withColumnRenamed('&lt;previous col name&gt;', '&lt;new col name&gt;')</pre><p><br></p><p>Reference notebook at Databricks :&nbsp;<a href="https://community.cloud.databricks.com/?o=1968775996956896#notebook/1104620756046730/command/2662797607545198" target="_blank">datak</a></p>
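<p>To tie the pieces together, here is a minimal end-to-end sketch that chains the operations covered above. It assumes a CSV with hypothetical 'Date' and 'Close' columns; the file path is a placeholder:</p><pre># python
# load, clean, filter, derive a column, then aggregate
from pyspark.sql.functions import year

df = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/&lt;your file name&gt;.csv",
                                   inferSchema=True, header=True)
clean = df.na.drop(subset=['Close'])                             # drop rows where Close is null
filtered = clean.filter(clean['Close'] &lt; 500)                    # keep rows with Close below 500
with_year = filtered.withColumn('Year', year(filtered['Date']))  # derive a Year column
with_year.groupBy('Year').mean('Close').show()                   # average Close per year</pre>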

