失效链接处理 |
flink学习笔记 PDF 下载
本站整理下载:
相关截图:
主要内容:
一.定义
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
二.Flink框架Api
1.依赖的jar包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
2.执行的环境
ExecutionEnvironment.getExecutionEnvironment //搭建批处理环境
StreamEnvironment.getExecutionEnvionment //搭建流式处理环境
3.Flink的source和Sink
1)Source 通过多种途径获取数据
从集合中获取 env.fromCollection(集合)
从文件中获取 env.readTextFile(文件路径)
从元素中获取 env.fromElements(各种类型组成的元素)
从kafka中获取:
Val p = new Properties()
p.setProperty(“bootstrap.servers”,”主机名”)
p.setProperty(“group.id”,”consumer-group”)
p.setProperty(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserialize”)
p.serProperty(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”)
p.setProperty(“auto.offset.reset”,”latest”)
env.addSource(new FlinkKafkaConsumer011[String](“主题名”,new SimpleStringSchema(),配置项类))
自定义的source:
Env.addSource(new一个自定义的类)
让自定义的类继承SourceFunction(泛型为自定义的类),重写里边的方法
在run方法中写获取数据的流程,并通过上下文环境的形式将数据发送回来
JDBC的Source
Env.createInput(JDBCInputFormat.buildJDBCInputFormat()) //创建jdbc输入
.setDrivername(“com.mysql.jdbc.Driver”) //jdbc驱动连接
.setDBUrl(“jdbc:mysql://主机名:3306/库”) //连接路径
.setUsername(“用户”)
.setPassword(“密码”)
.setQuery(“sql”)
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.类型)) //行类型信息,basicTypeInfo后边跟数据的类型(*必须和数据库对应一致)
返回的数据可以通过map进行处理,用getField进行取值
2) Sink 发送数据到不同的地方
Kafka的sink
数据处理过后的dataStream.addSink(new FlinkKafkaProducer011[String](“主机名:端口号”,”主题”,new SimpleStringSchema())) //生产者向主题发送数据
JDBC的 sink
处理过后的stream.addSink(自定义的jdbc的类)
让类继承RichSinkFunction[泛型与上边stream的内容保持一致]{
//从写里边三个方法
//需要三个全局变量Connection,新增返回值,修改返回值
//PreparedStatement(为了全局引用)
Open{
Conn = DriverManager.getConnection(“jdbc:mysql://主机名称:3306”,”用户名”,”密码”)
insertSta = conn.PreparedStatement(“添加sql”)
updateSta = conn.PreparedStatement(“修改sql”)
}
Invoke{
//对open的sql语句进行赋值操作
通过修改返回值updateSta.set类型进行一一赋值
updateSta.execute() //执行以下
}
Close{
//关闭三个资源
添加,修改和连接
}
|