flume对接kafka配置文件|flume怎么实现传输数据到kafka1

|

① flume 将kafka数据导入hbase是怎么更新的

f1和f2下就只有一个hfile,f3下面没有hfile因为数据都被删除了 一次只能put一个column 一次只能delete一个column 删除整行,用deleteall deleteall 't1', 'r1'

② 如何搭建Flume分布式日志系统

你一步搭建Flume分布式日志系统在前篇几十条业务线日志系统如何收集处理?中已经介绍了Flume的众多应用场景,那此篇中先介绍如何搭建单机版日志系统。环境CentOS7.0java1.8下载官网下载 http://flume.apache.org/download.html当前最新版 apache-flume-1.7.0-bin.tar.gz下载后上传到CentOS中的/usr/local/ 文件夹中,并解压到当前文件中重命名为flume170 /usr/local/flume170tar -zxvf apache-flume-1.7.0-bin.tar.gz安装配置修改 flume-env.sh 配置文件,主要是添加JAVA_HOME变量设置JAVA_HOME=/usr/lib/jvm/java8设置Flume的全局变量 打开profilevi /etc/profile添加export FLUME=/usr/local/flume170export PATH=$PATH:$FLUME/bin然后使环境变量生效source /etc/profile验证是否安装成功flume-ng version测试小实例参考网上Spool类型的示例Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点: 1) 拷贝到spool目录下的文件不可以再打开编辑。 2) spool目录下不可包含相应的子目录 创建agent配置文件复制代码# vi /usr/local/flume170/conf/spool.confa1.sources = r1a1.channels = c1a1.sinks = k1# Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.channels = c1a1.sources.r1.spoolDir =/usr/local/flume170/logsa1.sources.r1.fileHeader = true# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Describe the sinka1.sinks.k1.type = loggera1.sinks.k1.channel = c1复制代码spoolDir:设置监控的文件夹,当有文件进入时会读取文件的内容再通过sink发送,发送完成后会在文件名后加上.complete启动flume agent a1/usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console追加一个文件到/usr/local/flume170/logs目录# echo "spool test1" > /usr/local/flume170/logs/spool_text.log在控制台,可以看到以下相关信息:复制代码14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.14/08/10 11:37:14 INFO avro.: Preparing to move file /usr/local/flume170/logs/spool_text.log to/usr/local/flume170/logs/spool_text.log.COMPLETED 14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:14 INFO sink.LoggerSink: Event: { headers:{file=/usr/local/flume170/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 } 14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 14/08/10 11:37:17 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.复制代码出现上面的内容就表明已经可以运行了,整个安装过程很简单,主要是配置。至于分布式的需要设置source和sink。如上图,将每个业务中的Flume产生的日志再用一个Flume来接收汇总,然后将汇总后的日志统一发送给KafKa作统一处理,最后保存到HDFS或HBase中。上图中,每个业务中的Flume可以做负载和主备,由此可以看出有很强的扩展性。作者:欢醉

③ Kafka Connect的安装和配置

       在使用Kafka Connect时,需要注意一些事项,以帮助你构建适应长期需求的datapipeline。本章旨在提供有关的一些上下文。

       要开始使用Kafka Connect,只有一个硬性的先决条件:一个Kafka的broker集群。然而,随着集群增长,有几个问题需要提前考虑:

       在开始之前,确定哪种模式最适合您的环境非常有用。 对于适合单个代理的环境(例如从web服务器向Kafka发送日志),standalone模式非常适合。在单个source或sink可能需要大量数据的用例中(例如,将数据从Kafka发送到HDFS),分布式模式在可伸缩性方面更加灵活,并提供了高可用性服务,从而最小化停机时间。

       Kafka Connect插件是一组jar文件,Kafka Connect可以在其中找到一个或多个connector、transform、以及converter的实现。Kafka Connect将每个插件彼此隔离,这样一个插件中的库就不会受到其他插件库的影响,这点非常重要。

