<h2>About</h2><p>Part 1 explains the basics of Spark and its differences and advantages compared with Hadoop. Part 2 covers basic Spark syntax in Python on the Databricks platform.</p><p><br></p>
<h3>Spark Platform and Databricks</h3><p>There are a couple of ways to set up Spark. The list below shows the options, roughly from easiest to hardest:</p><ol><li>Cloud : Databricks (Free Tier)</li><li>Cloud : AWS EMR (Elastic MapReduce) (No free option)</li><li>Cloud : AWS EC2 (Might be covered by the free tier for the 1st year)</li><li>Local : VM (Free)</li></ol><p><a href="https://databricks.com/" target="_blank">Databricks</a> was spun out of UC Berkeley by the same team that created Apache Spark, and it makes Spark easier to use. Under the hood it is built on the Unified Analytics Platform.</p><p><br></p>
<h3>Syntax</h3><p>Import the required library and create a Spark session</p><pre># python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('<your app name>').getOrCreate()</pre>
<p>Data load : upload local csv/json data to Databricks and read it into a DataFrame</p><p>See the screenshot below for uploading data. An additional reference is <a href="https://docs.databricks.com/data/data.html" target="_blank">here</a>.</p><p><img src="/media/django-summernote/2021-06-06/b10950fc-1f56-46bf-8514-eddddb3eb83c.png" style="width: 775px;"></p><pre># python
#load json data
df = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/<your file name>.json", inferSchema=True, header=True)

#load csv data
df = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/<your file name>.csv", inferSchema=True, header=True)</pre>
<p>To load complex, nested JSON into a DataFrame, see this <a href="https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html" target="_blank">resource</a> from Databricks.</p>
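<p>As a minimal sketch of what such a load can look like (the column names 'name', 'address', and 'orders' below are hypothetical, not from the dataset above): a field of a nested struct can be pulled out with dot notation, and an array column can be expanded into rows with explode.</p><pre># python
#sketch only : 'name', 'address' (struct) and 'orders' (array) are hypothetical column names
from pyspark.sql.functions import col, explode

nested_df = spark.read.json("dbfs:/FileStore/shared_uploads/tak@datak.biz/<your file name>.json")

flat_df = nested_df.select(
    col("name"),
    col("address.city").alias("city"),      # pull a field out of a nested struct
    explode(col("orders")).alias("order")   # one output row per array element
)
flat_df.show()</pre>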
<p><br></p><p>Show data in a Spark DataFrame</p><pre># python
df.show()</pre>
<p>Head/Tail</p><pre># python
#first n rows as a list of Row objects
df.head(n)

#first Row of that list (a Row object, not a list)
df.head(n)[0]

#value of the first column in the first row (a single value)
df.head(n)[0][0]

#last n rows
df.tail(n)</pre>
<p>DataFrame stats</p><pre># python
df.describe().show()</pre>
<p>Column names</p><pre># python
#list of columns
df.columns</pre>
<p>Column schema types (check all columns)<br></p><pre># python
df.printSchema()</pre>
<p>Change schema<br></p><pre># python
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

#assume df includes 2 columns named 'age' (long) and 'name' (string)
data_schema = [StructField('age', IntegerType(), True), StructField('name', StringType(), True)]
final_struc = StructType(fields=data_schema)

#convert the age data type from the default long to integer
df = spark.read.json("dbfs:/FileStore/shared_uploads/tak@datak.biz/<your file name>.json", schema=final_struc)</pre>
<p>Check a column's type</p><pre># python
type(df['age'])</pre>
<p>Select columns<br></p><pre># python
#select a single column
df.select('col1')

#select multiple columns
df.select(['col1','col2'])</pre>
<p>Filter</p><pre># python
#single condition
df.filter(df['Close'] < 500).show()

#multiple conditions : and
df.filter((df['Close'] < 500) & (df['Open'] > 200)).show()

#multiple conditions : and not
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show()</pre>
<p>Collect</p><p>`.show()` only displays the result; it does not return a list or a value. Use `.collect()` when you want to store the result (a list of Row objects) for the next step of processing.</p><pre># python
new_df = df.<your method chain>.collect()</pre>
<p>asDict</p><p>Get the data as a value</p><pre># python
new_df = df.filter(df['Close'] < 500).collect()
new_df_row = new_df[0]
new_df_row.asDict()['Close']</pre>
<p>Group by</p><pre># python
#sum
df.groupBy('<column name>').sum('<numeric col>').show()
#sum can be replaced with max, min, mean, count, etc.</pre>
<p>Aggregate</p><pre># python
#sum
df.agg({'<numeric col>':'sum'}).show()
#sum can be replaced with max, min, mean, count, etc.</pre>
<p>Count distinct</p><pre># python
from pyspark.sql.functions import countDistinct

df.select(countDistinct('<numeric col>')).show()</pre>
<p>Missing Data : na drop</p><pre># python
#drop a row if it contains any null
df.na.drop().show()

#keep only rows with at least 2 non-null values, drop the rest
df.na.drop(thresh=2).show()

#drop rows with a null in a user-specified column
df.na.drop(subset=['<specific col name>']).show()</pre>
<p>Missing Data : na fill</p><pre># python
#fill nulls in string columns with 'FILL VALUE'
df.na.fill('FILL VALUE').show()

#fill nulls in numeric columns with 0
df.na.fill(0).show()

#fill with a specific value in a user-specified column
df.na.fill(<whatever value>, subset=['<specific col name>']).show()</pre>
<p>Dates and Timestamps</p><pre># python
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format)

#only show dayofmonth
df.select(dayofmonth(df['<date column>'])).show()
#dayofmonth() can be replaced with the other functions imported above</pre>
<p>Add a new column</p><pre># python
new_df = df.withColumn('<new col name>', <function for new col>)
new_df.show()</pre>
<p>Change a column name</p><pre># python
new_df = df.withColumnRenamed('<previous col name>', '<new col name>')</pre>
<p><br></p><p>Reference notebook at Databricks : <a href="https://community.cloud.databricks.com/?o=1968775996956896#notebook/1104620756046730/command/2662797607545198" target="_blank">datak</a></p>
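<p><br></p><p>Putting it together : a minimal end-to-end sketch, assuming a CSV with 'Date', 'Open', and 'Close' columns (hypothetical names used only for illustration), that chains several of the operations above.</p><pre># python
#sketch only : the file path and the 'Date'/'Close' column names are assumptions
from pyspark.sql import SparkSession
from pyspark.sql.functions import year

spark = SparkSession.builder.appName('stock_summary').getOrCreate()

df = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/tak@datak.biz/<your file name>.csv", inferSchema=True, header=True)

#add a 'Year' column, keep rows with Close < 500, then average Close per year
result = (df.withColumn('Year', year(df['Date']))
            .filter(df['Close'] < 500)
            .groupBy('Year')
            .mean('Close'))

result.show()</pre>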