You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
6.0 KiB

1 year ago
bootstrap-server: 10.160.231.159:9093,10.160.231.58:9093,10.160.231.204:9093
生产:
topic: shell-loyalty-sichuan
username: oms_prod
username: R!y7GB9IDA
测试:
topic: shell-loyalty-test
username: oms_test
username: Fi51kN!yzS
group可自定义请在名称中以group标识如oms_group
#------------------------------Java代码参考以实际系统需求与技术实现为准----------------------------
# Specifies the broker information of the Kafka instance.
# ip:port indicates the connection address and port number of the instance.
# For details about how to obtain the information, see section "Collecting Connection Information."
# Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
acks=all
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
buffer.memory=33554432
retries=0
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
# Password of the SSL truststore file.
# This password is configured to access the JKS file generated by Java.
ssl.truststore.password=dms@kafka
# Indicates whether to verify the certificate domain name.
# If this parameter is left blank, the function is disabled.
# This parameter must be disabled and must be left blank.
ssl.endpoint.identification.algorithm=
# Location of the SSL truststore file
ssl.truststore.location=client.jks
# Set the jaas username and password.
# The username and password are the username and password entered when SASL_SSL
# is enabled during Kafka instance creation,
# or the username and password set when SASL_SSL user is created.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="username" \
password="password";
#------------------------------生产消息代码
package com.dms.kafka.examples;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
public class DmsProducer<K, V> {
public static final String PRODUCER_CONFIG_FILE_NAME = "dms.sdk.producer.properties";
private Producer<K, V> producer;
DmsProducer(String path) {
Properties props = new Properties();
try {
InputStream in = new BufferedInputStream(new FileInputStream(path));
props.load(in);
}
catch (IOException e) {
e.printStackTrace();
return;
}
producer = new KafkaProducer<>(props);
}
DmsProducer() {
Properties props;
try {
props = loadFromClassPath(PRODUCER_CONFIG_FILE_NAME);
}
catch (IOException e) {
e.printStackTrace();
return;
}
producer = new KafkaProducer<>(props);
}
public void produce(String topic, V value) {
produce(topic, null, null, value, null, null);
}
public void produce(String topic, Integer partition, K key, V value) {
produce(topic, partition, key, value, null, null);
}
public void produce(ProducerRecord<K, V> record) {
produce(record, null);
}
public void produce(ProducerRecord<K, V> record, Callback callback) {
producer.send(record, callback);
}
public void produce(String topic, Integer partition, K key, V value, Long timestamp,
Callback callback) {
if (timestamp == null) {
produce(new ProducerRecord<>(topic, partition, key, value), callback);
} else {
produce(new ProducerRecord<>(topic, partition, timestamp, key, value), callback);
}
}
public void produce(String topic, Integer partition, K key, V value, Long timestamp) {
produce(topic, partition, key, value, timestamp, null);
}
public void produce(String topic, Integer partition, K key, V value, Callback callback) {
produce(topic, partition, key, value, null, callback);
}
public void close() {
producer.close();
}
private static ClassLoader getCurrentClassLoader() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = DmsProducer.class.getClassLoader();
}
return classLoader;
}
private static Properties loadFromClassPath(String configFileName)
throws IOException {
ClassLoader classLoader = getCurrentClassLoader();
Properties config = new Properties();
List<URL> properties = new ArrayList<>();
Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
while (propertyResources.hasMoreElements()) {
properties.add(propertyResources.nextElement());
}
for (URL url : properties) {
InputStream is = null;
try {
is = url.openStream();
config.load(is);
}
finally {
if (is != null) {
is.close();
is = null;
}
}
}
return config;
}
}
#------------------------------测试代码
package com.dms.kafka.examples;
import org.junit.Test;
public class DmsProducerTest {
@Test
public void testProducer()
throws Exception {
DmsProducer<String, String> producer = new DmsProducer<>();
int partition = 0;
try {
for (int i = 0; i < 10; i++) {
String key = null;
String data = String.format("Demo message %d.", i);
producer.produce("topic-0", partition, key, data, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
return;
}
System.out.println("Message produced.");
});
System.out.println("Message content:" + data);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
producer.close();
}
}
}