模型手把手系列开篇 之 python、spark 和java 生成TFrecord


书接上文,咱们的 图算法十篇 之 图机器学习系列文章总结 已经完结, 接下来 咱们 将开端 从零开端 一点一点 的用 tensorflow 完结 一些 经典的 模型,除了 和咱们 一同学习 之外,也是为了 能够帮助自己 对 曩昔学习过的 常识 做一些系统化 的 总结与回顾 ,进行 查漏补缺

接下来,文章里的许多代码,咱们会运用 notebook长途访问pyspark集群, 算法东西神器重磅引荐 文章里介绍的 notebook 东西 进行介绍,而 部分 java 与 scala spark 代码 则 均是现在 我在自己 mac 上建立的 单机版环境 编写的,如有 任何环境问题,欢迎 在 算法全栈之路 的大众号上 和我联系 ~

模型手把手系列 方案 首要 围着 tensorflow 完结模型 的 流程打开,方案中会包括 练习数据的生成数据的读入特征的处理模型结构的建立损失函数的设计序列建模经典模型的完结 等模块打开,中心 许多内容 或许我也会去 查找 许多材料与源码 ,希望 能够 真实起到 总结自己学习过 的常识、对 各位算法工程师 们在 作业学习面试 等进程中 有所帮助 的 效果吧 !

闲言少叙,本文首要先从 模型练习 的 上游 数据生成开端讲起,首要介绍 运用 python 、spark( pyspark/scala spark ) 、Java 、tfrecorder 等这 4 种方式 生成 tfrecord 的进程 以及运用 python 解码 tfrecord文件的 进程,下面 让 咱们 开端正文 吧~ go go go !!!


(1) tensorflow 模型练习数据来历简介

书接上文,咱们知道:tensorflow 练习 所 需求的 上游数据,在 数据量比较小 的时分,咱们能够用 python 的 pandas 或则 numpy 等办法 直接 在单机PC上 读取数据 然后 喂给模型 ,这种 模型 的 文件类型 能够是 本地的 txt 或则 csv 等格局。当 数据量比较大 的时分,咱们一般 将 数据放在 集群 hdfs 上,也能够 保存成 csv 或 txt 的格局,然后 练习的时分去 进行 分布式并行读取

TFRecords 是 TensorFlow 官方引荐 和 支撑的二进制文件格局,其关于 tensorflow 十分友爱,其关于 特征列多 的数据 存储占用空间 更小。当 数据量特别大,且 io 读取数据成为 模型练习速度 的 瓶颈、乃至 有时分 gpu 的 利用率时高时低 的时分,这个时分 咱们能够 将咱们 的 数据 保存成 tfrecord 的格局。 这一起也 对应着 tfrecord 的一些优点: 读取速度快、占用空间少、支撑并行读取等。这儿 咱们 就不再对 tfrecord 文件 生成的理论进行 打开阐明了,感兴趣 的 同学能够下去 自己搜索材料 哈 ~

尽管本文 是 介绍 tfrecord 的 数据格局,可是咱们 选用模型练习数据的数据格局 的时分,也 不一定非要运用 tfrecord 。许多时分 咱们 练习模型数据量 不是很大,而且 单机内存 完全能够 hold住 一切 的 数据,而 咱们 对 模型 的 练习速度 也 没有那么高要求,这个时分 普通的 csv 和 txt 等格局 简略直接,便于 查看 数据, 也能够 作为 咱们的 首选 ~

本文 首要是 介绍 多种方式生成tfrecord 格局的数据,本身便是 偏向于工程的 ,理论性 没那么强,咱们 直接开端 看 代码 吧 ~


(2)代码韶光

本文咱们首要介绍 运用 python 、spark( pyspark/scala spark ) 、Java 、tfrecorder 等 这 4 种方式 生成 tfrecord 的进程 以及 运用python 解码 tfrecord文件 的 进程 ,下面就让咱们逐一开端介绍吧 ~

因为 本文的 代码 触及 多种语言 ,这儿咱们 对各个模块 别离导包, 或许有冗余的当地,读者 能够 自行进行 区别,关于 代码的可读性 应该 无影响。

(2.1) 数据预备

看代码吧~

@欢迎重视作者大众号算法全栈之路
importpandasaspd
raw_df=pd.DataFrame([[28,12.1,'male',"1#2",1],[30,8.7,'female',"3#4#5",0],[32,24.6,'female',"6#7#8#9#10",1]],columns=['age','price','sex','click_list','label'])
print(raw_df)
raw_df.to_csv("./raw_df.csv",sep='\t',index=False,header=None)

