You can create a DataFrame from a list of dictionaries using the createDataFrame() method.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
data = [{"name": "Alice", "age": 29}, {"name": "Bob", "age": 30}]
df = spark.createDataFrame(data)
df.show()
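If you prefer not to rely on schema inference, you can also pass an explicit schema; a minimal sketch, assuming the same name/age columns:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# define the schema explicitly instead of letting Spark infer it
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
df = spark.createDataFrame([("Alice", 29), ("Bob", 30)], schema)
df.show()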
Use the withColumn() method together with the when() function from pyspark.sql.functions.
from pyspark.sql.functions import when
df = df.withColumn("age_category", when(df.age > 30, "Senior").otherwise("Junior"))
df.show()
Use the dropDuplicates() method.
df.dropDuplicates().show()
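dropDuplicates() also accepts a list of columns if you only want to deduplicate on a subset, for example:
# keep one row per distinct name
df.dropDuplicates(["name"]).show()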
Use the join() method to perform joins between DataFrames.
df1 = spark.createDataFrame([("Alice", 29), ("Bob", 30)], ["name", "age"])
df2 = spark.createDataFrame([("Alice", "NY"), ("Bob", "LA")], ["name", "city"])
joined_df = df1.join(df2, on="name", how="inner")
joined_df.show()
Use the filter() method.
df.filter(df.age > 30).show()
Use the agg() method with the avg() function from pyspark.sql.functions.
from pyspark.sql.functions import avg
df.agg(avg("age")).show()
Use the toPandas() method.
pandas_df = df.toPandas()
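Keep in mind that toPandas() pulls the entire DataFrame onto the driver; for large data it is safer to reduce it first, for example:
# convert only a bounded sample to pandas
pandas_df = df.limit(1000).toPandas()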
Use the cache() method.
df.cache()
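cache() is lazy, so the data is only materialized in memory once an action runs, for example:
df.count()  # triggers the actual caching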
Use the repartition() method.
df = df.repartition(5)
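You can check the partition count with rdd.getNumPartitions(); to reduce partitions without a full shuffle, coalesce() is the usual alternative:
df.rdd.getNumPartitions()  # number of partitions after repartitioning
df = df.coalesce(2)  # merge down to 2 partitions without a full shuffle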
Use the groupBy() method followed by aggregation functions.
df.groupBy("age").count().show()
Use the orderBy() method.
df.orderBy(df.age.desc()).show()
Use the drop() method.
df = df.drop("age")
Use the read.csv() method.
df = spark.read.csv("file.csv", header=True, inferSchema=True)
Use the write.parquet() method.
df.write.parquet("output.parquet")
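By default the write fails if the output path already exists; you can pass a save mode such as overwrite:
df.write.mode("overwrite").parquet("output.parquet")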
Use Window from pyspark.sql.window for windowed operations.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("age").orderBy("name")
df = df.withColumn("row_num", row_number().over(windowSpec))
df.show()
Use the dropna() or fillna() methods.
df.dropna().show()
df.fillna({"age": 0}).show()
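Both methods accept a subset of columns, for example dropping rows only when age is null:
df.dropna(subset=["age"]).show()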
Use the union() method; both DataFrames must have the same number and order of columns.
df3 = spark.createDataFrame([("Carol", 35)], ["name", "age"])
df1.union(df3).show()
Use the pivot() method.
df.groupBy("name").pivot("age").count().show()
First, register the DataFrame as a temporary view, then use the sql() method.
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 30").show()
Use SparkContext.broadcast() to broadcast variables to executors.
broadcastVar = spark.sparkContext.broadcast([1, 2, 3])
broadcastVar.value
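The broadcast value is read-only and can be referenced inside tasks, for example in an RDD transformation:
# look up each index in the broadcast list on the executors
spark.sparkContext.parallelize([0, 1, 2]).map(lambda i: broadcastVar.value[i]).collect()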
Use the createDataFrame() method to convert an RDD into a DataFrame.
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
df = spark.createDataFrame(rdd, ["id", "name"])
df.show()
Use the corr() method on df.stat; this example assumes the DataFrame has two numeric columns, age and salary.
df.stat.corr("age", "salary")
Use the cast() method.
df = df.withColumn("age", df["age"].cast("integer"))
Use the split() function.
from pyspark.sql.functions import split
df = df.withColumn("name_split", split(df["name"], " "))
df.show()
Use pyspark.sql.functions.udf to define UDFs.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def square(x):
    return x * x
square_udf = udf(square, IntegerType())
df = df.withColumn("age_squared", square_udf(df["age"]))
df.show()
Use a window specification with rowsBetween().
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum
windowSpec = Window.orderBy("age").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("running_total", spark_sum("age").over(windowSpec))
df.show()
Use the collect() method.
rows = df.collect()
Use the broadcast() function from pyspark.sql.functions to mark a small DataFrame for a broadcast join.
from pyspark.sql.functions import broadcast
broadcasted_df = broadcast(df)
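broadcast() is typically applied to the smaller DataFrame as a join hint; a sketch reusing df1 and df2 from the join example above:
# hint Spark to broadcast df2 to every executor instead of shuffling it
df1.join(broadcast(df2), on="name").show()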
Use the limit() method.
df.orderBy(df.age.desc()).limit(5).show()
Use the join() method with how="left_anti".
df1.join(df2, on="name", how="left_anti").show()