sample是抽样函数
t1 = train.sample(False, 0.2, 42)
t2 = train.sample(False, 0.2, 43)
t1.count(),t2.count()
Output:
(109812, 109745)
withReplacement = True or False代表是否有放回。
fraction = x, where x = .5,代表抽取百分比
when(condition, value1).otherwise(value2)
联合使用:
那么:当满足条件condition的指赋值为values1,不满足条件的则赋值为values2.
otherwise表示,不满足条件的情况下,应该赋值为啥。
demo1
>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+------------------------------------------------------------+
demo 2:多个when串联
df = df.withColumn('mod_val_test1',F.when(df['rand'] <= 0.35,1).when(df['rand'] <= 0.7, 2).otherwise(3))
between(lowerBound, upperBound)
筛选出某个范围内的值,返回的是TRUE or FALSE
>>> df.select(df.name, df.age.between(2, 4)).show()
+-----+---------------------------+
| name|((age >= 2) AND (age <= 4))|
+-----+---------------------------+
|Alice| true|
| Bob| false|
+-----+---------------------------+
选择dataframe中间的特定行数
而我使用的dataframe前两种方法都没法解决。特点如下:
特定列中的内容为字符串,并非数值,不能直接比较大小。
所选取数据为中间行,如第10~20行,不能用函数直接选取。
最终的解决方法如下:
首先添加行索引,然后选择特定区间内的行索引,从而选取特定中间行。
第一步,添加行索引。
from pyspark.sql.functions import monotonically_increasing_id
dfWithIndex = df.withColumn(“id”,monotonically_increasing_id())
第二步,筛选特定行。
dfWithIndex.select(dfWithIndex.name, dfWithIndex.id.between(50, 100)).show()
有这么两种常规的新建数据方式:createDataFrame
、.toDF()
sqlContext.createDataFrame(pd.dataframe())
是把pandas
的dataframe
转化为spark.dataframe
格式,所以可以作为两者的格式转化
from pyspark.sql import Row
row = Row("spe_id", "InOther")
x = ['x1','x2']
y = ['y1','y2']
new_df = sc.parallelize([row(x[i], y[i]) for i in range(2)]).toDF()
Row
代表的是该数据集的列名。
withColumn是通过添加或替换与现有列有相同的名字的列,返回一个新的DataFrame
result3.withColumn('label', 0)
train.withColumn('Purchase_new', train.Purchase /2.0).select('Purchase','Purchase_new').show(5)
Output:
+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
| 8370| 4185.0|
| 15200| 7600.0|
| 1422| 711.0|
| 1057| 528.5|
| 7969| 3984.5|
+--------+------------+
only showing top 5 rows
**报错:**AssertionError: col should be Column,一定要指定某现有列
有两种方式可以实现:
from pyspark.sql import functions
result3 = result3.withColumn('label', functions.lit(0))
但是!! 如何新增一个特别List??(参考:王强的知乎回复)
python中的list不能直接添加到dataframe中,需要先将list转为新的dataframe,然后新的dataframe和老的dataframe进行join操作, 下面的例子会先新建一个dataframe,然后将list转为dataframe,然后将两者join起来。
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("id", monotonically_increasing_id())
df.show()
+---+---+-----+---+
| x1| x2| x3| id|
+---+---+-----+---+
| 1| a| 23.0| 0|
| 3| B|-23.0| 1|
+---+---+-----+---+
from pyspark.sql import Row
l = ['jerry', 'tom']
row = Row("pid", "name")
new_df = sc.parallelize([row(i, l[i]) for i in range(0,len(l))]).toDF()
new_df.show()
+---+-----+
|pid| name|
+---+-----+
| 0|jerry|
| 1| tom|
+---+-----+
join_df = df.join(new_df, df.id==new_df.pid)
join_df.show()
+---+---+-----+---+---+-----+
| x1| x2| x3| id|pid| name|
+---+---+-----+---+---+-----+
| 1| a| 23.0| 0| 0|jerry|
| 3| B|-23.0| 1| 1| tom|
+---+---+-----+---+---+-----+
#####**坑啊!!!**其中,monotonically_increasing_id()
生成的ID保证是单调递增和唯一的,但不是连续的。
所以,有可能,单调到1-140000,到了第144848个,就变成一长串:8845648744563,所以千万要注意!!
result3 = result3.withColumn('label', df.result*0 )
df = df.withColumn(“xx”, 1)
df = df.withColumn("year2", df["year1"].cast("Int"))
jdbcDF.withColumnRenamed( "id" , "idx" )
#####过滤数据(filter和where方法相同):
df = df.filter(df['age']>21)
df = df.where(df['age']>21)
多个条件jdbcDF .filter(“id = 1 or c1 = ‘b’” ).show()
#####对null或nan数据进行过滤:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull("a")) # 把a列里面数据为null的筛选出来(代表python的None类型)
df = df.filter(isnan("a")) # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据)
参考:
PySpark:使用isin过滤返回空数据框
[pyspark 实践汇总2](https://blog.csdn.net/yepeng2007fei/article/details/78874306)
有两个数据集,从data_1中抽取出data_2中的相同的元素
可行的方式:
df_ori_part = df_ori[df_ori['user_pin'].isin(list(df_1['user_pin']))]
df_ori_part = df_ori.filter(df_ori['user_pin'].isin(list(df_1['user_pin'])) == True )
df_ori_part = df_ori.filter(~df_ori['user_pin'].isin(list(df_1['user_pin'])) )
result3 = result1.union(result2)
jdbcDF.unionALL(jdbcDF.limit(1)) # unionALL
合并2个表的join方法:
df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")
其中,方法可以为:inner
, outer
, left_outer
, right_outer
, leftsemi
.
其中注意,一般需要改为:left_outer
joinDF1.join(joinDF2, Seq("id", "name"))
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
跟pandas 里面的left_on,right_on
来看一个例子,先构造两个dataframe:
sentenceDataFrame = spark.createDataFrame((
(1, "asf"),
(2, "2143"),
(3, "rfds")
)).toDF("label", "sentence")
sentenceDataFrame.show()
sentenceDataFrame1 = spark.createDataFrame((
(1, "asf"),
(2, "2143"),
(4, "f8934y")
)).toDF("label", "sentence")
newDF = sentenceDataFrame1.select("sentence").subtract(sentenceDataFrame.select("sentence"))
newDF.show()
+--------+
|sentence|
+--------+
| f8934y|
+--------+
newDF = sentenceDataFrame1.select("sentence").intersect(sentenceDataFrame.select("sentence"))
newDF.show()
+--------+
|sentence|
+--------+
| asf|
| 2143|
+--------+
newDF = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence"))
newDF.show()
+--------+
|sentence|
+--------+
| asf|
| 2143|
| f8934y|
| asf|
| 2143|
| rfds|
+--------+
# 并集 + 去重
newDF = sentenceDataFrame1.select("sentence").union(sentenceDataFrame.select("sentence")).distinct()
newDF.show()
+--------+
|sentence|
+--------+
| rfds|
| asf|
| 2143|
| f8934y|
+--------+
有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法
下面代码中,根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中,如下所示
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()
根据c4字段,统计该字段值出现频率在30%以上的内容
train.crosstab('Age', 'Gender').show()
Output:
+----------+-----+------+
|Age_Gender| F| M|
+----------+-----+------+
| 0-17| 5083| 10019|
| 46-50|13199| 32502|
| 18-25|24628| 75032|
| 36-45|27170| 82843|
| 55+| 5083| 16421|
| 51-55| 9894| 28607|
| 26-35|50752|168835|
+----------+-----+------+
train.groupby('Age').agg({'Purchase': 'mean'}).show()
Output:
+-----+-----------------+
| Age| avg(Purchase)|
+-----+-----------------+
|51-55|9534.808030960236|
|46-50|9208.625697468327|
| 0-17|8933.464640444974|
|36-45|9331.350694917874|
|26-35|9252.690632869888|
| 55+|9336.280459449405|
|18-25|9169.663606261289|
+-----+-----------------+
另外一些demo:
df['x1'].groupby(df['x2']).count().reset_index(name='x1')
train.groupby('Age').count().show()
Output:
+-----+------+
| Age| count|
+-----+------+
|51-55| 38501|
|46-50| 45701|
| 0-17| 15102|
|36-45|110013|
|26-35|219587|
| 55+| 21504|
|18-25| 99660|
+-----+------+
应用多个函数:
from pyspark.sql import functions
df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
整合后GroupedData类型可用的方法(均返回DataFrame类型):
avg(*cols) —— 计算每组中一列或多列的平均值
count() —— 计算每组中一共有多少行,返回DataFrame有2列,一列为分组的组名,另一列为行总数
max(*cols) —— 计算每组中一列或多列的最大值
mean(*cols) —— 计算每组中一列或多列的平均值
min(*cols) —— 计算每组中一列或多列的最小值
sum(*cols) —— 计算每组中一列或多列的总和
将df的每一列应用函数f:
df.foreach(f) 或者 df.rdd.foreach(f)
将df的每一块应用函数f:
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f)
map函数应用
可以参考:Spark Python API函数学习:pyspark API(1)
train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
Output:
[(Row(User_ID=1000001), 1),
(Row(User_ID=1000001), 1),
(Row(User_ID=1000001), 1),
(Row(User_ID=1000001), 1),
(Row(User_ID=1000002), 1)]
其中map在spark2.0就移除了,所以只能由rdd.调用。
data.select('col').rdd.map(lambda l: 1 if l in ['a','b'] else 0 ).collect()
print(x.collect())
print(y.collect())
[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]
还有一种方式mapPartitions
:
def _map_to_pandas(rdds):
""" Needs to be here due to pickling issues """
return [pd.DataFrame(list(rdds))]
data.rdd.mapPartitions(_map_to_pandas).collect()
返回的是list。
udf 函数应用
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import datetime
# 定义一个 udf 函数
def today(day):
if day==None:
return datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d')
else:
return day
# 返回类型为字符串类型
udfday = udf(today, StringType())
df.withColumn('day', udfday(df.day))
有点类似apply,定义一个 udf 方法, 用来返回今天的日期(yyyy-MM-dd):
df.drop('age').collect()
df.drop(df.age).collect()
dropna函数:
df = df.na.drop() # 扔掉任何列包含na的行
df = df.dropna(subset=['col_name1', 'col_name2']) # 扔掉col1或col2中任一一列包含na的行
train.dropna().count()
Output:
166821
填充NA包括fillna
train.fillna(-1).show(2)
Output:
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370|
|1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows
返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。
示例:
jdbcDF.distinct()
根据指定字段去重。类似于select distinct a, b操作
示例:
train.select('Age','Gender').dropDuplicates().show()
Output:
+-----+------+
| Age|Gender|
+-----+------+
|51-55| F|
|51-55| M|
|26-35| F|
|26-35| M|
|36-45| F|
|36-45| M|
|46-50| F|
|46-50| M|
| 55+| F|
| 55+| M|
|18-25| F|
| 0-17| F|
|18-25| M|
| 0-17| M|
+-----+------+
Pandas和Spark的DataFrame两者互相转换:
pandas_df = spark_df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)
转化为pandas,但是该数据要读入内存,如果数据量大的话,很难跑得动
两者的异同:
- Pyspark DataFrame是在分布式节点上运行一些数据操作,而pandas是不可能的;
- Pyspark DataFrame的数据反映比较缓慢,没有Pandas那么及时反映;
- Pyspark DataFrame的数据框是不可变的,不能任意添加列,只能通过合并进行;
- pandas比Pyspark DataFrame有更多方便的操作以及很强大
与Spark RDD的相互转换:
rdd_df = df.rdd
df = rdd_df.toDF()
DataFrame注册成SQL的表:
df.createOrReplaceTempView("TBL1")
进行SQL查询(返回DataFrame):
conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()
df = ss.sql(“SELECT name, age FROM TBL1 WHERE age >= 13 AND age <= 19″)
在Python中,我们也可以使用SQLContext类中 load/save函数来读取和保存CSV文件:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv",header="true")
其中,header代表是否显示表头。
其中主函数:
save(path=None, format=None, mode=None, partitionBy=None, **options)[source]
Parameters:
-
path – the path in a Hadoop supported file system
-
format – the format used to save
-
mode –
-
specifies the behavior of the save operation when data already
exists.
-
append: Append contents of this DataFrame to existing data.
-
overwrite: Overwrite existing data.
-
ignore: Silently ignore this operation if data already exists.
-
error (default case): Throw an exception if data already exists.
-
partitionBy – names of partitioning columns
-
options – all other string options
场景是要,依据B表与A表共有的内容,需要去除这部分共有的。
使用的逻辑是merge两张表,然后把匹配到的删除即可。
from pyspark.sql import functions
def LeftDeleteRight(test_left,test_right,left_col = 'user_pin',right_col = 'user_pin'):
print('right data process ...')
columns_right = test_right.columns
test_right = test_right.withColumn('user_pin_right', test_right[right_col])
test_right = test_right.withColumn('notDelete', functions.lit(0))
# 删除其余的
for col in columns_right:
test_right = test_right.drop(col)
print('rbind left and right data ...')
test_left = test_left.join(test_right, test_left[left_col] == test_right['user_pin_right'], "left")
test_left = test_left.fillna(1)
test_left = test_left.where('notDelete =1')
# 去掉多余的字段
for col in ['user_pin_right','notDelete']:
test_left = test_left.drop(col)
return test_left
%time test_left = LeftDeleteRight(test_b,test_a,left_col = 'user_pin',right_col = 'user_pin')
Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in
这里遇到的问题主要是因为数据源数据量过大,而机器的内存无法满足需求,导致长时间执行超时断开的情况,数据无法有效进行交互计算,因此有必要增加内存
参考:Spark常见问题汇总:https://my.oschina.net/tearsky/blog/629201
【总结】PySpark的DataFrame处理方法:增删改差
Spark-SQL之DataFrame操作大全
Complete Guide on DataFrame Operations in PySpark
笔者最近需要使用pyspark进行数据整理,于是乎给自己整理一份使用指南。pyspark.dataframe跟pandas的差别还是挺大的。1、——– 查 ——–— 1.1 行元素查询操作 —像SQL那样打印列表前20元素show函数内可用int类型指定要打印的行数:df.show()df.show(30)以树的形式打印概要df.prin...
数据帧用于
统计,机器学习和数据
操作/探索。 您可以将
Dataframe视为Excel电子表格。 该程序包设计轻巧直观。
:warning: 该软件包已准备好投入生产,但API尚未稳定。 达到稳定性后,将标记1.0.0版。 建议您的包管理器直接锁定到提交ID,而不是直接锁定master分支。 :warning:
:star: 该项目向您表示感谢。
从CSV,JSONL,Parquet,MySQL和PostgreSQL导入
导出为CSV,JSONL,Excel,Parquet,MySQL和PostgreSQL
开发人员友好
灵活-创建自定义系列(自定义数据类型)
与互
操作性。
伪数据生成
插值(ForwardFill,BackwardFill,Linear,Spline,Lagrange)
时间序列预测(SES,Holt-Winters)
绘图(跨平台)
请参阅此处的。
在上一篇文章中,我整理了pandas在数据合并和重塑中常用到的concat方法的使用说明。在这里,将接着介绍pandas中也常常用到的join 和merge方法
merge
pandas的merge方法提供了一种类似于SQL的内存链接操作,官网文档提到它的性能会比其他开源语言的数据操作(例如R)要高效。
和SQL语句的对比可以看这里
merge的参数
on:列名,join用来对齐的那一列的名字,用到这个参数的时候一定要保证左表和右表用来对齐的那一列都有相同的列名。
left_on:左表对齐的列,可以是列名,也可以是和dataframe同样长度的arrays。
right_on:右表对齐的列,可
import pandas as pd
from
pyspark.sql import SparkSession
from
pyspark.sql import SQLContext
from
pyspark import SparkContext
#初始化数据
#初始化pandas
DataFrame
df = pd.
DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1', 'row2'], columns=['c1', 'c2', 'c3'])
#打印数据
在第一篇中进行的简单的介绍了Shiro的登录的实现https://blog.csdn.net/qq_38340127/article/details/109866392,因为主要为了后续的Spring结合Shiro,而Shiro通过Filer实现的,所以此篇主要讲Filter原理和使用。
一 Filter 过滤器
在web项目中Filer的主要功能就是对用户请求做预处理,接着将请求交给servlet处理并响应,然后Filter在对该相应进行后置处理。
web浏览器-------->web服务器-
1、https://spark.apache.org/docs/latest/api/python/reference/api/
pyspark.RDD.html
1、去除重复列
pyspark.sql.
DataFrame.dropDuplicates(subset=None)
作用:返回
删除重复行的新
DataFrame,可选择仅考虑某些列。
Examples
from
pyspark.sql import Row
df = sc.parallelize([ \
Row(name='Alice
1.创建dataframe
1.1读取文件来创建dataframe
from pyspark.sql import SparkSession #sparkSession为同统一入口
#创建spakr对象
spark = SparkSession\
.builder\
.appName('readfile')\
.getOrCreate()
# 1.读取csv,parquet等文件文件
logFilePath = 'births_train.csv'
log_df = spark.
文章目录0 准备工作1 使用PySpark1.1 使用shell1.2 使用脚本2 读hudi表3 创建hudi表格4 增量查询hudi表4.1 创建初始commit时间点4.2 增量查询5 删除操作
0 准备工作
原始数据库通过Debezium到kafka,kafka通过DeltaStream到hadoop。
运行程序:
confluent中connect-standlone【数据库变动经过Debezium到kafka】
hudi06-demo项目中的eureka、hudi-kafka-demo【kaf
def MyPartitioner(key): #自定义分区函数
print('MyPartitioner is running')
print('the key is %d'%key)
return key%10 #设定分区取值方式
def main():
print('the main function is running')
pySpark-flatten-dataframe
PySpark函数可展平从JSON / CSV / SQL / Parquet加载的任何复杂的嵌套数据框结构
例如,对于嵌套的JSON-
展平所有嵌套项:{“ human”:{“ name”:{“ first_name”:“ Jay Lohokare”}}}
通过column ='human-name-first_name'转换为dataFrame。可以通过更改连接器变量来更改连接器'-'。
爆炸数组:{“ array”:[“ one”,“ two”,“ three”]}转换为具有3行的column ='array'的dataFrame
该函数可以处理任何级别的嵌套。
该函数不能处理数组中的数组。 这只是为了保持代码的动态性和通用性。 为了处理内部数组数组,修改if isinstance在for的循环flattenSchema
Python DataFrame 如何设置列表字段/元素类型?
比如笔者想将列表的两个字段由float64设置为int64,那么就要用到DataFrame的astype属性,举例如图:
该例列表为“m_pred_survived”字段为“PassengerId”及“Survived”,设置为int64类型,最后可以输出检验下是否正确。
m_pred_survived = pd.DataFrame(columns=['PassengerId', 'Survived'])
以上这篇Python DataFrame设置/更改列表字段/元素类型的方法就是小编分享给大家的全部内容了,希望能给大家一个
# 创建一个DataFrame
df = spark.createDataFrame([(1, 'a'), (2, 'a'), (3, 'b'), (4, 'c'), (5, 'c')], ['id', 'value'])
# 使用groupBy和count函数来统计相同数据的个数
countDF = df.groupBy('value').agg(count('id').alias('count'))
# 查看结果
countDF.show()
+-----+-----+
|value|count|
+-----+-----+
| b| 1|
| c| 2|
| a| 2|
+-----+-----+
这将会返回一个新的DataFrame,其中包含每个唯一值的计数。在这个例子中,'a'重复出现2次,'b'和'c'分别仅出现1次和2次。
m0_69799751:
R语言︱数据去重
则则147:
python | prophet的案例实践:趋势检验、突变点检验等
坑挺多 | 联邦学习FATE:上传数据(一)
缺一味药: