添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
非常酷的红茶  ·  Element: classList ...·  3 月前    · 
失落的木瓜  ·  CS224n--Lecture 1 ...·  2 年前    · 

指定不同格式来读取文件,例如以下为指定json格式读取数据:

df = spark.read.format('json').load('python/test_support/sql/people.json')

针对常用几个文件格式,pyspark也可以直接通过对应的文件格式读取,如:

df = spark.read.json('../datas/data.json')

具体内容见后文。

schema

DataFrameReader.schema(schema)

指定读取的数据的schema列信息,有些数据文件中没有结构信息,需要手动指定。有些文件类型如json可以省略这一步,文件中自带schema信息,不过也需要视具体情况而定。

spark.read.schema("col0 INT, col1 DOUBLE")
DataFrameReader.load(path=None, format=None, schema=None, **options)

通过load指定文件路径,也可以在括号内通过key-value的格式指定不同的格式、schema等。示例如下:

df = spark.read.load('../datas/data.json', format="json")

table

DataFrameReader.table(tableName)

从指定数据表中读取数据,并返回一个dataframe,使用如下:

df = spark.read.table('db.table')
df.show(5, False)

可以从指定的hive数据表中读取数据。

option

DataFrameReader.option(key, value)
DataFrameReader.options(**options)

将参数以key-value的形式指定,这部分参数需要依据读取的文件格式来指定不同的参数,可参考源码。

参考链接:spark.read.option

有以下这么三种方式读取json文件:

# json文件
df = spark.read.format('json').load('../datas/data.json')
df = spark.read.load('../datas/data.json', format="json")
df = spark.read.json('../datas/data.json')

这三种方法本质上都是一致的,都是读取单个json文件,可自行选择。

而如果我们要读取多个格式一致的json文件,可以采用以下方式:

df = spark.read.format('json').load(['../datas/data.json', '../datas/data2.json'])
df = spark.read.format('json').load('../output/data_json/')

当然啦,在实际工作中,我们很有可能碰到需要读取特定目录的文件,这时需要指定通配符来匹配对应的文件目录,例如:

spark.read.json("path/'{%s,%s}'%(day, dat_n)/")

还要很多其他格式,可以参考:读取文件通配符

有些情况下,我们并不需要将数据的所有列都读取出来,这时就可以通过指定schema来部分读取自己想要的列,同时也能指定数据的类型,如下所示:

df = spark.read.json('../datas/data.json')
df.printSchema()
df = spark.read.json('../datas/data.json', schema="name string, age int")
df.printSchema()
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

结果显示,当不指定schema信息时,默认读取的name是string类型,age是long类型。而我们指定了age类型指定为int后,spark中其对应类型为integer。官方称这种格式为DDL-formatted string,也可以用spark自己的 pyspark.sql.types.StructType来指定schema。

上述都是在本地进行测试,如果是在实际工作中,如果数据是存储在HDFS文件系统中,读取的方式与上述一样,只不过路径是HDFS中的。

如果数据存储在S3中,我们的读取方式如下:

df = spark.read.json("s3://bucket/dir/")
# csv文件
df = spark.read.format('csv').load('../datas/data.csv')
df = spark.read.csv('../datas/data.csv')

以上两种方式是比较简单的读取csv文件的方法,而打印结果时会发现,并没有将表头当作dataframe的schema信息:

+-----+----+
|  _c0| _c1|
+-----+----+
| name| age|
|alice|  18|
|  bob|  19|
+-----+----+

这个时候就需要指定参数了,如下所示,这样就可以将结果正确的读取到dataframe中了。

df = spark.read.option("header", "true").csv('../datas/data.csv')
df.show()
df.printSchema()
+-----+----+
| name| age|
+-----+----+
|alice|  18|
|  bob|  19|
+-----+----+
 |-- name: string (nullable = true)
 |--  age: string (nullable = true)

也可以通过指定schema来读取没有表头的csv文件数据:

df = spark.read.csv('../datas/data.csv', schema="name: string, age: string")

指定age为int时结果读取到的数据是null,因而用string读取数据,具体原因暂未知。不过在指定表头读取的数据结果来看,age也是string类型,或许与文件类型有关。

parquet和orc

parquet与orc文件介绍参考:parquet与orc

这两种格式文件都是采用snappy压缩,snappy是一种高效的文件压缩方式,参考:snappy压缩

