27 #ifndef EMBB_DATAFLOW_NETWORK_H_ 28 #define EMBB_DATAFLOW_NETWORK_H_ 30 #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY 34 #include <embb/base/atomic.h> 35 #include <embb/base/thread.h> 37 #include <embb/dataflow/internal/select.h> 38 #include <embb/dataflow/internal/switch.h> 39 #include <embb/dataflow/internal/constant_source.h> 40 #include <embb/dataflow/internal/source.h> 41 #include <embb/dataflow/internal/process.h> 42 #include <embb/dataflow/internal/sink.h> 44 #include <embb/dataflow/internal/scheduler_sequential.h> 45 #include <embb/dataflow/internal/scheduler_mtapi.h> 108 template <
typename Type>
115 template <
typename Type>
129 void Connect(InType & input);
137 void operator >> (InType & input);
148 template <
typename T1,
typename T2 = embb::base::internal::Nil,
149 typename T3 = embb::base::internal::Nil,
150 typename T4 = embb::base::internal::Nil,
151 typename T5 = embb::base::internal::Nil>
181 template <
typename T1,
typename T2 = embb::base::internal::Nil,
182 typename T3 = embb::base::internal::Nil,
183 typename T4 = embb::base::internal::Nil,
184 typename T5 = embb::base::internal::Nil>
219 template <
class Inputs,
class Outputs>
280 virtual bool HasInputs()
const;
285 InputsType & GetInputs();
297 virtual bool HasOutputs()
const;
302 OutputsType & GetOutputs();
315 template <
typename T>
316 void operator >> (T & target);
332 template <
class Inputs,
class Outputs>
393 virtual bool HasInputs()
const;
398 InputsType & GetInputs();
410 virtual bool HasOutputs()
const;
415 OutputsType & GetOutputs();
428 template <
typename T>
429 void operator >> (T & target);
445 template<
typename Type>
479 virtual bool HasInputs()
const;
484 InputsType & GetInputs();
495 virtual bool HasOutputs()
const;
500 OutputsType & GetOutputs();
513 template <
typename T>
514 void operator >> (T & target);
530 template<
typename Type>
564 virtual bool HasInputs()
const;
569 InputsType & GetInputs();
580 virtual bool HasOutputs()
const;
585 OutputsType & GetOutputs();
598 template <
typename T>
599 void operator >> (T & target);
618 template<
typename I1,
typename I2 = embb::base::internal::Nil,
619 typename I3 = embb::base::internal::Nil,
620 typename I4 = embb::base::internal::Nil,
621 typename I5 = embb::base::internal::Nil>
675 virtual bool HasInputs()
const;
680 InputsType & GetInputs();
691 virtual bool HasOutputs()
const;
709 template<
typename O1,
typename O2 = embb::base::internal::Nil,
710 typename O3 = embb::base::internal::Nil,
711 typename O4 = embb::base::internal::Nil,
712 typename O5 = embb::base::internal::Nil>
766 virtual bool HasInputs()
const;
771 virtual bool HasOutputs()
const;
776 OutputsType & GetOutputs();
789 template <
typename T>
790 void operator >> (T & target);
801 template<
typename Type>
828 virtual bool HasInputs()
const;
833 virtual bool HasOutputs()
const;
838 OutputsType & GetOutputs();
851 template <
typename T>
852 void operator >> (T & target);
877 class Network :
public internal::ClockListener {
880 : sink_counter_(NULL), sink_count_(0)
881 , slices_(0), sched_(NULL), policy_() {
886 : sink_counter_(NULL), sink_count_(0)
887 , slices_(slices), sched_(NULL), policy_() {
892 : sink_counter_(NULL), sink_count_(0)
893 , slices_(0), sched_(NULL), policy_(policy) {
897 : sink_counter_(NULL), sink_count_(0)
898 , slices_(slices), sched_(NULL), policy_(policy) {
903 if (NULL != sched_) {
907 if (NULL != sink_counter_) {
908 for (
int ii = 0; ii < slices_; ii++) {
909 sink_counter_[ii].~Atomic<
int>();
912 sink_counter_ = NULL;
916 template <
typename T1,
917 typename T2 = embb::base::internal::Nil,
918 typename T3 = embb::base::internal::Nil,
919 typename T4 = embb::base::internal::Nil,
920 typename T5 = embb::base::internal::Nil>
925 template <
typename T1,
926 typename T2 = embb::base::internal::Nil,
927 typename T3 = embb::base::internal::Nil,
928 typename T4 = embb::base::internal::Nil,
929 typename T5 = embb::base::internal::Nil>
937 typename I1,
typename I2,
typename I3,
typename I4,
typename I5,
938 typename O1,
typename O2,
typename O3,
typename O4,
typename O5>
941 :
public internal::Process< true,
942 internal::Inputs<I1, I2, I3, I4, I5>,
943 internal::Outputs<O1, O2, O3, O4, O5> > {
945 typedef typename internal::Process<
true,
946 internal::Inputs<I1, I2, I3, I4, I5>,
947 internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
951 : internal::Process< true,
952 internal::
Inputs<I1, I2, I3, I4, I5>,
953 internal::
Outputs<O1, O2, O3, O4, O5> >(
954 network.sched_, function) {
955 this->SetPolicy(network.policy_);
956 network.processes_.push_back(
this);
961 : internal::Process< true,
962 internal::
Inputs<I1, I2, I3, I4, I5>,
963 internal::
Outputs<O1, O2, O3, O4, O5> >(
964 network.sched_, function) {
965 this->SetPolicy(policy);
966 network.processes_.push_back(
this);
970 : internal::Process< true,
971 internal::
Inputs<I1, I2, I3, I4, I5>,
972 internal::
Outputs<O1, O2, O3, O4, O5> >(
973 network.sched_, job) {
974 network.processes_.push_back(
this);
981 typename I1,
typename I2,
typename I3,
typename I4,
typename I5,
982 typename O1,
typename O2,
typename O3,
typename O4,
typename O5>
985 :
public internal::Process< false,
986 internal::Inputs<I1, I2, I3, I4, I5>,
987 internal::Outputs<O1, O2, O3, O4, O5> >{
989 typedef typename internal::Process<
false,
990 internal::Inputs<I1, I2, I3, I4, I5>,
991 internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
995 : internal::Process< false,
996 internal::
Inputs<I1, I2, I3, I4, I5>,
997 internal::
Outputs<O1, O2, O3, O4, O5> >(
998 network.sched_, function) {
999 this->SetPolicy(network.policy_);
1000 network.processes_.push_back(
this);
1005 : internal::Process< false,
1006 internal::
Inputs<I1, I2, I3, I4, I5>,
1007 internal::
Outputs<O1, O2, O3, O4, O5> >(
1008 network.sched_, function) {
1009 this->SetPolicy(policy);
1010 network.processes_.push_back(
this);
1014 : internal::Process< false,
1015 internal::
Inputs<I1, I2, I3, I4, I5>,
1016 internal::
Outputs<O1, O2, O3, O4, O5> >(
1017 network.sched_, job) {
1018 network.processes_.push_back(
this);
1022 template<
typename Type>
1023 class Switch :
public internal::Switch<Type> {
1026 : internal::
Switch<Type>(network.sched_) {
1027 this->SetPolicy(network.policy_);
1028 network.processes_.push_back(
this);
1032 : internal::
Switch<Type>(network.sched_) {
1033 this->SetPolicy(policy);
1034 network.processes_.push_back(
this);
1038 template<
typename Type>
1039 class Select :
public internal::Select<Type> {
1042 : internal::
Select<Type>(network.sched_) {
1043 this->SetPolicy(network.policy_);
1044 network.processes_.push_back(
this);
1048 : internal::
Select<Type>(network.sched_) {
1049 this->SetPolicy(policy);
1050 network.processes_.push_back(
this);
1054 template<
typename I1,
typename I2 = embb::base::internal::Nil,
1055 typename I3 = embb::base::internal::Nil,
1056 typename I4 = embb::base::internal::Nil,
1057 typename I5 = embb::base::internal::Nil>
1058 class Sink :
public internal::Sink<
1059 internal::Inputs<I1, I2, I3, I4, I5> > {
1061 typedef typename internal::Sink<
1062 internal::Inputs<I1, I2, I3, I4, I5> >::FunctionType FunctionType;
1066 internal::
Inputs<I1, I2, I3, I4, I5> >(
1067 network.sched_, &network, function) {
1068 this->SetPolicy(network.policy_);
1069 network.sinks_.push_back(
this);
1070 network.sink_count_++;
1076 internal::
Inputs<I1, I2, I3, I4, I5> >(
1077 network.sched_, &network, function) {
1078 this->SetPolicy(policy);
1079 network.sinks_.push_back(
this);
1080 network.sink_count_++;
1085 internal::
Inputs<I1, I2, I3, I4, I5> >(
1086 network.sched_, &network, job) {
1087 network.sinks_.push_back(
this);
1088 network.sink_count_++;
1092 template<
typename O1,
typename O2 = embb::base::internal::Nil,
1093 typename O3 = embb::base::internal::Nil,
1094 typename O4 = embb::base::internal::Nil,
1095 typename O5 = embb::base::internal::Nil>
1096 class Source :
public internal::Source<
1097 internal::Outputs<O1, O2, O3, O4, O5> > {
1099 typedef typename internal::Source<
1100 internal::Outputs<O1, O2, O3, O4, O5> >::FunctionType
1105 internal::
Outputs<O1, O2, O3, O4, O5> >(network.sched_, function) {
1106 this->SetPolicy(network.policy_);
1107 network.sources_.push_back(
this);
1113 internal::
Outputs<O1, O2, O3, O4, O5> >(network.sched_, function) {
1114 this->SetPolicy(policy);
1115 network.sources_.push_back(
this);
1120 internal::
Outputs< O1, O2, O3, O4, O5> >(network.sched_, job) {
1121 network.sources_.push_back(
this);
1125 template<
typename Type>
1130 this->SetPolicy(network.policy_);
1131 network.sources_.push_back(
this);
1137 this->SetPolicy(policy);
1138 network.sources_.push_back(
this);
1145 for (
size_t ii = 0; ii < sources_.size() && valid; ii++) {
1146 valid = valid && sources_[ii]->IsFullyConnected();
1148 for (
size_t ii = 0; ii < processes_.size() && valid; ii++) {
1149 valid = valid && processes_[ii]->IsFullyConnected();
1151 for (
size_t ii = 0; ii < sinks_.size() && valid; ii++) {
1152 valid = valid && sinks_[ii]->IsFullyConnected();
1155 for (
size_t ii = 0; ii < processes_.size() && valid; ii++) {
1156 valid = valid && !processes_[ii]->HasCycle();
1163 slices_ =
static_cast<int>(
1166 for (
size_t ii = 0; ii < processes_.size(); ii++) {
1167 int tt = processes_[ii]->IsSequential() ? 1 :
1172 for (
size_t ii = 0; ii < sources_.size(); ii++) {
1173 sources_[ii]->SetScheduler(sched_);
1175 for (
size_t ii = 0; ii < processes_.size(); ii++) {
1176 processes_[ii]->SetScheduler(sched_);
1178 for (
size_t ii = 0; ii < sinks_.size(); ii++) {
1179 sinks_[ii]->SetScheduler(sched_);
1184 while (clock >= 0) {
1185 const int idx = clock % slices_;
1186 while (sink_counter_[idx] > 0) {
1187 sched_->YieldToScheduler();
1189 if (!SpawnClock(clock))
1194 int ii = clock - slices_ + 1;
1196 for (; ii < clock; ii++) {
1197 const int idx = ii % slices_;
1198 while (sink_counter_[idx] > 0) {
1199 sched_->YieldToScheduler();
1210 virtual void OnClock(
int clock) {
1211 const int idx = clock % slices_;
1212 assert(sink_counter_[idx] > 0);
1213 --sink_counter_[idx];
1217 std::vector<internal::Node*> processes_;
1218 std::vector<internal::Node*> sources_;
1219 std::vector<internal::Node*> sinks_;
1223 internal::Scheduler * sched_;
1226 #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY 1227 std::vector<int> spawn_history_[Slices];
1230 bool SpawnClock(
int clock) {
1231 const int idx = clock % slices_;
1232 #if EMBB_DATAFLOW_TRACE_SIGNAL_HISTORY 1233 spawn_history_[idx].push_back(clock);
1235 assert(sink_counter_[idx] == 0);
1236 sink_counter_[idx] = sink_count_;
1237 for (
size_t kk = 0; kk < sources_.size(); kk++) {
1238 sources_[kk]->Start(clock);
1241 for (
size_t kk = 0; kk < sources_.size(); kk++) {
1242 result &= sources_[kk]->Wait(clock);
1247 void PrepareSlices() {
1248 sched_ = embb::base::Allocation::New<internal::SchedulerMTAPI>(slices_);
1249 if (sched_->GetSlices() != slices_) {
1250 slices_ = sched_->GetSlices();
1255 for (
int ii = 0; ii < slices_; ii++) {
1266 #endif // EMBB_DATAFLOW_NETWORK_H_ Definition: lock_free_mpmc_queue.h:40
Switch process template.
Definition: network.h:446
unsigned int embb_core_count_available()
Returns the number of available processor cores.
void operator()()
Executes the network until one of the the sources returns false.
static void Free(void *ptr)
Frees memory that has been allocated by Allocation::Allocate() for some pointer ptr.
Select process template.
Definition: network.h:531
Outputs< OUTPUT_TYPE_LIST > OutputsType
Output port type list.
Definition: network.h:723
Inputs< INPUT_TYPE_LIST > InputsType
Input port type list.
Definition: network.h:344
Outputs< Type > OutputsType
Output port type list.
Definition: network.h:461
embb::base::Function< void, INPUT_TYPE_LIST, OUTPUT_TYPE_LIST > FunctionType
Function type to use when processing tokens.
Definition: network.h:226
Outputs< OUTPUT_TYPE_LIST > OutputsType
Output port type list.
Definition: network.h:236
In< Type > InType
Input port class that can be connected to this output port.
Definition: network.h:121
Generic serial process template.
Definition: network.h:220
Input port class.
Definition: network.h:109
Inputs< INPUT_TYPE_LIST > InputsType
Input port type list.
Definition: network.h:632
Inputs< bool, Type > InputsType
Input port type list.
Definition: network.h:456
Network()
Constructs an empty network.
Definition: network.h:68
Outputs< Type > OutputsType
Output port type list.
Definition: network.h:546
embb::base::Function< void, OUTPUT_TYPE_LIST > FunctionType
Function type to use when processing tokens.
Definition: network.h:718
Constant source process template.
Definition: network.h:802
Output port class.
Definition: network.h:116
Represents a collection of Actions.
Definition: job.h:41
Provides the output port types for a process.
Definition: network.h:185
Wraps function pointers, member function pointers, and functors with up to five arguments.
Definition: function.h:94
static void * Allocate(size_t size)
Allocates size bytes of memory (unaligned).
Outputs< OUTPUT_TYPE_LIST > OutputsType
Output port type list.
Definition: network.h:349
Outputs< OUTPUT_TYPE_LIST > OutputsType
Output port type list.
Definition: network.h:807
Source process template.
Definition: network.h:713
embb::base::Function< void, bool, Type, Type, Type & > FunctionType
Function type to use when processing tokens.
Definition: network.h:536
embb::base::Function< void, INPUT_TYPE_LIST, OUTPUT_TYPE_LIST > FunctionType
Function type to use when processing tokens.
Definition: network.h:339
embb::base::Function< void, bool, Type, Type & > FunctionType
Function type to use when processing tokens.
Definition: network.h:451
embb::base::Function< void, INPUT_TYPE_LIST > FunctionType
Function type to use when processing tokens.
Definition: network.h:627
Out< T_Index > Result
Result of an output port type query.
Definition: network.h:196
Sink process template.
Definition: network.h:622
Describes the execution policy of a parallel algorithm.
Definition: execution_policy.h:48
Inputs< INPUT_TYPE_LIST > InputsType
Input port type list.
Definition: network.h:231
Network(int slices, embb::mtapi::ExecutionPolicy const &policy)
Constructs an empty network.
Definition: network.h:103
Inputs< bool, Type, Type > InputsType
Input port type list.
Definition: network.h:541
bool IsValid()
Checks whether the network is completely connected and free of cycles.
Represents a set of processes that are connected by communication channels.
Definition: network.h:57
Network(int slices)
Constructs an empty network.
Definition: network.h:79
static void Delete(Type *to_delete)
Destructs an instance of type Type and frees the allocated memory.
Definition: memory_allocation.h:176
Network(embb::mtapi::ExecutionPolicy const &policy)
Constructs an empty network.
Definition: network.h:91
Generic parallel process template.
Definition: network.h:333
Type list used to derive output port types from Index.
Definition: network.h:191