0%

SASRec代码笔记


完整的代码注释:https://github.com/Guadzilla/Paper_notebook/tree/main/SASRec

论文笔记:https://guadzilla.github.io/2021/11/03/SASRec/


collections.defaultdict(list)

1
class collections.defaultdict(default_factory=None, /[, ...])

返回一个新的类似字典的对象。defaultdict 是内置 dict类的子类。 它重载了一个方法并添加了一个可写的实例变量。

本对象包含一个名为 default_factory 的属性,构造时,第一个参数用于为该属性提供初始值,默认为 None。所有其他参数(包括关键字参数)都相当于传递给 dict 的构造函数。

使用defulydict(list)实例化对象时, default_factory=list,可以很轻松地将(键-值对组成的)序列转换为(键-列表组成的)字典

1
2
3
4
5
6
7
s = [('yellow', 1), ('blue', 2), ('yellow', 3), ('blue', 4), ('red', 1)]
d = defaultdict(list)
for k, v in s:
d[k].append(v)

sorted(d.items())
# 输出:[('blue', [2, 4]), ('red', [1]), ('yellow', [1, 3])]

当字典中没有的键第一次出现时,python自动为其返回一个空列表,list.append()会将值添加进新列表;再次遇到相同的键时,list.append()将其它值再添加进该列表。

Python自定义多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def random_neq(l, r, s):
t = np.random.randint(l, r)
while t in s:
t = np.random.randint(l, r)
return t


def sample_function(user_train, usernum, itemnum, batch_size, maxlen, result_queue, SEED):
def sample():

user = np.random.randint(1, usernum + 1) # 随机采样user id,注意是从1开始的
while len(user_train[user]) <= 1: user = np.random.randint(1, usernum + 1) # 长度小于1的训练集不要

seq = np.zeros([maxlen], dtype=np.int32) # seq序列,长度固定为maxlen,用0在前面padding补上长度,例:[0,0,...,0,23,15,2,6]
pos = np.zeros([maxlen], dtype=np.int32)
neg = np.zeros([maxlen], dtype=np.int32)
nxt = user_train[user][-1] # user_train的最后一个item取为nxt
idx = maxlen - 1

ts = set(user_train[user]) # ts为序列的item集合
for i in reversed(user_train[user][:-1]): # 从后往前遍历user_train,idx为当前要填充的下标
seq[idx] = i
pos[idx] = nxt
if nxt != 0: neg[idx] = random_neq(1, itemnum + 1, ts) # 生成的负样本不能取该序列item集合里的item
nxt = i
idx -= 1
if idx == -1: break

return (user, seq, pos, neg) # 返回一次采样,(用户id,训练序列,label序列,负样本序列)

np.random.seed(SEED)
while True: # 采样一个batch_size大小的数据样本,打包成一个batch,放到线程队列里
one_batch = []
for i in range(batch_size):
one_batch.append(sample())

result_queue.put(zip(*one_batch))


class WarpSampler(object):
def __init__(self, User, usernum, itemnum, batch_size=64, maxlen=10, n_workers=1):
self.result_queue = Queue(maxsize=n_workers * 10) # 长度为10的线程队列
self.processors = []
for i in range(n_workers):
self.processors.append( # Process()进程的类, target:要调用的对象即sampler_function,args:调用该对象要接受的参数
Process(target=sample_function, args=(User,
usernum,
itemnum,
batch_size,
maxlen,
self.result_queue,
np.random.randint(2e9)
)))
self.processors[-1].daemon = True
self.processors[-1].start()

def next_batch(self):
return self.result_queue.get()

def close(self):
for p in self.processors:
p.terminate()
p.join()



# sampler是WarpSampler对象的实例,每次调用sampler.next_batch(),就返回一个batch的样本。
# 进一步解释:每次调用sampler.next_batch()就call其线程队列里的一个线程,每个线程用于返回一个batch的数据。
sampler = WarpSampler(user_train, usernum, itemnum, batch_size=args.batch_size, maxlen=args.maxlen, n_workers=3)

torch.tril()

