MVP.Express

Type-safe RPC with code generation and streaming support

RPC Framework · Schema-First · Streaming

Overview

MVP.Express stands for MYRA Virtual Procedure over Express Link. It is MYRA's RPC framework, built entirely on MYRA libraries, and it provides type-safe communication between distributed services. On top of MyraCodec and MyraTransport, it offers schema-first service definitions, code generation, and both unary and streaming RPC patterns.

Key Features

Schema-First Services

  • Service definitions in YAML with methods and types
  • Generated clients and servers with full type safety
  • IDL-driven development for API-first design

Multiple RPC Patterns

  • Unary RPC: Request-response communication
  • Server streaming: Server sends multiple responses
  • Client streaming: Client sends multiple requests
  • Bidirectional streaming: Full-duplex communication
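One way to picture the four patterns is by the shape of their inputs and outputs. The sketch below is an in-process illustration only: it uses `java.util.stream.Stream` as a stand-in for a stream of messages, and the class and method names are hypothetical, not part of MVP.Express.

```java
import java.util.stream.Stream;

public class RpcPatternShapes {
    // Unary: one request in, one response out.
    static String greet(String name) {
        return "hello " + name;
    }

    // Server streaming: one request in, a sequence of responses out.
    static Stream<Integer> countTo(int n) {
        return Stream.iterate(1, i -> i + 1).limit(n);
    }

    // Client streaming: a sequence of requests in, one response out.
    static int sum(Stream<Integer> requests) {
        return requests.mapToInt(Integer::intValue).sum();
    }

    // Bidirectional streaming: a sequence in and a sequence out,
    // with each response produced as its request is consumed.
    static Stream<String> echo(Stream<String> requests) {
        return requests.map(s -> "echo:" + s);
    }
}
```

Over a real network the streams would be asynchronous; the server example later on this page uses `Flow.Publisher` for that purpose.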

Production Features

  • Automatic retries with exponential backoff
  • Circuit breakers for fault tolerance
  • Request/response metadata for tracing
  • Built-in metrics and observability hooks
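To make "retries with exponential backoff" concrete, here is a minimal standalone sketch of the idea. It is not the MVP.Express retry API: `RetrySketch`, `backoffDelay`, and `callWithRetry` are hypothetical names, and jitter is left out for brevity.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class RetrySketch {
    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at max.
    static Duration backoffDelay(int attempt, Duration base, Duration max) {
        long baseMs = base.toMillis();
        long maxMs = max.toMillis();
        long factor = 1L << Math.min(attempt, 30); // bound the shift
        // Compare before multiplying so the product cannot overflow.
        long delayMs = baseMs > maxMs / factor ? maxMs : baseMs * factor;
        return Duration.ofMillis(delayMs);
    }

    // Runs `call` up to maxAttempts times, sleeping between failed attempts.
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts, Duration base, Duration max) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt + 1 == maxAttempts) {
                    break; // no point sleeping after the final attempt
                }
                try {
                    Thread.sleep(backoffDelay(attempt, base, max).toMillis());
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw last;
    }
}
```

With a 100 ms base and a 1 s cap, the delays grow as 100, 200, 400, 800 ms and then stay at 1000 ms. A production retry policy would normally add random jitter and retry only errors that are safe to repeat.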

Full Stack Integration

  • Built on Myra Transport for io_uring performance
  • Uses Myra Codec for efficient serialization
  • Powered by Roray FFM Utils for memory management

Service Definition

# service.myra.yml
namespace: com.example.kvstore

types:
  Key:
    key: string
    
  Value:
    data: bytes
    version: int64
    
  KeyValue:
    key: Key
    value: Value
    
  GetRequest:
    key: Key
    
  PutRequest:
    key: Key
    value: Value
    expectedVersion: int64?
    
  DeleteRequest:
    key: Key
    expectedVersion: int64?
    
  WatchRequest:
    prefix: string
    
  WatchEvent:
    type: WatchEventType
    kv: KeyValue
    
  WatchEventType:
    enum: [PUT, DELETE]

services:
  KVStore:
    methods:
      get:
        request: GetRequest
        response: Value?
        
      put:
        request: PutRequest
        response: Value
        
      delete:
        request: DeleteRequest
        response: bool
        
      scan:
        request: Key
        response: stream KeyValue
        
      watch:
        request: WatchRequest
        response: stream WatchEvent
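For orientation, the schema above might map to Java types roughly like the following. This is a guess at the shape of the generated code, inferred from how the server and client examples on this page use it (`request.key().key()`, `Key.of(...)`, `expectedVersion().isPresent()`); the actual generator output may differ, and the wrapper class name is hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.Flow;

public class GeneratedTypesSketch {
    record Key(String key) {
        static Key of(String key) { return new Key(key); }
    }

    record Value(byte[] data, long version) {}

    record KeyValue(Key key, Value value) {
        static KeyValue of(Key key, Value value) { return new KeyValue(key, value); }
    }

    record GetRequest(Key key) {
        static GetRequest of(Key key) { return new GetRequest(key); }
    }

    // `int64?` in the schema is assumed to surface as Optional<Long>.
    record PutRequest(Key key, Value value, Optional<Long> expectedVersion) {}

    record DeleteRequest(Key key, Optional<Long> expectedVersion) {}

    record WatchRequest(String prefix) {
        static WatchRequest of(String prefix) { return new WatchRequest(prefix); }
    }

    enum WatchEventType { PUT, DELETE }

    record WatchEvent(WatchEventType type, KeyValue kv) {}

    // One method per entry under `services.KVStore.methods`;
    // `stream T` responses are assumed to become Flow.Publisher<T>.
    interface KVStoreService {
        Value get(GetRequest request); // null when the key is absent (`Value?`)
        Value put(PutRequest request);
        boolean delete(DeleteRequest request);
        Flow.Publisher<KeyValue> scan(Key prefix);
        Flow.Publisher<WatchEvent> watch(WatchRequest request);
    }
}
```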

Generated Server Implementation

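import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;

// Hand-written implementation of the generated KVStoreService interface.
// delete and watch are not shown here.
public class KVStoreImpl implements KVStoreService {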
    private final ConcurrentMap<String, VersionedValue> store = 
        new ConcurrentHashMap<>();
    
    @Override
    public Value get(GetRequest request) {
        var value = store.get(request.key().key());
        return value != null ? value.toValue() : null;
    }
    
    @Override
    public Value put(PutRequest request) {
        var key = request.key().key();
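        // Wall-clock time as the version: simple, but two writes in the
        // same millisecond receive the same version.
        var newVersion = System.currentTimeMillis();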
        
        var value = new VersionedValue(
            request.value().data(), 
            newVersion
        );
        
        if (request.expectedVersion().isPresent()) {
            // Optimistic concurrency
            var expected = request.expectedVersion().get();
            store.compute(key, (k, existing) -> {
                if (existing != null && existing.version() != expected) {
                    throw new VersionMismatchException();
                }
                return value;
            });
        } else {
            store.put(key, value);
        }
        
        return value.toValue();
    }
    
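    @Override
    public Flow.Publisher<KeyValue> scan(Key prefix) {
        return subscriber -> {
            // Reactive Streams requires onSubscribe before any other signal.
            // This simplified subscription ignores demand and cancellation.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            store.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix.key()))
                .map(e -> KeyValue.of(Key.of(e.getKey()), e.getValue().toValue()))
                .forEach(subscriber::onNext);
            subscriber.onComplete();
        };
    }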
}
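The example above references `VersionedValue` and `VersionMismatchException` without defining them. The standalone sketch below shows one possible shape for those helpers and for the optimistic-concurrency check. It is an assumption, not the project's code: it uses a per-key counter as the version instead of a timestamp, and the class name `OptimisticPutSketch` is hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OptimisticPutSketch {
    // Hypothetical stand-ins for the helper types the server example uses.
    record VersionedValue(byte[] data, long version) {}

    static class VersionMismatchException extends RuntimeException {
        VersionMismatchException(long expected, long actual) {
            super("expected version " + expected + " but found " + actual);
        }
    }

    private final ConcurrentMap<String, VersionedValue> store = new ConcurrentHashMap<>();

    // Writes data under key. When expectedVersion is present, the write
    // succeeds only if it equals the stored version (0 means "no entry yet").
    VersionedValue put(String key, byte[] data, Optional<Long> expectedVersion) {
        // compute() runs atomically per key; if the function throws,
        // the existing mapping is left unchanged.
        return store.compute(key, (k, existing) -> {
            long current = existing == null ? 0L : existing.version();
            if (expectedVersion.isPresent() && expectedVersion.get() != current) {
                throw new VersionMismatchException(expectedVersion.get(), current);
            }
            return new VersionedValue(data, current + 1);
        });
    }
}
```

Doing the version check inside `compute` keeps the compare and the write in a single atomic step, so two concurrent writers with the same expected version cannot both succeed.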

Client Usage

// Create client
var client = KVStoreClient.builder()
    .address("localhost", 8080)
    .build();

// Unary call
var value = client.get(GetRequest.of(Key.of("user:123")));

// Streaming call
client.watch(WatchRequest.of("user:"))
    .subscribe(event -> {
        System.out.println("Event: " + event.type() + " " + event.kv().key().key());
    });
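If the streaming methods return a plain `java.util.concurrent.Flow.Publisher`, as the server-side `scan` signature suggests, a subscriber must implement all four `Flow.Subscriber` callbacks and request items explicitly. The sketch below shows that pattern with the JDK's `SubmissionPublisher` standing in for the client stream; `WatchSubscriberSketch` and `CollectingSubscriber` are hypothetical names, and the events are plain strings for simplicity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class WatchSubscriberSketch {
    // Collects every item and requests one item at a time (simple backpressure).
    static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item
        }

        @Override public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override public void onError(Throwable t) { done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> demo() {
        var subscriber = new CollectingSubscriber<String>();
        try (var publisher = new SubmissionPublisher<String>()) {
            publisher.subscribe(subscriber);
            publisher.submit("PUT user:1");
            publisher.submit("DELETE user:2");
        } // close() delivers pending items, then signals onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }
}
```

Requesting one item per callback keeps a slow consumer from being flooded; a real watch handler might request larger batches.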

Starting the Server

var server = RpcServer.builder()
    .port(8080)
    .service(new KVStoreImpl())
    .build();

server.start();
System.out.println("KVStore server running on port 8080");
server.awaitTermination();

Installation

Gradle (Kotlin DSL)

plugins {
    id("express.mvp.rpc-framework") version "0.1.0"
}

dependencies {
    implementation("express.mvp:rpc-framework:0.1.0")
}

rpcFramework {
    schemaDir = file("src/main/schemas")
    outputDir = file("build/generated/sources/rpc")
}

Maven

<dependency>
    <groupId>express.mvp</groupId>
    <artifactId>rpc-framework</artifactId>
    <version>0.1.0</version>
</dependency>