1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| private static final String TOPIC = "zhongmingmao"; private static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " + "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " + "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", KafkaAvroSerializer.class.getName()); props.put("value.serializer", KafkaAvroSerializer.class.getName()); props.put("schema.registry.url", "http://localhost:8081");
Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(USER_SCHEMA);
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
Random rand = new Random(); int id = 0;
try { while (id < 100) { id++; String name = "name" + id; int age = rand.nextInt(40) + 1; GenericRecord user = new GenericData.Record(schema); user.put("id", id); user.put("name", name); user.put("age", age);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC, user); producer.send(record); TimeUnit.SECONDS.sleep(1); } } finally { producer.close(); } }
|