这儿 数据类型 咱们别离挑选了 搜广推算法 用的最多的 int 型、float型、categroy 型、seq 序列类型特征 以及 label 这几列 数据 用来生成 tfrecord,如果有 其他类型 的 特征 同理 可得。


(2.2) python生成 tfrecord 数据

@欢迎重视作者大众号算法全栈之路
#文件途径
intput_csv_file="./raw_df.csv"
intput_csv_file="./py_tf_record"
#生成整数型的特点
def_int64_feature(value):
returntf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
#生成浮点数类型的特点
def_float_feature(value):
returntf.train.Feature(float_list=tf.train.FloatList(value=[value]))
#生成字符串型的特点
def_bytes_feature(value):
returntf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
#生成序列类型的特征
def_int64list_feature(value_list):
returntf.train.Feature(int64_list=tf.train.Int64List(value=value_list))
defgenerate_tf_records(intput_file_path,out_file_path):

withcodecs.open("./raw_df.csv","r","utf-8")asraw_file:
line_list=raw_file.readlines()

print("line_list_len:",len(line_list))

writer=tf.compat.v1.python_io.TFRecordWriter(out_file_path)
forlineintqdm.tqdm(line_list):
age=int(line.split("\t")[0])
price=(float)(line.split("\t")[1])
gender=line.split("\t")[2]
click_list=list(map(int,line.split("\t")[3].split("#")))
label=int(line.split("\t")[4])

example=tf.train.Example(features=tf.train.Features(
feature={
"age":_int64_feature(int(age)),
"price":_float_feature(float(price)),
"gender":_bytes_feature(gender.encode()),
"click_list":_int64list_feature(click_list),
"label":_int64_feature(label)
}))
#写入一条tfrecord
writer.write(example.SerializeToString())
writer.close()
generate_tf_records(intput_csv_file,intput_csv_file)

这儿,咱们挑选了4种 极具典型 的、搜广推算法常用的特征类型 来进行阐明。tfrecord 里有 examplefeature 的概念: example 是 protocol buffer 数据标准的完结,咱们 能够以为 每个example 能够 是一条样本(当然也能够有多条样本)。一个 example 音讯体 中包括了 一系列的 features ,而 features 里又包括有 featuer, 每一个feature 是一个 dict 形式 的 数据结构。

其中 要留意 的是: click_list 这个表示的是 用户的点击序列特征长度 关于 每个用户 或许不同。咱们能够在 这儿 传入一个 列表封装到 tfrecord 对象 里去,然后 让 tensorflow 直接读取序列特征 。当然,咱们 也能够 这儿把 列表拼接成字符串 ,然后 tensorflow 读入进去 之后 再去split 得到序列,只是 模型 会 更消耗时刻 而已。

别的需求留意的是 value= 后面 接的是数组,也能够是单个元素。如果 你写的 代码有报 数据格局问题 的话,这儿 或许需求要点看下 然后 作出 调整。

这儿要 引荐一下 codecs 这个 python 包,其关于 python读写文件 格局的编码转换 十分友爱,当读写 数据格局 兼容 会呈现 bug 的时分,强烈引荐 codecs 哦。


(2.3) spark 生成 tfrecord 数据( scala spark + pyspark)

书接上文,在 许多时分 数据量比较少 的话,咱们能够 用 上面介绍 的 单机版 python 来生成 tfrecord 文件,可是 咱们上面 也介绍了: 数据量小的时分,内存足够,用啥tfrecocrd啊,直接上 csv等不香吗? 数据量大 的时分,就得靠 咱们 这儿 介绍 的 spark 来生成 tfrecord 了,亲测速度快了十数倍不止!

那 咱们上面 介绍的 python 单机版生成tfrecord 就 无用武之地 了吗? 当然 不是,天然生成它才 必有用! 咱们 能够 在 开发代码 并 进行流程测验 的时分用 单机版python 去 生成测验 ,保证 整个开发 流程 的 流通,最终 要大规模跑数 进行 试验 的 时分,改用 本末节介绍 的 spark版别 的 办法 来 提高功率 , 两者结合 几乎 perfert !!!

(2.3.1) scala spark 生成 tfrecord

