0%

RPC

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务

背景

  • 单一应用架构
    当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,用于简化增删改查工作量的数据访问框架(ORM)是关键。
  • 垂直应用架构
    当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,提升效率的方法之一是将应用拆成互不相干的几个应用,以提升效率。此时,用于加速前端页面开发的Web框架(MVC)是关键。
  • 分布式服务架构
    当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
  • 流动计算架构
    当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。

基本原理

客户端(Client):服务的调用方。
服务端(Server):真正的服务提供者。
客户端存根:存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
服务端存根:接收客户端发送过来的消息,将消息解包,并调用本地的方法。

过程:

  1. 要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。比如基于Web服务协议栈的RPC,就要提供一个endpoint URI,或者是从UDDI服务上查找。如果是RMI调用的话,还需要一个RMI Registry来注册服务的地址。
  2. 要解决通讯的问题,主要是通过在客户端和服务器之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。
  3. 当A服务器上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到B服务器,由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize)或编组(marshal),通过寻址和传输将序列化的二进制发送给B服务器。
  4. B服务器收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。
  5. 返回值还要发送回服务器A上的应用,也要经过序列化的方式发送,服务器A接到后,再反序列化,恢复为内存中的表达方式,交给A服务器上的应用

如何分析一个RPC框架

  1. 服务治理
  2. 编码协议(IDL)
  3. 传输(通信)协议
  4. 线程模型

RPC框架

Dubbo

架构

Dubbo 架构具有连通性、健壮性、伸缩性、以及向未来架构的升级性几个特点。

  • Provider为服务提供方,提供 Java 服务接口的实现,并将其元信息注册到 Dubbo 注册中心(过程 1.register 所示)
  • Consumer为服务消费端,从 Dubbo 注册中心检索订阅的 Java 服务接口的元信息(过程 2.subscribe 所示),通过框架处理后,生成代理程序执行远程方法调用(过程 4.invoke 所示)
  • Registry为注册中心,属于注册元信息中心化基础设施(如 Apache Zookeeper 或 Alibaba Nacos),为 Provider 提供注册通道,为 Cosumer 提供订阅渠道。同时,注册中心支持注册元信息变更通知,通知 Consumer 上游 Provider 节点的变化(如扩容或缩容)。而注册元信息均以 Dubbo URL 的形式存储
  • Monitor为服务治理平台,提供开发和运维人员服务查询、路由规则、服务 Mock 和测试等治理能力

传输协议:TCP+基于长链接的NIO框架
编码协议:定制的Hessian2框架/PB/HTTP等
线程模型:Reactor

Dubbo解决的问题

  • 当服务越来越多时,服务 URL 配置管理变得非常困难,F5 硬件负载均衡器的单点压力也越来越大。此时需要一个服务注册中心,动态地注册和发现服务,使服务的位置透明。并通过在消费方获取服务提供方地址列表,实现软负载均衡和 Failover,降低对 F5 硬件负载均衡器的依赖,也能减少部分成本。
  • 服务间依赖关系变得错踪复杂,甚至分不清哪个应用要在哪个应用之前启动,架构师都不能完整的描述应用的架构关系。这时,需要自动画出应用间的依赖关系图,以帮助架构师理清关系。
  • 服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?为了解决这些问题,第一步,要将服务现在每天的调用量,响应时间,都统计出来,作为容量规划的参考指标。其次,要可以动态调整权重,在线上,将某台机器的权重一直加大,并在加大的过程中记录响应时间的变化,直到响应时间到达阈值,记录此时的访问量,再以此访问量乘以机器数反推总容量。

Spring Cloud

Spring Cloud为开发人员提供了快速构建分布式系统中的一些通用模式(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线,一次性令牌,全局锁,领导选举,分布式 会话,群集状态)。 分布式系统的协调引出样板模式(boiler plate patterns),并且使用Spring Cloud开发人员可以快速地实现这些模式来启动服务和应用程序。

交互流程

