[Solved] is it possible to prevent appending a file if the schema is not correct?

karpan Asks: is it possible to prevent appending a file if the schema is not correct?
The following example shows that Spark allows appending to a Parquet dataset even if the data being appended has an extra column (a different schema). Is there a way to prevent this from happening? In principle a Parquet file contains its schema, so is there an automatic way to achieve this?

Code:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import os
# imports for random dataset creation
import random
import string
spark = SparkSession.builder.appName('learn').master('yarn').enableHiveSupport().getOrCreate()
# first save
schema1 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
])
data1 = [[random.randint(0, 5), ''.join(random.choice(string.ascii_lowercase) for _ in range(10))] for _ in range(10)]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.write.format('parquet').mode(saveMode='overwrite').save('/tmp/df1_2')
# df1.show()
# +---------+-----------+
# |id_inside|name_inside|
# +---------+-----------+
# |        0| krohfjcwwo|
# |        0| cvkwmuddxf|
# |        5| rsxtjdfwjv|
os.system('hadoop fs -ls /tmp/df1_2')
# Found 3 items
# -rw-r--r--   3 sdap hdfs          0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r--   3 sdap hdfs        723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet

# append data using a different schema
schema2 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
    StructField('name_inside2', StringType(), nullable=False),
])
# note: the same random string is used for both name columns
data2 = [[random.randint(0, 5), *[''.join(random.choice(string.ascii_lowercase) for _ in range(10))]*2] for _ in range(10)]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.write.format('parquet').mode(saveMode='append').save('/tmp/df1_2')
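# note: Spark's Parquet writer does not read the footers of the files already
# at /tmp/df1_2, so this append succeeds even though the schemas differ; each
# part file simply carries its own schema in its footer.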
os.system('hadoop fs -ls /tmp/df1_2')
# Found 5 items
# -rw-r--r--   3 sdap hdfs          0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r--   3 sdap hdfs       1006 2021-07-30 17:58 /tmp/df1_2/part-00000-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs       1003 2021-07-30 17:58 /tmp/df1_2/part-00001-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet


# read the file back in and check the schema
res = spark.read.format('parquet').load('/tmp/df1_2')
res.sample(withReplacement=False, fraction=0.1).show()
# +---------+-----------+------------+
# |id_inside|name_inside|name_inside2|
# +---------+-----------+------------+
# |        5| gmafmuprti|  gmafmuprti|
# |        3| ttshihunbe|  ttshihunbe|
# |        2| dlrpqnzwrz|        null|
# +---------+-----------+------------+
res.printSchema()
# root
#  |-- id_inside: long (nullable = true)
#  |-- name_inside: string (nullable = true)
#  |-- name_inside2: string (nullable = true)
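# note: this three-column result is not guaranteed. Without the mergeSchema
# read option, Spark picks the schema from whichever part file it samples, so
# name_inside2 may or may not appear; spark.read.option('mergeSchema', 'true')
# makes the union of the schemas explicit.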

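Spark itself does not enforce the existing schema when appending to a plain Parquet path, so one workaround is to do the check yourself: read the schema of the data already at the path and refuse to write if it differs from the DataFrame's schema. Below is a minimal sketch of that idea, not a built-in API; the helper name append_if_schema_matches is made up, and it compares only column names and types because nullability does not survive the round trip (note the nullable = true lines in the printSchema() output above).

Code:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.utils import AnalysisException

def append_if_schema_matches(spark: SparkSession, df: DataFrame, path: str) -> None:
    # hypothetical helper, not a built-in Spark API
    try:
        existing = spark.read.format('parquet').load(path).schema
    except AnalysisException:
        # the path does not exist yet, so this first write defines the schema
        df.write.format('parquet').mode(saveMode='overwrite').save(path)
        return
    # compare names and types only: nullability does not survive the Parquet
    # round trip, as the printSchema() output above shows
    incoming = [(f.name, f.dataType) for f in df.schema.fields]
    stored = [(f.name, f.dataType) for f in existing.fields]
    if incoming != stored:
        raise ValueError(f'refusing to append: {df.schema.simpleString()} '
                         f'does not match existing {existing.simpleString()}')
    df.write.format('parquet').mode(saveMode='append').save(path)

# with the dataframes above:
# append_if_schema_matches(spark, df1, '/tmp/df1_2')   # ok, same schema
# append_if_schema_matches(spark, df2, '/tmp/df1_2')   # raises ValueError

Two caveats: the check-then-write is not atomic, so a concurrent writer can still add a mismatched file; and if the data lives in a metastore table rather than a bare path (the session above already calls enableHiveSupport()), writing with saveAsTable should validate against the existing table schema for you, which may be the automatic behaviour you are after.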