基于curator的延迟队列

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

基于curator的延迟队列
这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理

怎么使用

 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.0.1</version>

 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.0.1</version>


public class Processor {

private final static CuratorFramework client;
private final static DistributedDelayQueue<String> queue;

static{
    ZookeeperConfig config = ZookeeperConfig.getConfig();
    // create client
    client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
            new ExponentialBackoffRetry(3000, 2));
    // build queue
    queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
            new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
            .buildDelayQueue();
    // 开启执行计划
    enable();
}

/**
 * 生产数据
 *
 * @param id
 * @param endTime
 * @throws Exception
 */
public void producer(String id, Date endTime) throws Exception {
    queue.put(id, endTime.getTime());
}
private static void enable(){
    try {
        client.start();
        queue.start();
    } catch (Exception e) {
        logger.error("enable queue fail, exception:{}", e);
    }
}

}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer {

@Override
public byte[] serialize(String s) {
     return s.getBytes("utf-8");
}

@Override
public String deserialize(byte[] bytes) {
    return new String(bytes);
}

}

// consumer
AutoSubmitConsumer implements QueueConsumer {

@Override
public void consumeMessage(String id)  {
    logger.info("consumeMessage, :{}", id);
      // service processor.
    logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
}

@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}

}
是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?

猜想
是否持久化
是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
针对第一点,我们关闭zookeeper服务端和客户端后重新启动后之前的节点还存在所以是持久化节点
通过客户端工具连接zookeeper发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的
以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号 

源码求证
// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片段
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
if ( !isProducerOnly )

    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call()
                    {
                        runLoop(); // step1
                        return null;
                    }
                }
            );
    }

// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代码片段
while ( state.get() == State.STARTED )

        {
            try
            {
                ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                currentVersion = data.version;
                // 诸如:
                //queue-|2E1D86A3BB6|0000000019
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
        // 拿到所有的子节点
                List<String> children = Lists.newArrayList(data.children); 
                // 根据过期时间排序
            // step6
                sortChildren(children); 
        // 排序后
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
                //queue-|2E1D86A3BB6|0000000019
                if ( children.size() > 0 )
                { //获取到期时间
                    maxWaitMs = getDelay(children.get(0));
                   
                    if ( maxWaitMs > 0 ) continue;
                }
                else  continue;
               // 死循环不断轮询是否有满足条件的节点;
               // 只要有满足条件的节点就将整个排序后的集合往下传递
                processChildren(children, currentVersion); // step2
            }
           
        }

// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2对应的代码片段:
private void processChildren(List children, long currentVersion)

{
    final Semaphore processedLatch = new Semaphore(0);
    final boolean   isUsingLockSafety = (lockPath != null);
    int             min = minItemsBeforeRefresh;
    for ( final String itemNode : children )
    {
        if ( Thread.currentThread().isInterrupted() )
        {
            processedLatch.release(children.size());
            break;
        }

        if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
        {
            processedLatch.release();
            continue;
        }

        if ( min-- <= 0 )
        {
            if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
            {
                processedLatch.release(children.size());
                break;
            }
        }
    // step3
        if ( getDelay(itemNode) > 0 )
        {
            processedLatch.release();
            continue;
        }
        //这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯
        executor.execute
        (
            new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        //是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中
                        if ( isUsingLockSafety )
                        {
                            
                            processWithLockSafety(itemNode, ProcessType.NORMAL);
                        }
                        else
                        {
                // 看这里 step4
                            processNormally(itemNode, ProcessType.NORMAL);
                        }
                    }finally
                    {
                        processedLatch.release();
                    }
                }
            }
        );
    }

    processedLatch.acquire(children.size());
}

// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 对应step3处的代码片段
protected long getDelay(String itemNode)

        {
            return getDelay(itemNode, System.currentTimeMillis());
        }
        
        private long getDelay(String itemNode, long sortTime)
        {  // 会从key上获取时间戳        
    // step5
            long epoch = getEpoch(itemNode); 
            return epoch - sortTime; // 计算过期时间
        }

