依赖数据源的类型,你可以将用于批处理和流的DataSet接口做为数据源写一个batch或者streaming程序。本节教程旨在介绍在这两个方向上通用接口的基本概念。
注:当我以StreamingExecutionEnvironment和DataStream API来作为讲述这些概念的实例时。其他和DataSet API是一样的,仅仅使用了ExecutionEnvironment和DataSet来指代而已。
DataSet和DataStream
Flink使用特殊的类——DataSet和DataStream来表达程序中的数据。你可以把他们想象成包含可以复制的数据的不可变集合。在DataSet中的数据是有限的但是在一个DataStream中元素的数量是无上限的。
这些集合在一些关键的点上还是和常用的java集合有一些区别的。首先,他们是可变的,意味着一旦他们被创建就不能增加或者移除任何元素。你要做的不仅仅只是检查里面的元素这么简单。
一个集合是通过在Flink程序中增加一个source来初始化的。新的集合通过调用像map、filter等函数来做转换而生成的。
Flink程序的结构
Flink程序就像是转换数据集合的程序一样。每个程序都包含下面的部分:
1 获取执行环境
2 加载/创建初始化数据
3 说明在数据上要做的转换
4 说明将你计算的结果保存在哪
5 执行程序
我现在只是给出这些步骤的一个总览,如果需要深入学习还需要了解更多。关于java DataSet API的核心类都可以在包org.apache.flink.api.java中找到,java DataStream API的核心类可以在包org.apache.flink.streaming.api中找到。
StreamExecutionEnvironment是所有的Flink程序的基础。你可以通过下面列举的静态方法来获取:
getExecutionEnvironment()createLocalEnvironment()createRemoteEnvironment(String host,int port,String... jarFiles)
一般情况下,你只需要使用getExecutionEnvironment(),他依赖如下的上下文:如果你是在IDE中作为一般java程序来执行的话,他会在你本地的机器上创建一个本地环境来运行你的代码。如果你将自己的程序打包成jar文件,通过命令行的方式调用,Flink集群管理器就会执行你的main方法,getExecutionEnvironment()将会为你在集群上的程序打造一个执行环境。
对于特定的数据源执行环境使用很多方法从文件中读取数据:你只需要逐行读取,就像CSV文件那样,或者使用完整的自定义数据输入格式。为了将文本文件作为一系列行的集合来读,你可以这么用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamtext = env.readTextFile("file:///path/to/file");
这样你就可以通过这个DataStream来创建转换为新的DataStream输入源。你可以通过调用一个转换函数来应用这个转换。举个例子,一个map转换像是这样:
DataStreaminput = ...;DataStream parsed = input.map(new MapFunction () { @Override public Integer map(String value) { return Integer.parseInt(value); }});
这样你就可以将原始集合中的每个字符串转换成整数。
一旦你有了一个包含你最终结果的DataStream,你就可以通过创建一个sink来写入外部系统。下面是创建sink的一些示例方法:
writeAsText(String path)print()
一旦你完成了整个程序,你需要通过在StreamExecutionEnvironment上调用execute()来执行你的程序。依赖于ExecutionEnvironment的类型执行环境可以决定是在本地机器运行还是将你的程序提交到集群上执行。
execute()方法返回一个JobExecutionResult,他包含执行时间和累加结果。
延迟评估
所有的Flink程序都是延迟执行的:当主函数执行的时候,数据往往不会立刻就加载并转换。然而,每个操作都被创建并加入了程序计划中。当执行程序通过execute()触发的时候操作才真正被执行。不管执行环境的类型是本地还是集群的方式。
延迟评估使得Flink的执行程序作为历史镜像计划单元来安排程序的运行。
特定的键
一些转换(join,coGroup,keyBy,groupBy)需要一个在集合元素中定义的键。另外一些(Reduce,GroupReduce,Aggregate,Windows)则允许数据可以在他们被使用之前在键上组织。
DataSet被下面这样的方式组织:
DataSet<...> input = //[...]DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
而在DataStream中是这样做的:
DataStream<...> input = //[...]DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的数据格式不是基于键值对的。因此,你也没必要非得讲数据集合转成键和值的类型。键是“虚拟”的:他们作为真实数据上的函数来指导分组算子。
注:下面我将使用DataStream API和keyBy函数。请自行脑补DataSet API的方式(你只需要将他们替换成DataSet和groupBy)。
为Tuples定义键
最简单的方式是在一个或多个Tuple的域上进行分组的Tuples:
DataStream> input = //[...]KeyedStream ,Tuple> keyed = input.keyBy(0)
元组在第一个域上进行分组:
DataStream> input = //[...]KeyedStream ,Tuple> keyed = input.keyBy(0,1)
现在,我使用包含第一个和第二个域的组合键来分组元组:
如果你有一个嵌套tuple的DataStream数据,比如:
DataStream,String,Long>> ds;
使用keyBy(0)可以让系统来调用完整的Tuple2作为一个键。如果你想要看下嵌套的Tuple2是怎么用的,那你就不得不使用如下介绍的——field expression(域表达式)
使用域表达式定义键
你可以使用基于字符串的域表达式来引出嵌套域和grouping,sorting,joining和coGrouping的定义。
域表达式使得你可以像Tuple和Java POJO一样简单地在嵌套组合类型中切换域。
在下面的例子中,你有一个WC的POJO,他有两个域——word和count。为了按照word分组,你只需要按照他的name来调用keyBy()函数。
//some ordinary POJOpublic class WC { public String word; public int count;}DataStreamwords = //[...]DataStream wordCounts = words.keyBy("word").window(/*window specification*/);
域表达式示例:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; //getter / setter for private field(count) public int getCount() { return count; } public void setCount(int c) { this.count = c; }}public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3word; public IntWritable hadoopCitizen;}
使用Selector函数来定义键:
另外一种定义键的方式就是“key selector”函数。一个key selector函数将一个独立的元素来作为参数,返回这个元素的键。这个键可以是任意类型并且可以从计算结果中取出。
下面这个例子展示了key selector是怎样返回一个对象的域的:
// some ordinary POJOpublic class WC {public String word; public int count;}DataStreamwords = //[...]KeyedStream keyed = words .keyBy(new KeySelector () { pubic String getKey(WC wc) {return wc.word;}});
定义转换函数
大部分的转换都需要用户自定义的函数。这里列举了不同的方式:
实现一个接口
最常用的方式就是实现一个提供的接口:
class MyMapFunction implements MapFunction{ public Integer map(String value) {return Integer.parseInt(value);}});data.map(new MyMapFunction());
匿名类
你还可以通过一个叫做匿名类的东东:
data.map(new MapFunction(){ public Integer map(String value) {return Integer.parseInt(value);}});
Java 8 Lambdas函数
Flink也支持Java 8中定义的Lambda接口。
data.filter(s->s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);