Flink Table API&SQL的概念和通用API

 百万发注册     |      2019-12-25 08:32

上回讲到,游戏逻辑交由Table去测试了,TableServer似乎没有什么可测的了。

Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。这篇文档展示了使用Table API和SQL查询的程序的通用结构,如何注册一个Table,如何查询一个Table以及如何将数据发给Table。

想了下,TableServer还是有点东西可测的,至少我们可以测试API接口是否正常。

Table API和SQL查询程序的结构

所有批处理和流处理的Table API、SQL程序都有如下相同的模式,下面例子的代码展示了Table API和SQL程序的通用结构:

// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建一个TableEnvironment
// 对于批处理程序来说使用 BatchTableEnvironment 替换 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);

// 从Table API的查询中创建一个Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 从SQL查询中创建一个Table
Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");

// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...);

// 执行
env.execute();
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个 Table
tableEnv.registerTable("table1", ...)           // 或者
tableEnv.registerTableSource("table2", ...)     // 或者
tableEnv.registerExternalCatalog("extCat", ...) 

// 从Table API的查询中创建一个Table
val tapiResult = tableEnv.scan("table1").select(...)
// 从SQL查询中创建一个Table
val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")

// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...)

// 执行
env.execute()

注意:Table API 和 SQL查询可以轻易地进行集成并嵌入到DataStream或者DataSet程序中,请参考Integration With DataStream and DataSet API部分来了解DataStream和DataSet如何转换成Table及Table如何转换成DataStream和DataSet。

所以现在补了点TableServer的测试。代码也做了点修改。

创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
  1、在内部目录中注册一个Table
  2、注册一个外部目录
  3、执行SQL查询
  4、注册一个用户自定义函数(标量、表及聚合)
  5、将DataStream或者DataSet转换成Table
  6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。

TableEnvironment可以通过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()来创建。TableConf可以用来配置TableEnvironment或者自定义查询优化器和翻译过程(参考查询优化器)

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 为streaming查询创建一个 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 为批查询创建一个 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 为流查询创建一个 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 为批查询创建一个 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

另外顺便提一下,如果在一个节点只用1个Registry提供Player/Table等多种注册的话,最好分装几个Global注册的

在Catalog(目录)中注册一个Table

TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
TableEnvironment允许通过各种源来注册一个表:
  1、一个已存在的Table对象,通常是Table API或者SQL查询的结果
  2、TableSource,可以访问外部数据如文件、数据库或者消息系统
  3、DataStream或者DataSet程序中的DataStream或者DataSet

将DataStream或者DataSet注册为一个表将在Integration With DataStream and DataSet API中讨论。

API并增加再上一层的API供统一使用可能会更好点,这样不用每个需要的地方都做。

注册一个Table

一个Table可以在TableEnvironment中按照下面程序注册:

// 获取一个 StreamTableEnvironment,  BatchTableEnvironment也是同样的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table 是简单的投影查询的结果 
Table projTable = tableEnv.scan("X").project(...);

// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table 是简单的投影查询的结果 
val projTable: Table = tableEnv.scan("X").project(...)

// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:一个注册的Table被当做是与关系型数据库中的视图类似,即定义Table的查询不会被优化,但是当其他查询引用到已注册的Table时会被内联。如果多个查询引用同一个已注册的Table,这个Table会跟每个查询内联并进行多次执行,即:已注册的Table的结果不会共享。

这部分我们处理,因为实际上不难。

注册一个TableSource

TableSource可以访问保存在外部存储系统如数据库系统(MySQL、HBase...),指定编码格式的文件(CSV, Apache [Parquet, Avro, ORC],...)或者消息系统(Apache Kafka,RabbitMQ,...)中的数据。

Flink的目标是为通用的数据格式和存储系统提供TableSource,请参考Table Sources和Sinks页来了解Flink所支持的TableSource列表及如何自定义一个TableSource。

一个TableSource可以在TableEnvironment中按如下方式来定义:

// 获取一个StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 创建一个 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// 将TableSource注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 创建一个TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// 将 TableSource 注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

 

注册一个外部Catalog(目录)

