Skip to content
Draft

WIP #14

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ class SimpleActivityTimer
else
startCycles = 0;
}

inline ~SimpleActivityTimer()
{
if (likely(enabled))
Expand Down
13 changes: 10 additions & 3 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,18 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
unsigned numRows = 0;
try
{
funnel.activity.startInput(inputIndex);
{
LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities());
funnel.activity.startInput(inputIndex);
}
started = true;
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
numRows = 0;
for (;numRows < chunkSize; numRows++)
{
LookAheadTimer t(funnel.activity.slaveTimerStats, funnel.activity.queryTimeActivities());
const void * row = inputStream->ungroupedNextRow();
if (!row)
break;
Expand Down Expand Up @@ -354,7 +358,6 @@ class FunnelSlaveActivity : public CSlaveActivity
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
if (!grouped && parallel)
{
//NB starts inputs on each thread
Expand All @@ -376,7 +379,11 @@ class FunnelSlaveActivity : public CSlaveActivity

auto startInputNFunc = [&](unsigned i)
{
try { startInput(i); }
try
{
LookAheadTimer s(slaveTimerStats, timeActivities);
startInput(i);
}
catch (CATCHALL)
{
ActPrintLog("FUNNEL(%" ACTPF "d): Error staring input %d", container.queryId(), i);
Expand Down
3 changes: 3 additions & 0 deletions thorlcr/activities/join/thjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
CThorNarySlaveActivity::start();

ForEachItemIn(i1, expandedInputs)
Expand All @@ -682,6 +683,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
CATCH_NEXTROW()
{
ActivityTimer s(slaveTimerStats, timeActivities);
OwnedConstThorRow ret = processor.nextRow();
if (ret)
{
Expand All @@ -692,6 +694,7 @@ class CMergeJoinSlaveBaseActivity : public CThorNarySlaveActivity, public CThorS
}
virtual const void *nextRowGE(const void *seek, unsigned numFields, bool &wasCompleteMatch, const SmartStepExtra &stepExtra)
{
ActivityTimer s(slaveTimerStats, timeActivities);
try { return nextRowGENoCatch(seek, numFields, wasCompleteMatch, stepExtra); }
CATCH_NEXTROWX_CATCH;
}
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,7 @@ class CInMemJoinBase : public CSlaveActivity, public CAllOrLookupHelper<HELPER>,
}
virtual void start() override
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
joined = 0;
joinCounter = 0;
candidateCounter = 0;
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/loop/thloopslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,6 @@ class CConditionalActivity : public CSlaveActivity
}
virtual void start() override
{
ActivityTimer s(slaveTimerStats, timeActivities);
stopUnselectedInputs();
if (queryInput(branch))
{
Expand Down
4 changes: 4 additions & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
}
void prepareInput()
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
// NB: called by 1st output to start()
CriticalBlock block(prepareInputLock);
if (!inputPrepared)
Expand Down Expand Up @@ -300,7 +301,10 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
inline const void *nextRow(unsigned outIdx, rowcount_t current)
{
if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
return inputStream->nextRow();
}
if (recsReady == current && writeAheadException.get())
throw LINK(writeAheadException);
return sharedRowStream->queryOutput(outIdx)->nextRow(); // will block until available
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/project/thprojectslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class CPrefetchProjectSlaveActivity : public CSlaveActivity
~CPrefetcher() { stop(); }
PrefetchInfo *pullRecord()
{
LookAheadTimer t(parent.slaveTimerStats, parent.timeActivities);
OwnedConstThorRow row = parent.inputStream->nextRow();
if (row)
{
Expand Down
6 changes: 6 additions & 0 deletions thorlcr/activities/when/thwhenslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ class CWhenSlaveActivity : public CDependencyExecutorSlaveActivity
info.fastThrough = false;
calcMetaInfoSize(info, queryInput(0));
}
// IThorDataLink
virtual void start() override
{
ActivityTimer t(slaveTimerStats, queryTimeActivities());
PARENT::start();
}
};

////////////////////
Expand Down
11 changes: 10 additions & 1 deletion thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ void CSlaveActivity::startInput(unsigned index, const char *extra)

void CSlaveActivity::stop()
{
if (hasNegativeLocalExecute)
throw makeStringExceptionV(-1, "CSlaveActivity::stopAllInputs - localExecuteTime a%u: process < input", queryActivityId());
if (input)
stopInput(0);
dataLinkStop();
Expand Down Expand Up @@ -588,7 +590,13 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
}
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
if (processCycles < inputCycles) // not sure how/if possible, but guard against
{
//throw makeStringExceptionV(-1, "CSlaveActivity::queryLocalCycles a%u- process %" I64F "uns < input %" I64F "uns", queryActivityId(), cycle_to_nanosec(processCycles), cycle_to_nanosec(inputCycles));
hasNegativeLocalExecute = true;
//ActPrintLog("CSlaveActivity::queryLocalCycles - process %" I64F "uns < input %" I64F "uns", cycle_to_nanosec(processCycles), cycle_to_nanosec(inputCycles));
return 0;
}
hasNegativeLocalExecute = false;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (processCycles < blockedCycles)
Expand Down Expand Up @@ -731,6 +739,7 @@ void CThorStrandedActivity::strandedStop()
//For some reason gcc doesn't let you specify a function as pure virtual and define it at the same time.
void CThorStrandedActivity::start()
{
SimpleActivityTimer s(startTime, timeActivities);
CSlaveActivity::start();
startJunction(splitter);
onStartStrands();
Expand Down Expand Up @@ -854,7 +863,7 @@ IStrandJunction *CThorStrandedActivity::getOutputStreams(CActivityBase &ctx, uns

unsigned __int64 CThorStrandedActivity::queryTotalCycles() const
{
unsigned __int64 total = 0;;
cycle_t total = startTime;
ForEachItemIn(i, strands)
{
CThorStrandProcessor &strand = strands.item(i);
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
{
mutable MemoryBuffer *data;
mutable CriticalSection crit;

protected:
CThorInputArray inputs;
IPointerArrayOf<IThorDataLink> outputs;
Expand All @@ -216,7 +215,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
// fileStats is in this base class as it used by multiple derived classes (both slave and master) but not all.
// (Having it in the base class aids setup and resizing.)
mutable std::vector<OwnedPtr<CRuntimeStatisticCollection>> fileStats;

mutable bool hasNegativeLocalExecute = false;
protected:
unsigned __int64 queryLocalCycles() const;
bool ensureStartFTLookAhead(unsigned index);
Expand Down Expand Up @@ -475,6 +474,7 @@ class graphslave_decl CThorStrandedActivity : public CSlaveActivity
{
typedef CSlaveActivity PARENT;
protected:
cycle_t startTime = 0;
CThorStrandOptions strandOptions;
IArrayOf<CThorStrandProcessor> strands;
Owned<IStrandBranch> branch;
Expand Down