Spring Cloud 微服务架构是由多个组件一起组成的,各个组件的交互流程如下。

  • 请求统一通过 API 网关 Zuul 来访问内部服务,先经过 Token 进行安全认证。
  • 通过安全认证后,网关 Zuul 从注册中心 Eureka 获取可用服务节点列表。
  • 从可用服务节点中选取一个可用节点,然后把请求分发到这个节点。
  • 整个请求过程中,Hystrix 组件负责处理服务超时熔断,Turbine 组件负责监控服务间的调用和熔断相关指标,Sleuth 组件负责调用链监控,ELK 负责日志分析。

优点

  • 社区活跃
  • 标准化的将微服务的成熟产品和框架结合一起,Spring Cloud 提供整套的微服务解决方案,开发成本较低,且风险较小。
  • 基于 Spring Boot,具有简单配置、快速开发、轻松部署、方便测试的特点。
  • 支持 REST 服务调用,相比于 RPC,更加轻量化和灵活(服务之间只依赖一纸契约,不存在代码级别的强依赖),有利于跨语言服务的实现,以及服务的发布部署。另外,结合 Swagger,也使得服务的文档一体化。

对比Dubbo

dubbo&sc

gRPC

它的原理是通过 IDL(Interface Definition Language)文件定义服务接口的参数和返回值类型,然后通过代码生成程序生成服务端和客户端的具体实现代码,这样在 gRPC 里,客户端应用可以像调用本地对象一样调用另一台服务器上对应的方法。

它的主要特性包括三个方面。

  • 通信协议采用了 HTTP/2,因为 HTTP/2 提供了连接复用、双向流、服务器推送、请求优先级、首部压缩等机制。Netty 4.1 提供了 HTTP/2 底层协议栈,通过 Http2ConnectionHandler 及其依赖的其它类库,实现了 HTTP/2 消息的统一接入和处理。
  • IDL 使用了ProtoBuf,ProtoBuf 是由 Google 开发的一种数据序列化协议,它的压缩和传输效率极高,语法也简单
  • 多语言支持,能够基于多种语言自动生成对应语言的客户端和服务端的代码。

Thrift

Thrift 是一种轻量级的跨语言 RPC 通信方案,支持多达 25 种编程语言。为了支持多种语言,跟 gRPC 一样,Thrift 也有一套自己的接口定义语言 IDL,可以通过代码生成器,生成各种编程语言的 Client 端和 Server 端的 SDK 代码,这样就保证了不同语言之间可以相互通信。它的架构图可以用下图来描述。

网络栈结构

TProtocol层
支持多种序列化格式:如 Binary、Compact、JSON、Thrift 等。

TTransport层
支持多种通信方式:如 Socket、Framed、File、Memory、zlib 等。

Service模型
服务端支持多种处理方式:如 Simple 、Thread Pool、Non-Blocking 等。

  • TSimpleServer: 简单的单线程服务模型,常用于测试;
  • TThreadPoolServer: 多线程服务模型,使用标准的阻塞式IO;
  • TNonBlockingServer: 多线程服务模型,使用非阻塞式IO(需要使用TFramedTransport数据传输方式);
  • THsHaServer: THsHa引入了线程池去处理,其模型读写任务放到线程池去处理,Half-sync/Half-async处理模式,Half-async是在处理IO事件上(accept/read/write io),Half-sync用于handler对rpc的同步处理;

对比

框架 语言 服务治理 多种序列化 注册中心 管理中心 跨语言
Dubbo Java 支持 支持 支持 支持 不支持
Spring Cloud Java 支持 支持 支持 支持 不支持
gRPC 跨语言 不支持 only pb 不支持 不支持 支持
Thrift 跨语言 不支持 only thrift 不支持 不支持 支持

RPC中的网络传输与线程模型

基础网络模型

Linux IO模式及 select、poll、epoll详解

几种IO模型

  • BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。
  • NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
  • 多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
  • 信号驱动IO,这种IO模型主要用在嵌入式开发,不参与讨论。
  • 异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

NIO

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

多路复用

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

Netty

概述

Netty采用典型的三层网络架构进行开发和设计,主要涵盖Reactor通信调度层,责任链ChannelPipeline和业务逻辑编排层(Service ChannelHandler)。

