
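Unit tests are written differently for different kinds of operators. Below we build a word-count sample and use it to demonstrate Flink's unit-testing support.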

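**The Flink sample implements word counting.** A custom source continuously emits text, the pipeline parses each sentence into words, and a tumbling window computes the counts every 5 seconds and stores them in state.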
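
As a rough picture of what this pipeline computes, here is a minimal plain-Java sketch with no Flink dependency. It assumes non-negative timestamps and a zero window offset. The names `TumblingWordCountSketch`, `windowStart`, and `count` are hypothetical and are not part of the sample or of Flink.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink): what a 5-second tumbling event-time word count computes.
class TumblingWordCountSketch {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the tumbling window containing the timestamp
    // (assumption: non-negative timestamps, zero offset).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Splits each sentence into words and counts them per window start.
    // sentences[i] is assumed to carry the event time timestamps[i].
    static Map<Long, Map<String, Integer>> count(String[] sentences, long[] timestamps) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < sentences.length; i++) {
            Map<String, Integer> counts =
                    result.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>());
            for (String word : sentences[i].split(" ")) {
                if (!word.isEmpty()) {
                    // Adds 1 to the existing count, like reducing (word, 1) tuples
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] sentences = {"I danced", "I am sorry", "She opened the door"};
        long[] timestamps = {1_000L, 4_999L, 5_000L};
        // The first two sentences share the window starting at 0; the third starts a new window.
        System.out.println(count(sentences, timestamps));
    }
}
```

The real job differs in one important way: Flink only emits a window's result after the watermark passes the end of that window, while this sketch simply groups everything at once.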

1. Add the dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala-version}</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala-version}</artifactId>
    <version>${flink-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
</dependency>

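Versions I used

  • flink-version: 1.12.0
  • scala-version: 2.12
  • Flink unit tests rely on Flink's own test utility classes, which ship in org.apache.flink:flink-runtime_2.11:tests:1.10.0 and org.apache.flink:flink-streaming-java_2.11:tests:1.10.0, so the `tests` classifier has to be added. Use the coordinates that match your own Scala and Flink versions; the dependency block in the stateful-operator section uses _2.12 and 1.12.0.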

    2. Build a source that continuously emits sentences

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    public class SentenceSource extends RichSourceFunction<String> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean isCanceled;
        private Random random;
        private static final String[] SENTENCE = {
                "The cat stretched"
                , "Jacob stood on his tiptoes"
                , "The car turned the corner"
                , "Kelly twirled in circles"
                , "She opened the door"
                , "Aaron made a picture"
                , "I am sorry"
                , "I danced"
        };

        @Override
        public void open(Configuration parameters) throws Exception {
            this.isCanceled = false;
            this.random = new Random();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (!isCanceled) {
                // Emit one of the sentences at random
                int randomIndex = random.nextInt(SENTENCE.length);
                ctx.collect(SENTENCE[randomIndex]);
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }

        @Override
        public void cancel() {
            this.isCanceled = true;
        }
    }
    

    3. Implement the transformation operator that splits a sentence into words

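    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SentenceToWordFunc extends RichFlatMapFunction<String, String> {
        private static final Logger logger = LoggerFactory.getLogger(SentenceToWordFunc.class);

        @Override
        public void flatMap(String sentence, Collector<String> out) throws Exception {
            if (StringUtils.isEmpty(sentence)) {
                logger.warn("Sentence is empty");
            } else {
                String[] words = sentence.split(" ");
                if (words != null && words.length > 0) {
                    for (String word : words) {
                        if (StringUtils.isEmpty(word)) {
                            logger.warn("Word is empty");
                        } else {
                            out.collect(word);
                        }
                    }
                } else {
                    // Reached e.g. for a sentence made only of spaces, which splits into an empty array
                    logger.warn("Sentence is invalid. Sentence is {}.", sentence);
                }
            }
        }
    }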
    

    4. Implement the transformation operator that maps each word to a 2-tuple (used for counting words)

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class WordToWordCountFunc extends RichMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            // Every word starts with a count of 1
            return Tuple2.of(word, 1);
        }
    }
    

    5. Implement the aggregation operator for words (used in the window computation)

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    public class WordCountReduceFunc implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
    }
    

    6. Implement a ProcessWindowFunction operator that uses ReducingState to count words per window

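    import java.text.SimpleDateFormat;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * TODO
     * Window processing: aggregates the words and stores the result in State.
     */
    public class ReduceWordStateWindowFunc
            extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final Logger logger = LoggerFactory.getLogger(ReduceWordStateWindowFunc.class);
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Holds the running word-count result
        private ReducingState<Tuple2<String, Integer>> reducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Create the state descriptor
            ReducingStateDescriptor<Tuple2<String, Integer>> reducingStateDes =
                    new ReducingStateDescriptor<>(
                            "wordCount",
                            new WordCountReduceFunc(), // aggregation
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            reducingState = getRuntimeContext().getReducingState(reducingStateDes);
        }

        @Override
        public void process(
                String s,
                Context context,
                Iterable<Tuple2<String, Integer>> elements,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Log the time at which the window fires
            TimeWindow window = context.window();
            logger.info("window fire time: {}", sdf.format(window.getEnd()));
            elements.forEach(item -> {
                try {
                    if (reducingState != null) {
                        // Just add the word tuple to the state
                        reducingState.add(item);
                        // Emit the current result downstream
                        out.collect(reducingState.get());
                    }
                } catch (Exception e) {
                    logger.error("add wordCount tuple to reducing state error!", e);
                }
            });
        }
    }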

    7. Main function: assemble the pipeline

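    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class WordCountTask {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Attach the custom source
            DataStreamSource<String> source = env.addSource(new SentenceSource());
            // Assign timestamps and watermarks
            SingleOutputStreamOperator<String> watermarksStream = source.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner((word, timeStamp) -> System.currentTimeMillis()));
            SingleOutputStreamOperator<String> wordDataStream = watermarksStream.flatMap(new SentenceToWordFunc());
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordsDataSteam =
                    wordDataStream.map(new WordToWordCountFunc());
            KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordsDataSteam.keyBy(value -> value.f0);
            // 5-second tumbling event-time window
            WindowedStream<Tuple2<String, Integer>, String, TimeWindow> tumblingWindowedStream =
                    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
            // Aggregate using state
            SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream =
                    tumblingWindowedStream.process(new ReduceWordStateWindowFunc());
            resultDataStream.print();
            env.execute("wordCount for unit test");
        }
    }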
    

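    With everything in place, back to the main topic: testing.

    1. Stateless operators can be tested directly

    Testing SentenceToWordFunc

    This function has a single runtime dependency, the Collector. The most obvious approach is to mock it.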

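    import org.apache.flink.util.Collector;
    import org.junit.Before;
    import org.junit.Test;
    import org.mockito.Mockito;

    public class SentenceToWordFuncMockTest {
        private SentenceToWordFunc func;

        @Before
        public void setUp() throws Exception {
            func = new SentenceToWordFunc();
        }

        @Test
        @SuppressWarnings("unchecked")
        public void testByMock() throws Exception {
            String sentence = "The cat stretched";
            // Mock the Collector interface so that every collect(...) call is recorded
            Collector<String> collector = Mockito.mock(Collector.class);
            func.flatMap(sentence, collector);
            Mockito.verify(collector, Mockito.times(1)).collect("The");
            Mockito.verify(collector, Mockito.times(1)).collect("cat");
            Mockito.verify(collector, Mockito.times(1)).collect("stretched");
        }
    }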
    

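    Alternatively, Ctrl+click on Collector in the IDE to look at its definition.

    It is an interface, so press Ctrl+Alt+B to list its implementations.

    One of them, ListCollector, matches the output form we want, so open it.

    Its constructor takes a List as its argument, which gives us the test: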

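    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.util.ListCollector;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    public class SentenceToWordFuncTest {
        private SentenceToWordFunc func;

        @Before
        public void setUp() throws Exception {
            func = new SentenceToWordFunc();
        }

        @Test
        public void testFlatMap() throws Exception {
            String sentence = "The cat stretched";
            // ListCollector appends every collected element to the list passed in
            List<String> out = new ArrayList<>();
            ListCollector<String> collector = new ListCollector<>(out);
            func.flatMap(sentence, collector);
            Assert.assertEquals("The", out.get(0));
        }
    }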
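
The list-backed approach also makes it easy to assert on the whole output rather than only the first element. Below is a minimal plain-Java sketch of that idea with no Flink dependency. `Consumer<String>` stands in for Flink's Collector, and `splitWords` is a hypothetical stand-in for `SentenceToWordFunc.flatMap`; neither name comes from the sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (no Flink): Consumer<String> stands in for Flink's Collector<String>.
class ListCollectorSketch {
    // Hypothetical stand-in for SentenceToWordFunc.flatMap: emits every non-empty word.
    static void splitWords(String sentence, Consumer<String> out) {
        if (sentence == null || sentence.isEmpty()) {
            return;
        }
        for (String word : sentence.split(" ")) {
            if (!word.isEmpty()) {
                out.accept(word);
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // The double space produces an empty token, which must be skipped
        splitWords("The cat  stretched", out::add);
        // Comparing the whole list checks content, order, and the absence of extra elements
        if (!out.equals(Arrays.asList("The", "cat", "stretched"))) {
            throw new AssertionError("unexpected output: " + out);
        }
        System.out.println(out);
    }
}
```

In the Flink test the same check would presumably be a single `Assert.assertEquals` between the expected list and `out`.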
    

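    2. Testing the source operator

    A source operator only emits data and returns no result, so testing it requires solving two problems:

  • Capture the emitted data in a container of some kind (this requires a sink operator that writes into it)
  • Cut off an overly large or unbounded stream explicitly (use two threads: one runs the job, the other stops it)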
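
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.junit.Assert;
    import org.junit.Test;

    public class SentenceSourceTest {
        private StreamExecutionEnvironment env;

        // The source is unbounded, so this job never finishes on its own; it only prints the output
        @Test
        public void setUp() throws Exception {
            env = StreamExecutionEnvironment.getExecutionEnvironment();
            SentenceSource source = new SentenceSource();
            DataStream<String> dataStream = env.addSource(source);
            dataStream.print();
            env.execute();
        }

        @Test
        public void run() throws Exception {
            env = StreamExecutionEnvironment.getExecutionEnvironment();
            SentenceSource source = new SentenceSource();
            DataStream<String> dataStream = env.addSource(source);
            dataStream.addSink(new CollectSink());
            dataStream.print();
            // Start the thread that runs the job
            new Thread(
                    () -> {
                        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    })
                    .start();
            TimeUnit.SECONDS.sleep(10);
            // Start the thread that stops the source
            new Thread(
                    () -> {
                        source.cancel();
                    })
                    .start();
            // At one sentence every 500 ms, the sink should have received data by now
            Assert.assertFalse(CollectSink.VALUES.isEmpty());
        }

        private static class CollectSink implements SinkFunction<String> {
            // must be static
            public static final List<String> VALUES = Collections.synchronizedList(new ArrayList<>());

            @Override
            public void invoke(String value, Context context) throws Exception {
                VALUES.add(value);
            }
        }
    }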
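
The two problems above (capturing the output and cutting off an unbounded stream) can be illustrated without Flink. This is a minimal plain-Java sketch under my own assumptions: `CancellableProducerSketch` and `runFor` are hypothetical names, and the 10 ms emit interval is chosen only to keep the example fast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch (no Flink): an unbounded producer that is cut off from the test thread.
class CancellableProducerSketch {
    // volatile: written by the test thread, read by the producer thread
    private volatile boolean canceled = false;
    final List<String> values = Collections.synchronizedList(new ArrayList<>());

    // Emits until cancel() is called, in the same style as SentenceSource.run
    void run() throws InterruptedException {
        while (!canceled) {
            values.add("I danced");
            TimeUnit.MILLISECONDS.sleep(10);
        }
    }

    void cancel() {
        canceled = true;
    }

    // Runs the producer for roughly runMillis, cancels it, and waits for it to stop.
    static List<String> runFor(long runMillis) {
        CancellableProducerSketch producer = new CancellableProducerSketch();
        Thread worker = new Thread(() -> {
            try {
                producer.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            TimeUnit.MILLISECONDS.sleep(runMillis);
            producer.cancel();
            worker.join(); // assert only after the producer has really stopped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the producer", e);
        }
        return producer.values;
    }

    public static void main(String[] args) {
        List<String> collected = runFor(200);
        if (collected.isEmpty()) {
            throw new AssertionError("expected at least one element");
        }
        System.out.println("collected " + collected.size() + " elements");
    }
}
```

Joining the worker thread before asserting is the design choice worth noting: it keeps the assertion from racing with a producer that is still running.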
    

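    3. Stateful operators

    Stateful operators involve watermarks, windows, and state, so testing them directly with mocks is fairly difficult. Flink provides dedicated test dependencies for testing stateful operators.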

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    

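    Flink provides four test harness classes:

  • OneInputStreamOperatorTestHarness (for testing operators on a DataStream)
  • KeyedOneInputStreamOperatorTestHarness (for testing operators on a KeyedStream)
  • TwoInputStreamOperatorTestHarness (for testing operators on two connected DataStreams)
  • KeyedTwoInputStreamOperatorTestHarness (for testing operators on two connected KeyedStreams)

    The official documentation gives one example for this part, but it does not fit the operator we need to test. The blog posts I went through were uneven in quality and mostly revolve around that same official example, so they were no help either. At this point it pays to change approach and read the source code, which usually comes with Flink's own test classes.

    Those test classes also show how to use StateDescriptor, WindowOperator, and so on.

    After some trial and error: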

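    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
    import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.TestHarnessUtil;
    import org.junit.Before;
    import org.junit.Test;

    public class ReduceWordStateWindowFuncTest {
        private KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, Tuple2<String, Integer>>
                testHarness;

        @Before
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void setUp() throws Exception {
            ReduceWordStateWindowFunc func = new ReduceWordStateWindowFunc();
            // Create the descriptor for the window state
            ReducingStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
                    new ReducingStateDescriptor<>(
                            "windowCount",
                            new WordCountReduceFunc(),
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                                    .createSerializer(new ExecutionConfig()));
            // Build the operator that simulates the window behavior
            WindowOperator operator =
                    new WindowOperator(
                            TumblingEventTimeWindows.of(Time.seconds(5)),
                            new TimeWindow.Serializer(),
                            new TupleKeySelector(),
                            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                            stateDescriptor,
                            new InternalSingleValueProcessWindowFunction(func),
                            EventTimeTrigger.create(),
                            0, // allowed lateness
                            null); // no side output for late data
            testHarness =
                    new KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, Tuple2<String, Integer>>(
                            operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
            testHarness.open();
        }

        @Test
        public void process() throws Exception {
            testHarness.processElement(Tuple2.of("wordOne", 1), 1000);
            testHarness.processElement(Tuple2.of("wordTwo", 1), 1500);
            testHarness.processElement(Tuple2.of("wordOne", 1), 2000);
            testHarness.processElement(Tuple2.of("wordOne", 1), 4999);
            // Advance the watermark to the last timestamp of the window [0, 5000);
            // without it the window never fires and nothing is emitted
            testHarness.processWatermark(new Watermark(4999));
            // Expected output
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>(Tuple2.of("wordOne", 3), 4999));
            expectedOutput.add(new StreamRecord<>(Tuple2.of("wordTwo", 1), 4999));
            expectedOutput.add(new Watermark(4999));
            // Verify the result
            TestHarnessUtil.assertOutputEqualsSorted(
                    "window computation error", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        }

        // The aggregation is keyed, so a KeySelector is required
        private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }

        // Used to sort the output before the assertion
        private static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable {
            @Override
            @SuppressWarnings("unchecked")
            public int compare(Object o1, Object o2) {
                if (o1 instanceof Watermark || o2 instanceof Watermark) {
                    return 0;
                } else {
                    StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
                    StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
                    if (sr0.getTimestamp() != sr1.getTimestamp()) {
                        // Long.compare avoids the overflow of casting a long difference to int
                        return Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
                    }
                    int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
                    if (comparison != 0) {
                        return comparison;
                    } else {
                        return sr0.getValue().f1 - sr1.getValue().f1;
                    }
                }
            }
        }
    }
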
    Because the test has to reproduce window behavior, we construct a WindowOperator to simulate the window.
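
To see why the test expects both results to carry the timestamp 4999 once the watermark reaches 4999, here is a minimal plain-Java sketch of the firing rule as I understand it from Flink's EventTimeTrigger and TimeWindow.maxTimestamp(). It has no Flink dependency, and `EventTimeFiringSketch`, `maxTimestamp`, and `fires` are hypothetical names of mine.

```java
// Plain-Java sketch (no Flink) of when an event-time window fires.
class EventTimeFiringSketch {
    // Largest timestamp that still belongs to a window [start, end): end - 1.
    static long maxTimestamp(long windowEndExclusive) {
        return windowEndExclusive - 1;
    }

    // The window fires once the watermark has reached its max timestamp.
    static boolean fires(long windowEndExclusive, long watermark) {
        return watermark >= maxTimestamp(windowEndExclusive);
    }

    public static void main(String[] args) {
        // The window [0, 5000) holds the test elements at 1000, 1500, 2000 and 4999
        long windowEnd = 5_000L;
        System.out.println("max timestamp: " + maxTimestamp(windowEnd));
        System.out.println("fires at watermark 4998: " + fires(windowEnd, 4_998L));
        System.out.println("fires at watermark 4999: " + fires(windowEnd, 4_999L));
    }
}
```

All four test elements fall into the window [0, 5000), so a watermark of 4999 is just enough to fire it, and the emitted records are stamped with the window's max timestamp rather than with the timestamps of the inputs.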

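    The WindowOperator constructor takes as many as nine parameters. Without reading the test classes in the source code, figuring it out would probably have taken much longer (the official documentation has no description or demo of this part).

    WindowOperator has five type parameters, although its documentation lists only four. K is the key type used to partition the KeyedStream; here it is String. IN is the input type; we convert each word to a tuple, so it is Tuple2<String, Integer>. The documentation omits ACC, the accumulator, which is the type of the intermediate value; here it is again Tuple2. OUT is the output type, still Tuple2. The last one, W, stands for Window; we use a tumbling time window, so it is TimeWindow.

    Next, the constructor arguments.

    Required arguments:

    Window assigner: we use a tumbling window, TumblingEventTimeWindows.of(Time.seconds(5)).

    Window serializer: a window is an object that stores its start and end time. We can use the time window's own serializer directly, new TimeWindow.Serializer().

    KeySelector: the logic that extracts the key from the input.

    Key serializer: our key is a String, so we use the String serializer that Flink provides, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()).

    StateDescriptor: pass in the object we built earlier.

    InternalWindowFunction: this is an interface; choose the implementation that fits the case.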

    ci.apache.org/projects/fl…
