一、编写数据集导入函数,此函数必须返回2个对象:
1. 字典为 {"特征名称": 特征值Tensor 或 SparseTensor}
2. label的Tensor列表
格式:
def input_fn(dataset):
...
return feature_dict, label # 这2组合为张量迭代器, 就是 dataset
二、 定义特征列( tf.feature_column)
a = tf.feature_column.numeric_column('a')
b = tf.feature_column.numeric_column('b')
c = tf.feature_column.numeric_column('c', normalizer_fn=lambda x: x)
三、实例化相关的预创建的 Estimator
estimator = tf.estimator.LinearClassifier(
feature_columns=[a,b,c]
)
四、调用训练、评估或推理方法
.train(数据集处理函数句柄,steps=None)
estimator.predict(数据集处理函数句柄,predict_keys=None, steps=None)
.estimator.evaluate(数据集处理函数句柄,steps=None)
常用 estimator
tf.estimator.DNNClassifier(hidden_units,feature_columns)
tf.estimator.DNNRegressor(hidden_units,feature_columns)
tf.estimator.LinearClassifier(feature_columns)
tf.estimator.LinearRegressor(feature_columns)
tf.estimator.DNNLinearCombinedClassifier()
tf.estimator.DNNLinearCombinedRegressor()
常用 feature_column(指定estimator字段的数据类型)
特征处理几种方式
- 特征归一化(针对连续型)
- 特征离散化:很少直接使用连续值作为特征,而是将特征离散化后再输入到模型中。离散化特征对于异常值具有更好的鲁棒性,可以为特征引入非线性的能力。并且,离散化可以更好的进行Embedding,可以通过👉等频分桶对特征值离散化
- 特征组合:,生成更丰富的行为表征,可加速模型的收敛速度
- Embedding
分桶,交叉特征
age_buckets = tf.feature_column.bucketized_column(
age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
crossed_columns = [
tf.feature_column.crossed_column(
['education', 'occupation'], hash_bucket_size=1000),
tf.feature_column.crossed_column(
[age_buckets, 'education', 'occupation'], hash_bucket_size=1000),
]
tf.feature_column
连续值类型特征 转向量
features = {"price": [[1.], [5.]]}
price = tf.feature_column.numeric_column("price")
将连续值类型特征映射为one-hot(需2步)
aa = tf.feature_column.categorical_column_with_identity(
key='连续整数特征列',
num_buckets= num # num_buckets 要 大于 此连续整数列里面所有数的最大值
)
👉注:categorical_column_with_identity这种方式输出结果为 类似SparseTensor的稀疏矩阵。
👉所以我们需要用 tf.feature_column.indicator_column 将稀疏矩阵转为 正常的one-hot👇
tf.feature_column.indicator_column( aa ) # 参数是上面identity包装输出的稀疏矩阵
分桶将连续值特征 按照 数值到校 分段 映射为 离散值类型的one-hot特征列
目标格式:
日期范围 表示为…
< 1960 年 [1, 0, 0, 0]
>= 1960 年 and < 1980 年 [0, 1, 0, 0]
>= 1980 年 and < 2000 年 [0, 0, 1, 0]
>= 2000 年 [0, 0, 0, 1]
实现:
# 首先,将原始输入转换为一个numeric column
numeric_feature_column = tf.feature_column.numeric_column("Year")
# 然后,按照边界[1960,1980,2000]将numeric column进行bucket,然后自动one-hot
bucketized_feature_column = tf.feature_column.bucketized_column(
source_column = numeric_feature_column,
boundaries = [1960, 1980, 2000])
上面说的是连续值类型特征,下面开始聊离散值类型特征及其扩展转化👇
离散值类型特征 转向量 👇
离散值类型特征 多半是不规则数据,所以无法直接转对应的离散向量。
但是可以通过 哈希散列 和 词表散列的方式转换 👇
哈希散列 (离散文本被散列离散为 离散值类型的one-hot特征列)
当类别的数量特别大时,若将每个词汇或整数设置单独的类别,会消耗非常大的内存。
我们可大致给定类别值,让模型自己去做hash,然后映射到设定的类别(桶)中,并学习拟合到这些类别
hashed_feature_column =
tf.feature_column.categorical_column_with_hash_bucket(
key = "some_feature", # 指定的列为字符串组成的列,👉字符串转为ID
hash_bucket_size = random_n
)
# 假如 random_n 为 100,那么最终的离散值就会随机选取[0-100)闭开区间的 K 个值]
# K为输入的数据类型保持一致的数
# 👉此结果依然是稀疏向量, 需要转为稠密向量 one-hot👇
和将连续值类型特征映射为one-hot的做法类似:
👉我们需要用 tf.feature_column.indicator_column 将稀疏矩阵转为 正常的one-hot👇
具体例子如下:
tf.feature_column.indicator_column(hashed_feature_column)
👉这里还有一个做法,可以把上面的稀疏矩阵转为一个具体的Tensor。但不是one-hot:
import tensorflow as tf
from tensorflow.python.feature_column import feature_column_lib
feature_cache = feature_column_lib.FeatureTransformationCache(features={
# feature对应的值可以为Tensor,也可以为SparseTensor
"feature": tf.constant(value=
[
["a", "b"], ["c", "d"]
],
)
})
id_weight_pair = tf.feature_column.categorical_column_with_hash_bucket(
key='feature',
hash_bucket_size=100
).get_sparse_tensors(feature_cache, None)
sparse_tensor = id_weight_pair.id_tensor # id_weight_pair 转为 SparseTensor
tensor = tf.sparse.to_dense(sparse_tensor)
print(tensor)
# 输出结果
# tf.Tensor(
# [[39 22]
# [72 65]], shape=(2, 2), dtype=int64)
词表散列 (离散文本被散列离散为整数)
vocabulary_feature_column =
tf.feature_column.categorical_column_with_vocabulary_list(
key='some_feature',
vocabulary_list=["哈", "蜜", "瓜"],
# num_oov_buckets=2,
# default_value=-1,
)
👉Note: 如果num_oov_buckets这个参数不传,则代表vocabulary_list列表中的vocab被一一映射为和categorical_column_with_hash_bucket类似的效果
👉和categorical_column_with_hash_bucket稍有不同的是:
categorical_column_with_vocabulary_list不是随机映射的
而是从0开始 [0,1,2]对应哈,密,瓜,三个字
👉至于参数 num_oov_buckets=10,它代表着,如果vocabulary_list的词不在 some_feature这个列的值中,那么则会用 hash算法对vocabulary_list中的(不在 some_feature这个列)的值做增扩散列,num_oov_buckets=2 传的是2, 则可能被随机散列为 3或者4,因为(0,1,2)都被占用了。
👉至于参数default_value=-1,代表如果vocabulary_list的词不在 some_feature这个列的值中,那么会被散列,且散列的值设为-1
👉同样的和categorical_column_with_hash_bucket一样,结果也是稀疏矩阵。
所以需要用feature_column把其转为one-hot:
tf.feature_column.indicator_column(vocabulary_feature_column)
将离散文本特征转为embedding特征
👉前提:是在做过categorical_column_with_vocabulary_list 或categorical_column_with_hash_bucket 的离散化基础上,再继续embedding的。
写法:
tf.feature_column.embedding_column(被离散后的特征变量, dimension=5)
# dimension=5 代表转换后每个样本词有5维度的特征。 shape即 [样本词数, 5]
eg:
vocabulary_feature_column = \
tf.feature_column.categorical_column_with_vocabulary_list(
key='some_feature',
vocabulary_list=["哈", "蜜", "瓜"],
# num_oov_buckets=2,
# default_value=-1,
)
tf.feature_column.embedding_column(vocabulary_feature_column, dimension=3)
交叉特征
👉Note: 要注意,特征交叉是对应维度做交叉,一般是只对特征做交叉,样本个数不变,eg:
a = [
[1,2],
[3,4]
]
b = [
[5,6],
[7,8]
]
a与b交叉后得到👇:
[
[1,5], [1,6], [2,5], [2,6],
[3,7], [3,8], [4,7], [4,8],
]
tf.feature_column.crossed_column使用例子:
import tensorflow as tf
import tensorflow.feature_column as fc
actual_sex = {'sex': tf.Variable(['male', 'female', 'female', 'male'], dtype=tf.string)}
actual_nationality = {'nationality': tf.Variable(['belgian', 'french', 'belgian', 'belgian'], dtype=tf.string)}
actual_sex_nationality = dict(actual_sex, **actual_nationality)
# hashed_column
sex_hashed_raw = fc.categorical_column_with_hash_bucket("sex", 10)
sex_hashed = fc.indicator_column(sex_hashed_raw)
# crossed column
crossed_sn_raw = fc.crossed_column(['sex', 'nationality'], hash_bucket_size=20)
crossed_sn = fc.indicator_column(crossed_sn_raw) # 交叉后是稀疏矩阵,转稠密
👉Note1: fc.numeric_column不能直接做交叉,需要先转为离散,然后再交叉
👉Note2: hash_bucket_size代表特征交叉后,用hash函数将每个样本的交叉特征映射为0-19的数字
👉Note3: 交叉后的特征为sparse矩阵, 需要用indicator_column转为one-hot或multi-hot形式的dense(稠密)矩阵,(具体是one-hot还是multi-hot,根据稀疏矩阵的格式来转)
将特征输入到模型的2种方式
一. tk.layers.DenseFeatures()
numerical_columns 是没有做任何处理的,就可以直接输入到DNN中
categorical_columns,是经过embedding处理后的
👉Note:训练得出的 User Embed 和 Item Embed 不能直接运算,因为不在同一个向量空间
preprocessing_layer = tf.keras.layers.DenseFeatures(numerical_columns + categorical_columns)
model = tf.keras.Sequential([
preprocessing_layer,
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(1, activation='sigmoid'),
])
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy']
)
model.fit(train_dataset, epochs=10)
test_loss, test_accuracy = model.evaluate(test_dataset)
print('\n\nTest Loss {}, Test Accuracy {}'.format(test_loss, test_accuracy)
二. tf.estimator.模型()
下面会着重说这个方式,此处略
feature_column实例
a = tf.feature_column.numeric_column("数值字段")
# 已知类别个数
b = tf.feature_column.categorical_column_with_vocabulary_file(
"类别字段名称",
["类别", "类别2",...,"类别n"] # 把所有类别字符串转为 ID 0,1,2,...,n
)
# 和上述类似,未知类别个数时,用分桶
c = tf.feature_column.categorical_column_with_hash_bucket(
"类别字段名称",
hash_bucket_size=2000 # 越大,冲突越少
)
最后将 feature_column的结果,组装成列表, 返回给模型构造。
👉eg:
feature_columns = [a, b, c]
classifier = tf.estimator.LinearClassifier(feature_columns)
👉正式训练模型:
classifier.train(数据集处理函数句柄(就是上面返回dataset的那个),steps=None)
👉上面的dataset函数句柄如下(官方CSV操作实例):
TRAIN_DATA_URL = "https://storage.googleapis.com/tf-datasets/titanic/train.csv"
TEST_DATA_URL = "https://storage.googleapis.com/tf-datasets/titanic/eval.csv"
# 把数据下载到此路径下
train_file_path = tf.keras.utils.get_file("train.csv", TRAIN_DATA_URL)
# 'C:\\Users\\lin\\.keras\\datasets\\train.csv'
test_file_path = tf.keras.utils.get_file("eval.csv", TEST_DATA_URL)
# 'C:\\Users\\lin\\.keras\\datasets\\eval.csv'
CSV_COLUMNS = ['survived', 'sex', 'age', 'n_siblings_spouses', 'parch', 'fare', 'class', 'deck', 'embark_town', 'alone']
LABEL_COLUMN = 'survived'
LABELS = [0, 1]
def get_dataset(file_path):
"""
真正操作CSV的函数
"""
dataset = tf.data.experimental.make_csv_dataset(
file_path,
batch_size=12,
column_names=CSV_COLUMNS,
label_name=LABEL_COLUMN,
na_value="?",
num_epochs=1,
ignore_errors=True)
return dataset
# 只要我们将 column_names, label_name 这2个字段指定好
# make_csv_dataset 将会自动帮我们组装 dataset格式 (包括 feature_dict 和 lable的迭代器)并return
👉因为要传函数句柄(就是函数名),而不是调用,所以额外的参数,需要用到偏函数解决
from functools import partial
# 读取上面路径下的CSV
train_func = partial(get_dataset, train_file_path)
test_func = partial(get_dataset, test_file_path)
classifier.train(train_func)
result = classifier.evaluate(test_func)
print(result)
tf.data(文件)都是 dataset的子集
tf.data.TextLineDataset - 从文本文件中读取行。
tf.data.TFRecordDataset - 从 TFRecord 文件中读取记录。
tf.data.FixedLengthRecordDataset - 从二进制文件中读取具有固定大小的记录。
dataset转iter
dataset1 = tf.data.Dataset.range(10)
c = dataset1.__iter__()
print(c.get_next())
print(c.get_next())
print(c.get_next())
👉Note: 不要用next() 也不要用 next()
👉注意:如果 iter() 找不到 get_next()
👉那么就直接用下面即可
iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
dataset = iterator.get_next()
return dataset
tensorboard
使用方式1:keras callback
训练出模型,直接用tensorboard命令接口,前提需要在具有 模型文件的目录下。model.ckpt。
log_dir="logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
callbacks = [ # put it to fit()
keras.callbacks.TensorBoard(log_dir),
keras.callbacks.ModelCheckpoint(save_model_file,save_best_only=True),
keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)
]
# tensorboard --logdir "logs/fit"
使用方式2:(基于Tensorflow2.0 的 gradient)
https://www.tensorflow.org/tensorboard/get_started?hl=zh-cn
写入TFRecords
Note: 一个Example就是👉一个 样本
第一步,生成TFRecord Writer
writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
# path:TFRecord文件的存放路径;
# compression_type:定义TFRecord文件保存的压缩格式;
也可以用纯python接口(好处就是可以用with上下文):
with tf.io.TFRecordWriter() as writer:
xxx
第二步,tf.train.Feature生成协议信息
feature是一个字典值,它是将某个类型列表编码成特定的feature格式,而该字典键用于读取TFRecords文件时索引得到不同的数据,某个类型列表可能包含零个或多个值,列表类型如下三种:
tf.train.BytesList(value=[value]) # value转化为字符串(二进制)列表
tf.train.FloatList(value=[value]) # value转化为浮点型列表
tf.train.Int64List(value=[value]) # value转化为整型列表
其中,value是你要保存的数据。内层feature编码方式:
feature_internal = {
"width":tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
"weights":tf.train.Feature(float_list=tf.train.FloatList(value=[weights])),
"image_raw":tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))
}
外层features再将内层字典编码:
features_extern = tf.train.Features(feature_internal)
- tf.train.Feature这个接口可以编码封装列表类型和字典类型,内层用的是tf.train.Feature
- 外层使用tf.train.Features
第三步,使用tf.train.Example将features编码数据封装成特定的PB协议格式
example = tf.train.Example(features_extern)
第四步,将example数据系列化为字符串
example_str = example.SerializeToString()
第五步,将系列化为字符串的example数据写入协议缓冲区
writer.write(example_str)
以上5步合写
writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
example = tf.train.Example(
tf.train.Features(
{
"width": tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
"weights": tf.train.Feature(float_list=tf.train.FloatList(value=[weights])),
"image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))
}
)
)
example_str = example.SerializeToString() # 将example数据系列化为字符串
writer.write(example_str)
正式将业务数据写入到TFRecord
- 将特征中心的表读出(121个特征)
- 将click列 和 合并好的feature列用select['click','feature']出来,并 collect()取值
- 将取出的值转为 pandas 的 df
- 将df存储到 TFRecord
eg:
writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
click = click_batch[i]
feature = feature_batch[i].tostring()
# [18.0, 0.09475817797242475, 0.0543921297305341...
# 构造example,int64, float64, bytes
example = tf.train.Example(features=tf.train.Features(feature={
"label": tf.train.Feature(int64_list=tf.train.Int64List(value=[click])),
"feature": tf.train.Feature(bytes_list=tf.train.BytesList(value=[feature]))
}))
# 序列化example,写入文件
writer.write(example.SerializeToString())
读取 TFRecord
FEATURE_COLUMNS = ['channel_id', 'vector', 'user_weigths', 'article_weights']
def read_ctr_records():
# 定义转换函数,输入时序列化的
def parse_tfrecords_function(features):
parsed_features = tf.data.experimental.parse_example_dataset(features)
feature = tf.io.decode_raw(parsed_features['feature'], tf.float64)
feature = tf.reshape(tf.cast(feature, tf.float32), [1, 121])
# 特征顺序 1 channel_id, 100 article_vector, 10 user_weights, 10 article_weights
# 1 channel_id类别型特征, 100维文章向量求平均值当连续特征,10维用户权重求平均值当连续特征
channel_id = tf.cast(tf.slice(feature, [0, 0], [1, 1]), tf.int32)
vector = tf.reduce_sum(tf.slice(feature, [0, 1], [1, 100]), axis=1)
user_weights = tf.reduce_sum(tf.slice(feature, [0, 101], [1, 10]), axis=1)
article_weights = tf.reduce_sum(tf.slice(feature, [0, 111], [1, 10]), axis=1)
label = tf.cast(parsed_features['label'], tf.float32)
# 构造字典 名称-tensor
tensor_list = [channel_id, vector, user_weights, article_weights]
feature_dict = dict(zip(FEATURE_COLUMNS, tensor_list))
return feature_dict, label
dataset = tf.data.TFRecordDataset(["./xx.tfrecords"])
dataset = dataset.map(parse_tfrecords_function)
正则化(Regularization)
- L1:最终结果,可能使得某项的权重参数惩罚为0
- L2:最终结果,可能使得某项的权重参数大大减小,但不会为0(权重衰减)
带有L2的 loss 反向传播(BP)后求导得:
dW = 对w的偏导 + (λ/m)W (λ 为正则化因子,是超参数)
W:=W−αdW
最终 w = w(1-αλ/m) - α * 对w的偏导
(1-αλ/m) 永远小于1, 所以 w只会衰减,所以特征不会消失。
带有L1的 loss BP求导:
同上,略, 最终会使w为0,让特征消失掉。
TFRL
常出现的问题:
- 模型更新周期慢,不能有效反映线上的变化,最快小时级别,一般是天级别甚至周级别。
解决方式:采用Online-learning的算法。 - 模型参数多线上predict的时候需要内存大,QPS无法保证。
解决方式:采用一些优化的方法,在保证精度的前提下,尽量获取稀疏解,从而降低模型参数的数量。
常用最优化的方法:FTRL(Follow the Regularized Leader)
FTRL是一种获得稀疏模型并且防止过拟合的优化方法:(就是loss+l1+l2的结合体)公式:
loss + l1 + l2
应用场景:
FTRL更适合大量的稀疏特征和大量数据场景。(低稀疏可能效果并不是很好)
算法参数:
- lambda1:L1正则系数,参考值:10 ~ 15
- lambda2:L2正则系数,参考值:10 ~ 15
- alpha:FTRL参数,参考值:0.1
- beta:FTRL参数,参考值:1.0
- batchSize: mini-batch的大小,参考值:10000
最终参数:
lambda1 = 15,lambda2 = 15, alpha = 0.1, beta = 1.0
使用FTRL实例:
classifiry = tf.estimator.LinearClassifier(feature_columns=feature_cl,
optimizer=tf.train.FtrlOptimizer(
learning_rate=0.01,
l1_regularization_strength=10,
l2_regularization_strength=15,
)
)