因为 scala 和 java 均是跑在 虚拟机jvm 的 语言,在 maven 工程里 是 能够 混合编译 彼此调用 的。 要想运用 spark 直接生成 tfrecord ,需求用到 google 供给的 spark 和 tensorflow 交互的包

pom.xml 里导入这个包就能够

@欢迎重视作者大众号算法全栈之路
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>spark-tensorflow-connector_2.11</artifactId>
<version>1.15.0</version>
</dependency>

然后下面是我供给了一个 根据scala spark 生成 tfrecord 的demo ,中心 的 环境 是我 单机版的spark ,或许你 用的时分 这儿 需求微调,十分简略,自己 去适配 下吧~

@欢迎重视作者大众号算法全栈之路
packagezmt_demo.model_sbs
importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.functions._
objectDemo{
defmain(args:Array[String]){
valsparkConf=newSparkConf()
//.registerKryoClasses(Array(classOf[XgbScoreRow]))
//调节长数据本地化时刻
.setMaster("local[*]")
.set("spark.locality.wait","10")
.set("spark.sql.orc.enabled","false")
valsparkSession=SparkSession.builder()
.appName("scalasparkgeneratetfrecord")
.config(sparkConf)
.config("spark.kryoserializer.buffer.max","1024m")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.enableHiveSupport()
.getOrCreate()
valdemo_df=sparkSession.read
.option("inferSchema","false")//是否主动推到内容的类型
.option("delimiter","\t")//分隔符,默以为,
.csv("/Users/dhl/Desktop/notebook_all/模型手把手系列/raw_df.csv")
.toDF("age","price","sex","click_list","label")
.withColumn("click_list",split(col("click_list"),"#"))
demo_df.printSchema()
demo_df.show(2,false)
valsavedPath="/Users/dhl/Desktop/notebook_all/模型手把手系列/scala_spark_tfcord"
demo_df.write
.mode("overwrite")
.format("tfrecords")
.option("recordType","Example")
.save(savedPath)
}
}

运用单机版的 spark,咱们 在自己 mac 就能进行 业务流程代码 的 调试哈,不用 在 链接spark集群 就能够 完结 spark 代码的调试,当然 数据 是 需求 咱们 本地伪造 的 ~

关于一些 运用spark RDD 接口 较多 的同学,能够 先将 RDD 转 dataframe ,然后 在 生成tfrecord 哦 !


(2.3.2) pyspark 生成 tfrecord

现在 在 国内大厂,仍是有 许多公司 的 算法团队 运用 pyspark 十分频频 ,这儿 咱们 也供给下 pyspark 版别生成 tfrecord 的代码吧。

中心在用 spark-submit 提交 pyspark 脚本任务的时分,需求在最终参数列表里加上

--jars /Users/dhl/Desktop/notebook_all//spark-tensorflow-connector_2.11-1.15.0.jar

其实 效果和 maven 类似 ,和 上面一样 引进咱们 的 Jar 包 。导完包后,就能够 写代码 提spark job 任务 了。

@欢迎重视作者大众号算法全栈之路
importos
importsys
importfindspark
findspark.init()
importos.pathaspath
importimportlib
frompysparkimportStorageLevel
frompyspark.sqlimportSparkSession
frompyspark.sql.typesimport*
frompy4j.protocolimportPy4JJavaError
frompyspark.sqlimportfunctionsasfun
frompyspark.sql.functionsimportcol
frompyspark.sqlimportHiveContext
frompyspark.sql.functionsimport*
frompyspark.sql.functionsimportlit
importwarnings
warnings.filterwarnings("ignore")
#sparkconfigsetup
spark=SparkSession.builder.appName("pyspark-app")\
.config("spark.submit.deployMode","client")\
.config('spark.yarn.queue','idm-prod')\
.config("spark.kryoserializer.buffer.max","1024m")\
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")\
.config("hive.exec.dynamic.partition.mode","nonstrict")\
.enableHiveSupport()\
.getOrCreate()
path="./pyspark_tfrecord"
pdf_values=raw_df.values.tolist()
pdf_columns=raw_df.columns.tolist()
spark_df=spark.createDataFrame(pdf_values,pdf_columns).persist(StorageLevel.MEMORY_AND_DISK)
spark_df.write.format("tfrecords").option("recordType","Example").save(path)

从代码里能够看到,咱们这儿是运用 python 的pandas dataframe直接转的 pyspark 的 dataframe ,然后由 spark 的 dataframe 直接保存成 tfrecord 的格局。

其中需求留意的一点是: option("recordType", "Example") 这个当地的参数。 当然,关于 序列特征,咱们也能够运用 SequenceExample 这个参数来生成。

可是 关于 序列特征,咱们只要在 特征列 的 位置放入 列表元素 就 能够,tensorflow 读入 list 数据之后 再去转 序列特征 处理 也是能够的。

这儿和上面一样,这儿的 pyspark 办法 也能够和 上面 末节介绍 的 python办法 彼此结合 运用,到达 pyspark + python 包来 生成 tfrecord 的意图,十分优秀!!!

这儿 我就不在去 详细完结 了哈,可是 pyspark + python 自定义函数 与 scala + java 自定义函数的联合运用,能够说 是 灵活开发 的 模范之作 了 !


(2.4) java 生成 tfrecord 数据

书接上文,咱们说了 Java 和 scala spark 代码能够 混合编译,然后进行 彼此灵活调用 的,咱们 这儿 介绍的 Java 版别 的 生成tfrecord 的 函数与办法 ,也是能够 结合上面 介绍的 scala spark 办法,在 spark 的 map算子 调用 这儿介绍 的 办法,到达 spark + java相结合的办法来生成 tfrecord 格局文件,关于 广阔的 javaer 们,算是 十分友爱 了。

要想用 Java 生成 tfrecord 数据,需求导入下面这 两个jar 包 ,其中一个 和 上面要用到 的 重复。

@欢迎重视作者大众号算法全栈之路
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>spark-tensorflow-connector_2.11</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow</artifactId>
<version>1.5.0</version>
</dependency>

Java代码嘛,没说的,便是包多!! 导入便是了 。

@欢迎重视作者大众号算法全栈之路
packagedemo;
importjava.io.*;
importjava.util.*;
importorg.tensorflow.example.Example;
importorg.tensorflow.example.Feature;
importorg.tensorflow.example.Features;
importorg.tensorflow.example.Int64List;
importorg.tensorflow.example.*;
importorg.tensorflow.spark.shaded.com.google.protobuf.ByteString;
importorg.tensorflow.spark.shaded.org.tensorflow.hadoop.util.TFRecordWriter;
publicclassGenerate_TFrecord_Demo{
publicstaticvoidmain(String[]args)throwsIOException{
TFRecordWritertf_write=newTFRecordWriter(newDataOutputStream(newFileOutputStream("/Users/dhl/Desktop/notebook_all/模型手把手系列/java_tfcord")));
Map<String,Object>featureMap=newHashMap<>();
featureMap.put("age","20");
featureMap.put("price","15.5");
featureMap.put("sex","male");
featureMap.put("click_list",Arrays.asList("1","2","3"));
featureMap.put("label","1");
Map<String,Feature>inputFeatureMap=newHashMap<String,Feature>();
for(Stringkey:featureMap.keySet()){
Featurefeature=null;
if(key.equals("sex")){
BytesList.BuilderbyteListBuilder=BytesList.newBuilder();
ByteStringbytes=ByteString.copyFromUtf8((String)featureMap.get(key));
byteListBuilder.addValue(bytes);
feature=Feature.newBuilder().setBytesList(byteListBuilder.build()).build();
}elseif(key.equals("age")){
Int64List.Builderint64ListBuilder=Int64List.newBuilder();
int64ListBuilder.addValue(Integer.parseInt(featureMap.get(key).toString()));
feature=Feature.newBuilder().setInt64List(int64ListBuilder.build()).build();
}elseif(key.equals("price")){
FloatList.BuilderfloatListBuilder=FloatList.newBuilder();
floatListBuilder.addValue(Float.parseFloat(featureMap.get(key).toString()));
feature=Feature.newBuilder().setFloatList(floatListBuilder.build()).build();
}elseif(key.equals("click_list")){
List<String>stringList=(List<String>)featureMap.get(key);
List<ByteString>byteStrings=newArrayList<ByteString>();
for(Strings:stringList){
byteStrings.add(ByteString.copyFromUtf8(s));
}
BytesList.BuilderbyteListBuilder=BytesList.newBuilder();
byteListBuilder.addAllValue(byteStrings);
feature=Feature.newBuilder().setBytesList(byteListBuilder.build()).build();
}
if(feature!=null){
inputFeatureMap.put(key,feature);
}
}
Featuresfeatures=Features.newBuilder().putAllFeature(inputFeatureMap).build();
Exampleexample=Example.newBuilder().setFeatures(features).build();
System.out.println(example.getFeatures());
//java版别tfrecord生成与写入
tf_write.write(example.toByteArray());
}
}

