TPL Part 4 — Task的协同


简单的Continuation

Task.ContinueWith(Task): 当指定的Task履行终了时。

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
});
root Task.ContinueWith((Task previousTask)=>{
Console.WriteLine("continute task completed");
});

rootTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

Task.ContinueWhenAll(Task[]):当指定的所有Task都履行终了时,示例代码:

 

Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>{
foreach(Task<int> t in antecedents) {
// dosomething
}
});

 

 

TaskFactory.ContinueWhenAny(Task[]):当指定的所有Task的任意1个履行终了时,代码与ContinueWhenAll类似(以下代码中,打印出前1个Task的履行时间):

Task continuation = Task.Factory.ContinueWhenAny<int>(tasks,
(Task<int>antecedent) => {
//write out a message using the antecedent result
Console.WriteLine("The first task slept for {0} milliseconds",
antecedent.Result);
});

Continue 选项

OnlyOnRanToCompletion仅当履行完

NotOnRanToCompletion:没有履行完(被取消或出现异常)

OnlyOnFaulted:仅当出现异常

NotOnFaulted:没有出现异常

OnlyOnCancelled:仅当被取消

NotOnCancelled:没有被取消

处理异常

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
rootTask.ContinueWith((Task previousTask)=>{
Console.WriteLine("even root throw exception , I still run");
});

rootTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

以上代码中,第1个task中抛出了异常,Continue的Task依然会继续履行。可是Task被Finalized时异常就会抛出。

解决方案:

void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
var t2 = rootTask.ContinueWith((Task previousTask)=>{
//
if(previousTask.Status== TaskStatus.Faulted){
throw previousTask.Exception.InnerException;
}
Console.WriteLine("even root throw exception , I still run");
});

rootTask.Start();

try{
t2.Wait();
}
catch(AggregateException ex){
ex.Handle(inner=>{Console.WriteLine("exception handled in main thread"); return true;});
}

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在Task中冒泡抛出异常,在主线程中等待最后那个Task的履行并对AggregateException进行处理。

创建子Task

创建子Task并附加在父Task上:

void Main()
{

Task parentTask = new Task(() => {
Console.WriteLine("parent task started");
//create the first child task
Task childTask = new Task(() => {
// writeout a message and wait
Console.WriteLine("Child task running");
Thread.Sleep(1000);
Console.WriteLine("Child task throwed exception");
throw new Exception();
} ,TaskCreationOptions.AttachedToParent);
Console.WriteLine("start child task…");
childTask.Start();

Console.WriteLine("parent task ended");
});
// startthe parent task
parentTask.Start();

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}

1. 父Task会抛出子Task中的异常

2. 父Task的状态会遭到所附加的子Task状态的影响

Barrier的使用

 

class BankAccount {
public int Balance {
get;
set;
}
} ;

