bootstrap-server: 10.160.231.159:9093,10.160.231.58:9093,10.160.231.204:9093

生产:
topic: shell-loyalty-sichuan
username: oms_prod
password: R!y7GB9IDA

测试:
topic: shell-loyalty-test
username: oms_test
password: Fi51kN!yzS

group可自定义,请在名称中以group标识,如oms_group

#------------------------------Java代码参考,以实际系统需求与技术实现为准----------------------------
# Specifies the broker information of the Kafka instance.
# ip:port indicates the connection address and port number of the instance.
# For details about how to obtain the information, see section "Collecting Connection Information."
# Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
acks=all
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
buffer.memory=33554432
retries=0
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
# Password of the SSL truststore file.
# This password is configured to access the JKS file generated by Java.
ssl.truststore.password=dms@kafka
# Indicates whether to verify the certificate domain name.
# If this parameter is left blank, the function is disabled.
# This parameter must be disabled and must be left blank.
ssl.endpoint.identification.algorithm=
# Location of the SSL truststore file
ssl.truststore.location=client.jks
# Set the jaas username and password.
# The username and password are the username and password entered when SASL_SSL
# is enabled during Kafka instance creation,
# or the username and password set when SASL_SSL user is created.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="username" \
    password="password";

#------------------------------生产消息代码
package com.dms.kafka.examples;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

/**
 * Small generic wrapper around a Kafka {@link Producer} that is configured from a
 * properties file and exposes convenience {@code produce} overloads.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public class DmsProducer<K, V> {
    /** Name of the properties resource looked up on the classpath by the no-arg constructor. */
    public static final String PRODUCER_CONFIG_FILE_NAME = "dms.sdk.producer.properties";

    /** Underlying producer; stays {@code null} when the configuration could not be read. */
    private Producer<K, V> producer;

    /**
     * Creates a producer configured from the properties file at {@code path}.
     *
     * <p>If the file cannot be read, the stack trace is printed and no underlying
     * producer is created; later {@code produce} calls then fail with
     * {@link IllegalStateException}.
     *
     * @param path file-system path of the producer properties file
     */
    DmsProducer(String path) {
        Properties props = new Properties();
        // try-with-resources so the file handle is released even when load() fails.
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<>(props);
    }

    /**
     * Creates a producer configured from every {@link #PRODUCER_CONFIG_FILE_NAME}
     * resource found on the classpath.
     *
     * <p>If a resource cannot be read, the stack trace is printed and no underlying
     * producer is created; later {@code produce} calls then fail with
     * {@link IllegalStateException}.
     */
    DmsProducer() {
        Properties props;
        try {
            props = loadFromClassPath(PRODUCER_CONFIG_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends {@code value} to {@code topic} with no partition, key, timestamp or callback.
     *
     * @param topic destination topic
     * @param value record value
     */
    public void produce(String topic, V value) {
        produce(topic, null, null, value, null, null);
    }

    /**
     * Sends a record with the given partition, key and value, without timestamp or callback.
     *
     * @param topic     destination topic
     * @param partition target partition, may be {@code null}
     * @param key       record key, may be {@code null}
     * @param value     record value
     */
    public void produce(String topic, Integer partition, K key, V value) {
        produce(topic, partition, key, value, null, null);
    }

    /**
     * Sends a pre-built record without a callback.
     *
     * @param record record to send
     */
    public void produce(ProducerRecord<K, V> record) {
        produce(record, null);
    }

    /**
     * Hands {@code record} and {@code callback} to the underlying producer's {@code send}.
     * All other {@code produce} overloads end up here.
     *
     * @param record   record to send
     * @param callback callback passed to {@code send}, may be {@code null}
     * @throws IllegalStateException if the producer was not created because the
     *                               configuration could not be loaded
     */
    public void produce(ProducerRecord<K, V> record, Callback callback) {
        if (producer == null) {
            // Fail with a meaningful message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "Kafka producer is not initialized; the producer configuration could not be loaded");
        }
        producer.send(record, callback);
    }

    /**
     * Builds a record from the given parts and sends it.
     *
     * @param topic     destination topic
     * @param partition target partition, may be {@code null}
     * @param key       record key, may be {@code null}
     * @param value     record value
     * @param timestamp record timestamp; when {@code null} the record is built without one
     * @param callback  callback passed to {@code send}, may be {@code null}
     */
    public void produce(String topic, Integer partition, K key, V value, Long timestamp, Callback callback) {
        if (timestamp == null) {
            produce(new ProducerRecord<>(topic, partition, key, value), callback);
        } else {
            produce(new ProducerRecord<>(topic, partition, timestamp, key, value), callback);
        }
    }

    /**
     * Sends a record with an explicit timestamp and no callback.
     *
     * @param topic     destination topic
     * @param partition target partition, may be {@code null}
     * @param key       record key, may be {@code null}
     * @param value     record value
     * @param timestamp record timestamp, may be {@code null}
     */
    public void produce(String topic, Integer partition, K key, V value, Long timestamp) {
        produce(topic, partition, key, value, timestamp, null);
    }

    /**
     * Sends a record without a timestamp and with the given callback.
     *
     * @param topic     destination topic
     * @param partition target partition, may be {@code null}
     * @param key       record key, may be {@code null}
     * @param value     record value
     * @param callback  callback passed to {@code send}
     */
    public void produce(String topic, Integer partition, K key, V value, Callback callback) {
        produce(topic, partition, key, value, null, callback);
    }

    /** Closes the underlying producer; does nothing if it was never created. */
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    /**
     * Returns the thread context class loader, falling back to the loader of this class
     * when the context loader is {@code null}.
     */
    private static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsProducer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Loads and merges every classpath resource named {@code configFileName} into one
     * {@link Properties} object, in the order the class loader reports them.
     *
     * @param configFileName resource name to look up
     * @return the merged properties; empty when no such resource exists
     * @throws IOException if a resource cannot be enumerated, opened or read
     */
    private static Properties loadFromClassPath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        List<URL> properties = new ArrayList<>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }

        return config;
    }
}

#------------------------------测试代码
package com.dms.kafka.examples;

import org.junit.Test;

/** Sample test that sends ten demo messages through {@link DmsProducer}. */
public class DmsProducerTest {
    /**
     * Sends ten string messages to partition 0 of {@code topic-0} and prints the outcome
     * of each send from the callback. Exceptions are printed rather than propagated.
     */
    @Test
    public void testProducer() throws Exception {
        DmsProducer<String, String> producer = new DmsProducer<>();
        int partition = 0;
        try {
            for (int i = 0; i < 10; i++) {
                String key = null;
                String data = String.format("Demo message %d.", i);
                producer.produce("topic-0", partition, key, data, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("Message produced.");
                });
                System.out.println("Message content:" + data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}