Trident State 详解

简介:

一、什么是Trident State

直译过来就是trident状态,这里的状态主要涉及到Trident如何实现一致性语义规则,Trident的计算结果将被如何提交,如何保存,如何更新等等。我们知道Trident的计算都是以batch为单位的,但是batch在中的tuple在处理过程中有可能会失败,失败之后bach又有可能会被重播,这就涉及到很多事务一致性问题。Trident State就是管理这些问题的一套方案,与这套方案对应的就是Trident State API。这样说可能还比较抽象,下面就用一个例子具体说明一下。

1.1 举例具体例子来说明

假设有这么一个需求,统计一个数据流中各个单词出现的数量,并把单词和其数量更新到数据库中。假设我们在数据库中只有两个字段,单词和其数量,在计数过程中,如果遇到相同的单词则就把其数量加一。但是这么做有一个问题,如果某个单词是被重播的单词,就有可能导致这个单词被多加了一遍。因此,在数据库中只保存单词和其数量两个字段是无法做到“数据只被处理一次”的语义要求的。

1.2 Trident是怎么解决这个问题的呢?

Trident定义了如下语义规则:

1. 所有的Tuple都是以batch的形式处理的
2. 每个batch都会被分配一个唯一的“transaction id”(txid),如果batch被重发,txid不变
3. 各个batch状态的更新是有序的,也就是说batch2一定会在batch3之前更新

有了这三个规则,我们就可以通过txid知道batch是否被处理过,然后就可以根据实际情况来更新状态信息了。很明显,要满足这几个语义规则,就需要spout来支持,因为把tuple封装成batch,分配txid等等都是有spout来负责的。

但是在具体应用场景中,storm应该能够提供不同的容错级别,因为某些情况下我们并不需要强一致性。为了更灵活的处理,Trident提供了三类spout,分别是:

1. Transactional spouts : 事务spout,提供了强一致性
2. Opaque Transactional spouts:不透明事务spout,提供了弱一致性
3. No-Transactional spouts:非事务spout,对一致性无法保证

注意,所有的Trident Spout都是以batch的形式发送数据,每个batch也都会分配一个唯一的txid,决定它们有不同性质的地方在于它们对各自的batch提供了什么样的保证

1.3 Trident State的类型

我们已经知道Trident 提供了三种类型的spout来服务Trident State管理,那么对应的Trident State也有三种类型:

1. Transactional
2. Opaque Transactional 
3. No-Transactional

二、各类Trident Spout详解

2.1 Transactional spouts

Transactional spouts对batch的发送提供了如下保证:

1. 相同txid的batch完全一样,如果一个batch被重播,重播的batch的txid及其所有tuple和原batch的完全一致
2. 两个batch中的tuple不会有重合
3. 每个tuple都在batch中,不会有batch漏掉某个tuple

这三个特性是“最完美”的保证,也最容易理解,Stream被分割成固定的batch,而且不会改变。Storm就提供了一个Transactional spout的实现:TransactionalTridentKafkaSpout

我们现在再看上面1.1节提到的那个实例,我们要把单词和其数量保存在数据库中,为了保证“数据只被处理一次”,除了要保存单词和数量两个字段之外,我们再加一个字段txid。在更新数据时,我们先对比一下当前的数据的txid和数据库中数据的txid,若txid相同,说明是被重播的数据,直接跳过即可,如果不同,则把两个数值相加即可。

下面具体说明一下,假设当前处理的batch的txid=3,其中的tuples为:

[man]
[man]
[dog]

再假设数据库中保存的数据为:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

数据库中“man”单词的txid为1,而当前batch的txid为3,说明当前batch中的“man”单词未被累加过,所以需要把当前batch中”man”的个数累加到数据库中。数据库中“dog”单词的txid为3,和当前batch的txid相同,说明已经被累计过了直接跳过。最终数据库中的结果变为:

