记录一次rocket broker 注册nameserver超时的排查
前言
线上broker出现注册失败的情况,如图所示:
整个机器,只有这台机器到rocket.4这台Nameserver出现注册超时的情况。
基本信息:
出现问题的broker IP:192.168.11.26
注册超时的Nameserver IP:192.168.21.1(rocket.4)
Broker 在阿里云可用区B
Nameserver 在阿里云可用区E
broker ping nameserver 数据包的RTT大概需要1.7ms
排查过程
判断原因
从broker机器上抓包查看,注册消息从开始发到发完总共用时花了3s+,超过了3000ms,导致出现超时的情况。
筛选package bytes,“code”: 103, opaque为849859871,由于849859871是唯一的,我们可以直接搜
可以看到,直到nameserver到了3.183s+才回消息,远远超出了时间限制。
大致统计了一下,这个注册消息的数据包的大小大概在2MB左右。
按照道理来说不太应该,因为即便是1.7ms的延迟,按照1000Mbps的来算,传输2MB,可以在几百毫秒内完成。
为什么会超过3s呢….
仔细看一下看一下中间发包有什么特征:
然后在broker这台机器上用ss 看一下tcp的相关参数信息:
$ ss -it dst '192.168.21.1:9876'
State Recv-Q Send-Q Local Address:Port Peer Address:Port
ESTAB 0 65436 192.168.11.26:33968 192.168.21.1:sd
cubic wscale:0,2 rto:202 rtt:1.669/0.009 ato:40 mss:574 cwnd:10 bytes_acked:22105070559 bytes_received:966176 segs_out:38523231 segs_in:19356005 send 27.5Mbps lastsnd:1 lastrcv:26977 lastack:1 pacing_rate 55.0Mbps unacked:2 rcv_rtt:197959 rcv_space:29295
可以看到,mss为574,加上66bytes的tcp header,那么一次传输的段大小为640bytes,跟上面wireshark看到的Length是一样的。
还可以看到wireshark上显示192.168.21.1的Window大小为1148。
那么就可以得知,broker每次只能发给nameserver 2 * 574大小的tcp 数据,发完就要等待ack。
跟上面的wireshark显示的一样。
我写了一个一个不太严谨的计算方式:
耗时 = 数据的大小 / (574 * round(Window / MSS) ) * RTT
那么照这个样子计算一下,发完一个2079208 Bytes的数据需要多久的时间。
2079208bytes / (574 * 2) * 1.66ms = 3006.5202787456446ms
那么我个人的判断是因为Window设置的太小,导致的超时。
但是有一个比较奇怪的小问题,就是明明Window Full了,为什么wireshark没有提示出来标红呢,那是因为抓的是已经握手完之后的包,滑动窗口的初始长度在握手的时候会发出来,没有握手的包,不太好计算,所以可以找机会在握手之前抓包佐证上面判断。
确定原因
接着思考,到底是哪一方的Window Size设置的太小导致的瓶颈呢?
在这个场景下,根据抓包可以判断出,每次发送方发了两个包之后,要等到接收方(nameserver)的ack,导致发送方的数据包不能一直发,可以判断出是接收方(nameserver)的Window 太小导致的。
经过测试和搜索资料,发现接收端sock的SO_RCVBUF是可以影响到接收方的Window Size。
经过开发大佬的查看rocket的源代码,发现配置JVM参数可以改变这个大小:
-Dcom.rocketmq.remoting.socket.sndbuf.size=1048576 -Dcom.rocketmq.remoting.socket.rcvbuf.size=1048576
在修改了jvm参数之后,再次抓包,发现Window Size没有被改变。
这就很奇怪….让我一度怀疑我搜索的资料是不是有问题…
在这里卡了很久,就差去翻内核网络栈的代码了…
后面我想我自己写个go程序放到机器上测试一下SO_RCVBUF有没有用,代码如下:
package main
import (
"flag"
"log"
"net"
"syscall"
"time"
)
var ptype *string = flag.String("ftype", "server", "server or client")
var addr *string = flag.String("addr", "0.0.0.0:19991", "server listen addr or address that client to connect.")
var length int = 2 * 1024 * 1024
func Random2MBData() []byte {
data := make([]byte, length)
for i := 0; i < length; i++ {
data[i] = 0x66
}
return data
}
func process(conn net.Conn) {
if conn == nil {
return
}
defer conn.Close()
tc := conn.(*net.TCPConn)
fd, err := tc.File()
if err != nil {
panic(err)
}
sndBuf, err := syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF)
if err != nil {
panic(err)
}
rcvBuf, err := syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
if err != nil {
panic(err)
}
log.Printf("server conn rcvbuf: %d, sendbuf: %d .\n", rcvBuf, sndBuf)
failedTimes := 0
recivedSize := 0
for {
log.Print("server starting to recv data...")
var data = make([]byte, length)
size, err := conn.Read(data)
log.Printf("recivedSize: %d", size)
if err != nil {
failedTimes += 1
log.Printf("recv data from client: %s, failedTimes: %d, reason: %v", conn.RemoteAddr(), failedTimes, err)
}
if failedTimes > 2 {
return
}
recivedSize = recivedSize + size
if recivedSize == length {
failedTimes = 0
recivedSize = 0
log.Print("recv done, server starting to sleep a while.")
time.Sleep(1 * time.Second)
}
}
}
func startClient(addr string) {
conn, err := net.Dial("tcp", addr)
if err != nil {
panic(err)
}
defer conn.Close()
tc := conn.(*net.TCPConn)
fd, err := tc.File()
if err != nil {
panic(err)
}
if err := syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {
panic(err)
}
if err := syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil {
panic(err)
}
//if err := syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.TCP_NODELAY, 1); err != nil {
// panic(err)
//}
//err = syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, 131072)
//if err != nil {
// panic(err)
//}
err = syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF, 512)
if err != nil {
panic(err)
}
for {
log.Print("client starting to send data..")
data := Random2MBData()
dataSize := len(data)
log.Printf("client data size: %d", dataSize)
failedTimes := 0
for i := 0; i < dataSize; {
size, err := conn.Write(data[i:])
log.Printf("client send size: %d", size)
i = i + size
if err != nil {
failedTimes += 1
log.Printf("send data to server %s failed times: %d, reason: %v", conn.RemoteAddr(), failedTimes, err)
}
if failedTimes > 2 {
break
}
}
if failedTimes > 2 {
return
}
if dataSize == length {
log.Print("send done, client starting sleep a while...")
time.Sleep(60 * time.Second)
}
}
}
func startServer(addr string) {
listen, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
defer listen.Close()
tc := listen.(*net.TCPListener)
lfd, err := tc.File()
if err != nil {
panic(err)
}
if err := syscall.SetsockoptInt(int(lfd.Fd()), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {
panic(err)
}
if err := syscall.SetsockoptInt(int(lfd.Fd()), syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1); err != nil {
panic(err)
}
//if err := syscall.SetsockoptInt(int(lfd.Fd()), syscall.SOL_SOCKET, syscall.TCP_NODELAY, 1); err != nil {
// panic(err)
//}
if err := syscall.SetsockoptInt(int(lfd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, 1024); err != nil {
panic(err)
}
if err := syscall.SetsockoptInt(int(lfd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF, 2048); err != nil {
panic(err)
}
log.Printf("")
for {
conn, err := listen.Accept()
//tc := conn.(*net.TCPConn)
//fd, err := tc.File()
//if err != nil {
// panic(err)
//}
//err = syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, 1024)
//if err != nil {
// panic(err)
//}
//err = syscall.SetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF, 2048)
//if err != nil {
// panic(err)
//}
if err != nil {
log.Printf("accept conn failed: %v", err)
continue
}
go process(conn)
}
}
func main() {
flag.Parse()
log.Printf("ptype: %s", *ptype)
var f func(string)
if *ptype == "server" {
f = startServer
} else {
f = startClient
}
f(*addr)
}
用法:
client端:
$ go-test-tcp -ftype client -addr <server-addr>
server端:
$ go-test-tcp -ftype server
启动之后抓包看到的结果是,设置SO_RCVBUF,是可以改变Window Size的。
甚至我还用上面的代码还原了故障…
那现在只需要确定一下,Nameserver的SO_RCVBUF设置的大小是不是跟JVM配置参数-Dcom.rocketmq.remoting.socket.rcvbuf.size=1048576 配置的是否一样,就可以确定原因了。
但是有一个问题是一般socket的SO_RCVBUF大小是进程自listen socket的大小的,而listen socket的创建逻辑和设置SO_RCVBUF都是在进程启动的时候配置的,除非我们停止进程,不然没办法用strace看到系统调用…
那么有没有其他命令可以看到呢….不幸的是,我Google了好久,都没找到可以拿到tcp连接的SO_RCVBUF大小的方案…
根据之前的经验,Nameserver可以短暂的停机,那么就用strace 追踪一下进程启动的时候设置的大小:
$ strace -ff <broker-start-command> &> strace.log
还记得刚才说的那个wireshark没有提示Window Full的情况吗,我们趁着这个机会还可以抓包确定一下。
抓出来的包的内容:
可以看到,就是因为WIndowFull导致的。基本上可以确定我的判断了
那么再看一下strace.log怎么配置SO_RCVBUF的:
1024….!
我自己又翻了一下代码,发现在NamesrvStartup.java:
写的确实是1024,那为什么改了没生效呢….
解决问题
后面再次跟开发大佬确定,给的结果是,读取配置的逻辑权重大小是:配置文件 > arguments 。
跟我说nameserver的配置文件写的是1024:
好吧…把serverSocketRcvBufSize改成1024000:
保存,重新启动Nameserver,问题解决。
这里延伸的说一下,如果这次出问题的是另外的一个重要的服务,不能停机,那么是不是就没办法用strace来看了,以我的看法来说是的。
这个问题比较头疼,那么我想了一下,应该是可以拿到这个值的,翻了一下内核的代码,SO_RCVBUF这个值是在:
https://github.com/torvalds/linux/blob/master/include/net/sock.h#L415
如何不结束进程看到SO_RCVBUF和SO_SNDBUF的大小
可以借助于systemtap,在预留好的tcp_recvmsg这个探针这里拿到sock这个结构体的指针,可以看到SO_RCVBUF和SO_SNDBUF的大小,我写好的systemtap脚本:
#! /usr/bin/env stap
%{
#include <linux/version.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <net/ip.h>
#include <linux/skbuff.h>
%}
function get_rcvbuf_from_sock(sock:long)
%{ /* pure */
struct sock *sk = (struct sock *)(uintptr_t) STAP_ARG_sock;
STAP_RETVALUE = (int) kread(&(sk->sk_rcvbuf));
CATCH_DEREF_FAULT();
%}
function get_sndbuf_from_sock(sock:long)
%{ /* pure */
struct sock *sk = (struct sock *)(uintptr_t) STAP_ARG_sock;
STAP_RETVALUE = (int) kread(&(sk->sk_sndbuf));
CATCH_DEREF_FAULT();
%}
probe begin {
printf("%6s %16s %6s %6s %6s %16s %12s %12s\n",
"UID", "CMD", "PID", "PORT", "DPORT", "IP_SOURCE", "RCVBUF", "SNDBUF")
}
probe kernel.function("tcp_sendmsg"){
if(inet_get_ip_source($sk) == @1 && __tcp_sock_dport($sk) == $2){
printf("%6d %16s %6d %6d %6d %16s %d %d\n", uid(), execname(), pid(),
inet_get_local_port($sk), __tcp_sock_dport($sk), inet_get_ip_source($sk), get_rcvbuf_from_sock($sk), get_sndbuf_from_sock($sk) )
}
}
使用方法:
$ stap -g get_sock_buf.stp <remote sock addr> <remote sock port>
输出的内容截图: