
Hands-on

Configuration file

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 3000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = ip:9092,ip:9092,ip:9092
# Subscribe to topics by regex
a1.sources.r1.kafka.topics.regex = ^bigdata_demo.*
a1.sources.r1.kafka.consumer.group.id = bigdata_flum3
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bigdata.bigdatautil.TimestampFlinkCDCInterceptor$Builder
# On the first run (no committed offsets) all data is consumed from the beginning;
# if group.id stays the same, consumption resumes from the last committed offset
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest


a1.channels.c1.type = file
# Directory where the file channel keeps its checkpoint (consumption position)
a1.channels.c1.checkpointDir = /home/bigdata/module/apache-flume-1.9.0-bin/checkpoint
a1.channels.c1.dataDirs = /home/bigdata/module/apache-flume-1.9.0-bin/data
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 800
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Wire source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
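For orientation (this worked example is not part of the original config): %{topic} in hdfs.path is filled from the topic header that setTopicHeader puts on each event, and %Y-%m-%d is resolved from the timestamp header added by the interceptor shown later. Assuming a hypothetical topic named bigdata_demo_order and an event timestamped on 2024-06-01, the sink would write to a path roughly like (file name illustrative):

/origin_data/db/bigdata_demo_order_inc/2024-06-01/FlumeData.1717214400000.gz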

JVM settings

vi flume-env.sh

export JAVA_OPTS="-Xms2048m -Xmx2048m"
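After restarting the agent, one way to confirm the heap options took effect (assuming a single Flume process on the host) is to check the process arguments:

ps -ef | grep flume | grep -- "-Xmx2048m"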

Interceptor example


import com.alibaba.fastjson.JSONObject; // fastjson; adjust the import if the project uses fastjson2
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        String database_name = jsonObject.getString("database");
        String table_name = jsonObject.getString("table");

        // The ts field emitted by Maxwell is in seconds, while the Flume HDFS sink
        // expects the timestamp header in milliseconds.
        String timeMills = String.valueOf(ts * 1000);

        // Headers drive the HDFS path escapes: %Y-%m-%d uses timestamp, %{...} reads any header
        headers.put("timestamp", timeMills);
        headers.put("database_name", database_name);
        headers.put("table_name", table_name);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Place the built jar in the /opt/module/flume/lib directory:
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
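Before packaging, a quick local check of the interceptor can be useful. The sketch below is not from the original post: it builds a fake Maxwell-style event (the JSON values are made up), runs it through the interceptor, and prints the headers; it assumes fastjson and flume-ng-core are on the classpath.

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class TimestampInterceptorTest {
    public static void main(String[] args) {
        // Hypothetical Maxwell-style record: ts is in seconds
        String body = "{\"database\":\"bigdata_demo\",\"table\":\"orders\",\"ts\":1700000000,\"data\":{\"id\":1}}";
        Event event = EventBuilder.withBody(body, StandardCharsets.UTF_8);

        Interceptor interceptor = new TimestampInterceptor.Builder().build();
        interceptor.initialize();
        interceptor.intercept(event);

        // Expect timestamp=1700000000000 (milliseconds), database_name=bigdata_demo, table_name=orders
        System.out.println(event.getHeaders());
        interceptor.close();
    }
}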

Flume start/stop script

#!/bin/bash

case $1 in
"start")
        echo " -------- starting abs3 flume --------"
        ssh master2 "nohup /home/bigdata/module/apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c /home/bigdata/module/apache-flume-1.9.0-bin/conf -f /home/bigdata/module/apache-flume-1.9.0-bin/conf/kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")
        echo " -------- stopping flume --------"
        ssh master2 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
;;
esac
esac
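Saved under an illustrative name such as flume.sh and made executable, the script can then be driven from the scheduling host:

chmod +x flume.sh
./flume.sh start   # launches the agent on master2 in the background
./flume.sh stop    # kills the agent by matching the config file name in ps output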