This post documents my process of setting up a Flink environment, as a reference for beginners. The example is a simple count of how many times users open a piece of content: data is consumed from Kafka, run through a simple Flink computation, and written to MySQL for display in a back-office dashboard. To reduce the chance of errors and lower the learning curve, the schema is kept very minimal.
Preparing the experiment environment
Creating the Docker containers
As before, Docker is used. Below is the docker-compose.yml:
version: "3.7"
services:
FlinkJobManager: # define the job manager service
image: flink:1.15.0-scala_2.12-java8
container_name: FlinkJobManager
hostname: FlinkJobManager
expose:
- "6123"
ports:
- "8081:8081" # map port 8081 of container to port 8081 of localhost
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: FlinkJobManager
FlinkTaskManager: # define the task manager service
image: flink:1.15.0-scala_2.12-java8
#container_name: FlinkTaskManager
#hostname: FlinkTaskManager
expose:
- "6121"
- "6122"
depends_on:
- FlinkJobManager # ensure the sequence of docker composing
command: taskmanager
scale: 2 # replica
links:
- "FlinkJobManager:jobmanager" # link to job manager service
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: FlinkJobManager
taskmanager.numberOfTaskSlots: 2
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.2.0
container_name: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
mysql:
image: mysql:5.7
ports:
- "3306:3306"
command: [
'mysqld',
'--innodb-buffer-pool-size=80M',
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
'--default-time-zone=+8:00',
'--lower-case-table-names=1'
]
environment:
# root 密码
MYSQL_ROOT_PASSWORD: root
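With the file saved as docker-compose.yml, the whole stack can be brought up from the same directory. A minimal sketch (the service names match the file above):

# Start all services in the background
docker-compose up -d

# Confirm that the job manager, the two task manager replicas, zookeeper, broker and mysql are up
docker-compose ps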
Adding runtime libraries for Flink
Because Flink needs to connect to Kafka and MySQL, the corresponding connector jars have to be added. Install the dependencies in both FlinkTaskManager containers by downloading the jars below into /opt/flink/lib. wget is available inside the containers, so each jar can be fetched directly from its download URL (a download sketch follows the links below). Remember to restart the containers after the download.
Kafka
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/kafka/
Jar file:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar
jdbc & MySQL
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/jdbc/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar
https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar
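A minimal sketch of the download step, run inside each FlinkTaskManager container (you can enter a replica with docker-compose exec --index=1 FlinkTaskManager bash, and --index=2 for the second one; /opt/flink is the Flink home of the official image):

cd /opt/flink/lib

# Kafka SQL connector
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar

# JDBC connector plus the MySQL driver
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar

Afterwards restart the task managers, for example with docker-compose restart FlinkTaskManager, so the new jars are picked up.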
Verifying that Flink SQL starts correctly
Enter the FlinkJobManager container and start the Flink SQL client:
docker-compose exec FlinkJobManager bash
./bin/sql-client.sh
If you see output like the following, everything is fine:
[Flink squirrel ASCII-art logo]
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /root/.flink-sql-history
Flink SQL>
Verifying that Kafka works
Use docker-compose exec broker bash to enter the container and test producing and consuming. Start a consumer on the test topic with kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test. Then create a producer with kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic test and, at its interactive prompt, type the messages to produce (one per line). If the messages appear on the consumer side, Kafka is configured correctly; a two-terminal sketch follows.
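A minimal sketch of the round trip, assuming two terminals on the host:

# Terminal 1: consumer on the test topic
docker-compose exec broker kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test

# Terminal 2: producer on the same topic; type a line and press Enter
docker-compose exec broker kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic test

Every line typed in terminal 2 should show up in terminal 1.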
Creating the job via Flink SQL
CREATE TABLE KafkaTable (
`user_id` int,
`item_id` int,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'broker:9092',
'properties.group.id' = 'flink_consumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
Generating test data
Enter the Kafka container and use kafka-console-producer as in the earlier test, but note that the topic is now user_behavior. Produce the record {"user_id":1,"item_id":1,"behavior":"open"} to user_behavior, then run select * from KafkaTable; in Flink SQL to check whether the data shows up; a producer sketch follows.
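A minimal sketch of producing one test record (assuming the KafkaTable definition above has already been executed in the SQL client):

# Producer on the user_behavior topic; paste the JSON line at the prompt and press Enter
docker-compose exec broker kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic user_behavior
{"user_id":1,"item_id":1,"behavior":"open"}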
Creating the MySQL database and table
Enter the MySQL container and connect with mysql -p (the root password is root, as set in docker-compose.yml):
create database mydatabase;
use mydatabase;
CREATE TABLE mysqltable (`count` int,`user_id` int,`item_id` int);
Defining the output table in Flink SQL
CREATE TABLE MySQLTable (
`count` int,
`user_id` int,
`item_id` int
) WITH (
'connector' = 'jdbc',
'username'='root',
'password'='root',
'url' = 'jdbc:mysql://mysql:3306/mydatabase',
'table-name' = 'mysqltable'
);
Define the job as an INSERT from the source into the sink: a 30-second tumbling window groups 'open' events by user_id and item_id and counts them.
insert into MySQLTable
select
cast( count(*) as int) as `count`,user_id,item_id
from KafkaTable t
where t.behavior like 'open'
GROUP BY TUMBLE (ts, INTERVAL '30' SECOND),
user_id,
item_id;
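Once the INSERT statement is submitted, the running job can also be listed from the JobManager container; a small sketch (assuming the default working directory /opt/flink of the official image):

docker-compose exec FlinkJobManager ./bin/flink list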
Run select * from MySQLTable; in Flink SQL, produce some different records to Kafka, and watch the results. For example:
{"user_id":1,"item_id":1,"behavior":"open"}
{"user_id":2,"item_id":1,"behavior":"open"}
The Flink Web UI shows: