文章目录
Kafka是一个分布式流处理平台,主要用于处理实时数据流。它可以用于日志收集、数据流处理、消息队列等场景。在大数据处理、实时数据分析等领域,Kafka被广泛应用。
Kafka的主要功能包括消息发布和订阅、消息存储和消息处理。
Kafka的概念包括生产者、消费者、主题、分区、偏移量等。生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息在分区中的位置。
Kafka有四个核心的API:
Kafka官网:https://kafka.apache.org/

Kafka中文文档:https://kafka.apachecn.org/

Confluent.Kafka

if (AppSetting.Kafka.UseConsumer)
builder.RegisterType<KafkaConsumer<string, string>>().As<IKafkaConsumer<string, string>>().SingleInstance();
if (AppSetting.Kafka.UseProducer)
builder.RegisterType<KafkaProducer<string, string>>().As<IKafkaProducer<string, string>>().SingleInstance();

1、IKafkaConsumer
public interface IKafkaConsumer<TKey, TValue> : IDisposable
{
/// <summary>
/// 订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic);
/// <summary>
/// 批量订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topics">主题集合</param>
void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics);
/// <summary>
/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
/// <returns>待消费数据</returns>
List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100);
/// <summary>
/// 单笔消费模式-单行消费
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <returns>待消费数据</returns>
ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300);
}

2、IKafkaProducer
public interface IKafkaProducer<TKey, TValue>
{
/// <summary>
/// 生产
/// </summary>
/// <param name="Key"></param>
/// <param name="Value"></param>
/// <param name="Topic"></param>
void Produce(TKey Key, TValue Value, string Topic);
/// <summary>
/// 生产 异步
/// </summary>
/// <param name="Key"></param>
/// <param name="Value"></param>
/// <param name="Topic"></param>
/// <returns></returns>
Task ProduceAsync(TKey Key, TValue Value, string Topic);
}

1、KafkaConsumer
/// <summary>
/// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string)
/// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class KafkaConsumer<TKey, TValue> : KafkaConfig, IKafkaConsumer<TKey, TValue>
{
/// <summary>
/// Kafka地址(包含端口号)
/// </summary>
public string Servers
{
get
{
return ConsumerConfig.BootstrapServers;
}
set
{
ConsumerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 消费者群组
/// </summary>
public string GroupId
{
get
{
return ConsumerConfig.GroupId;
}
set
{
ConsumerConfig.GroupId = value;
}
}
/// <summary>
/// 自动提交 默认为 false
/// </summary>
public bool EnableAutoCommit
{
get
{
return ConsumerConfig.EnableAutoCommit ?? false;
}
set
{
ConsumerConfig.EnableAutoCommit = value;
}
}
/// <summary>
/// 订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
{
Task.Factory.StartNew(() =>
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
builder.SetErrorHandler((_, e) =>
{
Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
}).SetStatisticsHandler((_, json) =>
{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
}).SetPartitionsAssignedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
}).SetPartitionsRevokedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
});
using var consumer = builder.Build();
consumer.Subscribe(Topic);
while (AppSetting.Kafka.IsConsumerSubscribe) //true
{
ConsumeResult<TKey, TValue> result = null;
try
{
result = consumer.Consume();
if (result.IsPartitionEOF) continue;
if (Func(result))
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);
}
}
}
catch (ConsumeException ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
}
}
});
}
/// <summary>
/// 批量订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
public void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics)
{
Task.Factory.StartNew(() =>
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
builder.SetErrorHandler((_, e) =>
{
Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
}).SetStatisticsHandler((_, json) =>
{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
}).SetPartitionsAssignedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
}).SetPartitionsRevokedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
});
using var consumer = builder.Build();
consumer.Subscribe(Topics);
while (AppSetting.Kafka.IsConsumerSubscribe) //true
{
ConsumeResult<TKey, TValue> result = null;
try
{
result = consumer.Consume();
if (result.IsPartitionEOF) continue;
if (Func(result))
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);
}
}
}
catch (ConsumeException ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
}
}
});
}
/// <summary>
/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
/// <returns>待消费数据</returns>
public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
using var consumer = builder.Build();
consumer.Subscribe(Topic);
List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
while (true)
{
try
{
var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result == null) break;
else
{
Res.Add(result);
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();
}
if (Res.Count > MaxRow) break;
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
return null;
}
}
return Res;
}
/// <summary>
/// 单笔消费模式-单行消费
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <returns>待消费数据</returns>
public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
using var consumer = builder.Build();
consumer.Subscribe(Topic);
try
{
var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result != null)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();
}
return result;
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
return null;
}
}
public void Dispose()
{
//if (_cache != null)
// _cache.Dispose();
GC.SuppressFinalize(this);
}
}

