Skip to content

Commit 745028b

Browse files
Merge pull request #261 from decentraland/feat/endpoint-to-recreate-trades-mv
feat: introduce trades endpoint to recreate MV
2 parents 569592c + dfbcecd commit 745028b

File tree

11 files changed

+588
-205
lines changed

11 files changed

+588
-205
lines changed

src/controllers/handlers/trades-handler.ts

+28
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,31 @@ export async function getTradeAcceptedEventHandler(
203203
}
204204
}
205205
}
206+
207+
export async function recreateTradesMaterializedViewHandler(
208+
context: Pick<HandlerContextWithPath<'trades', '/v1/trades/materialized-view/recreate'>, 'components'>
209+
) {
210+
try {
211+
const {
212+
components: { trades }
213+
} = context
214+
215+
await trades.recreateMaterializedView()
216+
217+
return {
218+
status: StatusCode.OK,
219+
body: {
220+
ok: true,
221+
message: 'Materialized view recreated successfully'
222+
}
223+
}
224+
} catch (e) {
225+
return {
226+
status: StatusCode.INTERNAL_SERVER_ERROR,
227+
body: {
228+
ok: false,
229+
message: isErrorWithMessage(e) ? e.message : 'Could not recreate materialized view'
230+
}
231+
}
232+
}
233+
}

src/controllers/routes.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Router } from '@well-known-components/http-server'
22
import * as authorizationMiddleware from 'decentraland-crypto-middleware'
3+
import { createTradesViewAuthMiddleware } from '../logic/http/auth'
34
import { TradeCreationSchema } from '../ports/trades/schemas'
45
import { GlobalContext } from '../types'
56
import { createBalanceHandler } from './handlers/balance-handler'
@@ -15,7 +16,13 @@ import { getPricesHandler } from './handlers/prices-handler'
1516
import { getRankingsHandler } from './handlers/rankings-handler'
1617
import { getSalesHandler } from './handlers/sales-handler'
1718
import { getStatsHandler } from './handlers/stats-handler'
18-
import { addTradeHandler, getTradeAcceptedEventHandler, getTradeHandler, getTradesHandler } from './handlers/trades-handler'
19+
import {
20+
addTradeHandler,
21+
getTradeAcceptedEventHandler,
22+
getTradeHandler,
23+
getTradesHandler,
24+
recreateTradesMaterializedViewHandler
25+
} from './handlers/trades-handler'
1926
import { createTransakHandler } from './handlers/transak-handler'
2027
import { getTrendingsHandler } from './handlers/trending-handler'
2128
import { getVolumeHandler } from './handlers/volume-handler'
@@ -112,6 +119,7 @@ export async function setupRouter(globalContext: GlobalContext): Promise<Router<
112119
router.get('/v1/stats/:category/:stat', getStatsHandler)
113120
router.get('/v1/rankings/:entity/:timeframe', getRankingsHandler)
114121
router.get('/v1/volume/:timeframe', getVolumeHandler)
122+
router.post('/v1/trades/materialized-view/recreate', createTradesViewAuthMiddleware(), recreateTradesMaterializedViewHandler)
115123

116124
setupFavoritesRouter(router, { components })
117125

src/logic/http/auth.ts

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { IHttpServerComponent } from '@well-known-components/interfaces'
2+
import { HandlerContextWithPath, StatusCode } from '../../types'
3+
4+
export async function validateApiToken(
5+
context: Pick<HandlerContextWithPath<'config', string>, 'components' | 'request'>,
6+
apiTokenHeaderName = 'x-api-token'
7+
): Promise<boolean> {
8+
const { config } = context.components
9+
const expectedApiToken = await config.getString('MARKETPLACE_SERVER_TRADES_API_TOKEN')
10+
11+
if (!expectedApiToken) {
12+
return false
13+
}
14+
15+
const providedApiToken = context.request.headers.get(apiTokenHeaderName)
16+
return providedApiToken === expectedApiToken
17+
}
18+
19+
export function createTradesViewAuthMiddleware(apiTokenHeaderName = 'x-api-token') {
20+
return async function tradesViewAuthMiddleware(
21+
context: Pick<HandlerContextWithPath<'config' | 'logs', string>, 'components' | 'request'>,
22+
next: () => Promise<IHttpServerComponent.IResponse>
23+
) {
24+
const isValid = await validateApiToken(context, apiTokenHeaderName)
25+
const { logs } = context.components
26+
27+
if (!isValid) {
28+
logs.getLogger('tradesViewAuthMiddleware').error('[tradesViewAuthMiddleware] Invalid token, returning unauthorized')
29+
return {
30+
status: StatusCode.UNAUTHORIZED,
31+
body: {
32+
ok: false,
33+
message: 'Unauthorized'
34+
}
35+
}
36+
}
37+
38+
return next()
39+
}
40+
}

