失效链接处理 |
大数据培训心得 PDF 下载
本站整理下载:
相关截图:
主要内容:
Flume主要包括三部分source,channel,sink
Flume 最主要的作用是,实时读取服务器本地磁盘的数据,将数据写到 HDFS。
Source 负责接收数据,Channel 是位于 Source 和 Sink 之间的缓冲区,Sink 不断地轮询 Channel 中的事件且批量移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Event Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
安装 配置 简单使用见以下链接:
https://blog.csdn.net/weixin_42837961/article/details/104533147
本次实验命令
flume-ng agent --conf conf --conf-file job/dataCollect.conf -name a1 -Dflume.root.logger=INFO,console
实验思路: flume的source监听一个日志文件/home/disastrous/project/access.log,将监听到的数据发送给channel管道在将数据推送给sink sink关联的就是HDFS上的路径
注意:
1.Flume的文件夹下面建了一个job文件集,里面放每次执行时的配置文件,本次执行的配置文件:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/disastrous/project/access.log
a1.sinks.k1.type = hdfs
#指定hdfs目录格式:年月日 (小时:/%y-%m-%d/%H/ 日小时:/%y-%m-%d/%d-%H/)
a1.sinks.k1.hdfs.path = hdfs://node1:9000/flume/webClick/logs/%y-%m-%d
#生成文件前缀
a1.sinks.k1.hdfs.filePrefix = webClick-
### 在hdfs上生成文件策略:三种策略,满足一个就会执行 ###
#以下策略:每隔60s或者文件大小超过10M的时候产生新文件
# hdfs有多少条消息时新建文件,0不基于消息个数
a1.sinks.k1.hdfs.rollCount=0
# hdfs创建多长时间新建文件,0不基于时间
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大时新建文件,0不基于文件大小 单位:byte
a1.sinks.k1.hdfs.rollSize=1024000000
# 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
a1.sinks.k1.hdfs.idleTimeout=6
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
### 在hdfs上生成目录策略 每五分钟生成一个新目录: ###
# 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,如果启用,则会影响除了%t的其他所有时间表达式
a1.sinks.k1.hdfs.round=true
# 时间上进行“舍弃”的值;
a1.sinks.k1.hdfs.roundValue=24
# 时间上进行”舍弃”的单位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=hour
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.keep-alive = 1000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[记录报错]
1.执行后报错:
org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
解决方法:
a1.channels.c1.capacity = 20000 //改大点
a1.channels.c1.transactionCapacity = 10000 //改大点
a1.channels.c1.keep-alive = 1000 //填上这句 可能是一下到达的数据太多了
//缓冲区大小最大多少和虚拟机配置的内存大小有关系
|