Kafka Connect plugin是: (1)在一个uber jar文件中包含插件及所有第三方依赖;或 (2)一个包含jar包和第三方依赖的目录。

       Kafka Connect使用plugin path找到插件,这是Kafka Connect在worker配置文件中定义的一个以逗号分隔的目录列表。要安装插件,请将目录或uber jar放在plugin path路径中列出的目录中。

        举个例子 ,我们在每台机器上创建一个/usr/local/share/kafka/plugins目录,然后将我们所有的插件jar或插件目录放入其中。然后在worker的配置文件中加入如下配置项:

       现在,当我们启动worker时,Kafka Connect可以发现这些插件中定义的所有connector、transform以及converter。Kafka Connect显式地避免了其他插件中的库, 并防止了冲突。

       如果要在同一个机器上运行多个standalone实例,有一些参数需要是独一无二的: (1)offset.storage.file.filename:connector偏移量的存储。 (2)rest.port:用于监听http请求的rest接口所占用的端口。

       connector和task的配置,offsets和状态会存储在Kafka的内部主题中,Kafka Connect会自动创建这些主题,且所有topic都使用了压缩清理策略。        如果要手动创建这些topic,推荐使用如下命令:

这里只列出一些有疑问的。

       配置了group.id的worker会自动发现彼此并形成集群。一个集群中的所有worker必须使用相同的三个Kafka topic来共享配置、偏移量以及状态,所有worker必须配置相同的config.storage.topic、offset.storage.topic以及status.storage.topic。

       每个converter实现类都有自己的相关配置需求。下面的例子展示了一个worker属性文件,其中使用的AvroConverter需要将Schema Registry的url作为属性进行传递。

注意: 除了其配置覆盖这些配置的connector,worker上运行的所有connector都使用这些converter。

④ flume通过什么原理采集数据到kafka

日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输。对于数量级别,做过单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包。因此我们在搭建系统时,抛弃了flume,自己研发了一套传输系统。但flume设计的source-channel-sink模式还是比较好的,我们在开发系统时无耻的也抄袭了这种方式。

⑤ 找不到net40的kafka客户端,有什么好的替代方案吗落到本地flume收发到某个java中台再发kafka

4.0升级到4.5并不需要多少代码改变啊部署的时候目标电脑也只需要按照4.0也可以运行4.5的二进制文件

⑥ 求助flume + kafka 异常 java.nio.BufferUnderflowException

缓冲的长度不匹配,,,,,,,,,使用短缓冲,接收/处理 长缓冲框架处理原理分析:可以根据实际情况,设置buffersize的大小,让buffersize处在平均略高的状态下工作,节省由于初始设置过大,而浪费内存。解决方案:1,最简单的修改框架原码。将其中的NioSocketSession类重写即可,里面有TransportMetadata创建的地方修改。2,通过对断包或粘包的处理。

⑦ flume怎么实现传输数据到kafka1

日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输。对于数量级别,做过单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包。因此我们在搭建系统时,抛弃了flume,自己研发了一套传输系统。但flume设计的source-channel-sink模式还是比较好的,我们在开发系统时无耻的也抄袭了这种方式。

⑧ flume消费数据从kafka到hdfs上,flume日志显示如图,hdfs上没有数据,有大神吗

好像是这样的:avro会将您的日志收集起来放到一个文件中,当它达到设定的大小是才执行“Renaming”操作(或者强制kill时执行);2.UNBOUND也困扰我一段时间,我的结论是,这不是一句报错,不信您仔细看看,那一行根本没有“ERROR”之类的提示。

⑨ kafka怎么收集到flume的日志

采集层 主要可以使用Flume, Kafka两种技术。Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.Kafka:Kafka是一个可持久化的分布式的消息队列。Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。 正如你们所知Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。 Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。 Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。 Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。

⑩ flume发送数据到kafka如何设置异步发送

前面应该还有个数据生产者,比如flume. flume负责生产数据,发送至kafka。 spark streaming作为消费者,实时的从kafka中获取数据进行计算。 计算结果保存至redis,供实时推荐使用。 flume+kafka+spark+redis是实时数据收集与计算的一套经典架构…


赞 (0)