【Flink】详解Flink的八种分区

我们好,咱们的gzh是向阳三只大明白,满满满是干货,共享近期的学习常识以及个人总结(包含读研和IT),跪求一波重视,希望和我们一同努力、前进!!

简介

Flink是一个流处理框架,一个Flink-Job由多个Task/算子构成,逻辑层面构成一个链条,一起Flink支撑并行操作,每一个并行度能够了解为一个数据管道称之为SubTask。咱们画图来看一下:

【Flink】详解Flink的八种分区

数据会在多个算子的SubTask之间相互传递,算子之间的并行度可能是不同的,这样就产生了数据分区问题,其中心问题在于上游的某个SubTask的数据该发送到下流的哪一个SubTask中。为了处理分区相关问题,Flink供给了一系列分区算子,下面将具体为我们介绍分区算子和相关的分区器。

分区算子

Flink一共有6种(rescale和rebalance都是轮询算子)或者7种分区算子:

  • shuffle :调用shuffle办法将会随机分配,总体上服从均匀分布;
  • rebalance:调用rebalance办法将会轮询分配,对所有的并⾏⼦使命进⾏轮询分配,可能会导致TM之间的数据交换;
  • rescale:调用rescale办法将会以组为单位轮训分配,而不是整体进行轮训,为了避免TM之间的数据交互;
  • broadcast:调用broadcast办法将数据流播送给所有的下流子使命;
  • global:调用global办法将会进行全局分区,将上游所有数据发送到下流第一个分区中;
  • keyby:调用keyby办法将会按键分区。
  • 自界说规则:自界说数据分发战略。代表算子为partitionCustom。

分区器

概述

每一个分区算子的底层实际上对应一个分区器,一共8个分区器

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

各个分区器的承继联系如下:

【Flink】详解Flink的八种分区
接下来将具体介绍每一个分区算子和对应的分区器。

ChannelSelector

ChannelSelector是分区器一起完成的接口,界说分区器的根本行为。

public interface ChannelSelector<T extends IOReadableWritable> {
​
  // 初始化ChannelSelector,传入的参数为下流channel的数量
  void setup(int numberOfChannels);
​
  // 回来挑选的channel索引编号,这个办法决议的上游的数据需求写入到哪个channel中
  // 这个办法的Partitioner子类要点需求完成的办法
  // 关于broadcast播送类型算子,不需求完成这个办法
  // 尽管broadcast不需求完成这个办法,可是仍是重写了办法,throw new UnsupportedOperationException
  // 传入的参数为记载数据流中的元素,该办法需求依据元素来推断出需求发送到的下流channel
  int selectChannel(T record);
​
  // 回来是否为播送类型
  boolean isBroadcast();
}

StreamPartitioner

StreamPartitioner抽象类完成了StreamPartitioner接口,它的代码如下所示:

public abstract class StreamPartitioner<T>
    implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
  private static final long serialVersionUID = 1L;
    
  // 下流的channel数量
  protected int numberOfChannels;
    
  // 初始化的时分就知道下流的channel数量
  @Override
  public void setup(int numberOfChannels) {
    this.numberOfChannels = numberOfChannels;
   }
    
  // 肯定不是播送类型
  @Override
  public boolean isBroadcast() {
    return false;
   }
​
  public abstract StreamPartitioner<T> copy();
​
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
     }
    if (o == null || getClass() != o.getClass()) {
      return false;
     }
    final StreamPartitioner<?> that = (StreamPartitioner<?>) o;
    return numberOfChannels == that.numberOfChannels;
   }
​
  @Override
  public int hashCode() {
    return Objects.hash(numberOfChannels);
   }
​
  // 决议了作业康复时分上游遇到扩缩容的话,需求处理哪些上游状况保存的数据
  public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
    return SubtaskStateMapper.ARBITRARY;
   }
​
  // 决议了作业康复时分下流遇到扩缩容的话,需求处理哪些下流状况保存的数据
  public abstract SubtaskStateMapper getDownstreamSubtaskStateMapper();
    
  // 该办法界说了上下流之间的联系类型,假如回来True,表示上下流SubTask之间有清晰的一一对应联系,假如回来False代表上下流SubTask之间没有清晰的对应联系
  public abstract boolean isPointwise();
}

ShufflePartitioner

@PublicEvolving
public DataStream<T> shuffle() {
    return setConnectionType(new ShufflePartitioner<T>());
}

能够看到shuffle算子对应的分区器是【ShufflePartitioner】。

public class ShufflePartitioner<T> extends StreamPartitioner<T> {
  private static final long serialVersionUID = 1L;
​
  private Random random = new Random();
    
  // 重要
  // 随机回来一个下流Channel,由于random.nextInt契合均匀分布,所以shuffle的数据分布也契合均匀分布
  @Override
  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
   }
​
  @Override
  public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
    return SubtaskStateMapper.ROUND_ROBIN;
   }
