Archives

Archives / 2016 / August
  • Working example of MassTransit 3 request/response conversation

    Chapter 3.5 of the MassTransit documentation contains an incomplete example of a request/response conversation using MassTransit. After some fiddling I got an example working using an in-memory MassTransit queue with multiple request types. See the code below, implemented as an xUnit test.

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;
    using Xunit;
    namespace MassTransitRequestResponse.Test
    {
    /// <summary>
    /// Request contract consumed by <c>NamedReqRespConsumer</c>.
    /// </summary>
    public interface INamedReq
    {
        /// <summary>Name that the consumer embeds in its "I name you ..." reply.</summary>
        string Name { get; }
    }
    /// <summary>
    /// Concrete, settable implementation of <see cref="INamedReq"/> used to build the request.
    /// </summary>
    public class NamedReq : INamedReq
    {
        /// <summary>Name carried by the request.</summary>
        public string Name { get; set; }
    }
    /// <summary>
    /// Response contract for an <c>INamedReq</c> request.
    /// </summary>
    public interface INamedResp
    {
        /// <summary>Reply text; the consumer in this file fills it with "I name you {Name}".</summary>
        string Named { get; }
    }
    /// <summary>
    /// Concrete, settable implementation of <see cref="INamedResp"/> sent back by the consumer.
    /// </summary>
    public class NamedResp : INamedResp
    {
        /// <summary>Reply text carried by the response.</summary>
        public string Named { get; set; }
    }
    /// <summary>
    /// Request contract consumed by <c>CalledReqRespConsumer</c>.
    /// </summary>
    public interface ICalledReq
    {
        /// <summary>Name that the consumer embeds in its "I call you ..." reply.</summary>
        string Name { get; }
    }
    /// <summary>
    /// Concrete, settable implementation of <see cref="ICalledReq"/> used to build the request.
    /// </summary>
    /// <remarks>
    /// Declared <c>public</c> explicitly so it matches the other message classes in this file
    /// (<c>NamedReq</c>, <c>NamedResp</c>, <c>CalledResp</c>) instead of being implicitly internal.
    /// </remarks>
    public class CalledReq : ICalledReq
    {
        /// <summary>Name carried by the request.</summary>
        public string Name { get; set; }
    }
    /// <summary>
    /// Response contract for an <c>ICalledReq</c> request.
    /// </summary>
    public interface ICalledResp
    {
        /// <summary>Reply text; the consumer in this file fills it with "I call you {Name}".</summary>
        string Called { get; }
    }
    /// <summary>
    /// Concrete, settable implementation of <see cref="ICalledResp"/> sent back by the consumer.
    /// </summary>
    public class CalledResp : ICalledResp
    {
        /// <summary>Reply text carried by the response.</summary>
        public string Called { get; set; }
    }
    /// <summary>
    /// Consumer for <see cref="INamedReq"/> messages that answers with a <see cref="NamedResp"/>.
    /// </summary>
    public class NamedReqRespConsumer : IConsumer<INamedReq>
    {
        /// <summary>
        /// Builds the "I name you ..." reply from the incoming request's name and hands it
        /// to the consume context as the response.
        /// </summary>
        /// <param name="context">Consume context carrying the request message.</param>
        /// <returns>An already-completed task; nothing is awaited here.</returns>
        public Task Consume(ConsumeContext<INamedReq> context)
        {
            var reply = new NamedResp
            {
                Named = $"I name you {context.Message.Name}"
            };

            context.Respond(reply);

            return Task.FromResult(0);
        }
    }
    /// <summary>
    /// Consumer for <see cref="ICalledReq"/> messages that answers with a <see cref="CalledResp"/>.
    /// </summary>
    public class CalledReqRespConsumer : IConsumer<ICalledReq>
    {
        /// <summary>
        /// Builds the "I call you ..." reply from the incoming request's name and hands it
        /// to the consume context as the response.
        /// </summary>
        /// <param name="context">Consume context carrying the request message.</param>
        /// <returns>An already-completed task; nothing is awaited here.</returns>
        public Task Consume(ConsumeContext<ICalledReq> context)
        {
            var reply = new CalledResp
            {
                Called = $"I call you {context.Message.Name}"
            };

            context.Respond(reply);

            return Task.FromResult(0);
        }
    }
    /// <summary>
    /// xUnit test showing a request/response round trip over an in-memory MassTransit bus,
    /// with two different request types served from the same receive endpoint.
    /// </summary>
    public class MasstransitRequestResponse
    {
        // Single source of truth for the queue name used by both the endpoint and the clients.
        private const string QueueName = "reqRespQueue";

        /// <summary>
        /// Sends one <see cref="INamedReq"/> and one <see cref="ICalledReq"/> to the same queue
        /// and verifies that each request client receives the reply built by its consumer.
        /// </summary>
        /// <returns>
        /// A task the test runner can await. Returning <see cref="Task"/> rather than using
        /// <c>async void</c> lets the runner observe completion and failures directly.
        /// </returns>
        [Fact]
        public async Task ReqResTest()
        {
            // Arrange
            var control = Bus.Factory.CreateUsingInMemory(configure =>
            {
                configure.ReceiveEndpoint(QueueName, endpoint =>
                {
                    // Both consumers listen on the same endpoint.
                    endpoint.Consumer<NamedReqRespConsumer>();
                    endpoint.Consumer<CalledReqRespConsumer>();
                });
            });

            // Both clients target the same endpoint address and share the same timeout value.
            var queueAddress = new Uri(control.Address, QueueName);
            var timeout = TimeSpan.FromSeconds(3);
            var namedClient = new MessageRequestClient<INamedReq, INamedResp>(control, queueAddress, timeout);
            var calledClient = new MessageRequestClient<ICalledReq, ICalledResp>(control, queueAddress, timeout);

            using (var handle = control.Start())
            {
                // Do not send anything until the started bus signals readiness.
                await handle.Ready;

                // Act
                var result1 = await namedClient.Request(new NamedReq() { Name = "Serge" }, CancellationToken.None);
                var result2 = await calledClient.Request(new CalledReq() { Name = "Danny" }, CancellationToken.None);

                // Assert
                Assert.Equal("I name you Serge", result1.Named);
                Assert.Equal("I call you Danny", result2.Called);
            }
        }
    }
    }