|
|
|
|
bootstrap-server: 10.160.231.159:9093,10.160.231.58:9093,10.160.231.204:9093
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
生产:
|
|
|
|
|
topic: shell-loyalty-sichuan
|
|
|
|
|
username: oms_prod
|
|
|
|
|
username: R!y7GB9IDA
|
|
|
|
|
测试:
|
|
|
|
|
topic: shell-loyalty-test
|
|
|
|
|
username: oms_test
|
|
|
|
|
username: Fi51kN!yzS
|
|
|
|
|
|
|
|
|
|
group可自定义,请在名称中以group标识,如oms_group
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#------------------------------Java代码参考,以实际系统需求与技术实现为准----------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Specifies the broker information of the Kafka instance.
|
|
|
|
|
# ip:port indicates the connection address and port number of the instance.
|
|
|
|
|
# For details about how to obtain the information, see section "Collecting Connection Information."
|
|
|
|
|
# Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
|
|
|
|
|
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
|
|
|
|
|
acks=all
|
|
|
|
|
key.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
|
|
|
value.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
|
|
|
buffer.memory=33554432
|
|
|
|
|
retries=0
|
|
|
|
|
sasl.mechanism=SCRAM-SHA-512
|
|
|
|
|
security.protocol=SASL_SSL
|
|
|
|
|
|
|
|
|
|
# Password of the SSL truststore file.
|
|
|
|
|
# This password is configured to access the JKS file generated by Java.
|
|
|
|
|
ssl.truststore.password=dms@kafka
|
|
|
|
|
|
|
|
|
|
# Indicates whether to verify the certificate domain name.
|
|
|
|
|
# If this parameter is left blank, the function is disabled.
|
|
|
|
|
# This parameter must be disabled and must be left blank.
|
|
|
|
|
ssl.endpoint.identification.algorithm=
|
|
|
|
|
|
|
|
|
|
# Location of the SSL truststore file
|
|
|
|
|
ssl.truststore.location=client.jks
|
|
|
|
|
|
|
|
|
|
# Set the jaas username and password.
|
|
|
|
|
# The username and password are the username and password entered when SASL_SSL
|
|
|
|
|
# is enabled during Kafka instance creation,
|
|
|
|
|
# or the username and password set when SASL_SSL user is created.
|
|
|
|
|
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
|
|
|
|
username="username" \
|
|
|
|
|
password="password";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#------------------------------生产消息代码
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
package com.dms.kafka.examples;
|
|
|
|
|
|
|
|
|
|
import org.apache.kafka.clients.producer.Callback;
|
|
|
|
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
|
|
|
import org.apache.kafka.clients.producer.Producer;
|
|
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
|
|
|
|
|
|
|
import java.io.BufferedInputStream;
|
|
|
|
|
import java.io.FileInputStream;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.io.InputStream;
|
|
|
|
|
import java.net.URL;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Enumeration;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Properties;
|
|
|
|
|
|
|
|
|
|
public class DmsProducer<K, V> {
|
|
|
|
|
public static final String PRODUCER_CONFIG_FILE_NAME = "dms.sdk.producer.properties";
|
|
|
|
|
private Producer<K, V> producer;
|
|
|
|
|
|
|
|
|
|
DmsProducer(String path) {
|
|
|
|
|
Properties props = new Properties();
|
|
|
|
|
try {
|
|
|
|
|
InputStream in = new BufferedInputStream(new FileInputStream(path));
|
|
|
|
|
props.load(in);
|
|
|
|
|
}
|
|
|
|
|
catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
producer = new KafkaProducer<>(props);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DmsProducer() {
|
|
|
|
|
Properties props;
|
|
|
|
|
try {
|
|
|
|
|
props = loadFromClassPath(PRODUCER_CONFIG_FILE_NAME);
|
|
|
|
|
}
|
|
|
|
|
catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
producer = new KafkaProducer<>(props);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(String topic, V value) {
|
|
|
|
|
produce(topic, null, null, value, null, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(String topic, Integer partition, K key, V value) {
|
|
|
|
|
produce(topic, partition, key, value, null, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(ProducerRecord<K, V> record) {
|
|
|
|
|
produce(record, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(ProducerRecord<K, V> record, Callback callback) {
|
|
|
|
|
producer.send(record, callback);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(String topic, Integer partition, K key, V value, Long timestamp,
|
|
|
|
|
Callback callback) {
|
|
|
|
|
if (timestamp == null) {
|
|
|
|
|
produce(new ProducerRecord<>(topic, partition, key, value), callback);
|
|
|
|
|
} else {
|
|
|
|
|
produce(new ProducerRecord<>(topic, partition, timestamp, key, value), callback);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(String topic, Integer partition, K key, V value, Long timestamp) {
|
|
|
|
|
produce(topic, partition, key, value, timestamp, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void produce(String topic, Integer partition, K key, V value, Callback callback) {
|
|
|
|
|
produce(topic, partition, key, value, null, callback);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void close() {
|
|
|
|
|
producer.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static ClassLoader getCurrentClassLoader() {
|
|
|
|
|
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
|
|
|
|
if (classLoader == null) {
|
|
|
|
|
classLoader = DmsProducer.class.getClassLoader();
|
|
|
|
|
}
|
|
|
|
|
return classLoader;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Properties loadFromClassPath(String configFileName)
|
|
|
|
|
throws IOException {
|
|
|
|
|
ClassLoader classLoader = getCurrentClassLoader();
|
|
|
|
|
Properties config = new Properties();
|
|
|
|
|
|
|
|
|
|
List<URL> properties = new ArrayList<>();
|
|
|
|
|
Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
|
|
|
|
|
while (propertyResources.hasMoreElements()) {
|
|
|
|
|
properties.add(propertyResources.nextElement());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (URL url : properties) {
|
|
|
|
|
InputStream is = null;
|
|
|
|
|
try {
|
|
|
|
|
is = url.openStream();
|
|
|
|
|
config.load(is);
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
if (is != null) {
|
|
|
|
|
is.close();
|
|
|
|
|
is = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return config;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#------------------------------测试代码
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
package com.dms.kafka.examples;
|
|
|
|
|
|
|
|
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
|
|
public class DmsProducerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void testProducer()
|
|
|
|
|
throws Exception {
|
|
|
|
|
DmsProducer<String, String> producer = new DmsProducer<>();
|
|
|
|
|
int partition = 0;
|
|
|
|
|
try {
|
|
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
|
|
String key = null;
|
|
|
|
|
String data = String.format("Demo message %d.", i);
|
|
|
|
|
|
|
|
|
|
producer.produce("topic-0", partition, key, data, (metadata, exception) -> {
|
|
|
|
|
if (exception != null) {
|
|
|
|
|
exception.printStackTrace();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
System.out.println("Message produced.");
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
System.out.println("Message content:" + data);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
producer.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|