src/logic/trades/materialized-view.ts

+291
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
import { IPgComponent } from '@well-known-components/pg-component'
2+
import { ContractName, getContract } from 'decentraland-transactions'
3+
import { MARKETPLACE_SQUID_SCHEMA } from '../../constants'
4+
import { getEthereumChainId, getPolygonChainId } from '../chainIds'
5+
6+
export const TRADES_MV_NAME = 'mv_trades'
7+
8+
export async function recreateTradesMaterializedView(db: IPgComponent) {
9+
const marketplacePolygon = getContract(ContractName.OffChainMarketplace, getPolygonChainId())
10+
const marketplaceEthereum = getContract(ContractName.OffChainMarketplace, getEthereumChainId())
11+
12+
// Start transaction
13+
const client = await db.getPool().connect()
14+
try {
15+
await client.query('BEGIN')
16+
17+
// Drop triggers with exception handling
18+
await client.query(`
19+
DO $$
20+
BEGIN
21+
BEGIN
22+
DROP TRIGGER IF EXISTS refresh_trades_mv_on_trades ON marketplace.trades;
23+
EXCEPTION WHEN insufficient_privilege THEN
24+
RAISE NOTICE 'Insufficient privileges to drop trigger refresh_trades_mv_on_trades';
25+
END;
26+
27+
BEGIN
28+
DROP TRIGGER IF EXISTS refresh_trades_mv_on_nft ON ${MARKETPLACE_SQUID_SCHEMA}.nft;
29+
EXCEPTION WHEN insufficient_privilege THEN
30+
RAISE NOTICE 'Insufficient privileges to drop trigger refresh_trades_mv_on_nft';
31+
END;
32+
33+
BEGIN
34+
DROP TRIGGER IF EXISTS refresh_trades_mv_on_item ON ${MARKETPLACE_SQUID_SCHEMA}.item;
35+
EXCEPTION WHEN insufficient_privilege THEN
36+
RAISE NOTICE 'Insufficient privileges to drop trigger refresh_trades_mv_on_item';
37+
END;
38+
39+
BEGIN
40+
DROP TRIGGER IF EXISTS refresh_trades_mv_on_squid_trades_trade ON squid_trades.trade;
41+
EXCEPTION WHEN insufficient_privilege THEN
42+
RAISE NOTICE 'Insufficient privileges to drop trigger refresh_trades_mv_on_squid_trades_trade';
43+
END;
44+
45+
BEGIN
46+
DROP TRIGGER IF EXISTS refresh_trades_mv_on_signature_index ON squid_trades.signature_index;
47+
EXCEPTION WHEN insufficient_privilege THEN
48+
RAISE NOTICE 'Insufficient privileges to drop trigger refresh_trades_mv_on_signature_index';
49+
END;
50+
END
51+
$$;
52+
`)
53+
54+
// Drop materialized view
55+
await client.query(`DROP MATERIALIZED VIEW IF EXISTS marketplace.${TRADES_MV_NAME}`)
56+
57+
// Drop function
58+
await client.query('DROP FUNCTION IF EXISTS refresh_trades_mv() CASCADE')
59+
60+
// Create materialized view
61+
await client.query(`
62+
CREATE MATERIALIZED VIEW marketplace.${TRADES_MV_NAME} AS
63+
WITH trades_owner_ok AS (
64+
SELECT t.id
65+
FROM marketplace.trades t
66+
JOIN marketplace.trade_assets ta ON t.id = ta.trade_id
67+
LEFT JOIN marketplace.trade_assets_erc721 erc721_asset ON ta.id = erc721_asset.asset_id
68+
LEFT JOIN ${MARKETPLACE_SQUID_SCHEMA}.nft nft
69+
ON ta.contract_address = nft.contract_address
70+
AND ta.direction = 'sent'
71+
AND nft.token_id = erc721_asset.token_id::numeric
72+
WHERE t.type IN ('public_item_order', 'public_nft_order')
73+
GROUP BY t.id
74+
HAVING bool_and(ta.direction != 'sent' OR nft.owner_address = t.signer)
75+
)
76+
SELECT
77+
t.id,
78+
t.created_at,
79+
t.type,
80+
t.signer,
81+
MAX(CASE WHEN av.direction = 'sent' THEN av.contract_address END) AS contract_address_sent,
82+
MAX(CASE WHEN av.direction = 'received' THEN av.amount END) AS amount_received,
83+
MAX(CASE WHEN av.direction = 'sent' THEN av.available END) AS available,
84+
json_object_agg(
85+
av.direction,
86+
json_build_object(
87+
'contract_address', av.contract_address,
88+
'direction', av.direction,
89+
'beneficiary', av.beneficiary,
90+
'extra', av.extra,
91+
'token_id', av.token_id,
92+
'item_id', av.item_id,
93+
'amount', av.amount,
94+
'creator', av.creator,
95+
'owner', av.nft_owner,
96+
'category', av.category,
97+
'nft_id', av.nft_id,
98+
'issued_id', av.issued_id,
99+
'nft_name', av.nft_name
100+
)
101+
) AS assets,
102+
103+
MAX(av.contract_address) FILTER (WHERE av.direction = 'sent') AS sent_contract_address,
104+
MAX(av.token_id) FILTER (WHERE av.direction = 'sent') AS sent_token_id,
105+
MAX(av.category) FILTER (WHERE av.direction = 'sent') AS sent_nft_category,
106+
MAX(av.item_id) FILTER (WHERE av.direction = 'sent') AS sent_item_id,
107+
CASE
108+
WHEN COUNT(CASE WHEN st.action = 'cancelled' THEN 1 END) > 0 THEN 'cancelled'
109+
WHEN t.expires_at < now()::timestamptz(3) THEN 'cancelled'
110+
WHEN (
111+
(si_signer.index IS NOT NULL
112+
AND si_signer.index != (t.checks ->> 'signerSignatureIndex')::int)
113+
OR (si_signer.index IS NULL
114+
AND (t.checks ->> 'signerSignatureIndex')::int != 0)
115+
)
116+
THEN 'cancelled'
117+
WHEN (
118+
(si_contract.index IS NOT NULL
119+
AND si_contract.index != (t.checks ->> 'contractSignatureIndex')::int)
120+
OR (si_contract.index IS NULL
121+
AND (t.checks ->> 'contractSignatureIndex')::int != 0)
122+
)
123+
THEN 'cancelled'
124+
WHEN COUNT(CASE WHEN st.action = 'executed' THEN 1 END)
125+
>= (t.checks ->> 'uses')::int THEN 'sold'
126+
ELSE 'open'
127+
END AS status
128+
129+
FROM marketplace.trades AS t
130+
JOIN trades_owner_ok AS ok
131+
ON t.id = ok.id
132+
133+
JOIN (
134+
SELECT
135+
ta.id,
136+
ta.trade_id,
137+
ta.contract_address,
138+
ta.direction,
139+
ta.beneficiary,
140+
ta.extra,
141+
erc721_asset.token_id,
142+
erc20_asset.amount,
143+
item.creator,
144+
item.available,
145+
nft.owner_address AS nft_owner,
146+
nft.category,
147+
nft.id AS nft_id,
148+
nft.issued_id AS issued_id,
149+
nft.name AS nft_name,
150+
coalesce(nft.item_blockchain_id::text, item_asset.item_id) AS item_id
151+
FROM marketplace.trade_assets AS ta
152+
LEFT JOIN marketplace.trade_assets_erc721 AS erc721_asset
153+
ON ta.id = erc721_asset.asset_id
154+
LEFT JOIN marketplace.trade_assets_erc20 AS erc20_asset
155+
ON ta.id = erc20_asset.asset_id
156+
LEFT JOIN marketplace.trade_assets_item AS item_asset
157+
ON ta.id = item_asset.asset_id
158+
LEFT JOIN ${MARKETPLACE_SQUID_SCHEMA}.item AS item
159+
ON ta.contract_address = item.collection_id
160+
AND item_asset.item_id::numeric = item.blockchain_id
161+
LEFT JOIN ${MARKETPLACE_SQUID_SCHEMA}.nft AS nft
162+
ON ta.contract_address = nft.contract_address
163+
AND erc721_asset.token_id::numeric = nft.token_id
164+
) AS av
165+
ON t.id = av.trade_id
166+
167+
LEFT JOIN squid_trades.trade AS st
168+
ON st.signature = t.hashed_signature
169+
170+
LEFT JOIN squid_trades.signature_index AS si_signer
171+
ON LOWER(si_signer.address) = LOWER(t.signer)
172+
173+
LEFT JOIN (
174+
SELECT *
175+
FROM squid_trades.signature_index idx
176+
WHERE LOWER(idx.address) IN (
177+
'${marketplacePolygon.address}',
178+
'${marketplaceEthereum.address}'
179+
)
180+
) AS si_contract
181+
ON t.network = si_contract.network
182+
183+
WHERE t.type IN ('public_item_order', 'public_nft_order')
184+
GROUP BY
185+
t.id,
186+
t.type,
187+
t.created_at,
188+
t.network,
189+
t.chain_id,
190+
t.signer,
191+
t.checks,
192+
si_contract.index,
193+
si_signer.index;
194+
`)
195+
196+
// Create primary index
197+
await client.query(`CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_trades_id ON marketplace.${TRADES_MV_NAME} (id)`)
198+
199+
// Create additional indexes for performance optimization
200+
// Status and type - improves queries filtering by open trades and specific trade types
201+
await client.query(`CREATE INDEX IF NOT EXISTS idx_mv_trades_status_type ON marketplace.${TRADES_MV_NAME} (status, type)`)
202+
// Creation date - optimizes queries sorting by recently listed
203+
await client.query(`CREATE INDEX IF NOT EXISTS idx_mv_trades_created_at ON marketplace.${TRADES_MV_NAME} (created_at DESC)`)
204+
// Category - improves queries filtering by NFT category
205+
await client.query(`CREATE INDEX IF NOT EXISTS idx_mv_trades_category ON marketplace.${TRADES_MV_NAME} (sent_nft_category)`)
206+
// Contract and token - optimizes joins with NFT tables
207+
await client.query(
208+
`CREATE INDEX IF NOT EXISTS idx_mv_trades_contract_token ON marketplace.${TRADES_MV_NAME} (contract_address_sent, sent_token_id)`
209+
)
210+
211+
// Create refresh function
212+
await client.query(`
213+
CREATE OR REPLACE FUNCTION refresh_trades_mv()
214+
RETURNS TRIGGER
215+
LANGUAGE plpgsql
216+
AS $$
217+
BEGIN
218+
REFRESH MATERIALIZED VIEW CONCURRENTLY marketplace.${TRADES_MV_NAME};
219+
RETURN NULL;
220+
END;
221+
$$;
222+
`)
223+
224+
// Create triggers with exception handling
225+
await client.query(`
226+
DO $$
227+
BEGIN
228+
BEGIN
229+
CREATE TRIGGER refresh_trades_mv_on_trades
230+
AFTER INSERT OR UPDATE OR DELETE
231+
ON marketplace.trades
232+
FOR EACH STATEMENT
233+
EXECUTE FUNCTION refresh_trades_mv();
234+
EXCEPTION WHEN insufficient_privilege THEN
235+
RAISE NOTICE 'Insufficient privileges to create trigger refresh_trades_mv_on_trades';
236+
END;
237+
238+
BEGIN
239+
CREATE TRIGGER refresh_trades_mv_on_nft
240+
AFTER INSERT OR UPDATE OR DELETE
241+
ON ${MARKETPLACE_SQUID_SCHEMA}.nft
242+
FOR EACH STATEMENT
243+
EXECUTE FUNCTION refresh_trades_mv();
244+
EXCEPTION WHEN insufficient_privilege THEN
245+
RAISE NOTICE 'Insufficient privileges to create trigger refresh_trades_mv_on_nft';
246+
END;
247+
248+
BEGIN
249+
CREATE TRIGGER refresh_trades_mv_on_item
250+
AFTER INSERT OR UPDATE OR DELETE
251+
ON ${MARKETPLACE_SQUID_SCHEMA}.item
252+
FOR EACH STATEMENT
253+
EXECUTE FUNCTION refresh_trades_mv();
254+
EXCEPTION WHEN insufficient_privilege THEN
255+
RAISE NOTICE 'Insufficient privileges to create trigger refresh_trades_mv_on_item';
256+
END;
257+
258+
BEGIN
259+
CREATE TRIGGER refresh_trades_mv_on_squid_trades_trade
260+
AFTER INSERT OR UPDATE OR DELETE
261+
ON squid_trades.trade
262+
FOR EACH STATEMENT
263+
EXECUTE FUNCTION refresh_trades_mv();
264+
EXCEPTION WHEN insufficient_privilege THEN
265+
RAISE NOTICE 'Insufficient privileges to create trigger refresh_trades_mv_on_squid_trades_trade';
266+
END;
267+
268+
BEGIN
269+
CREATE TRIGGER refresh_trades_mv_on_signature_index
270+
AFTER INSERT OR UPDATE OR DELETE
271+
ON squid_trades.signature_index
272+
FOR EACH STATEMENT
273+
EXECUTE FUNCTION refresh_trades_mv();
274+
EXCEPTION WHEN insufficient_privilege THEN
275+
RAISE NOTICE 'Insufficient privileges to create trigger refresh_trades_mv_on_signature_index';
276+
END;
277+
END
278+
$$;
279+
`)
280+
281+
// Set the owner of the materialized view
282+
await client.query(`ALTER MATERIALIZED VIEW marketplace.${TRADES_MV_NAME} OWNER TO mv_trades_owner;`)
283+
284+
await client.query('COMMIT')
285+
} catch (error) {
286+
await client.query('ROLLBACK')
287+
throw error
288+
} finally {
289+
client.release()
290+
}
291+
}

0 commit comments

Comments
 (0)