Wenn ich beispielsweise zwei Transformationsblöcke von Stream zu Stapelblock habe und jetzt einen zusätzlichen Transformationsblock habe und die Stapelgröße auf 3 erhöhen möchte, wie stelle ich dann sicher, dass alle vorherigen Stapel vor der Erhöhung verarbeitet wurden, um ein synchronisiertes Verhalten sicherzustellen? Der Punkt ist, dass alle Transformationsblöcke das exakt identische Element erhalten und die Ausgaben dieser Transformationsblöcke so gestapelt werden sollten, dass nur die Ausgaben gestapelt werden, die mit identischen Eingaben übereinstimmen.
Hier ein Beispiel, wie ich es haben möchte:
Konstanter Strom von Ints, um Blöcke zu transformieren:
1,2,3, [Punkt, an dem die Batch-Größe erhöht wird],4,5,...
Lassen Sie Transformationsblöcke das ausgeben, was sie erhalten haben, wie 1 => 1
Batchblock sollte also so ausgeben :
[1,1], [2,2], [3,3], [Änderung der Stapelgröße], [4,4,4], [5,5,5],...
Hier mein aktueller Code:
Code: Select all
public class Test
{
private Stopwatch watch;
private BroadcastBlock tempBCB;
private BatchBlock batchBlock;
private TransformBlock transformBlock;
private ActionBlock justToFlushTransformBlock;
private CoreLogic core1;
private CoreLogic core2;
public Test()
{
tempBCB = new BroadcastBlock(input => input);
//here batch size = 2
batchBlock = new BatchBlock(2, new GroupingDataflowBlockOptions { Greedy = false });
transformBlock = new TransformBlock(array =>
{
List inputObjects = array[0].Item1;
List ret = inputObjects.ConvertAll(x => new FinalObject(x));
foreach (var tuple in array)
{
//iterate over each individual object
foreach (var dictionary in tuple.Item2)
{
ret[dictionary.Key].outputList.Add(dictionary.Value);
}
}
return ret;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
justToFlushTransformBlock = new ActionBlock(list =>
{
//just in order to accept items from the transformBlock output queue
});
//Generate 2 CoreLogic objects
core1 = new CoreLogic();
core2 = new CoreLogic();
//linking
tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
core1.transformBlock.LinkTo(batchBlock);
core2.transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numberChunks = 30;
watch = new Stopwatch();
watch.Start();
for (int j = 1; j
{
batchBlock.Complete();
});
transformBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
Console.ReadLine();
}
}
public class CoreLogic
{
private Random rand;
public TransformBlock transformBlock;
public CoreLogic()
{
const int numberIntermediateObjects = 10000;
transformBlock = new TransformBlock(input =>
{
//please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return
Dictionary ret = new Dictionary();
for (int i = 0; i < numberIntermediateObjects; i++)
{
IntermediateObject value = new IntermediateObject(i);
ret.Add(i, value);
}
var tuple = new Tuple(input, ret);
return tuple;
});
}
}
public class InputObject : ICloneable
{
public int value1 { get; private set; }
public InputObject(int value)
{
this.value1 = value;
}
object ICloneable.Clone()
{
return Clone();
}
public InputObject Clone()
{
return (InputObject)this.MemberwiseClone();
}
}
public class IntermediateObject
{
public int value1 { get; private set; }
public IntermediateObject(int value)
{
this.value1 = value;
}
}
public class FinalObject
{
public InputObject input { get; private set; }
public List outputList;
public FinalObject(InputObject input)
{
this.input = input;
this.outputList = new List();
}
}
public static class Cloning
{
public static List CloneListCloneValues(List original) where TValue : ICloneable
{
List ret = new List(original.Count);
foreach (TValue entry in original)
{
ret.Add((TValue)entry.Clone());
}
return ret;
}
}
Mobile version