Flink的DataSource三部曲之一:直接API

本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和了解Flink的DataSource,为以后的深入学习打好基础,由以下三部分组成:

  1. 直接API:即本篇,除了准备环境和工程,还学习了StreamExecutionEnvironment提供的用来创建数据来的API;
  2. 内置connector:StreamExecutionEnvironment的addSource方法,入参可以是flink内置的connector,例如kafka、RabbitMQ等;
  3. 自定义:StreamExecutionEnvironment的addSource方法,入参可以是自定义的SourceFunction实现类;

Flink的DataSource三部曲文章链接

  1. 《Flink的DataSource三部曲之一:直接API》
  2. 《Flink的DataSource三部曲之二:内置connector》
  3. 《Flink的DataSource三部曲之三:自定义》

关于Flink的DataSource

官方对DataSource的解释:Sources are where your program reads its input from,即DataSource是应用的数据来源,如下图的两个红框所示:
在这里插入图片描述

DataSource类型

对于常见的文本读入、kafka、RabbitMQ等数据来源,可以直接使用Flink提供的API或者connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:
在这里插入图片描述

环境和版本

熟练掌握内置DataSource的最好办法就是实战,本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) [email protected]:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:
在这里插入图片描述

环境和版本

本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

创建工程

  1. 在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
  1. 现在maven工程已生成,用IDEA导入这个工程,如下图:
    在这里插入图片描述
  2. 以maven的类型导入:
    在这里插入图片描述
  3. 导入成功的样子:
    在这里插入图片描述
  4. 项目创建成功,可以开始写代码实战了;

辅助类Splitter

实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        for(String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}

准备完毕,可以开始实战了,先从最简单的Socket开始。

Socket DataSource

Socket DataSource的功能是监听指定IP的指定端口,读取网络数据;

  1. 在刚才新建的工程中创建一个类Socket.java:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        //每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来
        socketDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("API DataSource demo : socket");
    }
}

从上述代码可见,StreamExecutionEnvironment.socketTextStream就可以创建Socket类型的DataSource,在控制台执行命令nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机9999端口;

  1. 在IDEA上运行Socket类,启动成功后再回到刚才执行nc -lk 9999的控制台,输入一些字符串再回车,可见Socket的功能已经生效:
    在这里插入图片描述

集合DataSource(generateSequence)

  1. 基于集合的DataSource,API如下图所示:
    在这里插入图片描述
  2. 先试试最简单的generateSequence,创建指定范围内的数字型的DataSource:
package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        //通过generateSequence得到Long类型的DataSource
        DataStream<Long> dataStream = env.generateSequence(1, 10);

        //做一次过滤,只保留偶数,然后打印
        dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return 0L==aLong.longValue()%2L;
            }
        }).print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 运行时会打印偶数:
    4.

集合DataSource(fromElements+fromCollection)

  1. fromElements和fromCollection就在一个类中试了吧,创建FromCollection类,里面是这两个API的用法:
package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FromCollection {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        //创建一个List,里面有两个Tuple2元素
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2("aaa", 1));
        list.add(new Tuple2("bbb", 1));

        //通过List创建DataStream
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        //通过多个Tuple2元素创建DataStream
        DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                new Tuple2("ccc", 1),
                new Tuple2("ddd", 1),
                new Tuple2("aaa", 1)
        );

        //通过union将两个DataStream合成一个
        DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

        //统计每个单词的数量
        unionDataStream
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : collection");
    }
}
  1. 运行结果如下:
    在这里插入图片描述

文件DataSource

  1. 下面的ReadTextFile类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        env.setParallelism(1);

        //用txt文件作为数据源
        DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");

        //统计单词数量并打印出来
        textDataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("API DataSource demo : readTextFile");
    }
}
  1. 请确保代码中的绝对路径下存在名为README.txt文件,运行结果如下:
    在这里插入图片描述
  2. 打开StreamExecutionEnvironment.java源码,看一下刚才使用的readTextFile方法实现如下,原来是调用了另一个同名方法,该方法的第三个参数确定了文本文件是一次性读取完毕,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");

		TextInputFormat format = new TextInputFormat(new Path(filePath));
		format.setFilesFilter(FilePathFilter.createDefaultFilter());
		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
		format.setCharsetName(charsetName);

		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
	}
  1. 上面的FileProcessingMode是个枚举,源码如下:
@PublicEvolving
public enum FileProcessingMode {

	/** Processes the current contents of the path and exits. */
	PROCESS_ONCE,

	/** Periodically scans the path for new data. */
	PROCESS_CONTINUOUSLY
}
  1. 另外请关注readTextFile方法的filePath参数,这是个URI类型的字符串,除了本地文件路径,还可以是HDFS的地址:hdfs://host:port/file/path

