行列步队(Queue)代表了一个前辈先出的工具凑集。当您须要对各项进行前辈先出的访问时,则利用行列步队。当您在列表中添加一项,称为入队,当您从列表中移除一项时,称为出队。
比如平常我们在处理定时任务的时候,假设就一台机器,我们不可能单线程一条一条数据的去跑,这时候就须要提高机器资源的利用率。
下面我们来先容下,如何实现多线程+行列步队以提高并发处理能力。
代码实现1、定义线程数threadNum和行列步队queues
/// <summary>
/// 线程总数
/// </summary>
private int threadNum = 4;
/// <summary>
/// 总数
/// </summary>
private int totalCount = 0;
/// <summary>
/// 已处理
/// </summary>
private int index = 0;
/// <summary>
/// 行列步队
/// </summary>
private ConcurrentQueue<AssetRepayment> queues = new ConcurrentQueue<AssetRepayment>();
2、定义线程列表,往线程添加数据
public void SubDeTransaction()
{
var list = new List<AssetRepayment>();
for (int i = 0; i < 1000; i++)
{
list.Add(new AssetRepayment() { Title = i.ToString() + \"大众---\"大众 + Guid.NewGuid().ToString() });
}
if (list == || list.Count() == 0)
{
Console.WriteLine(\"大众没有可实行的数据\"大众);
return;
}
totalCount = list.Count;
Console.WriteLine(\公众可实行的数据:\"大众 + list.Count() + \"大众条\"大众);
foreach (var item in list)
{
queues.Enqueue(item);
}
List<Task> tasks = new List<Task>();
for (int i = 0; i < threadNum; i++)
{
var task = Task.Run(() =>
{
Process();
});
tasks.Add(task);
}
var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
{
});
taskList.Wait();
}
3、对线程数进行限定 for (int i = 0; i < threadNum; i++)
var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) =>
{
});
taskList.Wait();
4、从行列步队取出数据进行业务处理
private void Process()
{
while (true)
{
var currentIndex = Interlocked.Increment(ref index);
AssetRepayment repayId = ;
var isExit = queues.TryDequeue(out repayId);
if (!isExit)
{
break;
}
try
{
Console.WriteLine(repayId.Title);
Console.WriteLine(string.Format(\"大众 共{0}条 当前第{1}条\公众, totalCount, currentIndex));
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}
运行测试
代码地址
https://gitee.com/conanOpenSource_admin/Example