Reactor通信调度层:该层主要包含NioSocketChannel(客户端异步非阻塞通道)/NioServerSocketChannel(服务端异步非阻塞通道),Eventloop,ByteBuffer和Task。该层的主要职责是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后出发各种网络事件,例如连接,读/写等事件,将这些事件出发到pipeline中,由pipeline管理的职责链来进行后续处理。

职责链ChannelPipeline:它负责事件在职责链中的有序传播,同时负责动态地编排职责链。不同应用的Handler 节点的功能也不同,通常情况下,往往会开发编解码Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部的POJO 对象,这样上层业务则只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。

业务逻辑编排层:业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如CMPP 协议,用于管理和中国移动短信系统的对接。

Reactor通信调度层

该层主要包含NioSocketChannel(客户端异步非阻塞通道)/NioServerSocketChannel(服务端异步非阻塞通道),Eventloop,ByteBuffer和Task。该层的主要职责是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后出发各种网络事件,例如连接,读/写等事件,将这些事件出发到pipeline中,由pipeline管理的职责链来进行后续处理。

  1. Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件
  2. 当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor
  3. subReactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  4. 当有新事件发生时, subreactor 就会调用对应的handler处理
  5. handler 通过read 读取数据,分发给后面的worker 线程处理
  6. worker 线程池分配独立的worker 线程进行业务处理,并返回结果
  7. handler 收到响应的结果后,再通过send 将结果返回给client
  8. Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor

Netty Reactor

Channel

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  • 网络连接的配置参数 (例如接收缓冲区大小)
  • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
  • 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
  • 支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:

  • NioSocketChannel,异步的客户端 TCP Socket 连接。
  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel,异步的 UDP 连接。
  • NioSctpChannel,异步的客户端 Sctp 连接。
  • NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

ChannelPipeline

Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipline中流动和传递。ChannelPipeline持有I/O事件拦截器ChannelHandler的双向链表,由ChannelHandler对I/O事件进行拦截和处理,可以方便的新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持

ChannelHandler

它是一个接口,用于处理I/O事件或拦截I/O事件,并将其转发给对应的channelPipeline中的下一个处理程序。

ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler 用于处理入站 I/O 事件。

ChannelOutboundHandler 用于处理出站 I/O 操作。

NioEventLoop

NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  • I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  • 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。

特点

  • 零拷贝
  • 数据从内存发到网络中,存在两次拷贝,先是从用户空间拷贝到内核空间,再从内核空间拷贝到网络IO
  • NIO提供的ByteBuffer可以使用Direct Buffer模式
  • 直接开辟一个非堆物理内存,不需要进行字节缓冲区的二次拷贝,可以直接将数据写入到内核空间
  • 可扩展性
    基于Netty的基础NIO框架,可以方便地进行应用层协议定制,例如HTTP协议栈、Thrift协议栈、FTP协议栈等。这些扩展不需要修改Netty的源码,直接基于Netty的二进制类库即可实现协议的扩展和定制。
  • 高可靠
    路有效性检测,内存保护机制,优雅停机

RPC性能优化

  1. 客户端 获取到 目标接口的 client stub
  2. 客户端 调用目标方法
  3. 客户端 获取到 请求方法 和 请求数据
  4. 客户端 把 请求方法 和 请求数据 序列化为 传输数据
  5. 进行网络传输
  6. 服务端 获取到 传输数据
  7. 服务端 反序列化获取到 请求方法 和 请求数据
  8. 服务端 获取到 Invoker
  9. 服务端 调用 具体实现 获取到 响应结果
  10. 服务端 把 响应结果 序列化为 传输数据
  11. 进行网络传输
  12. 客户端 接收到 传输数据
  13. 客户端 反序列化获取到 响应结果
  14. 客户端 返回 响应结果

整个流程中对性能影响比较大的环节有:序列化[4, 7, 10, 13],方法调用[2, 3, 8, 9, 14],网络传输[5, 6, 11, 12]

RPC性能取决于

  1. IO模型。优化点:结合多路复用器(select,epoll)实现非阻塞IO。
  2. 线程模型。优化点:使用Reactor线程模型
  3. 序列化方式。优化点:pb,thrift或者自定义协议
  4. 池化技术,减少对象创建和销毁
  5. 使用Direct Buffer
  6. 定制报文。优化点:不使用HTTP,使应用层的头尽可能小
  7. 连接优化。优化点:在合适的地方使用长链接

