面试题答案
一键面试1. 数据流的创建
- 从数据库获取订单数据:可以使用
System.Data.SqlClient
结合Observable.FromAsync
来异步获取数据。假设使用SqlDataReader
读取数据:
using System.Data.SqlClient;
using System.Reactive.Linq;
public static IObservable<Order> GetOrdersFromDatabase()
{
return Observable.FromAsync(async () =>
{
var orders = new List<Order>();
using (var connection = new SqlConnection("your_connection_string"))
{
await connection.OpenAsync();
using (var command = new SqlCommand("SELECT * FROM Orders", connection))
using (var reader = await command.ExecuteReaderAsync())
{
while (await reader.ReadAsync())
{
var order = new Order
{
OrderId = reader.GetInt32(0),
// 其他属性赋值
};
orders.Add(order);
}
}
}
return orders;
}).SelectMany(Observable.FromEnumerable);
}
- 从消息队列获取订单数据:以 RabbitMQ 为例,结合 EasyNetQ 库,使用
Observable.FromEventPattern
:
using EasyNetQ;
using System.Reactive.Linq;
public static IObservable<Order> GetOrdersFromMessageQueue()
{
var bus = RabbitHutch.CreateBus("host=your_host");
return Observable.FromEventPattern<Order>(
h => bus.Receive<Order>("orders_queue", h),
h => bus.Unsubscribe<Order>("orders_queue", h));
}
- 合并数据源:使用
Observable.Merge
合并多个数据源的数据流:
var combinedOrders = Observable.Merge(
GetOrdersFromDatabase(),
GetOrdersFromMessageQueue());
2. 操作符的组合
- 数据验证:使用
Select
操作符对订单数据进行验证,例如验证订单金额是否大于0:
var validOrders = combinedOrders.Select(order =>
{
if (order.Amount > 0)
{
return order;
}
else
{
throw new ArgumentException("Order amount must be greater than 0");
}
});
- 数据转换:使用
Select
操作符转换订单数据格式,例如将订单日期格式化为特定字符串:
var transformedOrders = validOrders.Select(order =>
{
order.FormattedDate = order.OrderDate.ToString("yyyy - MM - dd");
return order;
});
- 聚合:使用
Buffer
操作符按一定数量聚合订单,例如每10个订单聚合为一组:
var aggregatedOrders = transformedOrders.Buffer(10);
3. 错误处理机制
- 使用
Catch
操作符处理错误:在数据验证或其他操作可能抛出异常的地方,使用Catch
捕获异常并进行处理。例如在验证订单金额时:
var validOrdersWithErrorHandling = combinedOrders.Select(order =>
{
if (order.Amount > 0)
{
return order;
}
else
{
throw new ArgumentException("Order amount must be greater than 0");
}
})
.Catch((ArgumentException ex) =>
{
// 记录错误日志
Console.WriteLine($"Error in order validation: {ex.Message}");
return Observable.Empty<Order>();
});
- 使用
Finally
操作符进行最终清理:无论数据流是否成功完成或发生错误,都可以使用Finally
进行资源清理等操作。例如关闭数据库连接或消息队列连接:
validOrdersWithErrorHandling
.Finally(() =>
{
// 关闭数据库连接和消息队列连接等资源清理操作
})
.Subscribe();
4. 更新到不同存储
- 更新到数据库:使用
Subscribe
订阅数据流,并在订阅回调中更新数据库。例如:
transformedOrders.Subscribe(order =>
{
using (var connection = new SqlConnection("your_connection_string"))
{
connection.Open();
using (var command = new SqlCommand("UPDATE Orders SET FormattedDate = @FormattedDate WHERE OrderId = @OrderId", connection))
{
command.Parameters.AddWithValue("@FormattedDate", order.FormattedDate);
command.Parameters.AddWithValue("@OrderId", order.OrderId);
command.ExecuteNonQuery();
}
}
});
- 更新到缓存:以 Redis 为例,使用 StackExchange.Redis 库:
using StackExchange.Redis;
var redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
var cache = redis.GetDatabase();
transformedOrders.Subscribe(order =>
{
cache.StringSet($"order:{order.OrderId}", JsonConvert.SerializeObject(order));
});
以上是基于 Rx.NET 设计订单处理系统架构的大致思路及关键代码示例,实际应用中还需根据具体需求进行调整和优化。
类定义示例
public class Order
{
public int OrderId { get; set; }
public decimal Amount { get; set; }
public DateTime OrderDate { get; set; }
public string FormattedDate { get; set; }
}