Counting Clicks by Consuming Kafka with Flink

A record of how I set up a Flink environment, as a reference for beginners. The example counts how many times users open a piece of content: data is consumed from Kafka, run through a simple Flink computation, and written to MySQL for a backend to display. To reduce the chance of errors and lower the barrier to entry, the fields are kept minimal.

Preparing the experiment environment

Creating the Docker containers

As before, I use Docker. Below is the docker-compose.yml; once it is saved, bring the stack up with docker-compose up -d.

version: "3.7"
services:
  FlinkJobManager:  # define the job manager service
    image: flink:1.15.0-scala_2.12-java8
    container_name: FlinkJobManager
    hostname: FlinkJobManager
    expose:
      - "6123"
    ports:
      - "8081:8081"  # map port 8081 of container to port 8081 of localhost 
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager 

  FlinkTaskManager:  # define the task manager service
    image: flink:1.15.0-scala_2.12-java8
    #container_name: FlinkTaskManager
    #hostname: FlinkTaskManager
    expose:
      - "6121"
      - "6122"
    depends_on:
      - FlinkJobManager  # ensure the sequence of docker composing
    command: taskmanager
    scale: 2  # replica
    links:
      - "FlinkJobManager:jobmanager"  # link to job manager service
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: FlinkJobManager
        taskmanager.numberOfTaskSlots: 2

  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    command: [
          'mysqld',
          '--innodb-buffer-pool-size=80M',
          '--character-set-server=utf8mb4',
          '--collation-server=utf8mb4_unicode_ci',
          '--default-time-zone=+8:00',
          '--lower-case-table-names=1'
        ]
    environment:
      # root password
      MYSQL_ROOT_PASSWORD: root

Adding runtime libraries to Flink

Because Flink needs to connect to Kafka and MySQL, the corresponding connector jars must be added.

Install the connector dependencies in both FlinkTaskManager containers (download the jars below into /opt/flink/lib). The containers ship with wget, so each jar can be fetched with wget plus the jar's download URL. Remember to restart the containers after downloading.

Kafka

Official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/kafka/

Jar file:

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar

JDBC & MySQL

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/jdbc/

https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar

https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar

Verifying that Flink SQL starts correctly

Enter the FlinkJobManager container and start the Flink SQL client:

docker-compose exec FlinkJobManager bash

./bin/sql-client.sh

You should see output like the following:



[Flink SQL client startup banner: the squirrel ASCII-art logo, followed by]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /root/.flink-sql-history

Flink SQL>

Verifying that Kafka works

Enter the broker container with docker-compose exec broker bash and test producing and consuming. Start a consumer on the test topic:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test

Then, in another shell, start a producer:

kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic test

At the producer's interactive prompt, type the messages to produce (one per line). If the messages show up on the consumer side, Kafka is configured correctly.

Creating the job via Flink SQL

CREATE TABLE KafkaTable (
  `user_id` INT,
  `item_id` INT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'flink_consumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
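For reference, the json format above expects each Kafka message body to carry only user_id, item_id, and behavior; the ts column is populated from the Kafka record's timestamp metadata, not from the payload. A minimal Python sanity check of what a conforming message looks like (illustrative only, not part of the Flink setup):

```python
import json

# A sample message body matching the KafkaTable DDL.
# Note: `ts` is NOT in the body -- it is read from the Kafka
# record's timestamp metadata (METADATA FROM 'timestamp').
event = {"user_id": 1, "item_id": 1, "behavior": "open"}
payload = json.dumps(event)

# Sanity checks mirroring the declared column types.
decoded = json.loads(payload)
assert isinstance(decoded["user_id"], int)
assert isinstance(decoded["item_id"], int)
assert isinstance(decoded["behavior"], str)
print(payload)
```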

Generating test data

Enter the Kafka container and use kafka-console-producer following the same steps as the earlier test, but note that the topic is user_behavior. Produce a record such as {"user_id":1,"item_id":1,"behavior":"open"}. Then, in the Flink SQL client, run select * from KafkaTable; and check that the data appears.
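Typing JSON records by hand gets tedious. A small helper script (hypothetical, not part of the original setup) can print a batch of events, one JSON line per Kafka message, to paste into the kafka-console-producer prompt:

```python
import json
import random

def make_events(n, seed=42):
    """Generate n user_behavior events as JSON lines (one per Kafka message)."""
    rng = random.Random(seed)  # fixed seed for reproducible test data
    lines = []
    for _ in range(n):
        lines.append(json.dumps({
            "user_id": rng.randint(1, 3),
            "item_id": rng.randint(1, 2),
            "behavior": rng.choice(["open", "close"]),
        }))
    return lines

if __name__ == "__main__":
    print("\n".join(make_events(5)))
```

Run it locally and paste the printed lines into the producer's interactive prompt on the user_behavior topic.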

Creating the MySQL database and table

Enter the MySQL container and connect with mysql -p (the root password is root, as set in docker-compose.yml):

create database mydatabase;
use mydatabase;
CREATE TABLE mysqltable (`count` int,`user_id` int,`item_id` int);

Defining the output table in Flink SQL

CREATE TABLE MySQLTable (
    `count` int,
    `user_id` int,
    `item_id` int
) WITH (
    'connector' = 'jdbc',
    'username'='root',
    'password'='root',
    'url' = 'jdbc:mysql://mysql:3306/mydatabase',
    'table-name' = 'mysqltable'
);

Defining the job: INSERT from the input into the output

INSERT INTO MySQLTable
SELECT
    CAST(COUNT(*) AS INT) AS `count`, user_id, item_id
FROM KafkaTable t
WHERE t.behavior LIKE 'open'
GROUP BY
    TUMBLE(ts, INTERVAL '30' SECOND),
    user_id,
    item_id;
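The statement above groups 'open' events into 30-second tumbling windows per (user_id, item_id) and emits one count row per window. The windowing semantics can be sketched in plain Python (timestamps in epoch seconds; this mirrors TUMBLE's floor-to-window-start behavior, not Flink's actual runtime):

```python
from collections import Counter

WINDOW = 30  # seconds, matching INTERVAL '30' SECOND

def tumbling_counts(events):
    """Count 'open' events per (window_start, user_id, item_id).

    events: iterable of (ts_seconds, user_id, item_id, behavior) tuples.
    Returns a dict keyed by (window_start, user_id, item_id).
    """
    counts = Counter()
    for ts, user_id, item_id, behavior in events:
        if behavior != "open":  # WHERE t.behavior LIKE 'open'
            continue
        window_start = ts - ts % WINDOW  # floor to the 30s window boundary
        counts[(window_start, user_id, item_id)] += 1
    return dict(counts)

events = [
    (100, 1, 1, "open"),
    (110, 1, 1, "open"),
    (110, 2, 1, "open"),
    (125, 1, 1, "close"),  # filtered out by the behavior predicate
    (131, 1, 1, "open"),   # falls into the next 30s window
]
print(tumbling_counts(events))
```

Events at ts=100 and ts=110 land in the window starting at 90, while ts=131 starts a new window at 120, so (user_id=1, item_id=1) yields two rows with counts 2 and 1.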

Run select * from MySQLTable; in the Flink SQL client, produce different records in Kafka, and watch the results. For example:

{"user_id":1,"item_id":1,"behavior":"open"}
{"user_id":2,"item_id":1,"behavior":"open"}

The Flink Web UI (localhost:8081) shows the running job.
