Browse Source

Fetch outbox to grab old activities

Chocobozzz 2 years ago
parent
commit
c986175d68
No account linked to committer's email address

+ 1
- 2
.codeclimate.yml View File

@@ -6,7 +6,7 @@ engines:
6 6
     enabled: true
7 7
     config:
8 8
       languages:
9
-      - javascript
9
+      - typescript
10 10
   eslint:
11 11
     enabled: true
12 12
   fixme:
@@ -18,7 +18,6 @@ ratings:
18 18
 exclude_paths:
19 19
 - config/
20 20
 - node_modules/
21
-- client
22 21
 - scripts/
23 22
 - server/tests/
24 23
 - .tmp/

+ 0
- 1
client/src/app/+admin/follows/followers-list/followers-list.component.html View File

@@ -8,7 +8,6 @@
8 8
     >
9 9
       <p-column field="id" header="ID"></p-column>
10 10
       <p-column field="follower.host" header="Host"></p-column>
11
-      <p-column field="email" header="Email"></p-column>
12 11
       <p-column field="follower.score" header="Score"></p-column>
13 12
       <p-column field="state" header="State"></p-column>
14 13
       <p-column field="createdAt" header="Created date" [sortable]="true"></p-column>

+ 0
- 1
client/src/app/+admin/follows/following-list/following-list.component.html View File

@@ -8,7 +8,6 @@
8 8
     >
9 9
       <p-column field="id" header="ID"></p-column>
10 10
       <p-column field="following.host" header="Host"></p-column>
11
-      <p-column field="email" header="Email"></p-column>
12 11
       <p-column field="state" header="State"></p-column>
13 12
       <p-column field="createdAt" header="Created date" [sortable]="true"></p-column>
14 13
       <p-column header="Unfollow" styleClass="action-cell">

+ 2
- 31
server/controllers/activitypub/inbox.ts View File

@@ -1,27 +1,10 @@
1 1
 import * as express from 'express'
2
-import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, ActivityType, RootActivity } from '../../../shared'
2
+import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared'
3 3
 import { logger } from '../../helpers'
4 4
 import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity'
5
-import { processCreateActivity, processUpdateActivity, processUndoActivity } from '../../lib'
6
-import { processAcceptActivity } from '../../lib/activitypub/process/process-accept'
7
-import { processAddActivity } from '../../lib/activitypub/process/process-add'
8
-import { processAnnounceActivity } from '../../lib/activitypub/process/process-announce'
9
-import { processDeleteActivity } from '../../lib/activitypub/process/process-delete'
10
-import { processFollowActivity } from '../../lib/activitypub/process/process-follow'
5
+import { processActivities } from '../../lib/activitypub/process/process'
11 6
 import { asyncMiddleware, checkSignature, localAccountValidator, signatureValidator } from '../../middlewares'
12 7
 import { activityPubValidator } from '../../middlewares/validators/activitypub/activity'
13
-import { AccountInstance } from '../../models/account/account-interface'
14
-
15
-const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise<any> } = {
16
-  Create: processCreateActivity,
17
-  Add: processAddActivity,
18
-  Update: processUpdateActivity,
19
-  Delete: processDeleteActivity,
20
-  Follow: processFollowActivity,
21
-  Accept: processAcceptActivity,
22
-  Announce: processAnnounceActivity,
23
-  Undo: processUndoActivity
24
-}
25 8
 
26 9
 const inboxRouter = express.Router()
27 10
 
@@ -69,15 +52,3 @@ async function inboxController (req: express.Request, res: express.Response, nex
69 52
 
70 53
   res.status(204).end()
71 54
 }
72
-
73
-async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) {
74
-  for (const activity of activities) {
75
-    const activityProcessor = processActivity[activity.type]
76
-    if (activityProcessor === undefined) {
77
-      logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id })
78
-      continue
79
-    }
80
-
81
-    await activityProcessor(activity, inboxAccount)
82
-  }
83
-}

+ 0
- 2
server/controllers/activitypub/outbox.ts View File

