一、简要说明
统一工作单元是一个比较重要的基础设施组件,它负责管理整个业务流程当中涉及到的数据库事务,一旦某个环节出现异常自动进行回滚处理。
在 ABP vNext 框架当中,工作单元被独立出来作为一个单独的模块(Volo.Abp.Uow
)。你可以根据自己的需要,来决定是否使用统一工作单元。
二、源码分析
整个 Volo.Abp.Uow
项目的结构如下,从下图还是可以看到我们的老朋友 IUnitOfWorkManager
和 IUnitOfWork
,不过也多了一些新东西。看一个模块的功能,首先从它的 Module 入手,我们先看一下 AbpUnitofWorkModule
里面的实现。
2.1 工作单元的初始模块
打开 AbpUnitOfWorkModule
里面的代码,发现还是有点失望,里面就一个服务注册完成事件。
public override void PreConfigureServices(ServiceConfigurationContext context)
{
context.Services.OnRegistred(UnitOfWorkInterceptorRegistrar.RegisterIfNeeded);
}
- 1
- 2
- 3
- 4
这里的结构和之前看的 审计日志 模块类似,就是注册拦截器的作用,没有其他特别的操作。
2.1.1 拦截器注册
继续跟进代码,其实现是通过 UnitOfWorkHelper
来确定哪些类型应该集成 UnitOfWork
组件。
public static void RegisterIfNeeded(IOnServiceRegistredContext context)
{
// 根据回调传入的 context 绑定的实现类型,判断是否应该为该类型注册 UnitOfWorkInterceptor 拦截器。
if (UnitOfWorkHelper.IsUnitOfWorkType(context.ImplementationType.GetTypeInfo()))
{
context.Interceptors.TryAdd<UnitOfWorkInterceptor>();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
继续分析 UnitOfWorkHelper
内部的代码,第一种情况则是实现类型 (implementationType
) 或类型的任一方法标注了 UnitOfWork
特性的话,都会为其注册工作单元拦截器。
第二种情况则是 ABP vNext 为我们提供了一个新的 IUnitOfWorkEnabled
标识接口。只要继承了该接口的实现,都会被视为需要工作单元组件,会在系统启动的时候,自动为它绑定拦截器。
public static bool IsUnitOfWorkType(TypeInfo implementationType)
{
// 第一种方式,即判断具体类型与其方法是否标注了 UnitOfWork 特性。
if (HasUnitOfWorkAttribute(implementationType) || AnyMethodHasUnitOfWorkAttribute(implementationType))
{
return true;
}
// 第二种方式,即判断具体类型是否继承自 IUnitOfWorkEnabled 接口。
if (typeof(IUnitOfWorkEnabled).GetTypeInfo().IsAssignableFrom(implementationType))
{
return true;
}
return false;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
2.2 新的接口与抽象
在 ABP vNext 当中,将一些 职责 从原有的工作单元进行了 分离。抽象出了 IDatabaseApi
、ISupportsRollback
、ITransactionApi
这三个接口,这三个接口分别提供了不同的功能和职责。
2.2.1 数据库统一访问接口
这里以 IDatabaseApi
为例,它是提供了一个 数据库提供者(Database Provider
) 的抽象概念,在 ABP vNext 里面,是将 EFCore 作为数据库概念来进行抽象的。(因为后续 MongoDb 与 MemoryDb
与其同级)
你可以看作是 EF Core 的 Provider
,在 EF Core 里面我们可以实现不同的 Provider
,来让 EF Core 支持访问不同的数据库。
而 ABP vNext 这么做的意图就是提供一个统一的数据库访问 API,如何理解呢?这里以 EFCoreDatabaseApi
为例,你查看它的实现会发现它继承并实现了 ISupportsSavingChanges
,也就是说 EFCoreDatabaseApi
支持 SaveChanges
操作来持久化数据更新与修改。
public class EfCoreDatabaseApi<TDbContext> : IDatabaseApi, ISupportsSavingChanges
where TDbContext : IEfCoreDbContext
{
public TDbContext DbContext { get; }
public EfCoreDatabaseApi(TDbContext dbContext)
{
DbContext = dbContext;
}
public Task SaveChangesAsync(CancellationToken cancellationToken = default)
{
return DbContext.SaveChangesAsync(cancellationToken);
}
public void SaveChanges()
{
DbContext.SaveChanges();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
也就是说 SaveChanges
这个操作,是 EFCore 这个 DatabaseApi
提供了一种特殊操作,是该类型数据库的一种特殊接口。
如果针对于某些特殊的数据库,例如 InfluxDb
等有一些特殊的 Api 操作时,就可以通过一个 DatabaseApi
类型进行处理。
2.2.2 数据库事务接口
通过最开始的项目结构会发现一个 ITransactionApi
接口,这个接口只定义了一个 事务提交操作(Commit
),并提供了异步方法的定义。
public interface ITransactionApi : IDisposable
{
void Commit();
Task CommitAsync();
}
- 1
- 2
- 3
- 4
- 5
- 6
跳转到其典型实现 EfCoreTransactionApi
当中,可以看到该类型还实现了 ISupportsRollback
接口。通过这个接口的名字,我们大概就知道它的作用,就是提供了回滚方法的定义。如果某个数据库支持回滚操作,那么就可以为其实现该接口。
其实这里按照语义,你也可以将它放在 EfCoreDatabaseApi
进行实现,因为回滚也是数据库提供的 API 之一,只是在 ABP vNext 里面又将其归为事务接口进行处理了。
这里就不再详细赘述该类型的具体实现,后续会在单独的 EF Core 章节进行说明。
2.3 工作单元的原理与实现
在 ABP vNext 框架当中的工作单元实现,与原来 ABP 框架有一些不一样。
2.3.1 内部工作单元 (子工作单元)
首先说内部工作单元的定义,现在是有一个新的 ChildUnitOfWork
类型作为 子工作单元。子工作单元本身并不会产生实际的业务逻辑操作,基本所有逻辑都是调用 UnitOfWork
的方法。
internal class ChildUnitOfWork : IUnitOfWork
{
public Guid Id => _parent.Id;
public IUnitOfWorkOptions Options => _parent.Options;
public IUnitOfWork Outer => _parent.Outer;
public bool IsReserved => _parent.IsReserved;
public bool IsDisposed => _parent.IsDisposed;
public bool IsCompleted => _parent.IsCompleted;
public string ReservationName => _parent.ReservationName;
public event EventHandler<UnitOfWorkFailedEventArgs> Failed;
public event EventHandler<UnitOfWorkEventArgs> Disposed;
public IServiceProvider ServiceProvider => _parent.ServiceProvider;
private readonly IUnitOfWork _parent;
// 只有一个带参数的构造函数,传入的就是外部的工作单元(带事务)。
public ChildUnitOfWork([NotNull] IUnitOfWork parent)
{
Check.NotNull(parent, nameof(parent));
_parent = parent;
_parent.Failed += (sender, args) => { Failed.InvokeSafely(sender, args); };
_parent.Disposed += (sender, args) => { Disposed.InvokeSafely(sender, args); };
}
// 下面所有 IUnitOfWork 的接口方法,都是调用传入的 UnitOfWork 实例。
public void SetOuter(IUnitOfWork outer)
{
_parent.SetOuter(outer);
}
public void Initialize(UnitOfWorkOptions options)
{
_parent.Initialize(options);
}
public void Reserve(string reservationName)
{
_parent.Reserve(reservationName);
}
public void SaveChanges()
{
_parent.SaveChanges();
}
public Task SaveChangesAsync(CancellationToken cancellationToken = default)
{
return _parent.SaveChangesAsync(cancellationToken);
}
public void Complete()
{
}
public Task CompleteAsync(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public void Rollback()
{
_parent.Rollback();
}
public Task RollbackAsync(CancellationToken cancellationToken = default)
{
return _parent.RollbackAsync(cancellationToken);
}
public void OnCompleted(Func<Task> handler)
{
_parent.OnCompleted(handler);
}
public IDatabaseApi FindDatabaseApi(string key)
{
return _parent.FindDatabaseApi(key);
}
public void AddDatabaseApi(string key, IDatabaseApi api)
{
_parent.AddDatabaseApi(key, api);
}
public IDatabaseApi GetOrAddDatabaseApi(string key, Func<IDatabaseApi> factory)
{
return _parent.GetOrAddDatabaseApi(key, factory);
}
public ITransactionApi FindTransactionApi(string key)
{
return _parent.FindTransactionApi(key);
}
public void AddTransactionApi(string key, ITransactionApi api)
{
_parent.AddTransactionApi(key, api);
}
public ITransactionApi GetOrAddTransactionApi(string key, Func<ITransactionApi> factory)
{
return _parent.GetOrAddTransactionApi(key, factory);
}
public void Dispose()
{
}
public override string ToString()
{
return $"[UnitOfWork {Id}]";
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
虽然基本上所有方法的实现,都是调用的实际工作单元实例。但是有两个方法 ChildUnitOfWork
是空实现的,那就是 Complete()
和 Dispose()
方法。
这两个方法一旦在内部工作单元调用了,就会导致 事务被提前提交,所以这里是两个空实现。
下面就是上述逻辑的伪代码:
using(var transactioinUow = uowMgr.Begin())
{
// 业务逻辑 1 。
using(var childUow1 = uowMgr.Begin())
{
// 业务逻辑 2。
using(var childUow2 = uowMgr.Begin())
{
// 业务逻辑 3。
childUow2.Complete();
}
childUow1.Complete();
}
transactioinUow.Complete();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
以上结构一旦某个内部工作单元抛出了异常,到会导致最外层带事务的工作单元无法调用 Complete()
方法,也就能够保证我们的 数据一致性。
2.3.2 外部工作单元
首先我们查看 UnitOfWork
类型和 IUnitOfWork
的定义和属性,可以获得以下信息。
每个工作单元是瞬时对象,因为它继承了 ITransientDependency
接口。
每个工作单元都会有一个 Guid
作为其唯一标识信息。
每个工作单元拥有一个 IUnitOfWorkOptions
来说明它的配置信息。
这里的配置信息主要指一个工作单元在执行时的 超时时间,是否包含一个事务,以及它的 事务隔离级别(如果是事务性的工作单元的话)。
每个工作单元存储了 IDatabaseApi
与 ITransactionApi
的集合,并提供了访问/存储接口。
提供了两个操作事件 Failed
与 Disposed
。
这两个事件分别在工作单元执行失败以及被释放时(调用 Dispose()
方法)触发,开发人员可以挂载这两个事件提供自己的处理逻辑。
工作单元还提供了一个工作单元完成事件组。
用于开发人员在工作单元完成时(调用Complete()
方法)挂载自己的处理事件,因为是 List
所以你可以指定多个,它们都会在调用 Complete()
方法之后执行,例如如下代码:
using (var uow = _unitOfWorkManager.Begin())
{
uow.OnCompleted(async () => completed = true);
uow.OnCompleted(async()=>Console.WriteLine("Hello ABP vNext"));
uow.Complete();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
以上信息是我们查看了 UnitOfWork
的属性与接口能够直接得出的结论,接下来我会根据一个工作单元的生命周期来说明一遍工作单元的实现。
一个工作单元的的构造是通过工作单元管理器实现的(IUnitOfWorkManager
),通过它的 Begin()
方法我们会获得一个工作单元,至于这个工作单元是外部工作单元还是内部工作单元,取决于开发人员传入的参数。
public IUnitOfWork Begin(UnitOfWorkOptions options, bool requiresNew = false)
{
Check.NotNull(options, nameof(options));
// 获得当前的工作单元。
var currentUow = Current;
// 如果当前工作单元不为空,并且开发人员明确说明不需要构建新的工作单元时,创建内部工作单元。
if (currentUow != null && !requiresNew)
{
return new ChildUnitOfWork(currentUow);
}
// 调用 CreateNewUnitOfWork() 方法创建新的外部工作单元。
var unitOfWork = CreateNewUnitOfWork();
// 使用工作单元配置初始化外部工作单元。
unitOfWork.Initialize(options);
return unitOfWork;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
这里需要注意的就是创建新的外部工作单元方法,它这里就使用了 IoC
容器提供的 Scope
生命周期,并且在创建之后会将最外部的工作单元设置为最新创建的工作单元实例。
private IUnitOfWork CreateNewUnitOfWork()
{
var scope = _serviceProvider.CreateScope();
try
{
var outerUow = _ambientUnitOfWork.UnitOfWork;
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
// 设置当前工作单元的外部工作单元。
unitOfWork.SetOuter(outerUow);
// 设置最外层的工作单元。
_ambientUnitOfWork.SetUnitOfWork(unitOfWork);
unitOfWork.Disposed += (sender, args) =>
{
_ambientUnitOfWork.SetUnitOfWork(outerUow);
scope.Dispose();
};
return unitOfWork;
}
catch
{
scope.Dispose();
throw;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
上述描述可能会有些抽象,结合下面这两幅图可能会帮助你的理解。
我们可以在任何地方注入 IAmbientUnitOfWork
来获取当前活动的工作单元,关于 IAmbientUnitOfWork
与 IUnitOfWorkAccessor
的默认实现,都是使用的 AmbientUnitOfWork
。
在该类型的内部,通过 AsyncLocal
来确保在不同的 异步上下文切换 过程中,其值是正确且统一的。
构造了一个外部工作单元之后,我们在仓储等地方进行数据库操作。操作完成之后,我们需要调用 Complete()
方法来说明我们的操作已经完成了。如果你没有调用 Complete()
方法,那么工作单元在被释放的时候,就会产生异常,并触发 Failed
事件。
public virtual void Dispose()
{
if (IsDisposed)
{
return;
}
IsDisposed = true;
DisposeTransactions();
// 只有调用了 Complete()/CompleteAsync() 方法之后,IsCompleted 的值才为 True。
if (!IsCompleted || _exception != null)
{
OnFailed();
}
OnDisposed();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
所以,我们在手动使用工作单元管理器构造工作单元的时候,一定要注意调用 Complete()
方法。
既然 Complete()
方法这么重要,它内部究竟做了什么事情呢?下面我们就来看一下。