一个外部目录提供了关于外部数据库和表的信息如:它们的名称、模式、统计及如何访问保存在外部数据库、表和文件中的数据。

一个外部目录可以通过实现ExternalCatalog接口来创建并在TableEnvironment中注册,如下:

// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 创建一个外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 创建一个 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦在TableEnvironment中注册之后,所有定义在ExternalCatalog中的表都可以通过指定全路径如:catalog.database.table 在Table API或者SQL查询来访问。

目前,Flink提供InMemoryExternalCatalog来做demo或者测试。然而,ExternalCatalog接口还可以被用来连接HCatalog或者Metastore到Table API。

代码已经提交。

查询一个Table

Table API

Table API是一个Scala和Java的语言集成查询API,与SQL相反,查询并不指定为字符串而是根据主机语言一步一步的构建。

Table API是基于Table类来的,Table类代表了一个流或者批表,并且提供方法来使用关系型操作。这些方法返回一个新的Table对象,这个Table对象代表着输入的Table应用关系型操作后的结果。一些关系型操作是由多个方法调用组成的如:table.groupBy(...).select(), 其中groupBy(...)指定了table的分组,而select(...)则是table分组的映射。

Table API文档描述了streaming和batch表所支持的所有Table API操作。

下面的例子展示了一个简单的Table API聚合查询:

// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个名叫 Orders 的表

// 扫描注册的 Orders 表
Table orders = tableEnv.scan("Orders");
// 计算所有来自法国的客户的收入
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// 发射或者转换一个 Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个名叫 Orders 的表

// 扫描已注册的 Orders 表
Table orders = tableEnv.scan("Orders")
// 计算所有来自法国偶的客户的收入
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 发射或者转换一个Table
// 执行查询

注意:Scala Table API使用Scala的符号在引用表属性时,以'`'开始,Table API使用Scala的隐式转换,为了使用Scala的隐式转换,请确保导入org.apache.flink.api.scala._org.apache.flink.table.api.scala._

SQL

Flink的SQL集成是基于Apache Calcite的,Apache Calcite实现了标准的SQL,SQL查询被指定为常规字符串。

SQL文档描述了Flink对流和批表的SQL支持。
下面的例子展示了如何指定一个查询并返回一个Table结果;

// 获取一个 StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个名叫Orders 的表

// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 发射或者转换一个Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

//注册一个名叫 Orders的表

// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql(""" 
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 发射或者转换 Table
// 执行查询

混合使用Table API和SQL

Table API和SQL查询可以很容易地合并因为它们都返回Table对象:
  1、Table API查询可以基于SQL查询结果的Table来进行
  2、SQL查询可以基于Table API查询的结果来定义

发射一个Table

为了发射一个Table,可以将其写入一个TableSink中,TableSink 是支持各种文件格式(如:CSV, Apache Parquet, Apache Avro)、存储系统(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系统(如:Apache Kafka,RabbitMQ)的通用接口。

一个批Table只能写入BatchTableSink中,而流Table需要一个AppendStreamTableSinkRetractStreamTableSink或者UpsertStreamTableSink

请参考Table Sources & Sinks文档来了解更多可用sink的信息和如何实现一个自定义的TableSink。

// 获取一个StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 使用Table API和/或SQL查询获取一个 Table
Table result = ...

// 创建一个TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// 将结果Table写入TableSink中
result.writeToSink(sink);

// 执行程序
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查询获取一个 Table
val result: Table = ...

//创建一个 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// 将结果 Table写入TableSink中
result.writeToSink(sink)

// 执行程序

翻译和执行一个查询

Table API和SQL查询根据输入是流还是批翻译成DataStream或者DataSet,查询内部表示为一个逻辑查询计划,并分两个阶段进行翻译:
  1、优化逻辑计划
  2、翻译成一个DataStream或者DataSet程序

Table API或者SQL查询会在下面情况下触发:
  当调用Table.writeToSink()时,Table会发射到TableSink中
  Table转换DataStream或者DataSet时(参考与DataStream和DataSet API集成)
一旦翻译,Table API或者SQL查询就会像常规DataStream或DataSet处理一样,并且当StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute()调用时执行。