Custom Data Sources

Besides the built-in data sources, we can also write our own custom data sources.

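For a non-parallel source, implement the SourceFunction interface. For a parallel source, implement the ParallelSourceFunction interface or extend the RichParallelSourceFunction class; the Rich variant additionally provides the open()/close() lifecycle methods and access to the runtime context (for example, the index of the current subtask).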
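A parallel source runs one instance of the function per subtask, and each instance executes its own run() loop, so unless the instances split the work, every record is emitted once per subtask. In a RichParallelSourceFunction, an instance can find its position through getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(). The plain-Java sketch below models only the splitting rule, assuming a simple modulo scheme (record i goes to subtask i % parallelism); the class SubtaskSplitter is hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not a Flink API) that models how the instances of a parallel
 * source can split one shared list of records so that no record is emitted twice.
 * In a RichParallelSourceFunction, subtaskIndex and parallelism would come from
 * getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks().
 */
class SubtaskSplitter {

    /** Returns the records that the subtask with the given 0-based index should emit. */
    static <T> List<T> recordsForSubtask(List<T> all, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<T> share = new ArrayList<>();
        // Record i goes to subtask (i % parallelism): the shares are disjoint and,
        // taken together, contain every record exactly once.
        for (int i = subtaskIndex; i < all.size(); i += parallelism) {
            share.add(all.get(i));
        }
        return share;
    }

    public static void main(String[] args) {
        List<Integer> accountIds = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // prints: subtask 0 -> [1, 4, 7], subtask 1 -> [2, 5], subtask 2 -> [3, 6]
        for (int s = 0; s < 3; s++) {
            System.out.println("subtask " + s + " -> " + recordsForSubtask(accountIds, s, 3));
        }
    }
}
```

Other schemes (ranges of ids, one Kafka partition per subtask, and so on) work as well, as long as each record is assigned to exactly one subtask.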

[Example] (simple version) Use a custom data source to simulate a generator of credit card transaction stream data.

Follow the steps below.

1. In IntelliJ IDEA, create a Flink project from the flink-quickstart-scala or flink-quickstart-java project template.

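2. Set up the dependencies by adding the following to pom.xml. They all use provided scope, so when running the program inside IntelliJ IDEA you may need to enable the run-configuration option that adds "provided"-scope dependencies to the classpath: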

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-scala_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-scala_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>
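<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>
<!-- Since Flink 1.11, flink-clients must be added explicitly to run the job locally (e.g. from the IDE) -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>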

3. Create a POJO class that represents the structure of a credit card transaction:

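package com.xueai8.ch03.entity;

import java.io.Serializable;
import java.util.Objects;

/**
 * Entity class representing a credit card transaction
 */
public class Transaction implements Serializable {
    private static final long serialVersionUID = 1L;

    public long accountId;      // account of the transaction
    public long timestamp;      // transaction time (epoch milliseconds)
    public double amount;       // transaction amount

    // Flink's POJO serializer requires a public no-argument constructor
    public Transaction() { }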

    public Transaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    public long getAccountId() {
        return accountId;
    }

    public void setAccountId(long accountId) {
        this.accountId = accountId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Transaction that = (Transaction) o;
        return accountId == that.accountId &&
                timestamp == that.timestamp &&
                Double.compare(that.amount, amount) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(accountId, timestamp, amount);
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "accountId=" + accountId +
                ", timestamp=" + timestamp +
                ", amount=" + amount +
                '}';
    }
}
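Flink handles a class like Transaction with its POJO serializer only if the class follows the POJO rules from the Flink documentation: it is public (and static if nested), has a public no-argument constructor, and every non-static, non-transient field is either public and non-final or accessible through a public getter and setter; otherwise Flink falls back to generic (Kryo) serialization. The reflection-based sketch below is a simplified, hypothetical approximation of those structural rules. PojoRuleCheck is not a Flink class, and it ignores superclass fields and whether the field types themselves are supported.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/**
 * Simplified approximation of the POJO rules listed in the Flink documentation.
 * NOT Flink's TypeExtractor: it checks only the class structure, only declared
 * (not inherited) fields, and not the field types.
 */
class PojoRuleCheck {

    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Rule 1: the class is public, and static if it is a nested class.
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        // Rule 2: the class has a public no-argument constructor.
        try {
            clazz.getConstructor();   // getConstructor() only finds public constructors
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: each non-static, non-transient field is public and non-final,
        // or has a public getter and a public setter.
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) continue;
            if (!hasGetterAndSetter(clazz, f)) return false;
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName().toLowerCase();
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {   // public methods only
            String mName = m.getName().toLowerCase();
            if ((mName.equals("get" + name) || mName.equals("is" + name))
                    && m.getParameterCount() == 0 && m.getReturnType() == f.getType()) {
                getter = true;
            }
            if (mName.equals("set" + name) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == f.getType()) {
                setter = true;
            }
        }
        return getter && setter;
    }

    /** Same shape as Transaction: public fields plus a public no-arg constructor. */
    public static class TransactionLike {
        public long accountId;
        public long timestamp;
        public double amount;
        public TransactionLike() { }
    }

    /** Breaks rule 2: the only constructor takes an argument. */
    public static class NoDefaultConstructor {
        public long accountId;
        public NoDefaultConstructor(long accountId) { this.accountId = accountId; }
    }

    /** Breaks rule 3: private field with a getter but no setter. */
    public static class GetterOnly {
        private long accountId;
        public GetterOnly() { }
        public long getAccountId() { return accountId; }
    }
}
```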

4. Create the custom source class by implementing the SourceFunction interface, as shown below:

package com.xueai8.ch03.source;

import com.xueai8.ch03.entity.Transaction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

/**
 * Custom data source implementing SourceFunction (non-parallel: it always runs as a single instance)
 */
public class MyTransactionSource implements SourceFunction<Transaction> {
    private static final long serialVersionUID = 1L;

    // Timestamp.valueOf interprets this string in the JVM's default time zone
    private static final Timestamp INITIAL_TIMESTAMP = Timestamp.valueOf("2020-01-01 00:00:00");
    private static final long SIX_MINUTES = 6 * 60 * 1000L;

    private final boolean bounded;      // true: emit the data once (batch); false: repeat forever (stream)

    private int index = 0;              // index of the next transaction record
    private long timestamp;             // timestamp assigned to the next transaction

    private volatile boolean isRunning = true;   // set to false by cancel()
    private final List<Transaction> data;

    public MyTransactionSource(boolean bounded){
        this.bounded = bounded;
        this.timestamp = INITIAL_TIMESTAMP.getTime();

        // Pre-stored credit card transactions; in practice they would come from an external system such as Kafka
        data = Arrays.asList(
                new Transaction(1, 0L, 188.23),
                new Transaction(2, 0L, 374.79),
                new Transaction(3, 0L, 112.15),
                new Transaction(4, 0L, 478.75),
                new Transaction(5, 0L, 208.85),
                new Transaction(1, 0L, 379.64),
                new Transaction(2, 0L, 351.44),
                new Transaction(3, 0L, 320.75),
                new Transaction(4, 0L, 259.42),
                new Transaction(5, 0L, 273.44),
                new Transaction(1, 0L, 267.25),
                new Transaction(2, 0L, 397.15),
                new Transaction(3, 0L, 0.219),
                new Transaction(4, 0L, 231.94),
                new Transaction(5, 0L, 384.73),
                new Transaction(1, 0L, 419.62),
                new Transaction(2, 0L, 412.91),
                new Transaction(3, 0L, 0.77),
                new Transaction(4, 0L, 22.10),
                new Transaction(5, 0L, 377.54),
                new Transaction(1, 0L, 375.44),
                new Transaction(2, 0L, 230.18),
                new Transaction(3, 0L, 0.80),
                new Transaction(4, 0L, 350.89),
                new Transaction(5, 0L, 127.55),
                new Transaction(1, 0L, 483.91),
                new Transaction(2, 0L, 228.22),
                new Transaction(3, 0L, 871.15),
                new Transaction(4, 0L, 64.19),
                new Transaction(5, 0L, 79.43),
                new Transaction(1, 0L, 56.12),
                new Transaction(2, 0L, 256.48),
                new Transaction(3, 0L, 148.16),
                new Transaction(4, 0L, 199.95),
                new Transaction(5, 0L, 252.37),
                new Transaction(1, 0L, 274.73),
                new Transaction(2, 0L, 473.54),
                new Transaction(3, 0L, 119.92),
                new Transaction(4, 0L, 323.59),
                new Transaction(5, 0L, 353.16),
                new Transaction(1, 0L, 211.90),
                new Transaction(2, 0L, 280.93),
                new Transaction(3, 0L, 347.89),
                new Transaction(4, 0L, 459.86),
                new Transaction(5, 0L, 82.31),
                new Transaction(1, 0L, 373.26),
                new Transaction(2, 0L, 479.83),
                new Transaction(3, 0L, 454.25),
                new Transaction(4, 0L, 83.64),
                new Transaction(5, 0L, 292.44)
        );
    }

    @Override
    public void run(SourceContext<Transaction> sourceContext) throws Exception {
        // Keep emitting until cancel() is called or a bounded source runs out of data
        while (this.isRunning && this.hasNext()) {
            sourceContext.collect(this.next());
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

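    private boolean hasNext() {
        // Records are left in the current pass
        if (index < data.size()) {
            return true;
        }
        // Bounded (batch) mode: stop after one pass
        else if (bounded) {
            return false;
        }
        // Unbounded (streaming) mode: start over from the first record
        else {
            index = 0;
            return true;
        }
    }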

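    // Generate the next transaction. Each one is 6 minutes later than the previous one,
    // and a 100 ms pause slows the stream down so it can be followed on the console.
    private Transaction next() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // Copy the template instead of mutating the shared list element, which is reused
        // on every pass in streaming mode, so every emitted record is an independent object
        Transaction template = data.get(index++);
        Transaction transaction = new Transaction(template.getAccountId(), timestamp, template.getAmount());
        timestamp += SIX_MINUTES;
        return transaction;
    }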
}
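The hasNext()/next() pair is what separates the two modes: with bounded = true the source ends after one pass over the 50 records, while with bounded = false it rewinds index to 0 and never finishes on its own. Since that logic does not depend on Flink, it can be exercised in plain Java. The sketch below is a hypothetical copy of it (TransactionGenerator, Tx and take are illustrative names), without the 100 ms sleep and with a record cap so that the unbounded mode can be tested.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free copy of the hasNext()/next() logic of MyTransactionSource
 * (without the 100 ms sleep), so the bounded and unbounded modes can be checked directly.
 */
class TransactionGenerator {

    /** Minimal stand-in for the Transaction POJO. */
    static final class Tx {
        final long accountId;
        final long timestamp;
        final double amount;

        Tx(long accountId, long timestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    static final long SIX_MINUTES = 6 * 60 * 1000L;

    private final List<Tx> data;
    private final boolean bounded;
    private int index = 0;
    private long timestamp;

    TransactionGenerator(List<Tx> data, boolean bounded, long initialTimestamp) {
        this.data = data;
        this.bounded = bounded;
        this.timestamp = initialTimestamp;
    }

    boolean hasNext() {
        if (index < data.size()) {
            return true;            // records left in the current pass
        } else if (bounded) {
            return false;           // bounded: stop after one pass
        } else {
            index = 0;              // unbounded: start again from the first record
            return true;
        }
    }

    Tx next() {
        Tx template = data.get(index++);
        // Emit a fresh object carrying the current timestamp, then advance by 6 minutes
        Tx tx = new Tx(template.accountId, timestamp, template.amount);
        timestamp += SIX_MINUTES;
        return tx;
    }

    /** Pulls at most maxRecords records, like the while-loop in run() but with a cut-off. */
    static List<Tx> take(TransactionGenerator gen, int maxRecords) {
        List<Tx> out = new ArrayList<>();
        while (out.size() < maxRecords && gen.hasNext()) {
            out.add(gen.next());
        }
        return out;
    }
}
```

In the real source there is no cap: the unbounded loop ends only when Flink calls cancel(), which clears isRunning.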

5. Create a test class (a main program with a main method) that adds the custom source with the addSource method.

Scala code:

import com.xueai8.ch03.source.MyTransactionSource
import org.apache.flink.streaming.api.scala._

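/**
  * Test class for the custom data source
  */
object CustomSourceDemo {

  def main(args: Array[String]): Unit = {
    // Get the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Add the custom source; false makes it an unbounded (streaming) source
    val transactions = env.addSource(new MyTransactionSource(false)).name("transactions")

    // Print the transaction records
    transactions.print()

    // Trigger execution of the streaming program
    env.execute("Flink Custom Source")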
  }
}

Java code:

import com.xueai8.ch03.entity.Transaction;
import com.xueai8.ch03.source.MyTransactionSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Test class for the custom data source
 */
public class CustomSourceDemo {

	public static void main(String[] args) throws Exception {
		// Set up the stream execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Add the custom source; the argument false makes it an unbounded (streaming) source
		DataStream<Transaction> transactions =
				env.addSource(new MyTransactionSource(false)).name("transactions");

		// Print the records to the console
		transactions.print();

		// Execute the streaming program
		env.execute("Transaction Stream");
	}
}
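Because MyTransactionSource implements plain SourceFunction, it runs as a single instance, whereas print() gets the environment's default parallelism, which in a local environment equals the number of CPU cores. When two connected operators have different parallelism and no partitioning is specified, Flink redistributes the records round-robin (rebalance), and the print sink prefixes each line with its subtask index plus one. The plain-Java sketch below models only that distribution; RoundRobinModel and its parameters are hypothetical. Flink's rebalance partitioner picks its first target at random, so a real run may start at any prefix.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Flink code) of round-robin "rebalance" distribution:
 * records from a single upstream instance are handed to the downstream subtasks in turn.
 */
class RoundRobinModel {

    /**
     * Returns, for each of the first `records` records, the prefix a print sink would show.
     * firstChannel is the 0-based subtask that receives the first record.
     */
    static int[] printPrefixes(int records, int parallelism, int firstChannel) {
        int[] prefixes = new int[records];
        int channel = firstChannel;
        for (int i = 0; i < records; i++) {
            prefixes[i] = channel + 1;               // print() labels subtask k as "(k+1)>"
            channel = (channel + 1) % parallelism;   // the next record goes to the next subtask
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // 10 records, 8 print subtasks, first record sent to subtask 0
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 1, 2]
        System.out.println(Arrays.toString(printPrefixes(10, 8, 0)));
    }
}
```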

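6. Run the program above. Part of the console output is shown below; the number before > identifies the print subtask (counted from 1) that wrote the line, so the range of numbers depends on the parallelism of the machine: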

1> Transaction{accountId=1, timestamp=1577808000000, amount=188.23}
2> Transaction{accountId=2, timestamp=1577808360000, amount=374.79}
3> Transaction{accountId=3, timestamp=1577808720000, amount=112.15}
4> Transaction{accountId=4, timestamp=1577809080000, amount=478.75}
5> Transaction{accountId=5, timestamp=1577809440000, amount=208.85}
6> Transaction{accountId=1, timestamp=1577809800000, amount=379.64}
7> Transaction{accountId=2, timestamp=1577810160000, amount=351.44}
8> Transaction{accountId=3, timestamp=1577810520000, amount=320.75}
1> Transaction{accountId=4, timestamp=1577810880000, amount=259.42}
2> Transaction{accountId=5, timestamp=1577811240000, amount=273.44}
3> Transaction{accountId=1, timestamp=1577811600000, amount=267.25}
4> Transaction{accountId=2, timestamp=1577811960000, amount=397.15}
5> Transaction{accountId=3, timestamp=1577812320000, amount=0.219}
6> Transaction{accountId=4, timestamp=1577812680000, amount=231.94}
7> Transaction{accountId=5, timestamp=1577813040000, amount=384.73}
8> Transaction{accountId=1, timestamp=1577813400000, amount=419.62}
......
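Each printed timestamp is in epoch milliseconds. Because Timestamp.valueOf("2020-01-01 00:00:00") is interpreted in the JVM's default time zone, the first value depends on where the program runs: 1577808000000 is midnight on 2020-01-01 in UTC+8 (for example Asia/Shanghai), which suggests, as an assumption, that the sample was produced on a machine set to that time zone. Consecutive records differ by 360000 ms, that is, six minutes. The java.time sketch below checks both points; TimestampCheck is a hypothetical helper.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper that decodes the epoch-millisecond timestamps from the sample output. */
class TimestampCheck {

    /** Converts epoch milliseconds to the wall-clock time of the given time zone. */
    static LocalDateTime toLocal(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zoneId)).toLocalDateTime();
    }

    public static void main(String[] args) {
        long first = 1577808000000L;    // timestamp of the first record in the sample output
        long second = 1577808360000L;   // timestamp of the second record

        System.out.println(toLocal(first, "Asia/Shanghai"));    // 2020-01-01T00:00
        System.out.println(toLocal(first, "UTC"));              // 2019-12-31T16:00
        System.out.println(Duration.ofMillis(second - first));  // PT6M
    }
}
```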