​
  @Override
  public StreamPartitioner<T> copy() {
    return new ShufflePartitioner<T>();
   }
    
  // ShufflePartitioner上下流Subtask之间没有清晰对应联系
  @Override
  public boolean isPointwise() {
    return false;
   }
​
  @Override
  public String toString() {
    return "SHUFFLE";
   }
}

图例

【Flink】详解Flink的八种分区

GlobalPartitioner

public DataStream<T> global() {
    return setConnectionType(new GlobalPartitioner<T>());
}

能够看到global对应的分区器是【GlobalPartitioner】。

public class GlobalPartitioner<T> extends StreamPartitioner<T> {
  private static final long serialVersionUID = 1L;
    
  // 数据永久发往下流第一个SubTask。
  @Override
  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
   }
​
  @Override
  public StreamPartitioner<T> copy() {
    return this;
   }
    
  // 康复使命的时分将会康复到第一个使命。
  @Override
  public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
    return SubtaskStateMapper.FIRST;
   }
    
  // ShufflePartitioner上下流Subtask之间没有清晰对应联系
  @Override
  public boolean isPointwise() {
    return false;
   }
​
  @Override
  public String toString() {
    return "GLOBAL";
   }
}

图例

【Flink】详解Flink的八种分区

ForwardPartitioner

public class ForwardPartitioner<T> extends StreamPartitioner<T> {
  private static final long serialVersionUID = 1L;
    
  // 仍是发往下流第一个SubTask,不同的是这里的下流SubTask是在本地的。
  @Override
  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;
   }
​
  public StreamPartitioner<T> copy() {
    return this;
   }
    
  // 上下流SubTask是一一对应的,假如上下流算子并行度不一致就会报错
  @Override
  public boolean isPointwise() {
    return true;
   }
​
  @Override
  public String toString() {
    return "FORWARD";
   }
​
  @Override
  public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
    return SubtaskStateMapper.UNSUPPORTED;
   }
​
  @Override
  public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
    return SubtaskStateMapper.UNSUPPORTED;
   }
}

ForwardPartitionerStreamGraphaddEdgeInternal办法中主动创立(生成StreamGraph的过程),代码片段如下所示:

// ...
if (partitioner == null
    && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
  // 只有在上游和下流的并行度相同且没有指定相关分区器的时分,才会运用ForwardPartitioner
  partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
  // 否 则运用RebalancePartitioner
  partitioner = new RebalancePartitioner<Object>();
}
​
// 这里还会再次检测上游和下流的并行度是否一致
// 避免用户强行指定运用ForwardPartitioner时分上下流的并行度不一致
if (partitioner instanceof ForwardPartitioner) {
  if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
    throw new UnsupportedOperationException(
        "Forward partitioning does not allow "
            + "change of parallelism. Upstream operation: "
            + upstreamNode
            + " parallelism: "
            + upstreamNode.getParallelism()
            + ", downstream operation: "
            + downstreamNode
            + " parallelism: "
            + downstreamNode.getParallelism()
            + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
   }
}
// ...

或者调用forward算子创立,这个办法根本不运用。

public DataStream<T> forward() {
    return setConnectionType(new ForwardPartitioner<T>());
}

图例

【Flink】详解Flink的八种分区

RebalancePartitioner

public DataStream<T> rebalance() {
	return setConnectionType(new RebalancePartitioner<T>());
}

能够看到rebalance对应的分区器是【RebalancePartitioner】。

public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    // 记载要接受数据的下流Channel编号
    private int nextChannelToSendTo;
    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
    // 选用取余的方式找出发送的下流channel
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }
    // 康复的时分将保存数据轮询发送
    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.ROUND_ROBIN;
    }
    public StreamPartitioner<T> copy() {
        return this;
    }
    // 上下流SubTask之间没有意义对应联系
    @Override
    public boolean isPointwise() {
        return false;
    }
    @Override
    public String toString() {
        return "REBALANCE";
    }
}

图例

【Flink】详解Flink的八种分区

RescalePartitioner

public DataStream<T> rescale() {
	return setConnectionType(new RescalePartitioner<T>());
}

能够看到rescale对应的分区器是【RescalePartitioner】。跟rebalance不同,例如上游并行度是2,下流是4,则上游一个并行度以循环的方式将记载输出到下流的两个并行度上;上游另一个并行度以循环的方式将记载输出到下流另两个并行度上。假如上游并行度是4,下流并行度是2,则上游两个并行度将记载输出到下流一个并行度上;上游另两个并行度将记载输出到下流另一个并行度上。(能够了解是一种负载均衡的轮询

public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;
    private int nextChannelToSendTo = -1;
    // 选用的方式和rebalance一致,都是轮询的战略
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }
    // 康复的时分不支撑扩缩容,由于本来的对应联系已经被破坏了
    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.UNSUPPORTED;
    }
    // 康复的时分不支撑扩缩容,由于本来的对应联系已经被破坏了
    @Override
    public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
        return SubtaskStateMapper.UNSUPPORTED;
    }
    public StreamPartitioner<T> copy() {
        return this;
    }
    @Override
    public String toString() {
        return "RESCALE";
    }
    // 这是有一一对应联系的分区方式
    @Override
    public boolean isPointwise() {
        return true;
    }
}

