Module riak_pipe_w_rec_countdown

Proof of concept for recursive input (fitting sending output to itself).

Behaviours: riak_pipe_vnode_worker.

Description

Proof of concept for recursive input (fitting sending output to itself). When this fitting receives an input, it passes that input to its output, and also passes Input-1 to itself as input until the input is 0. Thus, sending 3 as the input to this fitting, would result in the outputs 3, 2, 1, and 0. That is:

  Spec = [#fitting_spec{name=counter,
                        module=riak_pipe_w_rec_countdown}],
  {ok, Pipe} = riak_pipe:exec(Spec, []),
  riak_pipe:queue_work(Pipe, 3),
  riak_pipe:eoi(Pipe),
  {eoi, Results, []} = riak_pipe:collect_results(Pipe).
  [{counter,0},{counter,1},{counter,2},{counter,3}] = Results.

This fitting should work with any consistent-hash function. It requires no archiving for handoff.

If the argument is the atom testeoi, then the final recursive input (0) will be sent three times, with no delay before the second case and a 1-second delay before the third. These two sends should test the behavior of vnode enqueueing while attempting to force a worker to done. If all eoi handling is done properly, then 0 should appear three times in the result list. The testeoi case should go like this:

  Spec = [#fitting_spec{name=counter,
                        module=riak_pipe_w_rec_countdown,
                        arg=testeoi}],
  Options = [{trace,[restart]},{log,sink}],
  {ok, Pipe} = riak_pipe:exec(Spec, Options),
  riak_pipe:queue_work(Pipe, 3),
  riak_pipe:eoi(Pipe),
  {eoi, Results, Trace} = riak_pipe:collect_results(Pipe).
  [{counter,0},{counter,0},{counter,0},
   {counter,1},{counter,2},{counter,3}] = Results.
  [{counter,{trace,[restart],{vnode,{restart,_}}}}] = Trace.

If Results contains less than three instances of {counter,0}, then the test failed. If Trace is empty, the done/eoi race was not triggered, and the test should be re-run.

NOTE: This test code has been copied to the EUnit tests in riak_pipe.erl, into the basic_test_() collection.

Data Types

state()

abstract datatype: state()

Function Index

done/1Unused.
init/2Initialization just stows the partition and fitting details in the module's state, for sending outputs in process/3.
process/3Process sends Input directly to the next fitting, and also Input-1 back to this fitting as new input.

Function Details

done/1

done(State::state()) -> ok

Unused.

init/2

init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}

Initialization just stows the partition and fitting details in the module's state, for sending outputs in process/3.

process/3

process(Input::term(), Last::boolean(), State::state()) -> {ok, state()}

Process sends Input directly to the next fitting, and also Input-1 back to this fitting as new input.


Generated by EDoc, Aug 5 2012, 06:59:40.