void Main()
{
//create the array of bank accounts
BankAccount[] accounts = new BankAccount[6];
for(int i = 0;i < accounts.Length; i++) {
accounts[i] = new BankAccount();
}
//create the total balance counter
int totalBalance = 0;
//create the barrier
Barrier barrier = new Barrier(3, (myBarrier) => {

// zerothe balance
totalBalance= 0;
// sumthe account totals
foreach(BankAccount account in accounts) {
totalBalance+= account.Balance;
}
// writeout the balance
Console.WriteLine("[From barrier :] Total balance: {0}",totalBalance);
});
//define the tasks array
Task[] tasks = new Task[3];
// loopto create the tasks
for(int i = 0;i < tasks.Length; i++) {
tasks[i]= new Task((stateObj) => {
//create a typed reference to the account
BankAccount account = (BankAccount)stateObj;
// startof phase
Random rnd = new Random();
for(int j = 0;j < 1000; j++) {
account.Balance+= 2;
}

Thread.Sleep(new Random().Next(3000));

Console.WriteLine("Task {0} waiting, phase {1} ",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier

barrier.SignalAndWait();

account.Balance-= 1000;
Console.WriteLine("barrier finished .");
// endof phase
Console.WriteLine("Task {0}, phase {1} ended",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
},
accounts[i]);
}

// startthe task
foreach(Task t in tasks) {
t.Start();
}
// waitfor all of the tasks to complete
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,打开了3个barrier和3个Task,在Task中为每一个账户添加2000,然后给barrier发出同步信号,当barrier收到3个信号时,对账号进行求和并保存;当barrier完成逻辑后,控制权交给了每一个Task,此时每一个Task对account减1000,再次求和,最后结果为3000。

如果希望通过Cancel来控制barrier的行动,还可以在barrier中传入tokenSource.Token:barrier.SignalAndWait(tokenSource.Token);并在Task中履行Cancel:tokenSource.Cancel()。

可以通过调用barrier.RemoveParticipant();来减少barrier的count。

CountEventDown

作用和Barrier类似,累计信号数量,当信号量到达指定数量,set event。

void Main()
{

CountdownEvent cdevent = new CountdownEvent(5);
//create a Random that we will use to generate
// sleepintervals
Random rnd = new Random();
//create 5 tasks, each of which will wait for
// arandom period and then signal the event
Task[] tasks = new Task[6];
for(int i = 0;i < tasks.Length; i++) {
//create the new task
tasks[i]= new Task(() => {
// putthe task to sleep for a random period
// up toone second
Thread.Sleep(rnd.Next(500, 1000));
//signal the event
Console.WriteLine("Task {0} signalling event",Task.CurrentId);
cdevent.Signal();
});
};
//create the final task, which will rendezous with the other 5
// usingthe count down event
tasks[5] = new Task(()=> {
// waiton the event
Console.WriteLine("Rendezvous task waiting");
cdevent.Wait();
Console.WriteLine("CountDownEvent has been set");
});

// startthe tasks
foreach(Task t in tasks) {
t.Start();
}
Task.WaitAll(tasks);

// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,开启了5个Task和1个count为5的CountDownEvent对象,每一个Task中完成任务后分别对CountDownEvent发信号,当凑齐5个信号后,会打印出CountDownEvent has been set。

ManualResetEvent 和 AutoResetEvent

熟习.net之前版本的应当都对它们很熟习,用于在多线程环境中完成线程同步。区分在于,前者必须调用reset才能恢覆信号;而AutoResetEvent则会自动reset。在此不再赘述。

SemaphoreSlim

void Main()
{
SemaphoreSlim semaphore = new SemaphoreSlim(3);
//create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();

//create and start the task that will wait on the event
for(int i = 0;i < 10; i++) {
Task.Factory.StartNew((obj)=> {

semaphore.Wait(tokenSource.Token);
// printout a message when we are released
Console.WriteLine("Task {0} released", obj);

},i,tokenSource.Token);
}

//create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loopwhile the task has not been cancelled
while(!tokenSource.Token.IsCancellationRequested) {
// go tosleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
//signal the semaphore
semaphore.Release(3);
Console.WriteLine("Semaphore released");
}
// if wereach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
},tokenSource.Token);
// askthe user to press return before we cancel
// thetoken and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
//cancel the token source and wait for the tasks
tokenSource.Cancel();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

在以上代码中,new了1个SemaphoreSlim对象并传入3,开了10个Task线程,每当有信号从Semaphore传来时,打印Task[i]被release。同时开1个信号线程,每500毫秒release3个Task。

可见,Semaphore的作用主要是可以选择1次release多少个Task。

 

Producer / Consumer(生产者/消费者模式)

以下代码中,new了1个BlockingCollection,类型为Deposit。开了3个生产者Task,每一个生产者中创建20个Deposit对象并给Amount赋值为100。在主线程中等待生产者Task履行终了,调用blockingCollection.CompleteAdding()方法。以后开1个消费者Task用于操作账户对象,循环判断blockingCollection.IsCompleted属性(生产者是不是完成工作),从集合拿出存款对象,增加账户余额。

示例代码:

class BankAccount {
public int Balance {
get;
set;
}
}
class Deposit {
public int Amount {
get;
set;
}
}

void Main()
{
BlockingCollection<Deposit> blockingCollection
= new BlockingCollection<Deposit>();

var producers = new List<Task>();
for(int i = 0;i < 3; i++) {
var producer = Task.Factory.StartNew((obj) => {
//create a series of deposits
for(int j = 0;j < 20; j++) {
//create the transfer
var randAmount = new Random().Next(100);
Deposit deposit = new Deposit { Amount = randAmount};
Thread.Sleep(newRandom().Next(200));
// placethe transfer in the collection
blockingCollection.Add(deposit);
Console.WriteLine(string.Format("Amount: {0} deposit Processed, index: {1}",randAmount, int.Parse(obj.ToString()) +j));

}
}, i*20);
producers.Add(producer);
};
//create a many to one continuation that will signal
// theend of production to the consumer
Task.Factory.ContinueWhenAll(producers.ToArray(),antecedents => {
//signal that production has ended
Console.WriteLine("Signalling production end");
blockingCollection.CompleteAdding();
});
//create a bank account
BankAccount account = new BankAccount();
//create the consumer, which will update
// thebalance based on the deposits
Task consumer = Task.Factory.StartNew(() => {
while(!blockingCollection.IsCompleted) {
Deposit deposit;
// tryto take the next item
if(blockingCollection.TryTake(outdeposit)) {
//update the balance with the transfer amount
account.Balance+= deposit.Amount;
}
}
// printout the final balance
Console.WriteLine("Final Balance: {0}", account.Balance);
});
// waitfor the consumer to finish
consumer.Wait();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

}

波比源码 – 精品源码模版分享 | www.bobi11.com
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!

波比源码 » TPL Part 4 — Task的协同

发表评论

Hi, 如果你对这款模板有疑问,可以跟我联系哦!

联系站长
赞助VIP 享更多特权,建议使用 QQ 登录
喜欢我嘛?喜欢就按“ctrl+D”收藏我吧!♡