Skip to content

Commit fc07273

Browse files
committed
Matched models between live and history data
1 parent 7b452fa commit fc07273

1 file changed

Lines changed: 69 additions & 36 deletions

File tree

api/src/osmosis/OsmosisHistoricalProcessor.ts

Lines changed: 69 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,34 @@ import { ConfigService } from "@nestjs/config";
77
import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
88
import { NumiaConfig } from "../config/config.interface.js";
99

10-
interface MintBurnEvent {
10+
interface PoolSwapEvent {
1111
timestamp: number;
12-
type: string;
12+
sender: string;
13+
side: string;
1314
amount: string;
14-
address: string;
15+
txhash: string;
1516
}
1617

17-
interface TransferEvent {
18+
interface MintEvent {
1819
timestamp: number;
19-
from: string;
20-
to: string;
2120
amount: string;
21+
mint_to_address: string;
22+
txhash: string;
2223
}
2324

24-
interface PoolSwapEvent {
25+
interface BurnEvent {
2526
timestamp: number;
26-
sender: string;
27-
side: string;
2827
amount: string;
28+
burn_from_address: string;
29+
txhash: string;
30+
}
31+
32+
interface TransferEvent {
33+
timestamp: number;
34+
from_address: string;
35+
to_address: string;
36+
amount: string;
37+
txhash: string;
2938
}
3039

3140
@Processor("osmosis-historical")
@@ -38,6 +47,8 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
3847

3948
private readonly bigQueryClient: BigQuery;
4049

50+
private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA';
51+
4152
constructor(
4253
config: ConfigService,
4354

@@ -76,8 +87,6 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
7687

7788
public async triggerFetchHistoricalData() {
7889
await this.osmosisQueue.add("fetchHistoricalData", undefined);
79-
// await this.fetchMintBurnEvents();
80-
// await this.fetchPoolSwapEvents();
8190
}
8291

8392
private async fetchHistoricalData() {
@@ -110,14 +119,26 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
110119
type === "mint"
111120
? event.messages[0].mint_to_address
112121
: event.messages[0].burn_from_address;
113-
114-
await this.insertMintBurnEvent({ timestamp, type, amount, address });
122+
const txhash = event.hash;
115123

116124
if (type === "mint") {
125+
const mintEvent: MintEvent = {
126+
timestamp,
127+
amount,
128+
mint_to_address: address,
129+
txhash
130+
};
131+
await this.insertMintEvent(mintEvent);
117132
// Not a proper DFS, just one hop away from minter
118-
await this.fetchTransfersForAddress(
119-
event.messages[0].mint_to_address,
120-
);
133+
await this.fetchTransfersForAddress(event.messages[0].mint_to_address);
134+
} else {
135+
const burnEvent: BurnEvent = {
136+
timestamp,
137+
amount,
138+
burn_from_address: address,
139+
txhash
140+
};
141+
await this.insertBurnEvent(burnEvent);
121142
}
122143
}
123144
}
@@ -141,14 +162,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
141162
if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) {
142163
for (const message of event.messages) {
143164
if (
144-
message.amount[0].denom ===
145-
"factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"
165+
message.amount[0].denom === this.TOKEN_DENOM
146166
) {
147-
const transferEvent = {
167+
const transferEvent: TransferEvent = {
148168
timestamp: new Date(event.blockTimestamp).getTime(),
149-
from: message.from_address,
150-
to: message.to_address,
169+
from_address: message.from_address,
170+
to_address: message.to_address,
151171
amount: message.amount.amount,
172+
txhash: event.hash,
152173
};
153174
await this.insertTransferEvent(transferEvent);
154175
}
@@ -176,56 +197,68 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
176197
const [rows] = await this.bigQueryClient.query(query);
177198

178199
for (const row of rows) {
179-
const side =
180-
row.denom_in ===
181-
"factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"
182-
? "buy"
183-
: "sell";
184-
const swapEvent = {
200+
const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell";
201+
const swapEvent: PoolSwapEvent = {
185202
timestamp: new Date(row.ingestion_timestamp.value).getTime(),
186203
sender: row.sender,
187204
side,
188205
amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out,
206+
txhash: row.tx_id,
189207
};
190208
await this.insertSwapEvent(swapEvent);
191209
}
192210
}
193211

194-
private async insertMintBurnEvent(event: MintBurnEvent) {
195-
// console.log('Mint/Burn Event:', event);
212+
private async insertMintEvent(event: MintEvent) {
213+
console.log('Mint Event:', JSON.stringify(event, null, 2));
214+
await this.clickhouseService.client.insert({
215+
table: "mint_events",
216+
values: {
217+
timestamp: event.timestamp,
218+
amount: event.amount,
219+
mint_to_address: event.mint_to_address,
220+
txhash: event.txhash,
221+
},
222+
});
223+
}
224+
225+
private async insertBurnEvent(event: BurnEvent) {
226+
console.log('Burn Event:', JSON.stringify(event, null, 2));
196227
await this.clickhouseService.client.insert({
197-
table: "mint_burn_events",
228+
table: "burn_events",
198229
values: {
199230
timestamp: event.timestamp,
200-
type: event.type,
201231
amount: event.amount,
202-
address: event.address,
232+
burn_from_address: event.burn_from_address,
233+
txhash: event.txhash,
203234
},
204235
});
205236
}
206237

207238
private async insertTransferEvent(event: TransferEvent) {
208-
// console.log('Transfer Event:', event);
239+
console.log('Transfer Event:', JSON.stringify(event, null, 2));
209240
await this.clickhouseService.client.insert({
210241
table: "transfer_events",
211242
values: {
212243
timestamp: event.timestamp,
213-
from: event.from,
214-
to: event.to,
244+
from_address: event.from_address,
245+
to_address: event.to_address,
215246
amount: event.amount,
247+
txhash: event.txhash,
216248
},
217249
});
218250
}
219251

220252
private async insertSwapEvent(event: PoolSwapEvent) {
221-
// console.log('Pool Swap Event:', event);
253+
console.log('Pool Swap Event:', JSON.stringify(event, null, 2));
222254
await this.clickhouseService.client.insert({
223255
table: "pool_swap_events",
224256
values: {
225257
timestamp: event.timestamp,
226258
sender: event.sender,
227259
side: event.side,
228260
amount: event.amount,
261+
txhash: event.txhash,
229262
},
230263
});
231264
}

0 commit comments

Comments
 (0)