1
2
3
4
5
torch.tril(input, diagonal=0, *, out=None) → Tensor
# 功能:返回下三角矩阵其余部分用out填充(默认为0)
# input:输入矩阵,二维tensor
# diagonal:表示对角线位置,diagonal=0为主对角线,diagonal=-1为主对角线往下1格,diagonal=1为主对角线往上1格
# out:表示填充,默认用out=None即0填充

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
>>> a = torch.randn(3, 3)
>>> a
tensor([[-1.0813, -0.8619, 0.7105],
[ 0.0935, 0.1380, 2.2112],
[-0.3409, -0.9828, 0.0289]])
>>> torch.tril(a)
tensor([[-1.0813, 0.0000, 0.0000],
[ 0.0935, 0.1380, 0.0000],
[-0.3409, -0.9828, 0.0289]])

>>> b = torch.randn(4, 6)
>>> b
tensor([[ 1.2219, 0.5653, -0.2521, -0.2345, 1.2544, 0.3461],
[ 0.4785, -0.4477, 0.6049, 0.6368, 0.8775, 0.7145],
[ 1.1502, 3.2716, -1.1243, -0.5413, 0.3615, 0.6864],
[-0.0614, -0.7344, -1.3164, -0.7648, -1.4024, 0.0978]])
>>> torch.tril(b, diagonal=1)
tensor([[ 1.2219, 0.5653, 0.0000, 0.0000, 0.0000, 0.0000],
[ 0.4785, -0.4477, 0.6049, 0.0000, 0.0000, 0.0000],
[ 1.1502, 3.2716, -1.1243, -0.5413, 0.0000, 0.0000],
[-0.0614, -0.7344, -1.3164, -0.7648, -1.4024, 0.0000]])
>>> torch.tril(b, diagonal=-1)
tensor([[ 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
[ 0.4785, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000],
[ 1.1502, 3.2716, 0.0000, 0.0000, 0.0000, 0.0000],
[-0.0614, -0.7344, -1.3164, 0.0000, 0.0000, 0.0000]])

Python中的 ~ 波浪线运算符

~,用法只有一个那就是按位取反

Python 波浪线与补码_https://space.bilibili.com/59807853-CSDN博客_python 波浪线

torch.nn.MultiAttention

1
2
torch.nn.MultiheadAttention(embed_dim, num_heads, dropout=0.0, bias=True, add_bias_kv=False, add_zero_attn=False, kdim=None, vdim=None, batch_first=False, device=None, dtype=None):

对应公式:

计算公式:

1
forward(query, key, value, key_padding_mask=None, need_weights=True, attn_mask=None)

QKV比较常规,需要注意的是

  1. key_padding_mask参数,大小为(N,S),指定key中的哪些元素不做attention计算,即看作padding。注意,为True的位置不计算attention(是padding的地方不计算)
  2. attn_mask参数,

torch.nn.BCEWithLogitsLoss()

1
forward(self, input: Tensor, target: Tensor) -> Tensor

参数说明:

  • input: Tensor of arbitrary shape as unnormalized scores (often referred to as logits).
  • target: Tensor of the same shape as input with values between 0 and 1

input:$x$ output:$y$

当 $y=1$ 时,$l_n=−log\sigma(x_n)$ ;当 $y=0$ 时,$l_n=−log(1-\sigma(x_n))$ 。

论文里使用了一个全1的矩阵pos_labels,和一个全0的矩阵neg_labels。正例标签值都为1(正确的item,ground truth应该是概率为1),负例标签值都为0(错误的item,ground truth应该是概率为0)。

1
2
3
4
5
6
7
8
9
10
pos_labels, neg_labels = torch.ones(pos_logits.shape, device=args.device), \
torch.zeros(neg_logits.shape, device=args.device)


# print("\neye ball check raw_logits:"); print(pos_logits); print(neg_logits)
# check pos_logits > 0, neg_logits < 0
adam_optimizer.zero_grad()
indices = np.where(pos != 0) # 返回一个二维数组array, array[0]=[横坐标], array[1]=[纵坐标]
loss = bce_criterion(pos_logits[indices], pos_labels[indices]) # 使正例的得分尽量
loss += bce_criterion(neg_logits[indices], neg_labels[indices])

torch.argsort()

1
torch.argsort(input, dim=-1, descending=False) → LongTensor

沿着指定dim从小到大(默认)排序元素,然后返回这些元素原来的下标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>>t = torch.randint(1,10,(1,5))
>>>t
tensor([[7, 9, 5, 6, 3]])
>>>t.argsort()
tensor([[4, 2, 3, 0, 1]])
>>>t.argsort().argsort()
tensor([[3, 4, 1, 2, 0]])

# 两次argsort()可以返回每个元素的rank排名
# 解释:
# 把商品0,1,2,3,4按顺序摆好,他们的得分分别为[7,9,5,6,3]
# 对所有商品的得分从小到大排序(argsort()操作)
# 得到积分排名是[3,5,6,7,9],积分排名对应的商品id是[4,2,3,0,1](第一次argsort()的结果),每个商品id对应的下标就是他们的得分名次
# 例如商品4得分最高排在第一位,商品1得分最低排最后一位
# 然后我们想得到0,1,2,3,4顺序下的结果
# 所以对商品id排序,使得商品摆放顺序由[4,2,3,0,1]变为[0,1,2,3,4],这里也是argsort()操作,因为0~4天然有顺序关系
# [4,2,3,0,1]变为[0,1,2,3,4]的同时,排名情况[0,1,2,3,4]也变成了[3,4,1,2,0](第二次argsort()的结果)
# 即求得每个商品在原来顺序下的得分名次

numpy中的argmax、argmin、argwhere、argsort、argpartition函数 - 古明地盆 - 博客园 (cnblogs.com)

评价指标Hit Ratio、NDCG[1]

Hit Ratio

Evaluation Metrics. Given a user, each algorithm produces a ranked list of items. To assess the ranked list with the ground-truth item set (GT), we adopt Hit Ratio (HR), which has been commonly used in top-N evaluation . If a test item appears in the recommended list, it is deemed a hit. HR is calculated as:

NDCG

As the HR is recall-based metric, it does not reflect the accuracy of getting top ranks correct, which is crucial in many real-world applications. To address this, we also adopt Normalized Discounted Cumulative Gain (NDCG), which assigns higher importance to results at top ranks, scoring successively lower ranks with marginal fractional utility:

where ZK is the normalizer to ensure the perfect ranking has a value of 1; ri is the graded relevance of item at position i. We use the simple binary relevance for our work: ri = 1 if the item is in the test set, and 0 otherwise. For both metrics, larger values indicate better performance. In the evaluation, we calculate both metrics for each user in the test set, and report the average score.

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# evaluate on test set
def evaluate(model, dataset, args):
[train, valid, test, usernum, itemnum] = copy.deepcopy(dataset) # deepcopy一份用于valid和test

NDCG = 0.0
HT = 0.0
valid_user = 0.0

if usernum > 10000: # 用户数量大于10000就随机采10000
users = random.sample(range(1, usernum + 1), 10000)
else:
users = range(1, usernum + 1)
for u in users:

if len(train[u]) < 1 or len(test[u]) < 1: continue

seq = np.zeros([args.maxlen], dtype=np.int32)
idx = args.maxlen - 1
# 假设原始序列为[1,2,3,4,5,6,7] 6用于valid;7用于test
seq[idx] = valid[u][0] # seq: [0,0,0,...,0,0,0,6]
idx -= 1
for i in reversed(train[u]): # seq: [0,0,0,...,0,1,2,3,4,5,6] 只剩test里的[7]用于预测
seq[idx] = i
idx -= 1
if idx == -1: break
rated = set(train[u]) # 序列物品集合
rated.add(0)
item_idx = [test[u][0]] # 取出ground truth label
for _ in range(100): # item_idx: [label,random,random,...,random] 1+100个随机物品,看得分在top10的情况
t = np.random.randint(1, itemnum + 1)
while t in rated: t = np.random.randint(1, itemnum + 1)
item_idx.append(t)

predictions = -model.predict(*[np.array(l) for l in [[u], [seq], item_idx]])
predictions = predictions[0] # (1,101) -> 101 (squeeze)

rank = predictions.argsort().argsort()[0].item() # 做两次argsort(),可以得到每个位置的rank排名

valid_user += 1

if rank < 10: # TOP10才记录,这里真实rank = rank + 1 ,因为argsort()索引包含0
NDCG += 1 / np.log2(rank + 2)
HT += 1
if valid_user % 100 == 0:
print('.', end="")
# sys.stdout.flush()

return NDCG / valid_user, HT / valid_user

参考文献:

[1]He X, Chen T, Kan M Y, et al. Trirank: Review-aware explainable recommendation by modeling aspects