Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ int Socket::OnCreated(const SocketOptions& options) {
return -1;
}
_io_event.set_bthread_tag(options.bthread_tag);
auto guard = butil::MakeScopeGuard([this] {
auto io_event_guard = butil::MakeScopeGuard([this] {
_io_event.Reset();
});

Expand Down Expand Up @@ -808,6 +808,12 @@ int Socket::OnCreated(const SocketOptions& options) {
return -1;
}
}

HoldHCRelatedRef();
auto hc_ref_guard = butil::MakeScopeGuard([this] {
ReleaseHCRelatedReference();
});

// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
if (ResetFileDescriptor(fd) != 0) {
Expand All @@ -817,8 +823,9 @@ int Socket::OnCreated(const SocketOptions& options) {
berror(saved_errno));
return -1;
}
HoldHCRelatedRef();
guard.dismiss();

hc_ref_guard.dismiss();
io_event_guard.dismiss();

return 0;
}
Expand Down
3 changes: 0 additions & 3 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,6 @@ friend void DereferenceSocket(Socket*);

// True if health checking is enabled.
bool HCEnabled() const {
// This fence makes sure that we see change of
// `_is_hc_related_ref_held' before changing `_versioned_ref.
butil::atomic_thread_fence(butil::memory_order_acquire);
return _health_check_interval_s > 0 && _is_hc_related_ref_held;
}

Expand Down
14 changes: 11 additions & 3 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
std::unique_lock<butil::Mutex> mu(_mutex);
SingleConnection* sc = _map.seek(key);
if (sc) {
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
if (!sc->socket->Failed() || sc->socket->HCEnabled()) {
++sc->ref_count;
*id = sc->socket->id();
Expand All @@ -255,7 +257,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
PLOG(FATAL) << "Fail to create socket to " << key.peer;
PLOG(ERROR) << "Fail to create socket to " << key.peer;
return -1;
}
// Add a reference to make sure that sc->socket is always accessible. Not
Expand All @@ -266,13 +268,18 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
if (rc < 0) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
} else if (rc > 0 && !ptr->HCEnabled()) {
LOG(FATAL) << "Failed socket is not HC-enabled";
} else if (rc > 0 &&
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
!ptr->HCEnabled()) {
LOG(ERROR) << "Failed socket is not HC-enabled";
return -1;
}
// If health check is enabled, a health-checking-related reference
// is hold in Socket::Create.
// If health check is disabled, hold a reference in SocketMap.
// The `_mutex' guarantees the consistent state
// of `_is_hc_related_ref_held' in SocketMap.
SingleConnection new_sc = { 1, ptr->HCEnabled() ? ptr.get() : ptr.release(), 0 };
_map[key] = new_sc;
*id = tmp_id;
Expand Down Expand Up @@ -317,6 +324,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
}

void SocketMap::ReleaseReference(Socket* s) {
// The `_mutex' guarantees the consistent state of `_is_hc_related_ref_held' in SocketMap.
if (s->HCEnabled()) {
s->ReleaseHCRelatedReference();
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/bvar/detail/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void SamplerCollector::run() {
// NOTE:
// * Following vars can't be created on thread's stack since this thread
// may be abandoned at any time after forking.
// * They can't created inside the constructor of SamplerCollector as well,
// * They can't be created inside the constructor of SamplerCollector as well,
// which results in deadlock.
if (s_cumulated_time_bvar == NULL) {
s_cumulated_time_bvar =
Expand Down
Loading