The previous posts in this series dealt with creating a fast and reliable suite of Acceptance Tests using memory fakes, and how to use Contract Tests to assert the truthfulness of these memory fakes. This post will cover an approach for covering a clique of microservices communicating via NestJS’s microservices infrastructure.
We will expand upon the example used in the previous posts - an ecommerce backend consisting of a Product Catalog, a Cart and an Orders system. Initially these 3 concerns resided in a single monolithic HTTP server, but as the product became more popular, differing SLOs and team structure required the separation of each of these concerns to different microservice; the UI will communicate with all three via HTTP calls, and additionally the Cart microservice will retrieve a product from the Catalog microservice when adding it to a cart, and then create an order from the cart by publishing an event to the Orders microservice. This example uses Kafka as the Transport between microservices but other Transports should work more or less the same.
Each microservice resides in its own folder and has its own Module and its own bootstrap function. Here are the module and bootstrap logic for the Catalog microservice:
class CatalogModule {
static register(productRepo: ProductRepository): DynamicModule {
return {
module: CatalogModule,
providers: [
{
provide: PRODUCT_REPO,
useValue: productRepo
},
],
controllers: [ProductController]
}
}
}
async function createCatalogApp(
productRepo: ProductRepository,
microserviceOptions: MicroserviceOptions) {
const app = await NestFactory.create(
CatalogModule.register(productRepo))
app.enableCors({origin: "*"});
app.connectMicroservice(microserviceOptions);
await app.init();
await app.startAllMicroservices();
return app;
}
async function startCatalogServer() {
const mongo = await new MongoClient(MONGO_URL).connect();
const db = mongo.db('store');
const productRepo = new MongoDBProductRepository(db);
const app = await createCatalogApp(productRepo, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'product-catalog',
brokers: [KAFKA_BROKER_URL],
},
consumer: {
groupId: 'product-catalog-server',
},
},
});
await app.listen(CATALOG_PORT);
}
Note that the startCatalogServer() function instantiates the production adapters (MongoDB client and Kafka connection) and injects them into the createCatalogApp() function - which we will also call from the Test Harness, albeit with memory fakes instead. While the Orders microservice looks almost the same, the Cart microservice has one difference, since it consumes the Kafka endpoints exposed by the other microservices:
async function createCartApp(
clientsModule: DynamicModule,
orderRepo: OrderRepository) {
const app = await NestFactory.create(
CartModule.register(clientsModule, orderRepo));
app.enableCors({origin: "*"});
await app.init();
return app;
}
async function startCartServer() {
const mongo = await new MongoClient(MONGO_URL).connect();
const db = mongo.db(dbName);
const orderRepo = new MongoDBOrderRepository(db);
const clientsModule = ClientsModule.register([
{
name: CART_CLIENT,
transport: Transport.KAFKA,
options: {
client: {
clientId: 'cart-client',
brokers: [KAFKA_BROKER_URL],
},
consumer: {
groupId: 'cart-consumer',
},
},
},
]);
const app = await createCartApp(clientsModule, orderRepo);
await app.listen(CART_PORT);
}
In the Order and Catalog microservices, the controllers now expose endpoints that are detected by the NestJS microservices infrastructure. Both use the @MessagePattern decorator since we want to get a product by its id, and to create an order, returning it. Of course, in reality this pattern is somewhat of an abuse of event-driven architecture, but for the sake of the example we can let it slide.
@Controller("/products")
export class ProductController {
constructor(@Inject(PRODUCT_REPO) productRepo: ProductRepository)
{}
…
@MessagePattern('productById')
async findProductById(id: string) {
return this.productRepo.findById(id);
}
}
@Controller("/order")
export class OrderController {
constructor(@Inject(ORDER_REPO) orderRepo: OrderRepository) {
}
…
@MessagePattern("createOrder")
async createOrder(order: OrderTemplate) {
return this.orderRepo.create(order);
}
}
The Cart microservice will use the ClientKafka class provided by NestJS to send these messages to the other microservices and receive the response message:
export class CartManager implements OnModuleInit {
constructor(@Inject(CART_CLIENT) client: ClientKafka,
@Inject(CART_REPO) cartRepo: CartRepository) {
}
async addToCart(cartId: string, productId: string) {
const product = await firstValueFrom(
this.client.send<Product>('productById', productId));
if (!product) {
throw new Error(`product ${productId} not found`);
}
await this.cartRepo.addToCart(cartId, product);
}
async checkout(id: string) {
const cart = await this.cartRepo.findById(id);
if (!cart) {
throw new BadRequestException(`cart ${id} not found`);
}
return firstValueFrom(
this.client.send<Order>('createOrder',
{items: cart.items}));
}
async onModuleInit() {
this.client.subscribeToResponseOf('productById');
this.client.subscribeToResponseOf('createOrder');
await this.client.connect();
}
}
Our objective is to replace Kafka in acceptance tests with a memory-based fake, implemented using Node’s EventEmitter. If we create a common event emitter and share it between all microservices, it would serve as the message bus. As long as we implement the same API as the ClientKafka and ServerKafka classes from NestJS’s Kafka support, the system should behave exactly the same.
To implement the server side of the fake, we must extend the Server class and implement the CustomTransportStrategy interface. There’s a lot of code copied from other Transport implementations but the gist is that we accept an EventEmitter and subscribe to the MESSAGE_EVENT event. When a message arrives, we get the appropriate handler (registered by the NestJS infrastructure into our Server instance at bootstrap time) and invoke it. Finally, if the handler is decorated with @MessagePattern, the response is sent back to the caller via the same event emitter.
export class MemoryTransportServer
extends Server
implements CustomTransportStrategy {
constructor(private readonly emitter: EventEmitter) {
super();
this.emitter.on(MESSAGE_EVENT, this.handleMessage.bind(this));
}
async handleMessage(packet: ReadPacket & PacketId) {
const pattern = !isString(packet.pattern)
? JSON.stringify(packet.pattern)
: packet.pattern;
const handler = this.messageHandlers.get(pattern);
if (!handler) {
return this.emitter.emit(ERROR_EVENT, {
id: packet.id,
status: 'error',
err: NO_MESSAGE_HANDLER + ` (${pattern})`,
});
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ &&
this.send(response$, (data) =>
this.emitter.emit(
`message_response_${packet.id}`,
Object.assign(data, { id: packet.id, pattern }),
),
);
}
close(): any {
this.emitter.removeAllListeners();
}
listen(callback: () => any): any {
callback();
}
readonly transportId = Symbol.for('MemoryTransport');
}
To fake the client side of the transport, we must extend the ClientProxy class. It has two APIs - dispatchEvent for fire-and-forget events, and publish for a request-response pattern:
export class MemoryTransportClient extends ClientProxy {
constructor(private readonly emitter: EventEmitter) {
super();
}
protected async dispatchEvent(packet: ReadPacket): Promise<any> {
this.emitter.emit(MESSAGE_EVENT, packet);
return packet;
}
protected publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
try {
const packet = this.assignPacketId(partialPacket);
this.emitter.once('message_response_' + packet.id,
(response) => {
callback(response);
});
this.emitter.emit(MESSAGE_EVENT, packet);
return () => {}
} catch (err) {
callback({ err });
}
}
close(): any {
this.emitter.removeAllListeners();
}
public async connect() {}
}
In order to provide a drop-in replacement for the other Transport clients, we should also provide a ClientsModule with a similar API:
class MemoryClientsModule {
static register({
name,
emitter,
}: {
name: string;
emitter: EventEmitter;
}): DynamicModule {
return {
providers: [
{
provide: name,
useValue: new MemoryTransportClient(emitter),
},
],
exports: [name],
module: MemoryClientsModule,
};
}
}
Now that we have memory fakes, we can modify our test harness to run the three microservices:
async function createTestHarness(products: ProductTemplate[]) {
const {
ordersApp,
cartApp,
catalogApp,
...rest} = await runMicroservices(products);
return {
catalogApp: request(catalogApp.getHttpServer()),
ordersApp: request(ordersApp.getHttpServer()),
cartApp: request(cartApp.getHttpServer()),
...rest
}
}
async function runMicroservices(products: ProductTemplate[] = []) {
const emitter = new EventEmitter();
const strategy = new MemoryTransportServer(emitter);
const clientsModule = MemoryClientsModule.register({
name: CART_CLIENT,
emitter,
});
const productRepo = new InMemoryProductRepository(products);
const orderRepo = new InMemoryOrderRepository();
const catalogApp = await createCatalogApp(
productRepo, {strategy});
const ordersApp = await createOrdersApp(orderRepo, {strategy});
const cartApp = await createCartApp(clientsModule);
return {
catalogApp,
ordersApp,
cartApp,
orderRepo,
productRepo
};
}
Finally, we can modify our acceptance test to communicate with all three microservices. Note that we added a call to fetch a list of products from the Catalog microservice, to assert that its HTTP API is functioning. We can safely assume that the product catalog contains exactly one product because we provide the initial list of products to the InMemoryProductRepository via the createTestHarness function. Similarly, at the end of the test, we now get the newly-created order from the Order microservice.
test('a user can order a product', async () => {
const {ordersApp, cartApp, catalogApp } =
await createTestHarness([aProduct()]);
const cartId = '666';
const products = await catalogApp.get('/products/')
.expect(200)
.then((response) => {
const items: unknown[] = response.body;
return items.map(item => Product.parse(item));
});
expect(products).toHaveLength(1);
const product = products[0];
await cartApp
.post(`/cart/${cartId}`)
.send({productId: product.id})
.expect(201);
await cartApp
.get(`/cart/${cartId}`)
.expect({id: cartId, items: [{
productId: product.id,
price: product.price,
name: product.title
}]});
const orderId = await cartApp
.post(`/cart/${cartId}/checkout`)
.expect(201)
.then(response => response.text);
const order = await ordersApp
.get(`/order/${orderId}`)
.expect(200)
.then(response => Order.parse(response.body));
expect(order).toMatchObject(expect.objectContaining({
id: orderId,
items: expect.arrayContaining([
expect.objectContaining({
productId: product.id,
})
])
}));
});
The memory-based transport is available as a separate library (yes, with its own contract test!). As always, the code examples for this post are available here.
Comments