kRPC

https://wiki.corp.kuaishou.com/pages/viewpage.action?pageId=351636938

Kafka

拓扑结构

ef35529683d3fe948250b8a4985450cb.png

  1. producer:
      消息生产者,发布消息到 kafka 集群的终端或服务。
  2. broker:
      kafka 集群中包含的服务器。
  3. topic:
      每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
  4. partition:
      partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
  5. consumer:
      从 kafka 集群中消费消息的终端或服务。
  6. Consumer group:
      high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:
      partition 的副本,保障 partition 的高可用。
  8. leader:
      replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
  9. follower:
      replica 中的一个角色,从 leader 中复制数据。
  10. controller:
      kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
  11. zookeeper:
      kafka 通过 zookeeper 来存储集群的 meta 信息。

zookeeper节点

eaec86120c4f10c45efdb6ca8d5b760b.png

producer

写入方式:

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

分区路由:

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  3. patition 和 key 都未指定,使用轮询选出一个 patition。

写入流程:

  1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
  2. producer 将消息发送给该 leader
  3. leader 将消息写入本地 log
  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

producer delivery guarantee

一般情况下存在三种情况:

  1. At most once 消息可能会丢,但绝不会重复传输
  2. At least one 消息绝不会丢,但可能会重复传输
  3. Exactly once 每条消息肯定会被传输一次且仅传输一次

broker

存储方式:

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)

存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

  1. 基于时间:log.retention.hours=168
  2. 基于大小:log.retention.bytes=1073741824

HA

replication

同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法如下:

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

  1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

broker failover

  1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller 从 /brokers/ids 节点读取可用broker
  3. controller决定set_p,该集合包含宕机 broker 上的所有 partition
  4. 对 set_p 中的每一个 partition
    4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
    4.2 决定新 leader(如4.3节所描述)
    4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
  5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

controller failover

当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作

  1. 读取并增加 Controller Epoch。
  2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
  3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
  4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
  5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
  6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
  7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
  8. 启动 replicaStateMachine 和 partitionStateMachine。
  9. 将 brokerState 状态设置为 RunningAsController。
  10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
  11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
  12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

consumer

The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

  1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
  2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
  3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

  1. 多次读取一个消息
  2. 只消费一个 patition 中的部分消息
  3. 使用事务来保证一个消息仅被消费一次

但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

  1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
  2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
  3. 需要处理 leader 的变更

consumer group

kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。

消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

  1. 读完消息先 commit 再处理消息。
    这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
  2. 读完消息先处理再 commit。
    这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
  3. 如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。
    精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once

consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

  1. 将目标 topic 下的所有 partirtion 排序,存于PT
  2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
  3. N=size(PT)/size(CG),向上取整
  4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
  5. 将第iN到(i+1)N-1个 partition 分配给 Ci

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

  1. Herd effect
      任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
  2. Split Brain
      每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。
  3. 调整结果不可控
      所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。

nsq区别

核心区别:

  1. kafka是pull,nsq是push
  2. nsq不固化数据

丢消息

刨根问底,kafka 到底会不会丢消息

生产者丢失消息

Kafka 通过配置 request.required.acks 属性来确认消息的生产:

  • 0 表示不进行消息接收是否成功的确认;不能保证消息是否发送成功,生成环境基本不会用。
  • 1 表示当 Leader 接收成功时确认;只要 Leader 存活就可以保证不丢失,保证了吞吐量。
  • -1 或者 all 表示 Leader 和 Follower 都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。

kafka producer 的参数 acks 的默认值为 1,所以默认的 producer 级别是 at least once,并不能 exactly once。

  • 如果 acks 配置为 0,发生网络抖动消息丢了,生产者不校验 ACK 自然就不知道丢了。
  • 如果 acks 配置为 1 保证 leader 不丢,但是如果 leader 挂了,恰好选了一个没有 ACK 的 follower,那也丢了。
  • all:保证 leader 和 follower 不丢,但是如果网络拥塞,没有收到 ACK,会有重复发的问题。

