DataFrames with PySpark
Hi!
A DataFrame is like a table in SQL: it can be loaded from a file and operated on in memory. This is the heart of data preprocessing, which can be very tedious work.
When the data runs to millions of rows, processing on a single machine is not fast enough, and we are racing against time.
Here we will use PySpark, which is Spark in Python.
0. Import the package
from pyspark.sql import SparkSession
1. Create a SparkSession
app = (SparkSession.builder
       .appName(app_name)                                 # app_name: any string naming your job
       .config('spark.driver.memory', driver_memory)      # e.g. '40g'
       .config('spark.executor.memory', executor_memory)  # e.g. '40g'
       .config('spark.executor.cores', '5')               # cores per executor
       .config('spark.cores.max', '5')                    # total-core cap for the application
       .config('spark.driver.maxResultSize', '0'))        # 0 lifts the limit on collected results
spark = app.getOrCreate()
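One extra line worth knowing: when the job is finished, release the cluster resources.
spark.stop()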
2. Load from a (CSV) file
header: 'true' or 'false' (the default is 'false'; set 'true' when the first row holds the column names)
delimiter: ',' if not set
format: the file format
df = spark.read.load("file://" + file_root,  # file_root: the absolute path to the CSV file
                     format="csv", header='false')
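A quick sanity check right after loading never hurts; these are standard DataFrame calls:
df.printSchema()  # the inferred column names and types
df.show(5)        # preview the first 5 rows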
3. Rename columns
df = df.withColumnRenamed('old_column_name', 'new_column_name')  # returns a new DataFrame, so reassign it
4. Replace inside a column (replace "'" with " ")
from pyspark.sql.functions import regexp_replace
df = df.withColumn('column_name', regexp_replace('column_name', "'", " "))
5. Drop nulls
df = df.na.drop(subset=["column_name"])
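If you would rather keep the rows and fill the holes instead, DataFrame.na also offers fill; the fill value below is only an illustration:
df = df.na.fill('unknown', subset=['column_name'])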
6. Remove duplicate rows
df = df.dropDuplicates()
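dropDuplicates also takes an optional list of columns, when two rows should count as duplicates based on selected columns only:
df = df.dropDuplicates(['column_name'])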
7. Filter rows
df = df.filter(df['column_name'] == 'value')
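To combine several conditions, use the & / | / ~ operators with parentheses (Python's and/or do not work on columns); the ck_count column here is borrowed from step 8 as an assumption:
df = df.filter((df['column_name'] == 'value') & (df['ck_count'] > 0))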
8. Group and aggregate
group = df.groupBy(['id', 'pg_id']).agg({'*': 'count', 'ck_count': 'sum', 'position': 'avg'})
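The dict form above produces auto-generated column names such as count(1), sum(ck_count) and avg(position). If you prefer readable names, an equivalent form with the functions module and alias looks like this (the alias names are my own choice):
from pyspark.sql import functions as F
group = (df.groupBy('id', 'pg_id')
           .agg(F.count('*').alias('row_count'),
                F.sum('ck_count').alias('ck_count_sum'),
                F.avg('position').alias('position_avg')))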
9. Map and reduce…
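This step is left open above, so here is only a minimal sketch of what map and reduce look like through the DataFrame's underlying RDD, assuming the ck_count column from step 8 holds numbers:
# sum one column by hand with the RDD API
total = df.rdd.map(lambda row: row['ck_count']).reduce(lambda a, b: a + b)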
10. Errors and solutions

ERROR: java.io.IOException: No space left on device
Cause: the temp directory is not big enough when the shuffle load is heavy.
Solution: use the Linux command 'df -h' to check which mounted partitions still have space. Setting app.config('spark.local.dir', './temp/spark') did not take effect here, so set the environment variable instead: export SPARK_LOCAL_DIRS=/home/ch/temp/spark

ERROR: java.lang.OutOfMemoryError: Java heap space
Cause: the memory is used up.
Solution: configure larger memory sizes, like the following:
app.config('spark.driver.memory', '40g')
app.config('spark.executor.memory', '40g')

ERROR: java.lang.OutOfMemoryError: GC overhead limit exceeded
Cause: the JVM spends too much time on garbage collection.
Solution: import gc and del big variables by hand.
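A sketch of that last manual cleanup in the Python driver (big_df stands for any large intermediate DataFrame you no longer need):
import gc
del big_df    # drop the Python reference to the large intermediate result
gc.collect()  # trigger Python's garbage collector right away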
Goodbye!