Schema registry can be used in various scenarios. In this page, the configurations for different use-cases are listed.

You can find the related parameters that you need the use in the configurations from Upstash Console. Scroll down to the REST API section to find the values you need:

  • UPSTASH_KAFKA_REST_URL
  • UPSTASH_KAFKA_REST_USERNAME
  • UPSTASH_KAFKA_REST_PASSWORD

Producer/Consumer

Producer

If you need to configure your producers to use the schema registry, add the following properties to producer properties in addition to the broker configurations. Note that the selected deserializer needs to be schema-registry aware.

Properties props = new Properties();
// ... other configurations like broker.url and broker authentication are skipped
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", UPSTASH_KAFKA_REST_URL + "/schema-registry");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);

try (var producer = new KafkaProducer<String, org.apache.avro.GenericRecord>(props)) {
    // ...
  }

Consumer

If you need to configure your consumers to use the schema registry, add the following properties to consumer properties in addition to the broker configurations. Note that the selected deserializer needs to be schema-registry aware.

Properties props = new Properties();
// ... other configurations like broker.url and broker authentication are skipped
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "$UPSTASH_KAFKA_REST_URL/schema-registry");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);

try(var consumer = new KafkaConsumer<String, org.apache.avro.GenericRecord>(props)) {
    // ...
}

Connectors

Some connectors forces you to use a STRUCT as key/value which means that you need a schema and a schema aware convertor to use with the connector. For this case, you can add the following configurations to your connector.

{
    "name": "myConnector",
    "properties": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        // other configurations are skipped.

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.upstash.schema.registry.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.upstash.schema.registry.enable": "true"
    }
}

The config above is the shorther version of the following.

{
    "name": "myConnector",
    "properties": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        // other configurations are skipped.

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "key.converter.basic.auth.user.info": "UPSTASH_KAFKA_REST_USERNAME:UPSTASH_KAFKA_REST_PASSWORD",
        "key.converter.schema.registry.url": "UPSTASH_KAFKA_REST_URL/schema-registry",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "UPSTASH_KAFKA_REST_USERNAME:UPSTASH_KAFKA_REST_PASSWORD",
        "value.converter.schema.registry.url": "UPSTASH_KAFKA_REST_URL/schema-registry"
    }
}

Thirdy party UI tools

You can use third-party UI tools to use the schema registry with. We have schema registry configuration examples in our Monitoring section.

SchemaRegistryClient

SchemaRegistryClient can be used to access the schema registry programmatically. In this case, you can configure it as following:

Map<String, String> configs = new HashMap<>();
configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);
var client = new CachedSchemaRegistryClient(UPSTASH_KAFKA_REST_URL + "/schema-registry", 100, configs);