I am working with data from very long, nested JSON files. The problem is that the structure of these files is not always the same: some of them are missing columns that others have. I want to create a custom schema from an empty JSON file that contains all columns. If I later read JSON files with this pre-defined schema, the non-existing columns will be filled with null values (that's the plan, at least). What I have done so far:

  • loading a test JSON (that does not contain all columns that can be expected) into a dataframe
  • writing its schema into a JSON file
  • opening this JSON file in a text editor and adding the missing columns manually
  • The next thing I want to do is create a new schema by reading the JSON file into my code, but I am struggling with the syntax. Can I read the schema directly from the file itself? I have tried

    schemaFromJson = StructType.fromJson(json.loads('filepath/spark-schema.json'))
    

    but it gives me TypeError: __init__() missing 2 required positional arguments: 'doc' and 'pos'

    Any idea what's wrong with my current code? Thanks a lot.

    Edit: I came across this link: sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield . Chapter 7 pretty much describes the problem I am having. I just don't understand how I can pass the JSON file I manually enhanced to schemaFromJson = StructType.fromJson(json.loads(schema.json)).

    When I do:

    jsonDF = spark.read.json(filesToLoad)
    schema = jsonDF.schema.json()
    schemaNew = StructType.fromJson(json.loads(schema))
    jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)
    

    The code runs through, but it's obviously not useful because jsonDF and jsonDF2 have the same content and schema. What I want to achieve is to add some columns to 'schema' that will then be reflected in 'schemaNew'.

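    I think I got it. schemapath points to the already enhanced schema:

    import json
    from pyspark.sql.types import StructType

    schemapath = '/path/spark-schema.json'
    # json.load (not json.loads) parses the contents of an open file
    with open(schemapath) as f:
        d = json.load(f)

    # Build the schema from the parsed dict and apply it when reading the data
    schemaNew = StructType.fromJson(d)
    jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)
    jsonDF2.printSchema()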
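
The difference from the attempt in the question is json.load versus json.loads: the first reads from an open file, the second parses a string that already holds JSON text, so handing it a file path cannot work. The sketch below uses only the standard library to show this; the temporary file is a hypothetical stand-in for spark-schema.json, and the exact exception you see may differ by environment.

```python
import json
import os
import tempfile

# A tiny schema document in the layout that DataFrame.schema.json() emits.
schema_text = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "fruit", "type": "string", "nullable": True, "metadata": {}},
    ],
})

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical file standing in for spark-schema.json.
    schema_path = os.path.join(tmp, "spark-schema.json")
    with open(schema_path, "w") as f:
        f.write(schema_text)

    # json.load takes an open file object and parses its contents.
    with open(schema_path) as f:
        loaded_from_file = json.load(f)

    # json.loads takes the JSON text itself, so a path string fails to parse.
    try:
        json.loads(schema_path)
        path_parsed = True
    except json.JSONDecodeError:
        path_parsed = False

    # Reading the text first and then calling json.loads is equivalent.
    with open(schema_path) as f:
        loaded_from_text = json.loads(f.read())
```

Either loaded_from_file or loaded_from_text is the kind of dict that StructType.fromJson expects.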
    Hello @Moritz, I am trying to implement this solution, but I get KeyError: 'fields' at schemaNew = StructType.fromJson(d). You say schemapath contains the already enhanced schema; what exactly are you passing here? Could you please help me out?
    – SDS
    Jun 16 at 14:10
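
One likely cause of KeyError: 'fields' is that the file holds a data record rather than a schema document. StructType.fromJson reads the top-level "fields" list, which is present in the output of DataFrame.schema.json() but not in an ordinary JSON data file. The helper below is a hypothetical pre-flight check written for this illustration (it is not part of PySpark) and only inspects the top level.

```python
def check_struct_schema(d):
    """Return a list of top-level problems found in a candidate schema dict."""
    if not isinstance(d, dict):
        return ["top level is not a JSON object"]
    if "fields" not in d:
        return ["top-level 'fields' list is missing"]
    problems = []
    for i, field in enumerate(d["fields"]):
        # Every field entry needs at least a name and a type.
        for key in ("name", "type"):
            if key not in field:
                problems.append(f"fields[{i}] is missing '{key}'")
    return problems


# Shape produced by DataFrame.schema.json(): passes the check.
schema_like = {
    "type": "struct",
    "fields": [
        {"name": "fruit", "type": "string", "nullable": True, "metadata": {}},
    ],
}

# Shape of a plain data record: no "fields" key at the top level.
data_like = {"fruit": "Apple", "size": "Large"}

schema_problems = check_struct_schema(schema_like)
data_problems = check_struct_schema(data_like)
```

If the check reports a missing 'fields' list, the file passed as schemapath is probably the data file and not the schema written out from jsonDF.schema.json().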
    

    Why don't you define an empty DF with all columns that the JSON files can have? Then you load the JSONs into it. Here is an idea:

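    For Spark 3.1.0 and later (unionByName accepts allowMissingColumns from that release on):

    import json
    from pyspark.sql.types import StructType, StructField, StringType

    # Empty DataFrame that declares every column the JSON files may contain
    schema = StructType([
        StructField("fruit", StringType(), True),
        StructField("size", StringType(), True),
        StructField("color", StringType(), True),
    ])
    df = spark.createDataFrame([], schema)

    # First record has no "color"; allowMissingColumns fills it with null
    json_file_1 = {"fruit": "Apple", "size": "Large"}
    json_df_1 = spark.read.json(sc.parallelize([json.dumps(json_file_1)]))
    df = df.unionByName(json_df_1, allowMissingColumns=True)

    # Second record carries all three columns
    json_file_2 = {"fruit": "Banana", "size": "Small", "color": "Yellow"}
    json_df_2 = spark.read.json(sc.parallelize([json.dumps(json_file_2)]))
    df = df.unionByName(json_df_2, allowMissingColumns=True)

    display(df)  # Databricks notebook helper; use df.show() elsewhere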
    Well, I tried that, but it was a nightmare because I would have to define 300+ columns, most of them nested. I came across this link: sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield . Chapter 7 pretty much describes the problem I am having. I just don't understand how I can pass the JSON file I manually enhanced to schemaFromJson = StructType.fromJson(json.loads(schema.json))
    – Moritz
    Oct 26, 2021 at 9:39
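
With 300+ mostly nested columns, one alternative to editing the schema file by hand is to merge the schema dicts of several sample files into one superset. The function below is a rough sketch, not an established recipe: merge_struct and is_struct are hypothetical names, the inputs are assumed to be dicts in the layout of DataFrame.schema.jsonValue(), only nested structs are merged recursively (arrays and maps are left as they are), and type conflicts are resolved by keeping the first schema's type.

```python
import copy


def is_struct(t):
    """True if t is the dict form of a struct type."""
    return isinstance(t, dict) and t.get("type") == "struct"


def merge_struct(base, extra):
    """Return a new struct schema dict with the union of fields from both inputs."""
    merged = copy.deepcopy(base)
    by_name = {f["name"]: f for f in merged["fields"]}
    for field in extra["fields"]:
        existing = by_name.get(field["name"])
        if existing is None:
            # Field only in `extra`: append a copy so the inputs stay untouched.
            new_field = copy.deepcopy(field)
            merged["fields"].append(new_field)
            by_name[new_field["name"]] = new_field
        elif is_struct(existing["type"]) and is_struct(field["type"]):
            # Both sides are structs: merge their nested fields recursively.
            existing["type"] = merge_struct(existing["type"], field["type"])
        # Otherwise keep the type from `base`; conflicts are not resolved here.
    return merged


def string_field(name):
    """Build a nullable string field entry (helper for the example only)."""
    return {"name": name, "type": "string", "nullable": True, "metadata": {}}


schema_a = {"type": "struct", "fields": [
    string_field("fruit"),
    {"name": "origin", "nullable": True, "metadata": {},
     "type": {"type": "struct", "fields": [string_field("country")]}},
]}
schema_b = {"type": "struct", "fields": [
    string_field("fruit"),
    string_field("color"),
    {"name": "origin", "nullable": True, "metadata": {},
     "type": {"type": "struct", "fields": [string_field("region")]}},
]}

merged = merge_struct(schema_a, schema_b)
top_level_names = [f["name"] for f in merged["fields"]]
origin_names = [f["name"] for f in merged["fields"][1]["type"]["fields"]]
```

The merged dict can be written out with json.dump and later passed to StructType.fromJson in the same way as a hand-edited schema file.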
    

    You can check out this tool (https://github.com/PreetRanjan/pyspark-schema-generator) for generating a PySpark schema from JSON input. It generates a PySpark schema that you can use in your script, and you can add or remove columns as required. It has a few bugs but worked fine for me.