@@ -34,8 +34,6 @@ async function outboxController (req: express.Request, res: express.Response, ne
34 34
   const data = await db.Video.listAllAndSharedByAccountForOutbox(account.id, start, count)
35 35
   const activities: Activity[] = []
36 36
 
37
-  console.log(account.url)
38
-
39 37
   for (const video of data.data) {
40 38
     const videoObject = video.toActivityPubObject()
41 39
     let addActivity: ActivityAdd = await addActivityData(video.url, account, video, video.VideoChannel.url, videoObject)

+ 3
- 0
server/controllers/api/server/follows.ts View File

@@ -19,6 +19,7 @@ import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo'
19 19
 import { AccountInstance } from '../../../models/account/account-interface'
20 20
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
21 21
 import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account'
22
+import { addFetchOutboxJob } from '../../../lib/activitypub/fetch'
22 23
 
23 24
 const serverFollowsRouter = express.Router()
24 25
 
@@ -136,6 +137,8 @@ async function follow (fromAccount: AccountInstance, targetAccount: AccountInsta
136 137
       if (accountFollow.state === 'pending') {
137 138
         await sendFollow(accountFollow, t)
138 139
       }
140
+
141
+      await addFetchOutboxJob(targetAccount, t)
139 142
     })
140 143
   } catch (err) {
141 144
     // Reset target account

+ 8
- 6
server/helpers/activitypub.ts View File

@@ -46,14 +46,16 @@ function activityPubCollectionPagination (url: string, page: number, result: Res
46 46
     orderedItems: result.data
47 47
   }
48 48
 
49
-  const obj = {
50
-    id: url,
51
-    type: 'OrderedCollection',
52
-    totalItems: result.total,
53
-    orderedItems: orderedCollectionPagination
49
+  if (page === 1) {
50
+    return activityPubContextify({
51
+      id: url,
52
+      type: 'OrderedCollection',
53
+      totalItems: result.total,
54
+      first: orderedCollectionPagination
55
+    })
54 56
   }
55 57
 
56
-  return activityPubContextify(obj)
58
+  return orderedCollectionPagination
57 59
 }
58 60
 
59 61
 function buildSignedActivity (byAccount: AccountInstance, data: Object) {

+ 1
- 0
server/initializers/constants.ts View File

@@ -228,6 +228,7 @@ const ACTIVITY_PUB = {
228 228
   ACCEPT_HEADER: 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
229 229
   PUBLIC: 'https://www.w3.org/ns/activitystreams#Public',
230 230
   COLLECTION_ITEMS_PER_PAGE: 10,
231
+  FETCH_PAGE_LIMIT: 100,
231 232
   URL_MIME_TYPES: {
232 233
     VIDEO: [ 'video/mp4', 'video/webm', 'video/ogg' ], // TODO: Merge with VIDEO_MIMETYPE_EXT
233 234
     TORRENT: [ 'application/x-bittorrent' ],

+ 15
- 0
server/lib/activitypub/fetch.ts View File

@@ -0,0 +1,15 @@
1
+import { Transaction } from 'sequelize'
2
+import { AccountInstance } from '../../models/account/account-interface'
3
+import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler'
4
+
5
+async function addFetchOutboxJob (account: AccountInstance, t: Transaction) {
6
+  const jobPayload: ActivityPubHttpPayload = {
7
+    uris: [ account.outboxUrl ]
8
+  }
9
+
10
+  return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload)
11
+}
12
+
13
+export {
14
+  addFetchOutboxJob
15
+}

+ 1
- 0
server/lib/activitypub/index.ts View File

@@ -1,6 +1,7 @@
1 1
 export * from './process'
2 2
 export * from './send'
3 3
 export * from './account'
4
+export * from './fetch'
4 5
 export * from './share'
5 6
 export * from './video-channels'
6 7
 export * from './videos'

+ 1
- 0
server/lib/activitypub/process/index.ts View File

@@ -1,3 +1,4 @@
1
+export * from './process'
1 2
 export * from './process-accept'
2 3
 export * from './process-add'
3 4
 export * from './process-announce'

+ 38
- 0
server/lib/activitypub/process/process.ts View File

@@ -0,0 +1,38 @@
1
+import { Activity, ActivityType } from '../../../../shared/models/activitypub/activity'
2
+import { AccountInstance } from '../../../models/account/account-interface'
3
+import { processAcceptActivity } from './process-accept'
4
+import { processAddActivity } from './process-add'
5
+import { processAnnounceActivity } from './process-announce'
6
+import { processCreateActivity } from './process-create'
7
+import { processDeleteActivity } from './process-delete'
8
+import { processFollowActivity } from './process-follow'
9
+import { processUndoActivity } from './process-undo'
10
+import { processUpdateActivity } from './process-update'
11
+import { logger } from '../../../helpers/logger'
12
+
13
+const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise<any> } = {
14
+  Create: processCreateActivity,
15
+  Add: processAddActivity,
16
+  Update: processUpdateActivity,
17
+  Delete: processDeleteActivity,
18
+  Follow: processFollowActivity,
19
+  Accept: processAcceptActivity,
20
+  Announce: processAnnounceActivity,
21
+  Undo: processUndoActivity
22
+}
23
+
24
+async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) {
25
+  for (const activity of activities) {
26
+    const activityProcessor = processActivity[activity.type]
27
+    if (activityProcessor === undefined) {
28
+      logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id })
29
+      continue
30
+    }
31
+
32
+    await activityProcessor(activity, inboxAccount)
33
+  }
34
+}
35
+
36
+export {
37
+  processActivities
38
+}

