You can create a DataFrame from a list of dictionaries using the createDataFrame() method.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
data = [{"name": "Alice", "age": 29}, {"name": "Bob", "age": 30}]
df = spark.createDataFrame(data)
df.show()
Use the withColumn() method together with the when() function from pyspark.sql.functions.
from pyspark.sql.functions import when
df = df.withColumn("age_category", when(df.age > 30, "Senior").otherwise("Junior"))
df.show()
Use the dropDuplicates() method.
df.dropDuplicates().show()
Use the join() method to perform joins between DataFrames.
df1 = spark.createDataFrame([("Alice", 29), ("Bob", 30)], ["name", "age"])
df2 = spark.createDataFrame([("Alice", "NY"), ("Bob", "LA")], ["name", "city"])
joined_df = df1.join(df2, on="name", how="inner")
joined_df.show()
Use the filter() method.
df.filter(df.age > 30).show()
Use the agg() and avg() functions.
from pyspark.sql.functions import avg
df.agg(avg("age")).show()
Use the toPandas() method. Note that this collects the entire DataFrame onto the driver, so it is only suitable when the data fits in driver memory.
pandas_df = df.toPandas()
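On Spark 3.x, Arrow-based conversion can speed this up considerably; the config key below is the Spark 3 name and may differ on older versions (a hedged sketch):
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")  # enable Arrow for toPandas()
pandas_df = df.toPandas()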
Use the cache() method.
df.cache()
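Note that cache() is lazy: the data is only materialized in memory when an action runs, for example:
df.count()  # an action such as count() forces the cached data to be computed and stored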
Use the repartition() method.
df = df.repartition(5)
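The resulting partition count can be checked on the underlying RDD; coalesce() is the usual alternative when you only need to reduce the number of partitions without a full shuffle.
df.rdd.getNumPartitions()  # returns 5 after the repartition above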
Use the groupBy() method followed by aggregation functions.
df.groupBy("age").count().show()
Use the orderBy() method.
df.orderBy(df.age.desc()).show()
Use the drop() method.
df = df.drop("age")
Use the read.csv() method.
df = spark.read.csv("file.csv", header=True, inferSchema=True)
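If the schema is already known, passing an explicit StructType avoids the extra pass over the file that inferSchema requires. A minimal sketch (the column names here are assumptions):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df = spark.read.csv("file.csv", header=True, schema=schema)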
Use the write.parquet() method.
df.write.parquet("output.parquet")
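By default the write fails if the output path already exists; a save mode can be set on the writer, for example:
df.write.mode("overwrite").parquet("output.parquet")  # replace any existing output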
Use Window from pyspark.sql.window for windowed operations.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("age").orderBy("name")
df = df.withColumn("row_num", row_number().over(windowSpec))
df.show()
Use the dropna() or fillna() methods.
df.dropna().show()
df.fillna({"age": 0}).show()
Use the union() method. Note that union() matches columns by position, so both DataFrames must have the same number of columns in the same order.
df1.union(df2).show()
Use the pivot() method.
df.groupBy("name").pivot("age").count().show()
First register the DataFrame as a temporary view, then use the sql() method.
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 30").show()
Use SparkContext.broadcast() to broadcast variables to executors.
broadcastVar = spark.sparkContext.broadcast([1, 2, 3])
broadcastVar.value
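Executors read the broadcast value through .value instead of having it shipped with every task closure. A small illustrative sketch:
rdd = spark.sparkContext.parallelize([0, 1, 2])
rdd.map(lambda i: broadcastVar.value[i]).collect()  # each task looks up the broadcast list: [1, 2, 3]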
Use the createDataFrame() method, passing the RDD and the column names.
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])
df.show()
Use the corr() method.
df.stat.corr("age", "salary")
Use the cast() method.
df = df.withColumn("age", df["age"].cast("integer"))
Use the split() function.
from pyspark.sql.functions import split
df = df.withColumn("name_split", split(df["name"], " "))
df.show()
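split() returns an array column, so individual elements can be pulled out with getItem(), for example:
df = df.withColumn("first_name", df["name_split"].getItem(0))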
Use pyspark.sql.functions.udf to define UDFs.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def square(x):
    return x * x
square_udf = udf(square, IntegerType())
df = df.withColumn("age_squared", square_udf(df["age"]))
df.show()
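The same UDF can also be declared with the decorator form of udf(); note that built-in functions from pyspark.sql.functions are generally faster than Python UDFs, so prefer them when an equivalent exists. A sketch of the decorator form:
@udf(returnType=IntegerType())
def square(x):
    return x * x
df = df.withColumn("age_squared", square(df["age"]))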
Use a Window specification with rowsBetween().
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum
windowSpec = Window.orderBy("age").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("running_total", spark_sum("age").over(windowSpec))
df.show()
Use the collect() method.
rows = df.collect()
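collect() returns a list of Row objects on the driver, so fields can be read by name (assuming the DataFrame still has a name column):
for row in rows:
    print(row["name"])  # fields are also available as attributes, e.g. row.name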
Use broadcast() from pyspark.sql.functions.
from pyspark.sql.functions import broadcast
broadcasted_df = broadcast(df)
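broadcast() is normally used as a join hint so the small DataFrame is copied to every executor and the shuffle of the larger side is avoided. Reusing df1 and df2 from the join example above:
joined_df = df1.join(broadcast(df2), on="name", how="inner")
joined_df.show()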
Use the limit() method together with orderBy().
df.orderBy(df.age.desc()).limit(5).show()
Use the join() method with how="left_anti".
df1.join(df2, on="name", how="left_anti").show()