RabbitMQ:Jackson2JsonMessageConverter分析
跳到导航
跳到搜索
关于
在看《Spring 实战》:发送异步消息一节,对于 RabbitMQ(RabbitTemplate)提到基于 JSON 的转换,可以使用 Jackson2JsonMessageConverter 作为消息转换器,但是没有提到“在生产者、消费者之间如何实现对象映射”。 虽然可以想到和 JMS(JmsTemplate)基于 JSON 转换所用到的 MappinJackson2MessageConverter 类似:在生产者、消费者端配置消息转换器时,都设置 typeIdPropertyName、typeIdMappins[1]。 但是对于其流程并不十分清楚,所以通过源代码追踪简单分析如下。
执行过程
以下以 RabbitTemplate.convertAndSend(Object object) 为例。
- RabbitTemplate.convertAndSend:
public class RabbitTemplate extends ... { ... // 如果不配置,则默认的 MessageConverter 为 SimpleMessageConverter private MessageConverter messageConverter = new SimpleMessageConverter(); ... ... // setter:setMessageConverter // getter:getMessageConverter ... ... @Override public void convertAndSend(Object object) throws AmqpException { // 使用默认的 exchange、routingKey,而 CorrelationData 为空 convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null); } @Override public void convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData) throws AmqpException { // send 方法发送的信息为 Message 类型,所以需要 convertMessageIfNecessary(object) send(exchange, routingKey, convertMessageIfNecessary(object), correlationData); } @Override public void send(final String exchange, final String routingKey, final Message message, @Nullable final CorrelationData correlationData) throws AmqpException { execute(channel -> { doSend(channel, exchange, routingKey, message, (RabbitTemplate.this.returnsCallback != null || (correlationData != null && StringUtils.hasText(correlationData.getId()))) && isMandatoryFor(message), correlationData); return null; }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message)); } ... // 消息转换:Object -> Message protected Message convertMessageIfNecessary(final Object object) { if (object instanceof Message) { return (Message) object; } /** * 获取“消息转换器”并调用其 toMessage 方法 * 如果使用【Jackson2JsonMessageConverter】,则实际调用【AbstractMessageConverter.toMessage】 * * (Jackson2JsonMessageConverter 只包含构造方法,AbstractMessageConverter 是其父类的父类) */ return getRequiredMessageConverter().toMessage(object, new MessageProperties()); } ... private MessageConverter getRequiredMessageConverter() throws IllegalStateException { MessageConverter converter = getMessageConverter(); // 获取配置的 MessageConverter if (converter == null) { throw new AmqpIllegalStateException( "No 'messageConverter' specified. Check configuration of RabbitTemplate."); } return converter; } }
- 默认的消息转换器为:SimpleMessageConverter,使用 Jackson2JsonMessageConverter 需要配置。
- AbstractMessageConverter.toMessage:
public abstract class AbstractMessageConverter implements MessageConverter { ... @Override public final Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { // 转换消息,genericType 为空(但此类的 genericType 并无作用) return toMessage(object, messageProperties, null); } @Override public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException { MessageProperties messageProperties = messagePropertiesArg; if (messageProperties == null) { messageProperties = new MessageProperties(); } // 调用 createMessage 方法创建 Message 对象(实际的消息) Message message = createMessage(object, messageProperties, genericType); messageProperties = message.getMessageProperties(); // 保证 Message 的 messageProperties 中 MessageId 不为空 if (this.createMessageIds && messageProperties.getMessageId() == null) { messageProperties.setMessageId(UUID.randomUUID().toString()); } return message; } protected Message createMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType) { return createMessage(object, messageProperties); // 可以看到 genericType 被抛弃了 } /** * 抽象方法,将会在子类中实现: * 如果实用【Jackson2JsonMessageConverter】,则实际调用【AbstractJackson2MessageConverter.createMessage】 */ protected abstract Message createMessage(Object object, MessageProperties messageProperties); }
- AbstractJackson2MessageConverter.createMessage:
public abstract class AbstractJackson2MessageConverter extends AbstractMessageConverter implements BeanClassLoaderAware, SmartMessageConverter { ... public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; protected final ObjectMapper objectMapper; private MimeType supportedContentType; private String supportedCTCharset; @Nullable private ClassMapper classMapper = null; // !!! private Charset defaultCharset = DEFAULT_CHARSET; ... private Jackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); // !!! ... private boolean charsetIsUtf8 = true; ... // 构造方法(以下以 Jackson2JsonMessageConverter 的无参构造为例) protected AbstractJackson2MessageConverter(ObjectMapper objectMapper, MimeType contentType, String... trustedPackages) { Assert.notNull(objectMapper, "'objectMapper' must not be null"); Assert.notNull(contentType, "'contentType' must not be null"); this.objectMapper = objectMapper; // 为 new ObjectMapper()的对象 this.supportedContentType = contentType; // 为 "application/json" 的 MimeType(MIME 类型) this.supportedCTCharset = this.supportedContentType.getParameter("charset"); ((DefaultJackson2JavaTypeMapper) this.javaTypeMapper).setTrustedPackages(trustedPackages); } ... @Override protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { // 创建 Message 对象,genericType 为空 return createMessage(objectToConvert, messageProperties, null); } @Override protected Message createMessage(Object objectToConvert, MessageProperties messageProperties, @Nullable Type genericType) throws MessageConversionException { byte[] bytes; // 通过 objectMapper 将 objectToConvert(要发送的对象)转换为 byte[] try { // 默认:以 Utf8(JsonEncoding.UTF8) 转换对象 if (this.charsetIsUtf8 && this.supportedCTCharset == null) { bytes = this.objectMapper.writeValueAsBytes(objectToConvert); } // 以设置的 supportedCTCharset 转换对象 else { String jsonString = this.objectMapper .writeValueAsString(objectToConvert); String encoding = this.supportedCTCharset != null ? this.supportedCTCharset : getDefaultCharset(); bytes = jsonString.getBytes(encoding); } } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); } /** * messageProperties:设置内容的 MIME 类型 * * supportedContentType由“构造方法、set方法”指定,默认:"application/json" */ messageProperties.setContentType(this.supportedContentType.toString()); /** * messageProperties:设置内容的字符集 * * supportedCTCharset由“构造方法、set方法”指定,默认:StandardCharsets.UTF_8 */ if (this.supportedCTCharset == null) { messageProperties.setContentEncoding(getDefaultCharset()); } // messageProperties:设置内容的长度 messageProperties.setContentLength(bytes.length); /** * messageProperties:【设置对象类型!!!】 * * 1、classMapper:只能由“set方法”指定,默认:空 * * 2、javaTypeMapper:只能由“set方法”指定,默认已设置为:DefaultJackson2JavaTypeMapper */ if (getClassMapper() == null) { //【通过 objectMapper 构建 JavaType 对象】 JavaType type = this.objectMapper.constructType( genericType == null ? objectToConvert.getClass() : genericType); if (genericType != null && !type.isContainerType() && Modifier.isAbstract(type.getRawClass().getModifiers())) { type = this.objectMapper.constructType(objectToConvert.getClass()); } /** * 通过 javaTypeMapper 的配置(Jackson2JavaTypeMapper 的实现类), * 将 JavaType 设置到 messageProperties * * 【默认将调用 DefaultJackson2JavaTypeMapper.fromJavaType】 */ getJavaTypeMapper().fromJavaType(type, messageProperties); } else { /** * 通过 classMapper 的配置(ClassMapper 的实现类), * 将 objectToConvert.getClass() 设置到 messageProperties */ getClassMapper().fromClass(objectToConvert.getClass(), messageProperties); // NOSONAR never null } return new Message(bytes, messageProperties); } }
- DefaultJackson2JavaTypeMapper.fromJavaType:
public class DefaultJackson2JavaTypeMapper extends AbstractJavaTypeMapper implements Jackson2JavaTypeMapper { ... /** * 以下 addHeader、getClassIdFieldName、getContentClassIdFieldName、getKeyClassIdFieldName 方法 * 均已在父类 AbstractJavaTypeMapper 中实现 */ @Override public void fromJavaType(JavaType javaType, MessageProperties properties) { /** * 向 messageProperties 的【“__TypeId__”】中写入 javaType 的“原始类型”【即,对象本身的类型】 */ addHeader(properties, getClassIdFieldName(), javaType.getRawClass()); /** * 如果 javaType 表示的类型是“数组类型”以外的“容器类型”,【即,对象是一个容器类型】 * 向 messageProperties 的【“__ContentTypeId__”】中写入 javaType 元素的类型【即,对象的元素的类型】 * * P.S. “容器类型”:包括数组、映射和集合类型 */ if (javaType.isContainerType() && !javaType.isArrayType()) { addHeader(properties, getContentClassIdFieldName(), javaType.getContentType().getRawClass()); } /** * 如果 javaType 的 _keyType 不为空,【即,对象是一个映射类型】 * 向 messageProperties 的【“__KeyTypeId__”】中写入 javaType 的 _keyType 的类型【即,对象的 key 的类型】 * * P.S. _keyType 用于表示 MapLikeType(表示一个 Map-like 的类型)的“键的类型” */ if (javaType.getKeyType() != null) { addHeader(properties, getKeyClassIdFieldName(), javaType.getKeyType().getRawClass()); } } ... }
- AbstractJavaTypeMapper.addHeader:
public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware { ... // 用于保存 JavaType 的“原始类型” public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__"; // 用于保存 JavaType 的“元素的类型” public static final String DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__ContentTypeId__"; // 用于保存 JavaType 的“key 的类型” public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__"; // 用于保存 String -> Class<?> 的映射 private final Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); // 用于保存 Class<?> -> String 的映射:由 idClassMapping 的键值反转得到 private final Map<Class<?>, String> classIdMapping = new HashMap<Class<?>, String>(); ... public String getClassIdFieldName() { return DEFAULT_CLASSID_FIELD_NAME; } public String getContentClassIdFieldName() { return DEFAULT_CONTENT_CLASSID_FIELD_NAME; } public String getKeyClassIdFieldName() { return DEFAULT_KEY_CLASSID_FIELD_NAME; } // 设置 idClassMapping: public void setIdClassMapping(Map<String, Class<?>> idClassMapping) { this.idClassMapping.putAll(idClassMapping); createReverseMap(); } // 设置 classIdMapping:将 idClassMapping 的键值反转 private void createReverseMap() { this.classIdMapping.clear(); for (Map.Entry<String, Class<?>> entry : this.idClassMapping.entrySet()) { String id = entry.getKey(); Class<?> clazz = entry.getValue(); this.classIdMapping.put(clazz, id); } } ... protected void addHeader(MessageProperties properties, String headerName, Class<?> clazz) { /** * 向 MessageProperties.headers 中添加(“要发送对象的”)类型: * 1、如果 classIdMapping 中存在 String -> Class<?> 的映射,则写入对应的 String * 2、否则,写入 Class<?> 的名称 */ if (this.classIdMapping.containsKey(clazz)) { properties.getHeaders().put(headerName, this.classIdMapping.get(clazz)); } else { properties.getHeaders().put(headerName, clazz.getName()); } } }
相关
参考
- ↑ JMS 的 MappinJackson2MessageConverter 配置:
@Bean public MappingJackson2MessageMappinConverter messageConverter(){ MappingJackson2MessageMappinConverter messageConverter = new MappingJackson2MessageMappinConverter(); // 设置消息转换器的 typeIdPropertyName(存储“对象类型”的字段) messageConverter.setTypeIdPropertyName("_typeId"); // Map 用于保存“对象类型”到“实际类型”的映射 Map<String, Class<?>> typeIdMapping = new HashMap<>(); typeIdMapping.put("Bean", bean.class); // 设置消息转换器的 typeIdMapping(“对象类型”到“实际类型”的映射) messageConverter.setTypeIdMapping(typeIdMapping); return messageConverter; }