man => [count=5, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

总结一下整个处理过程:

if(database txid=current txid){//两次更新的txid相同
     跳过;
}else{
     用current value替换掉database value;
}

2.2 Opaque Transactional spouts

上面已经提到过,并不是所有情形下都需要保证强一致性。例如在TransactionalTridentKafkaSpout中(关于Kafka相关介绍,点这里),如果它的一个batch中的tuples来自一个topic的所有partitions,如果要满足Transactionnal Spout语义的话,一旦这个batch因为某些失败而被重发,重发batch中的所有tuple必须与这个batch中的完全一致,而恰好kafka集群某个节点down掉导致这个topic其中一个partition无法使用,那么就会导致这个batch无法凑齐所有tuple(无法获取失败partition上的数据),整个处理过程被挂起。而Opaque Transactional spouts就可以解决这个问题。

Opaque Transactional spouts提供了如下保证:

- 每个tuple只在一个batch中被成功处理,如果一个batch中的tuple处理失败的话,会被后面的batch继续处理

怎么理解这个特性呢,简要来说就OpaqueTransactional spout和Transactional spouts基本差不多,只是在Opaque Transactional spout中,相同txid的batch中的tuple集合可能不一样OpaqueTridentKafkaSpout就是符合这种特性的spout的,所以它可以容忍kafka节点失败。

因为重播的batch中的tuple集合可能不一样,所以对于Opaque Transactional Spout,就不能根据txid是否一致来决定是否需要更新状态了。我们需要在数据库中保存更多的状态信息,除了单词名,数量、txid之外,我们还需要保存一个pre-value来记录前一次计算的值。我们再用上面例子具体说明一下。

假设数据库中的记录如下:

{ value = 4,
  preValue = 1,
  txid = 2
}

假设当前batch的count值为2,txid=3。因为当前txid和数据库中的不同,我们需要把preValue替换成value的值,累计value值,然后更新txid值为3,结果如下:

{ value = 6,
  prevValue = 4,
  txid = 3
}

再假设当前batch的count值为1,txid=2。这是当前txid和数据库中的相同,虽然两个txid值相同,但由于两个batch的内容已经变了,所以上次的更新可以忽略掉,需要对数据库中的value值进行重新计算,即把当前值和preValue值相加,结果如下:

{ value =3,
  prevValue = 1,
  txid = 2
}

总结一下整个处理过程:

if(database txid=current txid){
 value=preValue+current value;//重新更新value
 //preValue不变;
}else{
 preValue=value;//更新preValue
 value=preValue+current value;//更新value
 txid=current txid;//更新txid
}

2.3 No-Transactional spouts

No-Transactional spouts对每个batch的内容不做任何保证。如果失败的batch没被重发,它有会出现“最多被处理一次”的请况,如果tuples被多个batch处理,则会发生“最少被处理一次的情况”,很难保证“数据只被处理一次”的情况。

三、Spout和State的类型总结

下面这个表格描述了“数据只被处理一次”的spout/state的类型组合:

这里写图片描述

总的来说, Opaque transactional states即有一定的容错性又能保证数据一致性,但它的代价是需要在数据库中保存更多的状态信息(txid和preValue)。Transactional states虽然需要较少的状态信息(txid),但是它需要transactional spouts的支持。non-transactional states需要在数据库中保存最少的状态信息但难以保证“数据只被处理一次”的语义。

因此,在实际应用中,spout和state类型的选择需要根据我们具体应用需求来决定,当然在容错性和增加存储代价之间也需要做个权衡。

四、State API

上面讲的看上去有点啰嗦,庆幸的是Trident State API 在内部为我们实现了所有状态管理的逻辑,我们不需要再进行诸如对比txid,在数据库中存储多个值等操作,仅需要简单调用Trident API即可,例如:

TridentTopology topology = new TridentTopology();       
TridentState wordCounts =
  topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))               
    .parallelismHint(6);

所有的管理Opaque transactional states状态的逻辑都在MemcachedState.opaque()方法内部实现了。另外,所有的更新操作都是以batch为单位的,这样减少了对数据库的调用次数,极大的提高了效率。下面就向大家介绍一下和Trident State 相关的API。