// 对应step5处的代码
private static long getEpoch(String itemNode)

{
// itemNode -> queue-|时间戳|序号
    int     index2 = itemNode.lastIndexOf(SEPARATOR);
    int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
    if ( (index1 > 0) && (index2 > (index1 + 1)) )
    {
        try
        {
            String  epochStr = itemNode.substring(index1 + 1, index2);
            return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多
        }
    }
    return 0;
}

// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 会根据延时时间排序
// step6处的代码片段
protected void sortChildren(List children)

        {
            final long sortTime = System.currentTimeMillis();
            Collections.sort
            (
                children,
                new Comparator<String>()
                {
                    @Override
                    public int compare(String o1, String o2)
                    {
                        long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                        return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                    }
                }
            );
        }

// 对应step4处的代码片段
private boolean processNormally(String itemNode, ProcessType type) throws Exception

{
    try
    {
        String  itemPath = ZKPaths.makePath(queuePath, itemNode);
        Stat    stat = new Stat();

        byte[]  bytes = null;
        if ( type == ProcessType.NORMAL )
        {
            // 获取key对应的value
            bytes = client.getData().storingStatIn(stat).forPath(itemPath);
        }
        if ( client.getState() == CuratorFrameworkState.STARTED )
        {
           // 移除节点
                        client.delete().withVersion(stat.getVersion()).forPath(itemPath);
        }

        if ( type == ProcessType.NORMAL )
        {
        //step7
            processMessageBytes(itemNode, bytes);
        }

        return true;
    }

    return false;
}

//对应step7处代码,会回调我们的业务代码
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception

{
    ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
    MultiItem<T>                items;
    try
    {
      // 根据我们定义的序列化器序列化
        items = ItemSerializer.deserialize(bytes, serializer);
    }

    for(;;)
    {
     // 省略一部分代码
        try
        {
            consumer.consumeMessage(item); // 这里就会回调到我们的业务代码
        }
    }
    return resultCode;
}

总结
org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;
原文地址https://my.oschina.net/u/2486137/blog/3215445

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
6月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列解读
RabbitMQ之延迟队列解读
|
2月前
|
NoSQL Java Redis
分布式延时消息的另外一种选择 Redisson (推荐使用)
来源: https://blog.csdn.net/m0_73311735/article/details/127070042 因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 Redisson DelayedQueue,为了搞清楚内部运行流程,特记录下来。 总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程。 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 图片 基本使用 发送延迟消息代码如下,发送了一条延迟时间为 5s 的消息。 public void prod
|
3月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
323 1
|
7月前
|
消息中间件 Java Docker
RabbitMQ 如何实现延迟队列?
RabbitMQ 如何实现延迟队列?
333 1
|
9月前
|
消息中间件 存储 调度
RabbitMQ的延迟队列
RabbitMQ的延迟队列是一种特殊的队列,可以在消息发送后延迟一段时间后再将消息投递给消费者。
148 0
|
9月前
|
消息中间件 Java 数据库
RibbitMQ学习笔记延迟队列(一)
RibbitMQ学习笔记延迟队列
39 0
|
9月前
|
消息中间件 存储 NoSQL
RibbitMQ学习笔记延迟队列(二)
RibbitMQ学习笔记延迟队列
52 0
|
9月前
|
消息中间件 存储 中间件
RabbitMQ的延迟队列
RabbitMQ是一个开源的消息队列中间件,它提供了可靠的消息传递机制,被广泛应用于分布式系统中。延迟队列是RabbitMQ中常用的一种队列类型,它可以用来实现消息的延时投递。
179 0
|
消息中间件 存储 NoSQL
RabbitMQ学习(九):延迟队列
延时队列中,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说,延时队列就是用来存放需要在指定时间内被处理的 元素的队列。 其实延迟队列就是死信队列的一种。
245 0
RabbitMQ学习(九):延迟队列
|
消息中间件 Java Kafka
RabbitMQ没有延时队列?我就教你一招,玩转延时队列
延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。