import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ConfigurationType } from '@server/configuration';
import { defaultCount, statusMap } from '@server/constants';
import { PrismaService } from '@server/prisma/prisma.service';
import { TRPCError, initTRPC } from '@trpc/server';
import Axios, { AxiosInstance } from 'axios';
import dayjs from 'dayjs';
import timezone from 'dayjs/plugin/timezone';
import utc from 'dayjs/plugin/utc';

dayjs.extend(utc);
dayjs.extend(timezone);

/**
 * Daily blocklist ("little dark room") of reading accounts.
 *
 * Keys are dates formatted as `YYYY-MM-DD` in the Asia/Shanghai time zone,
 * values are the ids of the accounts blocked on that date. Only the entry
 * for the current date is ever read.
 */
const blockedAccountsMap = new Map<string, string[]>();

/**
 * Bundles the tRPC builder objects together with the HTTP client used to talk
 * to the platform API, plus the article/feed synchronisation routines that
 * persist the fetched data through Prisma.
 */
@Injectable()
export class TrpcService {
  trpc = initTRPC.create();
  publicProcedure = this.trpc.procedure;
  /** Procedure that rejects with `UNAUTHORIZED` when the context carries an `errorMsg`. */
  protectedProcedure = this.trpc.procedure.use(({ ctx, next }) => {
    const errorMsg = (ctx as any).errorMsg;
    if (errorMsg) {
      throw new TRPCError({ code: 'UNAUTHORIZED', message: errorMsg });
    }
    return next({ ctx });
  });
  router = this.trpc.router;
  mergeRouters = this.trpc.mergeRouters;
  /** Axios instance bound to the configured platform base URL. */
  request: AxiosInstance;
  /** Pause between consecutive feed refreshes, in seconds. */
  updateDelayTime = 60;