读取文件示例如下所示,两种方式读取的数据均能将schema信息读取到。

# parquet
df = spark.read.parquet('../output/data_parquet/')
df.show()
# orc
df = spark.read.orc('../output/data_orc/')
df.show()
+-----+---+
| name|age|
+-----+---+
|alice| 18|
|  bob| 19|
+-----+---+
+-----+---+
| name|age|
+-----+---+
|alice| 18|
|  bob| 19|
+-----+---+

读取数据表

可直接用spark的api接口读取:

df = spark.read.table('db.table')

不过这个方法比较单一,无法灵活的适应各种复杂的情况。复杂的情况可以使用sparksql进行数据表的读取:

df = spark.sql("select * from db.table where ...")

括号内就是普通的sql语句,可以指定选取的列,过滤的条件等。

DataFrameReader.jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)

示例如下:

url = "jdbc:mysql://IPaddress/database"
driver = "com.mysql.jdbc.Driver"
user = "username"
password = "passwd"
database = "database"
table = "tablename"
df = spark.read.format("jdbc") \
    .options(url=url, driver=driver, user=user, password=password) \
    .load(dbtable="%s.%s" % (database, table))
                    文章目录pyspark读取数据参数介绍formatschemaloadtableoption读取文件jsoncsvparquet和orc读取数据表hivejdbcpyspark读取数据参数介绍formatDataFrameReader.format(source)指定不同格式来读取文件,例如以下为指定json格式读取数据:df = spark.read.format('json').load('python/test_support/sql/people.json')针对常用几个文件格式
				
本教程是学习使用,使用环境是单机下。第一步,对系统进行配置:spark = SparkSession.builder .master("local") .appName("Word Count") .config("spark.some.config.option", "some-value") .getOrCreate()使用默认配置即可,后期再进行讲解。...
hadoop@rachel-virtual-machine:/usr/local/spark$ ./bin/pyspark ./bin/pyspark: 行 45: python: 未找到命令 Python 3.6.8 (default, Jan 14 201...
1、pyspark 读取与保存 参考:http://www.manongjc.com/detail/15-vfxldlrjpphxldk.html dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet') print(dt1.show()) 保存的时候主要文件夹权限,不然报错ERROR FileOutputCommitter: Mkdirs failed to create file: 修改文件夹权限先
parquet数据:列式存储结构,由Twitter和Cloudera合作开发,相比于行式存储,其特点是: 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量;压缩编码可以降低磁盘存储空间,使用更高效的压缩编码节约存储空间;只读取需要的列,支持向量运算,能够获取更好的扫描性能。 那么我们怎么在pyspark读取和使用parquet数据呢?我以local模式,linux下的pycharm执行作说明。 首先,导入库文件和配置环境: import os from pyspark import SparkContext, SparkConf from pyspark.sql.session
sys.setdefaultencoding('utf-8') from pyspark import SparkConf,SparkContext from pyspark.sql import SQLContext conf=SparkConf().setAppName("spark_...
网上找了很多资料,例如 textFile 和wholeTextFiles方法。但这两个都是都是读取txt文件,然后生成RDD的格式。 如果再从 RDD 转为Pyspark 的 DataFrame格式,那么就多此一举了。 终于找到了直接读取 txt 文件并生成 DataFrame的方法,那就是 spark 的 read.txt 方法。 实现代码如下: from pyspark.sql i...
您好,如果在使用 PySpark 读取 Hive 时出现权限不足的问题,可能是因为您的用户没有足够的权限来访问 Hive 数据库。您可以尝试以下解决方法: 1. 确认您的用户在 Hive 中有足够的权限。您可以联系管理员或者使用 Hive 命令行工具来确认。 2. 在 PySpark 中指定正确的 Hive 元数据存储位置。您可以在 PySpark 中使用以下代码来指定 Hive 元数据存储位置: spark = SparkSession.builder.appName("myApp") \ .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \ .enableHiveSupport() \ .getOrCreate() 其中,/user/hive/warehouse 是您的 Hive 元数据存储位置。 3. 确认您的 PySpark 版本和 Hive 版本兼容。如果版本不兼容,可能会导致权限不足的问题。 希望以上解决方法能够帮助您解决问题。如果问题仍然存在,请提供更多的信息,我们将尽力帮助您解决问题。