4.1 State接口

Trident API中最基本的State接口只有两个方法:

public interface State {
    void beginCommit(Long txid);
    void commit(Long txid);
}

State接口只定义了状态什么时候开始更新,什么时候结束更新,并且我们都能获得一个txid。具体这个State如何工作,如何更新State,如何查询State,Trident并没有对此作出限制,我们可以自己任意实现。

假设我们有一个Location数据库,我们要通过Trident查新和更新这个数据库,那么我们可以自己实现这样一个LocationDB State,因为我们需要查询和更新,所以我们为这个LocationDB 可以添加对Location的get和set的实现:

public class LocationDB implements State {
    public void beginCommit(Long txid) {   
    }
    public void commit(Long txid) {   
    }
    public void setLocation(long userId, String location) {
  // code to access database and set location
    }
    public String getLocation(long userId) {
  // code to get location from database
    }
 }

4.2 StateFactory工厂接口

Trident提供了State Factory接口,我们实现了这个接口之后,Trident 就可以通过这个接口获得具体的Trident State实例了,下面我们就实现一个可以制造LocationDB实例的LocationDBFactory:

public class LocationDBFactory implements StateFactory {
   public State makeState(Map conf, int partitionIndex, int numPartitions) {
      return new LocationDB();
   } 
}

4.3 QueryFunction接口

这个接口是用来帮助Trident查询一个State,这个接口定义了两个方法:

public interface QueryFunction<S extends State, T> extends EachOperation {
    List<T> batchRetrieve(S state, List<TridentTuple> args);
    void execute(TridentTuple tuple, T result, TridentCollector collector);
}

接口的第一个方法batchRetrieve()有两个参数,分别是要查询的State源和查询参数,因为trident都是以batch为单位处理的,所以这个查询参数是一个List<TridentTuple>集合。关于第二个方法execute()有三个参数,第一个代表查询参数中的某个tuple,第二个代表这个查询参数tuple对应的查询结果,第三个则是一个消息发送器。下面就看一个QuaryLocation的实例:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
    List<String> ret = new ArrayList();
    for(TridentTuple input: inputs) {
        ret.add(state.getLocation(input.getLong(0)));
    }
    return ret;
}

public void execute(TridentTuple tuple, String location, TridentCollector collector) {
    collector.emit(new Values(location));
    }   
}

QueryLocation接收到Trident发送的查询参数,参数是一个batch,batch中tuple内容是userId信息,然后batchRetrieve()方法负责从State源中获取每个userId对应的的location。最终batchRetrieve()查询的结果会被execute()方法发送出去。

但这里有个问题,batchRetrieve()方法中针对每个userid都做了一次查询State操作,这样处理显然效率不高,也不符合Trident所有操作都是针对batch的原则。所以,我们要对LocationDB这个State做一下改造,提供一个bulkGetLocations()方法来替换掉getLocation()方法,请看改造后的LocationDB的实现:

public class LocationDB implements State {
    public void beginCommit(Long txid) {   
    }
    public void commit(Long txid) {   
    }
    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
  // set locations in bulk
    }
    public List<String> bulkGetLocations(List<Long> userIds) {
  // get locations in bulk
    }
 }

我们可以看到,改造的LocationDB对Location的查询和更新都是批量操作的,这样显然可以提高处理效率。此时,我们再稍微改一下QueryFunction中batchRetrieve()方法:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for(TridentTuple input: inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }   
}

QueryLocation在topology中可以这么使用:

TridentTopology topology = new TridentTopology();
topology.newStream("myspout", spout)
    .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

4.4 UpdateState接口

当我们要更新一个State源时,我们需要实现一个UpdateState接口。UpdateState接口只提供了一个方法:

public interface StateUpdater<S extends State> extends Operation {
    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);
}

