作者 | 刘耀辉
审稿 | BBuf、许啸宇
1
背景
近年来,量化感知练习是一个较为热点的问题,能够大大优化量化后练习形成精度损失的问题,使得练习进程愈加高效。
Torch.fx在这一问题上走在了前列,运用纯Python语言完结了关于Torch.nn.Module的解析和向IR的转换,也能够供给改换后的IR对应的Python代码,在外部则是供给了简练易用的API,大大便利了量化感知练习进程的搭建。此外,Torch.fx也有助于消除动态图和静态图之间的Gap,能够比较便利地对图进行操作以及进行算子交融。
OneFlow紧随其后增加了针对OneFlow的fx,即One-fx,在装置One-fx之后,用户能够直接调用oneflow.fx,也能够直接经过import onefx as fx进行运用。
one-fx地址:
github.com/Oneflow-Inc…
One-fx完结代码中绝大部分是关于Torch.fx的fork,但根据OneFlow和PyTorch之间存在的差别进行了一些适配或优化。本文将围绕One-fx适配办法以及在OneFlow中的应用打开。
2
FX首要模块
- Symbolioc Trace
- Graph Module
- Interpreter
- Proxy
- Passes
其间,前4个模块共同完结了fx的基本功能,Graph Module和Proxy又是Symbolic Trace的基础,Passes则是在此基础上的扩大。
Symbolic Trace的基本概念如上图所示,最基本的模型运转进程便是从模型界说到模型履行这样一个流程。
fx则是进行了非侵入式的解析,将模型履行进程转成一张图,这张图中包含了很多个Node,每一个Node都包含了模型中的子模块或许函数调用信息,然后用户能够很便利地获取到一切的Node,并对其进行一些改换操作,最终经过GraphModule重新生成一个模型界说,并对其履行。
其间,在进行模型解析的时分,节点之间变量传递也均运用署理后的变量,如y = oneflow.relu(x),实际上x和y是Proxy(x)和Proxy(y)。
3
One-fx完结办法
这儿给出一个Fx最简略的用例,以便利后续关于完结办法的介绍。
import oneflow
class MyModule(oneflow.nn.Module):
def __init__(self):
super().__init__()
self.linear = oneflow.nn.Linear(512, 512)
def forward(self, x):
x = self.linear(x)
y = oneflow.ones([2, 3])
x = oneflow.relu(x)
return y
m = MyModule()
traced = oneflow.fx.symbolic_trace(m)
print(traced.code)
"""
def forward(self, x):
linear = self.linear(x); x = None
relu = oneflow.relu(linear); linear = None
_tensor_constant0 = self._tensor_constant0
return _tensor_constant0
"""
函数署理
署理,即fx中的Proxy模块,目的是在每次进行函数或模块调用的时分增加一些额定操作,使得对模型的解析和重建得以进行,而包装则是适配署理的一种办法。
torch.fx中,关于nn.Module的包装比较易于理解,每当待解析Module中出现了承继自nn.Module的目标,那么就将其__call__函数替换成包装过的函数。然而,关于pytorch的函数的署理的完结要更“绕”一些,是凭借了__torch_function__这一机制(github.com/pytorch/pyt… ),限于篇幅原因这儿不专门对其进行介绍。比较关键的点是,OneFlow中没有这一机制,假如需求增加,那么会是规划很大的、侵入性的,所以One-fx的完结就需求找其它途径。
咱们运用的处理办法是查找oneflow,oneflow.nn.functional,oneflow._C等模块中的Callable,并去除其间属于类的部分,然后对其他函数进行包装,在每次解析模型之前,会将这些模块的__dict__中对应项替换成包装后的函数,并且在解析模型之后重新将这些项进行还原。关于constructor类型的函数,如ones,randn等则不进行署理,直接运转,在最终构建图的时分作为constant来处理。
关于函数的包装部分源码完结如下,每次运转署理后的函数,会先判别该函数的入参中有没有Proxy变量,假如有,那么将会创立一个call_function类型的节点并回来Proxy包装后的节点,不然直接调用原函数并回来成果。
def _create_wrapped_func(orig_fn):
@functools.wraps(orig_fn)
def wrapped(*args, **kwargs):
# 判别参数中是否存在proxy变量
proxy = _find_proxy(args, kwargs)
if proxy is not None:
# 假如参数中有Proxy变量,创立节点并回来Proxy包装后的节点
return_proxy = proxy.tracer.create_proxy(
"call_function", orig_fn, args, kwargs
)
return_proxy.node.meta["is_wrapped"] = True
return return_proxy
# 假如没有Proxy变量,直接调用原函数
return orig_fn(*args, **kwargs)
return wrapped
其间,return_proxy = proxy.tracer.create_proxy(“call_function”, orig_fn, args, kwargs)这行代码指定了运用与入参相同的Tracer来创立节点并回来成果,create_proxy函数界说的首要部分如下,创立节点并在Proxy包装后回来。
def create_proxy(self, kind: str, target: Target, args: Tuple[Any, ...], kwargs: Dict[str, Any],
name: Optional[str] = None, type_expr : Optional[Any] = None,
proxy_factory_fn: Callable[[Node], 'Proxy'] = None):
args_ = self.create_arg(args)
kwargs_ = self.create_arg(kwargs)
assert isinstance(args_, tuple)
assert isinstance(kwargs_, dict)
# 创立节点
node = self.create_node(kind, target, args_, kwargs_, name, type_expr)
if not proxy_factory_fn:
proxy = self.proxy(node)
else:
proxy = proxy_factory_fn(node)
return proxy
而其间的create_node办法,实际上是调用了Tracer.graph.create_node,在图中创立节点,首要部分代码如下,其间op便是fx IR中的op,代表了节点类型,而target则是节点的操作主体,在上面的比方中便是orig_func。
因而,当咱们自界说的Module中的forward函数中的一切调用都被包装之后,实际上再运转forward的时分,就会依次在Tracer.graph中创立节点,这也正是symbolic_trace的基本思路。
def create_node(self, op: str, target: 'Target',
args: Optional[Tuple['Argument', ...]] = None,
kwargs: Optional[Dict[str, 'Argument']] = None,
name: Optional[str] = None,
type_expr: Optional[Any] = None) -> Node:
# 此处有一些assert
# 创立一个节点称号,防止重复
candidate = name if name is not None else self._target_to_str(target)
name = self._graph_namespace.create_name(candidate, None)
# 创立节点
n = Node(self, name, op, target, args, kwargs, type_expr)
# 树立称号与节点的映射联系
self._graph_namespace.associate_name_with_obj(name, n)
return n
而关于symbolic_trace进程,其间心便是Tracer.trace。这个办法能够分为两部分,一个是预处理部分,一个是骨干部分。其间预处理进程大致界说如下,首要任务是初始化Graph、确立模型以及forward函数和创立包装后的参数。
如前面所提及的,symbolic trace的基本思路是凭借Proxy变量以及包装后的函数,在每次调用的时分都创立一个节点,因而,forward函数的输入也需求用Proxy进行包装,这一步界说在
Tracer.create_args_for_root中。
def trace(
self,
root: Union[oneflow.nn.Module, Callable[..., Any]],
concrete_args: Optional[Dict[str, Any]] = None,
) -> Graph:
# 确认模块主体以及forward函数,其间fn即forward函数
if isinstance(root, oneflow.nn.Module):
self.root = root
assert hasattr(
type(root), self.traced_func_name
), f"traced_func_name={self.traced_func_name} doesn't exist in {type(root).__name__}"
fn = getattr(type(root), self.traced_func_name)
self.submodule_paths = {mod: name for name, mod in root.named_modules()}
else:
self.root = oneflow.nn.Module()
fn = root
tracer_cls: Optional[Type["Tracer"]] = getattr(self, "__class__", None)
# 在Tracer中初始化一张图
self.graph = Graph(tracer_cls=tracer_cls)
self.tensor_attrs: Dict[oneflow.Tensor, str] = {}
# 这个子函数用于搜集模型中一切Tensor类型的变量
def collect_tensor_attrs(m: oneflow.nn.Module, prefix_atoms: List[str]):
for k, v in m.__dict__.items():
if isinstance(v, oneflow.Tensor):
self.tensor_attrs[v] = ".".join(prefix_atoms + [k])
for k, v in m.named_children():
collect_tensor_attrs(v, prefix_atoms + [k])
collect_tensor_attrs(self.root, [])
assert isinstance(fn, FunctionType)
# 获取fn地点模块的一切可读变量
fn_globals = fn.__globals__
# 创立包装后的参数
fn, args = self.create_args_for_root(
fn, isinstance(root, oneflow.nn.Module), concrete_args
)
随后则是trace的骨干部分,这一部分大致代码如下,首要任务是对函数、办法、模块进行必要的包装,然后在Graph中创立节点,完结整个图的信息。
其间,咱们会创立一个Patcher环境并在其间进行这些进程,这是因为关于函数和办法的包装会直接改动掉某些包中对应函数或办法的行为,为了不让这种行为的改动溢出到trace的规模之外,在每次进行包装的时分会在Patcher中记载本次操作,然后在_Patcher.__exit__中根据记载的操作逐个还原现场。
# 下面代码仍然是`trace`函数的一部分
# 界说关于`nn.Module`的getattr办法的包装
@functools.wraps(_orig_module_getattr)
def module_getattr_wrapper(mod, attr):
attr_val = _orig_module_getattr(mod, attr)
return self.getattr(attr, attr_val, parameter_proxy_cache)
# 界说关于`nn.Module`的forward办法的包装
@functools.wraps(_orig_module_call)
def module_call_wrapper(mod, *args, **kwargs):
def forward(*args, **kwargs):
return _orig_module_call(mod, *args, **kwargs)
_autowrap_check(
patcher,
getattr(getattr(mod, "forward", mod), "__globals__", {}),
self._autowrap_function_ids,
)
return self.call_module(mod, forward, args, kwargs)
# 这儿Patcher的作用是在退出这一环境的时分恢复现场,防止包装函数、办法的影响溢出到`trace`之外。
with _Patcher() as patcher:
# 对`__getattr__`和`nn.Module.__call__`这两个办法默认进行包装
patcher.patch_method(
oneflow.nn.Module,
"__getattr__",
module_getattr_wrapper,
deduplicate=False,
)
patcher.patch_method(
oneflow.nn.Module, "__call__", module_call_wrapper, deduplicate=False
)
# 对预定好需求进行包装的函数进行包装
_patch_wrapped_functions(patcher)
_autowrap_check(patcher, fn_globals, self._autowrap_function_ids)
# 遍历一切需求对其间函数进行主动包装的package
for module in self._autowrap_search:
if module is oneflow:
dict = {}
# 当package为oneflow时,对此进行特殊处理,独自分出一个字典存放本来`oneflow.__dict__`中的内容
for name, value in module.__dict__.items():
if not isinstance(value, oneflow.nn.Module) and not value in _oneflow_no_wrapped_functions:
dict[name] = value
_autowrap_check_oneflow(
patcher, dict, module.__dict__, self._autowrap_function_ids
)
else:
_autowrap_check(
patcher, module.__dict__, self._autowrap_function_ids
)
# 创立节点,这儿的`create_node`调用实际上只是创立了最终一个节点,即输出节点。
# 可是这儿`fn`便是forward函数,在运转这一函数的时分,就会如前面所说依次创立节点。
self.create_node(
"output",
"output",
(self.create_arg(fn(*args)),),
{},
type_expr=fn.__annotations__.get("return", None),
)
其间,_patch_wrapped_functions的完结如下:
s(patcher: _Patcher):
# `_wrapped_fns_to_patch`中包含了一切需求主动包装的函数
for frame_dict, name in _wrapped_fns_to_patch:
if name not in frame_dict:
if hasattr(builtins, name):
# 关于built-in函数,不存在于frame_dict中,独自进行处理来根据称号获取函数自身
orig_fn = getattr(builtins, name)
else:
# 假如是oneflow中指定需求包装的函数,那么就进行获取,不然抛出称号无法识别的反常
is_oneflow_wrapped_function, func = is_oneflow_wrapped_function_and_try_get(name)
if is_oneflow_wrapped_function:
orig_fn = func
else:
raise NameError("Cannot deal with the function %s."%name)
else:
# 假如函数称号现已存在于frame_dict中,直接经过字典查询来取得函数
orig_fn = frame_dict[name]
# 创立包装后的函数并进行`patch`,即界说当trace进程结束的时分,怎么还原现场
patcher.patch(frame_dict, name, _create_wrapped_func(orig_fn))
# 关于类中的办法,直接包装并patch。
for cls, name in _wrapped_methods_to_patch:
patcher.patch_method(cls, name, _create_wrapped_method(cls, name))
大局包装
在模型的forward函数中,咱们有时不仅会用到框架自带的模块或许函数,有点时分还需求用到自界说的函数或许built-in函数,关于这种情况假如不进行处理,那么天然无法承受Proxy(x)的入参。fx中供给了fx.wrap这一API,当用户需求调用这部分函数的时分,能够完结运用fx.wrap(func)使其被包装。
例如:
import oneflow
oneflow.fx.wrap(len)
class MyModule(oneflow.nn.Module):
def __init__(self):
super().__init__()
self.linear = oneflow.nn.Linear(512, 512)
def forward(self, x):
x = self.linear(x) + len(x.shape)
return x
traced = oneflow.fx.symbolic_trace(MyModule())
print(traced.code)
"""
def forward(self, x):
linear = self.linear(x)
getattr_1 = x.shape; x = None
len_1 = len(getattr_1); getattr_1 = None
add = linear + len_1; linear = len_1 = None
return add
"""
可是其局限性在于,假如Module的源代码是来自其它库,那么在调用的当地运用fx.wrap是不起作用的,在oneflow和torch中都会有这一问题。然而flowvision中有多处运用了built-in function,因而咱们增加了一个API,即global_wrap,原理比较简略,便是直接对某个函数地点的包的__dict__进行修改,用法如下:
# MyModule来自其它包
with oneflow.fx.global_wrap(len):
m = MyModule()
traced = oneflow.fx.symbolic_trace(m)
print(traced.code)
"""
def forward(self, x):
linear = self.linear(x); x = None
getattr_1 = linear.shape
len_1 = len(getattr_1); getattr_1 = None
relu = oneflow.relu(linear); linear = None
add = relu + len_1; relu = len_1 = None
return add
"""
运用with关键字的原因是这种完结办法是直接修改了某个包的__dict__,关于其它当地的调用也会产生影响,因而需求将其限制在必定规模内。此外,包装后的函数包含了对类型的断定等一系列操作,也会极大影响built-in函数的功能。
其它适配
其它当地的处理都比较简略,不需求对完结办法做修改,只需求将细节部分对齐即可,这也体现出oneflow和pytorch在前端部分的高度兼容性。
4
IR规划
fx的IR规划遵从以下几个准则:
- 防止支持长尾散布,杂乱的样例。首要关注经典模型的程序捕获和改换。
- 运用机器学习从业者现已了解的工具和概念,例如Python的数据结构和 PyTorch 中揭露记载的算子 。
- 使程序捕获进程具有高度可装备性,以便用户能够为长尾需求完结自己的处理方案。
fx的IR首要由几个部分组成;
- opcode:即当时操作的类型,能够是placeholder, get_attr, call_function, call_method, call_module, output
- name:即给当时操作的命名。
- target:当时操作的实体,例如关于call_function类型的操作,可能这一属性会是。
- args和kwargs:指定当时操作的参数。
经过print_tabular这一API能够很便利漂亮地打印出fx中的IR,例如关于以下的MyModule模型,咱们能够打印出其IR:
import oneflow
class MyModule(oneflow.nn.Module):
def __init__(self, do_activation : bool = False):
super().__init__()
self.do_activation = do_activation
self.linear = oneflow.nn.Linear(512, 512)
def forward(self, x):
x = self.linear(x)
y = oneflow.ones([2, 3])
x = oneflow.topk(x, 10)
return x.relu() + y
traced = oneflow.fx.symbolic_trace(MyModule())
traced.graph.print_tabular()
"""
opcode name target args kwargs
------------- ----------------- ------------------------ ------------------------- --------
placeholder x x () {}
call_module linear linear (x,) {}
call_function topk <built-in function topk> (linear, 10) {}
call_method relu relu (topk,) {}
get_attr _tensor_constant0 _tensor_constant0 () {}
call_function add <built-in function add> (relu, _tensor_constant0) {}
output output output (add,) {}
"""
尽管fx的IR不算强大(例如不能处理动态控制流),可是界说十分简练,完结简略,关于用户来讲上手门槛相对低很多。
5
One-fx应用举例
OP替换
下面的比方展现了怎么将add操作全部替换成mul操作。
import oneflow
from oneflow.fx import symbolic_trace
import operator
class M(oneflow.nn.Module):
def forward(self, x, y):
return x + y, oneflow.add(x, y), x.add(y)
if __name__ == '__main__':
traced = symbolic_trace(M())
patterns = set([operator.add, oneflow.add, "add"])
for n in traced.graph.nodes:
if any(n.target == pattern for pattern in patterns):
with traced.graph.inserting_after(n):
new_node = traced.graph.call_function(oneflow.mul, n.args, n.kwargs)
n.replace_all_uses_with(new_node)
traced.graph.erase_node(n)
traced.recompile()
traced.graph.print_tabular()
print(traced.code)
功能剖析
以下代码展现怎么运用fx进行模型的功能剖析,将本来的模型经过symbolic_trace解析成各个节点,再在其间刺进测试功能的操作。
import oneflow
import flowvision.models as models
import statistics, tabulate, time
from typing import Any, Dict, List
class ProfilingInterpreter(oneflow.fx.Interpreter):
def __init__(self, mod : oneflow.nn.Module):
gm = oneflow.fx.symbolic_trace(mod)
super().__init__(gm)
# 记载总运转时刻
self.total_runtime_sec : List[float] = []
# 记载各个节点运转时刻
self.runtimes_sec : Dict[oneflow.fx.Node, List[float]] = {}
# 重写`run`办法,本质上是对基类`run`办法的简略封装,在运转前后记载时刻点。
# 这一办法是Graph全体运转的入口。
def run(self, *args) -> Any:
t_start = time.time()
return_val = super().run(*args)
t_end = time.time()
self.total_runtime_sec.append(t_end - t_start)
return return_val
# 同上,重写`run_node`办法,不需求自己写细节完结,只需求在对基类的`run_node`调用前后记载时刻点即可
# 这一办法是Graph中运转每个Node的入口。
def run_node(self, n : oneflow.fx.Node) -> Any:
t_start = time.time()
return_val = super().run_node(n)
t_end = time.time()
self.runtimes_sec.setdefault(n, [])
self.runtimes_sec[n].append(t_end - t_start)
return return_val
# 界说怎么打印功能测试成果
def summary(self, should_sort : bool = False) -> str:
# 存储每个节点的打印信息
node_summaries : List[List[Any]] = []
# 由于模块会被调用屡次,所以这儿核算一下均匀的运转总时长
mean_total_runtime = statistics.mean(self.total_runtime_sec)
for node, runtimes in self.runtimes_sec.items():
mean_runtime = statistics.mean(runtimes)
# 核算节点运转时刻占总时刻的份额
pct_total = mean_runtime / mean_total_runtime * 100
# 记载节点信息、节点均匀运转时长和节点运转时刻占总时刻的份额
node_summaries.append(
[node.op, str(node), mean_runtime, pct_total])
# 假如需求,安依照运转时刻进行排序
if should_sort:
node_summaries.sort(key=lambda s: s[2], reverse=True)
# 以下是凭借tabulate库进行格式化来美化显现作用
headers : List[str] = [
'Op type', 'Op', 'Average runtime (s)', 'Pct total runtime'
]
return tabulate.tabulate(node_summaries, headers=headers)
if __name__ == '__main__':
rn18 = models.resnet18()
rn18.eval()
input = oneflow.randn(5, 3, 224, 224)
output = rn18(input)
interp = ProfilingInterpreter(rn18)
interp.run(input)
print(interp.summary(True))
作用如下:
算子交融
以下代码演示怎么凭借fx将模型中的卷积层和BN层进行交融,关于这种组合,并不需求引进新的算子,只需求对本来conv的权重进行操作即可。能够参阅:nenadmarkus.com/p/fusing-ba… 。
import sys
import oneflow
import oneflow.nn as nn
import numpy as np
import copy
from typing import Dict, Any, Tuple
# 经过直接对权重进行运算的办法进行Conv和BN的交融
def fuse_conv_bn_eval(conv, bn):
assert(not (conv.training or bn.training)), "Fusion only for eval!"
fused_conv = copy.deepcopy(conv)
fused_conv.weight, fused_conv.bias = \
fuse_conv_bn_weights(fused_conv.weight, fused_conv.bias,
bn.running_mean, bn.running_var, bn.eps, bn.weight, bn.bias)
return fused_conv
# 权重交融办法
def fuse_conv_bn_weights(conv_w, conv_b, bn_rm, bn_rv, bn_eps, bn_w, bn_b):
if conv_b is None:
conv_b = oneflow.zeros_like(bn_rm)
if bn_w is None:
bn_w = oneflow.ones_like(bn_rm)
if bn_b is None:
bn_b = oneflow.zeros_like(bn_rm)
bn_var_rsqrt = oneflow.rsqrt(bn_rv + bn_eps)
conv_w = conv_w * (bn_w * bn_var_rsqrt).reshape([-1] + [1] * (len(conv_w.shape) - 1))
conv_b = (conv_b - bn_rm) * bn_var_rsqrt * bn_w + bn_b
return oneflow.nn.Parameter(conv_w), oneflow.nn.Parameter(conv_b)
# 根据字符串对称号进行分割,比方`foo.bar.baz` -> (`foo.bar`, `baz`)
def _parent_name(target : str) -> Tuple[str, str]:
*parent, name = target.rsplit('.', 1)
return parent[0] if parent else '', name
def replace_node_module(node: oneflow.fx.Node, modules: Dict[str, Any], new_module: oneflow.nn.Module):
assert(isinstance(node.target, str))
parent_name, name = _parent_name(node.target)
setattr(modules[parent_name], name, new_module)
# 界说对模型进行交融操作的进程
def fuse(model: oneflow.nn.Module) -> oneflow.nn.Module:
model = copy.deepcopy(model)
# 先经过fx.symbolic_trace获取一个GraphModule
fx_model: oneflow.fx.GraphModule = oneflow.fx.symbolic_trace(model)
modules = dict(fx_model.named_modules())
# 遍历GraphModule中的一切节点,别离进行操作
for node in fx_model.graph.nodes:
# 跳过一切不是module的节点
if node.op != 'call_module':
continue
# 检测到conv+bn的结构后进行交融操作
if type(modules[node.target]) is nn.BatchNorm2d and type(modules[node.args[0].target]) is nn.Conv2d:
# conv的输出同时被其它节点运用,即conv后衔接两个节点时无法交融
if len(node.args[0].users) > 1:
continue
conv = modules[node.args[0].target]
bn = modules[node.target]
fused_conv = fuse_conv_bn_eval(conv, bn)
replace_node_module(node.args[0], modules, fused_conv)
# 对图中的边进行置换,关于用到bn输出的节点,要更改它们的输入
node.replace_all_uses_with(node.args[0])
# 移除旧的节点
fx_model.graph.erase_node(node)
fx_model.graph.lint()
# 重新建图(构造模型)
fx_model.recompile()
return fx_model
if __name__ == '__main__':
# 以下引进flowvision中的resnet 18模型,并进行交融前后的benchmark比较
import flowvision.models as models
import time
rn18 = models.resnet18().cuda()
rn18.eval()
inp = oneflow.randn(10, 3, 224, 224).cuda()
output = rn18(inp)
def benchmark(model, iters=20):
for _ in range(10):
model(inp)
oneflow.cuda.synchronize()
begin = time.time()
for _ in range(iters):
model(inp)
return str(time.time()-begin)
fused_rn18 = fuse(rn18)
unfused_time = benchmark(rn18)
fused_time = benchmark(fused_rn18)
print("Unfused time: ", benchmark(rn18))
print("Fused time: ", benchmark(fused_rn18))
assert unfused_time > fused_time
6
未来方案
-
基于fx进行8bit量化感知练习和布置
-
基于fx进行算子交融
-
eager形式下基于fx取得模型更精确的FLOPs和MACs成果
参阅文献
1.pytorch.org/docs/stable…
2.github.com/Oneflow-Inc…
3.pytorch.org/tutorials/i…
4.pytorch.org/tutorials/i…
5.zhuanlan.zhihu.com/p/449908382
欢迎 Star、试用 OneFlow 最新版别:
github.com/Oneflow-Inc…