Kafka Broker 丢失消息

操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中,至于什么时候将缓存的数据写入文件中是由操作系统自行决定。

Kafka 提供了一个参数 producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步 (sync);写入 mmap 之后立即返回 Producer 不调用 flush 叫异步 (async)。

Kafka 通过多分区多副本机制中已经能最大限度保证数据不会丢失,如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了,当然这种情况很极端。

消费者丢失消息

  • 先 commit 再处理消息。如果在处理消息的时候异常了,但是 offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。
  • 先处理消息再 commit。如果在 commit 之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。

RocketMQ

持久化 & 多副本

RocketMQ默认配置为 3副本、异步复制、异步刷盘,类比kafka的的replica_factor=3, acks=leader;在这种配置之下,可靠性99.99%,可用性99.95%

同时可以针对特殊场景,提供同步刷盘、同步复制的集群,提供更高的可靠性,在6副本、同步复制、同步刷盘的配置下,消息可靠性可达9个9。相应的,写入延时会升高,qps、availability会降低

RocketMQ的消息默认保留48小时,由于存储模型的关系,这个配置是集群级别的,不像kafka是topic级别的。所以如果有特殊的需求,请联系@沈辉 视情况单独搭建集群

消息重试(Requeue)

kafka设计之初主要作为 log (特指类似binlog这种WAL)的传输总线,同一个partition内的有序是默认需求,这意味着消费失败的消息是不能重入的,否则消息的顺序性就被破坏了;用户需要自己打日志或者转储来处理消费失败的消息。

而在MQ领域,大多数场景下其实并不需要严格的顺序,对于消费失败的消息(可能是消费者下游短时间不可用、load高等),这种情况下用户希望消息能够重新投递到MQ,可以不用额外写代码处理错误的消息。RocketMQ 在非有序消费模式下,支持消息重试,类比NSQ的requeue机制,并且重试的消息具有和普通消息一样的持久化支持。

死信队列 (Dead Letter Queue)

当消息多次重试仍然消费失败,这种情况多半是消费者逻辑有问题,比如对于消息的某些字段没有兼容等;需要有地方能够转储这一批消息。RocketMQ提供了死信队列,消费者可以指定消息最大重试次数,当消息重试超过该次数,消息将会发往死信队列。待消费者的问题解决之后,可以从死信队列拉取这些消息统一处理。

延时消息

一些场景下,生产者发送消息成功后,希望delay一段时间消息再投递给消费者,比如创建一个订单之后30分钟内未支付则取消订单的场景。

NSQ支持ms级别精度的延时消息,其实现为内存里的一个priority queue,没有持久化;kafka不支持延时消息;当前版本的RocketMQ支持分级的时延,比如支持10s、30s、1min、5min、10min、15min、20min、30min等多个级别时延,足够覆盖大部分场景,并且有持久化。

需要任意时延的延迟消息用户,见: 延时消息用户文档

写入延迟保障

RocketMQ和kafka都是顺序写盘,充分利用page cache以获得低时延和吞吐。kafka的存储模型为每个partition都由若干segment(物理文件)组成的逻辑文件,当kafka集群上的topic/partition数量多了之后,kafka的顺序写可能会逐渐退化到随机写,写入延迟上涨;只能拆分集群,减少单集群上topic/partition数量。

而RocketMQ的存储模型与kafka不同,所有消息都写到一个CommitLog中即返回,再异步将offset、length信息dispatch到各Consume Queue中,所以是严格的顺序写,在topic数量很多的时候,page fault仍然维持在低水平,写入时延较为稳定。

在异步复制、异步刷盘下,写入延时的avg为10ms,pct99为20ms

基于时间回溯

比如30分钟前消费者下游故障,期间消费者全部异常,用户希望将消费进度回拨到30min前,达到”补”消息的效果。kafka需要集群版本在0.10以上才能支持,RocketMQ默认支持。

顺序/乱序消费