  private readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly prismaService: PrismaService,
    private readonly configService: ConfigService,
  ) {
    const { url } = this.configService.get('platform')!;
    this.updateDelayTime = this.configService.get('feed')!.updateDelayTime;

    this.request = Axios.create({ baseURL: url, timeout: 15 * 1e3 });

    this.request.interceptors.response.use(
      (response) => response,
      async (error) => {
        this.logger.log('error: ', error);

        // The message is only inspected when it really is a string, so that
        // an unexpected payload cannot make `.includes` throw.
        const rawMessage = error.response?.data?.message;
        const errMsg: string = typeof rawMessage === 'string' ? rawMessage : '';
        // `xid` is the account id attached by the calling method. Requests
        // sent without it (e.g. the login endpoints) leave `id` undefined.
        const id: string | undefined = error.config?.headers?.xid;

        if (errMsg.includes('WeReadError401')) {
          if (id) {
            // The account's login has expired: mark it invalid.
            await this.prismaService.account.update({
              where: { id },
              data: { status: statusMap.INVALID },
            });
            this.logger.error(`账号(${id})登录失效,已禁用`);
          } else {
            this.logger.error("Can't handle this error: ", errMsg);
          }
        } else if (errMsg.includes('WeReadError429')) {
          // Too many requests: keep the account out of rotation for today.
          this.logger.error(`账号(${id})请求频繁,打入小黑屋`);
          if (id) {
            this.blockAccountForToday(id);
          }
        } else if (errMsg.includes('WeReadError400')) {
          this.logger.error(`账号(${id})处理请求参数出错`);
          this.logger.error('WeReadError400: ', errMsg);
          // Wait 10s before rejecting so the caller's retry is delayed.
          await this.sleep(10 * 1e3);
        } else {
          this.logger.error("Can't handle this error: ", errMsg);
        }

        return Promise.reject(error);
      },
    );
  }

  /**
   * Removes an account from today's blocklist.
   *
   * @param vid id of the account to unblock
   */
  removeBlockedAccount = (vid: string) => {
    const today = this.getTodayDate();
    const blockedAccounts = blockedAccountsMap.get(today);
    if (Array.isArray(blockedAccounts)) {
      blockedAccountsMap.set(
        today,
        blockedAccounts.filter((id) => id !== vid),
      );
    }
  };

  /** Adds an account to today's blocklist, creating the entry when needed. */
  private blockAccountForToday(id: string) {
    const today = this.getTodayDate();

    // Entries of other days are never read again; drop them so the map does
    // not grow for as long as the process lives.
    blockedAccountsMap.forEach((_ids, day) => {
      if (day !== today) {
        blockedAccountsMap.delete(day);
      }
    });

    const blockedAccounts = blockedAccountsMap.get(today) ?? [];
    if (!blockedAccounts.includes(id)) {
      blockedAccounts.push(id);
    }
    blockedAccountsMap.set(today, blockedAccounts);
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /** Current date in the Asia/Shanghai time zone, formatted as `YYYY-MM-DD`. */
  private getTodayDate() {
    return dayjs.tz(new Date(), 'Asia/Shanghai').format('YYYY-MM-DD');
  }

  /**
   * Returns the ids of the accounts blocked today (empty when none are).
   */
  getBlockedAccountIds(): string[] {
    const today = this.getTodayDate();
    const disabledAccounts = blockedAccountsMap.get(today) || [];
    this.logger.debug('disabledAccounts: ', disabledAccounts);
    return disabledAccounts.filter(Boolean);
  }

  /**
   * Picks a random account among up to 10 enabled accounts that are not
   * blocked today.
   *
   * @throws Error when no such account exists
   */
  private async getAvailableAccount() {
    const disabledAccounts = this.getBlockedAccountIds();
    const account = await this.prismaService.account.findMany({
      where: {
        status: statusMap.ENABLE,
        NOT: {
          id: { in: disabledAccounts },
        },
      },
      take: 10,
    });

    if (!account || account.length === 0) {
      throw new Error('暂无可用读书账号!');
    }

    return account[Math.floor(Math.random() * account.length)];
  }

  /**
   * Fetches one page of articles of a feed from the platform API.
   *
   * Every attempt selects an account again, so a retry may use a different
   * account than the failed attempt.
   *
   * @param mpId id of the feed
   * @param page page number to request
   * @param retryCount remaining retries after a failed attempt
   * @returns the articles returned by the API
   * @throws the last request error once the retries are exhausted, or the
   *   "no available account" error, which is not retried
   */
  async getMpArticles(mpId: string, page = 1, retryCount = 3) {
    const account = await this.getAvailableAccount();

    try {
      const { data } = await this.request.get<
        {
          id: string;
          title: string;
          picUrl: string;
          publishTime: number;
        }[]
      >(`/api/v2/platform/mps/${mpId}/articles`, {
        headers: {
          xid: account.id,
          Authorization: `Bearer ${account.token}`,
        },
        params: {
          page,
        },
      });

      this.logger.log(
        `getMpArticles(${mpId}) page: ${page} articles: ${data.length}`,
      );
      return data;
    } catch (err) {
      this.logger.error(`retry(${4 - retryCount}) getMpArticles error: `, err);
      if (retryCount > 0) {
        return this.getMpArticles(mpId, page, retryCount - 1);
      }
      throw err;
    }
  }

  /**
   * Fetches one page of articles, stores them and updates the feed's
   * `syncTime` and `hasHistory`.
   *
   * @param mpId id of the feed
   * @param page page number to fetch
   * @returns `hasHistory`: 0 when the page held fewer than `defaultCount`
   *   articles (no older articles are assumed to exist), otherwise 1
   */
  async refreshMpArticlesAndUpdateFeed(mpId: string, page = 1) {
    const articles = await this.getMpArticles(mpId, page);

    if (articles.length > 0) {
      let results: unknown;
      const { type } = this.configService.get('database')!;
      if (type === 'sqlite') {
        // sqlite3 does not support createMany, so upsert row by row in one
        // transaction.
        const inserts = articles.map(({ id, picUrl, publishTime, title }) =>
          this.prismaService.article.upsert({
            create: { id, mpId, picUrl, publishTime, title },
            update: {
              publishTime,
              title,
            },
            where: { id },
          }),
        );
        results = await this.prismaService.$transaction(inserts);
      } else {
        results = await (this.prismaService.article as any).createMany({
          data: articles.map(({ id, picUrl, publishTime, title }) => ({
            id,
            mpId,
            picUrl,
            publishTime,
            title,
          })),
          skipDuplicates: true,
        });
      }

      this.logger.debug(
        `refreshMpArticlesAndUpdateFeed create results: ${JSON.stringify(results)}`,
      );
    }

    // A page with fewer than defaultCount articles means there are no more
    // historical articles.
    const hasHistory = articles.length < defaultCount ? 0 : 1;

    await this.prismaService.feed.update({
      where: { id: mpId },
      data: {
        syncTime: Math.floor(Date.now() / 1e3),
        hasHistory,
      },
    });

    return { hasHistory };
  }

  /** Feed whose history is currently being synchronised (`id` is '' when idle). */
  inProgressHistoryMp = {
    id: '',
    page: 1,
  };

  /**
   * Synchronises the historical articles of a feed page by page, waiting
   * `updateDelayTime` seconds between pages.
   *
   * Returns immediately when the same feed is already being synchronised.
   * Starting it for another feed replaces `inProgressHistoryMp`, which makes
   * a loop running for the previous feed stop at its next iteration.
   *
   * @param mpId id of the feed
   */
  async getHistoryMpArticles(mpId: string) {
    if (this.inProgressHistoryMp.id === mpId) {
      this.logger.log(`getHistoryMpArticles(${mpId}) is running`);
      return;
    }

    this.inProgressHistoryMp = {
      id: mpId,
      page: 1,
    };

    if (!this.inProgressHistoryMp.id) {
      return;
    }

    try {
      const feed = await this.prismaService.feed.findFirstOrThrow({
        where: {
          id: mpId,
        },
      });

      // The history has already been synchronised completely.
      if (feed.hasHistory === 0) {
        this.logger.log(`getHistoryMpArticles(${mpId}) has no history`);
        return;
      }

      const total = await this.prismaService.article.count({
        where: {
          mpId,
        },
      });
      // Resume from the page that corresponds to the stored article count.
      this.inProgressHistoryMp.page = Math.ceil(total / defaultCount);

      // Try at most a thousand pages.
      let i = 1e3;
      while (i-- > 0) {
        if (this.inProgressHistoryMp.id !== mpId) {
          this.logger.log(
            `getHistoryMpArticles(${mpId}) is not running, break`,
          );
          break;
        }

        const { hasHistory } = await this.refreshMpArticlesAndUpdateFeed(
          mpId,
          this.inProgressHistoryMp.page,
        );
        if (hasHistory < 1) {
          this.logger.log(
            `getHistoryMpArticles(${mpId}) has no history, break`,
          );
          break;
        }
        this.inProgressHistoryMp.page++;

        await this.sleep(this.updateDelayTime * 1e3);
      }
    } finally {
      this.inProgressHistoryMp = {
        id: '',
        page: 1,
      };
    }
  }

  /** True while `refreshAllMpArticlesAndUpdateFeed` is running. */
  isRefreshAllMpArticlesRunning = false;

  /**
   * Refreshes the first page of every feed, one after the other, waiting
   * `updateDelayTime` seconds after each feed. Does nothing when a refresh
   * is already running.
   */
  async refreshAllMpArticlesAndUpdateFeed() {
    if (this.isRefreshAllMpArticlesRunning) {
      this.logger.log('refreshAllMpArticlesAndUpdateFeed is running');
      return;
    }

    // Raise the flag before the first await so that a second call arriving
    // while the feeds are being loaded is rejected by the guard above.
    this.isRefreshAllMpArticlesRunning = true;
    try {
      const mps = await this.prismaService.feed.findMany();
      for (const { id } of mps) {
        await this.refreshMpArticlesAndUpdateFeed(id);

        await this.sleep(this.updateDelayTime * 1e3);
      }
    } finally {
      this.isRefreshAllMpArticlesRunning = false;
    }
  }

  /**
   * Asks the platform API for the feed information behind a URL.
   *
   * @param url URL to resolve; surrounding whitespace is trimmed
   * @returns the entries returned by the API
   */
  async getMpInfo(url: string) {
    url = url.trim();
    const account = await this.getAvailableAccount();

    const { data } = await this.request.post<
      {
        id: string;
        cover: string;
        name: string;
        intro: string;
        updateTime: number;
      }[]
    >(
      `/api/v2/platform/wxs2mp`,
      { url },
      {
        headers: {
          xid: account.id,
          Authorization: `Bearer ${account.token}`,
        },
      },
    );
    return data;
  }

  /**
   * Requests a new login session from the platform API.
   *
   * @returns the session `uuid` and the `scanUrl` of the session
   */
  async createLoginUrl() {
    const { data } = await this.request.get<{
      uuid: string;
      scanUrl: string;
    }>(`/api/v2/login/platform`);
    return data;
  }

  /**
   * Fetches the result of a login session, with a 120s request timeout.
   *
   * @param id id of the login session
   */
  async getLoginResult(id: string) {
    const { data } = await this.request.get<{
      message: string;
      vid?: number;
      token?: string;
      username?: string;
    }>(`/api/v2/login/platform/${id}`, { timeout: 120 * 1e3 });
    return data;
  }
}