Wie kann ich die Batch-Größe eines Batchblocks während der Laufzeit dynamisch ändern?C#

Ein Treffpunkt für C#-Programmierer
Anonymous
 Wie kann ich die Batch-Größe eines Batchblocks während der Laufzeit dynamisch ändern?

Post by Anonymous »

Ich habe einen Batch-Block im TPL-Datenfluss und mehrere Zielblöcke, die mit dem Batch-Block verknüpft sind. Allerdings ändert sich die Anzahl der Zielblöcke dynamisch und damit auch die Größe der Batches. Das Problem besteht darin, dass die Batch-Größe bei der Initialisierung des Batchblocks angegeben werden muss und ich keine Möglichkeit sehe, sie später anzupassen. Irgendwelche Ideen, wie man das umgehen kann? Ist die einzige Möglichkeit, die Verknüpfung aufzuheben (alle Verknüpfungen zum Batchblock und vom Batchblock zu entfernen), den Batchblock mit einer neuen Batchgröße neu zu initialisieren und dann erneut zu verknüpfen? Ich könnte das tun, aber wie kann ich sicherstellen, dass alte und neue Stapel nicht alle verwechselt werden?

Wenn ich beispielsweise zwei Transformationsblöcke von Stream zu Stapelblock habe und jetzt einen zusätzlichen Transformationsblock habe und die Stapelgröße auf 3 erhöhen möchte, wie stelle ich dann sicher, dass alle vorherigen Stapel vor der Erhöhung verarbeitet wurden, um ein synchronisiertes Verhalten sicherzustellen? Der Punkt ist, dass alle Transformationsblöcke das exakt identische Element erhalten und die Ausgaben dieser Transformationsblöcke so gestapelt werden sollten, dass nur die Ausgaben gestapelt werden, die mit identischen Eingaben übereinstimmen.

Hier ein Beispiel, wie ich es haben möchte:

Konstanter Strom von Ints, um Blöcke zu transformieren:
1,2,3, [Punkt, an dem die Batch-Größe erhöht wird],4,5,...

Lassen Sie Transformationsblöcke das ausgeben, was sie erhalten haben, wie 1 => 1

Batchblock sollte also so ausgeben :
[1,1], [2,2], [3,3], [Änderung der Stapelgröße], [4,4,4], [5,5,5],...

Hier mein aktueller Code:

Code: Select all

public class Test
{
private Stopwatch watch;

private BroadcastBlock tempBCB;
private BatchBlock batchBlock;
private TransformBlock transformBlock;
private ActionBlock justToFlushTransformBlock;

private CoreLogic core1;
private CoreLogic core2;

public Test()
{
tempBCB = new BroadcastBlock(input => input);

//here batch size = 2
batchBlock = new BatchBlock(2, new GroupingDataflowBlockOptions { Greedy = false });

transformBlock = new TransformBlock(array =>
{
List inputObjects = array[0].Item1;
List ret = inputObjects.ConvertAll(x => new FinalObject(x));

foreach (var tuple in array)
{
//iterate over each individual object
foreach (var dictionary in tuple.Item2)
{
ret[dictionary.Key].outputList.Add(dictionary.Value);
}
}

return ret;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

justToFlushTransformBlock = new ActionBlock(list =>
{
//just in order to accept items from the transformBlock output queue
});

//Generate 2 CoreLogic objects
core1 = new CoreLogic();
core2 = new CoreLogic();

//linking
tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

core1.transformBlock.LinkTo(batchBlock);
core2.transformBlock.LinkTo(batchBlock);

batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
const int numberChunks = 30;

watch = new Stopwatch();
watch.Start();

for (int j = 1; j 
{
batchBlock.Complete();
});

transformBlock.Completion.Wait();

watch.Stop();

Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
Console.ReadLine();
}
}

public class CoreLogic
{
private Random rand;
public TransformBlock transformBlock;

public CoreLogic()
{
const int numberIntermediateObjects = 10000;

transformBlock = new TransformBlock(input =>
{
//please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

Dictionary ret = new Dictionary();
for (int i = 0; i < numberIntermediateObjects;  i++)
{
IntermediateObject value = new IntermediateObject(i);

ret.Add(i, value);
}

var tuple = new Tuple(input, ret);

return tuple;
});
}
}

public class InputObject : ICloneable
{
public int value1 { get; private set; }

public InputObject(int value)
{
this.value1 = value;
}

object ICloneable.Clone()
{
return Clone();
}

public InputObject Clone()
{
return (InputObject)this.MemberwiseClone();
}
}

public class IntermediateObject
{
public int value1 { get; private set; }

public IntermediateObject(int value)
{
this.value1 = value;
}
}

public class FinalObject
{
public InputObject input { get; private set; }
public List outputList;

public FinalObject(InputObject input)
{
this.input = input;

this.outputList = new List();
}
}

public static class Cloning
{
public static List CloneListCloneValues(List original) where TValue : ICloneable
{
List ret = new List(original.Count);

foreach (TValue entry in original)
{
ret.Add((TValue)entry.Clone());
}

return ret;
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post