至此,通过直接API创建DataSource的实战就完成了,后面的章节我们继续学习内置connector方式的DataSource;

欢迎关注我的公众号:程序员欣宸

在这里插入图片描述

版权声明:本文为boling_cavalry原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/boling_cavalry/article/details/105467076

智能推荐

OpenCV学习之路(五)图像的几何变换

在这一章将要学习图像的移动、旋转,仿射变换等 扩展缩放 我们如果想要改变图像的大小,我们就需要对图像进行扩展缩放,opencv提供给我们控制扩展缩放的函数: 参数解释: src:进行扩展缩放的原图片 dst:可以在此处设置缩放因子,也可手动设置尺寸 interpolation:在缩放时我们推荐使用cv2.INTER_AREA, 在扩展时我们推荐使用cv2.INTER_CUBIC(慢) 和 cv2....

2018.8.27

2018.8.27...

HTML 表单元素的基本样式

HTML 表单元素的基本样式 原创 ixygj197875 发布于2018-02-22 17:48:53 阅读数 2296 收藏 更新于2018-05-20 15:35:58 分类专栏: 揭秘 CSS 揭秘 CSS 收起 表单元素主要包括 label、input、textarea、select、datalist、******、progress、meter、output等,以及对表单元素进行分组的 ...

php输出语句

php输出语句 常见的输出语句 echo(): 可以一次输出多个值,多个值之间用逗号分隔。echo是语言结构(language construct),而并不是真正的函数,因此不能作为表达式的一部分使用。 print(): 函数print()打印一个值(它的参数),如果字符串成功显示则返回true,否则返回false。 print_r(): 可以把字符串和数字简单地打印出来,而数组则以括起来的键和值...

工厂模式

简介 常见的实例化对象模式。 用工厂方法替代new操作的一种模式。 当我们使用new操作实例化对象时,调用构造函数完成初始化。若初始化仅是进行赋值等简单的操作,写入构造函数即可。但如果初始化时需要执行一长串复杂的代码,将多个工作装入一个方法,是不妥的。 创建实例与使用实例分离。将创建实例所需的大量初始化工作从基类的构造函数中分离出去。 简单工厂模式、工厂方法模式针对的是一个产品等级结构;而抽象工厂...

猜你喜欢

B1105 Spiral Matrix (画图)

B1105 Spiral Matrix (25分) //第一次只拿了21分 矩阵的长和宽,求最大因子,从sqrt(num)开始枚举. 每次循环一次,s++,t--,d--,r++ 测试点四运行超时,是因为输入一个数字的时候,需要直接输出这个数字。//1分 测试点二运行超时,最后一个数字不必再while循环一次,直接输出即可。//3分 最后一个测试点卡了好久/(ㄒoㄒ)/~~ 螺旋矩阵...

Java基础=>String,StringBuffer与StringBuilder的区别

字符串常量池 什么是字符串常量池? JVM为了减少字符串对象的重复创建,其维护了一块特殊的内存,这段内存被称为字符串常量池(存储在方法区中)。 具体实现 当代码中出现字符串时,JVM首先会对其进行检查。 如果字符串常量池中存在相同内容的字符串对象,如果有,则不再创建,直接返回这个对象的地址返回。 如果字符串常量池中不存在相同内容的字符串对象,则创建一个新的字符串对象并放入常量池,并返回新创建的字符...

java调用其他java项目的Https接口

项目中是这样的: 用户拿出二维码展示,让机器识别二维码, 机器调用开门的后台系统接口, 然后开门的后台系统接口需要调用管理系统的接口, 管理系统需要判断能不能开门.这两个系统是互相独立的.当时使用http调用是没有问题的.当时后来要求必须用https.废话不说,直接代码: 我的项目中调用的是 HttpsUtils.Get(utlStr) 这个接口 开门系统接口如下图:   管理系统的接口...

Hadoop1.2.1全分布式模式配置

一 集群规划 主机名            IP                               安装的软件 &nbs...

Go语言gin框架的安装

尝试安装了一下gin,把遇到的一些小问题来记录一下 安装步骤 首先来看看官方文档,链接点这里 可以看到安装步骤很简单,就一句话 在命令行中输入这句话运行等待就好。 问题来了,因为墙的问题,go get会很慢,所以命令行里面半天什么反应也没有,不要急,慢慢等着就会看到gin-gonic/gin这个目录出现 这个时候命令行还是没有结束,表示还在下一些东西。有的时候可能心急的人就停了(比如我),然后写个...