+ 2
- 0
server/lib/activitypub/send/send-create.ts View File

@@ -21,6 +21,8 @@ async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbus
21 21
   return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t)
22 22
 }
23 23
 
24
+// async function sendCreateView ()
25
+
24 26
 async function createActivityData (url: string, byAccount: AccountInstance, object: any) {
25 27
   const { to, cc } = await getAudience(byAccount)
26 28
   const activity: ActivityCreate = {

+ 69
- 0
server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts View File

@@ -0,0 +1,69 @@
1
+import { logger } from '../../../helpers'
2
+import { buildSignedActivity } from '../../../helpers/activitypub'
3
+import { doRequest } from '../../../helpers/requests'
4
+import { database as db } from '../../../initializers'
5
+import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
6
+import { processActivities } from '../../activitypub/process/process'
7
+import { ACTIVITY_PUB } from '../../../initializers/constants'
8
+
9
+async function process (payload: ActivityPubHttpPayload, jobId: number) {
10
+  logger.info('Processing ActivityPub fetcher in job %d.', jobId)
11
+
12
+  const options = {
13
+    method: 'GET',
14
+    uri: '',
15
+    json: true
16
+  }
17
+
18
+  for (const uri of payload.uris) {
19
+    options.uri = uri
20
+    logger.info('Fetching ActivityPub data on %s.', uri)
21
+
22
+    const response = await doRequest(options)
23
+    const firstBody = response.body
24
+
25
+    if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
26
+      const activities = firstBody.first.orderedItems
27
+
28
+      logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri)
29
+
30
+      await processActivities(activities)
31
+    }
32
+
33
+    let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
34
+    let i = 0
35
+    let nextLink = firstBody.first.next
36
+    while (nextLink && i < limit) {
37
+      options.uri = nextLink
38
+
39
+      const { body } = await doRequest(options)
40
+      nextLink = body.nextLink
41
+      i++
42
+
43
+      if (Array.isArray(body.orderedItems)) {
44
+        const activities = body.orderedItems
45
+        logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri)
46
+
47
+        await processActivities(activities)
48
+      }
49
+    }
50
+  }
51
+}
52
+
53
+function onError (err: Error, jobId: number) {
54
+  logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
55
+  return Promise.resolve()
56
+}
57
+
58
+function onSuccess (jobId: number) {
59
+  logger.info('Job %d is a success.', jobId)
60
+  return Promise.resolve()
61
+}
62
+
63
+// ---------------------------------------------------------------------------
64
+
65
+export {
66
+  process,
67
+  onError,
68
+  onSuccess
69
+}

+ 5
- 3
server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts View File

@@ -2,16 +2,18 @@ import { JobScheduler, JobHandler } from '../job-scheduler'
2 2
 
3 3
 import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
4 4
 import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
5
+import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
5 6
 import { JobCategory } from '../../../../shared'
6 7
 
7 8
 type ActivityPubHttpPayload = {
8 9
   uris: string[]
9
-  signatureAccountId: number
10
-  body: any
10
+  signatureAccountId?: number
11
+  body?: any
11 12
 }
12 13
 const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
13 14
   activitypubHttpBroadcastHandler,
14
-  activitypubHttpUnicastHandler
15
+  activitypubHttpUnicastHandler,
16
+  activitypubHttpFetcherHandler
15 17
 }
16 18
 const jobCategory: JobCategory = 'activitypub-http'
17 19
 

Loading…
Cancel
Save