DataFrames with PySpark

Read this in about 2 minutes.

Hi!

A DataFrame is like a table in SQL: it can be loaded from a file and operated on in memory. This is the bread and butter of data preprocessing, which can be very tedious work.

When the data runs to millions of rows, processing on a single machine is no longer fast enough, and we are racing against time.

Here we will use PySpark, Spark's Python API.

0. Import the package

from pyspark.sql import SparkSession

1. Create a Spark session

app_name = 'preprocess'   # any descriptive name
driver_memory = '4g'      # size these two to your machine
executor_memory = '4g'

app = SparkSession.builder.appName(app_name)
app = app.config('spark.driver.memory', driver_memory)
app = app.config('spark.executor.memory', executor_memory)
app = app.config('spark.executor.cores', '5')
app = app.config('spark.cores.max', '5')
app = app.config('spark.driver.maxResultSize', '0')  # 0 means no limit
spark = app.getOrCreate()

2. Load from a (CSV) file

header : 'true' or 'false' (default is 'false')

delimiter : ',' if not set

format : the file format

df = spark.read.load("file://" + self.file_root, format="csv",
                     header='false')

3. Rename columns

Transformations return a new DataFrame, so assign the result back:

df = df.withColumnRenamed('old_column_name', 'new_column_name')

4. Replace within a column (replace "'" with " ")

from pyspark.sql.functions import regexp_replace

df = df.withColumn('column_name', regexp_replace('column_name', "'", " "))
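For intuition, here is the same substitution in plain Python with `re.sub` (no Spark required); the sample values are made up:

```python
import re

# Mirror of regexp_replace('column_name', "'", " ") on single values:
# every apostrophe in the string becomes a space.
values = ["it's", "o'clock", "plain"]
cleaned = [re.sub(r"'", " ", v) for v in values]
print(cleaned)  # ['it s', 'o clock', 'plain']
```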

5. Drop null values

df = df.na.drop(subset=["column_name"])

6. Remove duplicate rows

df = df.dropDuplicates()
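To make the semantics concrete, here is a plain-Python sketch of deduplication that keeps the first occurrence of each distinct row (Spark itself gives no ordering guarantee); the sample rows are invented:

```python
# Toy rows standing in for DataFrame rows.
rows = [(1, 'a'), (2, 'b'), (1, 'a')]

seen = set()
# Keep a row only the first time its value is encountered.
deduped = [r for r in rows if not (r in seen or seen.add(r))]
print(deduped)  # [(1, 'a'), (2, 'b')]
```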

7. Filter rows

df = df.filter(df['column_name'] == 'value')

8. Group and aggregate

group = df.groupBy(['id', 'pg_id']).agg({'*': 'count', 'ck_count': 'sum', 'position': 'avg'})
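The dictionary passed to agg() asks for count(*), sum(ck_count) and avg(position) per (id, pg_id) group. Here is a plain-Python sketch of the same three aggregates, on a few made-up rows:

```python
from collections import defaultdict

# Toy rows mimicking the DataFrame: (id, pg_id, ck_count, position).
rows = [
    (1, 'a', 2, 10.0),
    (1, 'a', 3, 20.0),
    (2, 'b', 1, 5.0),
]

# Bucket rows by the (id, pg_id) grouping key.
groups = defaultdict(list)
for id_, pg_id, ck_count, position in rows:
    groups[(id_, pg_id)].append((ck_count, position))

# Per group: count(*), sum(ck_count), avg(position) -- the same three
# aggregates the agg() dictionary asks Spark for.
result = {
    key: (len(vals),
          sum(c for c, _ in vals),
          sum(p for _, p in vals) / len(vals))
    for key, vals in groups.items()
}
print(result)  # {(1, 'a'): (2, 5, 15.0), (2, 'b'): (1, 1, 5.0)}
```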

9. Map and reduce…
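As a reminder of what a map-then-reduce pipeline computes, here is the pattern in plain Python with functools.reduce (Spark's rdd.map / rdd.reduce follow the same shape, just distributed); the numbers are arbitrary:

```python
from functools import reduce

nums = [1, 2, 3, 4]

# map: apply a function to every element independently.
squared = list(map(lambda x: x * x, nums))   # [1, 4, 9, 16]

# reduce: fold the mapped values pairwise into a single result.
total = reduce(lambda a, b: a + b, squared)  # 30
print(total)
```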

10. Errors and solutions

ERROR : java.io.IOException: No space left on device
Cause : the temp-file directory is too small when the shuffle writes are heavy.
Solution : use the Linux command `df -h` to check which partition mounts have space. I initially set app.config('spark.local.dir', './temp/spark'),

but spark.local.dir did not take effect, so set the environment variable instead: `export SPARK_LOCAL_DIRS=/home/ch/temp/spark`

ERROR : java.lang.OutOfMemoryError: Java heap space
Cause : memory is used up.
Solution : raise the memory sizes in the session config, e.g. app.config('spark.driver.memory', '40g') and app.config('spark.executor.memory', '40g')

ERROR : java.lang.OutOfMemoryError: GC overhead limit exceeded
Cause : the JVM spends too much effort on garbage collection.
Solution : import gc and delete big variables by hand.
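A minimal sketch of that by-hand cleanup (the variable name is illustrative):

```python
import gc

# Hypothetical large intermediate we no longer need.
big_list = list(range(1_000_000))

del big_list              # drop the only reference to it...
collected = gc.collect()  # ...and run a collection pass right away
print("unreachable objects collected:", collected)
```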

Goodbye! :wink:

