如何批量检索实体

本文关键字:实体 检索 何批量 | 更新日期: 2023-09-27 18:28:38

在Azure表存储中,如何查询与分区中特定行键匹配的一组实体

我正在使用Azure表存储,需要检索一组与分区中的一组行键匹配的实体。

基本上,如果这是SQL,它可能看起来像这样:

SELECT TOP 1 SomeKey
FROM TableName WHERE SomeKey IN (1, 2, 3, 4, 5);

我想节省成本,减少做一堆表检索操作,而这些操作我可以使用表批处理操作来完成。出于某种原因,我得到了一个例外,上面写着:

"具有检索操作的批处理事务不能包含任何其他操作"

这是我的代码:

public async Task<IList<GalleryPhoto>> GetDomainEntitiesAsync(int someId, IList<Guid> entityIds)
{
    try
    {
        var client = _storageAccount.CreateCloudTableClient();
        var table = client.GetTableReference("SomeTable");
        var batchOperation = new TableBatchOperation();
        var counter = 0;
        var myDomainEntities = new List<MyDomainEntity>();
        foreach (var id in entityIds)
        {
            if (counter < 100)
            {
                batchOperation.Add(TableOperation.Retrieve<MyDomainEntityTableEntity>(someId.ToString(CultureInfo.InvariantCulture), id.ToString()));
                ++counter;
            }
            else
            {
                var batchResults = await table.ExecuteBatchAsync(batchOperation);
                var batchResultEntities = batchResults.Select(o => ((MyDomainEntityTableEntity)o.Result).ToMyDomainEntity()).ToList();
                myDomainEntities .AddRange(batchResultEntities );
                batchOperation.Clear();
                counter = 0;
            }
        }
        return myDomainEntities;
    }
    catch (Exception ex)
    {
        _logger.Error(ex);
        throw;
    }
}

如果不手动循环一组行键并对每个行键执行单独的检索表操作,我如何才能实现我想要的目标?我不想承担与此相关的成本,因为我可能有数百个行键需要过滤。

如何批量检索实体

我制作了一个助手方法,在每个分区的单个请求中完成。

这样使用:

var items = table.RetrieveMany<MyDomainEntity>(partitionKey, nameof(TableEntity.RowKey), 
     rowKeysList, columnsToSelect);

以下是辅助方法:

    public static List<T> RetrieveMany<T>(this CloudTable table, string partitionKey, 
        string propertyName, IEnumerable<string> valuesRange, 
        List<string> columnsToSelect = null)
        where T : TableEntity, new()
    {
        var enitites = table.ExecuteQuery(new TableQuery<T>()
            .Where(TableQuery.CombineFilters(
                TableQuery.GenerateFilterCondition(
                    nameof(TableEntity.PartitionKey),
                    QueryComparisons.Equal,
                    partitionKey),
                TableOperators.And,
                GenerateIsInRangeFilter(
                    propertyName,
                    valuesRange)
            ))
            .Select(columnsToSelect))
            .ToList();
        return enitites;
    }

    public static string GenerateIsInRangeFilter(string propertyName, 
         IEnumerable<string> valuesRange)
    {
        string finalFilter = valuesRange.NotNull(nameof(valuesRange))
            .Distinct()
            .Aggregate((string)null, (filterSeed, value) =>
            {
                string equalsFilter = TableQuery.GenerateFilterCondition(
                    propertyName,
                    QueryComparisons.Equal,
                    value);
                return filterSeed == null ?
                    equalsFilter :
                    TableQuery.CombineFilters(filterSeed,
                                              TableOperators.Or,
                                              equalsFilter);
            });
        return finalFilter ?? "";
    }

我已经在rowKeysList中测试了不到100个值,但是,如果有更多的值,它甚至会抛出异常,我们总是可以将请求拆分为多个部分。

有数百个行键,这就排除了使用带有行键列表的$filter(这无论如何都会导致部分分区扫描)。

由于您收到的错误,批处理似乎同时包含查询和其他类型的操作(这是不允许的)。我不明白你为什么会从代码片段中得到这个错误。