2、KafkaProducer
/// <summary>
/// 生产者 控制器或Service里面构造函数注入即可调用
/// Message.Key的数据类型为string、Message.Value的数据类型为string
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class KafkaProducer<TKey, TValue> : KafkaConfig, IKafkaProducer<TKey, TValue>
{
/// <summary>
/// 构造生产者
/// </summary>
public KafkaProducer()
{
}
/// <summary>
/// Kafka地址(包含端口号)
/// </summary>
public string Servers
{
get
{
return ProducerConfig.BootstrapServers;
}
set
{
ProducerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 生产
/// </summary>
/// <param name="Key">Message.Key 做消息指定分区投放有用的</param>
/// <param name="Value">Message.Value</param>
/// <param name="Topic">主题</param>
public void Produce(TKey Key, TValue Value, string Topic)
{
var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());//设置序列化方式
using var producer = producerBuilder.Build();
try
{
producer.Produce(Topic, new Message<TKey, TValue>
{
Key = Key,
Value = Value
}, (result) =>
{
if (result.Error.IsError)
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}");
});//Value = JsonConvert.SerializeObject(value)
}
catch (ProduceException<Null, string> ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
}
/// <summary>
/// 生产异步
/// </summary>
/// <param name="Key">Message.Key</param>
/// <param name="Value">Message.Value</param>
/// <param name="Topic">主题</param>
/// <returns></returns>
public async Task ProduceAsync(TKey Key, TValue Value, string Topic)
{
var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());
using var producer = producerBuilder.Build();
try
{
var dr = await producer.ProduceAsync(Topic, new Message<TKey, TValue>
{
Key = Key,
Value = Value
});
//Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
}
}

/// <summary>
/// 配置类
/// </summary>
public class KafkaConfig
{
/// <summary>
/// 构造配置类
/// </summary>
protected KafkaConfig()
{
ProducerConfig = new ProducerConfig()
{
BootstrapServers = AppSetting.Kafka.ProducerSettings.BootstrapServers,// "192.168.20.241:9092",
};
ConsumerConfig = new ConsumerConfig()
{
BootstrapServers = AppSetting.Kafka.ConsumerSettings.BootstrapServers,
GroupId = AppSetting.Kafka.ConsumerSettings.GroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
EnableAutoCommit = false,
//Kafka配置安全认证
//SecurityProtocol = SecurityProtocol.SaslPlaintext,
//SaslMechanism = SaslMechanism.Plain,
//SaslUsername = AppSetting.Kafka.ConsumerSettings.SaslUsername,
//SaslPassword = AppSetting.Kafka.ConsumerSettings.SaslPassword,
};
}
/// <summary>
/// 消费者配置文件
/// </summary>
public ConsumerConfig ConsumerConfig;
/// <summary>
/// 生产者配置文件
/// </summary>
public ProducerConfig ProducerConfig;
}

namespace KafkaManager
{
/// <summary>
/// 辅助类
/// </summary>
public class KafkaHelper
{
/// <summary>
/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)
/// </summary>
/// <returns></returns>
public static string GetApplicationName()
{
try
{
return Assembly.GetEntryAssembly().GetName().Name;
}
catch
{
return "Kafka_Test";
}
}
/// <summary>
/// 获取服务器名称
/// </summary>
/// <returns></returns>
public static string GetServerName()
{
return Dns.GetHostName();
}
/// <summary>
/// 获取服务器IP
/// </summary>
/// <returns></returns>
public static string GetServerIp()
{
IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());
foreach (var ip in ips.AddressList)
{
if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
{
return ip.ToString();
};
}
return "127.0.0.1";
}
/// <summary>
/// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级)
/// </summary>
/// <returns>long</returns>
public static long GetTimeStamp()
{
DateTime time = DateTime.Now;
long t = (time.Ticks - 621356256000000000) / 10000;
return t;
}
}
#region 实现消息序列化和反序列化
public class KafkaConverter<T> : ISerializer<T>
{
/// <summary>
/// 序列化数据成字节
/// </summary>
/// <param name="data"></param>
/// <param name="context"></param>
/// <returns></returns>
public byte[] Serialize(T data, SerializationContext context)
{
var json = JsonConvert.SerializeObject(data);
return Encoding.UTF8.GetBytes(json);
}
}
public class KafkaDConverter<T> : IDeserializer<T>
{
/// <summary>
/// 反序列化字节数据成实体数据
/// </summary>
/// <param name="data"></param>
/// <param name="isNull"></param>
/// <param name="context"></param>
/// <returns></returns>
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull) return default(T);
var json = Encoding.UTF8.GetString(data.ToArray());
try
{
return JsonConvert.DeserializeObject<T>(json);
}
catch
{
return default(T);
}
}
}
#endregion
#region 日志类
/// <summary>
/// 默认日志类 可自行构造使用
/// </summary>
public class KafkaLogModel
{
/// <summary>
/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion)
/// </summary>
public KafkaLogModel()
{
ServerIp = KafkaHelper.GetServerIp();
ServerName = KafkaHelper.GetServerName();
TimeStamp = DateTime.Now;
ApplicationName = KafkaHelper.GetApplicationName();
ApplicationVersion = "V1.0.0";
}
/// <summary>
/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)
/// </summary>
public string ApplicationName { get; set; }
/// <summary>
/// 程式版本(默认为V1.0.0)
/// </summary>
public string ApplicationVersion { get; set; }
/// <summary>
/// 发生时间(默认为当前时间)
/// </summary>
public DateTime TimeStamp { get; set; }
/// <summary>
/// 开始时间
/// </summary>
public DateTime BeginDate { get; set; }
/// <summary>
/// 结束时间
/// </summary>
public DateTime EndDate { get; set; }
/// <summary>
/// 服务器IP(默认抓取当前服务器IP)
/// </summary>
public string ServerIp { get; set; }
/// <summary>
/// 服务器名称(默认抓取当前服务器名称)
/// </summary>
public string ServerName { get; set; }
/// <summary>
/// 客户端IP
/// </summary>
public string ClientIp { get; set; }
/// <summary>
/// 模块(页面路径)
/// </summary>
public string Module { get; set; }
/// <summary>
/// 操作人
/// </summary>
public string Operator { get; set; }
/// <summary>
/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义
/// </summary>
public string OperationType { get; set; }
/// <summary>
/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义
/// </summary>
public string Status { get; set; }
/// <summary>
/// 其他信息
/// </summary>
public string Message { get; set; }
}
#endregion
}

#region kafka使用
if (AppSetting.Kafka.UseConsumer)
{
using var scope = host.Services.CreateScope();
var testConsumer = scope.ServiceProvider.GetService<IKafkaConsumer<string, string>>();
testConsumer.Consume(res =>
{
Console.WriteLine($"recieve:{DateTime.Now.ToLongTimeString()} value:{res.Message.Value}");
bool bl = DataHandle.AlarmData(res.Message.Value);
return bl;
}, AppSetting.Kafka.Topics.TestTopic);
}
#endregion
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po