feat: download from gateway if missing shards
Alan Shaw committed Jul 3, 2024
1 parent de3fb82 commit 8a1c971
Showing 3 changed files with 168 additions and 8 deletions.
1 change: 1 addition & 0 deletions package.json
@@ -32,6 +32,7 @@
"archy": "^1.0.0",
"ariakit-utils": "0.17.0-next.27",
"blueimp-md5": "^2.19.0",
"linkdex": "^3.0.0",
"multiformats": "^12.1.3",
"next": "^13.5.4",
"nft.storage": "^7.1.1",
84 changes: 81 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

91 changes: 86 additions & 5 deletions src/lib/migrations/nft-storage.ts
@@ -1,7 +1,13 @@
import { NFTStorage } from 'nft.storage'
import * as Link from 'multiformats/link'
import { sha256 } from 'multiformats/hashes/sha2'
import * as Claims from '@web3-storage/content-claims/client'
import { CarBlockIterator } from '@ipld/car'
import { LinkIndexer } from 'linkdex'
import { MigrationSourceConfiguration, Shard, Upload } from './api'

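// 0x0202 is the multicodec code for the CAR (Content Addressable aRchive) format.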
const carCode = 0x0202

export const checkToken = async (token: string) => {
  const client = new NFTStorage({ token })
  try {
@@ -22,6 +28,19 @@ interface ListOptions {
  before?: string
}

interface UploadListItem {
  /** The root CID of the upload */
  cid: string
  /** CIDs of CARs the upload was sharded into. */
  parts: string[]
  /**
   * Type of the upload.
   * "remote" = Pinning Service API
   * "nft" = Multipart upload of NFT data
   */
  type: string
}
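
// Example list item (hypothetical values) matching the interface above:
// { cid: 'bafybeigexample…', parts: ['bagbaieraexample…'], type: 'remote' }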

export class NFTStorageMigrator {
  #token
  #cursor
@@ -65,8 +84,43 @@ export class NFTStorageMigrator {
          continue
        }

        const root = Link.parse(raw.cid)

        let parts = raw.parts
        if (!parts.length && raw.type === 'remote') {
          try {
            // If no parts, and this was a pin request, then pickup may have
            // dealt with it.
            //
            // Pickup downloads the data, writes a CAR, calculates the CAR CID
            // and then copies the CAR to carpark, using the CAR CID as the key.
            // Format: `<carCID>/<carCID>.car`.
            //
            // If so, we should be able to get a location claim, and the key in
            // the claim should be the CAR CID.
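            //
            // For example (hypothetical CID and host), a location URL such as
            //   https://carpark.example.org/bagbaieraexamplecid/bagbaieraexamplecid.car
            // would give the part CID `bagbaieraexamplecid` once the final path
            // segment has its `.car` suffix stripped, as the code below does.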
            const claims = await Claims.read(root)
            const locationClaims = []
            for (const c of claims) {
              if (c.type === 'assert/location') {
                locationClaims.push(c)
              }
            }
            // It needs to be the _only_ location claim because someone else may
            // have uploaded the same root CID sharded across multiple CARs
            if (locationClaims.length === 1) {
              const url = locationClaims[0].location.find(url => url.endsWith('.car'))
              if (url) {
                const part = url.split('/').pop()?.slice(0, -4)
                if (part) parts.push(part)
              }
            }
          } catch (err) {
            console.error(`failed to read content claims for PSA item: ${root}`, err)
          }
        }

        const shards: Shard[] = []
        for (const p of raw.parts) {
        for (const p of parts) {
          shards.push({
            link: Link.parse(p),
            size: async () => {
@@ -77,13 +131,40 @@ export class NFTStorageMigrator {
              return parseInt(contentLength)
            },
            bytes: async () => {
              // TODO: fetch from /complete?
              throw new Error('not implemented')
              // Should not be necessary - service should signal this shard
              // already exists and does not need re-upload.
              const res = await fetch(`https://${p}.ipfs.w3s.link/`)
              if (!res.ok) throw new Error(`failed to get shard: ${p}`, { cause: { status: res.status } })
              return new Uint8Array(await res.arrayBuffer())
            }
          })
        }

        yield { root: Link.parse(raw.cid), shards } as Upload
        // Add a synthetic shard that is the entire DAG.
        // Attempt to download from gateway.
        // TODO: fetch from /complete?
        if (!shards.length) {
          try {
            const res = await fetch(`https://w3s.link/ipfs/${root}?format=car`)
            if (!res.ok) throw new Error('failed to get DAG as CAR', { cause: { status: res.status } })
            const bytes = new Uint8Array(await res.arrayBuffer())
            // Verify CAR is complete
            const iterator = await CarBlockIterator.fromBytes(bytes)
            const index = new LinkIndexer()
            for await (const block of iterator) {
              index.decodeAndIndex(block)
            }
            if (!index.isCompleteDag()) {
              throw new Error('CAR does not contain a complete DAG')
            }
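            // Build the shard link: a CIDv1 with the CAR multicodec (carCode,
            // 0x0202) over the sha2-256 digest of the entire CAR bytes.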
            const link = Link.create(carCode, await sha256.digest(bytes))
            shards.push({ link, size: async () => bytes.length, bytes: async () => bytes })
          } catch (err) {
            console.error(`failed to download CAR for item: ${root}`, err)
          }
        }

        yield { root, shards } as Upload
      }
    }
  }
@@ -92,7 +173,7 @@
/**
 * Follow before with last item, to fetch all the things.
 */
async function * paginator (fn: (service: Service, opts: ListOptions) => Promise<Response>, service: Service, opts: ListOptions) {
async function * paginator (fn: (service: Service, opts: ListOptions) => Promise<Response>, service: Service, opts: ListOptions): AsyncGenerator<{ value: UploadListItem[] }> {
  let res = await fn(service, opts)
  if (!res.ok) {
    if (res.status === 429) {
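For orientation, a minimal sketch of how a consumer might drain the migrator changed above. The constructor options and the `list()` method name are assumptions for illustration; only the yielded `{ root, shards }` shape and the lazy `size()`/`bytes()` accessors come from this diff.

import { NFTStorageMigrator } from './nft-storage'

// Hypothetical usage - constructor options and `list()` are assumed names.
const migrator = new NFTStorageMigrator({ token: '<nft.storage API token>' })
for await (const upload of migrator.list()) {
  for (const shard of upload.shards) {
    // size() and bytes() are lazy: the HEAD request and gateway fetch shown in
    // the diff only run when the shard is actually needed.
    const size = await shard.size()
    console.log(`root ${upload.root} shard ${shard.link} (${size} bytes)`)
  }
}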
