RabbitMQ:Jackson2JsonMessageConverter分析

来自Wikioe
Eijux讨论 | 贡献2022年12月24日 (六) 11:34的版本 →‎相关
跳到导航 跳到搜索


关于

在看《Spring 实战》:发送异步消息一节,对于 RabbitMQ(RabbitTemplate)提到基于 JSON 的转换,可以使用 Jackson2JsonMessageConverter 作为消息转换器,但是没有提到“在生产者、消费者之间如何实现对象映射”。

虽然可以想到和 JMS(JmsTemplate)基于 JSON 转换所用到的 MappinJackson2MessageConverter 类似:在生产者、消费者端配置消息转换器时,都设置 typeIdPropertyName、typeIdMappins[1]。

但是对于其流程并不十分清楚,所以通过源代码追踪简单分析如下。

执行过程

以下以 RabbitTemplate.convertAndSend(Object object) 为例。
  1. RabbitTemplate.convertAndSend
    public class RabbitTemplate extends ... {
    	...
    	// 如果不配置,则默认的 MessageConverter 为 SimpleMessageConverter
    	private MessageConverter messageConverter = new SimpleMessageConverter();
    	...
    	
    	...
    	// setter:setMessageConverter	
    	// getter:getMessageConverter
    	...
    	
    	...
    	@Override
    	public void convertAndSend(Object object) throws AmqpException {
    		// 使用默认的 exchange、routingKey,而 CorrelationData 为空
    		convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null);
    	}
    	
    	@Override
    	public void convertAndSend(String exchange, String routingKey, final Object object,
    			@Nullable CorrelationData correlationData) throws AmqpException {
    		// send 方法发送的信息为 Message 类型,所以需要 convertMessageIfNecessary(object)
    		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    	}
    	
    	@Override
    	public void send(final String exchange, final String routingKey,
    			final Message message, @Nullable final CorrelationData correlationData)
    			throws AmqpException {
    		execute(channel -> {
    			doSend(channel, exchange, routingKey, message,
    					(RabbitTemplate.this.returnsCallback != null
    							|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
    							&& isMandatoryFor(message),
    					correlationData);
    			return null;
    		}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    	}
    	...
    	
    	// 消息转换:Object -> Message
    	protected Message convertMessageIfNecessary(final Object object) {
    		if (object instanceof Message) {
    			return (Message) object;
    		}
    		/**
    		 * 获取“消息转换器”并调用其 toMessage 方法
    		 * 如果使用【Jackson2JsonMessageConverter】,则实际调用【AbstractMessageConverter.toMessage】
    		 *
    		 * (Jackson2JsonMessageConverter 只包含构造方法,AbstractMessageConverter 是其父类的父类)
    		 */ 
    		return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    	}
    	...
    	private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    		MessageConverter converter = getMessageConverter();	// 获取配置的 MessageConverter
    		if (converter == null) {
    			throw new AmqpIllegalStateException(
    					"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    		}
    		return converter;
    	}
    }
    
    • 默认的消息转换器为:SimpleMessageConverter,使用 Jackson2JsonMessageConverter 需要配置。
  2. AbstractMessageConverter.toMessage
    public abstract class AbstractMessageConverter implements MessageConverter {
    	...
    	
    	@Override
    	public final Message toMessage(Object object, MessageProperties messageProperties)
    			throws MessageConversionException {
    		// 转换消息,genericType 为空(但此类的 genericType 并无作用)
    		return toMessage(object, messageProperties, null);
    	}
    
    	@Override
    	public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg,
    			@Nullable Type genericType)
    			throws MessageConversionException {
    
    		MessageProperties messageProperties = messagePropertiesArg;
    		if (messageProperties == null) {
    			messageProperties = new MessageProperties();
    		}
    		// 调用 createMessage 方法创建 Message 对象(实际的消息)
    		Message message = createMessage(object, messageProperties, genericType);
    		messageProperties = message.getMessageProperties();
    		// 保证 Message 的 messageProperties 中 MessageId 不为空
    		if (this.createMessageIds && messageProperties.getMessageId() == null) {
    			messageProperties.setMessageId(UUID.randomUUID().toString());
    		}
    		return message;
    	}
    
    	protected Message createMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType) {
    		return createMessage(object, messageProperties);	// 可以看到 genericType 被抛弃了
    	}
    	
    	/**
    	 * 抽象方法,将会在子类中实现:
    	 * 如果实用【Jackson2JsonMessageConverter】,则实际调用【AbstractJackson2MessageConverter.createMessage】
    	 */
    	protected abstract Message createMessage(Object object, MessageProperties messageProperties);
    }
    
  3. AbstractJackson2MessageConverter.createMessage
    public abstract class AbstractJackson2MessageConverter extends AbstractMessageConverter
    		implements BeanClassLoaderAware, SmartMessageConverter {
    	
    	...
    	public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
    		
    	protected final ObjectMapper objectMapper;
    	
    	private MimeType supportedContentType;
    	
    	private String supportedCTCharset;
    	
    	@Nullable
    	private ClassMapper classMapper = null;		// !!!
    	
    	private Charset defaultCharset = DEFAULT_CHARSET;
    	
    	...
    	private Jackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();	// !!!
    	
    	...
    	private boolean charsetIsUtf8 = true;
    	
    	...
    		
    	// 构造方法(以下以 Jackson2JsonMessageConverter 的无参构造为例)
    	protected AbstractJackson2MessageConverter(ObjectMapper objectMapper, MimeType contentType,
    			String... trustedPackages) {
    
    		Assert.notNull(objectMapper, "'objectMapper' must not be null");
    		Assert.notNull(contentType, "'contentType' must not be null");
    		this.objectMapper = objectMapper;	// 为 new ObjectMapper()的对象
    		this.supportedContentType = contentType;	// 为 "application/json" 的 MimeType(MIME 类型)
    		this.supportedCTCharset = this.supportedContentType.getParameter("charset");
    		((DefaultJackson2JavaTypeMapper) this.javaTypeMapper).setTrustedPackages(trustedPackages);
    	}
    		
    	...
    	
    	@Override
    	protected Message createMessage(Object objectToConvert, MessageProperties messageProperties)
    			throws MessageConversionException {
    		// 创建 Message 对象,genericType 为空
    		return createMessage(objectToConvert, messageProperties, null);
    	}
    	
    	@Override
    	protected Message createMessage(Object objectToConvert, MessageProperties messageProperties,
    			@Nullable Type genericType) throws MessageConversionException {
    
    		byte[] bytes;
    		// 通过 objectMapper 将 objectToConvert(要发送的对象)转换为 byte[]
    		try {
    			// 默认:以 Utf8(JsonEncoding.UTF8) 转换对象
    			if (this.charsetIsUtf8 && this.supportedCTCharset == null) {
    				bytes = this.objectMapper.writeValueAsBytes(objectToConvert);
    			}
    			// 以设置的 supportedCTCharset 转换对象
    			else {
    				String jsonString = this.objectMapper
    						.writeValueAsString(objectToConvert);
    				String encoding = this.supportedCTCharset != null ? this.supportedCTCharset : getDefaultCharset();
    				bytes = jsonString.getBytes(encoding);
    			}
    		}
    		catch (IOException e) {
    			throw new MessageConversionException("Failed to convert Message content", e);
    		}
    		/**
    		 * messageProperties:设置内容的 MIME 类型
    		 *
    		 * supportedContentType由“构造方法、set方法”指定,默认:"application/json"
    		 */
    		messageProperties.setContentType(this.supportedContentType.toString());
    		/**
    		 * messageProperties:设置内容的字符集
    		 *
    		 * supportedCTCharset由“构造方法、set方法”指定,默认:StandardCharsets.UTF_8
    		 */
    		if (this.supportedCTCharset == null) {
    			messageProperties.setContentEncoding(getDefaultCharset());
    		}
    		// messageProperties:设置内容的长度
    		messageProperties.setContentLength(bytes.length);
    
    		/**
    		 * messageProperties:【设置对象类型!!!】
    		 *
    		 * 1、classMapper:只能由“set方法”指定,默认:空
    		 *
    		 * 2、javaTypeMapper:只能由“set方法”指定,默认已设置为:DefaultJackson2JavaTypeMapper
    		 */
    		if (getClassMapper() == null) {	
    			//【通过 objectMapper 构建 JavaType 对象】
    			JavaType type = this.objectMapper.constructType(
    					genericType == null ? objectToConvert.getClass() : genericType);
    			if (genericType != null && !type.isContainerType()
    					&& Modifier.isAbstract(type.getRawClass().getModifiers())) {
    				type = this.objectMapper.constructType(objectToConvert.getClass());
    			}
    			/**
    			 * 通过 javaTypeMapper 的配置(Jackson2JavaTypeMapper 的实现类),
    			 * 将 JavaType 设置到 messageProperties
    			 *
    			 * 【默认将调用 DefaultJackson2JavaTypeMapper.fromJavaType】
    			 */
    			getJavaTypeMapper().fromJavaType(type, messageProperties);
    		}
    		else {
    			/**
    			 * 通过 classMapper 的配置(ClassMapper 的实现类),
    			 * 将 objectToConvert.getClass() 设置到 messageProperties
    			 */
    			getClassMapper().fromClass(objectToConvert.getClass(), messageProperties); // NOSONAR never null
    		}
    		
    		return new Message(bytes, messageProperties);
    	}
    }
    
  4. DefaultJackson2JavaTypeMapper.fromJavaType
    public class DefaultJackson2JavaTypeMapper extends AbstractJavaTypeMapper implements Jackson2JavaTypeMapper {
    	
    	...
    	
    	/**
    	 * 以下 addHeader、getClassIdFieldName、getContentClassIdFieldName、getKeyClassIdFieldName 方法
    	 * 均已在父类 AbstractJavaTypeMapper 中实现
    	 */
    	@Override
    	public void fromJavaType(JavaType javaType, MessageProperties properties) {
    		/**
    		 * 向 messageProperties 的【“__TypeId__”】中写入 javaType 的“原始类型”【即,对象本身的类型】
    		 */
    		addHeader(properties, getClassIdFieldName(), javaType.getRawClass());
    		
    		/**
    		 * 如果 javaType 表示的类型是“数组类型”以外的“容器类型”,【即,对象是一个容器类型】
    		 * 向 messageProperties 的【“__ContentTypeId__”】中写入 javaType 元素的类型【即,对象的元素的类型】
    		 * 
    		 * P.S. “容器类型”:包括数组、映射和集合类型
    		 */
    		if (javaType.isContainerType() && !javaType.isArrayType()) {
    			addHeader(properties, getContentClassIdFieldName(), javaType.getContentType().getRawClass());
    		}
    		
    		/**
    		 * 如果 javaType 的 _keyType 不为空,【即,对象是一个映射类型】
    		 * 向 messageProperties 的【“__KeyTypeId__”】中写入 javaType 的 _keyType 的类型【即,对象的 key 的类型】
    		 * 
    		 * P.S. _keyType 用于表示 MapLikeType(表示一个 Map-like 的类型)的“键的类型”
    		 */
    		if (javaType.getKeyType() != null) {
    			addHeader(properties, getKeyClassIdFieldName(), javaType.getKeyType().getRawClass());
    		}
    	}
    	...
    }
    
  5. AbstractJavaTypeMapper.addHeader
    public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {
    	...
    	// 用于保存 JavaType 的“原始类型”
    	public static final String DEFAULT_CLASSID_FIELD_NAME = "__TypeId__";
    	// 用于保存 JavaType 的“元素的类型”
    	public static final String DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__ContentTypeId__";
    	// 用于保存 JavaType 的“key 的类型”
    	public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__";
    
    	// 用于保存 String -> Class<?> 的映射
    	private final Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    	// 用于保存 Class<?> -> String 的映射:由 idClassMapping 的键值反转得到
    	private final Map<Class<?>, String> classIdMapping = new HashMap<Class<?>, String>();
    	
    	...
    	
    	public String getClassIdFieldName() {
    		return DEFAULT_CLASSID_FIELD_NAME;
    	}
    	
    	public String getContentClassIdFieldName() {
    		return DEFAULT_CONTENT_CLASSID_FIELD_NAME;
    	}
    	
    	public String getKeyClassIdFieldName() {
    		return DEFAULT_KEY_CLASSID_FIELD_NAME;
    	}
    	
    	// 设置 idClassMapping:
    	public void setIdClassMapping(Map<String, Class<?>> idClassMapping) {
    		this.idClassMapping.putAll(idClassMapping);
    		createReverseMap();
    	}
    	
    	// 设置 classIdMapping:将 idClassMapping 的键值反转
    	private void createReverseMap() {
    		this.classIdMapping.clear();
    		for (Map.Entry<String, Class<?>> entry : this.idClassMapping.entrySet()) {
    			String id = entry.getKey();
    			Class<?> clazz = entry.getValue();
    			this.classIdMapping.put(clazz, id);
    		}
    	}
    	
    	...
    	
    	protected void addHeader(MessageProperties properties, String headerName, Class<?> clazz) {
    		/**
    		 * 向 MessageProperties.headers 中添加(“要发送对象的”)类型:
    		 * 1、如果 classIdMapping 中存在 String -> Class<?> 的映射,则写入对应的 String
    		 * 2、否则,写入 Class<?> 的名称
    		 */
    		if (this.classIdMapping.containsKey(clazz)) {
    			properties.getHeaders().put(headerName, this.classIdMapping.get(clazz));
    		}
    		else {
    			properties.getHeaders().put(headerName, clazz.getName());
    		}
    	}
    }
    

相关


参考

  1. JMSMappinJackson2MessageConverter 配置:
    	@Bean
    	public MappingJackson2MessageMappinConverter messageConverter(){
    		MappingJackson2MessageMappinConverter messageConverter = new MappingJackson2MessageMappinConverter();
    		
    		// 设置消息转换器的 typeIdPropertyName(存储“对象类型”的字段)
    		messageConverter.setTypeIdPropertyName("_typeId");
    		
    		// Map 用于保存“对象类型”到“实际类型”的映射
    		Map<String, Class<?>> typeIdMapping = new HashMap<>();
    		typeIdMapping.put("Bean", bean.class);
    		// 设置消息转换器的 typeIdMapping(“对象类型”到“实际类型”的映射)
    		messageConverter.setTypeIdMapping(typeIdMapping);
    		
    		return messageConverter;
    	}