图例

【Flink】详解Flink的八种分区

KeyGroupPartitioner

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
	Preconditions.checkNotNull(key);
	return new KeyedStream<>(this, clean(key));
}
// 调用keyby回来一个KeyedStream
// 在KeyedStream底层用一个PartitionTransformation包装了KeyGroupStreamPartitioner(键提取器,和默认最大键组数)
// 
 public KeyedStream(
            DataStream<T> dataStream,
            KeySelector<T, KEY> keySelector,
            TypeInformation<KEY> keyType) {
        this(
                dataStream,
                new PartitionTransformation<>(
                        dataStream.getTransformation(),
                        new KeyGroupStreamPartitioner<>(
                                keySelector,
                                StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
                keySelector,
                keyType);
    }

以下是【KeyGroupStreamPartitioner】的源码剖析

public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T>
        implements ConfigurableStreamPartitioner {
    private static final long serialVersionUID = 1L;
    private final KeySelector<T, K> keySelector;
    private int maxParallelism;
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            // 经过keySelector获取键
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
        // 
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }
    @Override
    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
        return SubtaskStateMapper.RANGE;
    }
    // 上下流SubTask没有一一对应联系
    @Override
    public boolean isPointwise() {
        return false;
    }
    // 这里是查看是否装备了最大并行度(最大建组数),假如有装备则代替默认值
    @Override
    public void configure(int maxParallelism) {
        KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
        this.maxParallelism = maxParallelism;
    }
}
// 包装了一层查看一下键是否是null
// key:键;
// maxParallelis:支撑的最大并行度,也便是键组的数量
// parallelism:当时并行度
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
	Preconditions.checkNotNull(key, "Assigned key must not be null!");
	return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
// 分配键组
// key:键;
// maxParallelis:支撑的最大并行度,也便是键组的数量
public static int assignToKeyGroup(Object key, int maxParallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
	return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
// 经过键组ID*当时并行度/最大键组数量默认128来分配数据流向的channel
// maxParallelis:支撑的最大并行度,也便是键组的数量
// parallelism:当时并行度
// keyGroupId:键组ID
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
	return keyGroupId * parallelism / maxParallelism;
}

图例

【Flink】详解Flink的八种分区

Flink怎么运用分区器

Flink经过RecordWriter向下流写入输入。RecordWriter经过RecordWriterBuilder创立。

public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}

build办法中会调用【selector】的isBroadcast办法,假如是播送类型,则创立【BroadcastRecordWriter】目标来写数据,否则创立【ChannelSelectorRecordWriter】目标来写数据。

以下是【BroadcastRecordWriter】目标的源码剖析:

public final class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
	broadcastEmit办法
    // writer都是调用emit办法,在BroadcastRecordWriter中进行了包装,实质调用的是broadcastEmit办法
    @Override
    public void emit(T record) throws IOException {
        broadcastEmit(record);
    }
    @Override
    public void broadcastEmit(T record) throws IOException {
        // 查看
        checkErroneous();
		// 先运用序列化器将数据序列化,然后进行播送
        targetPartition.broadcastRecord(serializeRecord(serializer, record));
        if (flushAlways) {
            flushAll();
        }
    }
}

以下是【ChannelSelectorRecordWriter】目标源码剖析:

public final class ChannelSelectorRecordWriter<T extends IOReadableWritable>
        extends RecordWriter<T> {
    private final ChannelSelector<T> channelSelector;
    @Override
    public void emit(T record) throws IOException {
        // 分区器依据当时记载核算出下流Subtask的索引,然后发送
        emit(record, channelSelector.selectChannel(record));
    }
    protected void emit(T record, int targetSubpartition) throws IOException {
        checkErroneous();
        // 先进行序列化操作
        // targetSubpartition便是上一步中分区器核算的SubTask索引
        targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
        if (flushAlways) {
            targetPartition.flush(targetSubpartition);
        }
    }
}

总结

  1. Flink自身供给了多种分区API,在底层运用的都是分区器,Flink一般供给了7种分区器;
  2. 按键分区本质上是按键组分区,经过分配键组的方式分配键;
  3. rescale本地轮番分配)和rebalance轮番分配)有区别,前者考虑了TM之间数据传输的问题,能够了解是一种软负载均衡的轮询;

往期回忆

  1. 【Flink】浅谈Flink背压问题(1)
  2. 【分布式】浅谈CAP、BASE理论(1)

文中难免会出现一些描绘不当之处(尽管我已重复查看多次),欢迎在留言区指正,相关的常识点也可进行共享,希望我们都能有所收获!!