Skip to content

Commit

Permalink
feat: implement concurrency control in updatePoints API and update de…
Browse files Browse the repository at this point in the history
…pendencies
  • Loading branch information
pranshu05 committed Dec 7, 2024
1 parent 1375e8e commit 9bd1f7a
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 43 deletions.
1 change: 0 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ name: API Trigger Every 5 Minutes
on:
schedule:
- cron: "*/5 * * * *"
workflow_dispatch:

jobs:
trigger-endpoint:
Expand Down
48 changes: 37 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"lucide-react": "^0.468.0",
"next": "14.2.15",
"next-auth": "^4.24.10",
"p-limit": "^6.1.0",
"react": "^18",
"react-dom": "^18",
"spotify-web-api-node": "^5.0.2"
Expand Down
83 changes: 52 additions & 31 deletions src/pages/api/updatePoints/index.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,74 @@
import { db } from "@/lib/firebaseConfig";
import { collection, getDocs, updateDoc, doc } from "firebase/firestore";
import { collection, getDocs, query, limit, startAfter, orderBy } from "firebase/firestore";
import { getAccessToken } from "@/lib/getAccessToken";
import axios from "axios";
import pLimit from "p-limit";

const BATCH_SIZE = 10;
const CONCURRENT_REQUESTS = 10;

export default async function handler(req, res) {
try {
const usersCollection = collection(db, "users");
let lastVisible = null;
let totalProcessed = 0;

const limitConcurrency = pLimit(CONCURRENT_REQUESTS);

do {
const usersQuery = lastVisible
? query(usersCollection, orderBy("id"), startAfter(lastVisible), limit(BATCH_SIZE))
: query(usersCollection, orderBy("id"), limit(BATCH_SIZE));

const usersSnapshot = await getDocs(usersQuery);

if (usersSnapshot.empty) break;

const usersSnapshot = await getDocs(usersCollection);
const users = usersSnapshot.docs.map((doc) => ({
id: doc.id,
...doc.data(),
}));
const users = usersSnapshot.docs.map((doc) => ({
id: doc.id,
...doc.data(),
}));

let updateMessage = [];
const updatePromises = users.map((user) =>
limitConcurrency(async () => {
const { id: userId, lastUpdated, points = 0 } = user;
const currentTimestamp = Date.now();

for (const user of users) {
const { id: userId, lastUpdated, points = 0 } = user;
const token = await getAccessToken(userId);
if (lastUpdated && currentTimestamp - lastUpdated < 60 * 60 * 1000) return;

if (!token) {
return res.status(400).json({ error: "Unable to fetch access token" });
}
const token = await getAccessToken(userId);
if (!token) {
console.warn(`Unable to fetch access token for user ${userId}`);
return;
}

const currentTimestamp = Date.now();
if (lastUpdated && currentTimestamp - lastUpdated < 55 * 60 * 1000) {
updateMessage.push(`User ${userId}'s points were already updated within the last hour.`);
continue;
}
const oneHourAgo = currentTimestamp - 1 * 60 * 60 * 1000;
const response = await axios.get(
`https://api.spotify.com/v1/me/player/recently-played?after=${oneHourAgo}&limit=50`,
{
headers: { Authorization: `Bearer ${token}` },
}
);

const oneHourAgo = currentTimestamp - 1 * 60 * 60 * 1000;
const tracksPlayed = response.data.items.length;

const response = await axios.get(`https://api.spotify.com/v1/me/player/recently-played?after=${oneHourAgo}&limit=50`, {
headers: { Authorization: `Bearer ${token}` },
});
const tracksPlayed = response.data.items.length;
const userDocRef = doc(db, "users", userId);
await updateDoc(userDocRef, {
points: points + tracksPlayed,
lastUpdated: currentTimestamp,
});
})
);

const userDocRef = doc(db, "users", userId);
await updateDoc(userDocRef, {
points: points + tracksPlayed,
lastUpdated: currentTimestamp,
});
await Promise.all(updatePromises);

updateMessage.push(`User ${userId}'s points have been successfully updated.`);
}
totalProcessed += users.length;
lastVisible = usersSnapshot.docs[usersSnapshot.docs.length - 1];
} while (lastVisible);

res.status(200).json({
message: "Points update process completed.",
details: updateMessage,
totalProcessed,
});
} catch (error) {
console.error("Error updating points:", error);
Expand Down

0 comments on commit 9bd1f7a

Please sign in to comment.