Integrating Kafka and Flume on Docker

2017-03-03 15:33:39

Running these systems on Docker really is convenient. Here is a record of how I ran and integrated Kafka and Flume on Docker, as a memo for later.

Running ZooKeeper

# docker pull wurstmeister/zookeeper
# docker run --name zookeeper -p 2181:2181 -t -d wurstmeister/zookeeper 
# docker ps -a
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
3b9d2029c1b6        wurstmeister/zookeeper   "/bin/sh -c '/usr/sbi"   57 seconds ago      Up 56 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
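
As a quick health check, you can send ZooKeeper the four-letter ruok command from the host (this assumes nc is installed on the host; a healthy server answers imok):

# echo ruok | nc localhost 2181
imok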

I am running this on AWS, so the pull is fast.

Running Kafka

# docker pull wurstmeister/kafka
# docker run -id --name kafka  -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://52.37.134.248:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092:9092 --link zookeeper:zk   wurstmeister/kafka
# docker ps -a
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
877992e80d14        wurstmeister/kafka       "start-kafka.sh"         6 seconds ago       Up 5 seconds        0.0.0.0:9092->9092/tcp                               kafka

Since this runs on an AWS instance with only 1 GB of memory, the heap size is capped at run time with KAFKA_HEAP_OPTS="-Xmx256M -Xms128M".
To make Kafka reachable from outside, the listener parameters are also set at startup: KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://52.37.134.248:9092 and KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 (52.37.134.248 is the host machine's IP).
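
KAFKA_LISTENERS is the address the broker binds to inside the container, while KAFKA_ADVERTISED_LISTENERS is the address it hands back to clients in metadata responses. An external client therefore connects via the advertised address, for example (assuming the flume topic created below already exists and port 9092 is open in the AWS security group):

# bin/kafka-console-producer.sh --broker-list 52.37.134.248:9092 --topic flume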

Creating a topic

# docker exec -it kafka /bin/bash
bash-4.3# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/jdk/bin:/opt/kafka/bin
bash-4.3# cd /opt/kafka
bash-4.3# bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic flume
Created topic "flume".
bash-4.3# bin/kafka-topics.sh --list --zookeeper zookeeper:2181
flume
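
To double-check the partition count and leader of the new topic, you can also describe it:

bash-4.3# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic flume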

Entering the container with exec and printing PATH shows where Kafka is installed.

Running Flume

The awkward part with Flume is that there is no up-to-date Flume image on Docker Hub, so I had to build one myself.
Write a Dockerfile with the following content:

FROM openjdk:8u121-jre
MAINTAINER bin

RUN apt-get update && apt-get install -q -y --no-install-recommends wget

RUN mkdir /opt/flume
RUN wget -qO- http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz \
  | tar zxvf - -C /opt/flume --strip 1

ADD start-flume.sh /opt/flume/bin/start-flume

ENV PATH /opt/flume/bin:$PATH

Very simple: it downloads Flume, unpacks it, and adds Flume's bin directory to PATH.
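
The Dockerfile also copies in a start-flume.sh helper that is not shown here. A minimal sketch of what such a wrapper might look like (hypothetical; it simply wraps the flume-ng invocation used later in this post):

#!/bin/sh
# Hypothetical start script: run the agent with the mounted config
exec flume-ng agent --conf /opt/flume/conf \
  --conf-file /opt/flume/conf/flume.conf \
  --name a1 -Dflume.root.logger=INFO,console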

Building the image

# docker build -t  bin/flume .
# docker images
REPOSITORY               TAG                 IMAGE ID            CREATED             SIZE
bin/flume                latest              74c8cd29dd1b        46 minutes ago      404.3 MB
openjdk                  8u121-jre           a4d689e63201        2 days ago          309.2 MB

Write the Flume configuration file flume.conf:

# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the Flume source (the directory to watch)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/tmp/flume_log
a1.sources.r1.ignorePattern = ^(.)*\\.tmp|(.)*\\.COMPLETED$

# Configure the Flume sink
#a1.sinks.k1.type = logger
#a1.sinks.k1.type = file_roll
#a1.sinks.k1.sink.directory = /var/tmp/flume_out/
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume
a1.sinks.k1.kafka.bootstrap.servers = kafka:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Configure the Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
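
If events never show up in Kafka, a quick way to isolate the problem is to temporarily switch to the logger sink commented out above, so events are printed on the agent's console instead of being sent to the broker:

# Debug variant: log events to the agent's console instead of Kafka
a1.sinks.k1.type = logger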

Running the Flume image

# docker run -v /home/ec2-user/docker/flume/flume.conf:/opt/flume/conf/flume.conf -v /home/ec2-user/docker/flume/flume_log:/var/tmp/flume_log   --link zookeeper:zk  --link kafka:kafka --name flume -it  bin/flume

Note that this mounts the conf file and the watched directory flume_log (make sure the paths are correct), and links the kafka container.
Once the container is running, the current shell is already inside it, so just start Flume:

root@29f5c8a54f13:/opt/flume# bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

Detach from the container (Ctrl+P Ctrl+Q), then on the host go into the mounted flume_log directory and write a test line:

# echo "hello" >> 1.log
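
After Flume has ingested the file, the spooling directory source renames it with a .COMPLETED suffix (the default fileSuffix), which is exactly what the ignorePattern in flume.conf excludes:

# ls
1.log.COMPLETED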

Go into the kafka container and run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume --from-beginning; you can see that the hello string has been received.

I had planned to add Elasticsearch and Kibana as well, but the AWS instance can't take any more, so this is it for now.