1、简述
Chronicle Queue 是一款高性能、低延迟的日志和消息队列库,专为低延迟应用设计,特别是在金融系统、交易系统等对数据吞吐和实时性要求极高的场景中广泛使用。它主要通过内存映射文件(memory-mapped files)将数据直接写入磁盘,从而实现了接近于内存的性能,同时提供持久化能力。
核心特性
🔥 超低延迟:Chronicle Queue 使用内存映射文件技术,使得数据的读写性能极其接近内存操作,非常适合对性能要求极高的应用场景。
🔥 持久化:虽然其性能非常接近内存操作,但数据实际存储在磁盘上,确保了数据的持久化。
🔥 无锁设计:Chronicle Queue 是无锁设计,避免了多线程竞争带来的性能损失。
🔥 时序日志:所有数据按时间顺序存储,便于时间回溯和数据重放。
🔥 文件轮换:支持按时间或大小进行日志文件轮换,有助于日志管理和归档。
本文将详细介绍 Chronicle Queue 的使用场景、核心特性,以及如何在 Java 中进行集成,最后通过详细代码示例演示其使用。

2、实践样例
2.1 引入依赖
在 pom.xml 文件中引入 Chronicle Queue 依赖:
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
<version>5.21.20</version> <!-- 请使用最新版本 -->
</dependency>
2.2 创建 Queue
Chronicle Queue 使用非常简单,首先你需要创建一个队列实例。队列数据会被写入到磁盘上的文件中:
import net.openhft.chronicle.queue.ChronicleQueue;
public class ChronicleQueueExample {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
System.out.println("Chronicle Queue 已创建并存储在 queue-dir 文件夹中");
}
}
}
以上代码将在 queue-dir 目录中创建一个 Chronicle Queue 队列。
2.3 生产者写入数据
接下来我们要向队列中写入数据,Chronicle Queue 提供了 Appender 用于将消息写入队列。以下代码展示了如何使用生产者写入消息:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
public class Producer {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
ExcerptAppender appender = queue.acquireAppender();
// 写入数据
for (int i = 0; i < 10; i++) {
appender.writeText("Message " + i);
System.out.println("Written: Message " + i);
}
}
}
}
2.4 消费者读取数据
消费者读取数据非常简单,Chronicle Queue 提供了 Tailer 用于读取队列中的消息。以下代码展示了如何读取生产者写入的消息:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
public class Consumer {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("queue-dir")) {
ExcerptTailer tailer = queue.createTailer();
String message;
while ((message = tailer.readText()) != null) {
System.out.println("Read: " + message);
}
}
}
}
3、场景运用
Chronicle Queue 除了基本的读写消息之外,还有一些高级用法,能够满足更多复杂的需求和场景。以下介绍几个常见的高级应用:
🔥 高频交易系统:如金融领域中的订单撮合引擎或行情数据系统,要求极低延迟的消息传递。
🔥 日志记录:能够快速记录大量日志,且能根据时间顺序快速读取。
🔥 实时数据传输:需要快速传递数据,且具备高吞吐量的应用。
3.1 自定义对象的序列化和反序列化
Chronicle Queue 默认处理的是字符串类型数据,但我们经常需要传递更复杂的 Java 对象。Chronicle Queue 提供了序列化支持,可以将自定义的对象写入队列,并在消费时进行反序列化。首先,我们需要实现自定义的 Java 对象,并使用 BytesMarshallable 接口来支持序列化:
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.bytes.BytesOut;
import net.openhft.chronicle.bytes.BytesMarshallable;
public class Order implements BytesMarshallable {
private String orderId;
private double amount;
public Order(String orderId, double amount) {
this.orderId = orderId;
this.amount = amount;
}
@Override
public void writeMarshallable(BytesOut bytes) {
bytes.writeUtf8(orderId);
bytes.writeDouble(amount);
}
@Override
public void readMarshallable(BytesIn bytes) {
this.orderId = bytes.readUtf8();
this.amount = bytes.readDouble();
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", amount=" + amount +
'}';
}
}
生产者写入自定义对象:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
public class CustomObjectProducer {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("object-queue-dir")) {
ExcerptAppender appender = queue.acquireAppender();
// 写入自定义对象
for (int i = 0; i < 5; i++) {
Order order = new Order("Order" + i, 100 + i);
appender.writeDocument(order);
System.out.println("Written: " + order);
}
}
}
}
消费者读取自定义对象:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
public class CustomObjectConsumer {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("object-queue-dir")) {
ExcerptTailer tailer = queue.createTailer();
// 读取自定义对象
Order order;
while (tailer.readDocument(o -> order = (Order) o)) {
System.out.println("Read: " + order);
}
}
}
}
3.2 多消费者模式
Chronicle Queue 支持多消费者同时读取队列中的消息。每个消费者都有独立的 Tailer,可以在不同的位置读取队列的数据。通过这种方式,可以实现多个消费者处理不同的数据分片。
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
public class MultiConsumer {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("multi-consumer-queue")) {
ExcerptTailer tailer1 = queue.createTailer("consumer1");
ExcerptTailer tailer2 = queue.createTailer("consumer2");
String message;
System.out.println("Consumer 1 reading:");
while ((message = tailer1.readText()) != null) {
System.out.println("Consumer 1 read: " + message);
}
System.out.println("Consumer 2 reading:");
while ((message = tailer2.readText()) != null) {
System.out.println("Consumer 2 read: " + message);
}
}
}
}
在此模式下,不同消费者可以同时消费消息,而不会干扰彼此。
3.3 高级的文件轮换策略
除了按时间轮换文件(如按天、按小时),Chronicle Queue 还支持按文件大小进行轮换。当文件达到设定的大小时,它将自动创建一个新文件来存储后续数据。
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
public class SizeBasedRollingExample {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.singleBuilder("size-queue-dir")
.rollCycle(RollCycles.SMALL_HOURLY).blockSize(1 << 20).build()) {
ExcerptAppender appender = queue.acquireAppender();
// 写入数据
for (int i = 0; i < 1000; i++) {
appender.writeText("Large message " + i);
}
System.out.println("Data written with size-based file rolling.");
}
}
}
在此示例中,我们使用 RollCycles.SMALL_HOURLY 和 blockSize 设置每个文件的大小限制。
3.4 自定义 RollCycle
Chronicle Queue 支持通过 RollCycle 定制轮换策略。你可以根据业务需求定义自己的轮换规则。
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.wire.SequenceGenerator;
public enum CustomRollCycle implements RollCycle {
EVERY_FIVE_MINUTES("yyyyMMdd-HHmm", 60 * 1000 * 5);
private final String format;
private final int length;
CustomRollCycle(String format, int length) {
this.format = format;
this.length = length;
}
@Override
public String format() {
return format;
}
@Override
public int lengthInMillis() {
return length;
}
@Override
public SequenceGenerator toSequenceGenerator() {
return null; // 使用默认的序列生成器
}
}
然后在创建队列时使用这个自定义的 RollCycle。
3.5 Chronical Queue 和 Replication(复制)
在分布式系统中,Chronicle Queue 还支持将数据在多个实例间进行复制。通过 Chronicle Queue Replication,你可以将数据同步到不同的节点,以保证高可用性和数据冗余。
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.replication.TcpReplicationHub;
public class ReplicatedQueueExample {
public static void main(String[] args) {
try (ChronicleQueue queue = ChronicleQueue.single("replicated-queue")) {
TcpReplicationHub hub = TcpReplicationHub.builder().host("localhost").port(8080).build();
// 配置队列为复制模式
queue.withReplication(hub.createReplicationChannel(1));
// 接下来可以像普通的 Chronicle Queue 那样写入数据
ExcerptAppender appender = queue.acquireAppender();
appender.writeText("Replicated message");
}
}
}
通过这种方式,你可以将队列中的数据复制到其他节点上,实现数据的高可用性。
4、总结
Chronicle Queue 是一款为高性能需求而设计的轻量级日志和消息队列工具,提供了极低的延迟、持久化和高吞吐量的解决方案。通过内存映射文件的方式,将数据高效地持久化到磁盘中,可以满足对性能和稳定性要求极高的场景,尤其适合金融行业和实时数据系统。
🔥 高效读写:无论是读写性能还是时序日志的顺序访问,都具有非常高的效率。
🔥 高并发无锁设计:由于其设计是无锁的,因此非常适合在高并发场景下使用,极大减少了线程间的竞争。
🔥 数据持久化:队列数据虽然高效,但同时也会自动持久化到磁盘中,不会丢失重要的业务日志或消息数据。
在本文中,我们介绍了 Chronicle Queue 的核心特性,并通过具体的代码展示了如何在 Java 中使用 Chronicle Queue 搭建一个简单的消息队列。