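Unit tests are written differently for different kinds of operators. Below we build a word-count sample to demonstrate Flink's unit-testing support.
**This Flink example implements a word count.** First, a custom source continuously emits text. The pipeline then parses it, and finally a tumbling window computes the counts every 5 seconds and stores them in keyed state.
1. Add the dependencies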
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala-version}</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala-version}</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>
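Versions used here:
flink-version: 1.12.0
scala-version: 2.12
Flink unit tests rely on the test utilities that ship with Flink. They live in the test jars org.apache.flink:flink-runtime_2.12:tests:1.12.0 and org.apache.flink:flink-streaming-java_2.12:tests:1.12.0 (matching the versions above), so those dependencies must be declared with classifier: tests.
2. Build a source that continuously emits sentences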
    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    public class SentenceSource extends RichSourceFunction<String> {
        // Written by cancel() and read by run() on different threads, hence volatile.
        private volatile boolean isCanceled;
        private Random random;
        private static final String[] SENTENCE = {
            "The cat stretched",
            "Jacob stood on his tiptoes",
            "The car turned the corner",
            "Kelly twirled in circles",
            "She opened the door",
            "Aaron made a picture",
            "I am sorry",
            "I danced"
        };

        @Override
        public void open(Configuration parameters) throws Exception {
            this.isCanceled = false;
            this.random = new Random();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Emit one random sentence every 500 ms until the source is canceled.
            while (!isCanceled) {
                int randomIndex = random.nextInt(SENTENCE.length);
                ctx.collect(SENTENCE[randomIndex]);
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }

        @Override
        public void cancel() {
            this.isCanceled = true;
        }
    }
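The source stops through a cancel flag that one thread sets while another thread is looping. The following is a minimal plain-Java sketch of that pattern, with no Flink involved. The class `CancellableLoop` and its helper `runFor` are hypothetical names introduced only for illustration, and the 10 ms pause is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a source loop; this is not a Flink class.
final class CancellableLoop implements Runnable {
    // volatile so that a write from the canceling thread is visible to the looping thread.
    private volatile boolean canceled = false;
    private final List<String> emitted = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {
        while (!canceled) {
            emitted.add("I danced");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        canceled = true;
    }

    List<String> emitted() {
        return emitted;
    }

    // Runs the loop on a worker thread, cancels it after the given time, and waits for it to end.
    static CancellableLoop runFor(long millis) {
        CancellableLoop loop = new CancellableLoop();
        Thread worker = new Thread(loop);
        worker.start();
        try {
            Thread.sleep(millis);
            loop.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the loop", e);
        }
        return loop;
    }

    public static void main(String[] args) {
        CancellableLoop loop = runFor(200);
        System.out.println("emitted " + loop.emitted().size() + " records before cancel");
    }
}
```

The flag only helps if the canceling thread holds the same object that is running the loop, which is worth keeping in mind when a framework runs a copy of the function instead.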
3. Implement the transformation operator that splits sentences into words
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SentenceToWordFunc extends RichFlatMapFunction<String, String> {
        private static final Logger logger = LoggerFactory.getLogger(SentenceToWordFunc.class);

        @Override
        public void flatMap(String sentence, Collector<String> out) throws Exception {
            if (StringUtils.isEmpty(sentence)) {
                logger.warn("Sentence is empty");
            } else {
                String[] words = sentence.split(" ");
                if (words != null && words.length > 0) {
                    for (String word : words) {
                        if (StringUtils.isEmpty(word)) {
                            logger.warn("Word is empty");
                        } else {
                            out.collect(word);
                        }
                    }
                } else {
                    logger.warn("Sentence is invalid. Sentence is {}.", sentence);
                }
            }
        }
    }
4. Implement the transformation operator that maps each word to a two-element tuple (used for counting words)
    public class WordToWordCountFunc extends RichMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            // Pair each word with an initial count of 1.
            return Tuple2.of(word, 1);
        }
    }
5. Implement the aggregation operator for words (used in the window computation)
    public class WordCountReduceFunc implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            // Both tuples carry the same word, so keep the word and add the counts.
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
    }
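Taken together, steps 3 to 5 split sentences into words, pair each word with 1, and add up the counts per word. As a rough illustration of that logic outside Flink, here is a plain-Java sketch. The class `WordCountSketch` is a hypothetical name, and it ignores windows and state entirely, so it only shows what the three functions compute, not how Flink schedules them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Flink-free illustration of the split / map-to-1 / reduce-by-key logic.
final class WordCountSketch {
    static Map<String, Integer> count(String... sentences) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String sentence : sentences) {
            if (sentence == null || sentence.isEmpty()) {
                continue; // same guard as SentenceToWordFunc
            }
            for (String word : sentence.split(" ")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Start at 1 and add counts for the same key, the role WordCountReduceFunc plays.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {The=2, cat=1, stretched=1, car=1, turned=1, the=1, corner=1}
        System.out.println(count("The cat stretched", "The car turned the corner"));
    }
}
```

Note that the count is case-sensitive, as in the Flink functions above: "The" and "the" are different keys.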
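6. Implement a ProcessWindowFunction that uses ReducingState to count words per window
    import java.text.SimpleDateFormat;

    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Window processing: aggregates the word counts and stores the result in state.
     */
    public class ReduceWordStateWindowFunc
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final Logger logger = LoggerFactory.getLogger(ReduceWordStateWindowFunc.class);
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Keyed state holding the running count for the current key.
        private transient ReducingState<Tuple2<String, Integer>> reducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ReducingStateDescriptor<Tuple2<String, Integer>> reducingStateDes = new ReducingStateDescriptor<>(
                    "wordCount",
                    new WordCountReduceFunc(),
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            reducingState = getRuntimeContext().getReducingState(reducingStateDes);
        }

        @Override
        public void process(
                String s,
                Context context,
                Iterable<Tuple2<String, Integer>> elements,
                Collector<Tuple2<String, Integer>> out)
                throws Exception {
            TimeWindow window = context.window();
            logger.info("window fired at: {}", sdf.format(window.getEnd()));
            elements.forEach(item -> {
                try {
                    if (reducingState != null) {
                        reducingState.add(item);
                        out.collect(reducingState.get());
                    }
                } catch (Exception e) {
                    logger.error("add wordCount tuple to reducing state error!", e);
                }
            });
        }
    }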
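7. Main function: assemble the pipeline
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class WordCountTask {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.addSource(new SentenceSource());
            SingleOutputStreamOperator<String> watermarksStream = source.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner((word, timeStamp) -> System.currentTimeMillis()));
            SingleOutputStreamOperator<String> wordDataStream = watermarksStream.flatMap(new SentenceToWordFunc());
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream =
                    wordDataStream.map(new WordToWordCountFunc());
            KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordCountStream.keyBy(value -> value.f0);
            WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingWindowedStream =
                    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
            SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream =
                    tumblingWindowedStream.process(new ReduceWordStateWindowFunc());
            resultDataStream.print();
            env.execute("wordCount for unit test");
        }
    }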
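With everything in place, back to the main topic: testing.
1. Stateless operators can be tested directly
Testing SentenceToWordFunc
This function has one runtime dependency, the Collector. The most obvious solution is to mock it.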
    private SentenceToWordFunc func;

    @Before
    public void setUp() throws Exception {
        func = new SentenceToWordFunc();
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testByMock() throws Exception {
        String sentence = "The cat stretched";
        // Collector is an interface, so a plain Mockito mock is sufficient.
        Collector<String> collector = Mockito.mock(Collector.class);
        func.flatMap(sentence, collector);
        // Each word of the sentence must be emitted exactly once.
        Mockito.verify(collector, Mockito.times(1)).collect("The");
        Mockito.verify(collector, Mockito.times(1)).collect("cat");
        Mockito.verify(collector, Mockito.times(1)).collect("stretched");
    }
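Alternatively, Ctrl+click on Collector to look at its definition.
It is an interface, so press Ctrl+Alt+B to list its implementations.
One of them, ListCollector, matches the output form we expect, so open it.
Its constructor takes a List argument, which gives us the test directly: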
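    private SentenceToWordFunc func;

    @Before
    public void setUp() throws Exception {
        func = new SentenceToWordFunc();
    }

    @Test
    public void testFlatMap() throws Exception {
        String sentence = "The cat stretched";
        List<String> out = new ArrayList<>();
        // ListCollector appends every collected element to the list it wraps.
        ListCollector<String> collector = new ListCollector<>(out);
        func.flatMap(sentence, collector);
        // Check every emitted word in order, not only the first one (needs java.util.Arrays).
        Assert.assertEquals(Arrays.asList("The", "cat", "stretched"), out);
    }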
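2. Testing the source operator
A source operator only emits data and returns no result to the caller, so testing it means solving two problems:
The emitted data must be captured in a container of some kind (this requires a sink operator that writes into it).
A very large or unbounded stream must be cut off actively (run the job in the background and stop it after a fixed time).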
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.junit.Assert;
    import org.junit.Test;

    public class SentenceSourceTest {

        // Note: a test that simply calls env.execute() never returns, because the source is unbounded.
        @Test
        public void run() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> dataStream = env.addSource(new SentenceSource());
            dataStream.addSink(new CollectSink());
            dataStream.print();

            // Submit the job without blocking, let it run for a while, then cancel the job itself.
            // Calling cancel() on the local SentenceSource object would not stop the job,
            // because the job runs a deserialized copy of that object.
            JobClient jobClient = env.executeAsync();
            TimeUnit.SECONDS.sleep(10);
            jobClient.cancel().get();

            // VALUES is never null, so assert that something was actually collected.
            Assert.assertFalse(CollectSink.VALUES.isEmpty());
        }

        private static class CollectSink implements SinkFunction<String> {
            // Static, because Flink serializes the sink and runs a copy of it.
            public static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

            @Override
            public void invoke(String value, Context context) throws Exception {
                VALUES.add(value);
            }
        }
    }
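3. Stateful operators
Stateful operators involve watermarks, windows, and state, so testing them with mocks alone is difficult. Flink officially provides dedicated test dependencies for testing stateful operators.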
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
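Flink provides four test harness classes:
OneInputStreamOperatorTestHarness (for operators on a DataStream)
KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
TwoInputStreamOperatorTestHarness (for operators on two connected DataStreams)
KeyedTwoInputStreamOperatorTestHarness (for operators on two connected KeyedStreams)
The official documentation gives one example for this part, but it does not fit the operator we need to test. The blog posts I went through were of uneven quality and mostly revolved around that same official example, so they were no help either. At this point it pays to change tack and read the source code, which usually comes with Flink's own test classes.
Those tests also show how StateDescriptor, WindowOperator, and related classes are used.
After some trial and error: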
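    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
    import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.TestHarnessUtil;
    import org.junit.Before;
    import org.junit.Test;

    public class ReduceWordStateWindowFuncTest {
        private KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, Tuple2<String, Integer>>
                testHarness;

        @Before
        public void setUp() throws Exception {
            ReduceWordStateWindowFunc func = new ReduceWordStateWindowFunc();
            // State in which the window operator pre-aggregates the elements of each window.
            ReducingStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
                    new ReducingStateDescriptor<>(
                            "windowCount",
                            new WordCountReduceFunc(),
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                                    .createSerializer(new ExecutionConfig()));
            WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>
                    operator =
                            new WindowOperator<>(
                                    TumblingEventTimeWindows.of(Time.seconds(5)),
                                    new TimeWindow.Serializer(),
                                    new TupleKeySelector(),
                                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                    stateDescriptor,
                                    new InternalSingleValueProcessWindowFunction<>(func),
                                    EventTimeTrigger.create(),
                                    0, // allowed lateness
                                    null); // no side output for late data
            testHarness =
                    new KeyedOneInputStreamOperatorTestHarness<>(
                            operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
            testHarness.open();
        }

        @Test
        public void process() throws Exception {
            // All four elements fall into the window [0, 5000).
            testHarness.processElement(Tuple2.of("wordOne", 1), 1000);
            testHarness.processElement(Tuple2.of("wordTwo", 1), 1500);
            testHarness.processElement(Tuple2.of("wordOne", 1), 2000);
            testHarness.processElement(Tuple2.of("wordOne", 1), 4999);
            // The watermark reaches the window's max timestamp and fires it.
            testHarness.processWatermark(new Watermark(4999));

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>(Tuple2.of("wordOne", 3), 4999));
            expectedOutput.add(new StreamRecord<>(Tuple2.of("wordTwo", 1), 4999));
            expectedOutput.add(new Watermark(4999));
            TestHarnessUtil.assertOutputEqualsSorted(
                    "window computation mismatch", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        }

        private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }

        private static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable {
            @Override
            @SuppressWarnings("unchecked")
            public int compare(Object o1, Object o2) {
                if (o1 instanceof Watermark || o2 instanceof Watermark) {
                    return 0;
                }
                StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
                StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
                if (sr0.getTimestamp() != sr1.getTimestamp()) {
                    // Long.compare avoids the overflow of casting a long difference to int.
                    return Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
                }
                int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
                if (comparison != 0) {
                    return comparison;
                }
                return Integer.compare(sr0.getValue().f1, sr1.getValue().f1);
            }
        }
    }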
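The timestamps in the test are chosen so that all four elements land in one 5-second window, which fires exactly when the watermark reaches 4999. The arithmetic behind that can be sketched in plain Java. This is only an illustration under simplifying assumptions (zero window offset, non-negative timestamps), not Flink's implementation, and `TumblingWindowSketch` with its methods is a hypothetical name.

```java
// Hypothetical, Flink-free illustration of tumbling event-time window arithmetic.
final class TumblingWindowSketch {
    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    // An event-time window fires once the watermark reaches its max timestamp.
    static boolean fires(long watermark, long timestamp, long size) {
        return watermark >= maxTimestamp(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 5000;
        for (long ts : new long[] {1000, 1500, 2000, 4999}) {
            System.out.println(ts + " -> window [" + windowStart(ts, size) + ", "
                    + (windowStart(ts, size) + size) + "), max timestamp " + maxTimestamp(ts, size));
        }
        System.out.println("watermark 4998 fires: " + fires(4998, 1000, size)); // false
        System.out.println("watermark 4999 fires: " + fires(4999, 1000, size)); // true
    }
}
```

An element with timestamp 5000 would already belong to the next window, [5000, 10000), and would not be part of the expected output above.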
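Because the test has to reproduce the behavior of a window, it builds a WindowOperator to simulate the window.
The WindowOperator constructor takes as many as nine parameters. Without reading the test classes in the source code, figuring them out would probably have taken much longer (the official documentation has neither a description nor a demo of this part).
WindowOperator has five type parameters, although its documentation lists only four. K is the key type used to partition the KeyedStream, which is String here. IN is the input type. We convert each word to a tuple, so this is Tuple2<String, Integer>. The documentation leaves out ACC, the accumulator, which is the type of the intermediate value and is again Tuple2 here. Next comes OUT, the output type, which is still Tuple2. The last one, W, stands for Window. We use a tumbling time window, so it is TimeWindow.
Next, the constructor parameters.
Required parameters:
Window assigner. We use a tumbling window: TumblingEventTimeWindows.of(Time.seconds(5)).
Window serializer. A window is an object that stores a start time and an end time, so the time-window serializer can be used directly: new TimeWindow.Serializer().
KeySelector. This is the logic that extracts the key from the input.
Key serializer. The key type is String, so the String serializer that Flink provides can be used directly: BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()).
StateDescriptor. Pass in the descriptor object built earlier.
InternalWindowFunction. This is an interface, so pick the implementation that fits the case.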
ci.apache.org/projects/fl…