加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
“一个基于锁和条件变量(Condition)实现的简易版 Future,用来在某个线程中等待结果,直到被另一个线程显式唤醒并传递结果。”
🔵 举个简单使用场景:
假设你在做 异步RPC调用、异步消息处理这类事情:
-
线程A 发起请求,但不知道什么时候结果返回,于是 await() 等待。
-
当线程B 收到响应时,调用 signal(result) 把数据塞进来并唤醒线程A。
小小示意:
CondFuture<String> condFuture = new CondFuture<>();
// 线程A:请求并等待
new Thread(() -> {
try {
String result = condFuture.await(5, TimeUnit.SECONDS);
System.out.println("Got result: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程B:稍后响应
new Thread(() -> {
try {
Thread.sleep(1000); // 假装处理了 1秒
condFuture.signal("Response is here!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
输出:
Got result: Response is here!
流量、高并发 IoT 项目(车联网平台)的标准实践
Cassandra 分库分节点 |
xx、xx、业务库分开,减少热数据干扰,提升查询性能 |
Redis 主备分离 |
热数据实时存储 + 备份灾容,快速读写,减少单点故障 |
Kafka producer/batch 配置 |
批量提交,延迟优化,内存缓冲提升消息发送吞吐量 |
Kafka consumer 手动ack + concurrency |
保证消息消费可靠性,并通过多线程加速消费处理 |
Elasticsearch restclient |
轻量高效,适合大数据量检索,降低TCP连接开销 |
高阶版 JT808 平台服务部署架构图
JT808服务 |
Netty线程模型+心跳检测+过期剔除 |
Redis |
主备双写,主节点故障快速切换备节点 |
Cassandra |
按业务分库,数据分离,提升查询效率 |
Kafka |
批量提交+异步ACK+3节点副本保障高可用 |
Elasticsearch |
异步写入+批量提交+慢查询优化 |
消费者组 |
线程池消费,超时监控,重试机制 |
超实用部署建议
Redis |
主备节点使用keepalived+vip做高可用切换 |
Cassandra |
每个库单独维护表,设置合理TTL(过期时间)清理旧数据 |
Kafka |
生产者开启幂等性 (enable.idempotence=true) ,防止重复投递 |
ES |
热数据分离,老数据定期归档,写入前做Bulk压缩 |
JT808服务器 |
增加防粘包拆包处理,最大包长校验,保护系统 |
全链路 |
接入链路追踪系统(如Skywalking、Zipkin),监控数据流向 |
Topic属性设置建议(重点!)
replication.factor |
3 |
防止节点宕机丢数据 |
min.insync.replicas |
2 |
写入必须至少2副本成功 |
acks |
all |
生产者端保证强一致性 |
enable.idempotence |
true |
开启幂等性,防止重复消息 |
retention.ms |
7天(604800000)或按业务定 |
保留时间够补偿 |
cleanup.policy |
delete(默认) |
保证磁盘可控,避免膨胀 |
segment.ms |
1小时 |
切小日志段,提升查询速度 |
📋 Kafka 参数调优表(超详细版)
🛠️ Producer 参数调优
acks | all |
等所有副本确认才算写成功 |
最强数据可靠性 |
retries | 3 ~ 5 |
发送失败自动重试次数 |
防止瞬时抖动丢消息 |
enable.idempotence | true |
开启幂等性,避免重复投递 |
高一致性必开 |
batch.size | 32KB ~ 64KB |
单批次最大字节数 |
调大提高吞吐量,减少IO次数 |
linger.ms | 5 ~ 10 |
批量发送延迟(毫秒) |
稍微延迟换更大批次(减少压Kafka) |
buffer.memory | 64MB ~ 128MB |
生产者内存缓冲池大小 |
内存富裕就调大(抗突发) |
compression.type | lz4
或 snappy |
压缩算法 |
降低网络带宽,提升吞吐 |
max.request.size | 1MB
或更大 |
单条消息最大尺寸 |
避免超大消息被拒 |
request.timeout.ms | 30s |
请求超时毫秒数 |
保证重试/失败及时切换 |
delivery.timeout.ms | 60s |
允许最大发送时间 |
配合 retries 效果好 |
🛠️ Consumer 参数调优
fetch.min.bytes | 1KB ~ 10KB |
最小抓取字节 |
抓取更多消息,减少拉取次数 |
fetch.max.bytes | 5MB ~ 10MB |
单次最大拉取量 |
合理拉大,防止吞吐低 |
fetch.max.wait.ms | 500ms |
最长等待时间 |
配合 batch 消费更流畅 |
max.poll.records | 500 ~ 1000 |
每次拉取最大记录数 |
批量处理提升效率 |
session.timeout.ms | 10s ~ 20s |
消费组心跳超时时间 |
保持平稳Rebalance |
heartbeat.interval.ms | 3s ~ 5s |
心跳间隔 |
配合session.timeout |
enable.auto.commit | false |
关闭自动提交 |
手动控制 offset,确保精确一次 |
max.partition.fetch.bytes | 1MB |
单分区最大拉取量 |
高分区场景要调大 |
🛠️ Broker 集群端参数调优
num.network.threads | 3 ~ 8 |
网络线程数 |
跟broker流量量级有关 |
num.io.threads | 8 ~ 16 |
IO线程数(磁盘/网络) |
跟磁盘/吞吐相关 |
log.dirs |
多磁盘挂载 |
日志存储目录 |
多路径并发刷盘更快 |
log.segment.bytes | 512MB ~ 1GB |
分段文件大小 |
文件过小影响性能 |
log.retention.hours | 72h
(3天)或按需 |
日志保留时间 |
结合业务定策略 |
message.max.bytes | 1MB
或更大 |
单消息最大字节数 |
跟 producer 端对应 |
replica.fetch.max.bytes | 10MB |
副本同步最大字节 |
防止副本落后太多 |
socket.request.max.bytes | 100MB |
socket请求最大字节数 |
保护broker防止OOM |
auto.create.topics.enable | false |
禁止自动创建 topic |
统一topic管理 |
【实战Tips】
-
吞吐优先→ 调大 batch.size、buffer.memory,使用压缩。
-
可靠性优先→ 开启 acks=all、幂等、合理设置 retries。
-
高并发低延迟→ 合理调 max.poll.records、fetch参数。
-
集群容灾→ Replication Factor = 3,ISR列表控制好(min.insync.replicas=2)。
-
异常处理→ 配置 DLQ(死信队列)+ 自定义拦截器(如 ProducerInterceptor 抓异常)。
📋 Kafka 流量压测工具推荐表
1. 官方自带工具 – kafka-producer-perf-test.sh
工具位置
一般在安装包里的:
$KAFKA_HOME/bin/kafka-producer-perf-test.sh
基础使用示例
./kafka-producer-perf-test.sh \\
–topic test-topic \\
–num-records 1000000 \\
–record-size 512 \\
–throughput -1 \\
–producer-props bootstrap.servers=localhost:9092
–topic |
压测用的 Topic |
–num-records |
要发送的消息总数 |
–record-size |
每条消息字节大小(比如 JT808 标准报文一般几十到几百字节) |
–throughput |
限流速率(条数/秒),-1表示不限速 |
–producer-props |
传入Kafka Producer的连接参数 |
✅ 输出结果:TPS、发送延迟、吞吐量等指标
2. 官方自带工具 – kafka-consumer-perf-test.sh
基础使用示例
./kafka-consumer-perf-test.sh \\
–topic test-topic \\
–bootstrap-server localhost:9092 \\
–messages 1000000
–topic |
要消费的Topic |
–messages |
要消费的消息条数 |
–bootstrap-server |
Kafka地址 |
✅ 输出结果:消费TPS、平均延迟、吞吐量
🚀 高阶压测思路
单机单Topic最大吞吐 |
producer-perf-test.sh |
配置大批量数据、-1不限速 |
多分区压测 |
producer-perf-test.sh |
多开进程,发送到不同分区 |
压测集群消费能力 |
consumer-perf-test.sh |
配合group.id并发消费 |
网络/磁盘瓶颈排查 |
producer/consumer + Linux iostat、sar |
观察磁盘/网络IO |
端到端延迟压测 |
自定义带时间戳的Payload |
生产+消费后计算RTT |
🔥 高阶技巧:配合这些一起压测更稳
-
调大 Producer batch.size、linger.ms → 批量发
-
Broker socket.request.max.bytes调大 → 防止大批次失败
-
保证磁盘I/O够快(SSD最佳)
-
JVM 参数优化(-Xms -Xmx固定堆大小)
-
配置 Topic 分区数、Replication Factor合理分摊压力
📈 实战Tips总结
-
小消息(如JT808位置上报)→ 高TPS 压测重点
-
大消息(报警/多媒体)→ 吞吐量/延迟 双压测
-
消费端一定要压 → 避免只测发送忽略消费瓶颈!
评论前必须登录!
注册