从Rest Service并行加载页面数据
本文关键字:数据 加载 并行 Rest Service | 更新日期: 2023-09-27 18:15:33
我正在使用一个restful api,每次调用将返回最多50条记录,如果您需要更多,您必须创建多个调用并传递一个偏移量。
有时我们需要加载"所有"结果,我们使用类似于下面代码的东西-这是一个接一个的请求,并将结果添加到一个列表中,当达到最大值或在任何一个调用中返回的数量小于请求的数量时停止。
如何重构这个(使用任务/并行/线程)在任何时候用多个请求加载这个数据,并且仍然得到完全相同的结果,我已经看过创建多个Tasks
并等待它们,但问题是要加载的记录数量是未知的,直到"不再可用"或达到最大值。
public IEnumerable<T> GetItems(int maxAmount = -1)
{
var moreData = true;
var result = new List<T>();
var counter = 0;
var batchAmount = 50;
while(moreData)
{
var requestAmount = Math.Min(batchAmount,maxAmount-result.Count);
var items = GetItemsFromService<T>(requestAmount,counter);
counter += items.Count;
moreData = items.Count == requestAmount && (maxAmount == -1 || maxAmount> items.Count);
result.AddRange(items);
}
return result;
}
private IEnumerable<T> GetItemsFromService(int batchAmount,int offset)
{
//Lets assume that this gets data from a rest service that returns a maximum of batchAmount
//and offsets using the offset variable.
}
不幸的是,您不能在这里使用async
,因为您依赖于前一个请求的项目数量。这必须是同步的,除非您想对接收到的数据执行一些异步操作。
它必须是一个设计糟糕的API,返回的页面结果没有总页面或总条目数
我设法让这个工作,基本上我一直发送页面请求,直到其中一个请求返回一无所有-因为它们是按顺序启动的,一旦响应返回一无所有,我们不需要再发出请求,只允许现有的请求完成。
我的工作代码是这样的:
private IEnumerable<object> GetEntitiesInParallel(Type type, string apiPath, Dictionary<string, string> parameters, int startPosition, int maxAmount)
{
var context = new TaskThreadingContext(maxAmount, startPosition);
var threads = Enumerable.Range(0, NumberOfThreads).Select(i =>
{
var task = Task.Factory.StartNew(() =>
{
while (context.Continue)
{
var rawData = String.Empty;
var offset = context.NextAmount();
var result = GetEntitiesSingleRequest(type, parameters, offset, apiPath, out rawData);
if (result.Any())
{
context.AddResult(result.Cast<object>(), rawData);
}
else
{
context.NoResult();
}
}
});
return task;
}).ToArray();
Task.WaitAll(threads);
var results = context.GetResults<object>();
return results;
}
private IEnumerable<object> GetEntitiesSingleRequest(Type type,Dictionary<string,string> parameters,
int offset,string apiPath, out string rawData)
{
var request = Utility.CreateRestRequest(apiPath, Method.GET,ApiKey,50,offset,parameters);
type = typeof(List<>).MakeGenericType(type);
var method = Client.GetType().GetMethods().Single(m => m.IsGenericMethod && m.Name == "Execute").MakeGenericMethod(type);
try
{
dynamic response = (IRestResponse)method.Invoke(Client, new object[] { request });
var data = response.Data as IEnumerable;
var dataList = data.Cast<object>().ToList();
rawData = response.Content.Replace("'n", Environment.NewLine);
return dataList.OfType<object>().ToList();
}
catch (Exception ex)
{
if (ex.Message.IndexOf("404") != -1)
{
rawData = null;
return Enumerable.Empty<object>();
}
throw;
}
}
private class TaskThreadingContext
{
private int batchAmount = 50;
private object locker1 = new object();
private object locker2 = new object();
private CancellationTokenSource tokenSource;
private CancellationToken token;
private volatile bool cont = true;
private volatile int offset = 0;
private volatile int max = 0;
private volatile int start = 0;
private List<object> result = new List<object>();
private List<string> raw = new List<string>();
public bool Continue { get { return cont; } }
public TaskThreadingContext(int maxRows = 0,int startPosition = 0)
{
max = maxRows;
offset = start = startPosition;
}
public int NextAmount()
{
lock(locker1)
{
var ret = offset;
var temp = offset + batchAmount;
if (temp - start > max && max > 0)
{
temp = max - offset;
}
offset = temp;
if (offset - start >= max && max > 0)
{
cont = false;
}
return ret;
}
}
public TaskThreadingContext()
{
tokenSource = new CancellationTokenSource();
token = tokenSource.Token;
}
public void AddResult(IEnumerable<object> items,string rawData)
{
lock(locker2)
{
result.AddRange(items);
raw.Add(rawData);
}
}
public IEnumerable<T> GetResults<T>()
{
return this.result.Cast<T>().ToList();
}
public void NoResult()
{
cont = false;
}
}