下面我们来具体看一下LocationUpdater的实现:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for(TridentTuple t: tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

对于LocationUpdater在topology中可以这么使用:

TridentTopology topology = new TridentTopology();
TridentState locations = 
topology.newStream("locations", locationsSpout)

通过调用Trident Stream的partitionPersist方法可以更新一个State。在上面这个实例中,LocationUpdater接收一个State和要更新的batch,最终通过调用LocationFactory制造的LocationDB中的setLocationsBulk()方法把batch中的userid及其location批量更新到State中。

partitionPersist操作会返回一个TridentState对象,这个对象即是被TridentTopology更新后的LocationDB,所以,我们可以在topology中续继续对这个返回的State做查询操作。

另外一点需要注意的是,从上面StateUpdater接口可以看出,在它的updateState()方法中还提供了一个TridentCollector,因此在执行StateUpdate的同时仍然可以形成一个新的Stream。若要操作StateUpdater形成的Stream,可以通过调用TridentState.newValueStream()方法实现。

五、persistentAggregate

Trident另一个update state的方法时persistentAggregate,请看下面word count的例子:

TridentTopology topology = new TridentTopology();       
TridentState wordCounts =
  topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

5.1 MapState接口

persistentAggregate是在partitionPersist之上的另一个抽象,它会对Trident Stream进行聚合之后再把聚合结果更新到State中。在上面这个例子中,因为聚合的是一个groupedStream,Trident要求这种情况下State需要实现MapState接口,被grouped的字段会被做为MapSate的key,被grouped的数据计算的结果会被做为MapSate的value。MapSate接口定义如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

对于MapSate的实现有MemoryMapState

5.2 Snapshottable接口

如果我们聚合的不是一个groupedStream,Trident要求我们的State实现Snapshottable接口:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

对于Snapshottable的实现有 MemcachedState

六、Map States的实现

在Trident中实现MapState很简单,大部分工作Trident已经替我们做了。OpaqueMap,TransactionalMap, 和NonTransactionalMap类已经替我们完成了和容错相关的处理逻辑. 我们仅仅提供一个 IBackingMap的实现类即可, IBackingMap的接口定义如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys); 
    void multiPut(List<List<Object>> keys, List<T> vals); 
}

OpaqueMap’s调用的multiPut将会把value值自动封装成OpaqueValue来处理, TransactionalMap’s 将会把value封装成TransactionalValue再进行处理, 而NonTransactionalMaps 则不会对value做处理,直接传递给topology。

另外,
Trident提供的CachedMap 类会对Map中的key/value做自动的LRU缓存 。
Trident提供的SnapshottableMap类会把MapState转换成SnapShottable对象(把MapState中的所有key/value对聚合成一个固定的key)。

详细更详细的了解整个MapState的实现过程,请查看 MemcachedState 的实现,MemcachedState除了把上面介绍的相关接口整合到一起之外,还提供了对opaque transactional, transactional, non-transactional三个语义规则的支持。

目录
相关文章
|
8月前
|
存储 缓存 资源调度
Flink_state 的优化与 remote_state 的探索
bilibili 资深开发工程师张杨,在 Flink Forward Asia 2022 核心技术专场的分享。
407 1
Flink_state 的优化与 remote_state 的探索
|
8月前
|
索引
beamManagement(二)TCI-state/QCL
上一篇讲解了idle初始接入阶段,基站和UE用SSB的索引,关联PRACH的发送时刻比较内涵的指示了波束信息;在RRC建立进入connected mode后,就可以通过TCI State来指示波束信息, 为利于后续内容理解,这里先看下TCI-state及QCL的概念。
|
消息中间件 存储 并行计算
The Soft Link between IM Level Storage Location and WM level Storage Type?(2)
The Soft Link between IM Level Storage Location and WM level Storage Type?(2)
The Soft Link between IM Level Storage Location and WM level Storage Type?(2)
The Soft Link between IM Level Storage Location and WM level Storage Type?(1)
The Soft Link between IM Level Storage Location and WM level Storage Type?(1)
The Soft Link between IM Level Storage Location and WM level Storage Type?(1)
|
存储 消息中间件 缓存
storm笔记:Trident状态
在storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。
74 0
storm笔记:Trident状态
|
NoSQL