博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
日志收集框架 Flume 组件之Source使用
阅读量:6906 次
发布时间:2019-06-27

本文共 4345 字,大约阅读时间需要 14 分钟。

上一篇简单介绍了Flume几个组件,今天介绍下组件其一的source,整理这些,也是二次学习的过程,也是梳理知识的过程。

Source 中文译为来源,源

作用:采集数据,然后把数据传输到channel上。
例如:监控某个文件或者某个端口或某个目录,新增数据,新增文件的变化,然后传输到channel。

常用的的source类型,也是平常用的比较多的几种类型,如下:

source类型 说明
Avro Source 支持avro协议,内置支持
Thrift Source 支持Thirft rpc ,内置支持
Exec Source 基于Unix的command在标准输出上采集数据 ,如tail -F
JMS Source 监控JMS系统,比如Activemq,可以
Taildir Source 监听目录或文件(Flume1.8版本支持)
Spooling Directory Source 监听目录下的新增文件
Kafka Source 读取Kafka数据

下面不多少,简单实战,没安装的可以google一下,好多安装教程,本文是基于Flume 1.8

Exec Source,前面说过了,exec source 是以tail -F 形式来监听文件的变化的,

flume-exec.conf配置:

#  http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied.  See the License for the# specific language governing permissions and limitations# under the License.# The configuration file needs to define the sources, # the channels and the sinks.# Sources, channels and sinks are defined per agent, # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# # Describe/configure the source# 配置类型为execa1.sources.r1.type = exec# 路径是自己要监听的日志路径a1.sources.r1.command = tail -F /usr/local/installed/tomcat/logs/system_app.loga1.sources.r1.channels = c1# # Describe the sink# 下沉sink是以日志的形式来打印a1.sinks.k1.type = logger# # Use a channel which buffers events in memory# channel采用以内存形式来存放上游source传递过来的数据a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# # Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

具体使用步骤:

1、启动
进入到flume安装目录,../bin下,命令如下:

./bin/flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf

缺点:agent挂了,则不会记录上次传递数据的位置,还是以tail -F为准,来重新传递数据。

Taildir Source 监听目录文件变化,记录上一次同步后的位置,实现断点续传,可以保证没有重复数据的读取。

# The configuration file needs to define the sources, # the channels and the sinks.# Sources, channels and sinks are defined per agent, # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# # Describe/configure the sourcea1.sources.r1.type = TAILDIR# 保存监听文件的读取位置的文件a1.sources.r1.positionFile = /opt/flume/taildir_position.jsona1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = /usr/local/installed/tomcat/logs/system_app.loga1.sources.r1.batchSize = 100a1.sources.r1.backoffSleepIncrement = 1000a1.sources.r1.maxBackoffSleep = 5000## # Describe the sinka1.sinks.k1.type = logger## # Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100## # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

具体测试,可以往监听的文件里写入数据,看看是否可以监听到数据。

Spooling Directory Source 监听目录文件的变化,

flume-spooling.conf 配置

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# # Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /usr/local/selfa1.sources.r1.deletePolicy = immediatea1.sources.r1.fileSuffix = completeda1.sources.r1.batchSize = 100# # Describe the sinka1.sinks.k1.type = logger## # Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100## # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

备注:注意,只监听新增的文件,这个目录下有新增文件会被监听到。目录下子文件夹也不会被监听到,目录下以有的文件更新了,也不会被监听到。

Avro Source配置

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# # Describe/configure the sourcea1.sources.r1.type = avroa1.sources.r1.bind = 127.0.0.1a1.sources.r1.port = 44444## # Describe the sinka1.sinks.k1.type = logger## # Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100## # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

备注:此文件是监听127.0.0.1:44444端口的数据变化,可以telnet 127.0.0.1:44444,输入数据,看flume是否监听到数据。

其它的一些类型,可自行测试。

./bin/flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf
由于本文是在bin目录下启动的,没有更改flume产生日志的位置,所以会在/bin/logs/ 会有flume日志产生。

测试的时候,自己开一个窗口,监控日志的变化,由于本文是采用以log日志的形式输出,所以用这个命令tail -f ./bin/logs/flume.log 可以看到是否配置成功。


连接:

flume 概念以及模型简介地址:
日志收集框架 Flume 简介

转载于:https://blog.51cto.com/shangdc/2286265

你可能感兴趣的文章
查看Android应用签名信息
查看>>
Solr官方文档翻译-About & Getting Started
查看>>
MVC、MVP、MVVM、Angular.js、Knockout.js、Backbone.js、React.js、Ember.js、Avalon.js、Vue.js 概念摘录...
查看>>
优化中的subgradient方法
查看>>
fastDFS 一二事 - 简易服务器搭建(单linux)
查看>>
Elasticsearch初步使用(安装、Head配置、分词器配置)
查看>>
thinkphp 3.2.3 计划任务具体实现实例教程
查看>>
php源码解读
查看>>
linux编译ruby1.8.7 出现OPENSSL错误
查看>>
设计模式(五):中介者模式
查看>>
grep(Global Regular Expression Print)
查看>>
WCF学习之旅—WCF服务的批量寄宿(十三)
查看>>
解决“不是有效的win32应用程序”问题
查看>>
安装opencv以及遇到的坑
查看>>
C# 匿名函数
查看>>
MySQL锁系列2 表锁
查看>>
Lua中的closure(闭合函数)
查看>>
一个int类型引发的bug
查看>>
js 片段 - 控制类型为 text 的 input 类型
查看>>
CentOS安装中文支持(linux中文文件名乱码)
查看>>