安装

下载地址,示例中使用flink-1.10.2-bin-scala_2.11.tgz

Standalone

tar -zxvf flink-1.10.2-bin-scala_2.11.tgz
cd flink-1.10.2
vim ./conf/vim flink-conf.yaml
#------------------------------------------
jobmanager.rpc.address: 192.168.241.100
#------------------------------------------
cd bin
./start-cluster.sh
# 打开http://192.168.241.100:8081/ 浏览flink dashboard

# 运行示例任务
./flink run ../examples/batch/WordCount.jar
./flink run ../examples/batch/WordCount.jar --input test.txt

# 默认只有一个task manager运行,追加一个taskmanager
./taskmanager.sh start

./bin/stop-cluster.sh

Yarn Cluster

项目开发

项目模板

官方站点提供了java/scala的项目模板,通过maven/gradle来创建。

scala

下载地址,在windows上安装,检查环境变量PATH是否已添加C:\Program Files (x86)\scala\bin

IDEA搜索插件Scala安装。

getting started

datastream_api示例程序

原理

Data Sources

内置data sources

transformation

Data Sinks

内置data sinks

write*()方法用于debug,不参与checkpoint,at-least-once语义,出现问题之后无法恢复

connectors

flink支持哪些组件?

checkpoints

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// enables the experimental unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints();

checkpoint 默认是关闭的,默认状态state保存在TaskManager的内存,checkpoint保存在JobManager的内存,状态后台还可以保存在文件,RocksDB

算子

文档

Transformation Description
Reduce 滚动统计keyedStream里面的单个key对应的数据
Fold 滚动叠加keyedStream里面的单个key对应的数据,A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
聚合函数 min/minBy,min不会保证key字段及求最小值的字段以外的其它字段的值是否正确,但是minBy是保证当前元素中的每个字段值都是跟source中的元素保持一致的
max/maxBy,同上
window 针对keyedStream,并行度可设置
windowAll 可以针对原始的dataStream,并行度只能是1
connect  
split  

ProcessFunction

watermark

参考

钉钉直播教程

flink 客户端操作