Installation
Download address; the example uses flink-1.10.2-bin-scala_2.11.tgz
Standalone
tar -zxvf flink-1.10.2-bin-scala_2.11.tgz
cd flink-1.10.2
vim ./conf/flink-conf.yaml
#------------------------------------------
jobmanager.rpc.address: 192.168.241.100
#------------------------------------------
cd bin
./start-cluster.sh
# Open http://192.168.241.100:8081/ to view the Flink dashboard
# Run an example job
./flink run ../examples/batch/WordCount.jar
./flink run ../examples/batch/WordCount.jar --input test.txt
# Only one task manager runs by default; start an additional one
./taskmanager.sh start
./stop-cluster.sh
Yarn Cluster
Project Development
Project Templates
The official site provides Java/Scala project templates that can be created via Maven or Gradle.
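For example, the Java template can be generated with the Maven archetype below (a sketch assuming Maven is installed; use flink-quickstart-scala for the Scala variant):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.2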
scala
Download address; install on Windows and check that the PATH environment variable includes C:\Program Files (x86)\scala\bin.
In IDEA, search for and install the Scala plugin.
getting started
Principles
Data Sources
Transformations
Data Sinks
The write*() methods are intended for debugging only: they do not participate in checkpointing, so they provide at-least-once semantics and their output cannot be recovered consistently after a failure.
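A minimal sketch of the contrast (the output paths are hypothetical; StreamingFileSink is one of the checkpoint-aware alternatives):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("a", "b", "c");

// Debug only: bypasses checkpointing, at-least-once, not recoverable
stream.writeAsText("/tmp/debug-output");

// Production: StreamingFileSink participates in checkpoints
stream.addSink(StreamingFileSink
        .forRowFormat(new Path("/tmp/prod-output"), new SimpleStringEncoder<String>("UTF-8"))
        .build());

env.execute("sink-demo");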
connectors
Which connectors does Flink support?
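Out of the box, Flink 1.10 ships connectors for Kafka, Kinesis, Elasticsearch, filesystems/HDFS, RabbitMQ, Cassandra, and others. A minimal Kafka source sketch, assuming the flink-connector-kafka dependency and a hypothetical topic named events:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.241.100:9092"); // hypothetical broker address
props.setProperty("group.id", "demo");

// With checkpointing enabled, the Kafka source supports exactly-once state semantics
env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
   .print();

env.execute("kafka-source-demo");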
checkpoints
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// enables the experimental unaligned checkpoints (requires Flink 1.11+)
env.getCheckpointConfig().enableUnalignedCheckpoints();
Checkpointing is disabled by default. By default, state is kept in TaskManager memory and checkpoints are stored in JobManager memory; the state backend can also persist state to the filesystem or to RocksDB.
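A sketch of switching the state backend, assuming a hypothetical HDFS path (the RocksDB variant additionally requires the flink-statebackend-rocksdb dependency):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep working state on the TaskManager heap, but persist checkpoints to HDFS
// instead of JobManager memory
env.setStateBackend(new FsStateBackend("hdfs://192.168.241.100:9000/flink/checkpoints"));

// For state larger than memory, RocksDB spills to local disk:
// env.setStateBackend(new RocksDBStateBackend("hdfs://192.168.241.100:9000/flink/checkpoints"));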
Operators
Transformation | Description |
---|---|
Reduce | Rolling reduce over the data of a single key in a KeyedStream |
Fold | Rolling fold over the data of a single key in a KeyedStream. A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ... |
Aggregations (min/minBy, max/maxBy) | min does not guarantee that fields other than the key field and the minimized field are correct, while minBy keeps every field of the emitted element consistent with a source element; max/maxBy behave analogously (see the sketch after this table) |
window | Applies to a KeyedStream; parallelism is configurable |
windowAll | Can be applied to a plain DataStream; parallelism can only be 1 |
connect | |
split | |
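A sketch of the min/minBy difference from the aggregations row, using Tuple3 elements of (key, value, tag):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements(
        Tuple3.of("a", 3, "x"),
        Tuple3.of("a", 1, "y"))
   .keyBy(0)
   // min(1) tracks the rolling minimum of field 1, but the non-key field
   // "tag" may be carried over from an earlier element, e.g. ("a", 1, "x");
   // minBy(1) emits the whole element holding the minimum: ("a", 1, "y")
   .minBy(1)
   .print();

env.execute("minby-demo");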