Skip to content

Commit

Permalink
[perf] optimize dynamic run mothed by remove a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Oct 21, 2023
1 parent 58cc4f4 commit 9342f7d
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ CIndex GElement::getThreadIndex() {
}


GElement* GElement::setThreadPool(UThreadPoolPtr ptr) {
GElementPtr GElement::setThreadPool(UThreadPoolPtr ptr) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(ptr)
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
this->thread_pool_ = ptr;
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class GElement : public GElementObject,
GAspectManagerPtr aspect_manager_ { nullptr }; // 整体流程的切面管理类
UThreadPoolPtr thread_pool_ { nullptr }; // 用于执行的线程池信息
GPerfInfo* perf_info_ = nullptr; // 用于perf的信息
CLong trigger_times_ { 0 }; // 被触发的次数信息(loop执行n次,算触发1次)
CULong trigger_times_ { 0 }; // 被触发的次数信息(loop执行n次,算触发1次)

/** 图相关信息 */
std::atomic<CSize> left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,17 @@ CStatus GDynamicEngine::run() {

asyncRun();

if (!cur_status_.isOK()) {
status = cur_status_; // 如果不是ok的话,进行赋值
}
status = cur_status_;
CGRAPH_FUNCTION_END
}


CStatus GDynamicEngine::afterRunCheck() {
CGRAPH_FUNCTION_BEGIN
if (run_element_size_ != total_element_arr_.size()) {
CGRAPH_RETURN_ERROR_STATUS("dynamic engine run element size not match...")
}

CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(run_element_size_ != total_element_arr_.size(), \
"dynamic engine run element size not match...")
for (GElementPtr element : total_element_arr_) {
if (!element->done_) {
CGRAPH_RETURN_ERROR_STATUS("dynamic engine run check failed...")
}
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, "dynamic engine run check failed...")
}

CGRAPH_FUNCTION_END
Expand Down Expand Up @@ -95,10 +89,7 @@ CStatus GDynamicEngine::beforeRun() {

CStatus GDynamicEngine::process(GElementPtr element, CBool affinity) {
CGRAPH_FUNCTION_BEGIN
if (unlikely(cur_status_.isErr())) {
// 如果当前整体状态异常,直接返回,不执行了
CGRAPH_RETURN_ERROR_STATUS("current status error")
}
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(cur_status_.isErr(), "current status error");

const auto& exec = [this, element] {
auto curStatus = element->fatProcessor(CFunctionType::RUN);
Expand Down Expand Up @@ -136,7 +127,6 @@ CVoid GDynamicEngine::afterElementRun(GElementPtr element) {
process(cur, cur == ready.back());
}

CGRAPH_UNIQUE_LOCK lock(lock_);
/**
* 满足一下条件之一,则通知wait函数停止等待
* 1,无后缀节点全部执行完毕
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class GDynamicEngine : public GEngine {
GElementPtrArr total_element_arr_; // pipeline中所有的元素信息集合
GElementPtrArr front_element_arr_; // 没有依赖的元素信息
CSize total_end_size_ = 0; // 图结束节点数量
CSize finished_end_size_ = 0; // 执行结束节点数量
std::atomic<CSize> run_element_size_ {0}; // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
std::atomic<CSize> finished_end_size_ { 0 }; // 执行结束节点数量
std::atomic<CSize> run_element_size_ { 0 }; // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
CStatus cur_status_; // 当前全局的状态信息

std::mutex lock_;
Expand Down
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class UThreadPrimary : public UThreadBase {
auto target = (index_ + i + 1) % config_->default_thread_size_;
steal_targets_.push_back(target);
}
steal_targets_.shrink_to_fit();
}

private:
Expand Down

0 comments on commit 9342f7d

Please sign in to comment.