您唯一的其他选择是执行单独的查询。不过,您可以异步执行这些操作,因此不必等待每个操作返回。表存储在给定的分区上提供了超过2000个事务/秒的速度,因此这是一个可行的解决方案。

不确定我最初是怎么错过的,但以下是MSDN文档中TableBatchOperation类型的片段:

一个批处理操作最多可以包含100个单独的表操作,要求每个操作实体必须具有相同的分区键具有检索操作的批次不能包含任何其他操作请注意,批处理操作的总有效负载限制为4MB。

我最终按照David Makogon的建议异步执行了单独的检索操作。

我制作了自己的贫民区链接表。我知道它没有那么高效(也许没问题),但我只有在数据没有缓存在本地的情况下才会发出这个请求,这只意味着要切换设备。不管怎样,这似乎奏效了。通过检查这两个数组的长度,我可以推迟context.done();

var query = new azure.TableQuery()
          .top(1000)
          .where('PartitionKey eq ?', 'link-' + req.query.email.toLowerCase() );
           tableSvc.queryEntities('linkUserMarker',query, null, function(error, result, response) {
            if( !error && result ){
                var markers = [];
                result.entries.forEach(function(e){
                   tableSvc.retrieveEntity('markerTable', e.markerPartition._,  e.RowKey._.toString() , function(error, marker, response){
                       markers.push( marker );
                       if( markers.length == result.entries.length ){
                            context.res = {
                            status:200,
                                body:{
                                    status:'error',
                                    markers: markers
                                }
                            };
                            context.done();
                       }
                   });
                });
            }   else {
                notFound(error);
            }
        });

我在寻找解决方案时看到了你的帖子,在我的情况下,我需要同时查找多个id。

因为没有包含linq的支持(https://learn.microsoft.com/en-us/rest/api/storageservices/query-operators-supported-for-the-table-service)我刚刚做了一个巨大的或相等的链条。

到目前为止似乎在为我工作,希望它能帮助到任何人。

public async Task<ResponseModel<ICollection<TAppModel>>> ExecuteAsync(
    ICollection<Guid> ids,
    CancellationToken cancellationToken = default
)
{
    if (!ids.Any())
        throw new ArgumentOutOfRangeException();
    // https://learn.microsoft.com/en-us/rest/api/storageservices/query-operators-supported-for-the-table-service
    // Contains not support so make a massive or equals statement...lol
    var item = Expression.Parameter(typeof(TTableModel), typeof(TTableModel).FullName);
    var expressions = ids
        .Select(
            id => Expression.Equal(
                Expression.Constant(id.ToString()),
                Expression.MakeMemberAccess(
                    Expression.Parameter(typeof(TTableModel), nameof(ITableEntity.RowKey)),
                    typeof(TTableModel).GetProperty(nameof(ITableEntity.RowKey))
                )
            )
        )
        .ToList();
    var builderExpression = expressions.First();
    builderExpression = expressions
        .Skip(1)
        .Aggregate(
            builderExpression, 
            Expression.Or
        );
    var finalExpression = Expression.Lambda<Func<TTableModel, bool>>(builderExpression, item);
    var result = await _azureTableService.FindAsync(
        finalExpression,
        cancellationToken
    );
    return new(
        result.Data?.Select(_ => _mapper.Map<TAppModel>(_)).ToList(),
        result.Succeeded,
        result.User,
        result.Messages.ToArray()
    );
}

    public async Task<ResponseModel<ICollection<TTableEntity>>> FindAsync(
        Expression<Func<TTableEntity,bool>> filter,
        CancellationToken ct = default
    )
    {
        try
        {
            var queryResultsFilter = _tableClient.QueryAsync<TTableEntity>(
                FilterExpressionTree(filter),
                cancellationToken: ct
            );
            var items = new List<TTableEntity>();
            await foreach (TTableEntity qEntity in queryResultsFilter)
                items.Add(qEntity);
            return new ResponseModel<ICollection<TTableEntity>>(items);
        }
        catch (Exception exception)
        {
            _logger.Error(
                nameof(FindAsync),
                exception,
                exception.Message
            );
            // OBSFUCATE
            // TODO PASS ERROR ID
            throw new Exception();
        }
    }