Java知识分享网 - 轻松学习从此开始!    

Java知识分享网

Java1234官方群25:java1234官方群17
Java1234官方群25:838462530
        
SpringBoot+SpringSecurity+Vue+ElementPlus权限系统实战课程 震撼发布        

最新Java全栈就业实战课程(免费)

springcloud分布式电商秒杀实战课程

IDEA永久激活

66套java实战课程无套路领取

锋哥开始收Java学员啦!

Python学习路线图

锋哥开始收Java学员啦!
当前位置: 主页 > Java文档 > 大数据云计算 >

flink学习笔记 PDF 下载


分享到:
时间:2021-04-21 10:30来源:http://www.java1234.com 作者:转载  侵权举报
flink学习笔记 PDF 下载
失效链接处理
flink学习笔记 PDF 下载


本站整理下载:
提取码:hw3h 
 
 
相关截图:
 
主要内容:


一.定义
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
 
二.Flink框架Api
1.依赖的jar包
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
 </dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-scala_2.11</artifactId>
         <version>1.7.2</version>
</dependency>
2.执行的环境
ExecutionEnvironment.getExecutionEnvironment  //搭建批处理环境
StreamEnvironment.getExecutionEnvionment  //搭建流式处理环境
3.Flink的source和Sink
1)Source  通过多种途径获取数据
从集合中获取 env.fromCollection(集合)
从文件中获取 env.readTextFile(文件路径)
从元素中获取 env.fromElements(各种类型组成的元素)
从kafka中获取:
Val p = new Properties()
p.setProperty(“bootstrap.servers”,”主机名”)
p.setProperty(“group.id”,”consumer-group”)
p.setProperty(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserialize”)
p.serProperty(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”)
p.setProperty(“auto.offset.reset”,”latest”)
env.addSource(new FlinkKafkaConsumer011[String](“主题名”,new SimpleStringSchema(),配置项类))
自定义的source:
Env.addSource(new一个自定义的类)
让自定义的类继承SourceFunction(泛型为自定义的类),重写里边的方法
在run方法中写获取数据的流程,并通过上下文环境的形式将数据发送回来
JDBC的Source
Env.createInput(JDBCInputFormat.buildJDBCInputFormat())  //创建jdbc输入
.setDrivername(“com.mysql.jdbc.Driver”)  //jdbc驱动连接
.setDBUrl(“jdbc:mysql://主机名:3306/库”)  //连接路径
.setUsername(“用户”)
.setPassword(“密码”)
.setQuery(“sql”)
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.类型)) //行类型信息,basicTypeInfo后边跟数据的类型(*必须和数据库对应一致)
返回的数据可以通过map进行处理,用getField进行取值
2) Sink 发送数据到不同的地方
Kafka的sink
数据处理过后的dataStream.addSink(new FlinkKafkaProducer011[String](“主机名:端口号”,”主题”,new SimpleStringSchema()))  //生产者向主题发送数据
JDBC的 sink
处理过后的stream.addSink(自定义的jdbc的类)
让类继承RichSinkFunction[泛型与上边stream的内容保持一致]{
//从写里边三个方法
//需要三个全局变量Connection,新增返回值,修改返回值
   //PreparedStatement(为了全局引用)
Open{
Conn = DriverManager.getConnection(“jdbc:mysql://主机名称:3306”,”用户名”,”密码”)
insertSta = conn.PreparedStatement(“添加sql”)
updateSta = conn.PreparedStatement(“修改sql”)
}
Invoke{
//对open的sql语句进行赋值操作
通过修改返回值updateSta.set类型进行一一赋值
updateSta.execute()  //执行以下
}
Close{
//关闭三个资源
添加,修改和连接
}

 

------分隔线----------------------------

锋哥公众号


锋哥微信


关注公众号
【Java资料站】
回复 666
获取 
66套java
从菜鸡到大神
项目实战课程

锋哥推荐