RocketMQ同时提供了顺序和乱序消费。 顺序的保障是,消息写到broker上的一个Consume Queue的顺序和消费者从该Consume Queue读取的顺序是一致的,多个producer并发写到一个Consume Queue的状况顺序没有保障,实际的顺序以broker收到的顺序为准。

一般有序消息都是同步发送的,也即msg n发送成功并且client收到写入成功的结果之后才会发送msg n+1,异步/并发的有序消息是个伪需求。

对于消息没有顺序要求的场景,可以使用乱序消费,并发度为消费者实例数 * 线程/协程数。

长轮询

长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会hold住请求,等待服务端有数据,或者一直没有数据超时处理,然后一直循环下去;下面看一下如何简单实现一个长轮询;

暂不提供什么

  1. 广播消费(TBD)
    即同一个consumer group内的所有消费者每一个实例都能收到全量消息;可以通过每个消费者使用独立的Consumer group达到效果。
  2. 事务消息 (TBD)
    事务消息指:Producer发消息和Producer的其他操作(如写DB)形成事务,如果其他操作Commit,则消息Consumer可见;如果其他操作Rollback,则消息也不会投递给Consumer。
    原生的RocketMQ的java SDK支持事务消息,现阶段其他语言暂不支持,后续会支持。

vs Kafka

技术选型:RocketMQ or Kafka

(1) 适用场景

Kafka适合日志处理;

RocketMQ适合业务处理。

(2) 性能

Kafka单机写入 TPS 号称在百万条/秒;

RocketMQ 大约在10万条/秒。

结论:追求性能的话,Kafka单机性能更高。

(3) 可靠性

RocketMQ支持异步/同步刷盘;异步/同步Replication;

Kafka使用异步刷盘方式,异步Replication。

结论:RocketMQ所支持的同步方式提升了数据的可靠性。

(4) 实时性

均支持pull长轮询,RocketMQ消息实时性更好

结论:RocketMQ 胜出。

(5) 支持的队列数

Kafka单机超过64个队列/分区,消息发送性能降低严重;

RocketMQ 单机支持最高5万个队列,性能稳定

结论:长远来看,RocketMQ 胜出,这也是适合业务处理的原因之一

(6) 消息顺序性

Kafka 某些配置下,支持消息顺序,但是一台Broker宕机后,就会产生消息乱序;

RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,

发送消息会失败,但是不会乱序;

结论:RocketMQ 胜出

(7)消费失败重试机制

Kafka消费失败不支持重试

RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

(8)定时/延时消息

Kafka不支持定时消息;

RocketMQ支持定时消息

(9)分布式事务消息

Kafka不支持分布式事务消息;

阿里云ONS支持分布式定时消息,未来开源版本的RocketMQ也有计划支持分布式事务消息

(10)消息查询机制

Kafka不支持消息查询

RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息

(11)消息回溯

Kafka理论上可以按照Offset来回溯消息

RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息

Java线程池实现原理及其在美团业务中的实践

参数详解

核心线程数:corePoolSize

线程池中活跃的线程数,即使它们是空闲的,除非设置了allowCoreThreadTimeOut为true。allowCoreThreadTimeOut的值是控制核心线程数是否在没有任务时是否停止活跃的线程,当它的值为true时,在线程池没有任务时,所有的工作线程都会停止。

最大线程数:maximumPoolSize

线程池所允许存在的最大线程数。

多余线程存活时长:keepAliveTime

线程池中除核心线程数之外的线程(多余线程)的最大存活时间,如果在这个时间范围内,多余线程没有任务需要执行,则多余线程就会停止。(注意:多余线程数 = 最大线程数 - 核心线程数)

时间单位:unit

多余线程存活时间的单位,可以是分钟、秒、毫秒等。

任务队列:workQueue

线程池的任务队列,使用线程池执行任务时,任务会先提交到这个队列中,然后工作线程取出任务进行执行,当这个队列满了,线程池就会执行拒绝策略。

线程工厂:threadFactory

创建线程池的工厂,线程池将使用这个工厂来创建线程池,自定义线程工厂需要实现ThreadFactory接口。

拒绝执行处理器(也称拒绝策略):handler

当线程池无空闲线程,并且任务队列已满,此时将线程池将使用这个处理器来处理新提交的任务。