这儿,咱们 把文件 写入了 我 自己本机 的 途径,也挑选了 几个常用 的 特征类型 来运用 Java生成tfrecord 文件,自己 去 按需求更改 吧。


(2.5)python 的 tfrecorder 生成tfrecord

在我 最终 开端写小作文 做总结 的时分,偶然发现了 这个python 包 : tfrecorder ,咱们 能够运用pip install tfrecorder来 进行装置。

尽管也 是python 单机版 的包,可是这个包能够 不用写代码 显式的 打开 csv 文件 进行 文件转换,十分强壮了!

下面的 两种方式 均是 运用google 开源的tfrecorder 这个包东西的。

闲言少叙,看代码吧~

(2.5.1) csv 直接转tfrecord

完结的功用如题,单机版python神器啊!

@欢迎重视作者大众号算法全栈之路
importtfrecorder
tfrecorder.create_tfrecords(
input_data='./raw_df.csv',
output_dir='./csv_tfrecord')

(2.5.2) pandas dataframe 直接转 tfrecord
@欢迎重视作者大众号算法全栈之路
importpandasaspd
importtfrecorder
raw_df.tensorflow.to_tfr(output_dir='./pd_tfrecord')

这个 东西 有一个 ,便是 装置的时分 依靠 比较多,会 呈现 包抵触 的情况,很难缠。看阐明 好像 google 已经 抛弃保护 这个包 了,最终 更新时刻 在2020年 ?

不管了,上面 介绍的办法足够多 ,总有一种 姿势 能够满足 你。


(2.6) 解码 tfrecord 文件

这儿 要 要点介绍 下: 因为 tfrecord 是 二进制文件 ,咱们 生成了之后 如何 查看里边数据结构 呢?

简略! 用下面的办法就能够了 ,看代码 ~

@欢迎重视作者大众号算法全栈之路
importtensorflow.compat.v1astf
defgetTFRecordFormat(files):
withtf.Session()assess:
#加载TFRecord数据
ds=tf.data.TFRecordDataset(files)
ds=ds.batch(1)
ds=ds.prefetch(buffer_size=2)
iterator=ds.make_one_shot_iterator()
#为了加快速度,仅仅简略拿一组数据看下结构
batch_data=iterator.get_next()
whileTrue:
res=sess.run(batch_data)
forserialized_exampleinres:
example_proto=tf.train.Example.FromString(serialized_example)
features=example_proto.features
forkeyinfeatures.feature:
feature=features.feature[key]
iflen(feature.bytes_list.value)>0:
ftype='bytes_list'
fvalue=feature.bytes_list.value
iflen(feature.float_list.value)>0:
ftype='float_list'
fvalue=feature.float_list.value
iflen(feature.int64_list.value)>0:
ftype='int64_list'
fvalue=feature.int64_list.value
result='{0}:{1}{2}{3}'.format(key,ftype,len(fvalue),fvalue)
print(result)
break
print("*"*20)
break
#getTFRecordFormat('./pd_tfrecord')
getTFRecordFormat('./py_tf_record')
#getTFRecordFormat('./pyspark_tfrecord/part-r-00007')
#getTFRecordFormat('./scala_spark_tfcord/part-r-00000')
#getTFRecordFormat('./java_tfcord')

留意,这儿 咱们 运用 的是 tensorflow 1.x 的 版别 ~

最终 tfrecord文件解析 出来 在 咱们 的 demo 式例中 长这个样子:

模型手把手系列开篇 之 python、spark 和java 生成TFrecord

到这儿,模型手把手系列开篇 之 python、spark 和java 生成TFrecord 的 全文就写完了。 在本文里,咱们 供给了 众多生成 tfrecord 的 办法与东西,代码均能够完美跑成功,总有一款适合你,希望能够对你有参考效果 ~


码字不易,觉得有收成就动动小手转载一下吧,你的支撑是我写下去的最大动力 ~

更多更全更新内容,欢迎重视作者的大众号: 算法全栈之路

模型手把手系列开篇 之 python、spark 和java 生成TFrecord

  • END –