Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
if (opt.auth == NULL &&
!opt.has_ssl_options() &&
opt.device_name.empty() &&
opt.connection_group.empty() &&
opt.hc_option.health_check_path.empty()) {
// Returning zeroized result by default is more intuitive for users.
Expand All @@ -94,6 +95,10 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
buf.append("|conng=");
buf.append(opt.connection_group);
}
if (!opt.device_name.empty()) {
buf.append("|devn=");
buf.append(opt.device_name);
}
if (opt.auth) {
buf.append("|auth=");
buf.append((char*)&opt.auth, sizeof(opt.auth));
Expand Down Expand Up @@ -369,7 +374,8 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
return -1;
}
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) {
&_server_id, ssl_ctx, _options.use_rdma,
_options.hc_option, _options.device_name) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
Expand Down Expand Up @@ -409,6 +415,7 @@ int Channel::Init(const char* ns_url,
ns_opt.use_rdma = _options.use_rdma;
ns_opt.channel_signature = ComputeChannelSignature(_options);
ns_opt.hc_option = _options.hc_option;
ns_opt.device_name = _options.device_name;
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
return -1;
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ struct ChannelOptions {
// Its priority is higher than FLAGS_health_check_path and FLAGS_health_check_timeout_ms.
// When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect.
HealthCheckOption hc_option;

// The device name of the client's network adapter.
// if the device_name is "", the flow control is determined by the OS.
// Default: ""
std::string device_name;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases.
Expand Down
3 changes: 2 additions & 1 deletion src/brpc/details/naming_service_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ void NamingServiceThread::Actions::ResetServers(
// to pick those Sockets with the right settings during OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx,
_owner->_options.use_rdma, _owner->_options.hc_option));
_owner->_options.use_rdma, _owner->_options.hc_option,
_owner->_options.device_name));
_added_sockets.push_back(tagged_id);
}

Expand Down
1 change: 1 addition & 0 deletions src/brpc/details/naming_service_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct GetNamingServiceThreadOptions {
HealthCheckOption hc_option;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
std::string device_name;
};

// A dedicated thread to map a name to ServerIds
Expand Down
10 changes: 9 additions & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ int Socket::OnCreated(const SocketOptions& options) {
_tos = 0;
_remote_side = options.remote_side;
_local_side = butil::EndPoint();
_device_name = options.device_name;
_on_edge_triggered_events = options.on_edge_triggered_events;
_user = options.user;
_conn = options.conn;
Expand Down Expand Up @@ -1296,7 +1297,14 @@ int Socket::Connect(const timespec* abstime,
CHECK_EQ(0, butil::make_close_on_exec(sockfd));
// We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd));

if (!_device_name.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不是很确定 #3157 需要的是指定source IP,还是指定网络设备。如果只需要指定source IP, 之前的bind可以满足需求。这里要不要把之前的那个client_host选项一并加进来?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

确实可以加进来,让网络配置更精细化,我有空合并一下

if (setsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE,
_device_name.c_str(), _device_name.size()) < 0) {
PLOG(ERROR) << "Fail to set SO_BINDTODEVICE of fd=" << sockfd
<< " to device_name=" << _device_name;
return -1;
}
}
const int rc = ::connect(
sockfd, (struct sockaddr*)&serv_addr, addr_size);
if (rc != 0 && errno != EINPROGRESS) {
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd{-1};
butil::EndPoint remote_side;
std::string device_name;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
Expand Down Expand Up @@ -830,6 +831,9 @@ friend void DereferenceSocket(Socket*);
// Address of self. Initialized in ResetFileDescriptor().
butil::EndPoint _local_side;

// The device name of the client's network adapter.
std::string _device_name;

// Called when edge-triggered events happened on `_fd'. Read comments
// of EventDispatcher::AddConsumer (event_dispatcher.h)
// carefully before implementing the callback.
Expand Down
9 changes: 6 additions & 3 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ SocketMap* get_or_new_client_side_socket_map() {
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option) {
return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option);
const HealthCheckOption& hc_option,
const std::string& device_name) {
return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, device_name);
}

int SocketMapFind(const SocketMapKey& key, SocketId* id) {
Expand Down Expand Up @@ -229,7 +230,8 @@ void SocketMap::ShowSocketMapInBvarIfNeed() {
int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option) {
const HealthCheckOption& hc_option,
const std::string& device_name) {
ShowSocketMapInBvarIfNeed();

std::unique_lock<butil::Mutex> mu(_mutex);
Expand All @@ -251,6 +253,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
SocketId tmp_id;
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.device_name = device_name;
opt.initial_ssl_ctx = ssl_ctx;
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
Expand Down
14 changes: 8 additions & 6 deletions src/brpc/socket_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,19 @@ struct SocketMapKeyHasher {
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option);
const HealthCheckOption& hc_option,
const std::string& device_name);

inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
HealthCheckOption hc_option;
return SocketMapInsert(key, id, ssl_ctx, false, hc_option);
return SocketMapInsert(key, id, ssl_ctx, false, hc_option, "");
}

inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
HealthCheckOption hc_option;
return SocketMapInsert(key, id, empty_ptr, false, hc_option);
return SocketMapInsert(key, id, empty_ptr, false, hc_option, "");
}

// Find the SocketId associated with `key'.
Expand Down Expand Up @@ -155,17 +156,18 @@ class SocketMap {
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option);
const HealthCheckOption& hc_option,
const std::string& device_name);

int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
HealthCheckOption hc_option;
return Insert(key, id, ssl_ctx, false, hc_option);
return Insert(key, id, ssl_ctx, false, hc_option, "");
}
int Insert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
HealthCheckOption hc_option;
return Insert(key, id, empty_ptr, false, hc_option);
return Insert(key, id, empty_ptr, false, hc_option, "");
}

void Remove(const SocketMapKey& key, SocketId expected_id);
Expand Down
44 changes: 44 additions & 0 deletions test/brpc_server_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2070,4 +2070,48 @@ TEST_F(ServerTest, auth) {
ASSERT_EQ(0, server.Join());
}

void TestClientHost(const butil::EndPoint& ep,
brpc::Controller& cntl,
int error_code, bool failed,
brpc::ChannelOptions& copt) {
brpc::Channel chan;
copt.max_retry = 0;
ASSERT_EQ(0, chan.Init(ep, &copt));

test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
test::EchoService_Stub stub(&chan);
stub.Echo(&cntl, &req, &res, NULL);
ASSERT_EQ(cntl.Failed(), failed) << cntl.ErrorText();
ASSERT_EQ(cntl.ErrorCode(), error_code);
}

TEST_F(ServerTest, network_device_name) {
butil::EndPoint ep;
ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
brpc::Server server;
EchoServiceImpl service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
brpc::ServerOptions opt;
ASSERT_EQ(0, server.Start(ep, &opt));

brpc::Controller cntl;
brpc::ChannelOptions copt;
copt.device_name = "lo";
std::vector<brpc::ConnectionType> connection_types = {
brpc::CONNECTION_TYPE_SINGLE,
brpc::CONNECTION_TYPE_POOLED,
brpc::CONNECTION_TYPE_SHORT
};
for (auto connect_type : connection_types) {
copt.connection_type = connect_type;
TestClientHost(ep, cntl, 0, false, copt);
cntl.Reset();
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}

} //namespace
Loading