minikube tunnel
# ./ticketing/client
docker build -t pcsmomo/client .
docker push pcsmomo/client # skaffold pull the image from here
# ./ticketing
skaffold dev
kubectl create secret generic jwt-secret --from-literal=JWT_KEY=asdf
Add 127.0.0.1 ticketing.dev to /etc/hosts
# temporarily port forwarding NATS services
k get pods
# NATS client
k port-forward nats-depl-588b8b6b8-2s9nt 4222:4222
# NATS monitoring service (optional, for debugging)
k port-forward nats-depl-588b8b6b8-2s9nt 8222:8222
# NATS publisher and listener in separate tabs
npm run publish
npm run listen # x2 or x3, for more listeners (= consumers)
- NATS.io Docs
- STAN aka 'NATS Streaming' Docs
- NATS and NATS Streaming Server are two different things
- NATS Streaming Server is built on top of NATS, and we will use NATS Streaming Server in this course
- NATS Streaming implements some extraordinarily important design decisions that will affect our app
- We are going to run the official nats-streaming docker image in kubernetes. Need to read the image's docs
- https://hub.docker.com/_/nats-streaming : Commandline Options
WARNING Deprecation Notice
The NATS Streaming Server is being deprecated. Critical bug fixes and security fixes will be applied until June of 2023.
NATS-enabled applications requiring persistence should use JetStream.
- Create a new sub-project with typescript support
- Install node-nats-streaming library and connect to nats streaming server
- We should have two npm scripts, one to emit events, and one to listen for events
- This program will be run outside of kubernetes!
# ./ticketing
mkdir nats-test
cd nats-test
npm init -y
npm install node-nats-streaming ts-node-dev typescript @types/node
tsc --init
- Option #1: Publisher Program - (Ingress-Nginx - NATS ClusterIP Service - NATS Pod)
- a bit too heavy for our case
- Option #2: Publisher Program - (NodePort Service - NATS Pod)
- creating a config file is still too much
- Option #3: Publisher Program - (Port-Forward Port 4222 - NATS Pod)
- Great for temporary connection!!
- Need to keep the terminal open
kubectl get pods
# NAME READY STATUS RESTARTS AGE
# ...
# nats-depl-689cdd9577-flmx7 1/1 Running 0 41m
kubectl port-forward nats-depl-689cdd9577-flmx7 4222:4222
# Forwarding from 127.0.0.1:4222 -> 4222
# Forwarding from [::1]:4222 -> 4222
# Keep the terminal open
# ./ticketing/playground/nats-test
npm run publish
# [INFO] 22:56:46 ts-node-dev ver. 2.0.0 (using ts-node ver. 10.9.1, typescript ver. 4.9.5)
# Publisher connected to NATS
# ./ticketing/playground/nats-test
npm run listen
# Listener connected to NATS
On the publisher terminal, type rs and press Enter; repeat it a few more times
# ./ticketing/playground/nats-test
npm run publish
rs
# Enter
The server restarts and publishes a few more messages
Open a new terminal and run another listener
# ./ticketing/playground/nats-test
npm run listen
# Error: Unhandled error. ('stan: clientID already registered')
Every connection has a clientID, and we've set the listener's clientID to "123", so a second listener collides with the first.
Now we want random client IDs instead.
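One way to get a random client ID is Node's built-in crypto module. This is a minimal sketch; the helper name randomClientId is made up for illustration, and the resulting string would be passed as the clientID argument when connecting.

```typescript
import { randomBytes } from 'crypto';

// Build a random client ID so that several listeners can connect at once.
// 4 random bytes become 8 hexadecimal characters.
function randomClientId(): string {
  return randomBytes(4).toString('hex');
}

const listenerClientId = randomClientId();
console.log('clientID for this listener:', listenerClientId);
```

Each `npm run listen` process then registers under its own ID, so the "clientID already registered" error should no longer appear.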
A queue group looks like the same concept as a Kafka consumer group.
If the process fails while consuming a message (db connection issue, etc.), we want to re-consume the message.
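Re-consuming depends on acknowledging a message only after the work has succeeded. The sketch below shows that idea in isolation, assuming manual acknowledgement is enabled on the subscription. Ackable and processThenAck are hypothetical names; the only thing assumed about the real message object is that it has an ack() method.

```typescript
// Minimal shape of a delivered message: only ack() is needed here.
// This interface is a local stand-in, not the library's own type.
interface Ackable {
  ack(): void;
}

// Run the handler and acknowledge only if it finished without throwing.
// Returns true when the message was acknowledged.
async function processThenAck(
  msg: Ackable,
  handler: () => Promise<void>
): Promise<boolean> {
  try {
    await handler();
  } catch (err) {
    // No ack here, so the server is free to redeliver the message later.
    console.error('Processing failed, leaving message unacknowledged:', err);
    return false;
  }
  msg.ack();
  return true;
}
```

If the handler throws (for example because the database is unreachable), the message stays unacknowledged and can be delivered again.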
// the default is false
const options = stan.subscriptionOptions().setManualAckMode(true);
// by default, if we don't acknowledge within the 30s timeout,
// the message will be redelivered to the next listener
Sometimes a listener cannot consume a message: the message hangs somewhere and the next message is processed first.
To investigate, check the monitoring service
# In a new terminal
kubectl port-forward nats-depl-58d8b8d57d-6csgg 8222:8222
Navigate to http://localhost:8222/streaming
- In the ?subs=1 URL, we can see two subscriptions.
- If we restart (rs) a listener and refresh the page, we can see 3 subscriptions.
- nats-streaming-server expects the lost connection to come back and keeps its subscription for 30 seconds while waiting.
- So during that time, if a message is allocated to the disconnected subscription, it hangs.
- Option #1. Tweak the nats-streaming docker image option
- -hbi, --hb_interval Interval at which server sends heartbeat to a client
- -hbt, --hb_timeout How long server waits for a heartbeat response
- -hbf, --hb_fail_count Number of failed heartbeats before server closes the client connection
- It still leaves a gap of a few seconds during the heartbeat interval
- Option #2. Listen for the close event in the code: stan.on('close', () => {});
- It works for general cases, but it won't prevent 100% of shutdown issues, e.g. when the node process is killed from Activity Monitor or something.
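Option #2 can be sketched as follows: close the client when the process is asked to stop, so the server learns about the disconnect immediately. Closable and closeOnSignals are hypothetical names; the sketch assumes only that the client has a close() method.

```typescript
import { EventEmitter } from 'events';

// Local stand-in for the part of the client used here.
interface Closable {
  close(): void;
}

// Ask the client to close when an interrupt or terminate signal arrives.
// `signals` is injectable so the wiring can be exercised without real signals.
function closeOnSignals(
  client: Closable,
  signals: EventEmitter = process
): void {
  signals.on('SIGINT', () => client.close());
  signals.on('SIGTERM', () => client.close());
}
```

In the listener this would be called once with the connected client, next to the 'close' handler. A process that is killed outright never runs these handlers, which is the remaining gap mentioned above.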
// ticketing/playground/nats-test/src/listener.ts
const options = stan
  .subscriptionOptions()
  .setDeliverAllAvailable()
  .setDurableName('order-service');
stan.subscribe('', 'orders-service-queue-group', options);
These three options (deliver all available, durable name, queue group) work very well together
- Initially a listener consumes all messages.
- When the listener restarts, it consumes messages after the last message consumed (= offset?)
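The replay behaviour in the two bullets above can be pictured with a toy in-memory model. This is only an illustration of the idea of remembering a position per durable name; ToyChannel is invented here and says nothing about how the server is actually implemented.

```typescript
// Toy channel that remembers, per durable name, how far delivery has got.
class ToyChannel {
  private events: string[] = [];
  // Number of events already delivered and acknowledged per durable name.
  private lastAcked = new Map<string, number>();

  publish(data: string): void {
    this.events.push(data);
  }

  // Hand over everything the durable name has not seen yet and
  // record the new position as acknowledged.
  deliver(durableName: string): string[] {
    const start = this.lastAcked.get(durableName) ?? 0;
    const pending = this.events.slice(start);
    this.lastAcked.set(durableName, this.events.length);
    return pending;
  }
}

const toyChannel = new ToyChannel();
toyChannel.publish('ticket-1');
toyChannel.publish('ticket-2');
const firstRun = toyChannel.deliver('order-service'); // all events so far
toyChannel.publish('ticket-3'); // published while the listener is down
const afterRestart = toyChannel.deliver('order-service'); // only the new one
console.log(firstRun, afterRestart);
```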
subject = Subjects.TicketCreated;
Property 'subject' in type 'TicketCreatedListener' is not assignable to the same property in base type 'Listener<TicketCreatedEvent>'.
Type 'Subjects' is not assignable to type 'Subjects.TicketCreated'.ts(2416)
If you see an error message like this, consider adding a type annotation.
TypeScript is worried that subject could later be changed to some other value.
- Option #1:
subject: Subjects.TicketCreated = Subjects.TicketCreated;
- Option #2:
readonly subject = Subjects.TicketCreated;
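A self-contained sketch of the typing involved, using Option #2. The enum member OrderUpdated and the simplified onMessage signature (no msg argument) are assumptions made to keep the example standalone.

```typescript
enum Subjects {
  TicketCreated = 'ticket:created',
  OrderUpdated = 'order:updated', // illustrative second member
}

interface TicketCreatedEvent {
  subject: Subjects.TicketCreated;
  data: { id: string; title: string; price: number; userId: string };
}

// Generic base class: the subject and the payload type both come from T.
abstract class Listener<T extends { subject: Subjects; data: unknown }> {
  abstract subject: T['subject'];
  abstract onMessage(data: T['data']): void;
}

class TicketCreatedListener extends Listener<TicketCreatedEvent> {
  // readonly keeps the narrow type Subjects.TicketCreated instead of Subjects.
  readonly subject = Subjects.TicketCreated;
  received: string[] = [];

  onMessage(data: TicketCreatedEvent['data']): void {
    this.received.push(`${data.title} costs ${data.price}`);
  }
}
```

Removing readonly (and the annotation from Option #1) widens the property to Subjects, which is what triggers error 2416.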
// ./ticketing/playground/nats-test/src/events/ticket-created-listener.ts
// correct
onMessage(data: TicketCreatedEvent['data'], msg: Message) {}
// TypeScript will report an error
interface FakeData {
  name: string;
  cost: number;
}
onMessage(data: FakeData, msg: Message) {}
How to make the publish method return a Promise
// ./ticketing/playground/nats-test/src/events/base-publisher.ts
// before
publish(data: T['data']) {
  this.client.publish(this.subject, JSON.stringify(data), () => {
    console.log('Event published.');
  });
}
// after
publish(data: T['data']): Promise<void> {
  return new Promise((resolve, reject) => {
    this.client.publish(this.subject, JSON.stringify(data), err => {
      if (err) {
        return reject(err);
      }
      console.log('Event published to subject', this.subject);
      resolve();
    });
  });
}
// ./ticketing/playground/nats-test/src/publisher.ts
await publisher.publish({})
We will be using common event definitions in the common module, as all our services are in TypeScript.
However, if you have services in different languages, this approach won't work. Cross-language alternatives:
- JSON Schema
- Protobuf
- Apache Avro
# ./ticketing/common
# udemy/microservices-node-react/dwktickets-npm/commmon
npm install node-nats-streaming
# udemy/microservices-node-react/dwktickets-npm/commmon
npm run pub
k get pods
k delete pod nats-depl-6fdbfb6c6d-6wftz
# ./ticketing/auth
# ./ticketing/tickets
# package.json
"@dwktickets/common": "^1.0.6",
npm update @dwktickets/common
Stephen suggests pulling the data from the saved ticket Mongo object, not from req.body,
because the model side may sanitize values or run pre/post save hooks,
so the saved values could differ from what was submitted.
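A small illustration of why the source of the values matters. saveTicket is a hypothetical stand-in for a model whose save step normalizes its input; the trimming and rounding rules are invented for the example.

```typescript
interface TicketInput {
  title: string;
  price: number;
}

// Stand-in for a model that cleans values while saving,
// the way a schema setter or save hook might.
function saveTicket(input: TicketInput): TicketInput & { id: string } {
  return {
    id: 'ticket-1', // fixed id, for illustration only
    title: input.title.trim(),
    price: Math.round(input.price * 100) / 100,
  };
}

const requestBody: TicketInput = { title: '  concert ', price: 19.999 };
const savedTicket = saveTicket(requestBody);

// Build the event payload from the saved record, not from requestBody.
const eventPayload = {
  id: savedTicket.id,
  title: savedTicket.title,
  price: savedTicket.price,
};
console.log(eventPayload);
```

Publishing requestBody directly would send the untrimmed title and unrounded price, which other services would then store.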
new TicketCreatedPublisher(client).publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
# ./ticketing/tickets
npm install node-nats-streaming
Mongoose is a singleton. Once you connect with mongoose, you can use it in any other file
And we want to do it exactly the same with nats-client
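A minimal sketch of such a shared wrapper, written generically so it runs without a NATS server. ClientWrapper, sharedWrapper and the open factory are hypothetical names; in the tickets service the factory would be whatever creates the NATS client and resolves once it is connected.

```typescript
// One instance is created and exported; connect() runs once at startup,
// and every other file reads `.client` from the same instance.
class ClientWrapper<C> {
  private _client?: C;

  get client(): C {
    if (this._client === undefined) {
      throw new Error('Cannot access the client before connecting');
    }
    return this._client;
  }

  // `open` is a factory that resolves once the connection is up.
  async connect(open: () => Promise<C>): Promise<void> {
    this._client = await open();
  }
}

// The single shared instance that other modules would import.
const sharedWrapper = new ClientWrapper<{ label: string }>();
```

Throwing from the getter makes an accidental use before startup has finished fail loudly instead of passing undefined around.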
k get pods
k delete pod nats-depl-7696c4dc97-4lsvq
Then the tickets service exits because of this code
natsWrapper.client.on('close', () => {
  console.log('NATS connection closed!');
  process.exit();
});
And kubernetes restarted the tickets service automatically
k get pods
# NAME READY STATUS RESTARTS AGE
# tickets-depl-689578c6c6-6wzv4 1/1 Running 1 (57s ago) 85s
Port forward so that nats-test can listen for the ticket:created events
kubectl port-forward <nats-depl-pod-name> 4222:4222
# ./ticketing/playground/nats-test
npm run listen # x2
- Postman -> Signin or Signup to get credential -> Create a ticket
- postman script
Successfully listening!!
Currently, there's no logic to check whether one of these actions fails,
which is critical
// 1
await ticket.save();
// 2
await new TicketCreatedPublisher(natsWrapper.client).publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
res.status(201).send(ticket);
We will manage them in a transaction
- There's no NATS connection in tests
- The NATS connection is only in src/index.ts, and tests import only src/app.ts
- Option #1. Connect to an actual NATS server in tests
- Not ideal
- Option #2. Use a fake NatsWrapper
- Redirect the import statement using Jest
Mocking (Faking) Imports with Jest
- Find the file that we want to 'fake'
- In the same directory, create a folder called __mocks__
- In that folder, create a file with an identical name to the file we want to fake
- tickets/src/__mocks__/nats-wrapper.ts
- Write a fake implementation
- Tell jest to use that fake file in our test file
jest.mock('../../nats-wrapper');
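What the fake implementation has to do can be sketched without Jest: expose the same shape the code under test uses and call the callback straight away. fakeNatsWrapper, publishCalls and announceTicket are illustrative names, and the call recorder stands in for what a Jest mock function would track.

```typescript
type PublishCallback = (err?: Error) => void;

// Every publish call is recorded here so a test can inspect it.
const publishCalls: Array<{ subject: string; data: string }> = [];

// Same shape the routes rely on: wrapper.client.publish(subject, data, cb).
const fakeNatsWrapper = {
  client: {
    publish(subject: string, data: string, callback: PublishCallback): void {
      publishCalls.push({ subject, data });
      callback(); // pretend the publish succeeded immediately
    },
  },
};

// Example code under test: it only needs something with a publish method.
function announceTicket(
  wrapper: typeof fakeNatsWrapper,
  title: string
): Promise<void> {
  return new Promise((resolve, reject) => {
    wrapper.client.publish(
      'ticket:created',
      JSON.stringify({ title }),
      (err) => (err ? reject(err) : resolve())
    );
  });
}
```

Because the fake invokes the callback itself, the promise resolves and the route can finish even though no NATS server is running.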
Globally mocking the file
// ticketing/tickets/src/test/setup.ts
// __mocks__ folder is a special folder for jest
// and it will automatically look for a file with the same name
// as the module that we are trying to mock out.
jest.mock('../nats-wrapper.ts');
// ticketing/tickets/src/__mocks__/nats-wrapper.ts
export const natsWrapper = {
  client: {
    publish: jest
      .fn()
      .mockImplementation(
        (subject: string, data: string, callback: () => void) => {
          callback();
        }
      ),
  },
};
// ticketing/tickets/src/routes/__test__/new.test.ts
// This is a mock function that we can use to make assertions about
expect(natsWrapper.client.publish).toHaveBeenCalled();
// ticketing/tickets/src/test/setup.ts
// reset recorded mock calls before every test
beforeEach(async () => {
  jest.clearAllMocks();
});
# ticketing/infra/k8s/tickets-depl.yaml
- name: NATS_CLIENT_ID
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
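On the service side the value then arrives as an environment variable. A minimal sketch of reading it at startup, failing fast when it is missing; requireEnv is a hypothetical helper name.

```typescript
// Read a required environment variable or throw at startup.
// `env` defaults to process.env and is injectable for testing.
function requireEnv(
  name: string,
  env: Record<string, string | undefined> = process.env
): string {
  const value = env[name];
  if (!value) {
    throw new Error(`${name} must be defined`);
  }
  return value;
}
```

Calling `requireEnv('NATS_CLIENT_ID')` would return the pod name, so each replica of the deployment connects under a different client ID.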
